Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@ The Fabric RTI MCP Server acts as a bridge between AI agents and Microsoft Fabri

#### Eventhouse (Kusto) - 12 Tools:
- **`kusto_known_services`** - List all available Kusto services configured in the MCP
- **`kusto_query`** - Execute KQL queries on the specified database
- **`kusto_kql_query`** - Execute KQL queries on the specified database
- **`kusto_tsql_query`** - Execute T-SQL queries on the specified database
- **`kusto_command`** - Execute Kusto management commands (destructive operations)
- **`kusto_list_databases`** - List all databases in the Kusto cluster
- **`kusto_list_tables`** - List all tables in a specified database
Expand Down
28 changes: 25 additions & 3 deletions fabric_rti_mcp/kusto/kusto_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,9 @@ def wrapper(*args, **kwargs): # type: ignore
return wrapper # type: ignore


def _crp(action: str, is_destructive: bool, ignore_readonly: bool) -> ClientRequestProperties:
def _crp(
action: str, is_destructive: bool, ignore_readonly: bool, additional_properties: Optional[Dict[str, Any]] = None
) -> ClientRequestProperties:
crp: ClientRequestProperties = ClientRequestProperties()
crp.application = f"fabric-rti-mcp{{{__version__}}}" # type: ignore
crp.client_request_id = f"KFRTI_MCP.{action}:{str(uuid.uuid4())}" # type: ignore
Expand All @@ -107,6 +109,11 @@ def _crp(action: str, is_destructive: bool, ignore_readonly: bool) -> ClientRequ
timeout_str = f"{hours:02d}:{minutes:02d}:{seconds:02d}"
crp.set_option("servertimeout", timeout_str)

# Apply any additional properties provided by the user (can override global settings)
if additional_properties:
for key, value in additional_properties.items():
crp.set_option(key, value)

return crp


Expand All @@ -115,14 +122,15 @@ def _execute(
cluster_uri: str,
readonly_override: bool = False,
database: Optional[str] = None,
client_request_properties: Optional[Dict[str, Any]] = None,
) -> Dict[str, Any]:
caller_frame = inspect.currentframe().f_back # type: ignore
action_name = caller_frame.f_code.co_name # type: ignore
caller_func = caller_frame.f_globals.get(action_name) # type: ignore
is_destructive = hasattr(caller_func, "_is_destructive")

# Generate correlation ID for tracing
crp = _crp(action_name, is_destructive, readonly_override)
crp = _crp(action_name, is_destructive, readonly_override, client_request_properties)
correlation_id = crp.client_request_id # type: ignore

try:
Expand Down Expand Up @@ -156,7 +164,7 @@ def kusto_known_services() -> List[Dict[str, str]]:
return [asdict(service) for service in services]


def kusto_query(query: str, cluster_uri: str, database: Optional[str] = None) -> Dict[str, Any]:
def kusto_kql_query(query: str, cluster_uri: str, database: Optional[str] = None) -> Dict[str, Any]:
"""
Executes a KQL query on the specified database. If no database is provided,
it will use the default database.
Expand All @@ -169,6 +177,20 @@ def kusto_query(query: str, cluster_uri: str, database: Optional[str] = None) ->
return _execute(query, cluster_uri, database=database)


def kusto_tsql_query(query: str, cluster_uri: str, database: Optional[str] = None) -> Dict[str, Any]:
"""
Executes a T-SQL query on the specified database. If no database is provided,
it will use the default database.

:param query: The T-SQL query to execute.
:param cluster_uri: The URI of the Kusto cluster.
:param database: Optional database name. If not provided, uses the default database.
:return: The result of the query execution as a list of dictionaries (json).
"""
client_request_properties = {"query_language": "sql"}
return _execute(query, cluster_uri, database=database, client_request_properties=client_request_properties)


@destructive_operation
def kusto_command(command: str, cluster_uri: str, database: Optional[str] = None) -> Dict[str, Any]:
"""
Expand Down
6 changes: 5 additions & 1 deletion fabric_rti_mcp/kusto/kusto_tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,11 @@ def register_tools(mcp: FastMCP) -> None:
annotations=ToolAnnotations(readOnlyHint=True, destructiveHint=False),
)
mcp.add_tool(
kusto_service.kusto_query,
kusto_service.kusto_kql_query,
annotations=ToolAnnotations(readOnlyHint=True, destructiveHint=False),
)
mcp.add_tool(
kusto_service.kusto_tsql_query,
annotations=ToolAnnotations(readOnlyHint=True, destructiveHint=False),
)
mcp.add_tool(
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ dependencies = [
"azure-kusto-data~=5.0.0",
"azure-identity",
"azure-kusto-ingest~=5.0.0",
"msal~=1.28.0"
"msal~=1.30.0"
]

[project.optional-dependencies]
Expand Down
46 changes: 42 additions & 4 deletions tests/live/test_kusto_tools_live.py
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,8 @@ async def test_list_tools(self) -> None:

expected_kusto_tools = [
"kusto_known_services",
"kusto_query",
"kusto_kql_query",
"kusto_tsql_query",
"kusto_command",
"kusto_list_databases",
"kusto_list_tables",
Expand Down Expand Up @@ -270,8 +271,8 @@ async def test_list_databases(self) -> None:
raise

async def test_simple_query(self) -> None:
"""Test kusto_query tool with a simple query."""
print("\n🔍 Testing kusto_query...")
"""Test kusto_kql_query tool with a simple query."""
print("\n🔍 Testing kusto_kql_query...")
if not self.client:
raise RuntimeError("Client not initialized")

Expand All @@ -282,7 +283,7 @@ async def test_simple_query(self) -> None:
try:
# Simple query to get current time
result = await self.client.call_tool(
"kusto_query",
"kusto_kql_query",
{"query": "print now()", "cluster_uri": self.test_cluster_uri, "database": self.test_database},
)

Expand Down Expand Up @@ -380,6 +381,42 @@ async def test_table_sample(self) -> None:
except Exception as e:
print(f"❌ Error testing table sample: {e}")

async def test_tsql_query(self) -> None:
"""Test kusto_tsql_query tool with a simple T-SQL query."""
print("\n🧮 Testing kusto_tsql_query...")
if not self.client:
raise RuntimeError("Client not initialized")

if not self.test_cluster_uri:
print("⚠️ No KUSTO_CLUSTER_URI configured, skipping T-SQL query test")
return

try:
# Simple T-SQL query to return a scalar value
result = await self.client.call_tool(
"kusto_tsql_query",
{"query": "SELECT 1 AS value", "cluster_uri": self.test_cluster_uri, "database": self.test_database},
)

if result.get("success"):
query_results = result.get("result", {})
print(f"T-SQL query result: {json.dumps(query_results, indent=2)}")
parsed_data = KustoFormatter.parse(query_results)

if parsed_data and len(parsed_data) > 0:
value = parsed_data[0].get("value", None)
print(f"✅ T-SQL query succeeded, value: {value}")
assert value == 1 or value == "1", f"Expected value 1, got {value}"
else:
print("❌ No data returned from T-SQL query")
raise AssertionError("No data returned from T-SQL query")
else:
print(f"❌ T-SQL query failed: {result}")
raise AssertionError(f"T-SQL query failed: {result}")

except Exception as e:
print(f"❌ Error testing T-SQL query: {e}")

async def run_all_tests(self) -> None:
"""Run all available tests."""
print("🚀 Starting Kusto tools live testing...")
Expand All @@ -394,6 +431,7 @@ async def run_all_tests(self) -> None:
await self.test_simple_query()
await self.test_list_tables()
await self.test_table_sample()
await self.test_tsql_query()

print("\n✅ All tests completed!")

Expand Down
6 changes: 3 additions & 3 deletions tests/unit/kusto/test_global_timeout.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from azure.kusto.data import ClientRequestProperties

from fabric_rti_mcp.kusto.kusto_config import KustoConfig
from fabric_rti_mcp.kusto.kusto_service import kusto_query
from fabric_rti_mcp.kusto.kusto_service import kusto_kql_query


def test_config_loads_timeout_from_env() -> None:
Expand Down Expand Up @@ -46,7 +46,7 @@ def test_global_timeout_applied_to_query(mock_get_connection: Mock) -> None:
# Mock the kusto config with timeout
with patch("fabric_rti_mcp.kusto.kusto_service.CONFIG") as mock_config:
mock_config.timeout_seconds = 600
kusto_query("TestQuery", "https://test.kusto.windows.net")
kusto_kql_query("TestQuery", "https://test.kusto.windows.net")

# Verify that execute was called with ClientRequestProperties
mock_client.execute.assert_called_once()
Expand Down Expand Up @@ -78,7 +78,7 @@ def test_no_timeout_when_not_configured(mock_get_connection: Mock) -> None:
# Mock the kusto config without timeout
with patch("fabric_rti_mcp.kusto.kusto_service.CONFIG") as mock_config:
mock_config.timeout_seconds = None
kusto_query("TestQuery", "https://test.kusto.windows.net")
kusto_kql_query("TestQuery", "https://test.kusto.windows.net")

# Verify that execute was called with ClientRequestProperties
mock_client.execute.assert_called_once()
Expand Down
60 changes: 53 additions & 7 deletions tests/unit/kusto/test_kusto_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from azure.kusto.data.response import KustoResponseDataSet

from fabric_rti_mcp import __version__
from fabric_rti_mcp.kusto.kusto_service import kusto_command, kusto_query
from fabric_rti_mcp.kusto.kusto_service import kusto_command, kusto_kql_query, kusto_tsql_query


@patch("fabric_rti_mcp.kusto.kusto_service.get_kusto_connection")
Expand All @@ -30,7 +30,7 @@ def test_execute_basic_query(
database = "test_db"

# Act
result = kusto_query(query, sample_cluster_uri, database=database)
result = kusto_kql_query(query, sample_cluster_uri, database=database)

# Assert
mock_get_kusto_connection.assert_called_once_with(sample_cluster_uri)
Expand All @@ -45,7 +45,7 @@ def test_execute_basic_query(
crp = mock_client.execute.call_args[0][2]
assert isinstance(crp, ClientRequestProperties)
assert crp.application == f"fabric-rti-mcp{{{__version__}}}"
assert crp.client_request_id.startswith("KFRTI_MCP.kusto_query:") # type: ignore
assert crp.client_request_id.startswith("KFRTI_MCP.kusto_kql_query:") # type: ignore
assert crp.has_option("request_readonly")

# Verify result format
Expand Down Expand Up @@ -73,14 +73,14 @@ def test_execute_error_includes_correlation_id(

# Act & Assert
with pytest.raises(RuntimeError) as exc_info:
kusto_query(query, sample_cluster_uri, database=database)
kusto_kql_query(query, sample_cluster_uri, database=database)

error_message = str(exc_info.value)

# Verify the error message includes correlation ID and operation name
assert "correlation ID:" in error_message
assert "KFRTI_MCP.kusto_query:" in error_message
assert "kusto_query" in error_message
assert "KFRTI_MCP.kusto_kql_query:" in error_message
assert "kusto_kql_query" in error_message
assert "Kusto execution failed" in error_message


Expand Down Expand Up @@ -135,8 +135,54 @@ def test_successful_operations_do_not_log_correlation_id(
query = "TestTable | take 10"

# Act
kusto_query(query, sample_cluster_uri)
kusto_kql_query(query, sample_cluster_uri)

# Assert - verify no info or debug logging occurs for successful operations
assert not mock_logger.info.called
assert not mock_logger.debug.called


@patch("fabric_rti_mcp.kusto.kusto_service.get_kusto_connection")
def test_tsql_query_sets_query_language(
mock_get_kusto_connection: Mock,
sample_cluster_uri: str,
mock_kusto_response: KustoResponseDataSet,
) -> None:
"""Test that kusto_tsql_query properly sets the query_language option to 'sql'."""
# Arrange
mock_client = MagicMock()
mock_client.execute.return_value = mock_kusto_response

mock_connection = MagicMock()
mock_connection.query_client = mock_client
mock_connection.default_database = "default_db"
mock_get_kusto_connection.return_value = mock_connection

query = "SELECT * FROM TestTable"
database = "test_db"

# Act
result = kusto_tsql_query(query, sample_cluster_uri, database=database)

# Assert
mock_get_kusto_connection.assert_called_once_with(sample_cluster_uri)
mock_client.execute.assert_called_once()

# Verify database and query
args = mock_client.execute.call_args[0]
assert args[0] == database
assert args[1] == query

# Verify ClientRequestProperties settings
crp = mock_client.execute.call_args[0][2]
assert isinstance(crp, ClientRequestProperties)
assert crp.application == f"fabric-rti-mcp{{{__version__}}}"
assert crp.client_request_id.startswith("KFRTI_MCP.kusto_tsql_query:") # type: ignore
assert crp.has_option("request_readonly")
# Verify that query_language is set to 'sql'
assert crp.has_option("query_language")
assert crp.get_option("query_language", "") == "sql"

# Verify result format
assert result["format"] == "columnar"
assert result["data"]["TestColumn"][0] == "TestValue"
Loading
Loading