feat: add allow_large_results option #1428

Merged: 20 commits, Feb 28, 2025

22 changes: 22 additions & 0 deletions bigframes/_config/bigquery_options.py
@@ -87,6 +87,7 @@ def __init__(
kms_key_name: Optional[str] = None,
skip_bq_connection_check: bool = False,
*,
allow_large_results: bool = True,
ordering_mode: Literal["strict", "partial"] = "strict",
client_endpoints_override: Optional[dict] = None,
):
@@ -98,6 +99,7 @@ def __init__(
self._application_name = application_name
self._kms_key_name = kms_key_name
self._skip_bq_connection_check = skip_bq_connection_check
self._allow_large_results = allow_large_results
self._session_started = False
# Determines the ordering strictness for the session.
self._ordering_mode = _validate_ordering_mode(ordering_mode)
@@ -232,6 +234,26 @@ def skip_bq_connection_check(self, value: bool):
)
self._skip_bq_connection_check = value

@property
def allow_large_results(self) -> bool:
"""
Sets the flag to allow or disallow query results larger than 10 GB.

The default setting for this flag is True, which allows queries to return results
exceeding 10 GB by creating an explicit destination table. If set to False, it
restricts the result size to 10 GB, and BigQuery will raise an error if this limit
is exceeded.

Returns:
bool: True if large results are allowed with an explicit destination table,
False if results are limited to 10 GB and errors are raised when exceeded.
"""
return self._allow_large_results

@allow_large_results.setter
def allow_large_results(self, value: bool):
self._allow_large_results = value

@property
def use_regional_endpoints(self) -> bool:
"""Flag to connect to regional API endpoints.
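
For context, a minimal usage sketch of the new option. It assumes the `BigQueryOptions` instance is reached through the usual `bpd.options.bigquery` namespace; only the `allow_large_results` attribute itself comes from the diff above.

```python
import bigframes.pandas as bpd

# Default is True: results larger than 10 GB are written to an explicit
# destination table. Setting it to False keeps the 10 GB cap and lets
# BigQuery raise an error when a result exceeds it.
bpd.options.bigquery.allow_large_results = False
```

Note that, unlike most other `BigQueryOptions` setters, the one added here does not check `_session_started`, so the flag can be toggled after a session is already running.
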
37 changes: 29 additions & 8 deletions bigframes/core/blocks.py
@@ -112,6 +112,7 @@ class MaterializationOptions:
downsampling: sampling_options.SamplingOptions = dataclasses.field(
default_factory=sampling_options.SamplingOptions
)
allow_large_results: Optional[bool] = None
ordered: bool = True


@@ -479,9 +480,12 @@ def to_arrow(
self,
*,
ordered: bool = True,
allow_large_results: Optional[bool] = None,
) -> Tuple[pa.Table, bigquery.QueryJob]:
"""Run query and download results as a pyarrow Table."""
execute_result = self.session._executor.execute(self.expr, ordered=ordered)
execute_result = self.session._executor.execute(
self.expr, ordered=ordered, use_explicit_destination=allow_large_results
)
pa_table = execute_result.to_arrow_table()

pa_index_labels = []
@@ -503,6 +507,7 @@ def to_pandas(
random_state: Optional[int] = None,
*,
ordered: bool = True,
allow_large_results: Optional[bool] = None,
) -> Tuple[pd.DataFrame, Optional[bigquery.QueryJob]]:
"""Run query and download results as a pandas DataFrame.

@@ -545,7 +550,9 @@

df, query_job = self._materialize_local(
materialize_options=MaterializationOptions(
downsampling=sampling, ordered=ordered
downsampling=sampling,
allow_large_results=allow_large_results,
ordered=ordered,
)
)
df.set_axis(self.column_labels, axis=1, copy=False)
@@ -563,7 +570,10 @@ def try_peek(
return None

def to_pandas_batches(
self, page_size: Optional[int] = None, max_results: Optional[int] = None
self,
page_size: Optional[int] = None,
max_results: Optional[int] = None,
allow_large_results: Optional[bool] = None,
):
"""Download results one message at a time.

Expand All @@ -572,7 +582,7 @@ def to_pandas_batches(
execute_result = self.session._executor.execute(
self.expr,
ordered=True,
use_explicit_destination=True,
use_explicit_destination=allow_large_results,
page_size=page_size,
max_results=max_results,
)
@@ -601,7 +611,10 @@ def _materialize_local(
"""Run query and download results as a pandas DataFrame. Return the total number of results as well."""
# TODO(swast): Allow for dry run and timeout.
execute_result = self.session._executor.execute(
self.expr, ordered=materialize_options.ordered, get_size_bytes=True
self.expr,
ordered=materialize_options.ordered,
use_explicit_destination=materialize_options.allow_large_results,
get_size_bytes=True,
)
assert execute_result.total_bytes is not None
table_mb = execute_result.total_bytes / _BYTES_TO_MEGABYTES
@@ -1698,7 +1711,7 @@ def transpose(
original_row_index = (
original_row_index
if original_row_index is not None
else self.index.to_pandas(ordered=True)
else self.index.to_pandas(ordered=True)[0]
)
original_row_count = len(original_row_index)
if original_row_count > bigframes.constants.MAX_COLUMNS:
@@ -2657,14 +2670,22 @@ def column_ids(self) -> Sequence[str]:
def is_null(self) -> bool:
return len(self._block._index_columns) == 0

def to_pandas(self, *, ordered: Optional[bool] = None) -> pd.Index:
def to_pandas(
self,
*,
ordered: Optional[bool] = None,
allow_large_results: Optional[bool] = None,
) -> Tuple[pd.Index, Optional[bigquery.QueryJob]]:
"""Executes deferred operations and downloads the results."""
if len(self.column_ids) == 0:
raise bigframes.exceptions.NullIndexError(
"Cannot materialize index, as this object does not have an index. Set index column(s) using set_index."
)
ordered = ordered if ordered is not None else True
return self._block.select_columns([]).to_pandas(ordered=ordered)[0].index
df, query_job = self._block.select_columns([]).to_pandas(
ordered=ordered, allow_large_results=allow_large_results
)
return df.index, query_job

def resolve_level(self, level: LevelsType) -> typing.Sequence[str]:
if utils.is_list_like(level):
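
A hedged sketch of how the per-call override added to `Block.to_pandas_batches` is expected to surface for users. It assumes the public `DataFrame.to_pandas_batches` forwards `allow_large_results` down to this block-level method (only the block-level plumbing is shown in this file), and the public dataset name is purely illustrative.

```python
import bigframes.pandas as bpd

df = bpd.read_gbq("bigquery-public-data.usa_names.usa_1910_2013")

# Cap this one download at 10 GB (no explicit destination table),
# regardless of the global allow_large_results option.
for batch in df.to_pandas_batches(page_size=10_000, allow_large_results=False):
    print(len(batch))  # each batch arrives as a pandas DataFrame
```
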
19 changes: 15 additions & 4 deletions bigframes/core/indexes/base.py
@@ -490,17 +490,28 @@ def __getitem__(self, key: int) -> typing.Any:
else:
raise NotImplementedError(f"Index key not supported {key}")

def to_pandas(self) -> pandas.Index:
def to_pandas(self, *, allow_large_results: Optional[bool] = None) -> pandas.Index:
"""Gets the Index as a pandas Index.

Args:
allow_large_results (bool, default None):
If not None, overrides the global setting to allow or disallow large query results
over the default size limit of 10 GB.

Returns:
pandas.Index:
A pandas Index with all of the labels from this Index.
"""
return self._block.index.to_pandas(ordered=True)
df, query_job = self._block.index.to_pandas(
ordered=True, allow_large_results=allow_large_results
)
self._query_job = query_job
return df

def to_numpy(self, dtype=None, **kwargs) -> np.ndarray:
return self.to_pandas().to_numpy(dtype, **kwargs)
def to_numpy(self, dtype=None, *, allow_large_results=None, **kwargs) -> np.ndarray:
return self.to_pandas(allow_large_results=allow_large_results).to_numpy(
dtype, **kwargs
)

__array__ = to_numpy

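
The corresponding per-call override on `Index`, using the signatures shown in this file; the public dataset reference is only an example.

```python
import bigframes.pandas as bpd

df = bpd.read_gbq("bigquery-public-data.usa_names.usa_1910_2013")

# Disallow large results for this index download: no destination table is
# created, and BigQuery raises an error if the result would exceed 10 GB.
idx = df.index.to_pandas(allow_large_results=False)

# to_numpy forwards the flag to to_pandas, as shown in the diff above.
arr = df.index.to_numpy(allow_large_results=False)
```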