diff --git a/.idea/workspace.xml b/.idea/workspace.xml new file mode 100644 index 0000000..65dca5e --- /dev/null +++ b/.idea/workspace.xml @@ -0,0 +1,186 @@ + + + + + + + + + + + { + "lastFilter": { + "state": "OPEN", + "assignee": "ZLBillShaw" + } +} + { + "selectedUrlAndAccountId": { + "url": "https://github.com/ZLBillShaw/mcp-clickhouse.git", + "accountId": "e07c32f1-e572-4504-834a-8f3b25850d40" + } +} + + + + + + + { + "associatedIndex": 6 +} + + + + + + + + + + + + + + + 1747724722324 + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/.mcp_clickhouse_env b/.mcp_clickhouse_env new file mode 100644 index 0000000..652fc6c --- /dev/null +++ b/.mcp_clickhouse_env @@ -0,0 +1,38 @@ +# MCP Server Configuration +export MCP_SERVER_PORT=3000 # Set port to 3000 +export MCP_SERVER_HOST=0.0.0.0 # Allow connections from all network interfaces + +# Define list of ClickHouse servers to connect to +export CLICKHOUSE_SERVERS=prod,test,dev + +# Default ClickHouse connection +export CLICKHOUSE_HOST=default-clickhouse-host.example.com +export CLICKHOUSE_USER=default_user +export CLICKHOUSE_PASSWORD=default_password +export CLICKHOUSE_DATABASE=default_db +export CLICKHOUSE_PORT=8443 +export CLICKHOUSE_SECURE=true + +# Production ClickHouse +export CLICKHOUSE_PROD_HOST=prod-clickhouse.example.com +export CLICKHOUSE_PROD_USER=prod_user +export CLICKHOUSE_PROD_PASSWORD=prod_password +export CLICKHOUSE_PROD_DATABASE=analytics +export CLICKHOUSE_PROD_PORT=8443 +export CLICKHOUSE_PROD_SECURE=true + +# Test ClickHouse +export CLICKHOUSE_TEST_HOST=test-clickhouse.example.com +export CLICKHOUSE_TEST_USER=test_user +export CLICKHOUSE_TEST_PASSWORD=test_password +export CLICKHOUSE_TEST_DATABASE=test_db +export CLICKHOUSE_TEST_PORT=8443 +export CLICKHOUSE_TEST_SECURE=true + +# Development ClickHouse +export CLICKHOUSE_DEV_HOST=dev-clickhouse.example.com +export CLICKHOUSE_DEV_USER=dev_user +export CLICKHOUSE_DEV_PASSWORD=dev_password +export CLICKHOUSE_DEV_DATABASE=dev_db +export CLICKHOUSE_DEV_PORT=8123 +export CLICKHOUSE_DEV_SECURE=false \ No newline at end of file diff --git a/MULTI_SERVER_CONFIG.md b/MULTI_SERVER_CONFIG.md new file mode 100644 index 0000000..fc2b8b7 --- /dev/null +++ b/MULTI_SERVER_CONFIG.md @@ -0,0 +1,103 @@ +# MCP ClickHouse Multi-Server Connection Guide + +This document explains how to configure mcp-clickhouse to support multiple ClickHouse server connections and custom MCP server ports. + +## Configuring Custom MCP Server Port + +You can set the MCP server's listening port and host through environment variables: + +```sh +# Default port is 8080, default host is 0.0.0.0 +export MCP_SERVER_PORT=9000 # Set custom port +export MCP_SERVER_HOST=127.0.0.1 # Listen only on localhost +``` + +## Configuring Multiple ClickHouse Servers + +You can configure multiple ClickHouse server connections using environment variables. + +### Default Connection + +The default connection is configured using the following environment variables: + +```sh +# Default connection - Required parameters +export CLICKHOUSE_HOST=your-default-clickhouse-host +export CLICKHOUSE_USER=default +export CLICKHOUSE_PASSWORD=your-password + +# Default connection - Optional parameters +export CLICKHOUSE_PORT=8443 # Default is 8443 (HTTPS) or 8123 (HTTP) +export CLICKHOUSE_SECURE=true # Use HTTPS, set to false for HTTP +export CLICKHOUSE_VERIFY=true # Verify SSL certificates +export CLICKHOUSE_DATABASE=default # Default database +export CLICKHOUSE_CONNECT_TIMEOUT=30 # Connection timeout (seconds) +export CLICKHOUSE_SEND_RECEIVE_TIMEOUT=300 # Send/receive timeout (seconds) +``` + +### Additional Connections + +To configure additional ClickHouse server connections, first define a list of server names using the `CLICKHOUSE_SERVERS` environment variable: + +```sh +# Define additional server names (comma-separated) +export CLICKHOUSE_SERVERS=prod,staging,test +``` + +Then, configure the corresponding environment variables for each server, using the format `CLICKHOUSE__`: + +```sh +# Production ClickHouse server +export CLICKHOUSE_PROD_HOST=prod-clickhouse.example.com +export CLICKHOUSE_PROD_USER=prod_user +export CLICKHOUSE_PROD_PASSWORD=prod_password +export CLICKHOUSE_PROD_DATABASE=analytics + +# Test ClickHouse server +export CLICKHOUSE_TEST_HOST=test-clickhouse.example.com +export CLICKHOUSE_TEST_USER=test_user +export CLICKHOUSE_TEST_PASSWORD=test_password +export CLICKHOUSE_TEST_PORT=9440 +``` + +## Using Multi-Server Connections + +In the MCP ClickHouse tools, you can specify which server to use with the additional `clickhouse_server` parameter: + +```python +# Query databases on a specific server +list_databases(clickhouse_server="prod") + +# Query tables on a specific server +list_tables(database="analytics", clickhouse_server="prod") + +# Execute a query on a specific server +run_select_query("SELECT count() FROM analytics.events", clickhouse_server="prod") +``` + +If you don't specify the `clickhouse_server` parameter, the default server (configured by the `CLICKHOUSE_HOST` etc. environment variables) will be used. + +## Viewing Available Servers + +You can use the `list_clickhouse_servers()` function to list all configured ClickHouse servers: + +```python +servers = list_clickhouse_servers() +print(servers) # ['default', 'prod', 'staging', 'test'] +``` + +## Adding this Server to the MCP Client + +If you want to add this server to an MCP client, make sure you have set a custom port and use the following command to add it to the MCP client: + +```sh +mcp add-server http://localhost: +``` + +For example: + +```sh +mcp add-server clickhouse-analyzer http://localhost:9000 +``` + +After adding it, you can interact with the server in the MCP client, including selecting different ClickHouse servers for queries. \ No newline at end of file diff --git a/mcp_clickhouse/__init__.py b/mcp_clickhouse/__init__.py index 879259d..9511aeb 100644 --- a/mcp_clickhouse/__init__.py +++ b/mcp_clickhouse/__init__.py @@ -6,7 +6,9 @@ create_chdb_client, run_chdb_select_query, chdb_initial_prompt, + list_clickhouse_servers, ) +from .mcp_env import get_config, get_all_configs, get_mcp_server_config __all__ = [ "list_databases", @@ -16,4 +18,8 @@ "create_chdb_client", "run_chdb_select_query", "chdb_initial_prompt", + "list_clickhouse_servers", + "get_config", + "get_all_configs", + "get_mcp_server_config", ] diff --git a/mcp_clickhouse/mcp_env.py b/mcp_clickhouse/mcp_env.py index c201a20..6f891fa 100644 --- a/mcp_clickhouse/mcp_env.py +++ b/mcp_clickhouse/mcp_env.py @@ -4,9 +4,9 @@ and type conversion. """ -from dataclasses import dataclass +from dataclasses import dataclass, field import os -from typing import Optional +from typing import Optional, Dict, List from enum import Enum @@ -49,126 +49,123 @@ class ClickHouseConfig: CLICKHOUSE_ENABLED: Enable ClickHouse server (default: true) """ - def __init__(self): - """Initialize the configuration from environment variables.""" + def __init__(self, name: str = "default", env_prefix: str = "CLICKHOUSE"): + """Initialize the configuration from environment variables. + + Args: + name: Name identifier for this ClickHouse connection + env_prefix: Prefix for environment variables + """ + self.name = name + self.env_prefix = env_prefix if self.enabled: self._validate_required_vars() + def _env_key(self, key: str) -> str: + """Generate environment variable key based on server name.""" + if self.name == "default": + return f"{self.env_prefix}_{key}" + else: + return f"{self.env_prefix}_{self.name.upper()}_{key}" + + def _get_env(self, key: str, default=None) -> Optional[str]: + """Get environment variable value.""" + return os.getenv(self._env_key(key), default) + + def _get_bool(self, key: str, default: bool = True) -> bool: + """Get boolean environment variable value.""" + return self._get_env(key, str(default)).lower() == "true" + + def _get_int(self, key: str, default: int) -> int: + """Get integer environment variable value.""" + return int(self._get_env(key, str(default))) + + def _validate_required_vars(self): + """Validate that all required environment variables are set.""" + required = ["HOST", "USER", "PASSWORD"] + missing = [k for k in required if not self._get_env(k)] + if missing: + raise ValueError(f"Missing required ClickHouse env vars for '{self.name}': {missing}") + @property def enabled(self) -> bool: - """Get whether ClickHouse server is enabled. - - Default: True - """ - return os.getenv("CLICKHOUSE_ENABLED", "true").lower() == "true" + """Get whether ClickHouse server is enabled.""" + return self._get_bool("ENABLED", True) @property def host(self) -> str: - """Get the ClickHouse host.""" - return os.environ["CLICKHOUSE_HOST"] + """Get the ClickHouse hostname.""" + return self._get_env("HOST") @property def port(self) -> int: - """Get the ClickHouse port. - - Defaults to 8443 if secure=True, 8123 if secure=False. - Can be overridden by CLICKHOUSE_PORT environment variable. - """ - if "CLICKHOUSE_PORT" in os.environ: - return int(os.environ["CLICKHOUSE_PORT"]) + """Get the ClickHouse port number.""" + if self._get_env("PORT") is not None: + return self._get_int("PORT", 8123) return 8443 if self.secure else 8123 @property def username(self) -> str: """Get the ClickHouse username.""" - return os.environ["CLICKHOUSE_USER"] + return self._get_env("USER") @property def password(self) -> str: """Get the ClickHouse password.""" - return os.environ["CLICKHOUSE_PASSWORD"] + return self._get_env("PASSWORD") @property def database(self) -> Optional[str]: - """Get the default database name if set.""" - return os.getenv("CLICKHOUSE_DATABASE") + """Get the default database name.""" + return self._get_env("DATABASE") @property def secure(self) -> bool: - """Get whether HTTPS is enabled. - - Default: True - """ - return os.getenv("CLICKHOUSE_SECURE", "true").lower() == "true" + """Get whether to use HTTPS connection.""" + return self._get_bool("SECURE", True) @property def verify(self) -> bool: - """Get whether SSL certificate verification is enabled. - - Default: True - """ - return os.getenv("CLICKHOUSE_VERIFY", "true").lower() == "true" + """Get whether to verify SSL certificates.""" + return self._get_bool("VERIFY", True) @property def connect_timeout(self) -> int: - """Get the connection timeout in seconds. - - Default: 30 - """ - return int(os.getenv("CLICKHOUSE_CONNECT_TIMEOUT", "30")) + """Get the connection timeout in seconds.""" + return self._get_int("CONNECT_TIMEOUT", 30) @property def send_receive_timeout(self) -> int: - """Get the send/receive timeout in seconds. - - Default: 300 (ClickHouse default) - """ - return int(os.getenv("CLICKHOUSE_SEND_RECEIVE_TIMEOUT", "300")) + """Get the send/receive timeout in seconds.""" + return self._get_int("SEND_RECEIVE_TIMEOUT", 300) @property - def proxy_path(self) -> str: - return os.getenv("CLICKHOUSE_PROXY_PATH") + def proxy_path(self) -> Optional[str]: + """Get the proxy path.""" + return self._get_env("PROXY_PATH") @property def mcp_server_transport(self) -> str: - """Get the MCP server transport method. - - Valid options: "stdio", "http", "sse" - Default: "stdio" - """ + """Get the MCP server transport method.""" transport = os.getenv("CLICKHOUSE_MCP_SERVER_TRANSPORT", TransportType.STDIO.value).lower() - - # Validate transport type if transport not in TransportType.values(): - valid_options = ", ".join(f'"{t}"' for t in TransportType.values()) - raise ValueError(f"Invalid transport '{transport}'. Valid options: {valid_options}") + valid = ", ".join(f'"{t}"' for t in TransportType.values()) + raise ValueError(f"Invalid transport '{transport}'. Valid: {valid}") return transport @property def mcp_bind_host(self) -> str: - """Get the host to bind the MCP server to. - - Only used when transport is "http" or "sse". - Default: "127.0.0.1" - """ + """Get the host to bind the MCP server to.""" return os.getenv("CLICKHOUSE_MCP_BIND_HOST", "127.0.0.1") @property def mcp_bind_port(self) -> int: - """Get the port to bind the MCP server to. - - Only used when transport is "http" or "sse". - Default: 8000 - """ + """Get the port to bind the MCP server to.""" return int(os.getenv("CLICKHOUSE_MCP_BIND_PORT", "8000")) def get_client_config(self) -> dict: - """Get the configuration dictionary for clickhouse_connect client. - - Returns: - dict: Configuration ready to be passed to clickhouse_connect.get_client() - """ - config = { + """Get the configuration dictionary for clickhouse_connect client.""" + cfg = { "host": self.host, "port": self.port, "username": self.username, @@ -177,43 +174,69 @@ def get_client_config(self) -> dict: "verify": self.verify, "connect_timeout": self.connect_timeout, "send_receive_timeout": self.send_receive_timeout, - "client_name": "mcp_clickhouse", + "client_name": f"mcp_clickhouse_{self.name}", } - - # Add optional database if set if self.database: - config["database"] = self.database - + cfg["database"] = self.database if self.proxy_path: - config["proxy_path"] = self.proxy_path + cfg["proxy_path"] = self.proxy_path + return cfg - return config + def validate(self) -> bool: + """Validate if configuration is complete.""" + return bool(self.host and self.username and self.password) - def _validate_required_vars(self) -> None: - """Validate that all required environment variables are set. - Raises: - ValueError: If any required environment variable is missing. - """ - missing_vars = [] - for var in ["CLICKHOUSE_HOST", "CLICKHOUSE_USER", "CLICKHOUSE_PASSWORD"]: - if var not in os.environ: - missing_vars.append(var) - - if missing_vars: - raise ValueError(f"Missing required environment variables: {', '.join(missing_vars)}") +@dataclass +class MultiClickHouseConfig: + """Manager for multiple ClickHouse connection configurations.""" + configs: Dict[str, ClickHouseConfig] = field(default_factory=dict) + default_config_name: str = "default" + + def __init__(self, allow_empty: bool = False): + """Initialize all ClickHouse connection configurations from environment variables.""" + self.configs = {} + self.default_config_name = None + + # Load server names from CLICKHOUSE_SERVERS environment variable + servers_str = os.getenv("CLICKHOUSE_SERVERS", "") + names = [n.strip() for n in servers_str.split(",")] if servers_str else [] + + # Process named servers first + for name in names: + if name and name != "default": + cfg = ClickHouseConfig(name=name) + if cfg.validate(): + self.configs[name] = cfg + if self.default_config_name is None: + self.default_config_name = name + + # Try to load default configuration + if self.default_config_name is None: + default_cfg = ClickHouseConfig(name="default") + if default_cfg.validate(): + self.configs["default"] = default_cfg + self.default_config_name = "default" + + if not self.configs and not allow_empty: + raise ValueError("No valid ClickHouse configuration found.") + + def get_config(self, name: Optional[str] = None) -> ClickHouseConfig: + """Get configuration by name, or default if name not specified or not found.""" + if name and name in self.configs: + return self.configs[name] + if self.default_config_name and self.default_config_name in self.configs: + return self.configs[self.default_config_name] + raise ValueError("No valid ClickHouse configuration found.") + + def get_available_servers(self) -> List[str]: + """Get list of all available ClickHouse server names.""" + return list(self.configs.keys()) @dataclass class ChDBConfig: - """Configuration for chDB connection settings. - - This class handles all environment variable configuration with sensible defaults - and type conversion. It provides typed methods for accessing each configuration value. - - Required environment variables: - CHDB_DATA_PATH: The path to the chDB data directory (only required if CHDB_ENABLED=true) - """ + """Configuration for chDB connection settings.""" def __init__(self): """Initialize the configuration from environment variables.""" @@ -222,10 +245,7 @@ def __init__(self): @property def enabled(self) -> bool: - """Get whether chDB is enabled. - - Default: False - """ + """Get whether chDB is enabled.""" return os.getenv("CHDB_ENABLED", "false").lower() == "true" @property @@ -234,50 +254,72 @@ def data_path(self) -> str: return os.getenv("CHDB_DATA_PATH", ":memory:") def get_client_config(self) -> dict: - """Get the configuration dictionary for chDB client. - - Returns: - dict: Configuration ready to be passed to chDB client - """ - return { - "data_path": self.data_path, - } + """Get the configuration dictionary for chDB client.""" + return {"data_path": self.data_path} def _validate_required_vars(self) -> None: - """Validate that all required environment variables are set. - - Raises: - ValueError: If any required environment variable is missing. - """ + """Validate that all required environment variables are set.""" + # Only data_path is optional, no required vars pass -# Global instance placeholders for the singleton pattern +@dataclass +class MCPServerConfig: + """MCP server configuration.""" + port: int = 8080 + host: str = "0.0.0.0" + + def __init__(self): + """Initialize MCP server configuration from environment variables.""" + if os.getenv("MCP_SERVER_PORT"): + self.port = int(os.getenv("MCP_SERVER_PORT")) + if os.getenv("MCP_SERVER_HOST"): + self.host = os.getenv("MCP_SERVER_HOST") + + +# Global singletons +_MULTI_CONFIG_INSTANCE = None +_MCP_SERVER_CONFIG = None _CONFIG_INSTANCE = None _CHDB_CONFIG_INSTANCE = None -def get_config(): - """ - Gets the singleton instance of ClickHouseConfig. - Instantiates it on the first call. - """ - global _CONFIG_INSTANCE - if _CONFIG_INSTANCE is None: - # Instantiate the config object here, ensuring load_dotenv() has likely run - _CONFIG_INSTANCE = ClickHouseConfig() - return _CONFIG_INSTANCE - +def get_config(name: Optional[str] = None, allow_empty: bool = False) -> ClickHouseConfig: + """Get ClickHouse configuration instance. -def get_chdb_config() -> ChDBConfig: - """ - Gets the singleton instance of ChDBConfig. - Instantiates it on the first call. + Args: + name: Optional configuration name, uses default if not specified + allow_empty: Allow empty configurations for testing Returns: - ChDBConfig: The chDB configuration instance + ClickHouse configuration instance for the specified name """ + global _MULTI_CONFIG_INSTANCE + if _MULTI_CONFIG_INSTANCE is None: + _MULTI_CONFIG_INSTANCE = MultiClickHouseConfig(allow_empty=allow_empty) + return _MULTI_CONFIG_INSTANCE.get_config(name) + + +def get_all_configs(allow_empty: bool = False) -> MultiClickHouseConfig: + """Get multi-ClickHouse configuration manager instance.""" + global _MULTI_CONFIG_INSTANCE + if _MULTI_CONFIG_INSTANCE is None: + _MULTI_CONFIG_INSTANCE = MultiClickHouseConfig(allow_empty=allow_empty) + return _MULTI_CONFIG_INSTANCE + + +def get_mcp_server_config() -> MCPServerConfig: + """Get MCP server configuration instance.""" + global _MCP_SERVER_CONFIG + if _MCP_SERVER_CONFIG is None: + _MCP_SERVER_CONFIG = MCPServerConfig() + return _MCP_SERVER_CONFIG + + +def get_chdb_config() -> ChDBConfig: + """Get chDB configuration instance.""" global _CHDB_CONFIG_INSTANCE if _CHDB_CONFIG_INSTANCE is None: _CHDB_CONFIG_INSTANCE = ChDBConfig() return _CHDB_CONFIG_INSTANCE + diff --git a/mcp_clickhouse/mcp_server.py b/mcp_clickhouse/mcp_server.py index e9b9a09..05f5e9e 100644 --- a/mcp_clickhouse/mcp_server.py +++ b/mcp_clickhouse/mcp_server.py @@ -1,6 +1,6 @@ import logging import json -from typing import Optional, List, Any +from typing import Optional, List, Any, Dict import concurrent.futures import atexit import os @@ -17,12 +17,13 @@ from starlette.requests import Request from starlette.responses import PlainTextResponse -from mcp_clickhouse.mcp_env import get_config, get_chdb_config +from mcp_clickhouse.mcp_env import get_config, get_all_configs, get_mcp_server_config, get_chdb_config from mcp_clickhouse.chdb_prompt import CHDB_PROMPT @dataclass class Column: + """ClickHouse column metadata.""" database: str table: str name: str @@ -34,6 +35,7 @@ class Column: @dataclass class Table: + """ClickHouse table metadata.""" database: str name: str engine: str @@ -61,10 +63,12 @@ class Table: ) logger = logging.getLogger(MCP_SERVER_NAME) +# Initialize query executor thread pool QUERY_EXECUTOR = concurrent.futures.ThreadPoolExecutor(max_workers=10) atexit.register(lambda: QUERY_EXECUTOR.shutdown(wait=True)) SELECT_QUERY_TIMEOUT_SECS = 30 +# Load environment variables load_dotenv() mcp = FastMCP( @@ -95,14 +99,40 @@ async def health_check(request: Request) -> PlainTextResponse: def result_to_table(query_columns, result) -> List[Table]: + """Convert query result to Table objects. + + Args: + query_columns: Column names from the query + result: Query result rows + + Returns: + List of Table objects + """ return [Table(**dict(zip(query_columns, row))) for row in result] def result_to_column(query_columns, result) -> List[Column]: + """Convert query result to Column objects. + + Args: + query_columns: Column names from the query + result: Query result rows + + Returns: + List of Column objects + """ return [Column(**dict(zip(query_columns, row))) for row in result] -def to_json(obj: Any) -> str: +def to_json(obj: Any) -> Any: + """Convert dataclasses to JSON-serializable objects. + + Args: + obj: Object to convert + + Returns: + JSON-serializable version of the object + """ if is_dataclass(obj): return json.dumps(asdict(obj), default=to_json) elif isinstance(obj, list): @@ -112,71 +142,194 @@ def to_json(obj: Any) -> str: return obj -def list_databases(): - """List available ClickHouse databases""" - logger.info("Listing all databases") - client = create_clickhouse_client() - result = client.command("SHOW DATABASES") +def list_clickhouse_servers(): + """List all available ClickHouse server configurations.""" + logger.info("Listing all configured ClickHouse servers") + try: + servers = get_all_configs().get_available_servers() + return servers + except Exception as e: + logger.error(f"Error listing servers: {str(e)}") + return {"error": str(e)} + + +def list_databases(clickhouse_server: Optional[str] = None): + """List available ClickHouse databases. + + Args: + clickhouse_server: Optional ClickHouse server name, uses default if not specified + """ + logger.info(f"Listing all databases from server: {clickhouse_server or 'default'}") + try: + if not clickhouse_server: + available_servers = get_all_configs().get_available_servers() + if not available_servers: + return {"error": "No valid ClickHouse configurations available"} + + client = create_clickhouse_client(clickhouse_server) + result = client.command("SHOW DATABASES") - # Convert newline-separated string to list and trim whitespace - if isinstance(result, str): - databases = [db.strip() for db in result.strip().split("\n")] - else: - databases = [result] + # Convert newline-separated string to list and trim whitespace + if isinstance(result, str): + databases = [db.strip() for db in result.strip().split("\n")] + else: + databases = [result] - logger.info(f"Found {len(databases)} databases") - return json.dumps(databases) + logger.info(f"Found {len(databases)} databases") + return json.dumps(databases) + except Exception as e: + logger.error(f"Error listing databases: {str(e)}") + return {"error": str(e)} -def list_tables(database: str, like: Optional[str] = None, not_like: Optional[str] = None): +def list_tables( + database: str, + like: Optional[str] = None, + not_like: Optional[str] = None, + clickhouse_server: Optional[str] = None +): """List available ClickHouse tables in a database, including schema, comment, - row count, and column count.""" - logger.info(f"Listing tables in database '{database}'") - client = create_clickhouse_client() - query = f"SELECT database, name, engine, create_table_query, dependencies_database, dependencies_table, engine_full, sorting_key, primary_key, total_rows, total_bytes, total_bytes_uncompressed, parts, active_parts, total_marks, comment FROM system.tables WHERE database = {format_query_value(database)}" - if like: - query += f" AND name LIKE {format_query_value(like)}" - - if not_like: - query += f" AND name NOT LIKE {format_query_value(not_like)}" - - result = client.query(query) - - # Deserialize result as Table dataclass instances - tables = result_to_table(result.column_names, result.result_rows) - - for table in tables: - column_data_query = f"SELECT database, table, name, type AS column_type, default_kind, default_expression, comment FROM system.columns WHERE database = {format_query_value(database)} AND table = {format_query_value(table.name)}" - column_data_query_result = client.query(column_data_query) - table.columns = [ - c - for c in result_to_column( - column_data_query_result.column_names, - column_data_query_result.result_rows, - ) - ] + row count, and column count. - logger.info(f"Found {len(tables)} tables") - return [asdict(table) for table in tables] + Args: + database: Database name + like: Optional table name pattern to match + not_like: Optional table name pattern to exclude + clickhouse_server: Optional ClickHouse server name, uses default if not specified + """ + logger.info(f"Listing tables in database '{database}' from server: {clickhouse_server or 'default'}") + try: + client = create_clickhouse_client(clickhouse_server) + query = f"SELECT database, name, engine, create_table_query, dependencies_database, dependencies_table, engine_full, sorting_key, primary_key, total_rows, total_bytes, total_bytes_uncompressed, parts, active_parts, total_marks, comment FROM system.tables WHERE database = {format_query_value(database)}" + if like: + query += f" AND name LIKE {format_query_value(like)}" + + if not_like: + query += f" AND name NOT LIKE {format_query_value(not_like)}" + + result = client.query(query) + + # Deserialize result as Table dataclass instances + tables = result_to_table(result.column_names, result.result_rows) + + for table in tables: + column_data_query = f"SELECT database, table, name, type AS column_type, default_kind, default_expression, comment FROM system.columns WHERE database = {format_query_value(database)} AND table = {format_query_value(table.name)}" + column_data_query_result = client.query(column_data_query) + table.columns = [ + c + for c in result_to_column( + column_data_query_result.column_names, + column_data_query_result.result_rows, + ) + ] + + logger.info(f"Found {len(tables)} tables") + return [asdict(table) for table in tables] + except Exception as e: + logger.error(f"Error listing tables: {str(e)}") + return {"error": str(e)} -def execute_query(query: str): - client = create_clickhouse_client() +def execute_query(query: str, clickhouse_server: Optional[str] = None): + """Execute a query on the ClickHouse server. + + Args: + query: SQL query to execute + clickhouse_server: Optional server name + + Returns: + Query results or error dictionary + """ try: - read_only = get_readonly_setting(client) - res = client.query(query, settings={"readonly": read_only}) - logger.info(f"Query returned {len(res.result_rows)} rows") - return {"columns": res.column_names, "rows": res.result_rows} - except Exception as err: - logger.error(f"Error executing query: {err}") - raise ToolError(f"Query execution failed: {str(err)}") + query_upper = query.upper().strip() + + allowed_prefixes = [ + "SELECT ", + "SHOW ", + "DESCRIBE ", + "DESC ", + "EXISTS ", + "EXPLAIN " + ] + + forbidden_keywords = [ + "INSERT", + "UPDATE", + "DELETE", + "DROP", + "CREATE", + "ALTER", + "RENAME", + "TRUNCATE", + "OPTIMIZE", + "KILL", + "ATTACH", + "DETACH", + "SYSTEM", + "GRANT", + "REVOKE", + "SET " + ] + + is_allowed = any(query_upper.startswith(prefix) for prefix in allowed_prefixes) + contains_forbidden = any(f" {keyword} " in f" {query_upper} " or query_upper.startswith(keyword) for keyword in forbidden_keywords) -def run_select_query(query: str): - """Run a SELECT query in a ClickHouse database""" - logger.info(f"Executing SELECT query: {query}") + if not is_allowed or contains_forbidden: + logger.warning(f"Rejected non-read-only query: {query}") + return { + "error": "Only read-only queries (SELECT, SHOW, DESCRIBE, etc.) are allowed for security reasons." + } + + client = create_clickhouse_client(clickhouse_server) + + # Check for server name prefix in query and clean it + if clickhouse_server and f"{clickhouse_server}." in query: + query = query.replace(f"{clickhouse_server}.", "") + logger.info(f"Removed server name prefix from query") + + try: + # Get readonly setting + read_only = get_readonly_setting(client) + res = client.query(query, settings={"readonly": read_only}) + column_names = res.column_names + rows = [] + for row in res.result_rows: + row_dict = {} + for i, col_name in enumerate(column_names): + # Handle special data types for serialization + value = row[i] + if hasattr(value, 'isoformat'): # For datetime objects + value = value.isoformat() + row_dict[col_name] = value + rows.append(row_dict) + logger.info(f"Query returned {len(rows)} rows") + return rows + except Exception as err: + logger.error(f"Error executing query: {err}") + # Return a structured dictionary rather than a string to ensure proper serialization + # by the MCP protocol. String responses for errors can cause BrokenResourceError. + return {"error": str(err)} + except Exception as e: + logger.error(f"Failed to create client: {e}") + return {"error": f"Connection error: {str(e)}"} + + +def run_select_query(query: str, clickhouse_server: Optional[str] = None): + """Run a SELECT query in a ClickHouse database. + + Args: + query: SQL query to execute + clickhouse_server: Optional ClickHouse server name, uses default if not specified + """ + logger.info(f"Executing SELECT query on server '{clickhouse_server or 'default'}': {query}") try: - future = QUERY_EXECUTOR.submit(execute_query, query) + # Auto-correct query if it contains server name as database prefix + if clickhouse_server and f"{clickhouse_server}." in query: + query = query.replace(f"{clickhouse_server}.", "") + logger.info(f"Modified query to remove server prefix") + + future = QUERY_EXECUTOR.submit(execute_query, query, clickhouse_server) try: result = future.result(timeout=SELECT_QUERY_TIMEOUT_SECS) # Check if we received an error structure from execute_query @@ -188,6 +341,17 @@ def run_select_query(query: str): "status": "error", "message": f"Query failed: {result['error']}", } + + # Ensure result is serializable + try: + json.dumps(result) + except (TypeError, OverflowError) as e: + logger.error(f"Query result contains non-serializable data: {e}") + return { + "status": "error", + "message": f"Result format error: {e}", + } + return result except concurrent.futures.TimeoutError: logger.warning(f"Query timed out after {SELECT_QUERY_TIMEOUT_SECS} seconds: {query}") @@ -197,11 +361,35 @@ def run_select_query(query: str): raise except Exception as e: logger.error(f"Unexpected error in run_select_query: {str(e)}") - raise RuntimeError(f"Unexpected error during query execution: {str(e)}") + # Return structured error instead of raising to prevent MCP serialization failures + return {"status": "error", "message": f"Unexpected error: {str(e)}"} + + +def get_readonly_setting(client): + """Get the appropriate readonly setting for the client.""" + try: + read_only = client.server_settings.get("readonly") + if read_only: + if read_only == "0": + return "1" # Force read-only mode if server has it disabled + else: + return read_only # Respect server's readonly setting (likely 2) + else: + return "1" # Default to basic read-only mode if setting isn't present + except: + return "1" # Default to basic read-only mode if we can't get server settings + + +def create_clickhouse_client(server_name: Optional[str] = None): + """Create a ClickHouse client connection. + Args: + server_name: Optional server name, uses default if not specified -def create_clickhouse_client(): - client_config = get_config().get_client_config() + Returns: + ClickHouse client instance + """ + client_config = get_config(server_name).get_client_config() logger.info( f"Creating ClickHouse client connection to {client_config['host']}:{client_config['port']} " f"as {client_config['username']} " @@ -221,36 +409,6 @@ def create_clickhouse_client(): raise -def get_readonly_setting(client) -> str: - """Get the appropriate readonly setting value to use for queries. - - This function handles potential conflicts between server and client readonly settings: - - readonly=0: No read-only restrictions - - readonly=1: Only read queries allowed, settings cannot be changed - - readonly=2: Only read queries allowed, settings can be changed (except readonly itself) - - If server has readonly=2 and client tries to set readonly=1, it would cause: - "Setting readonly is unknown or readonly" error - - This function preserves the server's readonly setting unless it's 0, in which case - we enforce readonly=1 to ensure queries are read-only. - - Args: - client: ClickHouse client connection - - Returns: - String value of readonly setting to use - """ - read_only = client.server_settings.get("readonly") - if read_only: - if read_only == "0": - return "1" # Force read-only mode if server has it disabled - else: - return read_only.value # Respect server's readonly setting (likely 2) - else: - return "1" # Default to basic read-only mode if setting isn't present - - def create_chdb_client(): """Create a chDB client connection.""" if not get_chdb_config().enabled: @@ -335,6 +493,7 @@ def _init_chdb_client(): # Register tools based on configuration if os.getenv("CLICKHOUSE_ENABLED", "true").lower() == "true": + mcp.add_tool(Tool.from_function(list_clickhouse_servers)) mcp.add_tool(Tool.from_function(list_databases)) mcp.add_tool(Tool.from_function(list_tables)) mcp.add_tool(Tool.from_function(run_select_query)) @@ -354,3 +513,13 @@ def _init_chdb_client(): ) mcp.add_prompt(chdb_prompt) logger.info("chDB tools and prompts registered") + + +def run_server(): + """Start the MCP server with the configured transport and port settings.""" + server_config = get_mcp_server_config() + logger.info(f"Starting MCP server on {server_config.host}:{server_config.port} with streamable-http transport") + + # Use streamable-http transport to listen for HTTP requests + mcp.run(transport="streamable-http") + diff --git a/run.py b/run.py new file mode 100644 index 0000000..70aacd8 --- /dev/null +++ b/run.py @@ -0,0 +1,35 @@ +#!/usr/bin/env python3 +""" +Launch script for MCP ClickHouse Server. + +This script loads environment variables and starts the MCP server. +""" + +import os +from dotenv import load_dotenv +import logging + +# Configure logging +logging.basicConfig( + level=logging.INFO, + format="%(asctime)s - %(name)s - %(levelname)s - %(message)s" +) +logger = logging.getLogger("mcp-clickhouse") + +def main(): + """Initialize environment and start the server.""" + logger.info("Initializing MCP ClickHouse server") + + # Load environment variables + env_file = '.mcp_clickhouse_env' + if os.path.exists(env_file): + logger.info(f"Loading environment from {env_file}") + load_dotenv(env_file) + + # Start MCP server + logger.info("Starting MCP server") + from mcp_clickhouse.main import main as mcp_main + mcp_main() + +if __name__ == "__main__": + main() \ No newline at end of file