diff --git a/application_sdk/outputs/json.py b/application_sdk/outputs/json.py
index 216ff820..fcb9b55c 100644
--- a/application_sdk/outputs/json.py
+++ b/application_sdk/outputs/json.py
@@ -5,6 +5,7 @@
 import orjson
 from temporalio import activity

+from application_sdk.constants import DAPR_MAX_GRPC_MESSAGE_LENGTH
 from application_sdk.observability.logger_adaptor import get_logger
 from application_sdk.observability.metrics_adaptor import MetricType, get_metrics
 from application_sdk.outputs import Output
@@ -31,7 +32,7 @@ def path_gen(chunk_start: int | None, chunk_count: int) -> str:
     if chunk_start is None:
         return f"{str(chunk_count)}.json"
     else:
-        return f"{str(chunk_start+chunk_count)}.json"
+        return f"{chunk_start}-{chunk_count}.json"


 def convert_datetime_to_epoch(data: Any) -> Any:
@@ -52,6 +53,22 @@ def convert_datetime_to_epoch(data: Any) -> Any:
     return data


+def estimate_dataframe_json_size(dataframe: "pd.DataFrame") -> int:
+    """Estimate JSON size of a DataFrame by sampling a few records."""
+    if len(dataframe) == 0:
+        return 0
+
+    # Sample up to 10 records to estimate average size
+    sample_size = min(10, len(dataframe))
+    sample = dataframe.head(sample_size)
+    sample_json = sample.to_json(orient="records", lines=True)
+    if sample_json is not None:
+        avg_record_size = len(sample_json.encode("utf-8")) / sample_size
+        return int(avg_record_size * len(dataframe))
+
+    return 0
+
+
 class JsonOutput(Output):
     """Output handler for writing data to JSON files.

@@ -123,6 +140,10 @@ def __init__(
         self.chunk_size = chunk_size or 100000
         self.buffer: List[Union["pd.DataFrame", "daft.DataFrame"]] = []  # noqa: F821
         self.current_buffer_size = 0
+        self.current_buffer_size_bytes = 0  # Track estimated buffer size in bytes
+        self.max_file_size_bytes = int(
+            DAPR_MAX_GRPC_MESSAGE_LENGTH * 0.9
+        )  # 90% of DAPR limit as safety buffer
         self.path_gen = path_gen
         self.start_marker = start_marker
         self.end_marker = end_marker
@@ -171,8 +192,21 @@ async def write_dataframe(self, dataframe: "pd.DataFrame"):
             ]

             for chunk in chunks:
+                # Estimate size of this chunk
+                chunk_size_bytes = estimate_dataframe_json_size(chunk)
+
+                # Check if adding this chunk would exceed size limit
+                if (
+                    self.current_buffer_size_bytes + chunk_size_bytes
+                    > self.max_file_size_bytes
+                    and self.current_buffer_size > 0
+                ):
+                    # Flush current buffer before adding this chunk
+                    await self._flush_buffer()
+
                 self.buffer.append(chunk)
                 self.current_buffer_size += len(chunk)
+                self.current_buffer_size_bytes += chunk_size_bytes
                 if self.current_buffer_size >= partition:
                     await self._flush_buffer()

@@ -236,45 +270,19 @@ async def write_daft_dataframe(
                     row, preserve_fields, null_to_empty_dict_fields
                 )
                 # Serialize the row and add it to the buffer
-                buffer.append(
-                    orjson.dumps(cleaned_row, option=orjson.OPT_APPEND_NEWLINE).decode(
-                        "utf-8"
-                    )
-                )
-
-                # If the buffer reaches the specified size, write it to the file
-                if self.chunk_size and len(buffer) >= self.chunk_size:
-                    self.chunk_count += 1
-                    output_file_name = f"{self.output_path}/{self.path_gen(self.chunk_start, self.chunk_count)}"
-                    with open(output_file_name, "w") as f:
-                        f.writelines(buffer)
-                    buffer.clear()  # Clear the buffer
-
-                    # Record chunk metrics
-                    self.metrics.record_metric(
-                        name="json_chunks_written",
-                        value=1,
-                        metric_type=MetricType.COUNTER,
-                        labels={"type": "daft"},
-                        description="Number of chunks written to JSON files",
-                    )
+                serialized_row = orjson.dumps(
+                    cleaned_row, option=orjson.OPT_APPEND_NEWLINE
+                ).decode("utf-8")
+                buffer.append(serialized_row)
+                self.current_buffer_size_bytes += len(serialized_row)
+                if (self.chunk_size and len(buffer) >= self.chunk_size) or (
+                    self.current_buffer_size_bytes > self.max_file_size_bytes
+                ):
+                    await self.flush_daft_buffer(buffer)

             # Write any remaining rows in the buffer
             if buffer:
-                self.chunk_count += 1
-                output_file_name = f"{self.output_path}/{self.path_gen(self.chunk_start, self.chunk_count)}"
-                with open(output_file_name, "w") as f:
-                    f.writelines(buffer)
-                buffer.clear()
-
-                # Record chunk metrics
-                self.metrics.record_metric(
-                    name="json_chunks_written",
-                    value=1,
-                    metric_type=MetricType.COUNTER,
-                    labels={"type": "daft"},
-                    description="Number of chunks written to JSON files",
-                )
+                await self.flush_daft_buffer(buffer)

             # Record metrics for successful write
             self.metrics.record_metric(
@@ -301,6 +309,32 @@
             )
             logger.error(f"Error writing daft dataframe to json: {str(e)}")

+    async def flush_daft_buffer(self, buffer: List[str]):
+        """Flush the current buffer to a JSON file.
+
+        This method combines all DataFrames in the buffer, writes them to a JSON file,
+        and uploads the file to the object store.
+        """
+        self.chunk_count += 1
+        output_file_name = (
+            f"{self.output_path}/{self.path_gen(self.chunk_start, self.chunk_count)}"
+        )
+        with open(output_file_name, "w") as f:
+            f.writelines(buffer)
+        buffer.clear()  # Clear the buffer
+
+        self.current_buffer_size = 0
+        self.current_buffer_size_bytes = 0
+
+        # Record chunk metrics
+        self.metrics.record_metric(
+            name="json_chunks_written",
+            value=1,
+            metric_type=MetricType.COUNTER,
+            labels={"type": "daft"},
+            description="Number of chunks written to JSON files",
+        )
+
     async def _flush_buffer(self):
         """Flush the current buffer to a JSON file.

@@ -350,6 +384,7 @@ async def _flush_buffer(self):

             self.buffer.clear()
             self.current_buffer_size = 0
+            self.current_buffer_size_bytes = 0

         except Exception as e:
             # Record metrics for failed write
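
Note on the approach (editor's illustration, not part of the patch): both write paths now cap each output file by tracking an estimated byte count and flushing at 90% of DAPR_MAX_GRPC_MESSAGE_LENGTH. The pandas path extrapolates the size from a sample of up to 10 rows, while the daft path counts the exact bytes of every serialized row. Below is a minimal standalone sketch of the sampling heuristic and the flush check; the 1 MiB limit is a hypothetical stand-in chosen so the flush path is easy to hit, not the real constant.

    import pandas as pd

    MAX_GRPC_BYTES = 1 * 1024 * 1024  # hypothetical stand-in for DAPR_MAX_GRPC_MESSAGE_LENGTH
    FLUSH_THRESHOLD = int(MAX_GRPC_BYTES * 0.9)  # 90% safety margin, as in the patch

    def estimate_json_size(df: pd.DataFrame) -> int:
        """Extrapolate the JSON-lines size of df from a sample of up to 10 rows."""
        if df.empty:
            return 0
        sample = df.head(min(10, len(df)))
        sample_json = sample.to_json(orient="records", lines=True)
        avg_row_bytes = len(sample_json.encode("utf-8")) / len(sample)
        return int(avg_row_bytes * len(df))

    buffered_bytes = 0
    for chunk in (pd.DataFrame({"x": range(50_000)}) for _ in range(3)):
        chunk_bytes = estimate_json_size(chunk)
        # Flush before adding a chunk that would push the estimate past the threshold
        if buffered_bytes and buffered_bytes + chunk_bytes > FLUSH_THRESHOLD:
            print(f"flush at ~{buffered_bytes} bytes")  # stand-in for _flush_buffer()
            buffered_bytes = 0
        buffered_bytes += chunk_bytes

Because the pandas-side estimate is only as accurate as its 10-row sample, the 10% headroom in the patch absorbs rows that serialize larger than average.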