Commit dd2f488
feat: add allow_large_results option to many I/O methods. Set to False to reduce latency (#1428)
* feat: add allow_large_results option
* add to_arrow
* add the methods that only use to_pandas()
* add to_csv/json/parquet
* mypy fix
* GCS logic update and execute logic update
* add to_pandas_batches and to_pandas large-result tests
* add unit tests
* add to_pandas and to_arrow override tests
* add copyright
* modify Index.to_pandas to match the behavior of Series and DataFrame; add tests
* update warning message and its test
* update parameters
* test fix
1 parent 3a633d5 commit dd2f488

File tree

19 files changed: +663 −79 lines

bigframes/_config/bigquery_options.py

Lines changed: 22 additions & 0 deletions
@@ -87,6 +87,7 @@ def __init__(
         kms_key_name: Optional[str] = None,
         skip_bq_connection_check: bool = False,
         *,
+        allow_large_results: bool = True,
         ordering_mode: Literal["strict", "partial"] = "strict",
         client_endpoints_override: Optional[dict] = None,
     ):
@@ -98,6 +99,7 @@ def __init__(
         self._application_name = application_name
         self._kms_key_name = kms_key_name
         self._skip_bq_connection_check = skip_bq_connection_check
+        self._allow_large_results = allow_large_results
         self._session_started = False
         # Determines the ordering strictness for the session.
         self._ordering_mode = _validate_ordering_mode(ordering_mode)
@@ -232,6 +234,26 @@ def skip_bq_connection_check(self, value: bool):
         )
         self._skip_bq_connection_check = value

+    @property
+    def allow_large_results(self) -> bool:
+        """
+        Sets the flag to allow or disallow query results larger than 10 GB.
+
+        The default setting for this flag is True, which allows queries to return results
+        exceeding 10 GB by creating an explicit destination table. If set to False, it
+        restricts the result size to 10 GB, and BigQuery will raise an error if this limit
+        is exceeded.
+
+        Returns:
+            bool: True if large results are allowed with an explicit destination table,
+            False if results are limited to 10 GB and errors are raised when exceeded.
+        """
+        return self._allow_large_results
+
+    @allow_large_results.setter
+    def allow_large_results(self, value: bool):
+        self._allow_large_results = value
+
     @property
     def use_regional_endpoints(self) -> bool:
         """Flag to connect to regional API endpoints.

bigframes/core/blocks.py

Lines changed: 29 additions & 8 deletions
@@ -112,6 +112,7 @@ class MaterializationOptions:
     downsampling: sampling_options.SamplingOptions = dataclasses.field(
         default_factory=sampling_options.SamplingOptions
     )
+    allow_large_results: Optional[bool] = None
     ordered: bool = True


@@ -479,9 +480,12 @@ def to_arrow(
         self,
         *,
         ordered: bool = True,
+        allow_large_results: Optional[bool] = None,
     ) -> Tuple[pa.Table, bigquery.QueryJob]:
         """Run query and download results as a pyarrow Table."""
-        execute_result = self.session._executor.execute(self.expr, ordered=ordered)
+        execute_result = self.session._executor.execute(
+            self.expr, ordered=ordered, use_explicit_destination=allow_large_results
+        )
         pa_table = execute_result.to_arrow_table()

         pa_index_labels = []
@@ -503,6 +507,7 @@ def to_pandas(
         random_state: Optional[int] = None,
         *,
         ordered: bool = True,
+        allow_large_results: Optional[bool] = None,
     ) -> Tuple[pd.DataFrame, Optional[bigquery.QueryJob]]:
         """Run query and download results as a pandas DataFrame.

@@ -545,7 +550,9 @@ def to_pandas(

         df, query_job = self._materialize_local(
             materialize_options=MaterializationOptions(
-                downsampling=sampling, ordered=ordered
+                downsampling=sampling,
+                allow_large_results=allow_large_results,
+                ordered=ordered,
             )
         )
         df.set_axis(self.column_labels, axis=1, copy=False)
@@ -563,7 +570,10 @@ def try_peek(
         return None

     def to_pandas_batches(
-        self, page_size: Optional[int] = None, max_results: Optional[int] = None
+        self,
+        page_size: Optional[int] = None,
+        max_results: Optional[int] = None,
+        allow_large_results: Optional[bool] = None,
     ):
         """Download results one message at a time.

@@ -572,7 +582,7 @@ def to_pandas_batches(
         execute_result = self.session._executor.execute(
             self.expr,
             ordered=True,
-            use_explicit_destination=True,
+            use_explicit_destination=allow_large_results,
             page_size=page_size,
             max_results=max_results,
         )
@@ -601,7 +611,10 @@ def _materialize_local(
         """Run query and download results as a pandas DataFrame. Return the total number of results as well."""
         # TODO(swast): Allow for dry run and timeout.
         execute_result = self.session._executor.execute(
-            self.expr, ordered=materialize_options.ordered, get_size_bytes=True
+            self.expr,
+            ordered=materialize_options.ordered,
+            use_explicit_destination=materialize_options.allow_large_results,
+            get_size_bytes=True,
         )
         assert execute_result.total_bytes is not None
         table_mb = execute_result.total_bytes / _BYTES_TO_MEGABYTES
@@ -1698,7 +1711,7 @@ def transpose(
         original_row_index = (
             original_row_index
             if original_row_index is not None
-            else self.index.to_pandas(ordered=True)
+            else self.index.to_pandas(ordered=True)[0]
         )
         original_row_count = len(original_row_index)
         if original_row_count > bigframes.constants.MAX_COLUMNS:
@@ -2657,14 +2670,22 @@ def column_ids(self) -> Sequence[str]:
     def is_null(self) -> bool:
         return len(self._block._index_columns) == 0

-    def to_pandas(self, *, ordered: Optional[bool] = None) -> pd.Index:
+    def to_pandas(
+        self,
+        *,
+        ordered: Optional[bool] = None,
+        allow_large_results: Optional[bool] = None,
+    ) -> Tuple[pd.Index, Optional[bigquery.QueryJob]]:
         """Executes deferred operations and downloads the results."""
         if len(self.column_ids) == 0:
             raise bigframes.exceptions.NullIndexError(
                 "Cannot materialize index, as this object does not have an index. Set index column(s) using set_index."
             )
         ordered = ordered if ordered is not None else True
-        return self._block.select_columns([]).to_pandas(ordered=ordered)[0].index
+        df, query_job = self._block.select_columns([]).to_pandas(
+            ordered=ordered, allow_large_results=allow_large_results
+        )
+        return df.index, query_job

     def resolve_level(self, level: LevelsType) -> typing.Sequence[str]:
         if utils.is_list_like(level):
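
Each of the block-level methods above now forwards allow_large_results to the executor as use_explicit_destination, which therefore becomes three-valued: True, False, or None ("use the session default"). The executor change itself lands in one of the commit's other 16 files, so the resolution below is a plausible sketch under that assumption, not the commit's actual code; the helper name is invented for illustration:

from typing import Optional

import bigframes

def resolve_use_explicit_destination(use_explicit_destination: Optional[bool]) -> bool:
    """Resolve a per-call override against the session-wide default."""
    # An explicit True/False passed through to_pandas()/to_arrow()/etc. wins;
    # None defers to bigframes.options.bigquery.allow_large_results (default True).
    if use_explicit_destination is not None:
        return use_explicit_destination
    return bigframes.options.bigquery.allow_large_results

Note the behavior change in to_pandas_batches: it previously hard-coded use_explicit_destination=True, so batched downloads always wrote to a destination table; now a per-call False (or a session default of False) can skip that step for small results.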

bigframes/core/indexes/base.py

Lines changed: 15 additions & 4 deletions
@@ -490,17 +490,28 @@ def __getitem__(self, key: int) -> typing.Any:
         else:
             raise NotImplementedError(f"Index key not supported {key}")

-    def to_pandas(self) -> pandas.Index:
+    def to_pandas(self, *, allow_large_results: Optional[bool] = None) -> pandas.Index:
         """Gets the Index as a pandas Index.

+        Args:
+            allow_large_results (bool, default None):
+                If not None, overrides the global setting to allow or disallow large query results
+                over the default size limit of 10 GB.
+
         Returns:
             pandas.Index:
                 A pandas Index with all of the labels from this Index.
         """
-        return self._block.index.to_pandas(ordered=True)
+        df, query_job = self._block.index.to_pandas(
+            ordered=True, allow_large_results=allow_large_results
+        )
+        self._query_job = query_job
+        return df

-    def to_numpy(self, dtype=None, **kwargs) -> np.ndarray:
-        return self.to_pandas().to_numpy(dtype, **kwargs)
+    def to_numpy(self, dtype=None, *, allow_large_results=None, **kwargs) -> np.ndarray:
+        return self.to_pandas(allow_large_results=allow_large_results).to_numpy(
+            dtype, **kwargs
+        )

     __array__ = to_numpy

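On the public Index API the override is per-call, and to_pandas now also stores the returned query job on the instance, matching the Series and DataFrame behavior mentioned in the commit message. A short usage sketch (the table name is again illustrative):

import bigframes.pandas as bpd

df = bpd.read_gbq("bigquery-public-data.usa_names.usa_1910_2013")

# Per-call override: force the size-capped direct download for the index,
# regardless of the session-wide allow_large_results setting.
idx = df.index.to_pandas(allow_large_results=False)

# to_numpy() forwards the same keyword before converting to an ndarray.
arr = df.index.to_numpy(allow_large_results=False)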