
feat: add file size-based chunking to JsonOutput (WIP) #650


Draft · wants to merge 1 commit into main
35 changes: 35 additions & 0 deletions application_sdk/outputs/json.py
@@ -5,6 +5,7 @@
import orjson
from temporalio import activity

from application_sdk.constants import DAPR_MAX_GRPC_MESSAGE_LENGTH
from application_sdk.observability.logger_adaptor import get_logger
from application_sdk.observability.metrics_adaptor import MetricType, get_metrics
from application_sdk.outputs import Output
@@ -52,6 +53,22 @@ def convert_datetime_to_epoch(data: Any) -> Any:
return data


def estimate_dataframe_json_size(dataframe: "pd.DataFrame") -> int:
"""Estimate JSON size of a DataFrame by sampling a few records."""
if len(dataframe) == 0:
return 0

# Sample up to 10 records to estimate average size
sample_size = min(10, len(dataframe))
sample = dataframe.head(sample_size)
sample_json = sample.to_json(orient="records", lines=True)
if sample_json is not None:
avg_record_size = len(sample_json.encode("utf-8")) / sample_size
return int(avg_record_size * len(dataframe))

return 0


class JsonOutput(Output):
"""Output handler for writing data to JSON files.

@@ -123,6 +140,10 @@ def __init__(
self.chunk_size = chunk_size or 100000
self.buffer: List[Union["pd.DataFrame", "daft.DataFrame"]] = [] # noqa: F821
self.current_buffer_size = 0
self.current_buffer_size_bytes = 0 # Track estimated buffer size in bytes
self.max_file_size_bytes = int(
DAPR_MAX_GRPC_MESSAGE_LENGTH * 0.9
) # 90% of DAPR limit as safety buffer
self.path_gen = path_gen
self.start_marker = start_marker
self.end_marker = end_marker
@@ -171,8 +192,21 @@ async def write_dataframe(self, dataframe: "pd.DataFrame"):
]

for chunk in chunks:
# Estimate size of this chunk
chunk_size_bytes = estimate_dataframe_json_size(chunk)

# Check if adding this chunk would exceed size limit
if (
self.current_buffer_size_bytes + chunk_size_bytes
> self.max_file_size_bytes
and self.current_buffer_size > 0
):
# Flush current buffer before adding this chunk
await self._flush_buffer()

self.buffer.append(chunk)
self.current_buffer_size += len(chunk)
self.current_buffer_size_bytes += chunk_size_bytes
Bug: JsonOutput Buffer Flush Fails for Oversized Initial Chunks

The size-based flush condition guards on current_buffer_size > 0, so nothing is flushed while the buffer is empty, and a first chunk whose estimated size already exceeds max_file_size_bytes is buffered as-is. The resulting oversized file then fails to upload via ObjectStoreOutput.push_file_to_object_store because of the DAPR gRPC message size limit. The current logic never splits a single chunk that is too large on its own.
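One possible direction, sketched below as a hypothetical helper that is not part of this PR: split any chunk whose estimated JSON size already exceeds the limit into smaller row slices before buffering, so the existing flush condition applies per slice. The helper name and the rows-per-slice math are illustrative assumptions.

```python
import pandas as pd


def split_oversized_chunk(
    chunk: "pd.DataFrame", estimated_bytes: int, max_bytes: int
) -> list["pd.DataFrame"]:
    """Hypothetical helper: split a chunk whose estimated JSON size exceeds
    max_bytes into row slices that should each fit under the limit."""
    if estimated_bytes <= max_bytes or len(chunk) <= 1:
        return [chunk]
    # Assume rows are roughly uniform in size; derive rows per slice from the
    # average bytes per row, clamped to at least one row per slice.
    avg_row_bytes = max(1, estimated_bytes // len(chunk))
    rows_per_slice = max(1, max_bytes // avg_row_bytes)
    return [
        chunk.iloc[i : i + rows_per_slice]
        for i in range(0, len(chunk), rows_per_slice)
    ]
```

The write_dataframe loop above could then iterate over split_oversized_chunk(chunk, chunk_size_bytes, self.max_file_size_bytes) and buffer each slice, keeping the existing flush logic unchanged.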



if self.current_buffer_size >= partition:
await self._flush_buffer()
@@ -350,6 +384,7 @@ async def _flush_buffer(self):

self.buffer.clear()
self.current_buffer_size = 0
self.current_buffer_size_bytes = 0

except Exception as e:
# Record metrics for failed write