feat: Add bpd.options.compute.maximum_result_rows option to limit client data download #1829

Merged
merged 12 commits into from
Jun 23, 2025
69 changes: 39 additions & 30 deletions bigframes/_config/compute_options.py
@@ -55,29 +55,7 @@ class ComputeOptions:
{'test2': 'abc', 'test3': False}

Attributes:
maximum_bytes_billed (int, Options):
Limits the bytes billed for query jobs. Queries that will have
bytes billed beyond this limit will fail (without incurring a
charge). If unspecified, this will be set to your project default.
See `maximum_bytes_billed`: https://cloud.google.com/python/docs/reference/bigquery/latest/google.cloud.bigquery.job.QueryJobConfig#google_cloud_bigquery_job_QueryJobConfig_maximum_bytes_billed.

enable_multi_query_execution (bool, Options):
If enabled, large queries may be factored into multiple smaller queries
in order to avoid generating queries that are too complex for the query
engine to handle. However, this comes at the cost of increased cost and latency.

extra_query_labels (Dict[str, Any], Options):
Stores additional custom labels for query configuration.

semantic_ops_confirmation_threshold (int, optional):
.. deprecated:: 1.42.0
Semantic operators are deprecated. Please use AI operators instead

semantic_ops_threshold_autofail (bool):
.. deprecated:: 1.42.0
Semantic operators are deprecated. Please use AI operators instead

ai_ops_confirmation_threshold (int, optional):
ai_ops_confirmation_threshold (int | None):
Guards against unexpected processing of large amount of rows by semantic operators.
If the number of rows exceeds the threshold, the user will be asked to confirm
their operations to resume. The default value is 0. Set the value to None
@@ -87,26 +65,57 @@ class ComputeOptions:
Guards against unexpected processing of large amount of rows by semantic operators.
When set to True, the operation automatically fails without asking for user inputs.

allow_large_results (bool):
allow_large_results (bool | None):
Specifies whether query results can exceed 10 GB. Defaults to False. Setting this
to False (the default) restricts results to 10 GB for potentially faster execution;
BigQuery will raise an error if this limit is exceeded. Setting to True removes
this result size limit.

enable_multi_query_execution (bool | None):
If enabled, large queries may be factored into multiple smaller queries
in order to avoid generating queries that are too complex for the query
engine to handle. However, this comes at the cost of increased cost and latency.

extra_query_labels (Dict[str, Any] | None):
Stores additional custom labels for query configuration.

maximum_bytes_billed (int | None):
Limits the bytes billed for query jobs. Queries that will have
bytes billed beyond this limit will fail (without incurring a
charge). If unspecified, this will be set to your project default.
See `maximum_bytes_billed`: https://cloud.google.com/python/docs/reference/bigquery/latest/google.cloud.bigquery.job.QueryJobConfig#google_cloud_bigquery_job_QueryJobConfig_maximum_bytes_billed.

maximum_result_rows (int | None):
Limits the number of rows in an execution result. When converting
a BigQuery DataFrames object to a pandas DataFrame or Series (e.g.,
using ``.to_pandas()``, ``.peek()``, ``.__repr__()``, direct
iteration), the data is downloaded from BigQuery to the client
machine. This option restricts the number of rows that can be
downloaded. If the number of rows to be downloaded exceeds this
limit, a ``bigframes.exceptions.MaximumResultRowsExceeded``
exception is raised.

semantic_ops_confirmation_threshold (int | None):
.. deprecated:: 1.42.0
Semantic operators are deprecated. Please use AI operators instead

semantic_ops_threshold_autofail (bool):
.. deprecated:: 1.42.0
Semantic operators are deprecated. Please use AI operators instead
"""

maximum_bytes_billed: Optional[int] = None
ai_ops_confirmation_threshold: Optional[int] = 0
ai_ops_threshold_autofail: bool = False
allow_large_results: Optional[bool] = None
enable_multi_query_execution: bool = False
extra_query_labels: Dict[str, Any] = dataclasses.field(
default_factory=dict, init=False
)
maximum_bytes_billed: Optional[int] = None
maximum_result_rows: Optional[int] = None
semantic_ops_confirmation_threshold: Optional[int] = 0
semantic_ops_threshold_autofail = False

ai_ops_confirmation_threshold: Optional[int] = 0
ai_ops_threshold_autofail: bool = False

allow_large_results: Optional[bool] = None

def assign_extra_query_labels(self, **kwargs: Any) -> None:
"""
Assigns additional custom labels for query configuration. The method updates the
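For context, a minimal sketch of how the new option might be used from the bigframes.pandas namespace, assuming a configured Google Cloud project; the 10,000-row cap and the public table below are arbitrary illustrative choices, not part of this PR:

    import bigframes.pandas as bpd

    # Cap client-side downloads at 10,000 rows for this session. Any
    # .to_pandas(), .peek(), repr, or iteration that would pull more rows
    # than this raises bigframes.exceptions.MaximumResultRowsExceeded.
    bpd.options.compute.maximum_result_rows = 10_000

    df = bpd.read_gbq("bigquery-public-data.usa_names.usa_1910_2013")
    preview = df.head(100).to_pandas()  # 100 rows is under the cap, so this downloads normally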
4 changes: 4 additions & 0 deletions bigframes/exceptions.py
@@ -71,6 +71,10 @@ class OperationAbortedError(RuntimeError):
"""Operation is aborted."""


class MaximumResultRowsExceeded(RuntimeError):
"""Maximum number of rows in the result was exceeded."""


class TimeTravelDisabledWarning(Warning):
"""A query was reattempted without time travel."""

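A hedged example of how user code might catch the new exception and fall back to a bounded preview; to_pandas() and peek() are existing DataFrame methods, while the helper name and row counts are hypothetical:

    import bigframes.exceptions
    import bigframes.pandas as bpd

    bpd.options.compute.maximum_result_rows = 1_000

    def to_pandas_or_preview(df, preview_rows=20):
        # Try the full download; if the configured row cap is exceeded,
        # fall back to downloading only a small preview instead.
        try:
            return df.to_pandas()
        except bigframes.exceptions.MaximumResultRowsExceeded:
            return df.peek(preview_rows)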
7 changes: 4 additions & 3 deletions bigframes/session/bq_caching_executor.py
@@ -28,6 +28,8 @@
import google.cloud.bigquery.table as bq_table
import google.cloud.bigquery_storage_v1

import bigframes
from bigframes import exceptions as bfe
import bigframes.constants
import bigframes.core
from bigframes.core import compile, local_data, rewrite
@@ -38,7 +40,6 @@
import bigframes.core.schema as schemata
import bigframes.core.tree_properties as tree_properties
import bigframes.dtypes
import bigframes.exceptions as bfe
import bigframes.features
from bigframes.session import executor, loader, local_scan_executor, read_api_execution
import bigframes.session._io.bigquery as bq_io
@@ -415,7 +416,7 @@ def _run_execute_query(
# Unfortunately, this error type does not have a separate error code or exception type
if "Resources exceeded during query execution" in e.message:
new_message = "Computation is too complex to execute as a single query. Try using DataFrame.cache() on intermediate results, or setting bigframes.options.compute.enable_multi_query_execution."
raise bigframes.exceptions.QueryComplexityError(new_message) from e
raise bfe.QueryComplexityError(new_message) from e
else:
raise

@@ -688,7 +689,7 @@ def _execute_plan(
)

return executor.ExecuteResult(
arrow_batches=iterator.to_arrow_iterable(
_arrow_batches=iterator.to_arrow_iterable(
bqstorage_client=self.bqstoragereadclient
),
schema=plan.schema,
2 changes: 1 addition & 1 deletion bigframes/session/direct_gbq_execution.py
@@ -50,7 +50,7 @@ def execute(
)

return executor.ExecuteResult(
arrow_batches=iterator.to_arrow_iterable(),
_arrow_batches=iterator.to_arrow_iterable(),
schema=plan.schema,
query_job=query_job,
total_rows=iterator.total_rows,
28 changes: 27 additions & 1 deletion bigframes/session/executor.py
@@ -24,20 +24,46 @@
import pandas as pd
import pyarrow

import bigframes
import bigframes.core
from bigframes.core import pyarrow_utils
import bigframes.core.schema
import bigframes.session._io.pandas as io_pandas

_ROW_LIMIT_EXCEEDED_TEMPLATE = (
"Execution has downloaded {result_rows} rows so far, which exceeds the "
"limit of {maximum_result_rows}. You can adjust this limit by setting "
"`bpd.options.compute.maximum_result_rows`."
)


@dataclasses.dataclass(frozen=True)
class ExecuteResult:
arrow_batches: Iterator[pyarrow.RecordBatch]
_arrow_batches: Iterator[pyarrow.RecordBatch]
schema: bigframes.core.schema.ArraySchema
query_job: Optional[bigquery.QueryJob] = None
total_bytes: Optional[int] = None
total_rows: Optional[int] = None

@property
def arrow_batches(self) -> Iterator[pyarrow.RecordBatch]:
result_rows = 0

for batch in self._arrow_batches:
result_rows += batch.num_rows

maximum_result_rows = bigframes.options.compute.maximum_result_rows
if maximum_result_rows is not None and result_rows > maximum_result_rows:
message = bigframes.exceptions.format_message(
_ROW_LIMIT_EXCEEDED_TEMPLATE.format(
result_rows=result_rows,
maximum_result_rows=maximum_result_rows,
)
)
raise bigframes.exceptions.MaximumResultRowsExceeded(message)

yield batch

def to_arrow_table(self) -> pyarrow.Table:
# Need to provide schema if no result rows, as arrow can't infer
# If there are rows, it is safest to infer schema from batches.
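The arrow_batches property above counts rows as record batches stream to the client and raises once the configured cap is exceeded. Below is a self-contained sketch of that counting-generator pattern using plain pyarrow batches, so it can run without BigQuery; the function name and the RuntimeError are illustrative stand-ins for the bigframes internals:

    from typing import Iterator, Optional

    import pyarrow

    def limited_batches(
        batches: Iterator[pyarrow.RecordBatch],
        maximum_result_rows: Optional[int],
    ) -> Iterator[pyarrow.RecordBatch]:
        # Yield batches while tracking the cumulative row count; raise as soon
        # as the count exceeds the cap, mirroring the check in the diff above.
        result_rows = 0
        for batch in batches:
            result_rows += batch.num_rows
            if maximum_result_rows is not None and result_rows > maximum_result_rows:
                raise RuntimeError(
                    f"Downloaded {result_rows} rows, exceeding the limit of "
                    f"{maximum_result_rows}."
                )
            yield batch

    # Three 4-row batches against a 10-row cap: the third batch trips the check.
    table = pyarrow.table({"x": list(range(12))})
    try:
        for _ in limited_batches(iter(table.to_batches(max_chunksize=4)), 10):
            pass
    except RuntimeError as exc:
        print(exc)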
2 changes: 1 addition & 1 deletion bigframes/session/local_scan_executor.py
@@ -58,7 +58,7 @@ def execute(
total_rows = min(peek, total_rows)

return executor.ExecuteResult(
arrow_batches=arrow_table.to_batches(),
_arrow_batches=arrow_table.to_batches(),
schema=plan.schema,
query_job=None,
total_bytes=None,
2 changes: 1 addition & 1 deletion bigframes/session/polars_executor.py
@@ -80,7 +80,7 @@ def execute(
lazy_frame = lazy_frame.limit(peek)
pa_table = lazy_frame.collect().to_arrow()
return executor.ExecuteResult(
arrow_batches=iter(map(self._adapt_batch, pa_table.to_batches())),
_arrow_batches=iter(map(self._adapt_batch, pa_table.to_batches())),
schema=plan.schema,
total_bytes=pa_table.nbytes,
total_rows=pa_table.num_rows,
2 changes: 1 addition & 1 deletion bigframes/session/read_api_execution.py
@@ -111,7 +111,7 @@ def process_page(page):
rows = min(peek, rows)

return executor.ExecuteResult(
arrow_batches=batches,
_arrow_batches=batches,
schema=plan.schema,
query_job=None,
total_bytes=None,
4 changes: 2 additions & 2 deletions bigframes/testing/polars_session.py
@@ -46,7 +46,7 @@ def peek(
# Currently, pyarrow types might not quite be exactly the ones in the bigframes schema.
# Nullability may be different, and might use large versions of list, string datatypes.
return bigframes.session.executor.ExecuteResult(
arrow_batches=pa_table.to_batches(),
_arrow_batches=pa_table.to_batches(),
schema=array_value.schema,
total_bytes=pa_table.nbytes,
total_rows=pa_table.num_rows,
@@ -69,7 +69,7 @@ def execute(
# Currently, pyarrow types might not quite be exactly the ones in the bigframes schema.
# Nullability may be different, and might use large versions of list, string datatypes.
return bigframes.session.executor.ExecuteResult(
arrow_batches=pa_table.to_batches(),
_arrow_batches=pa_table.to_batches(),
schema=array_value.schema,
total_bytes=pa_table.nbytes,
total_rows=pa_table.num_rows,
47 changes: 44 additions & 3 deletions tests/system/small/test_pandas_options.py
@@ -18,11 +18,10 @@
import warnings

import google.api_core.exceptions
import google.auth
import google.auth.exceptions
import pandas.testing
import pytest

import bigframes.core.global_session
import bigframes.exceptions
import bigframes.pandas as bpd


@@ -327,3 +326,45 @@ def test_credentials_need_reauthentication(
# Now verify that the user is able to start over
df = bpd.read_gbq(test_query)
assert df is not None


def test_max_rows_normal_execution_within_limit(
scalars_df_index, scalars_pandas_df_index
):
"""Test queries execute normally when the number of rows is within the limit."""
with bpd.option_context("compute.maximum_result_rows", 10):
df = scalars_df_index.head(10)
result = df.to_pandas()

expected = scalars_pandas_df_index.head(10)
pandas.testing.assert_frame_equal(result, expected)

with bpd.option_context("compute.maximum_result_rows", 10), bpd.option_context(
"display.repr_mode", "head"
):
df = scalars_df_index.head(10)
assert repr(df) is not None

# We should be able to get away with only a single row for shape.
with bpd.option_context("compute.maximum_result_rows", 1):
shape = scalars_df_index.shape
assert shape == scalars_pandas_df_index.shape

# 0 is not recommended, as it would stop aggregations and many other
# necessary operations, but we shouldn't need even 1 row for to_gbq().
with bpd.option_context("compute.maximum_result_rows", 0):
destination = scalars_df_index.to_gbq()
assert destination is not None


def test_max_rows_exceeds_limit(scalars_df_index):
"""Test to_pandas() raises MaximumRowsDownloadedExceeded when the limit is exceeded."""
with bpd.option_context("compute.maximum_result_rows", 5), pytest.raises(
bigframes.exceptions.MaximumResultRowsExceeded, match="5"
):
scalars_df_index.to_pandas()

with bpd.option_context("compute.maximum_result_rows", 5), pytest.raises(
bigframes.exceptions.MaximumResultRowsExceeded, match="5"
):
next(iter(scalars_df_index.to_pandas_batches()))