From dac34d7f61f6ade8266b1793071f8c64e9365a5d Mon Sep 17 00:00:00 2001 From: Shenyang Cai Date: Thu, 27 Feb 2025 22:09:08 +0000 Subject: [PATCH 01/11] feat: Support dry_run in --- bigframes/core/blocks.py | 11 ++++++++-- bigframes/core/indexes/base.py | 28 ++++++++++++++++++++----- bigframes/dataframe.py | 25 +++++++++++++++++++--- bigframes/series.py | 23 +++++++++++++++++--- tests/system/small/test_dataframe_io.py | 9 ++++++++ tests/system/small/test_index.py | 18 +++++++++++++--- tests/system/small/test_multiindex.py | 6 +++++- tests/system/small/test_series.py | 10 +++++++++ 8 files changed, 113 insertions(+), 17 deletions(-) diff --git a/bigframes/core/blocks.py b/bigframes/core/blocks.py index 10970b24e8..1b26758d97 100644 --- a/bigframes/core/blocks.py +++ b/bigframes/core/blocks.py @@ -511,6 +511,7 @@ def to_pandas( random_state: Optional[int] = None, *, ordered: bool = True, + dry_run: bool = False, ) -> Tuple[pd.DataFrame, Optional[bigquery.QueryJob]]: """Run query and download results as a pandas DataFrame. @@ -533,10 +534,16 @@ def to_pandas( ordered (bool, default True): Determines whether the resulting pandas dataframe will be ordered. Whether the row ordering is deterministics depends on whether session ordering is strict. + dry_run (bool, default False): + Whether to perfrom a dry run. If true, the method will return a dataframe containing dry run + stats instead. Returns: pandas.DataFrame, QueryJob """ + if dry_run: + self._compute_dry_run() + if (sampling_method is not None) and (sampling_method not in _SAMPLING_METHODS): raise NotImplementedError( f"The downsampling method {sampling_method} is not implemented, " @@ -780,10 +787,10 @@ def split( return [sliced_block.drop_columns(drop_cols) for sliced_block in sliced_blocks] def _compute_dry_run( - self, value_keys: Optional[Iterable[str]] = None + self, value_keys: Optional[Iterable[str]] = None, ordered: bool = True ) -> bigquery.QueryJob: expr = self._apply_value_keys_to_expr(value_keys=value_keys) - query_job = self.session._executor.dry_run(expr) + query_job = self.session._executor.dry_run(expr, ordered) return query_job def _apply_value_keys_to_expr(self, value_keys: Optional[Iterable[str]] = None): diff --git a/bigframes/core/indexes/base.py b/bigframes/core/indexes/base.py index b3a07d33bc..de06f81074 100644 --- a/bigframes/core/indexes/base.py +++ b/bigframes/core/indexes/base.py @@ -236,7 +236,7 @@ def query_job(self) -> Optional[bigquery.QueryJob]: `_. """ if self._query_job is None: - self._query_job = self._block._compute_dry_run() + self._query_job = self._block._compute_dry_run(ordered=True) return self._query_job def __repr__(self) -> str: @@ -252,7 +252,7 @@ def __repr__(self) -> str: opts = bigframes.options.display max_results = opts.max_rows if opts.repr_mode == "deferred": - return formatter.repr_query_job(self._block._compute_dry_run()) + return formatter.repr_query_job(self._block._compute_dry_run(ordered=True)) pandas_df, _, query_job = self._block.retrieve_repr_request_results(max_results) self._query_job = query_job @@ -490,13 +490,31 @@ def __getitem__(self, key: int) -> typing.Any: else: raise NotImplementedError(f"Index key not supported {key}") - def to_pandas(self) -> pandas.Index: + def to_pandas(self, *, dry_run: bool = False) -> pandas.Index | pandas.Series: """Gets the Index as a pandas Index. + Args: + dry_run (bool, default False): + If this argument is true, this method will not process the data. Instead, it returns + a Pandas series containing dtype and the amount of bytes to be processed. + Returns: - pandas.Index: - A pandas Index with all of the labels from this Index. + pandas.Index | pandas.Series: + A pandas Index with all of the labels from this Index. If dry run is set to True, + returns a series containing dry run statistics. """ + + if dry_run: + query_job = self._block._compute_dry_run(ordered=True) + + return pandas.Series( + data=[ + self.dtype, + query_job.total_bytes_processed, + ], + index=["dtype", "total_bytes_processed"], + ) + return self._block.index.to_pandas(ordered=True) def to_numpy(self, dtype=None, **kwargs) -> np.ndarray: diff --git a/bigframes/dataframe.py b/bigframes/dataframe.py index caf1b62e07..5c9a1c6803 100644 --- a/bigframes/dataframe.py +++ b/bigframes/dataframe.py @@ -1554,6 +1554,7 @@ def to_pandas( random_state: Optional[int] = None, *, ordered: bool = True, + dry_run: bool = False, ) -> pandas.DataFrame: """Write DataFrame to pandas DataFrame. @@ -1576,12 +1577,30 @@ def to_pandas( ordered (bool, default True): Determines whether the resulting pandas dataframe will be ordered. In some cases, unordered may result in a faster-executing query. + dry_run (bool, default False): + If this argument is true, this method will not process the data. Instead, it returns + a Pandas dataframe containing dry run statistics Returns: pandas.DataFrame: A pandas DataFrame with all rows and columns of this DataFrame if the data_sampling_threshold_mb is not exceeded; otherwise, a pandas DataFrame with - downsampled rows and all columns of this DataFrame. + downsampled rows and all columns of this DataFrame. If dry_run is set, a pandas + DataFrame containing dry run statistics will be returned. """ + if dry_run: + dry_run_job = self._compute_dry_run(ordered) + + return pandas.DataFrame( + data={ + "dry_run_stats": [ + *self.dtypes, + self.index.dtype, + dry_run_job.total_bytes_processed, + ] + }, + index=[*self.columns, "index_dtype", "total_bytes_processed"], + ) + # TODO(orrbradford): Optimize this in future. Potentially some cases where we can return the stored query job df, query_job = self._block.to_pandas( max_download_size=max_download_size, @@ -1616,8 +1635,8 @@ def to_pandas_batches( page_size=page_size, max_results=max_results ) - def _compute_dry_run(self) -> bigquery.QueryJob: - return self._block._compute_dry_run() + def _compute_dry_run(self, ordered: bool = True) -> bigquery.QueryJob: + return self._block._compute_dry_run(ordered=ordered) def copy(self) -> DataFrame: return DataFrame(self._block) diff --git a/bigframes/series.py b/bigframes/series.py index 5a84dee32f..c73e55be05 100644 --- a/bigframes/series.py +++ b/bigframes/series.py @@ -381,6 +381,7 @@ def to_pandas( random_state: Optional[int] = None, *, ordered: bool = True, + dry_run: bool = False, ) -> pandas.Series: """Writes Series to pandas Series. @@ -403,12 +404,28 @@ def to_pandas( ordered (bool, default True): Determines whether the resulting pandas series will be ordered. In some cases, unordered may result in a faster-executing query. + dry_run (bool, default False): + If this argument is true, this method will not process the data. Instead, it returns + a Pandas series containing dtype and the amount of bytes to be processed. Returns: pandas.Series: A pandas Series with all rows of this Series if the data_sampling_threshold_mb - is not exceeded; otherwise, a pandas Series with downsampled rows of the DataFrame. + is not exceeded; otherwise, a pandas Series with downsampled rows of the DataFrame. If dry_run + is set to True, a pandas Series containing dry run statistics will be returned. """ + if dry_run: + dry_run_job = self._compute_dry_run(ordered) + + return pandas.Series( + data=[ + self.dtype, + self.index.dtype, + dry_run_job.total_bytes_processed, + ], + index=["dtype", "index_dtype", "total_bytes_processed"], + ) + df, query_job = self._block.to_pandas( max_download_size=max_download_size, sampling_method=sampling_method, @@ -420,8 +437,8 @@ def to_pandas( series.name = self._name return series - def _compute_dry_run(self) -> bigquery.QueryJob: - return self._block._compute_dry_run((self._value_column,)) + def _compute_dry_run(self, ordered: bool = True) -> bigquery.QueryJob: + return self._block._compute_dry_run((self._value_column,), ordered) def drop( self, diff --git a/tests/system/small/test_dataframe_io.py b/tests/system/small/test_dataframe_io.py index b07213f943..2d325f5d3d 100644 --- a/tests/system/small/test_dataframe_io.py +++ b/tests/system/small/test_dataframe_io.py @@ -847,3 +847,12 @@ def test_to_sql_query_named_index_excluded( utils.assert_pandas_df_equal( roundtrip.to_pandas(), pd_df, check_index_type=False, ignore_order=True ) + + +def test_to_pandas_dry_run(scalars_df_index): + result = scalars_df_index.to_pandas(dry_run=True) + + for col in scalars_df_index.columns: + assert result["dry_run_stats", col] == scalars_df_index[col].dtype + assert result["dry_run_stats", "index_dtype"] == scalars_df_index.index.dtype + assert result["dry_run_stats", "total_bytes_processed"] >= 0 diff --git a/tests/system/small/test_index.py b/tests/system/small/test_index.py index 4d01bc5ee9..cff1b633ab 100644 --- a/tests/system/small/test_index.py +++ b/tests/system/small/test_index.py @@ -12,6 +12,8 @@ # See the License for the specific language governing permissions and # limitations under the License. +import typing + import numpy import pandas as pd import pytest @@ -24,8 +26,9 @@ def test_index_construct_from_list(): bf_result = bpd.Index( [3, 14, 159], dtype=pd.Int64Dtype(), name="my_index" ).to_pandas() + pd_result: pd.Index = pd.Index([3, 14, 159], dtype=pd.Int64Dtype(), name="my_index") - pd.testing.assert_index_equal(bf_result, pd_result) + pd.testing.assert_index_equal(typing.cast(pd.Index, bf_result), pd_result) def test_index_construct_from_series(): @@ -39,7 +42,7 @@ def test_index_construct_from_series(): name="index_name", dtype=pd.Int64Dtype(), ) - pd.testing.assert_index_equal(bf_result, pd_result) + pd.testing.assert_index_equal(typing.cast(pd.Index, bf_result), pd_result) def test_index_construct_from_index(): @@ -55,7 +58,7 @@ def test_index_construct_from_index(): pd_result: pd.Index = pd.Index( pd_index_input, dtype=pd.Int64Dtype(), name="index_name" ) - pd.testing.assert_index_equal(bf_result, pd_result) + pd.testing.assert_index_equal(typing.cast(pd.Index, bf_result), pd_result) def test_get_index(scalars_df_index, scalars_pandas_df_index): @@ -425,3 +428,12 @@ def test_multiindex_repr_includes_all_names(session): ) index = session.read_pandas(df).set_index(["A", "B"]).index assert "names=['A', 'B']" in repr(index) + + +def test_to_pandas_dry_run(scalars_df_index): + index = scalars_df_index.index + + result = index.to_pandas(dry_run=True) + + assert result["dtype"] == index.dtype + assert result["total_bytes_processed"] >= 0 diff --git a/tests/system/small/test_multiindex.py b/tests/system/small/test_multiindex.py index 1c78ac63d9..b382e3dc2a 100644 --- a/tests/system/small/test_multiindex.py +++ b/tests/system/small/test_multiindex.py @@ -12,6 +12,8 @@ # See the License for the specific language governing permissions and # limitations under the License. +import typing + import numpy as np import pandas import pytest @@ -42,7 +44,9 @@ def test_multi_index_from_arrays(): names=[" 1index 1", "_1index 2"], ) assert bf_idx.names == pd_idx.names - pandas.testing.assert_index_equal(bf_idx.to_pandas(), pd_idx) + pandas.testing.assert_index_equal( + typing.cast(pandas.Index, bf_idx.to_pandas()), pd_idx + ) @skip_legacy_pandas diff --git a/tests/system/small/test_series.py b/tests/system/small/test_series.py index 2daa7dd825..52bf384a17 100644 --- a/tests/system/small/test_series.py +++ b/tests/system/small/test_series.py @@ -4357,3 +4357,13 @@ def test_series_struct_class_attributes_shadow_struct_fields(nested_structs_df): series = nested_structs_df["person"] assert series.name == "person" + + +def test_series_to_pandas_dry_run(scalars_df_index): + bf_series = scalars_df_index["int64_col"] + + result = bf_series.to_pandas(dry_run=True) + + assert result["dtype"] == bf_series.dtype + assert result["index_dtype"] == bf_series.index.dtype + assert result["total_bytes_processed"] >= 0 From 5f8a76af467c75ddd15d9ce20b9d6cd8ab73c0bc Mon Sep 17 00:00:00 2001 From: Shenyang Cai Date: Fri, 28 Feb 2025 19:23:06 +0000 Subject: [PATCH 02/11] centralize dry_run logics at block level --- bigframes/core/blocks.py | 29 ++++++++++++++++++++----- bigframes/core/indexes/base.py | 23 ++++++++------------ bigframes/dataframe.py | 21 +++++------------- bigframes/series.py | 17 ++++----------- tests/system/small/test_dataframe_io.py | 6 ++--- tests/system/small/test_index.py | 2 +- tests/system/small/test_series.py | 4 ++-- tests/unit/core/test_blocks.py | 16 ++++++++++++++ 8 files changed, 65 insertions(+), 53 deletions(-) diff --git a/bigframes/core/blocks.py b/bigframes/core/blocks.py index 047c68e7fa..902a56c3b5 100644 --- a/bigframes/core/blocks.py +++ b/bigframes/core/blocks.py @@ -538,9 +538,6 @@ def to_pandas( Returns: pandas.DataFrame, QueryJob """ - if dry_run: - self._compute_dry_run() - if (sampling_method is not None) and (sampling_method not in _SAMPLING_METHODS): raise NotImplementedError( f"The downsampling method {sampling_method} is not implemented, " @@ -555,6 +552,11 @@ def to_pandas( else: sampling = sampling.with_disabled() + if dry_run: + if sampling.enable_downsampling: + raise NotImplementedError("Dry run with sampling is not supproted") + return self._compute_dry_run(ordered=ordered) + df, query_job = self._materialize_local( materialize_options=MaterializationOptions( downsampling=sampling, @@ -793,10 +795,27 @@ def split( def _compute_dry_run( self, value_keys: Optional[Iterable[str]] = None, ordered: bool = True - ) -> bigquery.QueryJob: + ) -> typing.Tuple[pd.DataFrame, bigquery.QueryJob]: expr = self._apply_value_keys_to_expr(value_keys=value_keys) query_job = self.session._executor.dry_run(expr, ordered) - return query_job + + if len(self.index.dtypes) > 1: + index_type = tuple(self.index.dtypes) + else: + index_type = self.index.dtypes[0] + + df = pd.DataFrame( + data={ + "dry_run_stats": [ + *self.dtypes, + index_type, + query_job.total_bytes_processed, + ] + }, + index=[*self.column_labels, "[index]", "total_bytes_processed"], + ) + + return df, query_job def _apply_value_keys_to_expr(self, value_keys: Optional[Iterable[str]] = None): expr = self._expr diff --git a/bigframes/core/indexes/base.py b/bigframes/core/indexes/base.py index 6f28cabd1e..15f46d3d8e 100644 --- a/bigframes/core/indexes/base.py +++ b/bigframes/core/indexes/base.py @@ -236,7 +236,7 @@ def query_job(self) -> Optional[bigquery.QueryJob]: `_. """ if self._query_job is None: - self._query_job = self._block._compute_dry_run(ordered=True) + self._query_job = self._block._compute_dry_run() return self._query_job def __repr__(self) -> str: @@ -252,7 +252,7 @@ def __repr__(self) -> str: opts = bigframes.options.display max_results = opts.max_rows if opts.repr_mode == "deferred": - return formatter.repr_query_job(self._block._compute_dry_run(ordered=True)) + return formatter.repr_query_job(self._block._compute_dry_run()) pandas_df, _, query_job = self._block.retrieve_repr_request_results(max_results) self._query_job = query_job @@ -490,8 +490,9 @@ def __getitem__(self, key: int) -> typing.Any: else: raise NotImplementedError(f"Index key not supported {key}") - - def to_pandas(self, *, allow_large_results: Optional[bool] = None, dry_run: bool = False) -> pandas.Index | pandas.Series: + def to_pandas( + self, *, allow_large_results: Optional[bool] = None, dry_run: bool = False + ) -> pandas.Index | pandas.Series: """Gets the Index as a pandas Index. Args: @@ -509,15 +510,10 @@ def to_pandas(self, *, allow_large_results: Optional[bool] = None, dry_run: bool """ if dry_run: - query_job = self._block._compute_dry_run(ordered=True) - - return pandas.Series( - data=[ - self.dtype, - query_job.total_bytes_processed, - ], - index=["dtype", "total_bytes_processed"], - ) + df, query_job = self._block.to_pandas(ordered=True, dry_run=True) + self._query_job = query_job + + return df.squeeze(axis=1) df, query_job = self._block.index.to_pandas( ordered=True, allow_large_results=allow_large_results @@ -525,7 +521,6 @@ def to_pandas(self, *, allow_large_results: Optional[bool] = None, dry_run: bool self._query_job = query_job return df - def to_numpy(self, dtype=None, *, allow_large_results=None, **kwargs) -> np.ndarray: return self.to_pandas(allow_large_results=allow_large_results).to_numpy( dtype, **kwargs diff --git a/bigframes/dataframe.py b/bigframes/dataframe.py index ddf1e2ebe5..0cd4698893 100644 --- a/bigframes/dataframe.py +++ b/bigframes/dataframe.py @@ -1634,19 +1634,6 @@ def to_pandas( downsampled rows and all columns of this DataFrame. If dry_run is set, a pandas DataFrame containing dry run statistics will be returned. """ - if dry_run: - dry_run_job = self._compute_dry_run(ordered) - - return pandas.DataFrame( - data={ - "dry_run_stats": [ - *self.dtypes, - self.index.dtype, - dry_run_job.total_bytes_processed, - ] - }, - index=[*self.columns, "index_dtype", "total_bytes_processed"], - ) # TODO(orrbradford): Optimize this in future. Potentially some cases where we can return the stored query job df, query_job = self._block.to_pandas( @@ -1654,9 +1641,12 @@ def to_pandas( sampling_method=sampling_method, random_state=random_state, ordered=ordered, + dry_run=dry_run, allow_large_results=allow_large_results, ) self._set_internal_query_job(query_job) + if dry_run: + return df return df.set_axis(self._block.column_labels, axis=1, copy=False) def to_pandas_batches( @@ -1692,8 +1682,9 @@ def to_pandas_batches( allow_large_results=allow_large_results, ) - def _compute_dry_run(self, ordered: bool = True) -> bigquery.QueryJob: - return self._block._compute_dry_run(ordered=ordered) + def _compute_dry_run(self) -> bigquery.QueryJob: + _, query_job = self._block._compute_dry_run() + return query_job def copy(self) -> DataFrame: return DataFrame(self._block) diff --git a/bigframes/series.py b/bigframes/series.py index dc40125e34..d4203c6bf6 100644 --- a/bigframes/series.py +++ b/bigframes/series.py @@ -417,23 +417,13 @@ def to_pandas( is not exceeded; otherwise, a pandas Series with downsampled rows of the DataFrame. If dry_run is set to True, a pandas Series containing dry run statistics will be returned. """ - if dry_run: - dry_run_job = self._compute_dry_run(ordered) - - return pandas.Series( - data=[ - self.dtype, - self.index.dtype, - dry_run_job.total_bytes_processed, - ], - index=["dtype", "index_dtype", "total_bytes_processed"], - ) df, query_job = self._block.to_pandas( max_download_size=max_download_size, sampling_method=sampling_method, random_state=random_state, ordered=ordered, + dry_run=dry_run, allow_large_results=allow_large_results, ) self._set_internal_query_job(query_job) @@ -441,8 +431,9 @@ def to_pandas( series.name = self._name return series - def _compute_dry_run(self, ordered: bool = True) -> bigquery.QueryJob: - return self._block._compute_dry_run((self._value_column,), ordered) + def _compute_dry_run(self) -> bigquery.QueryJob: + _, query_job = self._block._compute_dry_run((self._value_column,)) + return query_job def drop( self, diff --git a/tests/system/small/test_dataframe_io.py b/tests/system/small/test_dataframe_io.py index 3af37a6bea..461c5feb6d 100644 --- a/tests/system/small/test_dataframe_io.py +++ b/tests/system/small/test_dataframe_io.py @@ -877,6 +877,6 @@ def test_to_pandas_dry_run(scalars_df_index): result = scalars_df_index.to_pandas(dry_run=True) for col in scalars_df_index.columns: - assert result["dry_run_stats", col] == scalars_df_index[col].dtype - assert result["dry_run_stats", "index_dtype"] == scalars_df_index.index.dtype - assert result["dry_run_stats", "total_bytes_processed"] >= 0 + assert result["dry_run_stats"][col] == scalars_df_index[col].dtype + assert result["dry_run_stats"]["[index]"] == scalars_df_index.index.dtype + assert result["dry_run_stats"]["total_bytes_processed"] >= 0 diff --git a/tests/system/small/test_index.py b/tests/system/small/test_index.py index cff1b633ab..f88a6b6807 100644 --- a/tests/system/small/test_index.py +++ b/tests/system/small/test_index.py @@ -435,5 +435,5 @@ def test_to_pandas_dry_run(scalars_df_index): result = index.to_pandas(dry_run=True) - assert result["dtype"] == index.dtype + assert result["[index]"] == index.dtype assert result["total_bytes_processed"] >= 0 diff --git a/tests/system/small/test_series.py b/tests/system/small/test_series.py index 52bf384a17..8bae8a779e 100644 --- a/tests/system/small/test_series.py +++ b/tests/system/small/test_series.py @@ -4364,6 +4364,6 @@ def test_series_to_pandas_dry_run(scalars_df_index): result = bf_series.to_pandas(dry_run=True) - assert result["dtype"] == bf_series.dtype - assert result["index_dtype"] == bf_series.index.dtype + assert result["int64_col"] == bf_series.dtype + assert result["[index]"] == bf_series.index.dtype assert result["total_bytes_processed"] >= 0 diff --git a/tests/unit/core/test_blocks.py b/tests/unit/core/test_blocks.py index 8ed3acba0f..4b295001c6 100644 --- a/tests/unit/core/test_blocks.py +++ b/tests/unit/core/test_blocks.py @@ -92,3 +92,19 @@ def test_block_from_local(data): pandas.testing.assert_index_equal(block.column_labels, expected.columns) assert tuple(block.index.names) == tuple(expected.index.names) assert block.shape == expected.shape + + +def test_block_to_pandas_dry_run__raises_error_when_sampling_is_enabled(): + mock_session = mock.create_autospec(spec=bigframes.Session) + mock_executor = mock.create_autospec( + spec=bigframes.session.executor.BigQueryCachingExecutor + ) + + # hard-coded the returned dimension of the session for that each of the test case contains 3 rows. + mock_session._executor = mock_executor + mock_executor.get_row_count.return_value = 3 + + block = blocks.Block.from_local(pandas.DataFrame(), mock_session) + + with pytest.raises(NotImplementedError): + block.to_pandas(sampling_method="UNIFORM", dry_run=True) \ No newline at end of file From 75f4ce11a18787ca1d017b6cfc4e5617327f8b87 Mon Sep 17 00:00:00 2001 From: Shenyang Cai Date: Fri, 28 Feb 2025 19:42:37 +0000 Subject: [PATCH 03/11] fix lint errors --- bigframes/core/blocks.py | 8 ++------ bigframes/core/indexes/base.py | 12 +++++++----- tests/unit/core/test_blocks.py | 2 +- 3 files changed, 10 insertions(+), 12 deletions(-) diff --git a/bigframes/core/blocks.py b/bigframes/core/blocks.py index 902a56c3b5..89faa42642 100644 --- a/bigframes/core/blocks.py +++ b/bigframes/core/blocks.py @@ -799,16 +799,12 @@ def _compute_dry_run( expr = self._apply_value_keys_to_expr(value_keys=value_keys) query_job = self.session._executor.dry_run(expr, ordered) - if len(self.index.dtypes) > 1: - index_type = tuple(self.index.dtypes) - else: - index_type = self.index.dtypes[0] - + index_types = self.index.dtypes df = pd.DataFrame( data={ "dry_run_stats": [ *self.dtypes, - index_type, + tuple(index_types) if len(index_types) > 1 else index_types[0], query_job.total_bytes_processed, ] }, diff --git a/bigframes/core/indexes/base.py b/bigframes/core/indexes/base.py index 15f46d3d8e..465bd59ffc 100644 --- a/bigframes/core/indexes/base.py +++ b/bigframes/core/indexes/base.py @@ -228,7 +228,7 @@ def T(self) -> Index: return self.transpose() @property - def query_job(self) -> Optional[bigquery.QueryJob]: + def query_job(self) -> bigquery.QueryJob: """BigQuery job metadata for the most recent query. Returns: @@ -236,7 +236,8 @@ def query_job(self) -> Optional[bigquery.QueryJob]: `_. """ if self._query_job is None: - self._query_job = self._block._compute_dry_run() + _, query_job = self._block._compute_dry_run() + self._query_job = query_job return self._query_job def __repr__(self) -> str: @@ -252,7 +253,8 @@ def __repr__(self) -> str: opts = bigframes.options.display max_results = opts.max_rows if opts.repr_mode == "deferred": - return formatter.repr_query_job(self._block._compute_dry_run()) + _, dry_run_query_job = self._block._compute_dry_run() + return formatter.repr_query_job(dry_run_query_job) pandas_df, _, query_job = self._block.retrieve_repr_request_results(max_results) self._query_job = query_job @@ -510,10 +512,10 @@ def to_pandas( """ if dry_run: - df, query_job = self._block.to_pandas(ordered=True, dry_run=True) + dry_run_df, query_job = self._block.to_pandas(ordered=True, dry_run=True) self._query_job = query_job - return df.squeeze(axis=1) + return dry_run_df.squeeze(axis=1) df, query_job = self._block.index.to_pandas( ordered=True, allow_large_results=allow_large_results diff --git a/tests/unit/core/test_blocks.py b/tests/unit/core/test_blocks.py index 4b295001c6..7ae2cbf8af 100644 --- a/tests/unit/core/test_blocks.py +++ b/tests/unit/core/test_blocks.py @@ -107,4 +107,4 @@ def test_block_to_pandas_dry_run__raises_error_when_sampling_is_enabled(): block = blocks.Block.from_local(pandas.DataFrame(), mock_session) with pytest.raises(NotImplementedError): - block.to_pandas(sampling_method="UNIFORM", dry_run=True) \ No newline at end of file + block.to_pandas(sampling_method="UNIFORM", dry_run=True) From fe82c6d5b652893582899bb4483ecc039cc12c5b Mon Sep 17 00:00:00 2001 From: Shenyang Cai Date: Fri, 28 Feb 2025 22:04:51 +0000 Subject: [PATCH 04/11] remove unnecessary code --- tests/unit/core/test_blocks.py | 8 -------- 1 file changed, 8 deletions(-) diff --git a/tests/unit/core/test_blocks.py b/tests/unit/core/test_blocks.py index 7ae2cbf8af..14a13832f5 100644 --- a/tests/unit/core/test_blocks.py +++ b/tests/unit/core/test_blocks.py @@ -96,14 +96,6 @@ def test_block_from_local(data): def test_block_to_pandas_dry_run__raises_error_when_sampling_is_enabled(): mock_session = mock.create_autospec(spec=bigframes.Session) - mock_executor = mock.create_autospec( - spec=bigframes.session.executor.BigQueryCachingExecutor - ) - - # hard-coded the returned dimension of the session for that each of the test case contains 3 rows. - mock_session._executor = mock_executor - mock_executor.get_row_count.return_value = 3 - block = blocks.Block.from_local(pandas.DataFrame(), mock_session) with pytest.raises(NotImplementedError): From cde29a0535de64f82566761058c90b438f85ec45 Mon Sep 17 00:00:00 2001 From: Shenyang Cai Date: Thu, 6 Mar 2025 01:40:46 +0000 Subject: [PATCH 05/11] use dataframe for dry_run stats --- bigframes/core/blocks.py | 25 +++++----- bigframes/core/convert.py | 3 +- bigframes/core/indexes/base.py | 25 ++++++++-- bigframes/series.py | 66 +++++++++++++++++++++++-- tests/system/small/test_dataframe_io.py | 19 ++++--- tests/system/small/test_index.py | 12 ++--- tests/system/small/test_multiindex.py | 6 +-- tests/system/small/test_series.py | 6 +-- 8 files changed, 120 insertions(+), 42 deletions(-) diff --git a/bigframes/core/blocks.py b/bigframes/core/blocks.py index a7c2a539cb..1673fe3593 100644 --- a/bigframes/core/blocks.py +++ b/bigframes/core/blocks.py @@ -808,22 +808,25 @@ def split( def _compute_dry_run( self, value_keys: Optional[Iterable[str]] = None, ordered: bool = True ) -> typing.Tuple[pd.DataFrame, bigquery.QueryJob]: + column_dtypes = pd.Series( + [*self.dtypes], index=[*self.column_labels], name="column_dtypes" + ) + + index_names = [self.index.names[n] or str(n) for n in range(self.index.nlevels)] + index_dtypes = pd.Series( + [*self.index.dtypes], index=index_names, name="index_dtypes" + ) + expr = self._apply_value_keys_to_expr(value_keys=value_keys) query_job = self.session._executor.dry_run(expr, ordered) - index_types = self.index.dtypes - df = pd.DataFrame( - data={ - "dry_run_stats": [ - *self.dtypes, - tuple(index_types) if len(index_types) > 1 else index_types[0], - query_job.total_bytes_processed, - ] - }, - index=[*self.column_labels, "[index]", "total_bytes_processed"], + job_stats = pd.Series( + [query_job.total_bytes_processed], + index=["total_bytes_processed"], + name="job_statistics", ) - return df, query_job + return pd.concat([column_dtypes, index_dtypes, job_stats], axis=1), query_job def _apply_value_keys_to_expr(self, value_keys: Optional[Iterable[str]] = None): expr = self._expr diff --git a/bigframes/core/convert.py b/bigframes/core/convert.py index 94a0564556..07ae31a80a 100644 --- a/bigframes/core/convert.py +++ b/bigframes/core/convert.py @@ -13,6 +13,7 @@ # limitations under the License. from __future__ import annotations +import typing from typing import Optional import pandas as pd @@ -87,7 +88,7 @@ def to_pd_series(obj, default_index: pd.Index) -> pd.Series: pandas.Series """ if isinstance(obj, series.Series): - return obj.to_pandas() + return typing.cast(pd.Series, obj.to_pandas()) if isinstance(obj, pd.Series): return obj if isinstance(obj, indexes.Index): diff --git a/bigframes/core/indexes/base.py b/bigframes/core/indexes/base.py index f77197a710..f3e295415f 100644 --- a/bigframes/core/indexes/base.py +++ b/bigframes/core/indexes/base.py @@ -17,7 +17,7 @@ from __future__ import annotations import typing -from typing import Hashable, Literal, Optional, Sequence, Union +from typing import Hashable, Literal, Optional, overload, Sequence, Union import bigframes_vendored.constants as constants import bigframes_vendored.pandas.core.indexes.base as vendored_pandas_index @@ -492,9 +492,24 @@ def __getitem__(self, key: int) -> typing.Any: else: raise NotImplementedError(f"Index key not supported {key}") + @overload + def to_pandas( + self, + *, + allow_large_results: Optional[bool] = ..., + dry_run: Literal[False] = ..., + ) -> pandas.Index: + ... + + @overload + def to_pandas( + self, *, allow_large_results: Optional[bool] = ..., dry_run: Literal[True] = ... + ) -> pandas.DataFrame: + ... + def to_pandas( self, *, allow_large_results: Optional[bool] = None, dry_run: bool = False - ) -> pandas.Index | pandas.Series: + ) -> pandas.Index | pandas.DataFrame: """Gets the Index as a pandas Index. Args: @@ -506,16 +521,16 @@ def to_pandas( a Pandas series containing dtype and the amount of bytes to be processed. Returns: - pandas.Index | pandas.Series: + pandas.Index | pandas.Dataframe: A pandas Index with all of the labels from this Index. If dry run is set to True, - returns a series containing dry run statistics. + returns a DataFrame containing dry run statistics. """ if dry_run: dry_run_df, query_job = self._block.to_pandas(ordered=True, dry_run=True) self._query_job = query_job - return dry_run_df.squeeze(axis=1) + return dry_run_df df, query_job = self._block.index.to_pandas( ordered=True, allow_large_results=allow_large_results diff --git a/bigframes/series.py b/bigframes/series.py index 82f4233c2f..6e3872e303 100644 --- a/bigframes/series.py +++ b/bigframes/series.py @@ -23,7 +23,18 @@ import numbers import textwrap import typing -from typing import Any, cast, List, Literal, Mapping, Optional, Sequence, Tuple, Union +from typing import ( + Any, + cast, + List, + Literal, + Mapping, + Optional, + overload, + Sequence, + Tuple, + Union, +) import bigframes_vendored.constants as constants import bigframes_vendored.pandas.core.series as vendored_pandas_series @@ -374,6 +385,32 @@ def astype( bigframes.operations.AsTypeOp(to_type=dtype, safe=(errors == "null")) ) + @overload + def to_pandas( + self, + max_download_size: Optional[int] = ..., + sampling_method: Optional[str] = ..., + random_state: Optional[int] = ..., + *, + ordered: bool = ..., + dry_run: Literal[False] = ..., + allow_large_results: Optional[bool] = ..., + ) -> pandas.Series: + ... + + @overload + def to_pandas( + self, + max_download_size: Optional[int] = ..., + sampling_method: Optional[str] = ..., + random_state: Optional[int] = ..., + *, + ordered: bool = ..., + dry_run: Literal[True] = ..., + allow_large_results: Optional[bool] = ..., + ) -> pandas.DataFrame: + ... + def to_pandas( self, max_download_size: Optional[int] = None, @@ -383,7 +420,7 @@ def to_pandas( ordered: bool = True, dry_run: bool = False, allow_large_results: Optional[bool] = None, - ) -> pandas.Series: + ) -> pandas.Series | pandas.DataFrame: """Writes Series to pandas Series. Args: @@ -407,7 +444,7 @@ def to_pandas( In some cases, unordered may result in a faster-executing query. dry_run (bool, default False): If this argument is true, this method will not process the data. Instead, it returns - a Pandas series containing dtype and the amount of bytes to be processed. + a Pandas DataFrame containing dtype and the amount of bytes to be processed. allow_large_results (bool, default None): If not None, overrides the global setting to allow or disallow large query results over the default size limit of 10 GB. @@ -428,6 +465,11 @@ def to_pandas( ) if query_job: self._set_internal_query_job(query_job) + + if dry_run: + return df + # return self._convert_dry_run_df_to_series(df) + series = df.squeeze(axis=1) series.name = self._name return series @@ -436,6 +478,24 @@ def _compute_dry_run(self) -> bigquery.QueryJob: _, query_job = self._block._compute_dry_run((self._value_column,)) return query_job + def _convert_dry_run_df_to_series(self, df: pandas.DataFrame) -> pandas.Series: + column_dtypes = df["column_dtypes"].dropna() + column_dtypes.index = pandas.MultiIndex.from_tuples( + [("column_dtypes", col) for col in column_dtypes.index.name] + ) + + index_dtypes = df["index_dtypes"].dropna() + index_dtypes.index = pandas.MultiIndex.from_tuples( + [("index_dtypes", col) for col in index_dtypes.index.name] + ) + + job_statistics = df["job_statistics"].dropna() + job_statistics.index = pandas.MultiIndex.from_tuples( + [("job_statistics", col) for col in job_statistics.index.name] + ) + + return pandas.concat([column_dtypes, index_dtypes, job_statistics]) + def drop( self, labels: typing.Any = None, diff --git a/tests/system/small/test_dataframe_io.py b/tests/system/small/test_dataframe_io.py index 45b9ea26e6..1505d80d9a 100644 --- a/tests/system/small/test_dataframe_io.py +++ b/tests/system/small/test_dataframe_io.py @@ -930,10 +930,15 @@ def test_to_sql_query_named_index_excluded( ) -def test_to_pandas_dry_run(scalars_df_index): - result = scalars_df_index.to_pandas(dry_run=True) - - for col in scalars_df_index.columns: - assert result["dry_run_stats"][col] == scalars_df_index[col].dtype - assert result["dry_run_stats"]["[index]"] == scalars_df_index.index.dtype - assert result["dry_run_stats"]["total_bytes_processed"] >= 0 +def test_to_pandas_dry_run(session, scalars_pandas_df_multi_index): + bf_df = session.read_pandas(scalars_pandas_df_multi_index) + result = bf_df.to_pandas(dry_run=True) + + for col in bf_df.columns: + assert result["column_dtypes"][col] == bf_df[col].dtype + for idx_name in bf_df.index.names: + assert ( + result["index_dtypes"][idx_name] + == bf_df.index.get_level_values(idx_name).dtype + ) + assert result["job_statistics"]["total_bytes_processed"] >= 0 diff --git a/tests/system/small/test_index.py b/tests/system/small/test_index.py index f88a6b6807..2911352cbd 100644 --- a/tests/system/small/test_index.py +++ b/tests/system/small/test_index.py @@ -12,8 +12,6 @@ # See the License for the specific language governing permissions and # limitations under the License. -import typing - import numpy import pandas as pd import pytest @@ -28,7 +26,7 @@ def test_index_construct_from_list(): ).to_pandas() pd_result: pd.Index = pd.Index([3, 14, 159], dtype=pd.Int64Dtype(), name="my_index") - pd.testing.assert_index_equal(typing.cast(pd.Index, bf_result), pd_result) + pd.testing.assert_index_equal(bf_result, pd_result) def test_index_construct_from_series(): @@ -42,7 +40,7 @@ def test_index_construct_from_series(): name="index_name", dtype=pd.Int64Dtype(), ) - pd.testing.assert_index_equal(typing.cast(pd.Index, bf_result), pd_result) + pd.testing.assert_index_equal(bf_result, pd_result) def test_index_construct_from_index(): @@ -58,7 +56,7 @@ def test_index_construct_from_index(): pd_result: pd.Index = pd.Index( pd_index_input, dtype=pd.Int64Dtype(), name="index_name" ) - pd.testing.assert_index_equal(typing.cast(pd.Index, bf_result), pd_result) + pd.testing.assert_index_equal(bf_result, pd_result) def test_get_index(scalars_df_index, scalars_pandas_df_index): @@ -435,5 +433,5 @@ def test_to_pandas_dry_run(scalars_df_index): result = index.to_pandas(dry_run=True) - assert result["[index]"] == index.dtype - assert result["total_bytes_processed"] >= 0 + assert result["index_dtypes"][index.name] == index.dtype + assert result["job_statistics"]["total_bytes_processed"] >= 0 diff --git a/tests/system/small/test_multiindex.py b/tests/system/small/test_multiindex.py index b382e3dc2a..1c78ac63d9 100644 --- a/tests/system/small/test_multiindex.py +++ b/tests/system/small/test_multiindex.py @@ -12,8 +12,6 @@ # See the License for the specific language governing permissions and # limitations under the License. -import typing - import numpy as np import pandas import pytest @@ -44,9 +42,7 @@ def test_multi_index_from_arrays(): names=[" 1index 1", "_1index 2"], ) assert bf_idx.names == pd_idx.names - pandas.testing.assert_index_equal( - typing.cast(pandas.Index, bf_idx.to_pandas()), pd_idx - ) + pandas.testing.assert_index_equal(bf_idx.to_pandas(), pd_idx) @skip_legacy_pandas diff --git a/tests/system/small/test_series.py b/tests/system/small/test_series.py index acfab59e3d..942712d6d8 100644 --- a/tests/system/small/test_series.py +++ b/tests/system/small/test_series.py @@ -4389,6 +4389,6 @@ def test_series_to_pandas_dry_run(scalars_df_index): result = bf_series.to_pandas(dry_run=True) - assert result["int64_col"] == bf_series.dtype - assert result["[index]"] == bf_series.index.dtype - assert result["total_bytes_processed"] >= 0 + assert result["column_dtypes"][bf_series.name] == bf_series.dtype + assert result["index_dtypes"][bf_series.index.name] == bf_series.index.dtype + assert result["job_statistics"]["total_bytes_processed"] >= 0 From 86bf46b8447bc9b114611e457b3d8aa57445c55b Mon Sep 17 00:00:00 2001 From: Shenyang Cai Date: Thu, 6 Mar 2025 23:17:53 +0000 Subject: [PATCH 06/11] flatten the job stats to a series --- bigframes/core/blocks.py | 128 ++++++++++++++++++++---- bigframes/core/indexes/base.py | 23 +++-- bigframes/dataframe.py | 52 ++++++++-- bigframes/series.py | 70 ++++--------- tests/system/small/test_dataframe_io.py | 10 +- tests/system/small/test_index.py | 3 +- tests/system/small/test_series.py | 4 +- 7 files changed, 191 insertions(+), 99 deletions(-) diff --git a/bigframes/core/blocks.py b/bigframes/core/blocks.py index ffb3afc417..efe11a4bf2 100644 --- a/bigframes/core/blocks.py +++ b/bigframes/core/blocks.py @@ -22,6 +22,7 @@ from __future__ import annotations import ast +import copy import dataclasses import datetime import functools @@ -30,11 +31,13 @@ import textwrap import typing from typing import ( + Any, Iterable, List, Literal, Mapping, Optional, + overload, Sequence, Tuple, TYPE_CHECKING, @@ -501,6 +504,32 @@ def to_arrow( pa_table = pa_table.rename_columns(list(self.column_labels) + pa_index_labels) return pa_table, execute_result.query_job + @overload + def to_pandas( + self, + max_download_size: Optional[int] = ..., + sampling_method: Optional[str] = ..., + random_state: Optional[int] = ..., + *, + ordered: bool = ..., + dry_run: Literal[False] = ..., + allow_large_results: Optional[bool] = ..., + ) -> Tuple[pd.DataFrame, Optional[bigquery.QueryJob]]: + ... + + @overload + def to_pandas( + self, + max_download_size: Optional[int] = ..., + sampling_method: Optional[str] = ..., + random_state: Optional[int] = ..., + *, + ordered: bool = ..., + dry_run: Literal[True] = ..., + allow_large_results: Optional[bool] = ..., + ) -> Tuple[pd.Series, Optional[bigquery.QueryJob]]: + ... + def to_pandas( self, max_download_size: Optional[int] = None, @@ -510,7 +539,7 @@ def to_pandas( ordered: bool = True, dry_run: bool = False, allow_large_results: Optional[bool] = None, - ) -> Tuple[pd.DataFrame, Optional[bigquery.QueryJob]]: + ) -> Tuple[pd.DataFrame | pd.Series, Optional[bigquery.QueryJob]]: """Run query and download results as a pandas DataFrame. Args: @@ -533,11 +562,11 @@ def to_pandas( Determines whether the resulting pandas dataframe will be ordered. Whether the row ordering is deterministics depends on whether session ordering is strict. dry_run (bool, default False): - Whether to perfrom a dry run. If true, the method will return a dataframe containing dry run - stats instead. + Whether to perfrom a dry run. If true, the method will return a pandas Series containing + dry run statistics. Returns: - pandas.DataFrame, QueryJob + pandas.DataFrame | pandas.Series, QueryJob """ if (sampling_method is not None) and (sampling_method not in _SAMPLING_METHODS): raise NotImplementedError( @@ -808,26 +837,60 @@ def split( def _compute_dry_run( self, value_keys: Optional[Iterable[str]] = None, ordered: bool = True - ) -> typing.Tuple[pd.DataFrame, bigquery.QueryJob]: - column_dtypes = pd.Series( - [*self.dtypes], index=[*self.column_labels], name="column_dtypes" - ) + ) -> typing.Tuple[pd.Series, bigquery.QueryJob]: + index: List[Any] = [] + values: List[Any] = [] - index_names = [self.index.names[n] or str(n) for n in range(self.index.nlevels)] - index_dtypes = pd.Series( - [*self.index.dtypes], index=index_names, name="index_dtypes" + index.append("columnCount") + values.append(len(self.value_columns)) + index.append("columnDtypes") + values.append( + { + col: self.expr.get_column_type(self.resolve_label_exact_or_error(col)) + for col in self.column_labels + } ) + index.append("indexLevel") + values.append(self.index.nlevels) + index.append("indexDtypes") + values.append(self.index.dtypes) + expr = self._apply_value_keys_to_expr(value_keys=value_keys) query_job = self.session._executor.dry_run(expr, ordered) + job_api_repr = copy.deepcopy(query_job._properties) + + job_ref = job_api_repr["jobReference"] + for key, val in job_ref.items(): + index.append(key) + values.append(val) + + index.append("jobType") + values.append(job_api_repr["configuration"]["jobType"]) + + query_config = job_api_repr["configuration"]["query"] + for key in ("destinationTable", "useLegacySql"): + index.append(key) + values.append(query_config.get(key)) + + query_stats = job_api_repr["statistics"]["query"] + for key in ( + "referencedTables", + "totalBytesProcessed", + "cacheHit", + "statementType", + ): + index.append(key) + values.append(query_stats.get(key)) - job_stats = pd.Series( - [query_job.total_bytes_processed], - index=["total_bytes_processed"], - name="job_statistics", + index.append("creationTime") + values.append( + pd.Timestamp( + job_api_repr["statistics"]["creationTime"], unit="ms", tz="UTC" + ) ) - return pd.concat([column_dtypes, index_dtypes, job_stats], axis=1), query_job + return pd.Series(values, index=index), query_job def _apply_value_keys_to_expr(self, value_keys: Optional[Iterable[str]] = None): expr = self._expr @@ -2716,20 +2779,49 @@ def column_ids(self) -> Sequence[str]: def is_null(self) -> bool: return len(self._block._index_columns) == 0 + @overload + def to_pandas( + self, + *, + ordered: Optional[bool] = ..., + dry_run: Literal[False] = ..., + allow_large_results: Optional[bool] = ..., + ) -> Tuple[pd.Index, Optional[bigquery.QueryJob]]: + ... + + @overload + def to_pandas( + self, + *, + ordered: Optional[bool] = ..., + dry_run: Literal[True] = ..., + allow_large_results: Optional[bool] = ..., + ) -> Tuple[pd.Series, Optional[bigquery.QueryJob]]: + ... + def to_pandas( self, *, ordered: Optional[bool] = None, + dry_run: bool = False, allow_large_results: Optional[bool] = None, - ) -> Tuple[pd.Index, Optional[bigquery.QueryJob]]: + ) -> Tuple[pd.Index | pd.Series, Optional[bigquery.QueryJob]]: """Executes deferred operations and downloads the results.""" if len(self.column_ids) == 0: raise bigframes.exceptions.NullIndexError( "Cannot materialize index, as this object does not have an index. Set index column(s) using set_index." ) ordered = ordered if ordered is not None else True + if dry_run: + series, query_job = self._block.select_columns([]).to_pandas( + ordered=ordered, + allow_large_results=allow_large_results, + dry_run=dry_run, + ) + return series, query_job + df, query_job = self._block.select_columns([]).to_pandas( - ordered=ordered, allow_large_results=allow_large_results + ordered=ordered, allow_large_results=allow_large_results, dry_run=dry_run ) return df.index, query_job diff --git a/bigframes/core/indexes/base.py b/bigframes/core/indexes/base.py index f3e295415f..5e23ec8dd5 100644 --- a/bigframes/core/indexes/base.py +++ b/bigframes/core/indexes/base.py @@ -504,12 +504,12 @@ def to_pandas( @overload def to_pandas( self, *, allow_large_results: Optional[bool] = ..., dry_run: Literal[True] = ... - ) -> pandas.DataFrame: + ) -> pandas.Series: ... def to_pandas( self, *, allow_large_results: Optional[bool] = None, dry_run: bool = False - ) -> pandas.Index | pandas.DataFrame: + ) -> pandas.Index | pandas.Series: """Gets the Index as a pandas Index. Args: @@ -521,19 +521,22 @@ def to_pandas( a Pandas series containing dtype and the amount of bytes to be processed. Returns: - pandas.Index | pandas.Dataframe: + pandas.Index | pandas.Series: A pandas Index with all of the labels from this Index. If dry run is set to True, - returns a DataFrame containing dry run statistics. + returns a Series containing dry run statistics. """ - if dry_run: - dry_run_df, query_job = self._block.to_pandas(ordered=True, dry_run=True) - self._query_job = query_job - - return dry_run_df + series, query_job = self._block.index.to_pandas( + ordered=True, allow_large_results=allow_large_results, dry_run=dry_run + ) + if query_job: + self._query_job = query_job + return series + # Repeat the to_pandas() call to make mypy deduce type correctly, because mypy cannot resolve + # Literal[True/False] to bool df, query_job = self._block.index.to_pandas( - ordered=True, allow_large_results=allow_large_results + ordered=True, allow_large_results=allow_large_results, dry_run=dry_run ) if query_job: self._query_job = query_job diff --git a/bigframes/dataframe.py b/bigframes/dataframe.py index 99892d9379..414453c5dd 100644 --- a/bigframes/dataframe.py +++ b/bigframes/dataframe.py @@ -32,6 +32,7 @@ Literal, Mapping, Optional, + overload, Sequence, Tuple, Union, @@ -1594,6 +1595,32 @@ def to_arrow( self._set_internal_query_job(query_job) return pa_table + @overload + def to_pandas( + self, + max_download_size: Optional[int] = ..., + sampling_method: Optional[str] = ..., + random_state: Optional[int] = ..., + *, + ordered: bool = ..., + dry_run: Literal[False] = ..., + allow_large_results: Optional[bool] = ..., + ) -> pandas.DataFrame: + ... + + @overload + def to_pandas( + self, + max_download_size: Optional[int] = ..., + sampling_method: Optional[str] = ..., + random_state: Optional[int] = ..., + *, + ordered: bool = ..., + dry_run: Literal[True] = ..., + allow_large_results: Optional[bool] = ..., + ) -> pandas.Series: + ... + def to_pandas( self, max_download_size: Optional[int] = None, @@ -1603,7 +1630,7 @@ def to_pandas( ordered: bool = True, dry_run: bool = False, allow_large_results: Optional[bool] = None, - ) -> pandas.DataFrame: + ) -> pandas.DataFrame | pandas.Series: """Write DataFrame to pandas DataFrame. Args: @@ -1627,7 +1654,7 @@ def to_pandas( In some cases, unordered may result in a faster-executing query. dry_run (bool, default False): If this argument is true, this method will not process the data. Instead, it returns - a Pandas dataframe containing dry run statistics + a Pandas Series containing dry run statistics allow_large_results (bool, default None): If not None, overrides the global setting to allow or disallow large query results over the default size limit of 10 GB. @@ -1636,10 +1663,26 @@ def to_pandas( pandas.DataFrame: A pandas DataFrame with all rows and columns of this DataFrame if the data_sampling_threshold_mb is not exceeded; otherwise, a pandas DataFrame with downsampled rows and all columns of this DataFrame. If dry_run is set, a pandas - DataFrame containing dry run statistics will be returned. + Series containing dry run statistics will be returned. """ # TODO(orrbradford): Optimize this in future. Potentially some cases where we can return the stored query job + + if dry_run: + series, query_job = self._block.to_pandas( + max_download_size=max_download_size, + sampling_method=sampling_method, + random_state=random_state, + ordered=ordered, + dry_run=dry_run, + allow_large_results=allow_large_results, + ) + if query_job: + self._set_internal_query_job(query_job) + return series + + # Repeat the to_pandas() call to make mypy deduce type correctly, because mypy cannot resolve + # Literal[True/False] to bool df, query_job = self._block.to_pandas( max_download_size=max_download_size, sampling_method=sampling_method, @@ -1650,9 +1693,6 @@ def to_pandas( ) if query_job: self._set_internal_query_job(query_job) - - if dry_run: - return df return df.set_axis(self._block.column_labels, axis=1, copy=False) def to_pandas_batches( diff --git a/bigframes/series.py b/bigframes/series.py index 6e3872e303..b512a77686 100644 --- a/bigframes/series.py +++ b/bigframes/series.py @@ -385,32 +385,6 @@ def astype( bigframes.operations.AsTypeOp(to_type=dtype, safe=(errors == "null")) ) - @overload - def to_pandas( - self, - max_download_size: Optional[int] = ..., - sampling_method: Optional[str] = ..., - random_state: Optional[int] = ..., - *, - ordered: bool = ..., - dry_run: Literal[False] = ..., - allow_large_results: Optional[bool] = ..., - ) -> pandas.Series: - ... - - @overload - def to_pandas( - self, - max_download_size: Optional[int] = ..., - sampling_method: Optional[str] = ..., - random_state: Optional[int] = ..., - *, - ordered: bool = ..., - dry_run: Literal[True] = ..., - allow_large_results: Optional[bool] = ..., - ) -> pandas.DataFrame: - ... - def to_pandas( self, max_download_size: Optional[int] = None, @@ -420,7 +394,7 @@ def to_pandas( ordered: bool = True, dry_run: bool = False, allow_large_results: Optional[bool] = None, - ) -> pandas.Series | pandas.DataFrame: + ) -> pandas.Series: """Writes Series to pandas Series. Args: @@ -444,7 +418,7 @@ def to_pandas( In some cases, unordered may result in a faster-executing query. dry_run (bool, default False): If this argument is true, this method will not process the data. Instead, it returns - a Pandas DataFrame containing dtype and the amount of bytes to be processed. + a Pandas Series containing dry run job statistics allow_large_results (bool, default None): If not None, overrides the global setting to allow or disallow large query results over the default size limit of 10 GB. @@ -455,6 +429,22 @@ def to_pandas( is set to True, a pandas Series containing dry run statistics will be returned. """ + if dry_run: + series, query_job = self._block.to_pandas( + max_download_size=max_download_size, + sampling_method=sampling_method, + random_state=random_state, + ordered=ordered, + dry_run=dry_run, + allow_large_results=allow_large_results, + ) + + if query_job: + self._set_internal_query_job(query_job) + return series + + # Repeat the to_pandas() call to make mypy deduce type correctly, because mypy cannot resolve + # Literal[True/False] to bool df, query_job = self._block.to_pandas( max_download_size=max_download_size, sampling_method=sampling_method, @@ -463,12 +453,6 @@ def to_pandas( dry_run=dry_run, allow_large_results=allow_large_results, ) - if query_job: - self._set_internal_query_job(query_job) - - if dry_run: - return df - # return self._convert_dry_run_df_to_series(df) series = df.squeeze(axis=1) series.name = self._name @@ -478,24 +462,6 @@ def _compute_dry_run(self) -> bigquery.QueryJob: _, query_job = self._block._compute_dry_run((self._value_column,)) return query_job - def _convert_dry_run_df_to_series(self, df: pandas.DataFrame) -> pandas.Series: - column_dtypes = df["column_dtypes"].dropna() - column_dtypes.index = pandas.MultiIndex.from_tuples( - [("column_dtypes", col) for col in column_dtypes.index.name] - ) - - index_dtypes = df["index_dtypes"].dropna() - index_dtypes.index = pandas.MultiIndex.from_tuples( - [("index_dtypes", col) for col in index_dtypes.index.name] - ) - - job_statistics = df["job_statistics"].dropna() - job_statistics.index = pandas.MultiIndex.from_tuples( - [("job_statistics", col) for col in job_statistics.index.name] - ) - - return pandas.concat([column_dtypes, index_dtypes, job_statistics]) - def drop( self, labels: typing.Any = None, diff --git a/tests/system/small/test_dataframe_io.py b/tests/system/small/test_dataframe_io.py index 1505d80d9a..d7fcb929ed 100644 --- a/tests/system/small/test_dataframe_io.py +++ b/tests/system/small/test_dataframe_io.py @@ -932,13 +932,7 @@ def test_to_sql_query_named_index_excluded( def test_to_pandas_dry_run(session, scalars_pandas_df_multi_index): bf_df = session.read_pandas(scalars_pandas_df_multi_index) + result = bf_df.to_pandas(dry_run=True) - for col in bf_df.columns: - assert result["column_dtypes"][col] == bf_df[col].dtype - for idx_name in bf_df.index.names: - assert ( - result["index_dtypes"][idx_name] - == bf_df.index.get_level_values(idx_name).dtype - ) - assert result["job_statistics"]["total_bytes_processed"] >= 0 + assert len(result) == 14 diff --git a/tests/system/small/test_index.py b/tests/system/small/test_index.py index 2911352cbd..535e4bc9ae 100644 --- a/tests/system/small/test_index.py +++ b/tests/system/small/test_index.py @@ -433,5 +433,4 @@ def test_to_pandas_dry_run(scalars_df_index): result = index.to_pandas(dry_run=True) - assert result["index_dtypes"][index.name] == index.dtype - assert result["job_statistics"]["total_bytes_processed"] >= 0 + assert len(result) == 14 diff --git a/tests/system/small/test_series.py b/tests/system/small/test_series.py index 942712d6d8..1b5588becf 100644 --- a/tests/system/small/test_series.py +++ b/tests/system/small/test_series.py @@ -4389,6 +4389,4 @@ def test_series_to_pandas_dry_run(scalars_df_index): result = bf_series.to_pandas(dry_run=True) - assert result["column_dtypes"][bf_series.name] == bf_series.dtype - assert result["index_dtypes"][bf_series.index.name] == bf_series.index.dtype - assert result["job_statistics"]["total_bytes_processed"] >= 0 + assert len(result) == 14 From 4af0ac4d731c8461bd4998ce2b038ebde8e6e8b1 Mon Sep 17 00:00:00 2001 From: Shenyang Cai Date: Thu, 6 Mar 2025 23:20:02 +0000 Subject: [PATCH 07/11] fix lint --- bigframes/series.py | 1 - 1 file changed, 1 deletion(-) diff --git a/bigframes/series.py b/bigframes/series.py index b512a77686..ad8ad3ca76 100644 --- a/bigframes/series.py +++ b/bigframes/series.py @@ -30,7 +30,6 @@ Literal, Mapping, Optional, - overload, Sequence, Tuple, Union, From 416ad494030ce37acd3e1a15c6ffae02d6a077af Mon Sep 17 00:00:00 2001 From: Owl Bot Date: Thu, 6 Mar 2025 23:22:38 +0000 Subject: [PATCH 08/11] =?UTF-8?q?=F0=9F=A6=89=20Updates=20from=20OwlBot=20?= =?UTF-8?q?post-processor?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md --- bigframes/series.py | 12 +----------- 1 file changed, 1 insertion(+), 11 deletions(-) diff --git a/bigframes/series.py b/bigframes/series.py index ad8ad3ca76..48b0396aa6 100644 --- a/bigframes/series.py +++ b/bigframes/series.py @@ -23,17 +23,7 @@ import numbers import textwrap import typing -from typing import ( - Any, - cast, - List, - Literal, - Mapping, - Optional, - Sequence, - Tuple, - Union, -) +from typing import Any, cast, List, Literal, Mapping, Optional, Sequence, Tuple, Union import bigframes_vendored.constants as constants import bigframes_vendored.pandas.core.series as vendored_pandas_series From 301e99368c82b8a5f795954b661a011757805e20 Mon Sep 17 00:00:00 2001 From: Shenyang Cai Date: Fri, 7 Mar 2025 01:40:09 +0000 Subject: [PATCH 09/11] fix query job issue --- bigframes/core/convert.py | 3 +-- bigframes/series.py | 3 +++ 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/bigframes/core/convert.py b/bigframes/core/convert.py index 07ae31a80a..94a0564556 100644 --- a/bigframes/core/convert.py +++ b/bigframes/core/convert.py @@ -13,7 +13,6 @@ # limitations under the License. from __future__ import annotations -import typing from typing import Optional import pandas as pd @@ -88,7 +87,7 @@ def to_pd_series(obj, default_index: pd.Index) -> pd.Series: pandas.Series """ if isinstance(obj, series.Series): - return typing.cast(pd.Series, obj.to_pandas()) + return obj.to_pandas() if isinstance(obj, pd.Series): return obj if isinstance(obj, indexes.Index): diff --git a/bigframes/series.py b/bigframes/series.py index 48b0396aa6..72917d6fec 100644 --- a/bigframes/series.py +++ b/bigframes/series.py @@ -443,6 +443,9 @@ def to_pandas( allow_large_results=allow_large_results, ) + if query_job: + self._set_internal_query_job(query_job) + series = df.squeeze(axis=1) series.name = self._name return series From b4db89747d4f32d3eb045f289c9b669551c49df0 Mon Sep 17 00:00:00 2001 From: Shenyang Cai Date: Thu, 13 Mar 2025 20:29:57 +0000 Subject: [PATCH 10/11] Make pandas surface directly call block._compute_dry_run --- bigframes/core/blocks.py | 131 ++++++++++++--------------------- bigframes/core/indexes/base.py | 10 +-- bigframes/dataframe.py | 12 +-- bigframes/series.py | 10 +-- tests/unit/core/test_blocks.py | 4 +- 5 files changed, 59 insertions(+), 108 deletions(-) diff --git a/bigframes/core/blocks.py b/bigframes/core/blocks.py index efe11a4bf2..b2a10fbfaa 100644 --- a/bigframes/core/blocks.py +++ b/bigframes/core/blocks.py @@ -37,7 +37,6 @@ Literal, Mapping, Optional, - overload, Sequence, Tuple, TYPE_CHECKING, @@ -52,7 +51,7 @@ import pyarrow as pa from bigframes import session -import bigframes._config.sampling_options as sampling_options +from bigframes._config import sampling_options import bigframes.constants import bigframes.core as core import bigframes.core.compile.googlesql as googlesql @@ -504,32 +503,6 @@ def to_arrow( pa_table = pa_table.rename_columns(list(self.column_labels) + pa_index_labels) return pa_table, execute_result.query_job - @overload - def to_pandas( - self, - max_download_size: Optional[int] = ..., - sampling_method: Optional[str] = ..., - random_state: Optional[int] = ..., - *, - ordered: bool = ..., - dry_run: Literal[False] = ..., - allow_large_results: Optional[bool] = ..., - ) -> Tuple[pd.DataFrame, Optional[bigquery.QueryJob]]: - ... - - @overload - def to_pandas( - self, - max_download_size: Optional[int] = ..., - sampling_method: Optional[str] = ..., - random_state: Optional[int] = ..., - *, - ordered: bool = ..., - dry_run: Literal[True] = ..., - allow_large_results: Optional[bool] = ..., - ) -> Tuple[pd.Series, Optional[bigquery.QueryJob]]: - ... - def to_pandas( self, max_download_size: Optional[int] = None, @@ -537,9 +510,8 @@ def to_pandas( random_state: Optional[int] = None, *, ordered: bool = True, - dry_run: bool = False, allow_large_results: Optional[bool] = None, - ) -> Tuple[pd.DataFrame | pd.Series, Optional[bigquery.QueryJob]]: + ) -> Tuple[pd.DataFrame, Optional[bigquery.QueryJob]]: """Run query and download results as a pandas DataFrame. Args: @@ -561,31 +533,13 @@ def to_pandas( ordered (bool, default True): Determines whether the resulting pandas dataframe will be ordered. Whether the row ordering is deterministics depends on whether session ordering is strict. - dry_run (bool, default False): - Whether to perfrom a dry run. If true, the method will return a pandas Series containing - dry run statistics. Returns: - pandas.DataFrame | pandas.Series, QueryJob + pandas.DataFrame, QueryJob """ - if (sampling_method is not None) and (sampling_method not in _SAMPLING_METHODS): - raise NotImplementedError( - f"The downsampling method {sampling_method} is not implemented, " - f"please choose from {','.join(_SAMPLING_METHODS)}." - ) - - sampling = bigframes.options.sampling.with_max_download_size(max_download_size) - if sampling_method is not None: - sampling = sampling.with_method(sampling_method).with_random_state( # type: ignore - random_state - ) - else: - sampling = sampling.with_disabled() - - if dry_run: - if sampling.enable_downsampling: - raise NotImplementedError("Dry run with sampling is not supproted") - return self._compute_dry_run(ordered=ordered) + sampling = self._get_sampling_option( + max_download_size, sampling_method, random_state + ) df, query_job = self._materialize_local( materialize_options=MaterializationOptions( @@ -597,6 +551,27 @@ def to_pandas( df.set_axis(self.column_labels, axis=1, copy=False) return df, query_job + def _get_sampling_option( + self, + max_download_size: Optional[int] = None, + sampling_method: Optional[str] = None, + random_state: Optional[int] = None, + ) -> sampling_options.SamplingOptions: + + if (sampling_method is not None) and (sampling_method not in _SAMPLING_METHODS): + raise NotImplementedError( + f"The downsampling method {sampling_method} is not implemented, " + f"please choose from {','.join(_SAMPLING_METHODS)}." + ) + + sampling = bigframes.options.sampling.with_max_download_size(max_download_size) + if sampling_method is None: + return sampling.with_disabled() + + return sampling.with_method(sampling_method).with_random_state( # type: ignore + random_state + ) + def try_peek( self, n: int = 20, force: bool = False, allow_large_results=None ) -> typing.Optional[pd.DataFrame]: @@ -836,8 +811,20 @@ def split( return [sliced_block.drop_columns(drop_cols) for sliced_block in sliced_blocks] def _compute_dry_run( - self, value_keys: Optional[Iterable[str]] = None, ordered: bool = True + self, + value_keys: Optional[Iterable[str]] = None, + *, + ordered: bool = True, + max_download_size: Optional[int] = None, + sampling_method: Optional[str] = None, + random_state: Optional[int] = None, ) -> typing.Tuple[pd.Series, bigquery.QueryJob]: + sampling = self._get_sampling_option( + max_download_size, sampling_method, random_state + ) + if sampling.enable_downsampling: + raise NotImplementedError("Dry run with sampling is not supported") + index: List[Any] = [] values: List[Any] = [] @@ -2779,52 +2766,30 @@ def column_ids(self) -> Sequence[str]: def is_null(self) -> bool: return len(self._block._index_columns) == 0 - @overload - def to_pandas( - self, - *, - ordered: Optional[bool] = ..., - dry_run: Literal[False] = ..., - allow_large_results: Optional[bool] = ..., - ) -> Tuple[pd.Index, Optional[bigquery.QueryJob]]: - ... - - @overload - def to_pandas( - self, - *, - ordered: Optional[bool] = ..., - dry_run: Literal[True] = ..., - allow_large_results: Optional[bool] = ..., - ) -> Tuple[pd.Series, Optional[bigquery.QueryJob]]: - ... - def to_pandas( self, *, ordered: Optional[bool] = None, - dry_run: bool = False, allow_large_results: Optional[bool] = None, - ) -> Tuple[pd.Index | pd.Series, Optional[bigquery.QueryJob]]: + ) -> Tuple[pd.Index, Optional[bigquery.QueryJob]]: """Executes deferred operations and downloads the results.""" if len(self.column_ids) == 0: raise bigframes.exceptions.NullIndexError( "Cannot materialize index, as this object does not have an index. Set index column(s) using set_index." ) ordered = ordered if ordered is not None else True - if dry_run: - series, query_job = self._block.select_columns([]).to_pandas( - ordered=ordered, - allow_large_results=allow_large_results, - dry_run=dry_run, - ) - return series, query_job df, query_job = self._block.select_columns([]).to_pandas( - ordered=ordered, allow_large_results=allow_large_results, dry_run=dry_run + ordered=ordered, + allow_large_results=allow_large_results, ) return df.index, query_job + def _compute_dry_run( + self, *, ordered: bool = True + ) -> Tuple[pd.Series, Optional[bigquery.QueryJob]]: + return self._block.select_columns([])._compute_dry_run(ordered=ordered) + def resolve_level(self, level: LevelsType) -> typing.Sequence[str]: if utils.is_list_like(level): levels = list(level) diff --git a/bigframes/core/indexes/base.py b/bigframes/core/indexes/base.py index 5e23ec8dd5..be4efd6a76 100644 --- a/bigframes/core/indexes/base.py +++ b/bigframes/core/indexes/base.py @@ -526,17 +526,13 @@ def to_pandas( returns a Series containing dry run statistics. """ if dry_run: - series, query_job = self._block.index.to_pandas( - ordered=True, allow_large_results=allow_large_results, dry_run=dry_run - ) + dry_run_stats, query_job = self._block.index._compute_dry_run(ordered=True) if query_job: self._query_job = query_job - return series + return dry_run_stats - # Repeat the to_pandas() call to make mypy deduce type correctly, because mypy cannot resolve - # Literal[True/False] to bool df, query_job = self._block.index.to_pandas( - ordered=True, allow_large_results=allow_large_results, dry_run=dry_run + ordered=True, allow_large_results=allow_large_results ) if query_job: self._query_job = query_job diff --git a/bigframes/dataframe.py b/bigframes/dataframe.py index 9ab58fc523..1359a02a17 100644 --- a/bigframes/dataframe.py +++ b/bigframes/dataframe.py @@ -1669,26 +1669,20 @@ def to_pandas( # TODO(orrbradford): Optimize this in future. Potentially some cases where we can return the stored query job if dry_run: - series, query_job = self._block.to_pandas( + dry_run_stats, dry_run_job = self._block._compute_dry_run( max_download_size=max_download_size, sampling_method=sampling_method, random_state=random_state, ordered=ordered, - dry_run=dry_run, - allow_large_results=allow_large_results, ) - if query_job: - self._set_internal_query_job(query_job) - return series + self._set_internal_query_job(dry_run_job) + return dry_run_stats - # Repeat the to_pandas() call to make mypy deduce type correctly, because mypy cannot resolve - # Literal[True/False] to bool df, query_job = self._block.to_pandas( max_download_size=max_download_size, sampling_method=sampling_method, random_state=random_state, ordered=ordered, - dry_run=dry_run, allow_large_results=allow_large_results, ) if query_job: diff --git a/bigframes/series.py b/bigframes/series.py index 72917d6fec..fb497fd817 100644 --- a/bigframes/series.py +++ b/bigframes/series.py @@ -419,18 +419,15 @@ def to_pandas( """ if dry_run: - series, query_job = self._block.to_pandas( + dry_run_stats, dry_run_job = self._block._compute_dry_run( max_download_size=max_download_size, sampling_method=sampling_method, random_state=random_state, ordered=ordered, - dry_run=dry_run, - allow_large_results=allow_large_results, ) - if query_job: - self._set_internal_query_job(query_job) - return series + self._set_internal_query_job(dry_run_job) + return dry_run_stats # Repeat the to_pandas() call to make mypy deduce type correctly, because mypy cannot resolve # Literal[True/False] to bool @@ -439,7 +436,6 @@ def to_pandas( sampling_method=sampling_method, random_state=random_state, ordered=ordered, - dry_run=dry_run, allow_large_results=allow_large_results, ) diff --git a/tests/unit/core/test_blocks.py b/tests/unit/core/test_blocks.py index 14a13832f5..fb5a927e76 100644 --- a/tests/unit/core/test_blocks.py +++ b/tests/unit/core/test_blocks.py @@ -94,9 +94,9 @@ def test_block_from_local(data): assert block.shape == expected.shape -def test_block_to_pandas_dry_run__raises_error_when_sampling_is_enabled(): +def test_block_compute_dry_run__raises_error_when_sampling_is_enabled(): mock_session = mock.create_autospec(spec=bigframes.Session) block = blocks.Block.from_local(pandas.DataFrame(), mock_session) with pytest.raises(NotImplementedError): - block.to_pandas(sampling_method="UNIFORM", dry_run=True) + block._compute_dry_run(sampling_method="UNIFORM") From 140107621f31a8682f41f8a196e1d9fecb1361ff Mon Sep 17 00:00:00 2001 From: Shenyang Cai Date: Thu, 13 Mar 2025 20:54:05 +0000 Subject: [PATCH 11/11] type hint update --- bigframes/core/blocks.py | 2 +- bigframes/core/indexes/base.py | 7 ++++--- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/bigframes/core/blocks.py b/bigframes/core/blocks.py index b2a10fbfaa..aad36b9c01 100644 --- a/bigframes/core/blocks.py +++ b/bigframes/core/blocks.py @@ -2787,7 +2787,7 @@ def to_pandas( def _compute_dry_run( self, *, ordered: bool = True - ) -> Tuple[pd.Series, Optional[bigquery.QueryJob]]: + ) -> Tuple[pd.Series, bigquery.QueryJob]: return self._block.select_columns([])._compute_dry_run(ordered=ordered) def resolve_level(self, level: LevelsType) -> typing.Sequence[str]: diff --git a/bigframes/core/indexes/base.py b/bigframes/core/indexes/base.py index be4efd6a76..900825996e 100644 --- a/bigframes/core/indexes/base.py +++ b/bigframes/core/indexes/base.py @@ -526,9 +526,10 @@ def to_pandas( returns a Series containing dry run statistics. """ if dry_run: - dry_run_stats, query_job = self._block.index._compute_dry_run(ordered=True) - if query_job: - self._query_job = query_job + dry_run_stats, dry_run_job = self._block.index._compute_dry_run( + ordered=True + ) + self._query_job = dry_run_job return dry_run_stats df, query_job = self._block.index.to_pandas(