Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/code-quality.yml
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ jobs:
echo "::endgroup::"

echo "::group::Running MyPy"
mypy_output=$(mypy . --explicit-package-bases 2>&1) || mypy_failed=true
mypy_output=$(mypy fabric_rti_mcp --explicit-package-bases 2>&1) || mypy_failed=true
echo "$mypy_output"
echo "::endgroup::"

Expand Down
2 changes: 1 addition & 1 deletion fabric_rti_mcp/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ def with_args() -> GlobalFabricRTIConfig:
parser.add_argument("--host", type=str, help="HTTP host to listen on")
parser.add_argument("--port", type=int, help="HTTP port to listen on")
parser.add_argument("--stateless-http", type=bool, help="Enable or disable stateless HTTP")
args = parser.parse_args()
args, _ = parser.parse_known_args()

transport = base_config.transport
if args.stdio:
Expand Down
303 changes: 303 additions & 0 deletions fabric_rti_mcp/kusto/kusto_formatter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,303 @@
import csv
import io
import json
from dataclasses import dataclass
from typing import Any, cast

from azure.kusto.data.response import KustoResponseDataSet


@dataclass(slots=True, frozen=True)
class KustoResponseFormat:
format: str
data: Any


class KustoFormatter:
"""Formatter for Kusto query results in various compact formats"""

@staticmethod
def to_json(result_set: KustoResponseDataSet | None) -> KustoResponseFormat:
if not result_set or not getattr(result_set, "primary_results", None):
return KustoResponseFormat(format="json", data=[])

first_result = result_set.primary_results[0]
column_names = [col.column_name for col in first_result.columns]

return KustoResponseFormat(format="json", data=[dict(zip(column_names, row)) for row in first_result.rows])

@staticmethod
def to_csv(result_set: KustoResponseDataSet | None) -> KustoResponseFormat:
if not result_set or not getattr(result_set, "primary_results", None):
return KustoResponseFormat(format="csv", data="")

first_result = result_set.primary_results[0]
output = io.StringIO()

# Create CSV writer with standard settings
writer = csv.writer(output, quoting=csv.QUOTE_MINIMAL)

# Write header
header = [col.column_name for col in first_result.columns]
writer.writerow(header)

# Write data rows
for row in first_result.rows:
# Convert None to empty string, keep other types
formatted_row = ["" if v is None else v for v in row]
writer.writerow(formatted_row)

return KustoResponseFormat(format="csv", data=output.getvalue())

@staticmethod
def to_tsv(result_set: KustoResponseDataSet | None) -> KustoResponseFormat:
result = KustoResponseFormat(format="tsv", data="")
if not result_set or not getattr(result_set, "primary_results", None):
return result

first_result = result_set.primary_results[0]
lines: list[str] = []

# Header row
header = "\t".join(col.column_name for col in first_result.columns)
lines.append(header)

# Data rows
for row in first_result.rows:
formatted_row: list[str] = []
for value in row:
if value is None:
formatted_row.append("")
else:
# Escape tabs, newlines, and backslashes
str_value = str(value)
str_value = str_value.replace("\\", "\\\\") # Escape backslashes first
str_value = str_value.replace("\t", "\\t")
str_value = str_value.replace("\n", "\\n")
str_value = str_value.replace("\r", "\\r")
formatted_row.append(str_value)

lines.append("\t".join(formatted_row))

return KustoResponseFormat(format="tsv", data="\n".join(lines))

@staticmethod
def to_columnar(result_set: KustoResponseDataSet | None) -> KustoResponseFormat:
if not result_set or not getattr(result_set, "primary_results", None):
return KustoResponseFormat(format="columnar", data={})

first_result = result_set.primary_results[0]

# Build columnar structure
columnar_data: dict[str, list[Any]] = {}

# Initialize columns
for i, col in enumerate(first_result.columns):
columnar_data[col.column_name] = []

# Populate columns
for row in first_result.rows:
for i, col in enumerate(first_result.columns):
columnar_data[col.column_name].append(row[i]) # type: ignore

# Compact JSON (no spaces)
return KustoResponseFormat(format="columnar", data=columnar_data)

@staticmethod
def to_header_arrays(result_set: KustoResponseDataSet | None) -> KustoResponseFormat:
if not result_set or not getattr(result_set, "primary_results", None):
return KustoResponseFormat(format="header_arrays", data=[])

first_result = result_set.primary_results[0]
lines: list[str] = []

# Header as JSON array
columns = [col.column_name for col in first_result.columns]
lines.append(json.dumps(columns, separators=(",", ":")))

# Each row as JSON array
for row in first_result.rows:
row_list = list(row)
lines.append(json.dumps(row_list, separators=(",", ":")))

return KustoResponseFormat(format="header_arrays", data="\n".join(lines))

@staticmethod
def parse(response: KustoResponseFormat | dict[str, Any]) -> list[dict[str, Any]] | None:
"""
Parse any KustoResponseFormat back to canonical JSON array format.

Args:
response: Either a KustoResponseFormat object or a dict with 'format' and 'data' keys

Returns:
List of dictionaries where each dict represents a row with column names as keys
"""
if response is None: # type: ignore
return None # type: ignore

if isinstance(response, dict):
format_type = response.get("format", "")
data = response.get("data")
elif isinstance(response, KustoResponseFormat): # type: ignore
format_type = response.format
data = response.data
else:
raise ValueError("Invalid KustoResponseFormat")

# Handle None data early
if data is None:
return None

if format_type == "json":
return KustoFormatter._parse_json(data)
elif format_type == "csv":
return KustoFormatter._parse_csv(data)
elif format_type == "tsv":
return KustoFormatter._parse_tsv(data)
elif format_type == "columnar":
return KustoFormatter._parse_columnar(data)
elif format_type == "header_arrays":
return KustoFormatter._parse_header_arrays(data)
else:
raise ValueError(f"Unsupported format: {format_type}")

@staticmethod
def _parse_json(data: Any) -> list[dict[str, Any]]:
"""Parse JSON format data (already in canonical format)"""
if data is None or (not isinstance(data, list) and not isinstance(data, dict)): # type: ignore
raise ValueError("Invalid JSON format")
return data # type: ignore

@staticmethod
def _parse_csv(data: str) -> list[dict[str, Any]]:
"""Parse CSV format data back to canonical JSON"""
if data == "":
return []
if data is None: # type: ignore
return None # type: ignore
if not isinstance(data, str): # type: ignore
raise ValueError("Invalid CSV format")

lines = data.strip().split("\n")
if len(lines) < 1:
raise ValueError("Invalid CSV format")

# Parse CSV using csv.reader to handle escaping properly
csv_reader = csv.reader(io.StringIO(data))
rows = list(csv_reader)

if len(rows) < 1:
return []

headers = rows[0]
result: list[dict[str, Any]] = []

for row in rows[1:]:
# Pad row with empty strings if shorter than headers
padded_row = row + [""] * (len(headers) - len(row))
row_dict: dict[str, Any] = {}
for i, header in enumerate(headers):
value = padded_row[i] if i < len(padded_row) else ""
# Convert empty strings back to None if needed
row_dict[header] = None if value == "" else value
result.append(row_dict)

return result

@staticmethod
def _parse_tsv(data: str) -> list[dict[str, Any]]:
"""Parse TSV format data back to canonical JSON"""
if data == "":
return []
if not isinstance(data, str): # type: ignore
raise ValueError("Invalid TSV format")

lines = data.strip().split("\n")
if len(lines) < 1:
raise ValueError("Invalid TSV format")

# Parse header
headers = lines[0].split("\t")
result: list[dict[str, Any]] = []

# Parse data rows
for line in lines[1:]:
values = line.split("\t")
row_dict: dict[str, Any] = {}

for i, header in enumerate(headers):
value = values[i] if i < len(values) else ""

# Unescape TSV special characters
if value:
value = value.replace("\\t", "\t")
value = value.replace("\\n", "\n")
value = value.replace("\\r", "\r")
value = value.replace("\\\\", "\\") # Unescape backslashes last

# Convert empty strings back to None
row_dict[header] = None if value == "" else value

result.append(row_dict)

return result

@staticmethod
def _parse_columnar(data: Any) -> list[dict[str, Any]]:
"""Parse columnar format data back to canonical JSON"""
if data is None or not isinstance(data, dict):
raise ValueError("Invalid columnar format")
data = cast(dict[str, list[Any]], data)

# Get column names and determine row count
columns: list[str] = list(data.keys()) # type: ignore
if not columns:
return []

# All columns should have the same length
row_count = len(data[columns[0]]) if columns[0] in data else 0

result: list[dict[str, Any]] = []
for row_idx in range(row_count):
row_dict: dict[str, Any] = {}
for col_name in columns:
col_values = data.get(col_name, [])
row_dict[col_name] = col_values[row_idx] if row_idx < len(col_values) else None
result.append(row_dict)

return result

@staticmethod
def _parse_header_arrays(data: str) -> list[dict[str, Any]]:
"""Parse header_arrays format data back to canonical JSON"""
if data is None or not isinstance(data, str): # type: ignore
raise ValueError("Invalid header_arrays format")

lines = data.strip().split("\n")
if len(lines) < 1:
return []

try:
# Parse header (first line)
headers: list[str] = json.loads(lines[0])
if not isinstance(headers, list): # type: ignore
return [] # type: ignore

result: list[dict[str, Any]] = []

# Parse data rows (remaining lines)
for line in lines[1:]:
row_values: list[Any] = json.loads(line)
if not isinstance(row_values, list): # type: ignore
continue # type: ignore

row_dict: dict[str, Any] = {}
for i, header in enumerate(headers):
row_dict[header] = row_values[i] if i < len(row_values) else None
result.append(row_dict)

return result

except json.JSONDecodeError:
return []
13 changes: 0 additions & 13 deletions fabric_rti_mcp/kusto/kusto_response_formatter.py

This file was deleted.

Loading
Loading