|
| 1 | +import csv |
| 2 | +import io |
| 3 | +import json |
| 4 | +from dataclasses import dataclass |
| 5 | +from typing import Any, cast |
| 6 | + |
| 7 | +from azure.kusto.data.response import KustoResponseDataSet |
| 8 | + |
| 9 | + |
| 10 | +@dataclass(slots=True, frozen=True) |
| 11 | +class KustoResponseFormat: |
| 12 | + format: str |
| 13 | + data: Any |
| 14 | + |
| 15 | + |
| 16 | +class KustoFormatter: |
| 17 | + """Formatter for Kusto query results in various compact formats""" |
| 18 | + |
| 19 | + @staticmethod |
| 20 | + def to_json(result_set: KustoResponseDataSet | None) -> KustoResponseFormat: |
| 21 | + if not result_set or not getattr(result_set, "primary_results", None): |
| 22 | + return KustoResponseFormat(format="json", data=[]) |
| 23 | + |
| 24 | + first_result = result_set.primary_results[0] |
| 25 | + column_names = [col.column_name for col in first_result.columns] |
| 26 | + |
| 27 | + return KustoResponseFormat(format="json", data=[dict(zip(column_names, row)) for row in first_result.rows]) |
| 28 | + |
| 29 | + @staticmethod |
| 30 | + def to_csv(result_set: KustoResponseDataSet | None) -> KustoResponseFormat: |
| 31 | + if not result_set or not getattr(result_set, "primary_results", None): |
| 32 | + return KustoResponseFormat(format="csv", data="") |
| 33 | + |
| 34 | + first_result = result_set.primary_results[0] |
| 35 | + output = io.StringIO() |
| 36 | + |
| 37 | + # Create CSV writer with standard settings |
| 38 | + writer = csv.writer(output, quoting=csv.QUOTE_MINIMAL) |
| 39 | + |
| 40 | + # Write header |
| 41 | + header = [col.column_name for col in first_result.columns] |
| 42 | + writer.writerow(header) |
| 43 | + |
| 44 | + # Write data rows |
| 45 | + for row in first_result.rows: |
| 46 | + # Convert None to empty string, keep other types |
| 47 | + formatted_row = ["" if v is None else v for v in row] |
| 48 | + writer.writerow(formatted_row) |
| 49 | + |
| 50 | + return KustoResponseFormat(format="csv", data=output.getvalue()) |
| 51 | + |
| 52 | + @staticmethod |
| 53 | + def to_tsv(result_set: KustoResponseDataSet | None) -> KustoResponseFormat: |
| 54 | + result = KustoResponseFormat(format="tsv", data="") |
| 55 | + if not result_set or not getattr(result_set, "primary_results", None): |
| 56 | + return result |
| 57 | + |
| 58 | + first_result = result_set.primary_results[0] |
| 59 | + lines: list[str] = [] |
| 60 | + |
| 61 | + # Header row |
| 62 | + header = "\t".join(col.column_name for col in first_result.columns) |
| 63 | + lines.append(header) |
| 64 | + |
| 65 | + # Data rows |
| 66 | + for row in first_result.rows: |
| 67 | + formatted_row: list[str] = [] |
| 68 | + for value in row: |
| 69 | + if value is None: |
| 70 | + formatted_row.append("") |
| 71 | + else: |
| 72 | + # Escape tabs, newlines, and backslashes |
| 73 | + str_value = str(value) |
| 74 | + str_value = str_value.replace("\\", "\\\\") # Escape backslashes first |
| 75 | + str_value = str_value.replace("\t", "\\t") |
| 76 | + str_value = str_value.replace("\n", "\\n") |
| 77 | + str_value = str_value.replace("\r", "\\r") |
| 78 | + formatted_row.append(str_value) |
| 79 | + |
| 80 | + lines.append("\t".join(formatted_row)) |
| 81 | + |
| 82 | + return KustoResponseFormat(format="tsv", data="\n".join(lines)) |
| 83 | + |
| 84 | + @staticmethod |
| 85 | + def to_columnar(result_set: KustoResponseDataSet | None) -> KustoResponseFormat: |
| 86 | + if not result_set or not getattr(result_set, "primary_results", None): |
| 87 | + return KustoResponseFormat(format="columnar", data={}) |
| 88 | + |
| 89 | + first_result = result_set.primary_results[0] |
| 90 | + |
| 91 | + # Build columnar structure |
| 92 | + columnar_data: dict[str, list[Any]] = {} |
| 93 | + |
| 94 | + # Initialize columns |
| 95 | + for i, col in enumerate(first_result.columns): |
| 96 | + columnar_data[col.column_name] = [] |
| 97 | + |
| 98 | + # Populate columns |
| 99 | + for row in first_result.rows: |
| 100 | + for i, col in enumerate(first_result.columns): |
| 101 | + columnar_data[col.column_name].append(row[i]) # type: ignore |
| 102 | + |
| 103 | + # Compact JSON (no spaces) |
| 104 | + return KustoResponseFormat(format="columnar", data=columnar_data) |
| 105 | + |
| 106 | + @staticmethod |
| 107 | + def to_header_arrays(result_set: KustoResponseDataSet | None) -> KustoResponseFormat: |
| 108 | + if not result_set or not getattr(result_set, "primary_results", None): |
| 109 | + return KustoResponseFormat(format="header_arrays", data=[]) |
| 110 | + |
| 111 | + first_result = result_set.primary_results[0] |
| 112 | + lines: list[str] = [] |
| 113 | + |
| 114 | + # Header as JSON array |
| 115 | + columns = [col.column_name for col in first_result.columns] |
| 116 | + lines.append(json.dumps(columns, separators=(",", ":"))) |
| 117 | + |
| 118 | + # Each row as JSON array |
| 119 | + for row in first_result.rows: |
| 120 | + row_list = list(row) |
| 121 | + lines.append(json.dumps(row_list, separators=(",", ":"))) |
| 122 | + |
| 123 | + return KustoResponseFormat(format="header_arrays", data="\n".join(lines)) |
| 124 | + |
| 125 | + @staticmethod |
| 126 | + def parse(response: KustoResponseFormat | dict[str, Any]) -> list[dict[str, Any]] | None: |
| 127 | + """ |
| 128 | + Parse any KustoResponseFormat back to canonical JSON array format. |
| 129 | +
|
| 130 | + Args: |
| 131 | + response: Either a KustoResponseFormat object or a dict with 'format' and 'data' keys |
| 132 | +
|
| 133 | + Returns: |
| 134 | + List of dictionaries where each dict represents a row with column names as keys |
| 135 | + """ |
| 136 | + if response is None: # type: ignore |
| 137 | + return None # type: ignore |
| 138 | + |
| 139 | + if isinstance(response, dict): |
| 140 | + format_type = response.get("format", "") |
| 141 | + data = response.get("data") |
| 142 | + elif isinstance(response, KustoResponseFormat): # type: ignore |
| 143 | + format_type = response.format |
| 144 | + data = response.data |
| 145 | + else: |
| 146 | + raise ValueError("Invalid KustoResponseFormat") |
| 147 | + |
| 148 | + # Handle None data early |
| 149 | + if data is None: |
| 150 | + return None |
| 151 | + |
| 152 | + if format_type == "json": |
| 153 | + return KustoFormatter._parse_json(data) |
| 154 | + elif format_type == "csv": |
| 155 | + return KustoFormatter._parse_csv(data) |
| 156 | + elif format_type == "tsv": |
| 157 | + return KustoFormatter._parse_tsv(data) |
| 158 | + elif format_type == "columnar": |
| 159 | + return KustoFormatter._parse_columnar(data) |
| 160 | + elif format_type == "header_arrays": |
| 161 | + return KustoFormatter._parse_header_arrays(data) |
| 162 | + else: |
| 163 | + raise ValueError(f"Unsupported format: {format_type}") |
| 164 | + |
| 165 | + @staticmethod |
| 166 | + def _parse_json(data: Any) -> list[dict[str, Any]]: |
| 167 | + """Parse JSON format data (already in canonical format)""" |
| 168 | + if data is None or (not isinstance(data, list) and not isinstance(data, dict)): # type: ignore |
| 169 | + raise ValueError("Invalid JSON format") |
| 170 | + return data # type: ignore |
| 171 | + |
| 172 | + @staticmethod |
| 173 | + def _parse_csv(data: str) -> list[dict[str, Any]]: |
| 174 | + """Parse CSV format data back to canonical JSON""" |
| 175 | + if data == "": |
| 176 | + return [] |
| 177 | + if data is None: # type: ignore |
| 178 | + return None # type: ignore |
| 179 | + if not isinstance(data, str): # type: ignore |
| 180 | + raise ValueError("Invalid CSV format") |
| 181 | + |
| 182 | + lines = data.strip().split("\n") |
| 183 | + if len(lines) < 1: |
| 184 | + raise ValueError("Invalid CSV format") |
| 185 | + |
| 186 | + # Parse CSV using csv.reader to handle escaping properly |
| 187 | + csv_reader = csv.reader(io.StringIO(data)) |
| 188 | + rows = list(csv_reader) |
| 189 | + |
| 190 | + if len(rows) < 1: |
| 191 | + return [] |
| 192 | + |
| 193 | + headers = rows[0] |
| 194 | + result: list[dict[str, Any]] = [] |
| 195 | + |
| 196 | + for row in rows[1:]: |
| 197 | + # Pad row with empty strings if shorter than headers |
| 198 | + padded_row = row + [""] * (len(headers) - len(row)) |
| 199 | + row_dict: dict[str, Any] = {} |
| 200 | + for i, header in enumerate(headers): |
| 201 | + value = padded_row[i] if i < len(padded_row) else "" |
| 202 | + # Convert empty strings back to None if needed |
| 203 | + row_dict[header] = None if value == "" else value |
| 204 | + result.append(row_dict) |
| 205 | + |
| 206 | + return result |
| 207 | + |
| 208 | + @staticmethod |
| 209 | + def _parse_tsv(data: str) -> list[dict[str, Any]]: |
| 210 | + """Parse TSV format data back to canonical JSON""" |
| 211 | + if data == "": |
| 212 | + return [] |
| 213 | + if not isinstance(data, str): # type: ignore |
| 214 | + raise ValueError("Invalid TSV format") |
| 215 | + |
| 216 | + lines = data.strip().split("\n") |
| 217 | + if len(lines) < 1: |
| 218 | + raise ValueError("Invalid TSV format") |
| 219 | + |
| 220 | + # Parse header |
| 221 | + headers = lines[0].split("\t") |
| 222 | + result: list[dict[str, Any]] = [] |
| 223 | + |
| 224 | + # Parse data rows |
| 225 | + for line in lines[1:]: |
| 226 | + values = line.split("\t") |
| 227 | + row_dict: dict[str, Any] = {} |
| 228 | + |
| 229 | + for i, header in enumerate(headers): |
| 230 | + value = values[i] if i < len(values) else "" |
| 231 | + |
| 232 | + # Unescape TSV special characters |
| 233 | + if value: |
| 234 | + value = value.replace("\\t", "\t") |
| 235 | + value = value.replace("\\n", "\n") |
| 236 | + value = value.replace("\\r", "\r") |
| 237 | + value = value.replace("\\\\", "\\") # Unescape backslashes last |
| 238 | + |
| 239 | + # Convert empty strings back to None |
| 240 | + row_dict[header] = None if value == "" else value |
| 241 | + |
| 242 | + result.append(row_dict) |
| 243 | + |
| 244 | + return result |
| 245 | + |
| 246 | + @staticmethod |
| 247 | + def _parse_columnar(data: Any) -> list[dict[str, Any]]: |
| 248 | + """Parse columnar format data back to canonical JSON""" |
| 249 | + if data is None or not isinstance(data, dict): |
| 250 | + raise ValueError("Invalid columnar format") |
| 251 | + data = cast(dict[str, list[Any]], data) |
| 252 | + |
| 253 | + # Get column names and determine row count |
| 254 | + columns: list[str] = list(data.keys()) # type: ignore |
| 255 | + if not columns: |
| 256 | + return [] |
| 257 | + |
| 258 | + # All columns should have the same length |
| 259 | + row_count = len(data[columns[0]]) if columns[0] in data else 0 |
| 260 | + |
| 261 | + result: list[dict[str, Any]] = [] |
| 262 | + for row_idx in range(row_count): |
| 263 | + row_dict: dict[str, Any] = {} |
| 264 | + for col_name in columns: |
| 265 | + col_values = data.get(col_name, []) |
| 266 | + row_dict[col_name] = col_values[row_idx] if row_idx < len(col_values) else None |
| 267 | + result.append(row_dict) |
| 268 | + |
| 269 | + return result |
| 270 | + |
| 271 | + @staticmethod |
| 272 | + def _parse_header_arrays(data: str) -> list[dict[str, Any]]: |
| 273 | + """Parse header_arrays format data back to canonical JSON""" |
| 274 | + if data is None or not isinstance(data, str): # type: ignore |
| 275 | + raise ValueError("Invalid header_arrays format") |
| 276 | + |
| 277 | + lines = data.strip().split("\n") |
| 278 | + if len(lines) < 1: |
| 279 | + return [] |
| 280 | + |
| 281 | + try: |
| 282 | + # Parse header (first line) |
| 283 | + headers: list[str] = json.loads(lines[0]) |
| 284 | + if not isinstance(headers, list): # type: ignore |
| 285 | + return [] # type: ignore |
| 286 | + |
| 287 | + result: list[dict[str, Any]] = [] |
| 288 | + |
| 289 | + # Parse data rows (remaining lines) |
| 290 | + for line in lines[1:]: |
| 291 | + row_values: list[Any] = json.loads(line) |
| 292 | + if not isinstance(row_values, list): # type: ignore |
| 293 | + continue # type: ignore |
| 294 | + |
| 295 | + row_dict: dict[str, Any] = {} |
| 296 | + for i, header in enumerate(headers): |
| 297 | + row_dict[header] = row_values[i] if i < len(row_values) else None |
| 298 | + result.append(row_dict) |
| 299 | + |
| 300 | + return result |
| 301 | + |
| 302 | + except json.JSONDecodeError: |
| 303 | + return [] |
0 commit comments