diff --git a/.github/workflows/code-quality.yml b/.github/workflows/code-quality.yml index 841fdba..f28ebb6 100644 --- a/.github/workflows/code-quality.yml +++ b/.github/workflows/code-quality.yml @@ -46,7 +46,7 @@ jobs: echo "::endgroup::" echo "::group::Running MyPy" - mypy_output=$(mypy . --explicit-package-bases 2>&1) || mypy_failed=true + mypy_output=$(mypy fabric_rti_mcp --explicit-package-bases 2>&1) || mypy_failed=true echo "$mypy_output" echo "::endgroup::" diff --git a/fabric_rti_mcp/common.py b/fabric_rti_mcp/common.py index 1121ba1..0583199 100644 --- a/fabric_rti_mcp/common.py +++ b/fabric_rti_mcp/common.py @@ -86,7 +86,7 @@ def with_args() -> GlobalFabricRTIConfig: parser.add_argument("--host", type=str, help="HTTP host to listen on") parser.add_argument("--port", type=int, help="HTTP port to listen on") parser.add_argument("--stateless-http", type=bool, help="Enable or disable stateless HTTP") - args = parser.parse_args() + args, _ = parser.parse_known_args() transport = base_config.transport if args.stdio: diff --git a/fabric_rti_mcp/kusto/kusto_formatter.py b/fabric_rti_mcp/kusto/kusto_formatter.py new file mode 100644 index 0000000..a2a980d --- /dev/null +++ b/fabric_rti_mcp/kusto/kusto_formatter.py @@ -0,0 +1,303 @@ +import csv +import io +import json +from dataclasses import dataclass +from typing import Any, cast + +from azure.kusto.data.response import KustoResponseDataSet + + +@dataclass(slots=True, frozen=True) +class KustoResponseFormat: + format: str + data: Any + + +class KustoFormatter: + """Formatter for Kusto query results in various compact formats""" + + @staticmethod + def to_json(result_set: KustoResponseDataSet | None) -> KustoResponseFormat: + if not result_set or not getattr(result_set, "primary_results", None): + return KustoResponseFormat(format="json", data=[]) + + first_result = result_set.primary_results[0] + column_names = [col.column_name for col in first_result.columns] + + return KustoResponseFormat(format="json", data=[dict(zip(column_names, row)) for row in first_result.rows]) + + @staticmethod + def to_csv(result_set: KustoResponseDataSet | None) -> KustoResponseFormat: + if not result_set or not getattr(result_set, "primary_results", None): + return KustoResponseFormat(format="csv", data="") + + first_result = result_set.primary_results[0] + output = io.StringIO() + + # Create CSV writer with standard settings + writer = csv.writer(output, quoting=csv.QUOTE_MINIMAL) + + # Write header + header = [col.column_name for col in first_result.columns] + writer.writerow(header) + + # Write data rows + for row in first_result.rows: + # Convert None to empty string, keep other types + formatted_row = ["" if v is None else v for v in row] + writer.writerow(formatted_row) + + return KustoResponseFormat(format="csv", data=output.getvalue()) + + @staticmethod + def to_tsv(result_set: KustoResponseDataSet | None) -> KustoResponseFormat: + result = KustoResponseFormat(format="tsv", data="") + if not result_set or not getattr(result_set, "primary_results", None): + return result + + first_result = result_set.primary_results[0] + lines: list[str] = [] + + # Header row + header = "\t".join(col.column_name for col in first_result.columns) + lines.append(header) + + # Data rows + for row in first_result.rows: + formatted_row: list[str] = [] + for value in row: + if value is None: + formatted_row.append("") + else: + # Escape tabs, newlines, and backslashes + str_value = str(value) + str_value = str_value.replace("\\", "\\\\") # Escape backslashes first + str_value = str_value.replace("\t", "\\t") + str_value = str_value.replace("\n", "\\n") + str_value = str_value.replace("\r", "\\r") + formatted_row.append(str_value) + + lines.append("\t".join(formatted_row)) + + return KustoResponseFormat(format="tsv", data="\n".join(lines)) + + @staticmethod + def to_columnar(result_set: KustoResponseDataSet | None) -> KustoResponseFormat: + if not result_set or not getattr(result_set, "primary_results", None): + return KustoResponseFormat(format="columnar", data={}) + + first_result = result_set.primary_results[0] + + # Build columnar structure + columnar_data: dict[str, list[Any]] = {} + + # Initialize columns + for i, col in enumerate(first_result.columns): + columnar_data[col.column_name] = [] + + # Populate columns + for row in first_result.rows: + for i, col in enumerate(first_result.columns): + columnar_data[col.column_name].append(row[i]) # type: ignore + + # Compact JSON (no spaces) + return KustoResponseFormat(format="columnar", data=columnar_data) + + @staticmethod + def to_header_arrays(result_set: KustoResponseDataSet | None) -> KustoResponseFormat: + if not result_set or not getattr(result_set, "primary_results", None): + return KustoResponseFormat(format="header_arrays", data=[]) + + first_result = result_set.primary_results[0] + lines: list[str] = [] + + # Header as JSON array + columns = [col.column_name for col in first_result.columns] + lines.append(json.dumps(columns, separators=(",", ":"))) + + # Each row as JSON array + for row in first_result.rows: + row_list = list(row) + lines.append(json.dumps(row_list, separators=(",", ":"))) + + return KustoResponseFormat(format="header_arrays", data="\n".join(lines)) + + @staticmethod + def parse(response: KustoResponseFormat | dict[str, Any]) -> list[dict[str, Any]] | None: + """ + Parse any KustoResponseFormat back to canonical JSON array format. + + Args: + response: Either a KustoResponseFormat object or a dict with 'format' and 'data' keys + + Returns: + List of dictionaries where each dict represents a row with column names as keys + """ + if response is None: # type: ignore + return None # type: ignore + + if isinstance(response, dict): + format_type = response.get("format", "") + data = response.get("data") + elif isinstance(response, KustoResponseFormat): # type: ignore + format_type = response.format + data = response.data + else: + raise ValueError("Invalid KustoResponseFormat") + + # Handle None data early + if data is None: + return None + + if format_type == "json": + return KustoFormatter._parse_json(data) + elif format_type == "csv": + return KustoFormatter._parse_csv(data) + elif format_type == "tsv": + return KustoFormatter._parse_tsv(data) + elif format_type == "columnar": + return KustoFormatter._parse_columnar(data) + elif format_type == "header_arrays": + return KustoFormatter._parse_header_arrays(data) + else: + raise ValueError(f"Unsupported format: {format_type}") + + @staticmethod + def _parse_json(data: Any) -> list[dict[str, Any]]: + """Parse JSON format data (already in canonical format)""" + if data is None or (not isinstance(data, list) and not isinstance(data, dict)): # type: ignore + raise ValueError("Invalid JSON format") + return data # type: ignore + + @staticmethod + def _parse_csv(data: str) -> list[dict[str, Any]]: + """Parse CSV format data back to canonical JSON""" + if data == "": + return [] + if data is None: # type: ignore + return None # type: ignore + if not isinstance(data, str): # type: ignore + raise ValueError("Invalid CSV format") + + lines = data.strip().split("\n") + if len(lines) < 1: + raise ValueError("Invalid CSV format") + + # Parse CSV using csv.reader to handle escaping properly + csv_reader = csv.reader(io.StringIO(data)) + rows = list(csv_reader) + + if len(rows) < 1: + return [] + + headers = rows[0] + result: list[dict[str, Any]] = [] + + for row in rows[1:]: + # Pad row with empty strings if shorter than headers + padded_row = row + [""] * (len(headers) - len(row)) + row_dict: dict[str, Any] = {} + for i, header in enumerate(headers): + value = padded_row[i] if i < len(padded_row) else "" + # Convert empty strings back to None if needed + row_dict[header] = None if value == "" else value + result.append(row_dict) + + return result + + @staticmethod + def _parse_tsv(data: str) -> list[dict[str, Any]]: + """Parse TSV format data back to canonical JSON""" + if data == "": + return [] + if not isinstance(data, str): # type: ignore + raise ValueError("Invalid TSV format") + + lines = data.strip().split("\n") + if len(lines) < 1: + raise ValueError("Invalid TSV format") + + # Parse header + headers = lines[0].split("\t") + result: list[dict[str, Any]] = [] + + # Parse data rows + for line in lines[1:]: + values = line.split("\t") + row_dict: dict[str, Any] = {} + + for i, header in enumerate(headers): + value = values[i] if i < len(values) else "" + + # Unescape TSV special characters + if value: + value = value.replace("\\t", "\t") + value = value.replace("\\n", "\n") + value = value.replace("\\r", "\r") + value = value.replace("\\\\", "\\") # Unescape backslashes last + + # Convert empty strings back to None + row_dict[header] = None if value == "" else value + + result.append(row_dict) + + return result + + @staticmethod + def _parse_columnar(data: Any) -> list[dict[str, Any]]: + """Parse columnar format data back to canonical JSON""" + if data is None or not isinstance(data, dict): + raise ValueError("Invalid columnar format") + data = cast(dict[str, list[Any]], data) + + # Get column names and determine row count + columns: list[str] = list(data.keys()) # type: ignore + if not columns: + return [] + + # All columns should have the same length + row_count = len(data[columns[0]]) if columns[0] in data else 0 + + result: list[dict[str, Any]] = [] + for row_idx in range(row_count): + row_dict: dict[str, Any] = {} + for col_name in columns: + col_values = data.get(col_name, []) + row_dict[col_name] = col_values[row_idx] if row_idx < len(col_values) else None + result.append(row_dict) + + return result + + @staticmethod + def _parse_header_arrays(data: str) -> list[dict[str, Any]]: + """Parse header_arrays format data back to canonical JSON""" + if data is None or not isinstance(data, str): # type: ignore + raise ValueError("Invalid header_arrays format") + + lines = data.strip().split("\n") + if len(lines) < 1: + return [] + + try: + # Parse header (first line) + headers: list[str] = json.loads(lines[0]) + if not isinstance(headers, list): # type: ignore + return [] # type: ignore + + result: list[dict[str, Any]] = [] + + # Parse data rows (remaining lines) + for line in lines[1:]: + row_values: list[Any] = json.loads(line) + if not isinstance(row_values, list): # type: ignore + continue # type: ignore + + row_dict: dict[str, Any] = {} + for i, header in enumerate(headers): + row_dict[header] = row_values[i] if i < len(row_values) else None + result.append(row_dict) + + return result + + except json.JSONDecodeError: + return [] diff --git a/fabric_rti_mcp/kusto/kusto_response_formatter.py b/fabric_rti_mcp/kusto/kusto_response_formatter.py deleted file mode 100644 index 7f1984d..0000000 --- a/fabric_rti_mcp/kusto/kusto_response_formatter.py +++ /dev/null @@ -1,13 +0,0 @@ -from typing import Any, Dict, List, Optional - -from azure.kusto.data.response import KustoResponseDataSet - - -def format_results(result_set: Optional[KustoResponseDataSet]) -> List[Dict[str, Any]]: - if not result_set or not getattr(result_set, "primary_results", None): - return [] - - first_result = result_set.primary_results[0] - column_names = [col.column_name for col in first_result.columns] - - return [dict(zip(column_names, row)) for row in first_result.rows] diff --git a/fabric_rti_mcp/kusto/kusto_service.py b/fabric_rti_mcp/kusto/kusto_service.py index 24986d3..a8ebb0b 100644 --- a/fabric_rti_mcp/kusto/kusto_service.py +++ b/fabric_rti_mcp/kusto/kusto_service.py @@ -15,7 +15,7 @@ from fabric_rti_mcp.common import logger from fabric_rti_mcp.kusto.kusto_config import KustoConfig from fabric_rti_mcp.kusto.kusto_connection import KustoConnection, sanitize_uri -from fabric_rti_mcp.kusto.kusto_response_formatter import format_results +from fabric_rti_mcp.kusto.kusto_formatter import KustoFormatter _DEFAULT_DB_NAME = KustoConnectionStringBuilder.DEFAULT_DATABASE_NAME CONFIG = KustoConfig.from_env() @@ -115,7 +115,7 @@ def _execute( cluster_uri: str, readonly_override: bool = False, database: Optional[str] = None, -) -> List[Dict[str, Any]]: +) -> Dict[str, Any]: caller_frame = inspect.currentframe().f_back # type: ignore action_name = caller_frame.f_code.co_name # type: ignore caller_func = caller_frame.f_globals.get(action_name) # type: ignore @@ -136,7 +136,7 @@ def _execute( database = database.strip() result_set = client.execute(database, query, crp) - return format_results(result_set) + return asdict(KustoFormatter.to_columnar(result_set)) except Exception as e: error_msg = f"Error executing Kusto operation '{action_name}' (correlation ID: {correlation_id}): {str(e)}" @@ -156,7 +156,7 @@ def kusto_known_services() -> List[Dict[str, str]]: return [asdict(service) for service in services] -def kusto_query(query: str, cluster_uri: str, database: Optional[str] = None) -> List[Dict[str, Any]]: +def kusto_query(query: str, cluster_uri: str, database: Optional[str] = None) -> Dict[str, Any]: """ Executes a KQL query on the specified database. If no database is provided, it will use the default database. @@ -170,7 +170,7 @@ def kusto_query(query: str, cluster_uri: str, database: Optional[str] = None) -> @destructive_operation -def kusto_command(command: str, cluster_uri: str, database: Optional[str] = None) -> List[Dict[str, Any]]: +def kusto_command(command: str, cluster_uri: str, database: Optional[str] = None) -> Dict[str, Any]: """ Executes a kusto management command on the specified database. If no database is provided, it will use the default database. @@ -183,7 +183,7 @@ def kusto_command(command: str, cluster_uri: str, database: Optional[str] = None return _execute(command, cluster_uri, database=database) -def kusto_list_databases(cluster_uri: str) -> List[Dict[str, Any]]: +def kusto_list_databases(cluster_uri: str) -> Dict[str, Any]: """ Retrieves a list of all databases in the Kusto cluster. @@ -193,18 +193,18 @@ def kusto_list_databases(cluster_uri: str) -> List[Dict[str, Any]]: return _execute(".show databases", cluster_uri) -def kusto_list_tables(cluster_uri: str, database: str) -> List[Dict[str, Any]]: +def kusto_list_tables(cluster_uri: str, database: str) -> Dict[str, Any]: """ Retrieves a list of all tables in the specified database. :param cluster_uri: The URI of the Kusto cluster. :param database: The name of the database to list tables from. - :return: List of dictionaries containing table information. + :return: List of dictionaries containing table names, cslschema and a docstring. """ - return _execute(".show tables", cluster_uri, database=database) + return _execute(".show database cslschema | project-away DatabaseName", cluster_uri, database=database) -def kusto_get_entities_schema(cluster_uri: str, database: Optional[str] = None) -> List[Dict[str, Any]]: +def kusto_get_entities_schema(cluster_uri: str, database: Optional[str] = None) -> Dict[str, Any]: """ Retrieves schema information for all entities (tables, materialized views, functions) in the specified database. If no database is provided, uses the default database. @@ -222,7 +222,7 @@ def kusto_get_entities_schema(cluster_uri: str, database: Optional[str] = None) ) -def kusto_get_table_schema(table_name: str, cluster_uri: str, database: Optional[str] = None) -> List[Dict[str, Any]]: +def kusto_get_table_schema(table_name: str, cluster_uri: str, database: Optional[str] = None) -> Dict[str, Any]: """ Retrieves the schema information for a specific table in the specified database. If no database is provided, uses the default database. @@ -239,7 +239,7 @@ def kusto_get_function_schema( function_name: str, cluster_uri: str, database: Optional[str] = None, -) -> List[Dict[str, Any]]: +) -> Dict[str, Any]: """ Retrieves schema information for a specific function, including parameters and output schema. If no database is provided, uses the default database. @@ -257,7 +257,7 @@ def kusto_sample_table_data( cluster_uri: str, sample_size: int = 10, database: Optional[str] = None, -) -> List[Dict[str, Any]]: +) -> Dict[str, Any]: """ Retrieves a random sample of records from the specified table. If no database is provided, uses the default database. @@ -276,7 +276,7 @@ def kusto_sample_function_data( cluster_uri: str, sample_size: int = 10, database: Optional[str] = None, -) -> List[Dict[str, Any]]: +) -> Dict[str, Any]: """ Retrieves a random sample of records from the result of a function call. If no database is provided, uses the default database. @@ -300,7 +300,7 @@ def kusto_ingest_inline_into_table( data_comma_separator: str, cluster_uri: str, database: Optional[str] = None, -) -> List[Dict[str, Any]]: +) -> Dict[str, Any]: """ Ingests inline CSV data into a specified table. The data should be provided as a comma-separated string. If no database is provided, uses the default database. @@ -325,7 +325,7 @@ def kusto_get_shots( sample_size: int = 3, database: Optional[str] = None, embedding_endpoint: Optional[str] = None, -) -> List[Dict[str, Any]]: +) -> Dict[str, Any]: """ Retrieves shots that are most semantic similar to the supplied prompt from the specified shots table. diff --git a/makefile b/makefile index fb83a60..20ccc63 100644 --- a/makefile +++ b/makefile @@ -4,17 +4,12 @@ fmt: lint: flake8 . - mypy . --explicit-package-bases + mypy fabric_rti_mcp --explicit-package-bases test: pytest -precommit: - isort . - black . - flake8 . - mypy . --explicit-package-bases - pytest +precommit: fmt lint test run: uvx . diff --git a/tests/live/test_kusto_tools_live.py b/tests/live/test_kusto_tools_live.py index 0b89bd5..375f3a3 100644 --- a/tests/live/test_kusto_tools_live.py +++ b/tests/live/test_kusto_tools_live.py @@ -6,8 +6,11 @@ import json import os import sys +from datetime import datetime, timedelta, timezone from typing import Any +from fabric_rti_mcp.kusto.kusto_formatter import KustoFormatter + try: from mcp import ClientSession, StdioServerParameters from mcp.client.stdio import stdio_client @@ -244,22 +247,20 @@ async def test_list_databases(self) -> None: print(f"List databases result: {json.dumps(result, indent=2)}") if result.get("success"): - # Handle both list and single object responses - databases = result.get("result", []) - if not isinstance(databases, list): - databases = [databases] if databases else [] + # Use the new parser to convert to canonical format + query_result = result.get("result", {}) + parsed_data = KustoFormatter.parse(query_result) + + # Extract database names from parsed rows + databases = [row.get("DatabaseName", "") for row in parsed_data] + databases = [db for db in databases if db] # Filter out empty strings # Assert minimum count to verify array fix is working min_expected_dbs = 8 - if len(databases) < min_expected_dbs: - raise AssertionError( - f"Expected at least {min_expected_dbs} databases, got {len(databases)}." - "This suggests the array truncation bug is still present." - ) - - print(f"✅ Found {len(databases)} databases (>= {min_expected_dbs} as expected)") - for db in databases[:5]: # Show first 5 - print(f" - {db.get('DatabaseName', 'N/A')}") + assert ( + len(databases) >= min_expected_dbs + ), f"Expected at least {min_expected_dbs} databases, got {len(databases)}." + print(f"✅ Found {len(databases)} databases") else: print(f"❌ Failed to list databases: {result}") raise AssertionError(f"Database listing failed: {result}") @@ -284,16 +285,24 @@ async def test_simple_query(self) -> None: "kusto_query", {"query": "print now()", "cluster_uri": self.test_cluster_uri, "database": self.test_database}, ) - print(f"Query result: {json.dumps(result, indent=2)}") if result.get("success"): - # Handle both list and single object responses - query_results = result.get("result", []) - if not isinstance(query_results, list): - query_results = [query_results] if query_results else [] - print(f"✅ Query executed successfully, got {len(query_results)} rows") - if query_results: - print(f" Sample result: {query_results[0]}") + # Use the new parser to convert to canonical format + query_results = result.get("result", {}) + print(f"Query result: {json.dumps(query_results, indent=2)}") + parsed_data = KustoFormatter.parse(query_results) + + if parsed_data and len(parsed_data) > 0: + # Get the timestamp value from the first row + scalar_value = parsed_data[0].get("print_0", "") + print(f"✅ Query succeeded, current time from Kusto: {scalar_value}") + if scalar_value: + parsed_date = datetime.fromisoformat(scalar_value.replace("Z", "+00:00")) + assert datetime.now(tz=timezone.utc) - parsed_date < timedelta( + minutes=1 + ), "Query result is stale" + else: + print("❌ No data returned from query") else: print(f"❌ Query failed: {result}") @@ -314,25 +323,22 @@ async def test_list_tables(self) -> None: result = await self.client.call_tool( "kusto_list_tables", {"cluster_uri": self.test_cluster_uri, "database": self.test_database} ) - print(f"List tables result: {json.dumps(result, indent=2)}") if result.get("success"): - # Handle both list and single object responses - tables = result.get("result", []) - if not isinstance(tables, list): - tables = [tables] if tables else [] + # Use the new parser to convert to canonical format + query_result = result.get("result", {}) + parsed_data = KustoFormatter.parse(query_result) + + # Extract table names from parsed rows + tables = [row.get("TableName", "") for row in parsed_data] + tables = [table for table in tables if table] # Filter out empty strings # Assert minimum count to verify array fix is working min_expected_tables = 50 # Samples database has many tables - if len(tables) < min_expected_tables: - raise AssertionError( - f"Expected at least {min_expected_tables} tables, got {len(tables)}. " - "This suggests the array truncation bug is still present." - ) - + assert ( + len(tables) > min_expected_tables + ), f"Expected at least {min_expected_tables} tables, got {len(tables)}. " print(f"✅ Found {len(tables)} tables (>= {min_expected_tables} as expected)") - for table in tables[:5]: # Show first 5 - print(f" - {table.get('TableName', 'N/A')}") else: print(f"❌ Failed to list tables: {result}") raise AssertionError(f"Table listing failed: {result}") @@ -361,7 +367,6 @@ async def test_table_sample(self) -> None: "database": self.test_database, }, ) - print(f"Sample table data result (truncated): {str(result)[:500]}...") if result.get("success"): # Handle both list and single object responses diff --git a/tests/unit/kusto/test_global_timeout.py b/tests/unit/kusto/test_global_timeout.py index cac5e5d..fc19a8a 100644 --- a/tests/unit/kusto/test_global_timeout.py +++ b/tests/unit/kusto/test_global_timeout.py @@ -31,19 +31,18 @@ def test_config_no_timeout_env() -> None: @patch("fabric_rti_mcp.kusto.kusto_service.get_kusto_connection") -@patch("fabric_rti_mcp.kusto.kusto_service.format_results") -def test_global_timeout_applied_to_query(mock_format_results: Mock, mock_get_connection: Mock) -> None: +def test_global_timeout_applied_to_query(mock_get_connection: Mock) -> None: """Test that global timeout is applied to Kusto queries.""" # Mock connection mock_connection = Mock() mock_connection.default_database = "TestDB" mock_client = Mock() + mock_result = Mock() + mock_result.primary_results = None + mock_client.execute.return_value = mock_result mock_connection.query_client = mock_client mock_get_connection.return_value = mock_connection - # Mock format_results to return expected result - mock_format_results.return_value = [{"test": "result"}] - # Mock the kusto config with timeout with patch("fabric_rti_mcp.kusto.kusto_service.CONFIG") as mock_config: mock_config.timeout_seconds = 600 @@ -62,18 +61,19 @@ def test_global_timeout_applied_to_query(mock_format_results: Mock, mock_get_con @patch("fabric_rti_mcp.kusto.kusto_service.get_kusto_connection") -@patch("fabric_rti_mcp.kusto.kusto_service.format_results") -def test_no_timeout_when_not_configured(mock_format_results: Mock, mock_get_connection: Mock) -> None: +def test_no_timeout_when_not_configured(mock_get_connection: Mock) -> None: """Test that no timeout is set when not configured.""" # Mock connection mock_connection = Mock() mock_connection.default_database = "TestDB" mock_client = Mock() + mock_result = Mock() + mock_result.primary_results = None + mock_client.execute.return_value = mock_result mock_connection.query_client = mock_client mock_get_connection.return_value = mock_connection - # Mock format_results to return expected result - mock_format_results.return_value = [{"test": "result"}] + # Mock format_results_as_json to return expected result # Mock the kusto config without timeout with patch("fabric_rti_mcp.kusto.kusto_service.CONFIG") as mock_config: diff --git a/tests/unit/kusto/test_kusto_formatter.py b/tests/unit/kusto/test_kusto_formatter.py new file mode 100644 index 0000000..334f3ed --- /dev/null +++ b/tests/unit/kusto/test_kusto_formatter.py @@ -0,0 +1,383 @@ +from typing import Any +from unittest.mock import Mock + +from azure.kusto.data.response import KustoResponseDataSet + +from fabric_rti_mcp.kusto.kusto_formatter import KustoFormatter + + +class TestFormatResults: + """Test cases for the KustoFormatter.to_json function.""" + + def test_KustoFormatter_to_json_with_valid_data(self) -> None: + """Test KustoFormatter.to_json with valid KustoResponseDataSet containing data.""" + # Arrange + mock_column1 = Mock() + mock_column1.column_name = "Name" + mock_column2 = Mock() + mock_column2.column_name = "Age" + mock_column3 = Mock() + mock_column3.column_name = "City" + + mock_primary_result = Mock() + mock_primary_result.columns = [mock_column1, mock_column2, mock_column3] + mock_primary_result.rows = [["Alice", 30, "New York"], ["Bob", 25, "San Francisco"], ["Charlie", 35, "Chicago"]] + + mock_result_set = Mock(spec=KustoResponseDataSet) + mock_result_set.primary_results = [mock_primary_result] + + expected_result: list[dict[str, Any]] = [ + {"Name": "Alice", "Age": 30, "City": "New York"}, + {"Name": "Bob", "Age": 25, "City": "San Francisco"}, + {"Name": "Charlie", "Age": 35, "City": "Chicago"}, + ] + + # Act + result = KustoFormatter.to_json(mock_result_set) + + # Assert + assert result.format == "json" + assert result.data == expected_result + assert len(result.data) == 3 + assert all(isinstance(row, dict) for row in result.data) + + def test_KustoFormatter_to_json_with_single_row(self) -> None: + """Test KustoFormatter.to_json with a single row of data.""" + # Arrange + mock_column = Mock() + mock_column.column_name = "Message" + + mock_primary_result = Mock() + mock_primary_result.columns = [mock_column] + mock_primary_result.rows = [["Hello World"]] + + mock_result_set = Mock(spec=KustoResponseDataSet) + mock_result_set.primary_results = [mock_primary_result] + + expected_result = [{"Message": "Hello World"}] + + # Act + result = KustoFormatter.to_json(mock_result_set) + + # Assert + assert result.format == "json" + assert result.data == expected_result + assert len(result.data) == 1 + + def test_KustoFormatter_to_columnar_with_valid_data(self) -> None: + """Test KustoFormatter.to_columnar with valid data containing escaped characters.""" + # Arrange + mock_column1 = Mock() + mock_column1.column_name = "ID" + mock_column2 = Mock() + mock_column2.column_name = "Message" + mock_column3 = Mock() + mock_column3.column_name = "Details" + + mock_primary_result = Mock() + mock_primary_result.columns = [mock_column1, mock_column2, mock_column3] + mock_primary_result.rows = [[1, "Hello\tWorld", "Line1\nLine2"], [2, 'Quote"Test', "Path\\File"]] + + mock_result_set = Mock(spec=KustoResponseDataSet) + mock_result_set.primary_results = [mock_primary_result] + + expected_data = { + "ID": [1, 2], + "Message": ["Hello\tWorld", 'Quote"Test'], + "Details": ["Line1\nLine2", "Path\\File"], + } + + # Act + result = KustoFormatter.to_columnar(mock_result_set) + + # Assert + assert result.format == "columnar" + assert result.data == expected_data + assert len(result.data["ID"]) == 2 + assert result.data["Message"][0] == "Hello\tWorld" + assert result.data["Details"][1] == "Path\\File" + + def test_KustoFormatter_to_csv_with_valid_data(self) -> None: + """Test KustoFormatter.to_csv with valid data containing escaped characters.""" + # Arrange + mock_column1 = Mock() + mock_column1.column_name = "ID" + mock_column2 = Mock() + mock_column2.column_name = "Message" + mock_column3 = Mock() + mock_column3.column_name = "Details" + + mock_primary_result = Mock() + mock_primary_result.columns = [mock_column1, mock_column2, mock_column3] + mock_primary_result.rows = [[1, "Hello,World", "Line1\nLine2"], [2, 'Quote"Test', None]] + + mock_result_set = Mock(spec=KustoResponseDataSet) + mock_result_set.primary_results = [mock_primary_result] + + # Act + result = KustoFormatter.to_csv(mock_result_set) + + # Assert + assert result.format == "csv" + csv_data = result.data.strip() + + # Check that it starts with header + assert csv_data.startswith("ID,Message,Details") + + # Check specific content is present - CSV handles escaping properly + assert '"Hello,World"' in csv_data # Comma properly quoted + assert '"Quote""Test"' in csv_data # Quote properly escaped as double quote + assert '"Line1\nLine2"' in csv_data # Newline preserved within quotes + + # Check that None is converted to empty string (CSV writer handles this) + assert csv_data.endswith(",") # Last field is empty (None becomes empty) + + +class TestParsingFunctionality: + """Test cases for the KustoFormatter parsing functions.""" + + def test_parse_json_format(self) -> None: + """Test parsing JSON format data.""" + # Arrange + json_data = [ + {"Name": "Alice", "Age": 30, "City": "New York"}, + {"Name": "Bob", "Age": 25, "City": "San Francisco"}, + ] + response = {"format": "json", "data": json_data} + + # Act + result = KustoFormatter.parse(response) + + # Assert + assert result == json_data + assert len(result) == 2 + assert result[0]["Name"] == "Alice" + assert result[1]["Age"] == 25 + + def test_parse_csv_format(self) -> None: + """Test parsing CSV format data.""" + # Arrange + csv_data = 'Name,Age,City\nAlice,30,"New York"\nBob,25,"San Francisco"' + response = {"format": "csv", "data": csv_data} + + expected_result = [ + {"Name": "Alice", "Age": "30", "City": "New York"}, + {"Name": "Bob", "Age": "25", "City": "San Francisco"}, + ] + + # Act + result = KustoFormatter.parse(response) + + # Assert + assert result == expected_result + assert len(result) == 2 + assert result[0]["Name"] == "Alice" + assert result[1]["City"] == "San Francisco" + + def test_parse_csv_format_with_escaped_characters(self) -> None: + """Test parsing CSV format with escaped characters.""" + # Arrange + csv_data = 'ID,Message,Details\n1,"Hello,World","Line1\nLine2"\n2,"Quote""Test",""' + response = {"format": "csv", "data": csv_data} + + expected_result = [ + {"ID": "1", "Message": "Hello,World", "Details": "Line1\nLine2"}, + {"ID": "2", "Message": 'Quote"Test', "Details": None}, + ] + + # Act + result = KustoFormatter.parse(response) + + # Assert + assert result == expected_result + assert result[0]["Message"] == "Hello,World" # Comma properly unescaped + assert result[0]["Details"] == "Line1\nLine2" # Newline preserved + assert result[1]["Message"] == 'Quote"Test' # Quote properly unescaped + assert result[1]["Details"] is None # Empty string converted to None + + def test_parse_tsv_format(self) -> None: + """Test parsing TSV format data.""" + # Arrange + tsv_data = "Name\tAge\tCity\nAlice\t30\tNew York\nBob\t25\tSan Francisco" + response = {"format": "tsv", "data": tsv_data} + + expected_result = [ + {"Name": "Alice", "Age": "30", "City": "New York"}, + {"Name": "Bob", "Age": "25", "City": "San Francisco"}, + ] + + # Act + result = KustoFormatter.parse(response) + + # Assert + assert result == expected_result + assert len(result) == 2 + assert result[0]["Name"] == "Alice" + assert result[1]["City"] == "San Francisco" + + def test_parse_tsv_format_with_escaped_characters(self) -> None: + """Test parsing TSV format with escaped characters.""" + # Arrange + tsv_data = "ID\tMessage\tDetails\n1\tHello\\tWorld\tLine1\\nLine2\n2\tPath\\\\File\t" + response = {"format": "tsv", "data": tsv_data} + + expected_result = [ + {"ID": "1", "Message": "Hello\tWorld", "Details": "Line1\nLine2"}, + {"ID": "2", "Message": "Path\\File", "Details": None}, + ] + + # Act + result = KustoFormatter.parse(response) + + # Assert + assert result == expected_result + assert result[0]["Message"] == "Hello\tWorld" # Tab properly unescaped + assert result[0]["Details"] == "Line1\nLine2" # Newline properly unescaped + assert result[1]["Message"] == "Path\\File" # Backslash properly unescaped + assert result[1]["Details"] is None # Empty string converted to None + + def test_parse_columnar_format(self) -> None: + """Test parsing columnar format data.""" + # Arrange + columnar_data = {"Name": ["Alice", "Bob"], "Age": [30, 25], "City": ["New York", "San Francisco"]} + response = {"format": "columnar", "data": columnar_data} + + expected_result = [ + {"Name": "Alice", "Age": 30, "City": "New York"}, + {"Name": "Bob", "Age": 25, "City": "San Francisco"}, + ] + + # Act + result = KustoFormatter.parse(response) + + # Assert + assert result == expected_result + assert len(result) == 2 + assert result[0]["Name"] == "Alice" + assert result[1]["Age"] == 25 + + def test_parse_header_arrays_format(self) -> None: + """Test parsing header_arrays format data.""" + # Arrange + header_arrays_data = '["Name","Age","City"]\n["Alice",30,"New York"]\n["Bob",25,"San Francisco"]' + response = {"format": "header_arrays", "data": header_arrays_data} + + expected_result = [ + {"Name": "Alice", "Age": 30, "City": "New York"}, + {"Name": "Bob", "Age": 25, "City": "San Francisco"}, + ] + + # Act + result = KustoFormatter.parse(response) + + # Assert + assert result == expected_result + assert len(result) == 2 + assert result[0]["Name"] == "Alice" + assert result[1]["Age"] == 25 + + def test_parse_with_KustoResponseFormat_object(self) -> None: + """Test parsing with KustoResponseFormat object instead of dict.""" + # Arrange + from fabric_rti_mcp.kusto.kusto_formatter import KustoResponseFormat + + json_data = [{"Name": "Alice", "Age": 30}] + response = KustoResponseFormat(format="json", data=json_data) + + # Act + result = KustoFormatter.parse(response) + + # Assert + assert result == json_data + assert len(result) == 1 + assert result[0]["Name"] == "Alice" + + def test_parse_with_invalid_format(self) -> None: + """Test parsing with unsupported format raises ValueError.""" + # Arrange + response = {"format": "invalid_format", "data": []} + + # Act & Assert + try: + KustoFormatter.parse(response) + assert False, "Expected ValueError to be raised" + except ValueError as e: + assert "Unsupported format: invalid_format" in str(e) + + def test_parse_empty_data_cases(self) -> None: + """Test parsing with various empty data cases.""" + # Test empty JSON + assert KustoFormatter.parse({"format": "json", "data": []}) == [] + + # Test empty CSV + assert KustoFormatter.parse({"format": "csv", "data": ""}) == [] + + # Test empty TSV + assert KustoFormatter.parse({"format": "tsv", "data": ""}) == [] + + # Test empty columnar + assert KustoFormatter.parse({"format": "columnar", "data": {}}) == [] + + # Test empty header_arrays + assert KustoFormatter.parse({"format": "header_arrays", "data": ""}) == [] + + def test_parse_malformed_data_cases(self) -> None: + """Test parsing with malformed data returns empty list.""" + # Test invalid json should raise + try: + KustoFormatter.parse({"format": "json", "data": "not a list"}) + assert False, "Expected ValueError to be raised" + except ValueError as e: + assert "Invalid JSON format" in str(e) + + # Test None is a noop + assert KustoFormatter.parse({"format": "csv", "data": None}) is None + + # Test invalid TSV data should raise + try: + KustoFormatter.parse({"format": "tsv", "data": 123}) + assert False, "Expected ValueError to be raised" + except ValueError as e: + assert "Invalid TSV format" in str(e) + + # Test invalid columnar data should raise + try: + KustoFormatter.parse({"format": "columnar", "data": "not a dict"}) + assert False, "Expected ValueError to be raised" + except ValueError as e: + assert "Invalid columnar format" in str(e) + + # Test malformed JSON in header_arrays + assert KustoFormatter.parse({"format": "header_arrays", "data": "invalid json"}) == [] + + def test_round_trip_conversion(self) -> None: + """Test that we can convert to a format and parse it back to get the same data.""" + # Arrange - create mock data + mock_column1 = Mock() + mock_column1.column_name = "Name" + mock_column2 = Mock() + mock_column2.column_name = "Age" + + mock_primary_result = Mock() + mock_primary_result.columns = [mock_column1, mock_column2] + mock_primary_result.rows = [["Alice", 30], ["Bob", 25]] + + mock_result_set = Mock(spec=KustoResponseDataSet) + mock_result_set.primary_results = [mock_primary_result] + + expected_data = [{"Name": "Alice", "Age": 30}, {"Name": "Bob", "Age": 25}] + + # Test JSON round-trip + json_result = KustoFormatter.to_json(mock_result_set) + parsed_json = KustoFormatter.parse(json_result) + assert parsed_json == expected_data + + # Test columnar round-trip + columnar_result = KustoFormatter.to_columnar(mock_result_set) + parsed_columnar = KustoFormatter.parse(columnar_result) + assert parsed_columnar == expected_data + + # Test header_arrays round-trip + header_arrays_result = KustoFormatter.to_header_arrays(mock_result_set) + parsed_header_arrays = KustoFormatter.parse(header_arrays_result) + assert parsed_header_arrays == expected_data diff --git a/tests/unit/kusto/test_kusto_service.py b/tests/unit/kusto/test_kusto_service.py index 8ce2225..bb0f930 100644 --- a/tests/unit/kusto/test_kusto_service.py +++ b/tests/unit/kusto/test_kusto_service.py @@ -49,9 +49,8 @@ def test_execute_basic_query( assert crp.has_option("request_readonly") # Verify result format - assert isinstance(result, list) - assert len(result) == 1 - assert result[0]["TestColumn"] == "TestValue" + assert result["format"] == "columnar" + assert result["data"]["TestColumn"][0] == "TestValue" @patch("fabric_rti_mcp.kusto.kusto_service.get_kusto_connection")