feat: Support dry_run in to_pandas() #1436

Merged: 32 commits merged into main from sycai_to_pandas_dry_run on Mar 19, 2025

Commits:
dac34d7  feat: Support dry_run in to_pandas() (sycai, Feb 27, 2025)
b88ba73  Merge branch 'main' into sycai_to_pandas_dry_run (sycai, Feb 27, 2025)
330a647  Merge branch 'main' into sycai_to_pandas_dry_run (sycai, Feb 28, 2025)
5f8a76a  centralize dry_run logics at block level (sycai, Feb 28, 2025)
75f4ce1  fix lint errors (sycai, Feb 28, 2025)
0b4c48c  Merge branch 'main' into sycai_to_pandas_dry_run (sycai, Feb 28, 2025)
40c557b  Merge branch 'main' into sycai_to_pandas_dry_run (sycai, Feb 28, 2025)
fe82c6d  remove unnecessary code (sycai, Feb 28, 2025)
1adc96a  Merge branch 'main' into sycai_to_pandas_dry_run (sycai, Feb 28, 2025)
3c0efc2  Merge branch 'main' into sycai_to_pandas_dry_run (sycai, Mar 3, 2025)
9c3d849  Merge branch 'main' into sycai_to_pandas_dry_run (sycai, Mar 3, 2025)
725050b  Merge branch 'main' into sycai_to_pandas_dry_run (sycai, Mar 3, 2025)
7550f6a  Merge branch 'main' into sycai_to_pandas_dry_run (sycai, Mar 4, 2025)
3b9ea0e  Merge branch 'main' into sycai_to_pandas_dry_run (sycai, Mar 5, 2025)
70e1986  Merge branch 'main' into sycai_to_pandas_dry_run (sycai, Mar 5, 2025)
c2c3fca  Merge branch 'main' into sycai_to_pandas_dry_run (sycai, Mar 6, 2025)
cde29a0  use dataframe for dry_run stats (sycai, Mar 6, 2025)
e291c70  Merge branch 'main' into sycai_to_pandas_dry_run (sycai, Mar 6, 2025)
86bf46b  flatten the job stats to a series (sycai, Mar 6, 2025)
09fb874  Merge branch 'main' into sycai_to_pandas_dry_run (sycai, Mar 6, 2025)
4af0ac4  fix lint (sycai, Mar 6, 2025)
416ad49  🦉 Updates from OwlBot post-processor (gcf-owl-bot[bot], Mar 6, 2025)
301e993  fix query job issue (sycai, Mar 7, 2025)
67e40e9  Merge branch 'main' into sycai_to_pandas_dry_run (sycai, Mar 10, 2025)
6eeb69e  Merge branch 'main' into sycai_to_pandas_dry_run (sycai, Mar 10, 2025)
5a85ad5  Merge branch 'main' into sycai_to_pandas_dry_run (sycai, Mar 11, 2025)
e11ccdb  Merge branch 'main' into sycai_to_pandas_dry_run (sycai, Mar 12, 2025)
c610e57  Merge branch 'main' into sycai_to_pandas_dry_run (sycai, Mar 13, 2025)
b4db897  Make pandas surface directly call block._compute_dry_run (sycai, Mar 13, 2025)
1401076  type hint update (sycai, Mar 13, 2025)
fb9f8bf  Merge branch 'main' into sycai_to_pandas_dry_run (sycai, Mar 18, 2025)
30b9f3d  Merge branch 'main' into sycai_to_pandas_dry_run (sycai, Mar 19, 2025)
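
For orientation before the diffs: with dry_run=True, to_pandas() skips query execution and returns a pandas Series of dry-run statistics instead of data. A minimal usage sketch (illustrative only: the table name and session setup are assumptions, and the DataFrame surface is inferred from the PR title and the "Make pandas surface directly call block._compute_dry_run" commit rather than shown in this excerpt):

import bigframes.pandas as bpd

# Assumption: a configured BigQuery session; the table name is illustrative.
df = bpd.read_gbq("bigquery-public-data.usa_names.usa_1910_2013")

# Dry run: no data is processed; job statistics come back as a pandas Series.
stats = df.to_pandas(dry_run=True)
print(stats["totalBytesProcessed"])  # bytes the real query would scan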
133 changes: 125 additions & 8 deletions bigframes/core/blocks.py
@@ -22,6 +22,7 @@
from __future__ import annotations

import ast
import copy
import dataclasses
import datetime
import functools
@@ -30,11 +31,13 @@
import textwrap
import typing
from typing import (
Any,
Iterable,
List,
Literal,
Mapping,
Optional,
overload,
Sequence,
Tuple,
TYPE_CHECKING,
@@ -501,15 +504,42 @@ def to_arrow(
pa_table = pa_table.rename_columns(list(self.column_labels) + pa_index_labels)
return pa_table, execute_result.query_job

@overload
def to_pandas(
self,
max_download_size: Optional[int] = ...,
sampling_method: Optional[str] = ...,
random_state: Optional[int] = ...,
*,
ordered: bool = ...,
dry_run: Literal[False] = ...,
allow_large_results: Optional[bool] = ...,
) -> Tuple[pd.DataFrame, Optional[bigquery.QueryJob]]:
...

@overload
def to_pandas(
self,
max_download_size: Optional[int] = ...,
sampling_method: Optional[str] = ...,
random_state: Optional[int] = ...,
*,
ordered: bool = ...,
dry_run: Literal[True] = ...,
allow_large_results: Optional[bool] = ...,
) -> Tuple[pd.Series, Optional[bigquery.QueryJob]]:
...

def to_pandas(
self,
max_download_size: Optional[int] = None,
sampling_method: Optional[str] = None,
random_state: Optional[int] = None,
*,
ordered: bool = True,
dry_run: bool = False,
allow_large_results: Optional[bool] = None,
) -> Tuple[pd.DataFrame, Optional[bigquery.QueryJob]]:
) -> Tuple[pd.DataFrame | pd.Series, Optional[bigquery.QueryJob]]:
"""Run query and download results as a pandas DataFrame.

Args:
@@ -531,9 +561,12 @@ def to_pandas(
ordered (bool, default True):
Determines whether the resulting pandas dataframe will be ordered.
Whether the row ordering is deterministic depends on whether session ordering is strict.
dry_run (bool, default False):
Whether to perform a dry run. If True, the method will return a pandas Series containing
dry run statistics.

Returns:
pandas.DataFrame, QueryJob
pandas.DataFrame | pandas.Series, QueryJob
"""
if (sampling_method is not None) and (sampling_method not in _SAMPLING_METHODS):
raise NotImplementedError(
@@ -549,6 +582,11 @@ def to_pandas(
else:
sampling = sampling.with_disabled()

if dry_run:
if sampling.enable_downsampling:
raise NotImplementedError("Dry run with sampling is not supproted")
Collaborator suggested change:
-    raise NotImplementedError("Dry run with sampling is not supproted")
+    raise NotImplementedError("Dry run with sampling is not supported")

return self._compute_dry_run(ordered=ordered)

df, query_job = self._materialize_local(
materialize_options=MaterializationOptions(
downsampling=sampling,
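
A note on the typing pattern in this hunk: the paired @overload declarations key the return type off Literal[False] / Literal[True] values of dry_run, so type checkers can narrow the DataFrame vs. Series result from the literal a caller passes. A self-contained sketch of the same pattern (names are illustrative, not from this PR):

from typing import List, Literal, Union, overload

@overload
def fetch(*, dry_run: Literal[False] = ...) -> List[int]:
    ...

@overload
def fetch(*, dry_run: Literal[True]) -> str:
    ...

def fetch(*, dry_run: bool = False) -> Union[List[int], str]:
    # With dry_run=True, describe the work instead of doing it.
    if dry_run:
        return "would fetch 3 rows"
    return [1, 2, 3]

rows = fetch()              # a type checker infers List[int]
plan = fetch(dry_run=True)  # a type checker infers str

Callers that pass a literal get a precise type; only code passing a plain bool sees the Union, which is why the implementation signature above returns Tuple[pd.DataFrame | pd.Series, Optional[bigquery.QueryJob]].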
@@ -798,11 +836,61 @@ def split(
return [sliced_block.drop_columns(drop_cols) for sliced_block in sliced_blocks]

def _compute_dry_run(
self, value_keys: Optional[Iterable[str]] = None
) -> bigquery.QueryJob:
self, value_keys: Optional[Iterable[str]] = None, ordered: bool = True
) -> typing.Tuple[pd.Series, bigquery.QueryJob]:
index: List[Any] = []
values: List[Any] = []

index.append("columnCount")
values.append(len(self.value_columns))
index.append("columnDtypes")
values.append(
{
col: self.expr.get_column_type(self.resolve_label_exact_or_error(col))
for col in self.column_labels
}
)

index.append("indexLevel")
values.append(self.index.nlevels)
index.append("indexDtypes")
values.append(self.index.dtypes)

expr = self._apply_value_keys_to_expr(value_keys=value_keys)
query_job = self.session._executor.dry_run(expr)
return query_job
query_job = self.session._executor.dry_run(expr, ordered)
job_api_repr = copy.deepcopy(query_job._properties)

job_ref = job_api_repr["jobReference"]
for key, val in job_ref.items():
index.append(key)
values.append(val)

index.append("jobType")
values.append(job_api_repr["configuration"]["jobType"])

query_config = job_api_repr["configuration"]["query"]
for key in ("destinationTable", "useLegacySql"):
index.append(key)
values.append(query_config.get(key))

query_stats = job_api_repr["statistics"]["query"]
for key in (
"referencedTables",
"totalBytesProcessed",
"cacheHit",
"statementType",
):
index.append(key)
values.append(query_stats.get(key))

index.append("creationTime")
values.append(
pd.Timestamp(
job_api_repr["statistics"]["creationTime"], unit="ms", tz="UTC"
)
)

return pd.Series(values, index=index), query_job
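
The loop-and-append pattern above flattens the nested job-properties dict into a single labeled Series. A self-contained sketch of the same flattening with a hypothetical stand-in for query_job._properties (all field values made up):

from typing import Any, List

import pandas as pd

# Hypothetical dry-run job properties; the real dict comes from query_job._properties.
job_api_repr = {
    "jobReference": {"projectId": "my-project", "location": "US"},
    "configuration": {"jobType": "QUERY", "query": {"useLegacySql": False}},
    "statistics": {
        "creationTime": 1741219200000,
        "query": {
            "totalBytesProcessed": "123456",
            "cacheHit": False,
            "statementType": "SELECT",
        },
    },
}

index: List[Any] = []
values: List[Any] = []
for key, val in job_api_repr["jobReference"].items():
    index.append(key)
    values.append(val)
index.append("jobType")
values.append(job_api_repr["configuration"]["jobType"])
query_stats = job_api_repr["statistics"]["query"]
for key in ("totalBytesProcessed", "cacheHit", "statementType"):
    index.append(key)
    values.append(query_stats.get(key))
index.append("creationTime")
values.append(pd.Timestamp(job_api_repr["statistics"]["creationTime"], unit="ms", tz="UTC"))

print(pd.Series(values, index=index))  # one labeled row per statistic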

def _apply_value_keys_to_expr(self, value_keys: Optional[Iterable[str]] = None):
expr = self._expr
@@ -2691,20 +2779,49 @@ def column_ids(self) -> Sequence[str]:
def is_null(self) -> bool:
return len(self._block._index_columns) == 0

@overload
def to_pandas(
self,
*,
ordered: Optional[bool] = ...,
dry_run: Literal[False] = ...,
allow_large_results: Optional[bool] = ...,
) -> Tuple[pd.Index, Optional[bigquery.QueryJob]]:
...

@overload
def to_pandas(
self,
*,
ordered: Optional[bool] = ...,
dry_run: Literal[True] = ...,
allow_large_results: Optional[bool] = ...,
) -> Tuple[pd.Series, Optional[bigquery.QueryJob]]:
...

def to_pandas(
self,
*,
ordered: Optional[bool] = None,
dry_run: bool = False,
allow_large_results: Optional[bool] = None,
) -> Tuple[pd.Index, Optional[bigquery.QueryJob]]:
) -> Tuple[pd.Index | pd.Series, Optional[bigquery.QueryJob]]:
"""Executes deferred operations and downloads the results."""
if len(self.column_ids) == 0:
raise bigframes.exceptions.NullIndexError(
"Cannot materialize index, as this object does not have an index. Set index column(s) using set_index."
)
ordered = ordered if ordered is not None else True
if dry_run:
series, query_job = self._block.select_columns([]).to_pandas(
ordered=ordered,
allow_large_results=allow_large_results,
Collaborator: I think allow_large_results shouldn't have an effect on dry run queries, as that controls the destination table property.

dry_run=dry_run,
)
return series, query_job

df, query_job = self._block.select_columns([]).to_pandas(
Collaborator: The select_columns([]) confuses me, but I see that was here before. Please refactor these a bit so that self._block.select_columns([]) is saved to a variable, since it is in common with both.

Collaborator: Alternatively, we can get rid of this if statement and rename the variable df_or_series.

ordered=ordered, allow_large_results=allow_large_results
ordered=ordered, allow_large_results=allow_large_results, dry_run=dry_run
Collaborator: Why include the dry_run argument here if we know it's false?

)
return df.index, query_job
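
Putting the two review suggestions together, the branchy code above could collapse to something like this sketch (method body only, not runnable standalone; the df_or_series name comes from the review, the rest is an assumption):

# Sketch of the suggested refactor:
index_block = self._block.select_columns([])  # shared by both paths
df_or_series, query_job = index_block.to_pandas(
    ordered=ordered,
    allow_large_results=allow_large_results,
    dry_run=dry_run,
)
if dry_run:
    return df_or_series, query_job  # the dry-run stats Series
return df_or_series.index, query_job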

49 changes: 41 additions & 8 deletions bigframes/core/indexes/base.py
@@ -17,7 +17,7 @@
from __future__ import annotations

import typing
from typing import Hashable, Literal, Optional, Sequence, Union
from typing import Hashable, Literal, Optional, overload, Sequence, Union

import bigframes_vendored.constants as constants
import bigframes_vendored.pandas.core.indexes.base as vendored_pandas_index
@@ -228,15 +228,16 @@ def T(self) -> Index:
return self.transpose()

@property
def query_job(self) -> Optional[bigquery.QueryJob]:
def query_job(self) -> bigquery.QueryJob:
"""BigQuery job metadata for the most recent query.

Returns:
The most recent `QueryJob
<https://cloud.google.com/python/docs/reference/bigquery/latest/google.cloud.bigquery.job.QueryJob>`_.
"""
if self._query_job is None:
self._query_job = self._block._compute_dry_run()
_, query_job = self._block._compute_dry_run()
self._query_job = query_job
return self._query_job

def __repr__(self) -> str:
@@ -252,7 +253,8 @@ def __repr__(self) -> str:
opts = bigframes.options.display
max_results = opts.max_rows
if opts.repr_mode == "deferred":
return formatter.repr_query_job(self._block._compute_dry_run())
_, dry_run_query_job = self._block._compute_dry_run()
return formatter.repr_query_job(dry_run_query_job)

pandas_df, _, query_job = self._block.retrieve_repr_request_results(max_results)
self._query_job = query_job
@@ -490,20 +492,51 @@ def __getitem__(self, key: int) -> typing.Any:
else:
raise NotImplementedError(f"Index key not supported {key}")

def to_pandas(self, *, allow_large_results: Optional[bool] = None) -> pandas.Index:
@overload
def to_pandas(
self,
*,
allow_large_results: Optional[bool] = ...,
dry_run: Literal[False] = ...,
) -> pandas.Index:
...

@overload
def to_pandas(
self, *, allow_large_results: Optional[bool] = ..., dry_run: Literal[True] = ...
) -> pandas.Series:
...

def to_pandas(
self, *, allow_large_results: Optional[bool] = None, dry_run: bool = False
) -> pandas.Index | pandas.Series:
"""Gets the Index as a pandas Index.

Args:
allow_large_results (bool, default None):
If not None, overrides the global setting to allow or disallow large query results
over the default size limit of 10 GB.
dry_run (bool, default False):
If True, this method will not process the data. Instead, it returns a pandas Series
containing the dtypes and the number of bytes to be processed.

Returns:
pandas.Index:
A pandas Index with all of the labels from this Index.
pandas.Index | pandas.Series:
A pandas Index with all of the labels from this Index. If dry run is set to True,
returns a Series containing dry run statistics.
"""
if dry_run:
series, query_job = self._block.index.to_pandas(
ordered=True, allow_large_results=allow_large_results, dry_run=dry_run
)
if query_job:
self._query_job = query_job
return series

# Repeat the to_pandas() call to make mypy deduce type correctly, because mypy cannot resolve
# Literal[True/False] to bool

Collaborator: Why don't you just use bool consistently, then?
df, query_job = self._block.index.to_pandas(
ordered=True, allow_large_results=allow_large_results
ordered=True, allow_large_results=allow_large_results, dry_run=dry_run
)
if query_job:
self._query_job = query_job
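
End to end, the new Index surface behaves as sketched here (illustrative; assumes a configured BigQuery session, and the table name is an assumption):

import bigframes.pandas as bpd

df = bpd.read_gbq("bigquery-public-data.usa_names.usa_1910_2013")  # assumed table

idx = df.index.to_pandas()                # materializes the index as before
stats = df.index.to_pandas(dry_run=True)  # pandas Series of dry-run statistics
print(stats["totalBytesProcessed"], stats["cacheHit"])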