diff --git a/bigframes/blob/_functions.py b/bigframes/blob/_functions.py
index a3e7ae153c..6c9b435473 100644
--- a/bigframes/blob/_functions.py
+++ b/bigframes/blob/_functions.py
@@ -68,7 +68,9 @@ def _output_bq_type(self):
 
     def _create_udf(self):
         """Create Python UDF in BQ. Return name of the UDF."""
-        udf_name = str(self._session._loader._storage_manager._random_table())
+        udf_name = str(
+            self._session._loader._storage_manager.generate_unique_resource_id()
+        )
 
         func_body = inspect.getsource(self._func)
         func_name = self._func.__name__
diff --git a/bigframes/dataframe.py b/bigframes/dataframe.py
index 4955bb1295..1d3a45e879 100644
--- a/bigframes/dataframe.py
+++ b/bigframes/dataframe.py
@@ -3760,10 +3760,9 @@ def to_gbq(
             )
             if_exists = "replace"
 
-            temp_table_ref = self._session._temp_storage_manager._random_table(
-                # The client code owns this table reference now, so skip_cleanup=True
-                # to not clean it up when we close the session.
-                skip_cleanup=True,
+            # The client code owns this table reference now
+            temp_table_ref = (
+                self._session._temp_storage_manager.generate_unique_resource_id()
             )
             destination_table = f"{temp_table_ref.project}.{temp_table_ref.dataset_id}.{temp_table_ref.table_id}"
 
diff --git a/bigframes/session/__init__.py b/bigframes/session/__init__.py
index 7b416d4424..acaad4a5b7 100644
--- a/bigframes/session/__init__.py
+++ b/bigframes/session/__init__.py
@@ -248,7 +248,7 @@ def __init__(
         self._metrics = bigframes.session.metrics.ExecutionMetrics()
         self._function_session = bff_session.FunctionSession()
         self._temp_storage_manager = (
-            bigframes.session.temp_storage.TemporaryGbqStorageManager(
+            bigframes.session.temp_storage.AnonymousDatasetManager(
                 self._clients_provider.bqclient,
                 location=self._location,
                 session_id=self._session_id,
@@ -908,7 +908,7 @@ def read_csv(
             engine=engine,
             write_engine=write_engine,
         )
-        table = self._temp_storage_manager._random_table()
+        table = self._temp_storage_manager.allocate_temp_table()
 
         if engine is not None and engine == "bigquery":
             if any(param is not None for param in (dtype, names)):
@@ -1054,7 +1054,7 @@ def read_parquet(
             engine=engine,
             write_engine=write_engine,
         )
-        table = self._temp_storage_manager._random_table()
+        table = self._temp_storage_manager.allocate_temp_table()
 
         if engine == "bigquery":
             job_config = bigquery.LoadJobConfig()
@@ -1108,7 +1108,7 @@ def read_json(
             engine=engine,
             write_engine=write_engine,
         )
-        table = self._temp_storage_manager._random_table()
+        table = self._temp_storage_manager.allocate_temp_table()
 
         if engine == "bigquery":
 
@@ -1704,7 +1704,7 @@ def _start_query_ml_ddl(
 
     def _create_object_table(self, path: str, connection: str) -> str:
         """Create a random id Object Table from the input path and connection."""
-        table = str(self._loader._storage_manager._random_table())
+        table = str(self._loader._storage_manager.generate_unique_resource_id())
 
         import textwrap
 
diff --git a/bigframes/session/executor.py b/bigframes/session/executor.py
index e539525d80..91ad7c794b 100644
--- a/bigframes/session/executor.py
+++ b/bigframes/session/executor.py
@@ -195,7 +195,7 @@ class BigQueryCachingExecutor(Executor):
     def __init__(
         self,
         bqclient: bigquery.Client,
-        storage_manager: bigframes.session.temp_storage.TemporaryGbqStorageManager,
+        storage_manager: bigframes.session.temp_storage.AnonymousDatasetManager,
         bqstoragereadclient: google.cloud.bigquery_storage_v1.BigQueryReadClient,
         *,
         strictly_ordered: bool = True,
@@ -248,7 +248,7 @@ def execute(
         job_config = bigquery.QueryJobConfig()
         # Use explicit destination to avoid 10GB limit of temporary table
         if use_explicit_destination:
-            destination_table = self.storage_manager.create_temp_table(
+            destination_table = self.storage_manager.allocate_and_create_temp_table(
                 array_value.schema.to_bigquery(), cluster_cols=[]
             )
             job_config.destination = destination_table
@@ -392,7 +392,7 @@ def peek(
         job_config = bigquery.QueryJobConfig()
         # Use explicit destination to avoid 10GB limit of temporary table
         if use_explicit_destination:
-            destination_table = self.storage_manager.create_temp_table(
+            destination_table = self.storage_manager.allocate_and_create_temp_table(
                 array_value.schema.to_bigquery(), cluster_cols=[]
             )
             job_config.destination = destination_table
@@ -645,7 +645,9 @@ def _sql_as_cached_temp_table(
         cluster_cols: Sequence[str],
     ) -> bigquery.TableReference:
         assert len(cluster_cols) <= _MAX_CLUSTER_COLUMNS
-        temp_table = self.storage_manager.create_temp_table(schema, cluster_cols)
+        temp_table = self.storage_manager.allocate_and_create_temp_table(
+            schema, cluster_cols
+        )
 
         # TODO: Get default job config settings
         job_config = cast(
diff --git a/bigframes/session/loader.py b/bigframes/session/loader.py
index b9859e92a2..1296e9d1b3 100644
--- a/bigframes/session/loader.py
+++ b/bigframes/session/loader.py
@@ -115,7 +115,7 @@ def __init__(
         self,
         session: bigframes.session.Session,
         bqclient: bigquery.Client,
-        storage_manager: bigframes.session.temp_storage.TemporaryGbqStorageManager,
+        storage_manager: bigframes.session.temp_storage.AnonymousDatasetManager,
         default_index_type: bigframes.enums.DefaultIndexKind,
         scan_index_uniqueness: bool,
         force_total_order: bool,
@@ -167,7 +167,7 @@ def read_pandas_load_job(
 
         job_config.labels = {"bigframes-api": api_name}
 
-        load_table_destination = self._storage_manager._random_table()
+        load_table_destination = self._storage_manager.allocate_temp_table()
         load_job = self._bqclient.load_table_from_dataframe(
             pandas_dataframe_copy,
             load_table_destination,
@@ -216,7 +216,7 @@ def read_pandas_streaming(
             index=True,
         )
 
-        destination = self._storage_manager.create_temp_table(
+        destination = self._storage_manager.allocate_and_create_temp_table(
             schema,
             [ordering_col],
         )
@@ -673,7 +673,9 @@ def _query_to_destination(
             )
         else:
             cluster_cols = []
-        temp_table = self._storage_manager.create_temp_table(schema, cluster_cols)
+        temp_table = self._storage_manager.allocate_and_create_temp_table(
+            schema, cluster_cols
+        )
 
         timeout_ms = configuration.get("jobTimeoutMs") or configuration["query"].get(
             "timeoutMs"
diff --git a/bigframes/session/temp_storage.py b/bigframes/session/temp_storage.py
index de764e4535..3b2965efef 100644
--- a/bigframes/session/temp_storage.py
+++ b/bigframes/session/temp_storage.py
@@ -24,7 +24,7 @@
 _TEMP_TABLE_ID_FORMAT = "bqdf{date}_{session_id}_{random_id}"
 
 
-class TemporaryGbqStorageManager:
+class AnonymousDatasetManager:
     """
    Responsible for allocating and cleaning up temporary gbq tables used by a BigFrames session.
     """
@@ -46,20 +48,22 @@ def __init__(
         )
         self.session_id = session_id
-        self._table_ids: List[str] = []
+        self._table_ids: List[bigquery.TableReference] = []
         self._kms_key = kms_key
 
-    def create_temp_table(
+    def allocate_and_create_temp_table(
         self, schema: Sequence[bigquery.SchemaField], cluster_cols: Sequence[str]
     ) -> bigquery.TableReference:
-        # Can't set a table in _SESSION as destination via query job API, so we
-        # run DDL, instead.
+        """
+        Allocates and creates a table in the anonymous dataset.
+        The table will be cleaned up by clean_up_tables.
+        """
         expiration = (
             datetime.datetime.now(datetime.timezone.utc) + constants.DEFAULT_EXPIRATION
         )
         table = bf_io_bigquery.create_temp_table(
             self.bqclient,
-            self._random_table(),
+            self.allocate_temp_table(),
             expiration,
             schema=schema,
             cluster_columns=list(cluster_cols),
         )
         return bigquery.TableReference.from_string(table)
@@ -67,11 +69,19 @@ def create_temp_table(
 
-    def _random_table(self, skip_cleanup: bool = False) -> bigquery.TableReference:
+    def allocate_temp_table(self) -> bigquery.TableReference:
+        """
+        Allocates a unique table id, but does not create the table.
+        The table will be cleaned up by clean_up_tables.
+        """
+        table_id = self.generate_unique_resource_id()
+        self._table_ids.append(table_id)
+        return table_id
+
+    def generate_unique_resource_id(self) -> bigquery.TableReference:
         """Generate a random table ID with BigQuery DataFrames prefix.
 
-        The generated ID will be stored and checked for deletion when the
-        session is closed, unless skip_cleanup is True.
+        This resource will not be cleaned up by this manager.
 
         Args:
             skip_cleanup (bool, default False):
@@ -87,16 +97,9 @@ def _random_table(self, skip_cleanup: bool = False) -> bigquery.TableReference:
         table_id = _TEMP_TABLE_ID_FORMAT.format(
             date=now.strftime("%Y%m%d"), session_id=self.session_id, random_id=random_id
         )
-        if not skip_cleanup:
-            self._table_ids.append(table_id)
         return self.dataset.table(table_id)
 
     def clean_up_tables(self):
         """Delete tables that were created with this session's session_id."""
-        client = self.bqclient
-        project_id = self.dataset.project
-        dataset_id = self.dataset.dataset_id
-
-        for table_id in self._table_ids:
-            full_id = ".".join([project_id, dataset_id, table_id])
-            client.delete_table(full_id, not_found_ok=True)
+        for table_ref in self._table_ids:
+            self.bqclient.delete_table(table_ref, not_found_ok=True)
diff --git a/tests/system/large/test_session.py b/tests/system/large/test_session.py
index 7f13462cbe..e117cf0327 100644
--- a/tests/system/large/test_session.py
+++ b/tests/system/large/test_session.py
@@ -14,6 +14,7 @@
 
 import datetime
 
+import google.cloud.bigquery as bigquery
 import google.cloud.exceptions
 import pytest
 
@@ -70,10 +71,14 @@ def test_close(session: bigframes.Session):
         + bigframes.constants.DEFAULT_EXPIRATION
     )
     full_id_1 = bigframes.session._io.bigquery.create_temp_table(
-        session.bqclient, session._temp_storage_manager._random_table(), expiration
+        session.bqclient,
+        session._temp_storage_manager.allocate_temp_table(),
+        expiration,
     )
     full_id_2 = bigframes.session._io.bigquery.create_temp_table(
-        session.bqclient, session._temp_storage_manager._random_table(), expiration
+        session.bqclient,
+        session._temp_storage_manager.allocate_temp_table(),
+        expiration,
     )
 
     # check that the tables were actually created
@@ -106,10 +111,14 @@ def test_clean_up_by_session_id():
         + bigframes.constants.DEFAULT_EXPIRATION
     )
     bigframes.session._io.bigquery.create_temp_table(
-        session.bqclient, session._temp_storage_manager._random_table(), expiration
+        session.bqclient,
+        session._temp_storage_manager.allocate_temp_table(),
+        expiration,
    )
     bigframes.session._io.bigquery.create_temp_table(
-        session.bqclient, session._temp_storage_manager._random_table(), expiration
+        session.bqclient,
+        session._temp_storage_manager.allocate_temp_table(),
+        expiration,
     )
 
     # check that some table exists with the expected session_id
@@ -148,15 +157,11 @@ def test_clean_up_via_context_manager(session_creator):
     with session_creator() as session:
         bqclient = session.bqclient
 
-        expiration = (
-            datetime.datetime.now(datetime.timezone.utc)
-            + bigframes.constants.DEFAULT_EXPIRATION
+        full_id_1 = session._temp_storage_manager.allocate_and_create_temp_table(
+            [bigquery.SchemaField("a", "INT64")], cluster_cols=[]
         )
-        full_id_1 = bigframes.session._io.bigquery.create_temp_table(
-            session.bqclient, session._temp_storage_manager._random_table(), expiration
-        )
-        full_id_2 = bigframes.session._io.bigquery.create_temp_table(
-            session.bqclient, session._temp_storage_manager._random_table(), expiration
+        full_id_2 = session._temp_storage_manager.allocate_and_create_temp_table(
+            [bigquery.SchemaField("b", "STRING")], cluster_cols=["b"]
         )
 
         # check that the tables were actually created
diff --git a/tests/system/small/test_encryption.py b/tests/system/small/test_encryption.py
index 72529bc5b0..8ce53c218b 100644
--- a/tests/system/small/test_encryption.py
+++ b/tests/system/small/test_encryption.py
@@ -89,7 +89,7 @@ def test_session_load_job(bq_cmek, session_with_bq_cmek):
         pytest.skip("no cmek set for testing")  # pragma: NO COVER
 
     # Session should have cmek set in the default query and load job configs
-    load_table = session_with_bq_cmek._temp_storage_manager._random_table()
+    load_table = session_with_bq_cmek._temp_storage_manager.allocate_temp_table()
 
     df = pandas.DataFrame({"col0": [1, 2, 3]})
     load_job_config = bigquery.LoadJobConfig()
@@ -194,7 +194,7 @@ def test_to_gbq(bq_cmek, session_with_bq_cmek, scalars_table_id):
 
     # Write the result to BQ custom table and assert encryption
     session_with_bq_cmek.bqclient.get_table(output_table_id)
-    output_table_ref = session_with_bq_cmek._temp_storage_manager._random_table()
+    output_table_ref = session_with_bq_cmek._temp_storage_manager.allocate_temp_table()
     output_table_id = str(output_table_ref)
     df.to_gbq(output_table_id)
     output_table = session_with_bq_cmek.bqclient.get_table(output_table_id)
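
Reviewer note (illustrative sketch, not part of the patch): the rename splits the old _random_table(skip_cleanup=...) entry point into three methods with distinct cleanup semantics. A minimal sketch of how callers choose between them, assuming an already-constructed AnonymousDatasetManager instance named `manager` (a hypothetical variable for this example):

    import google.cloud.bigquery as bigquery

    # Reserve a table id without creating the table; the caller (e.g. a load
    # job) materializes it. Tracked, so clean_up_tables() will delete it.
    load_dest = manager.allocate_temp_table()

    # Reserve an id and eagerly create an expiring, optionally clustered
    # table, e.g. as an explicit query destination. Also tracked for cleanup.
    query_dest = manager.allocate_and_create_temp_table(
        [bigquery.SchemaField("a", "INT64")], cluster_cols=[]
    )

    # Reserve an id whose lifetime the caller owns (the replacement for
    # skip_cleanup=True); clean_up_tables() will not touch it.
    client_owned = manager.generate_unique_resource_id()

    # On session close, only the tracked references are deleted.
    manager.clean_up_tables()

Tracking bigquery.TableReference objects instead of bare table-id strings is what lets clean_up_tables() pass each reference straight to bqclient.delete_table() without reassembling "project.dataset.table" strings.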