"""MCP (Model Context Protocol) client for connecting to external data sources."""
import json
from pathlib import Path
from typing import Any, Dict, List, Literal, Optional
import httpx
import jinja2
from mcp import ClientSession, StdioServerParameters # type: ignore[import-untyped]
from mcp.client.sse import sse_client # type: ignore[import-untyped]
from mcp.client.stdio import stdio_client # type: ignore[import-untyped]
from mcp.client.streamable_http import streamablehttp_client # type: ignore[import-untyped]
[docs]
class MCPClient:
"""
Client for connecting to and communicating with MCP servers.
Supports multiple transport types (stdio, HTTP, SSE).
"""
[docs]
def __init__(
self,
server_name: str,
transport_type: Literal["stdio", "http", "sse"],
transport_params: Dict[str, Any],
):
"""
Initialize MCP client with transport configuration.
Args:
server_name: Name for the server (e.g., "notion")
transport_type: Type of transport ("stdio", "http", or "sse")
transport_params: Transport-specific parameters:
- stdio: {"command": str, "args": List[str], "env": Dict[str, str]}
- http: {"url": str, "headers": Dict[str, str]}
- sse: {"url": str, "headers": Dict[str, str]}
"""
self.server_name = server_name
self.transport_type = transport_type
self.transport_params = transport_params
self.session: Optional[ClientSession] = None
self._transport_context = None
[docs]
async def connect(self) -> None:
"""
Connect to MCP server using the configured transport.
Routes to transport-specific connection method based on transport type.
Must be called before any other operations.
"""
if self.transport_type == "stdio":
await self._connect_stdio()
elif self.transport_type == "http":
await self._connect_http()
elif self.transport_type == "sse":
await self._connect_sse()
async def _connect_stdio(self) -> None:
"""Connect via stdio transport."""
server_params = StdioServerParameters(
command=self.transport_params["command"],
args=self.transport_params["args"],
env=self.transport_params.get("env", {}),
)
# stdio_client returns an async context manager
stdio_context = stdio_client(server_params)
read, write = await stdio_context.__aenter__()
self._transport_context = stdio_context
self.session = ClientSession(read, write)
await self.session.__aenter__()
await self.session.initialize()
async def _validate_http_connection(self, url: str, headers: Dict[str, Any]) -> None:
"""Validate HTTP connection before entering transport context manager."""
async with httpx.AsyncClient() as client:
try:
response = await client.get(url, headers=headers)
response.raise_for_status()
except httpx.HTTPStatusError as e:
# Only fail on auth errors (401, 403)
if e.response.status_code in (400, 401, 403):
error_msg = f"{e.response.status_code} {e.response.reason_phrase}"
raise ConnectionError(error_msg) from e
# For other status codes (like 405), let it through
# The actual MCP connection will handle it
except httpx.RequestError as e:
raise ConnectionError(str(e)) from e
async def _connect_http(self) -> None:
"""Connect via HTTP/StreamableHTTP transport."""
url = self.transport_params["url"]
headers = self.transport_params.get("headers", {})
await self._validate_http_connection(url, headers)
http_context = streamablehttp_client(url=url, headers=headers)
read, write, get_session_id = await http_context.__aenter__()
self._transport_context = http_context
self.session = ClientSession(read, write)
await self.session.__aenter__()
await self.session.initialize()
async def _connect_sse(self) -> None:
"""Connect via SSE transport."""
url = self.transport_params["url"]
headers = self.transport_params.get("headers", {})
await self._validate_http_connection(url, headers)
sse_context = sse_client(url=url, headers=headers)
read, write = await sse_context.__aenter__()
self._transport_context = sse_context
self.session = ClientSession(read, write)
await self.session.__aenter__()
await self.session.initialize()
[docs]
async def disconnect(self) -> None:
"""Disconnect from the MCP server."""
if self.session:
await self.session.__aexit__(None, None, None)
self.session = None
if self._transport_context:
await self._transport_context.__aexit__(None, None, None)
self._transport_context = None
[docs]
async def list_resources(self) -> List[Dict[str, Any]]:
"""
List all available resources from the MCP server.
Returns:
List of resource dictionaries with uri, name, description, etc.
"""
if not self.session:
raise RuntimeError("Not connected to MCP server. Call connect() first.")
result = await self.session.list_resources()
return [
{
"uri": resource.uri,
"name": resource.name,
"description": resource.description,
"mimeType": resource.mimeType if hasattr(resource, "mimeType") else None,
}
for resource in result.resources
]
[docs]
async def read_resource(self, uri: str) -> str:
"""
Read content from a specific resource.
Args:
uri: The URI of the resource to read
Returns:
The text content of the resource
"""
if not self.session:
raise RuntimeError("Not connected to MCP server. Call connect() first.")
result = await self.session.read_resource(uri)
# Extract text content from the response
if result.contents:
content_parts = []
for content in result.contents:
if hasattr(content, "text"):
content_parts.append(content.text)
elif hasattr(content, "data"):
# Handle binary or other data types if needed
content_parts.append(str(content.data))
return "\n".join(content_parts)
return ""
[docs]
class MCPClientFactory:
"""
Factory for creating MCP clients from configuration.
Loads and parses MCP server configurations from files, dicts, or templates,
detects transport types, and creates pre-configured MCPClient instances.
"""
[docs]
def __init__(self, config_path: Optional[str] = None, config_dict: Optional[Dict] = None):
"""
Initialize client factory with config file path or config dict.
Args:
config_path: Path to mcp.json config file. Required if config_dict is not provided.
config_dict: Direct configuration dictionary in MCP format:
{"mcpServers": {"serverName": {...}}}
Required if config_path is not provided.
Raises:
ValueError: If neither config_path nor config_dict is provided.
"""
if config_path is None and config_dict is None:
raise ValueError(
"Either 'config_path' or 'config_dict' must be provided. "
"Cannot default to any config location."
)
self.config_path = config_path
self.config_dict = config_dict
def _load_config(self) -> Dict[str, Any]:
"""
Load MCP configuration from file or use provided config dict.
Returns:
Full MCP configuration with mcpServers structure
"""
# If config_dict is provided, use it directly
if self.config_dict is not None:
return self.config_dict
# Load from file (config_path is guaranteed to be set by __init__)
config_file = Path(self.config_path)
if not config_file.exists():
raise FileNotFoundError(
f"MCP configuration file not found at {config_file}. "
"Please create it with your MCP server configurations."
)
with open(config_file, "r") as f:
config = json.load(f)
return config
[docs]
def create_client(self, server_name: str) -> MCPClient:
"""
Create an MCP client from configuration.
Loads config, detects transport type, and creates a pre-configured MCPClient instance.
Args:
server_name: Name of the MCP server from the config
Returns:
Configured MCPClient instance ready to connect
Raises:
ValueError: If server not found or config invalid
FileNotFoundError: If config file doesn't exist
"""
config = self._load_config()
# Support both "mcpServers" and "servers" format
servers = config.get("mcpServers") or config.get("servers")
if not servers:
raise ValueError("Invalid MCP configuration: no 'mcpServers' or 'servers' key found")
if server_name not in servers:
available = ", ".join(servers.keys())
raise ValueError(f"Server '{server_name}' not found. Available: {available}")
server_config = servers[server_name]
# Detect transport type
transport_type = server_config.get("transport")
# Create client with pre-configured transport
client = MCPClient(
server_name=server_name,
transport_type=transport_type,
transport_params=server_config,
)
return client
[docs]
@classmethod
def from_provider(cls, provider: str, credentials: Dict[str, str]):
"""
Create MCPClientFactory from a built-in provider template.
Loads the provider's template file and renders it with credentials.
Args:
provider: Provider name (e.g., "notion", "github", "jira")
credentials: Dictionary of credential key-value pairs
Returns:
MCPClientFactory instance ready to use
Example:
factory = MCPClientFactory.from_provider(
"notion",
{"NOTION_TOKEN": "ntn_abc123..."}
)
"""
# Load template file
templates_dir = Path(__file__).parent / "provider_templates"
template_file = templates_dir / f"{provider}.json.j2"
if not template_file.exists():
available = [p.stem.split(".")[0] for p in templates_dir.glob("*.json.j2")]
raise ValueError(f"Provider '{provider}' not supported. Available: {available}")
# Render the Jinja2 template with credentials
env = jinja2.Environment(autoescape=False)
template = env.from_string(template_file.read_text())
rendered = template.render(**credentials)
# Parse the rendered JSON
tool_config = json.loads(rendered)
# Validate structure
if "mcpServers" not in tool_config:
raise ValueError(
f"Provider template for '{provider}' is invalid. "
"Expected format: {'mcpServers': {'serverName': {...}}}"
)
# Return factory with the rendered config
return cls(config_dict=tool_config)