refactor: Separate dataset id generation from temp table management #1520

Merged
merged 1 commit on Mar 21, 2025
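
Summary of the API change: the storage manager class is renamed from TemporaryGbqStorageManager to AnonymousDatasetManager, create_temp_table becomes allocate_and_create_temp_table, and the old `_random_table(skip_cleanup=...)` flag is split into two explicitly named methods: `allocate_temp_table()` (tracked and deleted on session close) and `generate_unique_resource_id()` (caller-owned, never tracked). A rough before/after sketch; `manager` and `schema` are stand-ins for a session's storage manager and a BigQuery schema, so this is illustrative rather than runnable:

```python
# Before this PR: one method, cleanup behavior toggled by a flag.
table_ref = manager._random_table()                   # tracked; deleted on close
owned_ref = manager._random_table(skip_cleanup=True)  # caller-owned

# After this PR: intent is explicit in the method name.
table_ref = manager.allocate_temp_table()             # tracked; deleted on close
owned_ref = manager.generate_unique_resource_id()     # caller-owned; never tracked
created_ref = manager.allocate_and_create_temp_table( # tracked; table also created
    schema, cluster_cols=[]
)
```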
4 changes: 3 additions & 1 deletion bigframes/blob/_functions.py
@@ -68,7 +68,9 @@ def _output_bq_type(self):
 
     def _create_udf(self):
         """Create Python UDF in BQ. Return name of the UDF."""
-        udf_name = str(self._session._loader._storage_manager._random_table())
+        udf_name = str(
+            self._session._loader._storage_manager.generate_unique_resource_id()
+        )
 
         func_body = inspect.getsource(self._func)
         func_name = self._func.__name__
7 changes: 3 additions & 4 deletions bigframes/dataframe.py
@@ -3760,10 +3760,9 @@ def to_gbq(
         )
         if_exists = "replace"
 
-        temp_table_ref = self._session._temp_storage_manager._random_table(
-            # The client code owns this table reference now, so skip_cleanup=True
-            # to not clean it up when we close the session.
-            skip_cleanup=True,
+        # The client code owns this table reference now
+        temp_table_ref = (
+            self._session._temp_storage_manager.generate_unique_resource_id()
         )
         destination_table = f"{temp_table_ref.project}.{temp_table_ref.dataset_id}.{temp_table_ref.table_id}"
 
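
The user-visible effect of this hunk: when `to_gbq` is called without a destination, the generated table is owned by the caller and is no longer deleted when the session closes. A minimal usage sketch, assuming a configured GCP project and the default bigframes session:

```python
import bigframes.pandas as bpd

df = bpd.DataFrame({"a": [1, 2, 3]})

# With no destination_table argument, to_gbq() generates a unique table id
# (via generate_unique_resource_id) in the session's anonymous dataset.
destination = df.to_gbq()

# The caller owns that table now: closing the session no longer deletes it.
bpd.close_session()
```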
10 changes: 5 additions & 5 deletions bigframes/session/__init__.py
@@ -248,7 +248,7 @@ def __init__(
         self._metrics = bigframes.session.metrics.ExecutionMetrics()
         self._function_session = bff_session.FunctionSession()
         self._temp_storage_manager = (
-            bigframes.session.temp_storage.TemporaryGbqStorageManager(
+            bigframes.session.temp_storage.AnonymousDatasetManager(
                 self._clients_provider.bqclient,
                 location=self._location,
                 session_id=self._session_id,
@@ -908,7 +908,7 @@ def read_csv(
             engine=engine,
             write_engine=write_engine,
         )
-        table = self._temp_storage_manager._random_table()
+        table = self._temp_storage_manager.allocate_temp_table()
 
         if engine is not None and engine == "bigquery":
             if any(param is not None for param in (dtype, names)):
@@ -1054,7 +1054,7 @@ def read_parquet(
             engine=engine,
             write_engine=write_engine,
         )
-        table = self._temp_storage_manager._random_table()
+        table = self._temp_storage_manager.allocate_temp_table()
 
         if engine == "bigquery":
             job_config = bigquery.LoadJobConfig()
@@ -1108,7 +1108,7 @@ def read_json(
             engine=engine,
             write_engine=write_engine,
         )
-        table = self._temp_storage_manager._random_table()
+        table = self._temp_storage_manager.allocate_temp_table()
 
         if engine == "bigquery":
 
@@ -1704,7 +1704,7 @@ def _start_query_ml_ddl(
 
     def _create_object_table(self, path: str, connection: str) -> str:
         """Create a random id Object Table from the input path and connection."""
-        table = str(self._loader._storage_manager._random_table())
+        table = str(self._loader._storage_manager.generate_unique_resource_id())
 
         import textwrap
 
10 changes: 6 additions & 4 deletions bigframes/session/executor.py
@@ -195,7 +195,7 @@ class BigQueryCachingExecutor(Executor):
     def __init__(
         self,
         bqclient: bigquery.Client,
-        storage_manager: bigframes.session.temp_storage.TemporaryGbqStorageManager,
+        storage_manager: bigframes.session.temp_storage.AnonymousDatasetManager,
         bqstoragereadclient: google.cloud.bigquery_storage_v1.BigQueryReadClient,
         *,
         strictly_ordered: bool = True,
@@ -248,7 +248,7 @@ def execute(
         job_config = bigquery.QueryJobConfig()
         # Use explicit destination to avoid 10GB limit of temporary table
         if use_explicit_destination:
-            destination_table = self.storage_manager.create_temp_table(
+            destination_table = self.storage_manager.allocate_and_create_temp_table(
                 array_value.schema.to_bigquery(), cluster_cols=[]
             )
             job_config.destination = destination_table
@@ -392,7 +392,7 @@ def peek(
         job_config = bigquery.QueryJobConfig()
         # Use explicit destination to avoid 10GB limit of temporary table
         if use_explicit_destination:
-            destination_table = self.storage_manager.create_temp_table(
+            destination_table = self.storage_manager.allocate_and_create_temp_table(
                 array_value.schema.to_bigquery(), cluster_cols=[]
             )
             job_config.destination = destination_table
@@ -645,7 +645,9 @@ def _sql_as_cached_temp_table(
         cluster_cols: Sequence[str],
     ) -> bigquery.TableReference:
         assert len(cluster_cols) <= _MAX_CLUSTER_COLUMNS
-        temp_table = self.storage_manager.create_temp_table(schema, cluster_cols)
+        temp_table = self.storage_manager.allocate_and_create_temp_table(
+            schema, cluster_cols
+        )
 
         # TODO: Get default job config settings
         job_config = cast(
10 changes: 6 additions & 4 deletions bigframes/session/loader.py
@@ -115,7 +115,7 @@ def __init__(
         self,
         session: bigframes.session.Session,
         bqclient: bigquery.Client,
-        storage_manager: bigframes.session.temp_storage.TemporaryGbqStorageManager,
+        storage_manager: bigframes.session.temp_storage.AnonymousDatasetManager,
         default_index_type: bigframes.enums.DefaultIndexKind,
         scan_index_uniqueness: bool,
         force_total_order: bool,
@@ -167,7 +167,7 @@ def read_pandas_load_job(
 
         job_config.labels = {"bigframes-api": api_name}
 
-        load_table_destination = self._storage_manager._random_table()
+        load_table_destination = self._storage_manager.allocate_temp_table()
         load_job = self._bqclient.load_table_from_dataframe(
             pandas_dataframe_copy,
             load_table_destination,
@@ -216,7 +216,7 @@ def read_pandas_streaming(
             index=True,
         )
 
-        destination = self._storage_manager.create_temp_table(
+        destination = self._storage_manager.allocate_and_create_temp_table(
             schema,
             [ordering_col],
         )
@@ -673,7 +673,9 @@ def _query_to_destination(
             )
         else:
             cluster_cols = []
-        temp_table = self._storage_manager.create_temp_table(schema, cluster_cols)
+        temp_table = self._storage_manager.allocate_and_create_temp_table(
+            schema, cluster_cols
+        )
 
         timeout_ms = configuration.get("jobTimeoutMs") or configuration["query"].get(
             "timeoutMs"
39 changes: 21 additions & 18 deletions bigframes/session/temp_storage.py
@@ -24,7 +24,7 @@
 _TEMP_TABLE_ID_FORMAT = "bqdf{date}_{session_id}_{random_id}"
 
 
-class TemporaryGbqStorageManager:
+class AnonymousDatasetManager:
     """
     Responsible for allocating and cleaning up temporary gbq tables used by a BigFrames session.
     """
@@ -46,32 +46,42 @@ def __init__(
         )
 
         self.session_id = session_id
-        self._table_ids: List[str] = []
+        self._table_ids: List[bigquery.TableReference] = []
        self._kms_key = kms_key
 
-    def create_temp_table(
+    def allocate_and_create_temp_table(
         self, schema: Sequence[bigquery.SchemaField], cluster_cols: Sequence[str]
     ) -> bigquery.TableReference:
-        # Can't set a table in _SESSION as destination via query job API, so we
-        # run DDL, instead.
+        """
+        Allocates and creates a table in the anonymous dataset.
+        The table will be cleaned up by clean_up_tables.
+        """
         expiration = (
             datetime.datetime.now(datetime.timezone.utc) + constants.DEFAULT_EXPIRATION
         )
         table = bf_io_bigquery.create_temp_table(
             self.bqclient,
-            self._random_table(),
+            self.allocate_temp_table(),
             expiration,
             schema=schema,
             cluster_columns=list(cluster_cols),
             kms_key=self._kms_key,
         )
         return bigquery.TableReference.from_string(table)
 
-    def _random_table(self, skip_cleanup: bool = False) -> bigquery.TableReference:
+    def allocate_temp_table(self) -> bigquery.TableReference:
+        """
+        Allocates a unique table id, but does not create the table.
+        The table will be cleaned up by clean_up_tables.
+        """
+        table_id = self.generate_unique_resource_id()
+        self._table_ids.append(table_id)
+        return table_id
+
+    def generate_unique_resource_id(self) -> bigquery.TableReference:
         """Generate a random table ID with BigQuery DataFrames prefix.
 
-        The generated ID will be stored and checked for deletion when the
-        session is closed, unless skip_cleanup is True.
+        This resource will not be cleaned up by this manager.
 
         Args:
             skip_cleanup (bool, default False):
@@ -87,16 +97,9 @@ def _random_table(self, skip_cleanup: bool = False) -> bigquery.TableReference:
         table_id = _TEMP_TABLE_ID_FORMAT.format(
             date=now.strftime("%Y%m%d"), session_id=self.session_id, random_id=random_id
         )
-        if not skip_cleanup:
-            self._table_ids.append(table_id)
         return self.dataset.table(table_id)
 
     def clean_up_tables(self):
         """Delete tables that were created with this session's session_id."""
-        client = self.bqclient
-        project_id = self.dataset.project
-        dataset_id = self.dataset.dataset_id
-
-        for table_id in self._table_ids:
-            full_id = ".".join([project_id, dataset_id, table_id])
-            client.delete_table(full_id, not_found_ok=True)
+        for table_ref in self._table_ids:
+            self.bqclient.delete_table(table_ref, not_found_ok=True)
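
Taken together, the manager now has a three-level contract: generate an id, allocate (generate and track), or allocate and create. A usage sketch against the new surface; `_temp_storage_manager` is private and is touched here only to illustrate which references clean_up_tables will delete:

```python
import google.cloud.bigquery as bigquery

import bigframes.pandas as bpd

session = bpd.get_global_session()
manager = session._temp_storage_manager  # an AnonymousDatasetManager

# 1. Id generation only: nothing is created, nothing is tracked for cleanup.
owned_ref = manager.generate_unique_resource_id()

# 2. Allocation: generates an id and registers it in _table_ids for cleanup.
tmp_ref = manager.allocate_temp_table()

# 3. Allocation + creation: also creates the table with a default expiration.
created_ref = manager.allocate_and_create_temp_table(
    [bigquery.SchemaField("a", "INT64")], cluster_cols=[]
)

# Closing the session runs clean_up_tables, which deletes only the tracked
# references (tmp_ref, created_ref); owned_ref is left to the caller.
session.close()
```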
29 changes: 17 additions & 12 deletions tests/system/large/test_session.py
@@ -14,6 +14,7 @@
 
 import datetime
 
+import google.cloud.bigquery as bigquery
 import google.cloud.exceptions
 import pytest
 
@@ -70,10 +71,14 @@ def test_close(session: bigframes.Session):
         + bigframes.constants.DEFAULT_EXPIRATION
     )
     full_id_1 = bigframes.session._io.bigquery.create_temp_table(
-        session.bqclient, session._temp_storage_manager._random_table(), expiration
+        session.bqclient,
+        session._temp_storage_manager.allocate_temp_table(),
+        expiration,
     )
     full_id_2 = bigframes.session._io.bigquery.create_temp_table(
-        session.bqclient, session._temp_storage_manager._random_table(), expiration
+        session.bqclient,
+        session._temp_storage_manager.allocate_temp_table(),
+        expiration,
     )
 
     # check that the tables were actually created
@@ -106,10 +111,14 @@ def test_clean_up_by_session_id():
         + bigframes.constants.DEFAULT_EXPIRATION
     )
     bigframes.session._io.bigquery.create_temp_table(
-        session.bqclient, session._temp_storage_manager._random_table(), expiration
+        session.bqclient,
+        session._temp_storage_manager.allocate_temp_table(),
+        expiration,
     )
     bigframes.session._io.bigquery.create_temp_table(
-        session.bqclient, session._temp_storage_manager._random_table(), expiration
+        session.bqclient,
+        session._temp_storage_manager.allocate_temp_table(),
+        expiration,
     )
 
     # check that some table exists with the expected session_id
@@ -148,15 +157,11 @@ def test_clean_up_via_context_manager(session_creator):
     with session_creator() as session:
         bqclient = session.bqclient
 
-        expiration = (
-            datetime.datetime.now(datetime.timezone.utc)
-            + bigframes.constants.DEFAULT_EXPIRATION
+        full_id_1 = session._temp_storage_manager.allocate_and_create_temp_table(
+            [bigquery.SchemaField("a", "INT64")], cluster_cols=[]
         )
-        full_id_1 = bigframes.session._io.bigquery.create_temp_table(
-            session.bqclient, session._temp_storage_manager._random_table(), expiration
-        )
-        full_id_2 = bigframes.session._io.bigquery.create_temp_table(
-            session.bqclient, session._temp_storage_manager._random_table(), expiration
+        full_id_2 = session._temp_storage_manager.allocate_and_create_temp_table(
+            [bigquery.SchemaField("b", "STRING")], cluster_cols=["b"]
        )
 
         # check that the tables were actually created
4 changes: 2 additions & 2 deletions tests/system/small/test_encryption.py
@@ -89,7 +89,7 @@ def test_session_load_job(bq_cmek, session_with_bq_cmek):
         pytest.skip("no cmek set for testing")  # pragma: NO COVER
 
     # Session should have cmek set in the default query and load job configs
-    load_table = session_with_bq_cmek._temp_storage_manager._random_table()
+    load_table = session_with_bq_cmek._temp_storage_manager.allocate_temp_table()
 
     df = pandas.DataFrame({"col0": [1, 2, 3]})
     load_job_config = bigquery.LoadJobConfig()
@@ -194,7 +194,7 @@ def test_to_gbq(bq_cmek, session_with_bq_cmek, scalars_table_id):
 
     # Write the result to BQ custom table and assert encryption
     session_with_bq_cmek.bqclient.get_table(output_table_id)
-    output_table_ref = session_with_bq_cmek._temp_storage_manager._random_table()
+    output_table_ref = session_with_bq_cmek._temp_storage_manager.allocate_temp_table()
     output_table_id = str(output_table_ref)
     df.to_gbq(output_table_id)
     output_table = session_with_bq_cmek.bqclient.get_table(output_table_id)