diff --git a/bigframes/core/blocks.py b/bigframes/core/blocks.py
index 76012030b6..7ac2b03f28 100644
--- a/bigframes/core/blocks.py
+++ b/bigframes/core/blocks.py
@@ -559,10 +559,12 @@ def to_pandas(
         return df, query_job

     def try_peek(
-        self, n: int = 20, force: bool = False
+        self, n: int = 20, force: bool = False, allow_large_results=None
     ) -> typing.Optional[pd.DataFrame]:
         if force or self.expr.supports_fast_peek:
-            result = self.session._executor.peek(self.expr, n)
+            result = self.session._executor.peek(
+                self.expr, n, use_explicit_destination=allow_large_results
+            )
             df = io_pandas.arrow_to_pandas(result.to_arrow_table(), self.expr.schema)
             self._copy_index_to_pandas(df)
             return df
@@ -614,17 +616,27 @@ def _materialize_local(
             self.expr,
             ordered=materialize_options.ordered,
             use_explicit_destination=materialize_options.allow_large_results,
-            get_size_bytes=True,
         )
-        assert execute_result.total_bytes is not None
-        table_mb = execute_result.total_bytes / _BYTES_TO_MEGABYTES
         sample_config = materialize_options.downsampling
-        max_download_size = sample_config.max_download_size
-        fraction = (
-            max_download_size / table_mb
-            if (max_download_size is not None) and (table_mb != 0)
-            else 2
-        )
+        if execute_result.total_bytes is not None:
+            table_mb = execute_result.total_bytes / _BYTES_TO_MEGABYTES
+            max_download_size = sample_config.max_download_size
+            fraction = (
+                max_download_size / table_mb
+                if (max_download_size is not None) and (table_mb != 0)
+                else 2
+            )
+        else:
+            # Since we cannot acquire the table size without a query_job,
+            # we skip the sampling.
+            if sample_config.enable_downsampling:
+                warnings.warn(
+                    "Sampling is disabled and there is no download size limit when 'allow_large_results' is set to "
+                    "False. To prevent downloading excessive data, it is recommended to use the peek() method, or "
+                    "limit the data with methods like .head() or .sample() before proceeding with downloads.",
+                    UserWarning,
+                )
+            fraction = 2

         # TODO: Maybe materialize before downsampling
         # Some downsampling methods
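Reviewer note: the new branch above reduces to a small piece of arithmetic. A minimal standalone sketch of that logic, assuming `_BYTES_TO_MEGABYTES` is the module's MiB constant (values below are hypothetical, not taken from the library):

# Sketch of the fraction logic in _materialize_local, not the module itself.
_BYTES_TO_MEGABYTES = 1024 * 1024

def downsample_fraction(total_bytes, max_download_size_mb):
    if total_bytes is None:
        # No query job, so the result size is unknown: skip sampling.
        return 2
    table_mb = total_bytes / _BYTES_TO_MEGABYTES
    if max_download_size_mb is not None and table_mb != 0:
        return max_download_size_mb / table_mb
    return 2  # any value > 1 effectively disables downsampling

# A ~2 GiB result with a 500 MB cap gives fraction < 1, so rows get sampled.
assert downsample_fraction(2 * 1024**3, 500) < 1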
diff --git a/bigframes/core/indexes/base.py b/bigframes/core/indexes/base.py
index 3f48fd3db2..84da6c5de0 100644
--- a/bigframes/core/indexes/base.py
+++ b/bigframes/core/indexes/base.py
@@ -505,7 +505,8 @@ def to_pandas(self, *, allow_large_results: Optional[bool] = None) -> pandas.Index:
         df, query_job = self._block.index.to_pandas(
             ordered=True, allow_large_results=allow_large_results
         )
-        self._query_job = query_job
+        if query_job:
+            self._query_job = query_job
         return df

     def to_numpy(self, dtype=None, *, allow_large_results=None, **kwargs) -> np.ndarray:
diff --git a/bigframes/dataframe.py b/bigframes/dataframe.py
index 816e714998..b5174dbd3e 100644
--- a/bigframes/dataframe.py
+++ b/bigframes/dataframe.py
@@ -1587,7 +1587,8 @@ def to_arrow(
         pa_table, query_job = self._block.to_arrow(
             ordered=ordered, allow_large_results=allow_large_results
         )
-        self._set_internal_query_job(query_job)
+        if query_job:
+            self._set_internal_query_job(query_job)
         return pa_table

     def to_pandas(
@@ -1637,7 +1638,8 @@ def to_pandas(
             ordered=ordered,
             allow_large_results=allow_large_results,
         )
-        self._set_internal_query_job(query_job)
+        if query_job:
+            self._set_internal_query_job(query_job)
         return df.set_axis(self._block.column_labels, axis=1, copy=False)

     def to_pandas_batches(
@@ -1687,7 +1689,9 @@ def head(self, n: int = 5) -> DataFrame:
     def tail(self, n: int = 5) -> DataFrame:
         return typing.cast(DataFrame, self.iloc[-n:])

-    def peek(self, n: int = 5, *, force: bool = True) -> pandas.DataFrame:
+    def peek(
+        self, n: int = 5, *, force: bool = True, allow_large_results=None
+    ) -> pandas.DataFrame:
         """
         Preview n arbitrary rows from the dataframe. No guarantees about row selection or ordering.
         ``DataFrame.peek(force=False)`` will always be very fast, but will not succeed if data requires
@@ -1700,17 +1704,22 @@ def peek(self, n: int = 5, *, force: bool = True) -> pandas.DataFrame:
             force (bool, default True):
                 If the data cannot be peeked efficiently, the dataframe will instead be
                 fully materialized as part of the operation if ``force=True``. If
                 ``force=False``, the operation will throw a ValueError.
+            allow_large_results (bool, default None):
+                If not None, overrides the global setting to allow or disallow large query results
+                over the default size limit of 10 GB.

         Returns:
             pandas.DataFrame: A pandas DataFrame with n rows.

         Raises:
             ValueError: If force=False and data cannot be efficiently peeked.
         """
-        maybe_result = self._block.try_peek(n)
+        maybe_result = self._block.try_peek(n, allow_large_results=allow_large_results)
         if maybe_result is None:
             if force:
                 self._cached()
-                maybe_result = self._block.try_peek(n, force=True)
+                maybe_result = self._block.try_peek(
+                    n, force=True, allow_large_results=allow_large_results
+                )
                 assert maybe_result is not None
             else:
                 raise ValueError(
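From user code, the new keyword surfaces as a per-call override; a hedged usage sketch (the table name is hypothetical, and a default of None defers to the session-wide `bigframes.options.bigquery.allow_large_results` option referenced later in this patch):

import bigframes.pandas as bpd

df = bpd.read_gbq("my-project.my_dataset.my_table")  # hypothetical table

# Per-call override: skip the large-results machinery for a quick preview.
preview = df.peek(n=5, allow_large_results=False)

# No override: falls back to the global option.
preview_default = df.peek(n=5)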
""" - maybe_result = self._block.try_peek(n) + maybe_result = self._block.try_peek(n, allow_large_results=allow_large_results) if maybe_result is None: if force: self._cached() - maybe_result = self._block.try_peek(n, force=True) + maybe_result = self._block.try_peek( + n, force=True, allow_large_results=allow_large_results + ) assert maybe_result is not None else: raise ValueError( diff --git a/bigframes/session/__init__.py b/bigframes/session/__init__.py index 8f53dccc06..13e49fca42 100644 --- a/bigframes/session/__init__.py +++ b/bigframes/session/__init__.py @@ -344,11 +344,25 @@ def _project(self): @property def bytes_processed_sum(self): """The sum of all bytes processed by bigquery jobs using this session.""" + warnings.warn( + "Queries executed with `allow_large_results=False` within the session will not " + "have their bytes processed counted in this sum. If you need precise " + "bytes processed information, query the `INFORMATION_SCHEMA` tables " + "to get relevant metrics.", + UserWarning, + ) return self._metrics.bytes_processed @property def slot_millis_sum(self): """The sum of all slot time used by bigquery jobs in this session.""" + warnings.warn( + "Queries executed with `allow_large_results=False` within the session will not " + "have their slot milliseconds counted in this sum. If you need precise slot " + "milliseconds information, query the `INFORMATION_SCHEMA` tables " + "to get relevant metrics.", + UserWarning, + ) return self._metrics.slot_millis @property @@ -1675,11 +1689,13 @@ def _start_query_ml_ddl( # so we must reset any encryption set in the job config # https://cloud.google.com/bigquery/docs/customer-managed-encryption#encrypt-model job_config.destination_encryption_configuration = None - - return bf_io_bigquery.start_query_with_client( + iterator, query_job = bf_io_bigquery.start_query_with_client( self.bqclient, sql, job_config=job_config, metrics=self._metrics ) + assert query_job is not None + return iterator, query_job + def _create_object_table(self, path: str, connection: str) -> str: """Create a random id Object Table from the input path and connection.""" table = str(self._loader._storage_manager._random_table()) diff --git a/bigframes/session/_io/bigquery/__init__.py b/bigframes/session/_io/bigquery/__init__.py index 8fcc36b4d3..94cab7cbf6 100644 --- a/bigframes/session/_io/bigquery/__init__.py +++ b/bigframes/session/_io/bigquery/__init__.py @@ -228,7 +228,9 @@ def start_query_with_client( timeout: Optional[float] = None, api_name: Optional[str] = None, metrics: Optional[bigframes.session.metrics.ExecutionMetrics] = None, -) -> Tuple[bigquery.table.RowIterator, bigquery.QueryJob]: + *, + query_with_job: bool = True, +) -> Tuple[bigquery.table.RowIterator, Optional[bigquery.QueryJob]]: """ Starts query job and waits for results. """ @@ -236,6 +238,18 @@ def start_query_with_client( # Note: Ensure no additional labels are added to job_config after this point, # as `add_and_trim_labels` ensures the label count does not exceed 64. add_and_trim_labels(job_config, api_name=api_name) + if not query_with_job: + results_iterator = bq_client.query_and_wait( + sql, + job_config=job_config, + location=location, + project=project, + api_timeout=timeout, + ) + if metrics is not None: + metrics.count_job_stats() + return results_iterator, None + query_job = bq_client.query( sql, job_config=job_config, @@ -338,6 +352,7 @@ def create_bq_dataset_reference( # to the dataset, no BigQuery Session required. Note: there is a # different anonymous dataset per location. 
diff --git a/bigframes/session/_io/bigquery/__init__.py b/bigframes/session/_io/bigquery/__init__.py
index 8fcc36b4d3..94cab7cbf6 100644
--- a/bigframes/session/_io/bigquery/__init__.py
+++ b/bigframes/session/_io/bigquery/__init__.py
@@ -228,7 +228,9 @@ def start_query_with_client(
     timeout: Optional[float] = None,
     api_name: Optional[str] = None,
     metrics: Optional[bigframes.session.metrics.ExecutionMetrics] = None,
-) -> Tuple[bigquery.table.RowIterator, bigquery.QueryJob]:
+    *,
+    query_with_job: bool = True,
+) -> Tuple[bigquery.table.RowIterator, Optional[bigquery.QueryJob]]:
     """
     Starts query job and waits for results.
     """
@@ -236,6 +238,18 @@
     # Note: Ensure no additional labels are added to job_config after this point,
     # as `add_and_trim_labels` ensures the label count does not exceed 64.
     add_and_trim_labels(job_config, api_name=api_name)
+    if not query_with_job:
+        results_iterator = bq_client.query_and_wait(
+            sql,
+            job_config=job_config,
+            location=location,
+            project=project,
+            api_timeout=timeout,
+        )
+        if metrics is not None:
+            metrics.count_job_stats()
+        return results_iterator, None
+
     query_job = bq_client.query(
         sql,
         job_config=job_config,
@@ -338,6 +352,7 @@ def create_bq_dataset_reference(
     # to the dataset, no BigQuery Session required. Note: there is a
     # different anonymous dataset per location. See:
     # https://cloud.google.com/bigquery/docs/cached-results#how_cached_results_are_stored
+    assert query_job is not None
     query_destination = query_job.destination
     return bigquery.DatasetReference(
         query_destination.project,
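For context on the `query_with_job` split: `Client.query()` always creates a QueryJob, while `Client.query_and_wait()` may serve results through the stateless jobs.query API without materializing a job at all, which is why the second tuple element becomes Optional. A simplified sketch of the two client paths (queries are placeholders):

from google.cloud import bigquery

client = bigquery.Client()

# query_with_job=True path: a QueryJob, with job stats and a destination table.
job = client.query("SELECT 1")
rows = job.result()

# query_with_job=False path: rows only; there may be no job (and thus no job
# stats or destination table) to record against the session.
rows = client.query_and_wait("SELECT 1")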
diff --git a/bigframes/session/executor.py b/bigframes/session/executor.py
index eb444c332c..22d1c1dcea 100644
--- a/bigframes/session/executor.py
+++ b/bigframes/session/executor.py
@@ -105,7 +105,6 @@ def execute(
         *,
         ordered: bool = True,
         use_explicit_destination: Optional[bool] = False,
-        get_size_bytes: bool = False,
         page_size: Optional[int] = None,
         max_results: Optional[int] = None,
     ):
@@ -152,6 +151,7 @@ def peek(
         self,
         array_value: bigframes.core.ArrayValue,
         n_rows: int,
+        use_explicit_destination: Optional[bool] = False,
     ) -> ExecuteResult:
         """
         A 'peek' efficiently accesses a small number of rows in the dataframe.
@@ -233,8 +233,7 @@ def execute(
         array_value: bigframes.core.ArrayValue,
         *,
         ordered: bool = True,
-        use_explicit_destination: Optional[bool] = False,
-        get_size_bytes: bool = False,
+        use_explicit_destination: Optional[bool] = None,
         page_size: Optional[int] = None,
         max_results: Optional[int] = None,
     ):
@@ -259,13 +258,14 @@ def execute(
             job_config=job_config,
             page_size=page_size,
             max_results=max_results,
+            query_with_job=use_explicit_destination,
         )

         # Though we provide the read client, iterator may or may not use it based on what is efficient for the result
         def iterator_supplier():
             return iterator.to_arrow_iterable(bqstorage_client=self.bqstoragereadclient)

-        if get_size_bytes is True or use_explicit_destination:
+        if query_job:
             size_bytes = self.bqclient.get_table(query_job.destination).num_bytes
         else:
             size_bytes = None
@@ -329,8 +329,7 @@ def export_gbq(
         if if_exists != "append" and has_timedelta_col:
             # Only update schema if this is not modifying an existing table, and the
             # new table contains timedelta columns.
-            assert query_job.destination is not None
-            table = self.bqclient.get_table(query_job.destination)
+            table = self.bqclient.get_table(destination)
             table.schema = array_value.schema.to_bigquery()
             self.bqclient.update_table(table, ["schema"])
@@ -377,6 +376,7 @@ def peek(
         self,
         array_value: bigframes.core.ArrayValue,
         n_rows: int,
+        use_explicit_destination: Optional[bool] = None,
     ) -> ExecuteResult:
         """
         A 'peek' efficiently accesses a small number of rows in the dataframe.
@@ -385,12 +385,24 @@ def peek(
         if not tree_properties.can_fast_peek(plan):
             msg = "Peeking this value cannot be done efficiently."
             warnings.warn(msg)
+        if use_explicit_destination is None:
+            use_explicit_destination = bigframes.options.bigquery.allow_large_results
+
+        job_config = bigquery.QueryJobConfig()
+        # Use explicit destination to avoid 10GB limit of temporary table
+        if use_explicit_destination:
+            destination_table = self.storage_manager.create_temp_table(
+                array_value.schema.to_bigquery(), cluster_cols=[]
+            )
+            job_config.destination = destination_table

         sql = self.compiler.compile(plan, ordered=False, limit=n_rows)

         # TODO(swast): plumb through the api_name of the user-facing api that
         # caused this query.
-        iterator, query_job = self._run_execute_query(sql=sql)
+        iterator, query_job = self._run_execute_query(
+            sql=sql, job_config=job_config, query_with_job=use_explicit_destination
+        )

         return ExecuteResult(
             # Probably don't need read client for small peek results, but let client decide
             arrow_batches=lambda: iterator.to_arrow_iterable(
@@ -485,7 +497,8 @@ def _run_execute_query(
         api_name: Optional[str] = None,
         page_size: Optional[int] = None,
         max_results: Optional[int] = None,
-    ) -> Tuple[bq_table.RowIterator, bigquery.QueryJob]:
+        query_with_job: bool = True,
+    ) -> Tuple[bq_table.RowIterator, Optional[bigquery.QueryJob]]:
         """
         Starts BigQuery query job and waits for results.
         """
@@ -503,7 +516,7 @@ def _run_execute_query(
         # as `add_and_trim_labels` ensures the label count does not exceed 64.
         bq_io.add_and_trim_labels(job_config, api_name=api_name)
         try:
-            return bq_io.start_query_with_client(
+            iterator, query_job = bq_io.start_query_with_client(
                 self.bqclient,
                 sql,
                 job_config=job_config,
                 max_results=max_results,
                 page_size=page_size,
                 metrics=self.metrics,
+                query_with_job=query_with_job,
             )
+            return iterator, query_job
         except google.api_core.exceptions.BadRequest as e:
             # Unfortunately, this error type does not have a separate error code or exception type
@@ -642,7 +657,7 @@ def _sql_as_cached_temp_table(
             job_config=job_config,
             api_name="cached",
         )
-        query_job.destination
+        assert query_job is not None
         query_job.result()
         return query_job.destination
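The explicit-destination branch in peek() above is the standard workaround for the ~10 GB cap on anonymous result tables. Reduced to plain client calls, it looks roughly like this sketch (project, dataset, and table names are hypothetical):

from google.cloud import bigquery

client = bigquery.Client()
job_config = bigquery.QueryJobConfig()
# Route results to an explicit table instead of an anonymous one.
job_config.destination = bigquery.TableReference.from_string(
    "my-project.my_dataset.peek_tmp"  # hypothetical temp table
)
job = client.query(
    "SELECT * FROM `my-project.my_dataset.big_table` LIMIT 5",
    job_config=job_config,
)
rows = job.result()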
diff --git a/bigframes/session/loader.py b/bigframes/session/loader.py
index 7204a14870..7c2586fe76 100644
--- a/bigframes/session/loader.py
+++ b/bigframes/session/loader.py
@@ -726,7 +726,7 @@ def _start_query(
             job_config.maximum_bytes_billed = (
                 bigframes.options.compute.maximum_bytes_billed
             )
-        return bf_io_bigquery.start_query_with_client(
+        iterator, query_job = bf_io_bigquery.start_query_with_client(
             self._bqclient,
             sql,
             job_config=job_config,
             timeout=timeout,
             api_name=api_name,
         )
+        assert query_job is not None
+        return iterator, query_job


 def _transform_read_gbq_configuration(configuration: Optional[dict]) -> dict:
diff --git a/bigframes/session/metrics.py b/bigframes/session/metrics.py
index 33bcd7fbf5..1cb561693b 100644
--- a/bigframes/session/metrics.py
+++ b/bigframes/session/metrics.py
@@ -32,7 +32,11 @@ class ExecutionMetrics:
     execution_secs: float = 0
     query_char_count: int = 0

-    def count_job_stats(self, query_job: bq_job.QueryJob):
+    def count_job_stats(self, query_job: Optional[bq_job.QueryJob] = None):
+        if query_job is None:
+            self.execution_count += 1
+            return
+
         stats = get_performance_stats(query_job)
         if stats is not None:
             bytes_processed, slot_millis, execution_secs, query_char_count = stats
diff --git a/tests/system/small/test_dataframe.py b/tests/system/small/test_dataframe.py
index 7f43583ef6..55a2a59bc7 100644
--- a/tests/system/small/test_dataframe.py
+++ b/tests/system/small/test_dataframe.py
@@ -662,7 +662,25 @@ def test_rename(scalars_dfs):
 def test_df_peek(scalars_dfs_maybe_ordered):
     scalars_df, scalars_pandas_df = scalars_dfs_maybe_ordered
+
+    session = scalars_df._block.session
+    slot_millis_sum = session.slot_millis_sum
     peek_result = scalars_df.peek(n=3, force=False)
+
+    assert session.slot_millis_sum - slot_millis_sum > 1000
+
     pd.testing.assert_index_equal(scalars_pandas_df.columns, peek_result.columns)
     assert len(peek_result) == 3
+
+
+def test_df_peek_with_large_results_not_allowed(scalars_dfs_maybe_ordered):
+    scalars_df, scalars_pandas_df = scalars_dfs_maybe_ordered
+
+    session = scalars_df._block.session
+    slot_millis_sum = session.slot_millis_sum
+    peek_result = scalars_df.peek(n=3, force=False, allow_large_results=False)
+
+    # The metrics won't be fully updated when we call query_and_wait.
+    assert session.slot_millis_sum - slot_millis_sum < 500
+    pd.testing.assert_index_equal(scalars_pandas_df.columns, peek_result.columns)
+    assert len(peek_result) == 3
diff --git a/tests/system/small/test_dataframe_io.py b/tests/system/small/test_dataframe_io.py
index dac0c62567..4758c2d5b4 100644
--- a/tests/system/small/test_dataframe_io.py
+++ b/tests/system/small/test_dataframe_io.py
@@ -256,24 +256,26 @@ def test_to_pandas_override_global_option(scalars_df_index):
     # Direct call to_pandas uses global default setting (allow_large_results=True),
     # table has 'bqdf' prefix.
     scalars_df_index.to_pandas()
-    assert scalars_df_index._query_job.destination.table_id.startswith("bqdf")
+    table_id = scalars_df_index._query_job.destination.table_id
+    assert table_id.startswith("bqdf")

-    # When allow_large_results=False, a destination table is implicitly created,
-    # table has 'anon' prefix.
+    # When allow_large_results=False, a query_job object should not be created.
+    # Therefore, the table_id should remain unchanged.
     scalars_df_index.to_pandas(allow_large_results=False)
-    assert scalars_df_index._query_job.destination.table_id.startswith("anon")
+    assert scalars_df_index._query_job.destination.table_id == table_id


 def test_to_arrow_override_global_option(scalars_df_index):
-    # Direct call to_pandas uses global default setting (allow_large_results=True),
+    # Direct call to_arrow uses global default setting (allow_large_results=True),
     # table has 'bqdf' prefix.
     scalars_df_index.to_arrow()
-    assert scalars_df_index._query_job.destination.table_id.startswith("bqdf")
+    table_id = scalars_df_index._query_job.destination.table_id
+    assert table_id.startswith("bqdf")

-    # When allow_large_results=False, a destination table is implicitly created,
-    # table has 'anon' prefix.
+    # When allow_large_results=False, a query_job object should not be created.
+    # Therefore, the table_id should remain unchanged.
     scalars_df_index.to_arrow(allow_large_results=False)
-    assert scalars_df_index._query_job.destination.table_id.startswith("anon")
+    assert scalars_df_index._query_job.destination.table_id == table_id


 def test_load_json_w_unboxed_py_value(session):
diff --git a/tests/system/small/test_index_io.py b/tests/system/small/test_index_io.py
index 31818dfad8..a7cd4013b9 100644
--- a/tests/system/small/test_index_io.py
+++ b/tests/system/small/test_index_io.py
@@ -15,25 +15,29 @@
 def test_to_pandas_override_global_option(scalars_df_index):
     bf_index = scalars_df_index.index
+
     # Direct call to_pandas uses global default setting (allow_large_results=True),
     # table has 'bqdf' prefix.
     bf_index.to_pandas()
-    assert bf_index._query_job.destination.table_id.startswith("bqdf")
+    table_id = bf_index._query_job.destination.table_id
+    assert table_id.startswith("bqdf")

-    # When allow_large_results=False, a destination table is implicitly created,
-    # table has 'anon' prefix.
+    # When allow_large_results=False, a query_job object should not be created.
+    # Therefore, the table_id should remain unchanged.
     bf_index.to_pandas(allow_large_results=False)
-    assert bf_index._query_job.destination.table_id.startswith("anon")
+    assert bf_index._query_job.destination.table_id == table_id


 def test_to_numpy_override_global_option(scalars_df_index):
     bf_index = scalars_df_index.index
-    # Direct call to_pandas uses global default setting (allow_large_results=True),
+
+    # Direct call to_numpy uses global default setting (allow_large_results=True),
     # table has 'bqdf' prefix.
     bf_index.to_numpy()
-    assert bf_index._query_job.destination.table_id.startswith("bqdf")
+    table_id = bf_index._query_job.destination.table_id
+    assert table_id.startswith("bqdf")

-    # When allow_large_results=False, a destination table is implicitly created,
-    # table has 'anon' prefix.
+    # When allow_large_results=False, a query_job object should not be created.
+    # Therefore, the table_id should remain unchanged.
     bf_index.to_numpy(allow_large_results=False)
-    assert bf_index._query_job.destination.table_id.startswith("anon")
+    assert bf_index._query_job.destination.table_id == table_id
diff --git a/tests/system/small/test_series.py b/tests/system/small/test_series.py
index 2daa7dd825..980f2226b7 100644
--- a/tests/system/small/test_series.py
+++ b/tests/system/small/test_series.py
@@ -2269,11 +2269,35 @@ def test_head_then_series_operation(scalars_dfs):
 def test_series_peek(scalars_dfs):
     scalars_df, scalars_pandas_df = scalars_dfs
+
+    session = scalars_df._block.session
+    slot_millis_sum = session.slot_millis_sum
     peek_result = scalars_df["float64_col"].peek(n=3, force=False)
+
+    assert session.slot_millis_sum - slot_millis_sum > 1000
+    pd.testing.assert_series_equal(
+        peek_result,
+        scalars_pandas_df["float64_col"].reindex_like(peek_result),
+    )
+    assert len(peek_result) == 3
+
+
+def test_series_peek_with_large_results_not_allowed(scalars_dfs):
+    scalars_df, scalars_pandas_df = scalars_dfs
+
+    session = scalars_df._block.session
+    slot_millis_sum = session.slot_millis_sum
+    peek_result = scalars_df["float64_col"].peek(
+        n=3, force=False, allow_large_results=False
+    )
+
+    # The metrics won't be fully updated when we call query_and_wait.
+    assert session.slot_millis_sum - slot_millis_sum < 500
     pd.testing.assert_series_equal(
         peek_result,
         scalars_pandas_df["float64_col"].reindex_like(peek_result),
     )
+    assert len(peek_result) == 3


 def test_series_peek_multi_index(scalars_dfs):
diff --git a/tests/system/small/test_series_io.py b/tests/system/small/test_series_io.py
index ed27246a80..d44d1e5b24 100644
--- a/tests/system/small/test_series_io.py
+++ b/tests/system/small/test_series_io.py
@@ -15,12 +15,18 @@
 def test_to_pandas_override_global_option(scalars_df_index):
     bf_series = scalars_df_index["int64_col"]
+
     # Direct call to_pandas uses global default setting (allow_large_results=True),
     # table has 'bqdf' prefix.
     bf_series.to_pandas()
-    assert bf_series._query_job.destination.table_id.startswith("bqdf")
+    table_id = bf_series._query_job.destination.table_id
+    assert table_id.startswith("bqdf")
+
+    session = bf_series._block.session
+    execution_count = session._metrics.execution_count

-    # When allow_large_results=False, a destination table is implicitly created,
-    # table has 'anon' prefix.
+    # When allow_large_results=False, a query_job object should not be created.
+    # Therefore, the table_id should remain unchanged.
     bf_series.to_pandas(allow_large_results=False)
-    assert bf_series._query_job.destination.table_id.startswith("anon")
+    assert bf_series._query_job.destination.table_id == table_id
+    assert session._metrics.execution_count - execution_count == 1
diff --git a/tests/unit/polars_session.py b/tests/unit/polars_session.py
index 4f941c4e10..a27db0e438 100644
--- a/tests/unit/polars_session.py
+++ b/tests/unit/polars_session.py
@@ -40,7 +40,6 @@ def execute(
         *,
         ordered: bool = True,
         use_explicit_destination: Optional[bool] = False,
-        get_size_bytes: bool = False,
         page_size: Optional[int] = None,
         max_results: Optional[int] = None,
     ):