diff --git a/bigframes/_config/compute_options.py b/bigframes/_config/compute_options.py
index 89c0dc8d6a..97cd6e99af 100644
--- a/bigframes/_config/compute_options.py
+++ b/bigframes/_config/compute_options.py
@@ -55,29 +55,7 @@ class ComputeOptions:
         {'test2': 'abc', 'test3': False}

     Attributes:
-        maximum_bytes_billed (int, Options):
-            Limits the bytes billed for query jobs. Queries that will have
-            bytes billed beyond this limit will fail (without incurring a
-            charge). If unspecified, this will be set to your project default.
-            See `maximum_bytes_billed`: https://cloud.google.com/python/docs/reference/bigquery/latest/google.cloud.bigquery.job.QueryJobConfig#google_cloud_bigquery_job_QueryJobConfig_maximum_bytes_billed.
-
-        enable_multi_query_execution (bool, Options):
-            If enabled, large queries may be factored into multiple smaller queries
-            in order to avoid generating queries that are too complex for the query
-            engine to handle. However this comes at the cost of increase cost and latency.
-
-        extra_query_labels (Dict[str, Any], Options):
-            Stores additional custom labels for query configuration.
-
-        semantic_ops_confirmation_threshold (int, optional):
-            .. deprecated:: 1.42.0
-                Semantic operators are deprecated. Please use AI operators instead
-
-        semantic_ops_threshold_autofail (bool):
-            .. deprecated:: 1.42.0
-                Semantic operators are deprecated. Please use AI operators instead
-
-        ai_ops_confirmation_threshold (int, optional):
+        ai_ops_confirmation_threshold (int | None):
             Guards against unexpected processing of large amount of rows by semantic operators.
             If the number of rows exceeds the threshold, the user will be asked to confirm
             their operations to resume. The default value is 0. Set the value to None
@@ -87,26 +65,57 @@ class ComputeOptions:
             Guards against unexpected processing of large amount of rows by semantic operators.
             When set to True, the operation automatically fails without asking for user inputs.

-        allow_large_results (bool):
+        allow_large_results (bool | None):
             Specifies whether query results can exceed 10 GB. Defaults to False. Setting this
             to False (the default) restricts results to 10 GB for potentially faster
             execution; BigQuery will raise an error if this limit is exceeded. Setting to True
             removes this result size limit.
+
+        enable_multi_query_execution (bool | None):
+            If enabled, large queries may be factored into multiple smaller queries
+            in order to avoid generating queries that are too complex for the query
+            engine to handle. However this comes at the cost of increase cost and latency.
+
+        extra_query_labels (Dict[str, Any] | None):
+            Stores additional custom labels for query configuration.
+
+        maximum_bytes_billed (int | None):
+            Limits the bytes billed for query jobs. Queries that will have
+            bytes billed beyond this limit will fail (without incurring a
+            charge). If unspecified, this will be set to your project default.
+            See `maximum_bytes_billed`: https://cloud.google.com/python/docs/reference/bigquery/latest/google.cloud.bigquery.job.QueryJobConfig#google_cloud_bigquery_job_QueryJobConfig_maximum_bytes_billed.
+
+        maximum_result_rows (int | None):
+            Limits the number of rows in an execution result. When converting
+            a BigQuery DataFrames object to a pandas DataFrame or Series (e.g.,
+            using ``.to_pandas()``, ``.peek()``, ``.__repr__()``, direct
+            iteration), the data is downloaded from BigQuery to the client
+            machine. This option restricts the number of rows that can be
+            downloaded. If the number of rows to be downloaded exceeds this
+            limit, a ``bigframes.exceptions.MaximumResultRowsExceeded``
+            exception is raised.
+
+        semantic_ops_confirmation_threshold (int | None):
+            .. deprecated:: 1.42.0
+                Semantic operators are deprecated. Please use AI operators instead
+
+        semantic_ops_threshold_autofail (bool):
+            .. deprecated:: 1.42.0
+                Semantic operators are deprecated. Please use AI operators instead
     """

-    maximum_bytes_billed: Optional[int] = None
+    ai_ops_confirmation_threshold: Optional[int] = 0
+    ai_ops_threshold_autofail: bool = False
+    allow_large_results: Optional[bool] = None
     enable_multi_query_execution: bool = False
     extra_query_labels: Dict[str, Any] = dataclasses.field(
         default_factory=dict, init=False
     )
+    maximum_bytes_billed: Optional[int] = None
+    maximum_result_rows: Optional[int] = None
     semantic_ops_confirmation_threshold: Optional[int] = 0
     semantic_ops_threshold_autofail = False
-    ai_ops_confirmation_threshold: Optional[int] = 0
-    ai_ops_threshold_autofail: bool = False
-
-    allow_large_results: Optional[bool] = None
-

     def assign_extra_query_labels(self, **kwargs: Any) -> None:
         """
         Assigns additional custom labels for query configuration. The method updates the
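A minimal usage sketch for the new option (not part of the diff). It assumes an authenticated BigQuery DataFrames session; the public-dataset table below is only an illustration:

```python
import bigframes.pandas as bpd

# Cap client-side downloads at 1,000 rows for the rest of the session.
bpd.options.compute.maximum_result_rows = 1_000

# Or scope the limit to a block, mirroring the system tests added below.
with bpd.option_context("compute.maximum_result_rows", 10):
    df = bpd.read_gbq("bigquery-public-data.usa_names.usa_1910_2013")
    # head(10) stays within the limit, so the download proceeds normally.
    print(df.head(10).to_pandas())
```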
diff --git a/bigframes/exceptions.py b/bigframes/exceptions.py
index 8924295c29..eda24a74f0 100644
--- a/bigframes/exceptions.py
+++ b/bigframes/exceptions.py
@@ -71,6 +71,10 @@ class OperationAbortedError(RuntimeError):
     """Operation is aborted."""


+class MaximumResultRowsExceeded(RuntimeError):
+    """Maximum number of rows in the result was exceeded."""
+
+
 class TimeTravelDisabledWarning(Warning):
     """A query was reattempted without time travel."""

diff --git a/bigframes/session/bq_caching_executor.py b/bigframes/session/bq_caching_executor.py
index 47be6fa768..9ad8da33a8 100644
--- a/bigframes/session/bq_caching_executor.py
+++ b/bigframes/session/bq_caching_executor.py
@@ -28,6 +28,8 @@
 import google.cloud.bigquery.table as bq_table
 import google.cloud.bigquery_storage_v1

+import bigframes
+from bigframes import exceptions as bfe
 import bigframes.constants
 import bigframes.core
 from bigframes.core import compile, local_data, rewrite
@@ -38,7 +40,6 @@
 import bigframes.core.schema as schemata
 import bigframes.core.tree_properties as tree_properties
 import bigframes.dtypes
-import bigframes.exceptions as bfe
 import bigframes.features
 from bigframes.session import executor, loader, local_scan_executor, read_api_execution
 import bigframes.session._io.bigquery as bq_io
@@ -415,7 +416,7 @@ def _run_execute_query(
             # Unfortunately, this error type does not have a separate error code or exception type
             if "Resources exceeded during query execution" in e.message:
                 new_message = "Computation is too complex to execute as a single query. Try using DataFrame.cache() on intermediate results, or setting bigframes.options.compute.enable_multi_query_execution."
-                raise bigframes.exceptions.QueryComplexityError(new_message) from e
+                raise bfe.QueryComplexityError(new_message) from e
             else:
                 raise

@@ -688,7 +689,7 @@ def _execute_plan(
         )

         return executor.ExecuteResult(
-            arrow_batches=iterator.to_arrow_iterable(
+            _arrow_batches=iterator.to_arrow_iterable(
                 bqstorage_client=self.bqstoragereadclient
             ),
             schema=plan.schema,
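Since the new exception derives from RuntimeError, pre-existing broad handlers continue to catch it. A tiny illustrative check (not part of the change):

```python
import bigframes.exceptions

# The class added in exceptions.py above subclasses RuntimeError, so code
# written before this change that catches RuntimeError also catches it.
assert issubclass(bigframes.exceptions.MaximumResultRowsExceeded, RuntimeError)

try:
    raise bigframes.exceptions.MaximumResultRowsExceeded("limit of 10 rows exceeded")
except RuntimeError as exc:
    print(f"caught: {exc}")
```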
diff --git a/bigframes/session/direct_gbq_execution.py b/bigframes/session/direct_gbq_execution.py
index 4b19f7441d..1d46192ac3 100644
--- a/bigframes/session/direct_gbq_execution.py
+++ b/bigframes/session/direct_gbq_execution.py
@@ -50,7 +50,7 @@ def execute(
         )

         return executor.ExecuteResult(
-            arrow_batches=iterator.to_arrow_iterable(),
+            _arrow_batches=iterator.to_arrow_iterable(),
             schema=plan.schema,
             query_job=query_job,
             total_rows=iterator.total_rows,
diff --git a/bigframes/session/executor.py b/bigframes/session/executor.py
index ee1218017b..c913f39791 100644
--- a/bigframes/session/executor.py
+++ b/bigframes/session/executor.py
@@ -24,20 +24,46 @@
 import pandas as pd
 import pyarrow

+import bigframes
 import bigframes.core
 from bigframes.core import pyarrow_utils
 import bigframes.core.schema
 import bigframes.session._io.pandas as io_pandas

+_ROW_LIMIT_EXCEEDED_TEMPLATE = (
+    "Execution has downloaded {result_rows} rows so far, which exceeds the "
+    "limit of {maximum_result_rows}. You can adjust this limit by setting "
+    "`bpd.options.compute.maximum_result_rows`."
+)
+

 @dataclasses.dataclass(frozen=True)
 class ExecuteResult:
-    arrow_batches: Iterator[pyarrow.RecordBatch]
+    _arrow_batches: Iterator[pyarrow.RecordBatch]
     schema: bigframes.core.schema.ArraySchema
     query_job: Optional[bigquery.QueryJob] = None
     total_bytes: Optional[int] = None
     total_rows: Optional[int] = None

+    @property
+    def arrow_batches(self) -> Iterator[pyarrow.RecordBatch]:
+        result_rows = 0
+
+        for batch in self._arrow_batches:
+            result_rows += batch.num_rows
+
+            maximum_result_rows = bigframes.options.compute.maximum_result_rows
+            if maximum_result_rows is not None and result_rows > maximum_result_rows:
+                message = bigframes.exceptions.format_message(
+                    _ROW_LIMIT_EXCEEDED_TEMPLATE.format(
+                        result_rows=result_rows,
+                        maximum_result_rows=maximum_result_rows,
+                    )
+                )
+                raise bigframes.exceptions.MaximumResultRowsExceeded(message)
+
+            yield batch
+
     def to_arrow_table(self) -> pyarrow.Table:
         # Need to provide schema if no result rows, as arrow can't infer
         # If ther are rows, it is safest to infer schema from batches.
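The new `arrow_batches` property streams batches through a generator, keeps a running row count, and aborts once the configured limit is crossed. A self-contained sketch of the same pattern using plain pyarrow; the helper name and the generic `RuntimeError` are illustrative stand-ins, not bigframes APIs:

```python
from typing import Iterator, Optional

import pyarrow as pa


def limited_batches(
    batches: Iterator[pa.RecordBatch], maximum_result_rows: Optional[int]
) -> Iterator[pa.RecordBatch]:
    """Yield batches until the cumulative row count exceeds the limit."""
    result_rows = 0
    for batch in batches:
        result_rows += batch.num_rows
        if maximum_result_rows is not None and result_rows > maximum_result_rows:
            raise RuntimeError(
                f"Downloaded {result_rows} rows, exceeding the limit of {maximum_result_rows}."
            )
        yield batch


table = pa.table({"x": list(range(25))})
# Batches of 10, 10, and 5 rows: the first two pass, the third pushes the
# running total to 25 and raises before it is yielded.
for batch in limited_batches(iter(table.to_batches(max_chunksize=10)), 20):
    print(batch.num_rows)
```

Because the check lives inside the generator, nothing is counted and nothing can fail until a consumer actually iterates, which matches how the property defers the check to download time.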
diff --git a/bigframes/session/local_scan_executor.py b/bigframes/session/local_scan_executor.py
index b4d7b226e2..65f088e8a1 100644
--- a/bigframes/session/local_scan_executor.py
+++ b/bigframes/session/local_scan_executor.py
@@ -58,7 +58,7 @@ def execute(
             total_rows = min(peek, total_rows)

         return executor.ExecuteResult(
-            arrow_batches=arrow_table.to_batches(),
+            _arrow_batches=arrow_table.to_batches(),
             schema=plan.schema,
             query_job=None,
             total_bytes=None,
diff --git a/bigframes/session/polars_executor.py b/bigframes/session/polars_executor.py
index 6f1f35764c..6e3e15499d 100644
--- a/bigframes/session/polars_executor.py
+++ b/bigframes/session/polars_executor.py
@@ -80,7 +80,7 @@ def execute(
             lazy_frame = lazy_frame.limit(peek)
         pa_table = lazy_frame.collect().to_arrow()
         return executor.ExecuteResult(
-            arrow_batches=iter(map(self._adapt_batch, pa_table.to_batches())),
+            _arrow_batches=iter(map(self._adapt_batch, pa_table.to_batches())),
             schema=plan.schema,
             total_bytes=pa_table.nbytes,
             total_rows=pa_table.num_rows,
diff --git a/bigframes/session/read_api_execution.py b/bigframes/session/read_api_execution.py
index d4bbf2783c..d5bcf1dbc7 100644
--- a/bigframes/session/read_api_execution.py
+++ b/bigframes/session/read_api_execution.py
@@ -111,7 +111,7 @@ def process_page(page):
             rows = min(peek, rows)

         return executor.ExecuteResult(
-            arrow_batches=batches,
+            _arrow_batches=batches,
             schema=plan.schema,
             query_job=None,
             total_bytes=None,
diff --git a/bigframes/testing/polars_session.py b/bigframes/testing/polars_session.py
index 723841a672..7b898a9f00 100644
--- a/bigframes/testing/polars_session.py
+++ b/bigframes/testing/polars_session.py
@@ -46,7 +46,7 @@ def peek(
         # Currently, pyarrow types might not quite be exactly the ones in the bigframes schema.
         # Nullability may be different, and might use large versions of list, string datatypes.
         return bigframes.session.executor.ExecuteResult(
-            arrow_batches=pa_table.to_batches(),
+            _arrow_batches=pa_table.to_batches(),
             schema=array_value.schema,
             total_bytes=pa_table.nbytes,
             total_rows=pa_table.num_rows,
@@ -69,7 +69,7 @@ def execute(
         # Currently, pyarrow types might not quite be exactly the ones in the bigframes schema.
         # Nullability may be different, and might use large versions of list, string datatypes.
         return bigframes.session.executor.ExecuteResult(
-            arrow_batches=pa_table.to_batches(),
+            _arrow_batches=pa_table.to_batches(),
             schema=array_value.schema,
             total_bytes=pa_table.nbytes,
             total_rows=pa_table.num_rows,
diff --git a/tests/system/small/test_pandas_options.py b/tests/system/small/test_pandas_options.py
index 55e5036a42..1d360e0d4f 100644
--- a/tests/system/small/test_pandas_options.py
+++ b/tests/system/small/test_pandas_options.py
@@ -18,11 +18,10 @@
 import warnings

 import google.api_core.exceptions
-import google.auth
-import google.auth.exceptions
+import pandas.testing
 import pytest

-import bigframes.core.global_session
+import bigframes.exceptions
 import bigframes.pandas as bpd


@@ -327,3 +326,45 @@ def test_credentials_need_reauthentication(
     # Now verify that use is able to start over
     df = bpd.read_gbq(test_query)
     assert df is not None
+
+
+def test_max_rows_normal_execution_within_limit(
+    scalars_df_index, scalars_pandas_df_index
+):
+    """Test queries execute normally when the number of rows is within the limit."""
+    with bpd.option_context("compute.maximum_result_rows", 10):
+        df = scalars_df_index.head(10)
+        result = df.to_pandas()
+
+    expected = scalars_pandas_df_index.head(10)
+    pandas.testing.assert_frame_equal(result, expected)
+
+    with bpd.option_context("compute.maximum_result_rows", 10), bpd.option_context(
+        "display.repr_mode", "head"
+    ):
+        df = scalars_df_index.head(10)
+        assert repr(df) is not None
+
+    # We should be able to get away with only a single row for shape.
+    with bpd.option_context("compute.maximum_result_rows", 1):
+        shape = scalars_df_index.shape
+        assert shape == scalars_pandas_df_index.shape
+
+    # 0 is not recommended, as it would stop aggregations and many other
+    # necessary operations, but we shouldn't need even 1 row for to_gbq().
+    with bpd.option_context("compute.maximum_result_rows", 0):
+        destination = scalars_df_index.to_gbq()
+        assert destination is not None
+
+
+def test_max_rows_exceeds_limit(scalars_df_index):
+    """Test to_pandas() raises MaximumResultRowsExceeded when the limit is exceeded."""
+    with bpd.option_context("compute.maximum_result_rows", 5), pytest.raises(
+        bigframes.exceptions.MaximumResultRowsExceeded, match="5"
+    ):
+        scalars_df_index.to_pandas()
+
+    with bpd.option_context("compute.maximum_result_rows", 5), pytest.raises(
+        bigframes.exceptions.MaximumResultRowsExceeded, match="5"
+    ):
+        next(iter(scalars_df_index.to_pandas_batches()))
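Finally, a sketch of how downstream code might react to the limit once this lands; the table name is a placeholder and the fallback is one reasonable choice, not prescribed by the change:

```python
import bigframes.exceptions
import bigframes.pandas as bpd

bpd.options.compute.maximum_result_rows = 10_000

df = bpd.read_gbq("my_dataset.my_table")  # placeholder table
try:
    local_frame = df.to_pandas()
except bigframes.exceptions.MaximumResultRowsExceeded:
    # The full result is larger than we are willing to download, so fall back
    # to a bounded preview instead.
    local_frame = df.peek(100)
```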