diff --git a/bigframes/_config/experiment_options.py b/bigframes/_config/experiment_options.py
index 69273aef1c..b958667628 100644
--- a/bigframes/_config/experiment_options.py
+++ b/bigframes/_config/experiment_options.py
@@ -25,6 +25,7 @@ class ExperimentOptions:
     def __init__(self):
         self._semantic_operators: bool = False
         self._blob: bool = False
+        self._udf: bool = False
 
     @property
     def semantic_operators(self) -> bool:
@@ -53,3 +54,17 @@ def blob(self, value: bool):
             )
             warnings.warn(msg, category=bfe.PreviewWarning)
         self._blob = value
+
+    @property
+    def udf(self) -> bool:
+        return self._udf
+
+    @udf.setter
+    def udf(self, value: bool):
+        if value is True:
+            msg = (
+                "BigFrames managed function (udf) is still experimental. "
+                "It may not work and is subject to change in the future."
+            )
+            warnings.warn(msg, category=bfe.PreviewWarning)
+        self._udf = value
diff --git a/bigframes/dataframe.py b/bigframes/dataframe.py
index 18d0b38fba..71e94e781d 100644
--- a/bigframes/dataframe.py
+++ b/bigframes/dataframe.py
@@ -4097,9 +4097,16 @@ def apply(self, func, *, axis=0, args: typing.Tuple = (), **kwargs):
             msg = "axis=1 scenario is in preview."
             warnings.warn(msg, category=bfe.PreviewWarning)
 
-            # Check if the function is a remote function
-            if not hasattr(func, "bigframes_remote_function"):
-                raise ValueError("For axis=1 a remote function must be used.")
+            # TODO(jialuo): Deprecate the "bigframes_remote_function" attribute.
+            # We have some tests using pre-defined remote_function that were
+            # defined based on "bigframes_remote_function" instead of
+            # "bigframes_bigquery_function". So we need to fix those pre-defined
+            # remote functions before deprecating the "bigframes_remote_function"
+            # attribute. Check if the function is a bigframes function.
+            if not hasattr(func, "bigframes_remote_function") and not hasattr(
+                func, "bigframes_bigquery_function"
+            ):
+                raise ValueError("For axis=1 a bigframes function must be used.")
 
             is_row_processor = getattr(func, "is_row_processor")
             if is_row_processor:
diff --git a/bigframes/functions/_function_client.py b/bigframes/functions/_function_client.py
index f5001ff909..68580332c3 100644
--- a/bigframes/functions/_function_client.py
+++ b/bigframes/functions/_function_client.py
@@ -21,8 +21,8 @@
 import random
 import shutil
 import string
-import sys
 import tempfile
+import textwrap
 import types
 from typing import cast, Tuple, TYPE_CHECKING
 
@@ -55,7 +55,7 @@
 
 class FunctionClient:
-    # Wait time (in seconds) for an IAM binding to take effect after creation
+    # Wait time (in seconds) for an IAM binding to take effect after creation.
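# Illustrative sketch (not part of this change): how the new experiment flag is
# expected to be toggled from user code. It only relies on the public
# `bigframes.pandas` options surface used elsewhere in this diff; setting the
# flag to True emits the PreviewWarning added above.
import warnings

import bigframes.pandas as bpd

with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    bpd.options.experiments.udf = True  # opts in to managed functions (udf)
assert any("managed function" in str(w.message) for w in caught)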
_iam_wait_seconds = 120 # TODO(b/392707725): Convert all necessary parameters for cloud function @@ -63,31 +63,78 @@ class FunctionClient: def __init__( self, gcp_project_id, - cloud_function_region, - cloud_functions_client, bq_location, bq_dataset, bq_client, bq_connection_id, bq_connection_manager, - cloud_function_service_account, - cloud_function_kms_key_name, - cloud_function_docker_repository, + cloud_function_region=None, + cloud_functions_client=None, + cloud_function_service_account=None, + cloud_function_kms_key_name=None, + cloud_function_docker_repository=None, *, session: Session, ): self._gcp_project_id = gcp_project_id - self._cloud_function_region = cloud_function_region - self._cloud_functions_client = cloud_functions_client self._bq_location = bq_location self._bq_dataset = bq_dataset self._bq_client = bq_client self._bq_connection_id = bq_connection_id self._bq_connection_manager = bq_connection_manager + self._session = session + + # Optional attributes only for remote functions. + self._cloud_function_region = cloud_function_region + self._cloud_functions_client = cloud_functions_client self._cloud_function_service_account = cloud_function_service_account self._cloud_function_kms_key_name = cloud_function_kms_key_name self._cloud_function_docker_repository = cloud_function_docker_repository - self._session = session + + def _create_bq_connection(self) -> None: + if self._bq_connection_manager: + self._bq_connection_manager.create_bq_connection( + self._gcp_project_id, + self._bq_location, + self._bq_connection_id, + "run.invoker", + ) + + def _ensure_dataset_exists(self) -> None: + # Make sure the dataset exists, i.e. if it doesn't exist, go ahead and + # create it. + dataset = bigquery.Dataset( + bigquery.DatasetReference.from_string( + self._bq_dataset, default_project=self._gcp_project_id + ) + ) + dataset.location = self._bq_location + try: + # This check does not require bigquery.datasets.create IAM + # permission. So, if the data set already exists, then user can work + # without having that permission. + self._bq_client.get_dataset(dataset) + except google.api_core.exceptions.NotFound: + # This requires bigquery.datasets.create IAM permission. + self._bq_client.create_dataset(dataset, exists_ok=True) + + def _create_bq_function(self, create_function_ddl: str) -> None: + # TODO(swast): plumb through the original, user-facing api_name. 
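# Illustrative sketch (not part of this change): with the reordered constructor
# above, the Cloud Functions specific arguments are now optional, so the managed
# function path can build the FunctionClient defined in this module without
# them. `bq_client` and `session` stand in for real google-cloud-bigquery and
# bigframes objects; the string values are placeholders.
client_for_managed_udf = FunctionClient(
    "my-project",  # gcp_project_id
    "US",  # bq_location
    "my_dataset",  # bq_dataset
    bq_client,  # bigquery.Client
    "bigframes-default-connection",  # bq_connection_id
    None,  # bq_connection_manager
    session=session,
)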
+ _, query_job = bigframes.session._io.bigquery.start_query_with_client( + self._session.bqclient, + create_function_ddl, + job_config=bigquery.QueryJobConfig(), + ) + logger.info(f"Created bigframes function {query_job.ddl_target_routine}") + + def _format_function_options(self, function_options: dict) -> str: + return ", ".join( + [ + f"{key}='{val}'" if isinstance(val, str) else f"{key}={val}" + for key, val in function_options.items() + if val is not None + ] + ) def create_bq_remote_function( self, @@ -101,13 +148,7 @@ def create_bq_remote_function( ): """Create a BigQuery remote function given the artifacts of a user defined function and the http endpoint of a corresponding cloud function.""" - if self._bq_connection_manager: - self._bq_connection_manager.create_bq_connection( - self._gcp_project_id, - self._bq_location, - self._bq_connection_id, - "run.invoker", - ) + self._create_bq_connection() # Create BQ function # https://cloud.google.com/bigquery/docs/reference/standard-sql/remote-functions#create_a_remote_function_2 @@ -128,12 +169,8 @@ def create_bq_remote_function( # bigframes specific metadata for the lack of a better option remote_function_options["description"] = metadata - remote_function_options_str = ", ".join( - [ - f"{key}='{val}'" if isinstance(val, str) else f"{key}={val}" - for key, val in remote_function_options.items() - if val is not None - ] + remote_function_options_str = self._format_function_options( + remote_function_options ) create_function_ddl = f""" @@ -144,31 +181,78 @@ def create_bq_remote_function( logger.info(f"Creating BQ remote function: {create_function_ddl}") - # Make sure the dataset exists. I.e. if it doesn't exist, go ahead and - # create it - dataset = bigquery.Dataset( - bigquery.DatasetReference.from_string( - self._bq_dataset, default_project=self._gcp_project_id - ) - ) - dataset.location = self._bq_location - try: - # This check does not require bigquery.datasets.create IAM - # permission. So, if the data set already exists, then user can work - # without having that permission. - self._bq_client.get_dataset(dataset) - except google.api_core.exceptions.NotFound: - # This requires bigquery.datasets.create IAM permission - self._bq_client.create_dataset(dataset, exists_ok=True) + self._ensure_dataset_exists() + self._create_bq_function(create_function_ddl) - # TODO(swast): plumb through the original, user-facing api_name. - _, query_job = bigframes.session._io.bigquery.start_query_with_client( - self._session.bqclient, - create_function_ddl, - job_config=bigquery.QueryJobConfig(), + def provision_bq_managed_function( + self, + func, + input_types, + output_type, + name, + packages, + is_row_processor, + ): + """Create a BigQuery managed function.""" + import cloudpickle + + pickled = cloudpickle.dumps(func) + + # Create BQ managed function. + bq_function_args = [] + bq_function_return_type = output_type + + input_args = inspect.getargs(func.__code__).args + # We expect the input type annotations to be 1:1 with the input args. + for name_, type_ in zip(input_args, input_types): + bq_function_args.append(f"{name_} {type_}") + + managed_function_options = { + "runtime_version": _utils.get_python_version(), + "entry_point": "bigframes_handler", + } + + # Augment user package requirements with any internal package + # requirements. 
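# Illustrative sketch (not part of this change): what the _format_function_options
# helper introduced above renders for a typical OPTIONS(...) clause. The values
# are examples only; entries whose value is None are dropped.
example_options = {
    "runtime_version": "python-3.11",
    "entry_point": "bigframes_handler",
    "packages": ["cloudpickle"],
    "max_batching_rows": None,
}
# _format_function_options(example_options) ->
#   "runtime_version='python-3.11', entry_point='bigframes_handler', packages=['cloudpickle']"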
+ packages = _utils._get_updated_package_requirements(packages, is_row_processor) + if packages: + managed_function_options["packages"] = packages + managed_function_options_str = self._format_function_options( + managed_function_options ) - logger.info(f"Created remote function {query_job.ddl_target_routine}") + session_id = None if name else self._session.session_id + bq_function_name = name + if not bq_function_name: + # Compute a unique hash representing the user code. + function_hash = _utils._get_hash(func, packages) + bq_function_name = _utils.get_bigframes_function_name( + function_hash, + session_id, + ) + + persistent_func_id = ( + f"`{self._gcp_project_id}.{self._bq_dataset}`.{bq_function_name}" + ) + create_function_ddl = textwrap.dedent( + f""" + CREATE OR REPLACE FUNCTION {persistent_func_id}({','.join(bq_function_args)}) + RETURNS {bq_function_return_type} + LANGUAGE python + OPTIONS ({managed_function_options_str}) + AS r''' + import cloudpickle + udf = cloudpickle.loads({pickled}) + def bigframes_handler(*args): + return udf(*args) + ''' + """ + ).strip() + + self._ensure_dataset_exists() + self._create_bq_function(create_function_ddl) + + return bq_function_name def get_cloud_function_fully_qualified_parent(self): "Get the fully qualilfied parent for a cloud function." @@ -262,9 +346,7 @@ def create_cloud_function( # TODO(shobs): Figure out how to achieve version compatibility, specially # when pickle (internally used by cloudpickle) guarantees that: # https://docs.python.org/3/library/pickle.html#:~:text=The%20pickle%20serialization%20format%20is,unique%20breaking%20change%20language%20boundary. - python_version = "python{}{}".format( - sys.version_info.major, sys.version_info.minor - ) + python_version = _utils.get_python_version(is_compat=True) # Determine an upload URL for user code upload_url_request = functions_v2.GenerateUploadUrlRequest( @@ -443,7 +525,7 @@ def provision_bq_remote_function( # Derive the name of the remote function remote_function_name = name if not remote_function_name: - remote_function_name = _utils.get_remote_function_name( + remote_function_name = _utils.get_bigframes_function_name( function_hash, self._session.session_id, uniq_suffix ) rf_endpoint, rf_conn = self.get_remote_function_specs(remote_function_name) diff --git a/bigframes/functions/_function_session.py b/bigframes/functions/_function_session.py index aef75129c0..20dcf45103 100644 --- a/bigframes/functions/_function_session.py +++ b/bigframes/functions/_function_session.py @@ -21,6 +21,7 @@ import threading from typing import ( Any, + Callable, cast, Dict, Literal, @@ -46,7 +47,9 @@ ) from bigframes import clients -from bigframes import version as bigframes_version +import bigframes.core.compile.ibis_types +import bigframes.exceptions as bfe +import bigframes.series as bf_series if TYPE_CHECKING: from bigframes.session import Session @@ -55,6 +58,9 @@ from . import _function_client, _utils +# BQ managed functions (@udf) currently only support Python 3.11. +_MANAGED_FUNC_PYTHON_VERSIONS = ("python-3.11",) + class FunctionSession: """Session to manage bigframes functions.""" @@ -66,6 +72,123 @@ def __init__(self): # Lock to synchronize the update of the session artifacts self._artifacts_lock = threading.Lock() + def _resolve_session(self, session: Optional[Session]) -> Session: + """Resolves the BigFrames session.""" + import bigframes.pandas as bpd + import bigframes.session + + # Using the global session if none is provided. 
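# Illustrative sketch (not part of this change): rough shape of the DDL emitted
# by provision_bq_managed_function above for `def add_one(x: int) -> int`. The
# project, dataset, routine name and package list are placeholders, and the
# pickled payload is elided.
EXAMPLE_MANAGED_FUNCTION_DDL = """\
CREATE OR REPLACE FUNCTION `my-project.my_dataset`.bigframes_sessionid_hash(x INT64)
RETURNS INT64
LANGUAGE python
OPTIONS (runtime_version='python-3.11', entry_point='bigframes_handler', packages=['cloudpickle'])
AS r'''
import cloudpickle
udf = cloudpickle.loads(b"<pickled user function>")
def bigframes_handler(*args):
    return udf(*args)
'''
"""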
+ return cast(bigframes.session.Session, session or bpd.get_global_session()) + + def _resolve_bigquery_client( + self, session: Session, bigquery_client: Optional[bigquery.Client] + ) -> bigquery.Client: + """Resolves the BigQuery client.""" + if not bigquery_client: + bigquery_client = session.bqclient + if not bigquery_client: + raise ValueError( + "A bigquery client must be provided, either directly or via " + f"session. {constants.FEEDBACK_LINK}" + ) + return bigquery_client + + def _resolve_bigquery_connection_client( + self, + session: Session, + bigquery_connection_client: Optional[ + bigquery_connection_v1.ConnectionServiceClient + ], + ) -> bigquery_connection_v1.ConnectionServiceClient: + """Resolves the BigQuery connection client.""" + if not bigquery_connection_client: + bigquery_connection_client = session.bqconnectionclient + if not bigquery_connection_client: + raise ValueError( + "A bigquery connection client must be provided, either " + f"directly or via session. {constants.FEEDBACK_LINK}" + ) + return bigquery_connection_client + + def _resolve_resource_manager_client( + self, + session: Session, + resource_manager_client: Optional[resourcemanager_v3.ProjectsClient], + ) -> resourcemanager_v3.ProjectsClient: + """Resolves the resource manager client.""" + if not resource_manager_client: + resource_manager_client = session.resourcemanagerclient + if not resource_manager_client: + raise ValueError( + "A resource manager client must be provided, either directly " + f"or via session. {constants.FEEDBACK_LINK}" + ) + return resource_manager_client + + def _resolve_dataset_reference( + self, + session: Session, + bigquery_client: bigquery.Client, + dataset: Optional[str], + ) -> bigquery.DatasetReference: + """Resolves the dataset reference for the bigframes function.""" + if dataset: + dataset_ref = bigquery.DatasetReference.from_string( + dataset, default_project=bigquery_client.project + ) + else: + dataset_ref = session._anonymous_dataset + return dataset_ref + + def _resolve_cloud_functions_client( + self, + session: Session, + cloud_functions_client: Optional[functions_v2.FunctionServiceClient], + ) -> Optional[functions_v2.FunctionServiceClient]: + """Resolves the Cloud Functions client.""" + if not cloud_functions_client: + cloud_functions_client = session.cloudfunctionsclient + if not cloud_functions_client: + raise ValueError( + "A cloud functions client must be provided, either directly " + f"or via session. {constants.FEEDBACK_LINK}" + ) + return cloud_functions_client + + def _resolve_bigquery_connection_id( + self, + session: Session, + dataset_ref: bigquery.DatasetReference, + bq_location: str, + bigquery_connection: Optional[str] = None, + ) -> str: + """Resolves BigQuery connection id.""" + if not bigquery_connection: + bigquery_connection = session._bq_connection # type: ignore + + bigquery_connection = clients.resolve_full_bq_connection_name( + bigquery_connection, + default_project=dataset_ref.project, + default_location=bq_location, + ) + # Guaranteed to be the form of .. + ( + gcp_project_id, + bq_connection_location, + bq_connection_id, + ) = bigquery_connection.split(".") + if gcp_project_id.casefold() != dataset_ref.project.casefold(): + raise ValueError( + "The project_id does not match BigQuery connection " + f"gcp_project_id: {dataset_ref.project}." + ) + if bq_connection_location.casefold() != bq_location.casefold(): + raise ValueError( + "The location does not match BigQuery connection location: " + f"{bq_location}." 
+ ) + return bq_connection_id + def _update_temp_artifacts(self, bqrf_routine: str, gcf_path: str): """Update function artifacts in the current session.""" with self._artifacts_lock: @@ -84,15 +207,27 @@ def clean_up( # deleted directly by the user bqclient.delete_routine(bqrf_routine, not_found_ok=True) - # Let's accept the possibility that the cloud function may have - # been deleted directly by the user - try: - gcfclient.delete_function(name=gcf_path) - except google.api_core.exceptions.NotFound: - pass + if gcf_path: + # Let's accept the possibility that the cloud function may + # have been deleted directly by the user + try: + gcfclient.delete_function(name=gcf_path) + except google.api_core.exceptions.NotFound: + pass self._temp_artifacts.clear() + def _try_delattr(self, func: Callable, attr: str) -> None: + """Attempts to delete an attribute from a bigframes function.""" + # In the unlikely case where the user is trying to re-deploy the same + # function, cleanup the attributes we add in bigframes functions, first. + # This prevents the pickle from having dependencies that might not + # otherwise be present such as ibis or pandas. + try: + delattr(func, attr) + except AttributeError: + pass + # Inspired by @udf decorator implemented in ibis-bigquery package # https://github.com/ibis-project/ibis-bigquery/blob/main/ibis_bigquery/udf/__init__.py # which has moved as @js to the ibis package @@ -130,14 +265,6 @@ def remote_function( .. deprecated:: 0.0.1 This is an internal method. Please use :func:`bigframes.pandas.remote_function` instead. - .. warning:: - To use remote functions with Bigframes 2.0 and onwards, please (preferred) - set an explicit user-managed ``cloud_function_service_account`` or (discouraged) - set ``cloud_function_service_account`` to use the Compute Engine service account - by setting it to `"default"`. - - See, https://cloud.google.com/functions/docs/securing/function-identity. - .. note:: Please make sure following is setup before using this API: @@ -311,116 +438,46 @@ def remote_function( https://cloud.google.com/functions/docs/configuring/memory. cloud_function_ingress_settings (str, Optional): Ingress settings controls dictating what traffic can reach the - function. Options are: `all`, `internal-only`, or `internal-and-gclb`. - If no setting is provided, `all` will be used by default and a warning - will be issued. See for more details + function. By default `all` will be used. It must be one of: + `all`, `internal-only`, `internal-and-gclb`. See for more details https://cloud.google.com/functions/docs/networking/network-settings#ingress_settings. """ - # Some defaults may be used from the session if not provided otherwise - import bigframes.exceptions as bfe - import bigframes.pandas as bpd - import bigframes.series as bf_series - import bigframes.session - - session = cast(bigframes.session.Session, session or bpd.get_global_session()) - - # raise a UserWarning if user does not explicitly set cloud_function_service_account to a - # user-managed cloud_function_service_account of to default - msg = ( - "You have not explicitly set a user-managed `cloud_function_service_account`. " - "Using the default Compute Engine service account. " - "To use Bigframes 2.0, please explicitly set `cloud_function_service_account` " - 'either to a user-managed service account (preferred) or to `"default"` ' - "to use the Compute Engine service account (discouraged). " - "See, https://cloud.google.com/functions/docs/securing/function-identity." 
- ) - - if ( - bigframes_version.__version__.startswith("1.") - and cloud_function_service_account is None - ): - warnings.warn(msg, stacklevel=2, category=FutureWarning) - - if cloud_function_service_account == "default": - cloud_function_service_account = None + # Some defaults may be used from the session if not provided otherwise. + session = self._resolve_session(session) - # A BigQuery client is required to perform BQ operations - if not bigquery_client: - bigquery_client = session.bqclient - if not bigquery_client: - raise ValueError( - "A bigquery client must be provided, either directly or via session. " - f"{constants.FEEDBACK_LINK}" - ) + # A BigQuery client is required to perform BQ operations. + bigquery_client = self._resolve_bigquery_client(session, bigquery_client) - # A BigQuery connection client is required to perform BQ connection operations - if not bigquery_connection_client: - bigquery_connection_client = session.bqconnectionclient - if not bigquery_connection_client: - raise ValueError( - "A bigquery connection client must be provided, either directly or via session. " - f"{constants.FEEDBACK_LINK}" - ) - - # A cloud functions client is required to perform cloud functions operations - if not cloud_functions_client: - cloud_functions_client = session.cloudfunctionsclient - if not cloud_functions_client: - raise ValueError( - "A cloud functions client must be provided, either directly or via session. " - f"{constants.FEEDBACK_LINK}" - ) + # A BigQuery connection client is required for BQ connection operations. + bigquery_connection_client = self._resolve_bigquery_connection_client( + session, bigquery_connection_client + ) - # A resource manager client is required to get/set IAM operations - if not resource_manager_client: - resource_manager_client = session.resourcemanagerclient - if not resource_manager_client: - raise ValueError( - "A resource manager client must be provided, either directly or via session. " - f"{constants.FEEDBACK_LINK}" - ) + # A resource manager client is required to get/set IAM operations. + resource_manager_client = self._resolve_resource_manager_client( + session, resource_manager_client + ) - # BQ remote function must be persisted, for which we need a dataset + # BQ remote function must be persisted, for which we need a dataset. # https://cloud.google.com/bigquery/docs/reference/standard-sql/remote-functions#:~:text=You%20cannot%20create%20temporary%20remote%20functions. - if dataset: - dataset_ref = bigquery.DatasetReference.from_string( - dataset, default_project=bigquery_client.project - ) - else: - dataset_ref = session._anonymous_dataset + dataset_ref = self._resolve_dataset_reference(session, bigquery_client, dataset) + + # A cloud functions client is required for cloud functions operations. + cloud_functions_client = self._resolve_cloud_functions_client( + session, cloud_functions_client + ) bq_location, cloud_function_region = _utils.get_remote_function_locations( bigquery_client.location ) - # A connection is required for BQ remote function + # A connection is required for BQ remote function. 
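# Illustrative sketch (not part of this change): the connection handling factored
# into _resolve_bigquery_connection_id above works on the fully qualified
# "<project>.<location>.<connection>" form and returns only the trailing
# connection id; a mismatched project or location raises ValueError. The value
# below is a placeholder.
connection = "my-project.us.bigframes-rf-conn"
project_part, location_part, connection_id = connection.split(".")
assert connection_id == "bigframes-rf-conn"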
# https://cloud.google.com/bigquery/docs/reference/standard-sql/remote-functions#create_a_remote_function - if not bigquery_connection: - bigquery_connection = session._bq_connection # type: ignore - - bigquery_connection = clients.resolve_full_bq_connection_name( - bigquery_connection, - default_project=dataset_ref.project, - default_location=bq_location, + bq_connection_id = self._resolve_bigquery_connection_id( + session, dataset_ref, bq_location, bigquery_connection ) - # Guaranteed to be the form of .. - ( - gcp_project_id, - bq_connection_location, - bq_connection_id, - ) = bigquery_connection.split(".") - if gcp_project_id.casefold() != dataset_ref.project.casefold(): - raise ValueError( - "The project_id does not match BigQuery connection gcp_project_id: " - f"{dataset_ref.project}." - ) - if bq_connection_location.casefold() != bq_location.casefold(): - raise ValueError( - "The location does not match BigQuery connection location: " - f"{bq_location}." - ) - # If any CMEK is intended then check that a docker repository is also specified + # If any CMEK is intended then check that a docker repository is also specified. if ( cloud_function_kms_key_name is not None and cloud_function_docker_repository is None @@ -496,26 +553,26 @@ def wrapper(func): warnings.warn(msg, stacklevel=1, category=bfe.PreviewWarning) # we will model the row as a json serialized string containing the data - # and the metadata representing the row + # and the metadata representing the row. input_types = [str] is_row_processor = True elif isinstance(input_types, type): input_types = [input_types] - # TODO(b/340898611): fix type error + # TODO(b/340898611): fix type error. ibis_signature = _utils.ibis_signature_from_python_signature( signature, input_types, output_type # type: ignore ) remote_function_client = _function_client.FunctionClient( dataset_ref.project, - cloud_function_region, - cloud_functions_client, bq_location, dataset_ref.dataset_id, bigquery_client, bq_connection_id, bq_connection_manager, + cloud_function_region, + cloud_functions_client, cloud_function_service_account, cloud_function_kms_key_name, cloud_function_docker_repository, @@ -524,29 +581,20 @@ def wrapper(func): # To respect the user code/environment let's use a copy of the # original udf, especially since we would be setting some properties - # on it + # on it. func = cloudpickle.loads(cloudpickle.dumps(func)) - # In the unlikely case where the user is trying to re-deploy the same - # function, cleanup the attributes we add below, first. This prevents - # the pickle from having dependencies that might not otherwise be - # present such as ibis or pandas. 
- def try_delattr(attr): - try: - delattr(func, attr) - except AttributeError: - pass - - try_delattr("bigframes_cloud_function") - try_delattr("bigframes_remote_function") - try_delattr("input_dtypes") - try_delattr("output_dtype") - try_delattr("bigframes_bigquery_function_output_dtype") - try_delattr("is_row_processor") - try_delattr("ibis_node") + self._try_delattr(func, "bigframes_cloud_function") + self._try_delattr(func, "bigframes_remote_function") + self._try_delattr(func, "bigframes_bigquery_function") + self._try_delattr(func, "bigframes_bigquery_function_output_dtype") + self._try_delattr(func, "input_dtypes") + self._try_delattr(func, "output_dtype") + self._try_delattr(func, "is_row_processor") + self._try_delattr(func, "ibis_node") # resolve the output type that can be supported in the bigframes, - # ibis, BQ remote functions and cloud functions integration + # ibis, BQ remote functions and cloud functions integration. ibis_output_type_for_bqrf = ibis_signature.output_type bqrf_metadata = None if isinstance(ibis_signature.output_type, ibis_dtypes.Array): @@ -600,7 +648,7 @@ def try_delattr(attr): ] ) - # TODO: Move ibis logic to compiler step + # TODO: Move ibis logic to compiler step. node = ibis_udf.scalar.builtin( func, name=rf_name, @@ -611,11 +659,12 @@ def try_delattr(attr): func.bigframes_cloud_function = ( remote_function_client.get_cloud_function_fully_qualified_name(cf_name) ) - func.bigframes_remote_function = ( + func.bigframes_bigquery_function = ( remote_function_client.get_remote_function_fully_qualilfied_name( rf_name ) ) + func.bigframes_remote_function = func.bigframes_bigquery_function func.input_dtypes = tuple( [ bigframes.core.compile.ibis_types.ibis_dtype_to_bigframes_dtype( @@ -652,3 +701,234 @@ def try_delattr(attr): return func return wrapper + + def udf( + self, + input_types: Union[None, type, Sequence[type]] = None, + output_type: Optional[type] = None, + session: Optional[Session] = None, + bigquery_client: Optional[bigquery.Client] = None, + dataset: Optional[str] = None, + bigquery_connection: Optional[str] = None, + name: Optional[str] = None, + packages: Optional[Sequence[str]] = None, + ): + """Decorator to turn a Python udf into a BigQuery managed function. + + .. note:: + Please have following IAM roles enabled for you: + + * BigQuery Data Editor (roles/bigquery.dataEditor) + + Args: + input_types (type or sequence(type), Optional): + For scalar user defined function it should be the input type or + sequence of input types. The supported scalar input types are + `bool`, `bytes`, `float`, `int`, `str`. + output_type (type, Optional): + Data type of the output in the user defined function. If the + user defined function returns an array, then `list[type]` should + be specified. The supported output types are `bool`, `bytes`, + `float`, `int`, `str`, `list[bool]`, `list[float]`, `list[int]` + and `list[str]`. + session (bigframes.Session, Optional): + BigQuery DataFrames session to use for getting default project, + dataset and BigQuery connection. + bigquery_client (google.cloud.bigquery.Client, Optional): + Client to use for BigQuery operations. If this param is not + provided, then bigquery client from the session would be used. + dataset (str, Optional): + Dataset in which to create a BigQuery managed function. It + should be in `.` or `` + format. If this parameter is not provided then session dataset + id is used. + bigquery_connection (str, Optional): + Name of the BigQuery connection. 
It is used to provide an + identity to the serverless instances running the user code. It + helps BigQuery manage and track the resources used by the udf. + name (str, Optional): + Explicit name of the persisted BigQuery managed function. Use it + with caution, because more than one users working in the same + project and dataset could overwrite each other's managed + functions if they use the same persistent name. When an explicit + name is provided, any session specific clean up ( + ``bigframes.session.Session.close``/ + ``bigframes.pandas.close_session``/ + ``bigframes.pandas.reset_session``/ + ``bigframes.pandas.clean_up_by_session_id``) does not clean up + the function, and leaves it for the user to manage the function + and the associated cloud function directly. + packages (str[], Optional): + Explicit name of the external package dependencies. Each + dependency is added to the `requirements.txt` as is, and can be + of the form supported in + https://pip.pypa.io/en/stable/reference/requirements-file-format/. + """ + if not bigframes.options.experiments.udf: + raise NotImplementedError() + + # Check the Python version. + python_version = _utils.get_python_version() + if python_version not in _MANAGED_FUNC_PYTHON_VERSIONS: + raise RuntimeError( + f"Python version {python_version} is not supported yet for " + "BigFrames managed function." + ) + + # Some defaults may be used from the session if not provided otherwise. + session = self._resolve_session(session) + + # A BigQuery client is required to perform BQ operations. + bigquery_client = self._resolve_bigquery_client(session, bigquery_client) + + # BQ managed function must be persisted, for which we need a dataset. + dataset_ref = self._resolve_dataset_reference(session, bigquery_client, dataset) + + bq_location, _ = _utils.get_remote_function_locations(bigquery_client.location) + + # A connection is required for BQ managed function. + bq_connection_id = self._resolve_bigquery_connection_id( + session, dataset_ref, bq_location, bigquery_connection + ) + + bq_connection_manager = session.bqconnectionmanager + + # TODO(b/399129906): Write a method for the repeated part in the wrapper + # for both managed function and remote function. + def wrapper(func): + nonlocal input_types, output_type + + if not callable(func): + raise TypeError("f must be callable, got {}".format(func)) + + # Managed function supports version >= 3.11. + signature_kwargs: Mapping[str, Any] = {"eval_str": True} + signature = inspect.signature(func, **signature_kwargs) + + # Try to get input types via type annotations. + if input_types is None: + input_types = [] + for parameter in signature.parameters.values(): + if (param_type := parameter.annotation) is inspect.Signature.empty: + raise ValueError( + "'input_types' was not set and parameter " + f"'{parameter.name}' is missing a type annotation. " + "Types are required to use managed function." + ) + input_types.append(param_type) + elif not isinstance(input_types, collections.abc.Sequence): + input_types = [input_types] + + if output_type is None: + if ( + output_type := signature.return_annotation + ) is inspect.Signature.empty: + raise ValueError( + "'output_type' was not set and function is missing a " + "return type annotation. Types are required to use " + "managed function." + ) + + # The function will actually be receiving a pandas Series, but allow + # both BigQuery DataFrames and pandas object types for compatibility. 
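# Illustrative sketch (not part of this change): because the wrapper above falls
# back to type annotations, a managed function can be declared without explicit
# input_types/output_type. Assumes the experimental flag is enabled and a default
# session, dataset and connection are configured; `my_dataset` is a placeholder.
import bigframes.pandas as bpd

bpd.options.experiments.udf = True

@bpd.udf(dataset="my_dataset")
def add_one(x: int) -> int:
    return x + 1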
+ is_row_processor = False + if len(input_types) == 1 and ( + (input_type := input_types[0]) == bf_series.Series + or input_type == pandas.Series + ): + msg = "input_types=Series is in preview." + warnings.warn(msg, stacklevel=1, category=bfe.PreviewWarning) + + # we will model the row as a json serialized string containing + # the data and the metadata representing the row. + input_types = [str] + is_row_processor = True + elif isinstance(input_types, type): + input_types = [input_types] + + # TODO(b/340898611): fix type error. + ibis_signature = _utils.ibis_signature_from_python_signature( + signature, input_types, output_type # type: ignore + ) + + remote_function_client = _function_client.FunctionClient( + dataset_ref.project, + bq_location, + dataset_ref.dataset_id, + bigquery_client, + bq_connection_id, + bq_connection_manager, + session=session, # type: ignore + ) + + func = cloudpickle.loads(cloudpickle.dumps(func)) + + self._try_delattr(func, "bigframes_bigquery_function") + self._try_delattr(func, "input_dtypes") + self._try_delattr(func, "output_dtype") + self._try_delattr(func, "is_row_processor") + self._try_delattr(func, "ibis_node") + + bq_function_name = remote_function_client.provision_bq_managed_function( + func=func, + input_types=tuple( + third_party_ibis_bqtypes.BigQueryType.from_ibis(type_) + for type_ in ibis_signature.input_types + if type_ is not None + ), + output_type=third_party_ibis_bqtypes.BigQueryType.from_ibis( + ibis_signature.output_type + ), + name=name, + packages=packages, + is_row_processor=is_row_processor, + ) + + # TODO(shobs): Find a better way to support udfs with param named + # "name". This causes an issue in the ibis compilation. + func.__signature__ = inspect.signature(func).replace( # type: ignore + parameters=[ + inspect.Parameter( + f"bigframes_{param.name}", + param.kind, + ) + for param in inspect.signature(func).parameters.values() + ] + ) + + # TODO: Move ibis logic to compiler step. + node = ibis_udf.scalar.builtin( + func, + name=bq_function_name, + catalog=dataset_ref.project, + database=dataset_ref.dataset_id, + signature=(ibis_signature.input_types, ibis_signature.output_type), + ) # type: ignore + func.bigframes_bigquery_function = ( + remote_function_client.get_remote_function_fully_qualilfied_name( + bq_function_name + ) + ) + func.input_dtypes = tuple( + [ + bigframes.core.compile.ibis_types.ibis_dtype_to_bigframes_dtype( + input_type + ) + for input_type in ibis_signature.input_types + if input_type is not None + ] + ) + func.output_dtype = ( + bigframes.core.compile.ibis_types.ibis_dtype_to_bigframes_dtype( + ibis_signature.output_type + ) + ) + func.is_row_processor = is_row_processor + func.ibis_node = node + + if not name: + self._update_temp_artifacts(func.bigframes_bigquery_function, "") + + return func + + return wrapper diff --git a/bigframes/functions/_utils.py b/bigframes/functions/_utils.py index f1f8c97e7f..bd6bd920b8 100644 --- a/bigframes/functions/_utils.py +++ b/bigframes/functions/_utils.py @@ -16,6 +16,7 @@ import hashlib import inspect import json +import sys import typing from typing import cast, List, NamedTuple, Optional, Sequence, Set @@ -185,8 +186,8 @@ def get_cloud_function_name(function_hash, session_id=None, uniq_suffix=None): return _GCF_FUNCTION_NAME_SEPERATOR.join(parts) -def get_remote_function_name(function_hash, session_id, uniq_suffix=None): - "Get a name for the remote function for the given user defined function." 
+def get_bigframes_function_name(function_hash, session_id, uniq_suffix=None): + "Get a name for the bigframes function for the given user defined function." parts = [_BIGFRAMES_FUNCTION_PREFIX, session_id, function_hash] if uniq_suffix: parts.append(uniq_suffix) @@ -280,3 +281,12 @@ def get_bigframes_metadata(*, python_output_type: Optional[type] = None) -> str: ) return metadata_ser + + +def get_python_version(is_compat: bool = False) -> str: + # Cloud Run functions use the 'compat' format (e.g., python311, see more + # from https://cloud.google.com/functions/docs/runtime-support#python), + # while managed functions use the standard format (e.g., python-3.11). + major = sys.version_info.major + minor = sys.version_info.minor + return f"python{major}{minor}" if is_compat else f"python-{major}.{minor}" diff --git a/bigframes/functions/function.py b/bigframes/functions/function.py index c2809b96eb..392a209714 100644 --- a/bigframes/functions/function.py +++ b/bigframes/functions/function.py @@ -122,13 +122,22 @@ def get_routine_reference( def remote_function(*args, **kwargs): - remote_function_session = bff_session.FunctionSession() - return remote_function_session.remote_function(*args, **kwargs) + function_session = bff_session.FunctionSession() + return function_session.remote_function(*args, **kwargs) remote_function.__doc__ = bff_session.FunctionSession.remote_function.__doc__ +def udf(*args, **kwargs): + function_session = bff_session.FunctionSession() + return function_session.udf(*args, **kwargs) + + +udf.__doc__ = bff_session.FunctionSession.udf.__doc__ + + +# TODO(b/399894805): Support managed function. def read_gbq_function( function_name: str, *, diff --git a/bigframes/pandas/__init__.py b/bigframes/pandas/__init__.py index 50a71c9b9e..8ea7e6c320 100644 --- a/bigframes/pandas/__init__.py +++ b/bigframes/pandas/__init__.py @@ -108,6 +108,29 @@ def remote_function( remote_function.__doc__ = inspect.getdoc(bigframes.session.Session.remote_function) +def udf( + *, + input_types: Union[None, type, Sequence[type]] = None, + output_type: Optional[type] = None, + dataset: Optional[str] = None, + bigquery_connection: Optional[str] = None, + name: Optional[str] = None, + packages: Optional[Sequence[str]] = None, +): + return global_session.with_default_session( + bigframes.session.Session.udf, + input_types=input_types, + output_type=output_type, + dataset=dataset, + bigquery_connection=bigquery_connection, + name=name, + packages=packages, + ) + + +udf.__doc__ = inspect.getdoc(bigframes.session.Session.udf) + + @typing.overload def to_datetime( arg: Union[ diff --git a/bigframes/series.py b/bigframes/series.py index 33ba6f8599..d40ed39262 100644 --- a/bigframes/series.py +++ b/bigframes/series.py @@ -1525,10 +1525,18 @@ def apply( "Only a ufunc (a function that applies to the entire Series) or a remote function that only works on single values are supported." ) - if not hasattr(func, "bigframes_remote_function"): - # It is not a remote function + # TODO(jialuo): Deprecate the "bigframes_remote_function" attribute. + # We have some tests using pre-defined remote_function that were defined + # based on "bigframes_remote_function" instead of + # "bigframes_bigquery_function". So we need to fix those pre-defined + # remote functions before deprecating the "bigframes_remote_function" + # attribute. + if not hasattr(func, "bigframes_remote_function") and not hasattr( + func, "bigframes_bigquery_function" + ): + # It is neither a remote function nor a managed function. 
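# Illustrative sketch (not part of this change): the two formats produced by the
# new get_python_version helper, e.g. on a Python 3.11 interpreter it returns
# "python-3.11" for managed functions and "python311" for the Cloud Run
# functions runtime.
import sys

from bigframes.functions._utils import get_python_version

major, minor = sys.version_info.major, sys.version_info.minor
assert get_python_version() == f"python-{major}.{minor}"
assert get_python_version(is_compat=True) == f"python{major}{minor}"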
# Then it must be a vectorized function that applies to the Series - # as a whole + # as a whole. if by_row: raise ValueError( "A vectorized non-remote function can be provided only with by_row=False." @@ -1577,7 +1585,9 @@ def combine( "Only a ufunc (a function that applies to the entire Series) or a remote function that only works on single values are supported." ) - if not hasattr(func, "bigframes_remote_function"): + if not hasattr(func, "bigframes_remote_function") and not hasattr( + func, "bigframes_bigquery_function" + ): # Keep this in sync with .apply try: return func(self, other) diff --git a/bigframes/session/__init__.py b/bigframes/session/__init__.py index 770cac067c..8f53dccc06 100644 --- a/bigframes/session/__init__.py +++ b/bigframes/session/__init__.py @@ -1407,6 +1407,83 @@ def remote_function( cloud_function_ingress_settings=cloud_function_ingress_settings, ) + def udf( + self, + *, + input_types: Union[None, type, Sequence[type]] = None, + output_type: Optional[type] = None, + dataset: Optional[str] = None, + bigquery_connection: Optional[str] = None, + name: Optional[str] = None, + packages: Optional[Sequence[str]] = None, + ): + """Decorator to turn a Python udf into a BigQuery managed function. + + .. note:: + Please have following IAM roles enabled for you: + + * BigQuery Data Editor (roles/bigquery.dataEditor) + + Args: + input_types (type or sequence(type), Optional): + For scalar user defined function it should be the input type or + sequence of input types. The supported scalar input types are + `bool`, `bytes`, `float`, `int`, `str`. + output_type (type, Optional): + Data type of the output in the user defined function. If the + user defined function returns an array, then `list[type]` should + be specified. The supported output types are `bool`, `bytes`, + `float`, `int`, `str`, `list[bool]`, `list[float]`, `list[int]` + and `list[str]`. + dataset (str, Optional): + Dataset in which to create a BigQuery managed function. It + should be in `.` or `` + format. If this parameter is not provided then session dataset + id is used. + bigquery_connection (str, Optional): + Name of the BigQuery connection. You should either have the + connection already created in the `location` you have chosen, or + you should have the Project IAM Admin role to enable the service + to create the connection for you if you need it. If this + parameter is not provided then the BigQuery connection from the + session is used. + name (str, Optional): + Explicit name of the persisted BigQuery managed function. Use it + with caution, because more than one users working in the same + project and dataset could overwrite each other's managed + functions if they use the same persistent name. When an explicit + name is provided, any session specific clean up ( + ``bigframes.session.Session.close``/ + ``bigframes.pandas.close_session``/ + ``bigframes.pandas.reset_session``/ + ``bigframes.pandas.clean_up_by_session_id``) does not clean up + the function, and leaves it for the user to manage the function + and the associated cloud function directly. + packages (str[], Optional): + Explicit name of the external package dependencies. Each + dependency is added to the `requirements.txt` as is, and can be + of the form supported in + https://pip.pypa.io/en/stable/reference/requirements-file-format/. + Returns: + collections.abc.Callable: + A managed function object pointing to the cloud assets created + in the background to support the remote execution. 
The cloud + ssets can be located through the following properties set in the + object: + + `bigframes_bigquery_function` - The bigquery managed function + deployed for the user defined code. + """ + return self._function_session.udf( + input_types, + output_type, + session=self, + dataset=dataset, + bigquery_connection=bigquery_connection, + name=name, + packages=packages, + ) + def read_gbq_function( self, function_name: str, diff --git a/tests/system/large/functions/test_managed_function.py b/tests/system/large/functions/test_managed_function.py new file mode 100644 index 0000000000..4db7a1c47c --- /dev/null +++ b/tests/system/large/functions/test_managed_function.py @@ -0,0 +1,166 @@ +# Copyright 2023 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import pandas +import pytest + +from bigframes.functions import _function_session as bff_session +from bigframes.functions._utils import get_python_version +import bigframes.pandas as bpd +from tests.system.utils import cleanup_function_assets + +bpd.options.experiments.udf = True + + +@pytest.mark.skipif( + get_python_version() not in bff_session._MANAGED_FUNC_PYTHON_VERSIONS, + reason=f"Supported version: {bff_session._MANAGED_FUNC_PYTHON_VERSIONS}", +) +def test_managed_function_multiply_with_ibis( + session, + scalars_table_id, + bigquery_client, + ibis_client, + dataset_id, +): + + try: + + @session.udf( + input_types=[int, int], + output_type=int, + dataset=dataset_id, + ) + def multiply(x, y): + return x * y + + _, dataset_name, table_name = scalars_table_id.split(".") + if not ibis_client.dataset: + ibis_client.dataset = dataset_name + + col_name = "int64_col" + table = ibis_client.tables[table_name] + table = table.filter(table[col_name].notnull()).order_by("rowindex").head(10) + sql = table.compile() + pandas_df_orig = bigquery_client.query(sql).to_dataframe() + + col = table[col_name] + col_2x = multiply(col, 2).name("int64_col_2x") + col_square = multiply(col, col).name("int64_col_square") + table = table.mutate([col_2x, col_square]) + sql = table.compile() + pandas_df_new = bigquery_client.query(sql).to_dataframe() + + pandas.testing.assert_series_equal( + pandas_df_orig[col_name] * 2, + pandas_df_new["int64_col_2x"], + check_names=False, + ) + + pandas.testing.assert_series_equal( + pandas_df_orig[col_name] * pandas_df_orig[col_name], + pandas_df_new["int64_col_square"], + check_names=False, + ) + finally: + # clean up the gcp assets created for the managed function. + cleanup_function_assets(multiply, bigquery_client) + + +@pytest.mark.skipif( + get_python_version() not in bff_session._MANAGED_FUNC_PYTHON_VERSIONS, + reason=f"Supported version: {bff_session._MANAGED_FUNC_PYTHON_VERSIONS}", +) +def test_managed_function_stringify_with_ibis( + session, + scalars_table_id, + bigquery_client, + ibis_client, + dataset_id, +): + try: + + @session.udf( + input_types=[int], + output_type=str, + dataset=dataset_id, + ) + def stringify(x): + return f"I got {x}" + + # Function should work locally. 
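# Illustrative sketch (not part of this change): applying a deployed managed
# function to a bigframes Series and reading back the fully qualified routine
# name via the bigframes_bigquery_function attribute that Session.udf sets, as
# documented above. `df` and `add_one` are placeholders for a bigframes
# DataFrame and a udf created as sketched earlier.
result = df["int64_col"].apply(add_one).to_pandas()
routine = add_one.bigframes_bigquery_function  # "<project>.<dataset>.<routine>"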
+ assert stringify(8912) == "I got 8912" + + _, dataset_name, table_name = scalars_table_id.split(".") + if not ibis_client.dataset: + ibis_client.dataset = dataset_name + + col_name = "int64_col" + table = ibis_client.tables[table_name] + table = table.filter(table[col_name].notnull()).order_by("rowindex").head(10) + sql = table.compile() + pandas_df_orig = bigquery_client.query(sql).to_dataframe() + + col = table[col_name] + col_2x = stringify.ibis_node(col).name("int64_str_col") + table = table.mutate([col_2x]) + sql = table.compile() + pandas_df_new = bigquery_client.query(sql).to_dataframe() + + pandas.testing.assert_series_equal( + pandas_df_orig[col_name].apply(lambda x: f"I got {x}"), + pandas_df_new["int64_str_col"], + check_names=False, + ) + finally: + # clean up the gcp assets created for the managed function. + cleanup_function_assets( + bigquery_client, session.cloudfunctionsclient, stringify + ) + + +@pytest.mark.skipif( + get_python_version() not in bff_session._MANAGED_FUNC_PYTHON_VERSIONS, + reason=f"Supported version: {bff_session._MANAGED_FUNC_PYTHON_VERSIONS}", +) +def test_managed_function_binop(session, scalars_dfs, dataset_id): + try: + + def func(x, y): + return x * abs(y % 4) + + managed_func = session.udf( + input_types=[str, int], + output_type=str, + dataset=dataset_id, + )(func) + + scalars_df, scalars_pandas_df = scalars_dfs + + scalars_df = scalars_df.dropna() + scalars_pandas_df = scalars_pandas_df.dropna() + pd_result = scalars_pandas_df["string_col"].combine( + scalars_pandas_df["int64_col"], func + ) + bf_result = ( + scalars_df["string_col"] + .combine(scalars_df["int64_col"], managed_func) + .to_pandas() + ) + pandas.testing.assert_series_equal(bf_result, pd_result) + finally: + # clean up the gcp assets created for the managed function. + cleanup_function_assets( + session.bqclient, session.cloudfunctionsclient, managed_func + ) diff --git a/tests/system/large/functions/test_remote_function.py b/tests/system/large/functions/test_remote_function.py index 76b19c30b3..350eae3783 100644 --- a/tests/system/large/functions/test_remote_function.py +++ b/tests/system/large/functions/test_remote_function.py @@ -39,6 +39,7 @@ import bigframes.series from tests.system.utils import ( assert_pandas_df_equal, + cleanup_function_assets, delete_cloud_function, get_cloud_functions, ) @@ -55,30 +56,6 @@ ) -def cleanup_remote_function_assets( - bigquery_client, cloudfunctions_client, remote_udf, ignore_failures=True -): - """Clean up the GCP assets behind a bigframes remote function.""" - - # Clean up BQ remote function - try: - bigquery_client.delete_routine(remote_udf.bigframes_remote_function) - except Exception: - # By default don't raise exception in cleanup - if not ignore_failures: - raise - - # Clean up cloud function - try: - delete_cloud_function( - cloudfunctions_client, remote_udf.bigframes_cloud_function - ) - except Exception: - # By default don't raise exception in cleanup - if not ignore_failures: - raise - - def make_uniq_udf(udf): """Transform a udf to another with same behavior but a unique name. 
Use this to test remote functions with reuse=True, in which case parallel @@ -178,9 +155,7 @@ def multiply(x, y): ) finally: # clean up the gcp assets created for the remote function - cleanup_remote_function_assets( - bigquery_client, session.cloudfunctionsclient, multiply - ) + cleanup_function_assets(multiply, bigquery_client, session.cloudfunctionsclient) @pytest.mark.flaky(retries=2, delay=120) @@ -230,8 +205,8 @@ def stringify(x): ) finally: # clean up the gcp assets created for the remote function - cleanup_remote_function_assets( - bigquery_client, session.cloudfunctionsclient, stringify + cleanup_function_assets( + stringify, bigquery_client, session.cloudfunctionsclient ) @@ -265,8 +240,8 @@ def func(x, y): pandas.testing.assert_series_equal(bf_result, pd_result) finally: # clean up the gcp assets created for the remote function - cleanup_remote_function_assets( - session.bqclient, session.cloudfunctionsclient, remote_func + cleanup_function_assets( + remote_func, session.bqclient, session.cloudfunctionsclient ) @@ -302,8 +277,8 @@ def func(x, y): pandas.testing.assert_series_equal(bf_result, pd_result, check_dtype=False) finally: # clean up the gcp assets created for the remote function - cleanup_remote_function_assets( - session.bqclient, session.cloudfunctionsclient, remote_func + cleanup_function_assets( + remote_func, session.bqclient, session.cloudfunctionsclient ) @@ -347,9 +322,7 @@ def square(x): assert_pandas_df_equal(bf_result, pd_result) finally: # clean up the gcp assets created for the remote function - cleanup_remote_function_assets( - session.bqclient, session.cloudfunctionsclient, square - ) + cleanup_function_assets(square, session.bqclient, session.cloudfunctionsclient) @pytest.mark.flaky(retries=2, delay=120) @@ -393,8 +366,8 @@ def add_one(x): assert_pandas_df_equal(bf_result, pd_result) finally: # clean up the gcp assets created for the remote function - cleanup_remote_function_assets( - session.bqclient, session.cloudfunctionsclient, remote_add_one + cleanup_function_assets( + remote_add_one, session.bqclient, session.cloudfunctionsclient ) @@ -423,8 +396,8 @@ def add_one(x): pandas.testing.assert_series_equal(bf_result, pd_result, check_dtype=False) finally: # clean up the gcp assets created for the remote function - cleanup_remote_function_assets( - session.bqclient, session.cloudfunctionsclient, remote_add_one + cleanup_function_assets( + remote_add_one, session.bqclient, session.cloudfunctionsclient ) @@ -471,9 +444,7 @@ def square(x): assert_pandas_df_equal(bf_result, pd_result) finally: # clean up the gcp assets created for the remote function - cleanup_remote_function_assets( - session.bqclient, session.cloudfunctionsclient, square - ) + cleanup_function_assets(square, session.bqclient, session.cloudfunctionsclient) @pytest.mark.flaky(retries=2, delay=120) @@ -524,8 +495,8 @@ def sign(num): assert_pandas_df_equal(bf_result, pd_result) finally: # clean up the gcp assets created for the remote function - cleanup_remote_function_assets( - session.bqclient, session.cloudfunctionsclient, remote_sign + cleanup_function_assets( + remote_sign, session.bqclient, session.cloudfunctionsclient ) @@ -571,8 +542,8 @@ def circumference(radius): assert_pandas_df_equal(bf_result, pd_result) finally: # clean up the gcp assets created for the remote function - cleanup_remote_function_assets( - session.bqclient, session.cloudfunctionsclient, remote_circumference + cleanup_function_assets( + remote_circumference, session.bqclient, session.cloudfunctionsclient ) @@ -620,8 
+591,8 @@ def find_team(num): assert_pandas_df_equal(bf_result, pd_result) finally: # clean up the gcp assets created for the remote function - cleanup_remote_function_assets( - session.bqclient, session.cloudfunctionsclient, remote_find_team + cleanup_function_assets( + remote_find_team, session.bqclient, session.cloudfunctionsclient ) @@ -757,8 +728,8 @@ def inner_test(): shutil.rmtree(add_one_uniq_dir) finally: # clean up the gcp assets created for the remote function - cleanup_remote_function_assets( - session.bqclient, session.cloudfunctionsclient, remote_add_one + cleanup_function_assets( + remote_add_one, session.bqclient, session.cloudfunctionsclient ) @@ -797,8 +768,8 @@ def is_odd(num): assert_pandas_df_equal(bf_result, pd_result) finally: # clean up the gcp assets created for the remote function - cleanup_remote_function_assets( - session.bqclient, session.cloudfunctionsclient, is_odd_remote + cleanup_function_assets( + is_odd_remote, session.bqclient, session.cloudfunctionsclient ) @@ -840,8 +811,8 @@ def is_odd(num): assert_pandas_df_equal(bf_result, pd_result) finally: # clean up the gcp assets created for the remote function - cleanup_remote_function_assets( - session.bqclient, session.cloudfunctionsclient, is_odd_remote + cleanup_function_assets( + is_odd_remote, session.bqclient, session.cloudfunctionsclient ) @@ -882,8 +853,8 @@ def test_remote_udf_lambda(session, scalars_dfs, dataset_id, bq_cf_connection): assert_pandas_df_equal(bf_result, pd_result) finally: # clean up the gcp assets created for the remote function - cleanup_remote_function_assets( - session.bqclient, session.cloudfunctionsclient, add_one_lambda_remote + cleanup_function_assets( + add_one_lambda_remote, session.bqclient, session.cloudfunctionsclient ) @@ -939,8 +910,8 @@ def square(x): assert_pandas_df_equal(bf_result, pd_result) finally: # clean up the gcp assets created for the remote function - cleanup_remote_function_assets( - session.bqclient, session.cloudfunctionsclient, square_remote + cleanup_function_assets( + square_remote, session.bqclient, session.cloudfunctionsclient ) @@ -984,8 +955,8 @@ def pd_np_foo(x): assert_pandas_df_equal(bf_result, pd_result) finally: # clean up the gcp assets created for the remote function - cleanup_remote_function_assets( - session.bqclient, session.cloudfunctionsclient, pd_np_foo_remote + cleanup_function_assets( + pd_np_foo_remote, session.bqclient, session.cloudfunctionsclient ) @@ -1137,14 +1108,14 @@ def plusone(x): test_internal(plusone_remote, plusone) finally: # clean up the gcp assets created for the remote function - cleanup_remote_function_assets( - session.bqclient, session.cloudfunctionsclient, square_remote1 + cleanup_function_assets( + square_remote1, session.bqclient, session.cloudfunctionsclient ) - cleanup_remote_function_assets( - session.bqclient, session.cloudfunctionsclient, square_remote2 + cleanup_function_assets( + square_remote2, session.bqclient, session.cloudfunctionsclient ) - cleanup_remote_function_assets( - session.bqclient, session.cloudfunctionsclient, plusone_remote + cleanup_function_assets( + plusone_remote, session.bqclient, session.cloudfunctionsclient ) for dir_ in dirs_to_cleanup: shutil.rmtree(dir_) @@ -1197,9 +1168,7 @@ def square(x): assert_pandas_df_equal(bf_result, pd_result) finally: # clean up the gcp assets created for the remote function - cleanup_remote_function_assets( - session.bqclient, session.cloudfunctionsclient, square - ) + cleanup_function_assets(square, session.bqclient, session.cloudfunctionsclient) 
@pytest.mark.flaky(retries=2, delay=120) @@ -1234,9 +1203,7 @@ def square(x): assert_pandas_df_equal(bf_result, pd_result) finally: # clean up the gcp assets created for the remote function - cleanup_remote_function_assets( - session.bqclient, session.cloudfunctionsclient, square - ) + cleanup_function_assets(square, session.bqclient, session.cloudfunctionsclient) @pytest.mark.flaky(retries=2, delay=120) @@ -1257,9 +1224,7 @@ def square(x): scalars_df["int64_col"].apply(square).to_pandas() finally: # clean up the gcp assets created for the remote function - cleanup_remote_function_assets( - session.bqclient, session.cloudfunctionsclient, square - ) + cleanup_function_assets(square, session.bqclient, session.cloudfunctionsclient) @pytest.mark.flaky(retries=2, delay=120) @@ -1302,9 +1267,7 @@ def square(x): assert_pandas_df_equal(bf_result, pd_result) finally: # clean up the gcp assets created for the remote function - cleanup_remote_function_assets( - session.bqclient, session.cloudfunctionsclient, square - ) + cleanup_function_assets(square, session.bqclient, session.cloudfunctionsclient) @pytest.mark.flaky(retries=2, delay=120) @@ -1355,8 +1318,8 @@ def square_num(x): assert gcf.service_config.service_account_email == gcf_service_account finally: # clean up the gcp assets created for the remote function - cleanup_remote_function_assets( - rf_session.bqclient, rf_session.cloudfunctionsclient, square_num + cleanup_function_assets( + square_num, rf_session.bqclient, rf_session.cloudfunctionsclient ) @@ -1430,8 +1393,8 @@ def square_num(x): finally: # clean up the gcp assets created for the remote function - cleanup_remote_function_assets( - session.bqclient, session.cloudfunctionsclient, square_num + cleanup_function_assets( + square_num, session.bqclient, session.cloudfunctionsclient ) @@ -1489,8 +1452,8 @@ def square_num(x): assert gcf.service_config.vpc_connector == gcf_vpc_connector finally: # clean up the gcp assets created for the remote function - cleanup_remote_function_assets( - rf_session.bqclient, rf_session.cloudfunctionsclient, square_num_remote + cleanup_function_assets( + square_num_remote, rf_session.bqclient, rf_session.cloudfunctionsclient ) @@ -1525,8 +1488,8 @@ def square(x): pandas.testing.assert_series_equal(bf_result, pd_result, check_dtype=False) finally: # clean up the gcp assets created for the remote function - cleanup_remote_function_assets( - session.bqclient, session.cloudfunctionsclient, square_remote + cleanup_function_assets( + square_remote, session.bqclient, session.cloudfunctionsclient ) @@ -1565,8 +1528,8 @@ def square(x): pandas.testing.assert_series_equal(bf_result, pd_result, check_dtype=False) finally: # clean up the gcp assets created for the remote function - cleanup_remote_function_assets( - session.bqclient, session.cloudfunctionsclient, square_remote + cleanup_function_assets( + square_remote, session.bqclient, session.cloudfunctionsclient ) @@ -1614,8 +1577,8 @@ def square(x): pandas.testing.assert_series_equal(bf_result, pd_result, check_dtype=False) finally: # clean up the gcp assets created for the remote function - cleanup_remote_function_assets( - session.bqclient, session.cloudfunctionsclient, square_remote + cleanup_function_assets( + square_remote, session.bqclient, session.cloudfunctionsclient ) @@ -1666,8 +1629,8 @@ def serialize_row(row): pandas.testing.assert_series_equal(pd_result, bf_result, check_dtype=False) finally: # clean up the gcp assets created for the remote function - cleanup_remote_function_assets( - session.bqclient, 
session.cloudfunctionsclient, serialize_row_remote + cleanup_function_assets( + serialize_row_remote, session.bqclient, session.cloudfunctionsclient ) @@ -1707,8 +1670,8 @@ def analyze(row): pandas.testing.assert_series_equal(pd_result, bf_result, check_dtype=False) finally: # clean up the gcp assets created for the remote function - cleanup_remote_function_assets( - session.bqclient, session.cloudfunctionsclient, analyze_remote + cleanup_function_assets( + analyze_remote, session.bqclient, session.cloudfunctionsclient ) @@ -1827,8 +1790,8 @@ def serialize_row(row): ) finally: # clean up the gcp assets created for the remote function - cleanup_remote_function_assets( - session.bqclient, session.cloudfunctionsclient, serialize_row_remote + cleanup_function_assets( + serialize_row_remote, session.bqclient, session.cloudfunctionsclient ) @@ -1890,8 +1853,8 @@ def float_parser(row): pandas.testing.assert_series_equal(bq_result, bf_result) finally: # clean up the gcp assets created for the remote function - cleanup_remote_function_assets( - session.bqclient, session.cloudfunctionsclient, float_parser_remote + cleanup_function_assets( + float_parser_remote, session.bqclient, session.cloudfunctionsclient ) @@ -1931,8 +1894,8 @@ def square(x: int) -> int: pandas.testing.assert_series_equal(bf_result, pd_result, check_dtype=False) finally: # clean up the gcp assets created for the remote function - cleanup_remote_function_assets( - session.bqclient, session.cloudfunctionsclient, square_remote + cleanup_function_assets( + square_remote, session.bqclient, session.cloudfunctionsclient ) @@ -2027,9 +1990,7 @@ def foo(x: int) -> int: assert gcf.state is functions_v2.Function.State.ACTIVE finally: # clean up the gcp assets created for the remote function - cleanup_remote_function_assets( - session.bqclient, session.cloudfunctionsclient, foo - ) + cleanup_function_assets(foo, session.bqclient, session.cloudfunctionsclient) @pytest.mark.flaky(retries=2, delay=120) @@ -2089,8 +2050,8 @@ def foo_named(x: int) -> int: assert gcf.state is functions_v2.Function.State.ACTIVE finally: # clean up the gcp assets created for the remote function - cleanup_remote_function_assets( - session.bqclient, session.cloudfunctionsclient, foo_named + cleanup_function_assets( + foo_named, session.bqclient, session.cloudfunctionsclient ) @@ -2166,9 +2127,7 @@ def foo(x, y, z): ) finally: # clean up the gcp assets created for the remote function - cleanup_remote_function_assets( - session.bqclient, session.cloudfunctionsclient, foo - ) + cleanup_function_assets(foo, session.bqclient, session.cloudfunctionsclient) def test_df_apply_axis_1_multiple_params_array_output(session): @@ -2254,9 +2213,7 @@ def foo(x, y, z): ) finally: # clean up the gcp assets created for the remote function - cleanup_remote_function_assets( - session.bqclient, session.cloudfunctionsclient, foo - ) + cleanup_function_assets(foo, session.bqclient, session.cloudfunctionsclient) def test_df_apply_axis_1_single_param_non_series(session): @@ -2318,9 +2275,7 @@ def foo(x): ) finally: # clean up the gcp assets created for the remote function - cleanup_remote_function_assets( - session.bqclient, session.cloudfunctionsclient, foo - ) + cleanup_function_assets(foo, session.bqclient, session.cloudfunctionsclient) @pytest.mark.flaky(retries=2, delay=120) @@ -2358,8 +2313,8 @@ def generate_stats(row: pandas.Series) -> list[int]: pandas.testing.assert_series_equal(pd_result, bf_result, check_dtype=False) finally: # clean up the gcp assets created for the remote function 
- cleanup_remote_function_assets( - session.bqclient, session.cloudfunctionsclient, generate_stats + cleanup_function_assets( + generate_stats, session.bqclient, session.cloudfunctionsclient ) @@ -2437,8 +2392,8 @@ def square(x: int) -> int: pandas.testing.assert_series_equal(bf_result, pd_result, check_dtype=False) finally: # clean up the gcp assets created for the remote function - cleanup_remote_function_assets( - session.bqclient, session.cloudfunctionsclient, square_remote + cleanup_function_assets( + square_remote, session.bqclient, session.cloudfunctionsclient ) @@ -2523,8 +2478,8 @@ def add_one(x: int) -> int: # clean up the gcp assets created for the temporary remote function, # just in case it was not explicitly cleaned up in the try clause due # to assertion failure or exception earlier than that - cleanup_remote_function_assets( - session.bqclient, session.cloudfunctionsclient, add_one_remote_temp + cleanup_function_assets( + add_one_remote_temp, session.bqclient, session.cloudfunctionsclient ) @@ -2600,8 +2555,8 @@ def add_one(x: int) -> int: ) finally: # clean up the gcp assets created for the persistent remote function - cleanup_remote_function_assets( - session.bqclient, session.cloudfunctionsclient, add_one_remote_persist + cleanup_function_assets( + add_one_remote_persist, session.bqclient, session.cloudfunctionsclient ) @@ -2647,8 +2602,8 @@ def featurize(x: int) -> list[array_dtype]: # type: ignore pandas.testing.assert_series_equal(pd_result, bf_result, check_dtype=False) finally: # clean up the gcp assets created for the remote function - cleanup_remote_function_assets( - session.bqclient, session.cloudfunctionsclient, featurize + cleanup_function_assets( + featurize, session.bqclient, session.cloudfunctionsclient ) @@ -2686,10 +2641,10 @@ def featurize(x: float) -> list[float]: # type: ignore pandas.testing.assert_series_equal(pd_result, bf_result, check_dtype=False) finally: # clean up the gcp assets created for the remote function - cleanup_remote_function_assets( + cleanup_function_assets( + featurize, unordered_session.bqclient, unordered_session.cloudfunctionsclient, - featurize, ) @@ -2722,6 +2677,6 @@ def featurize(x: int) -> list[float]: pandas.testing.assert_series_equal(pd_result, bf_result, check_dtype=False) finally: # clean up the gcp assets created for the remote function - cleanup_remote_function_assets( - session.bqclient, session.cloudfunctionsclient, featurize + cleanup_function_assets( + featurize, session.bqclient, session.cloudfunctionsclient ) diff --git a/tests/system/small/functions/test_managed_function.py b/tests/system/small/functions/test_managed_function.py new file mode 100644 index 0000000000..41a5785d01 --- /dev/null +++ b/tests/system/small/functions/test_managed_function.py @@ -0,0 +1,199 @@ +# Copyright 2023 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +import google.api_core.exceptions +import pandas as pd +import pytest + +import bigframes.exceptions +from bigframes.functions import _function_session as bff_session +from bigframes.functions._utils import get_python_version +from bigframes.pandas import udf +import bigframes.pandas as bpd +import bigframes.series +from tests.system.utils import assert_pandas_df_equal, get_function_name + +bpd.options.experiments.udf = True + + +@pytest.mark.skipif( + get_python_version() not in bff_session._MANAGED_FUNC_PYTHON_VERSIONS, + reason=f"Supported version: {bff_session._MANAGED_FUNC_PYTHON_VERSIONS}", +) +@pytest.mark.parametrize( + ("typ",), + [ + pytest.param(int), + pytest.param(float), + pytest.param(bool), + pytest.param(str), + pytest.param(bytes), + ], +) +def test_managed_function_series_apply( + typ, + scalars_dfs, + dataset_id_permanent, +): + def foo(x): + # The bytes() constructor expects a non-negative integer as its arg. + return typ(abs(x)) + + foo = udf( + input_types=int, + output_type=typ, + dataset=dataset_id_permanent, + name=get_function_name(foo), + )(foo) + + # Function should still work normally. + assert foo(-2) == typ(2) + + assert hasattr(foo, "bigframes_bigquery_function") + assert hasattr(foo, "ibis_node") + + scalars_df, scalars_pandas_df = scalars_dfs + + bf_result_col = scalars_df["int64_too"].apply(foo) + bf_result = ( + scalars_df["int64_too"].to_frame().assign(result=bf_result_col).to_pandas() + ) + + pd_result_col = scalars_pandas_df["int64_too"].apply(foo) + pd_result = scalars_pandas_df["int64_too"].to_frame().assign(result=pd_result_col) + + assert_pandas_df_equal(bf_result, pd_result, check_dtype=False) + + +@pytest.mark.skipif( + get_python_version() not in bff_session._MANAGED_FUNC_PYTHON_VERSIONS, + reason=f"Supported version: {bff_session._MANAGED_FUNC_PYTHON_VERSIONS}", +) +def test_managed_function_series_combine(dataset_id_permanent, scalars_dfs): + # This function is deliberately written to not work with NA input. + def add(x: int, y: int) -> int: + return x + y + + scalars_df, scalars_pandas_df = scalars_dfs + int_col_name_with_nulls = "int64_col" + int_col_name_no_nulls = "int64_too" + bf_df = scalars_df[[int_col_name_with_nulls, int_col_name_no_nulls]] + pd_df = scalars_pandas_df[[int_col_name_with_nulls, int_col_name_no_nulls]] + + # make sure there are NA values in the test column. + assert any([pd.isna(val) for val in bf_df[int_col_name_with_nulls]]) + + add_managed_func = udf( + dataset=dataset_id_permanent, + name=get_function_name(add), + )(add) + + # with nulls in the series the managed function application would fail. + with pytest.raises( + google.api_core.exceptions.BadRequest, match="unsupported operand" + ): + bf_df[int_col_name_with_nulls].combine( + bf_df[int_col_name_no_nulls], add_managed_func + ).to_pandas() + + # after filtering out nulls the managed function application should work + # similar to pandas. + pd_filter = pd_df[int_col_name_with_nulls].notnull() + pd_result = pd_df[pd_filter][int_col_name_with_nulls].combine( + pd_df[pd_filter][int_col_name_no_nulls], add + ) + bf_filter = bf_df[int_col_name_with_nulls].notnull() + bf_result = ( + bf_df[bf_filter][int_col_name_with_nulls] + .combine(bf_df[bf_filter][int_col_name_no_nulls], add_managed_func) + .to_pandas() + ) + + # ignore any dtype difference.
+ pd.testing.assert_series_equal(pd_result, bf_result, check_dtype=False) + + +@pytest.mark.skipif( + get_python_version() not in bff_session._MANAGED_FUNC_PYTHON_VERSIONS, + reason=f"Supported version: {bff_session._MANAGED_FUNC_PYTHON_VERSIONS}", +) +def test_managed_function_dataframe_map(scalars_dfs, dataset_id_permanent): + def add_one(x): + return x + 1 + + mf_add_one = udf( + input_types=[int], + output_type=int, + dataset=dataset_id_permanent, + name=get_function_name(add_one), + )(add_one) + + scalars_df, scalars_pandas_df = scalars_dfs + int64_cols = ["int64_col", "int64_too"] + + bf_int64_df = scalars_df[int64_cols] + bf_int64_df_filtered = bf_int64_df.dropna() + bf_result = bf_int64_df_filtered.map(mf_add_one).to_pandas() + + pd_int64_df = scalars_pandas_df[int64_cols] + pd_int64_df_filtered = pd_int64_df.dropna() + pd_result = pd_int64_df_filtered.map(add_one) + # TODO(shobs): Figure why pandas .map() changes the dtype, i.e. + # pd_int64_df_filtered.dtype is Int64Dtype() + # pd_int64_df_filtered.map(lambda x: x).dtype is int64. + # For this test let's force the pandas dtype to be same as input. + for col in pd_result: + pd_result[col] = pd_result[col].astype(pd_int64_df_filtered[col].dtype) + + assert_pandas_df_equal(bf_result, pd_result) + + +@pytest.mark.skipif( + get_python_version() not in bff_session._MANAGED_FUNC_PYTHON_VERSIONS, + reason=f"Supported version: {bff_session._MANAGED_FUNC_PYTHON_VERSIONS}", +) +def test_managed_function_dataframe_apply_axis_1( + session, scalars_dfs, dataset_id_permanent +): + scalars_df, scalars_pandas_df = scalars_dfs + series = scalars_df["int64_too"] + series_pandas = scalars_pandas_df["int64_too"] + + def add_ints(x, y): + return x + y + + add_ints_mf = session.udf( + input_types=[int, int], + output_type=int, + dataset=dataset_id_permanent, + name=get_function_name(add_ints, is_row_processor=True), + )(add_ints) + assert add_ints_mf.bigframes_bigquery_function # type: ignore + + with pytest.warns( + bigframes.exceptions.PreviewWarning, match="axis=1 scenario is in preview." 
+ ): + bf_result = ( + bpd.DataFrame({"x": series, "y": series}) + .apply(add_ints_mf, axis=1) + .to_pandas() + ) + + pd_result = pd.DataFrame({"x": series_pandas, "y": series_pandas}).apply( + lambda row: add_ints(row["x"], row["y"]), axis=1 + ) + + pd.testing.assert_series_equal( + pd_result, bf_result, check_dtype=False, check_exact=True + ) diff --git a/tests/system/small/functions/test_remote_function.py b/tests/system/small/functions/test_remote_function.py index 99a017c917..c12d0e03f5 100644 --- a/tests/system/small/functions/test_remote_function.py +++ b/tests/system/small/functions/test_remote_function.py @@ -29,7 +29,7 @@ from bigframes.functions import _utils as bff_utils from bigframes.functions import function as bff import bigframes.session._io.bigquery -from tests.system.utils import assert_pandas_df_equal +from tests.system.utils import assert_pandas_df_equal, get_function_name _prefixer = test_utils.prefixer.Prefixer("bigframes", "") @@ -92,20 +92,6 @@ def session_with_bq_connection(bq_cf_connection) -> bigframes.Session: return session -def get_rf_name(func, package_requirements=None, is_row_processor=False): - """Get a remote function name for testing given a udf.""" - # Augment user package requirements with any internal package - # requirements - package_requirements = bff_utils._get_updated_package_requirements( - package_requirements, is_row_processor - ) - - # Compute a unique hash representing the user code - function_hash = bff_utils._get_hash(func, package_requirements) - - return f"bigframes_{function_hash}" - - @pytest.mark.flaky(retries=2, delay=120) def test_remote_function_direct_no_session_param( bigquery_client, @@ -130,7 +116,7 @@ def square(x): bigquery_connection=bq_cf_connection, # See e2e tests for tests that actually deploy the Cloud Function. reuse=True, - name=get_rf_name(square), + name=get_function_name(square), )(square) # Function should still work normally. @@ -189,7 +175,7 @@ def square(x): bigquery_connection=bq_cf_connection_location, # See e2e tests for tests that actually deploy the Cloud Function. reuse=True, - name=get_rf_name(square), + name=get_function_name(square), )(square) # Function should still work normally. @@ -248,7 +234,7 @@ def square(x): bigquery_connection=bq_cf_connection_location_mismatched, # See e2e tests for tests that actually deploy the Cloud Function. reuse=True, - name=get_rf_name(square), + name=get_function_name(square), )(square) @@ -276,7 +262,7 @@ def square(x): bigquery_connection=bq_cf_connection_location_project, # See e2e tests for tests that actually deploy the Cloud Function. reuse=True, - name=get_rf_name(square), + name=get_function_name(square), )(square) # Function should still work normally. @@ -337,7 +323,7 @@ def square(x): bigquery_connection=bq_cf_connection_location_project_mismatched, # See e2e tests for tests that actually deploy the Cloud Function. reuse=True, - name=get_rf_name(square), + name=get_function_name(square), )(square) @@ -353,7 +339,7 @@ def square(x): int, session=session_with_bq_connection, dataset=dataset_id_permanent, - name=get_rf_name(square), + name=get_function_name(square), )(square) # Function should still work normally. @@ -398,7 +384,7 @@ def square(x): # udf is same as the one used in other tests in this file so the underlying # cloud function would be common and quickly reused. 
square = session_with_bq_connection.remote_function( - int, int, dataset_id_permanent, name=get_rf_name(square) + int, int, dataset_id_permanent, name=get_function_name(square) )(square) # Function should still work normally. @@ -442,7 +428,7 @@ def square(x): bq_cf_connection, # See e2e tests for tests that actually deploy the Cloud Function. reuse=True, - name=get_rf_name(square), + name=get_function_name(square), )(square) # Function should still work normally. @@ -480,7 +466,7 @@ def add_one(x): return x + 1 remote_add_one = session_with_bq_connection.remote_function( - [int], int, dataset_id_permanent, name=get_rf_name(add_one) + [int], int, dataset_id_permanent, name=get_function_name(add_one) )(add_one) scalars_df, scalars_pandas_df = scalars_dfs @@ -511,7 +497,7 @@ def add_one(x): return x + 1 remote_add_one = session_with_bq_connection.remote_function( - [int], int, dataset_id_permanent, name=get_rf_name(add_one) + [int], int, dataset_id_permanent, name=get_function_name(add_one) )(add_one) scalars_df, scalars_pandas_df = scalars_dfs @@ -542,7 +528,7 @@ def add_one(x): return x + 1 remote_add_one = session_with_bq_connection.remote_function( - [int], int, dataset_id_permanent, name=get_rf_name(add_one) + [int], int, dataset_id_permanent, name=get_function_name(add_one) )(add_one) scalars_df, scalars_pandas_df = scalars_dfs @@ -586,7 +572,7 @@ def bytes_to_hex(mybytes: bytes) -> bytes: packages = ["pandas"] remote_bytes_to_hex = session_with_bq_connection.remote_function( dataset=dataset_id_permanent, - name=get_rf_name(bytes_to_hex, package_requirements=packages), + name=get_function_name(bytes_to_hex, package_requirements=packages), packages=packages, )(bytes_to_hex) bf_result = scalars_df.bytes_col.map(remote_bytes_to_hex).to_pandas() @@ -630,7 +616,10 @@ def add_one(x): return x + 1 # pragma: NO COVER session.remote_function( - [int], int, dataset=dataset_id_permanent, name=get_rf_name(add_one) + [int], + int, + dataset=dataset_id_permanent, + name=get_function_name(add_one), )(add_one) @@ -669,7 +658,7 @@ def square1(x): resource_manager_client=resourcemanager_client, bigquery_connection=bq_cf_connection, reuse=True, - name=get_rf_name(square1), + name=get_function_name(square1), )(square1) # Function should still work normally. @@ -1142,7 +1131,7 @@ def add_ints(row): bigframes.series.Series, int, dataset_id_permanent, - name=get_rf_name(add_ints, is_row_processor=True), + name=get_function_name(add_ints, is_row_processor=True), )(add_ints) assert add_ints_remote.bigframes_remote_function # type: ignore assert add_ints_remote.bigframes_cloud_function # type: ignore @@ -1191,7 +1180,7 @@ def add_ints(row): bigframes.series.Series, int, dataset_id_permanent, - name=get_rf_name(add_ints, is_row_processor=True), + name=get_function_name(add_ints, is_row_processor=True), )(add_ints) bf_result = ( @@ -1230,7 +1219,7 @@ def add_numbers(row): bigframes.series.Series, float, dataset_id_permanent, - name=get_rf_name(add_numbers, is_row_processor=True), + name=get_function_name(add_numbers, is_row_processor=True), )(add_numbers) bf_result = bf_df.apply(add_numbers_remote, axis=1).to_pandas() @@ -1257,7 +1246,9 @@ def add_ints(row): # pandas works scalars_pandas_df.apply(add_ints, axis=1) - with pytest.raises(ValueError, match="For axis=1 a remote function must be used."): + with pytest.raises( + ValueError, match="For axis=1 a bigframes function must be used." 
+ ): + scalars_df[columns].apply(add_ints, axis=1) @@ -1281,7 +1272,7 @@ def echo_len(row): bigframes.series.Series, float, dataset_id_permanent, - name=get_rf_name(echo_len, is_row_processor=True), + name=get_function_name(echo_len, is_row_processor=True), )(echo_len) for column in columns_with_not_supported_dtypes: @@ -1314,7 +1305,7 @@ def should_mask(name: str) -> bool: assert "name" in inspect.signature(should_mask).parameters should_mask = session.remote_function( - dataset=dataset_id_permanent, name=get_rf_name(should_mask) + dataset=dataset_id_permanent, name=get_function_name(should_mask) )(should_mask) s = bigframes.series.Series(["Alice", "Bob", "Caroline"]) @@ -1373,7 +1364,7 @@ def is_odd(x: int) -> bool: # create a remote function is_odd_remote = session.remote_function( - dataset=dataset_id_permanent, name=get_rf_name(is_odd) + dataset=dataset_id_permanent, name=get_function_name(is_odd) )(is_odd) # with nulls in the series the remote function application would fail @@ -1423,7 +1414,7 @@ def add(x: int, y: int) -> int: # create a remote function add_remote = session.remote_function( - dataset=dataset_id_permanent, name=get_rf_name(add) + dataset=dataset_id_permanent, name=get_function_name(add) )(add) # with nulls in the series the remote function application would fail @@ -1476,7 +1467,7 @@ def add(x: int, y: int, z: float) -> float: # create a remote function add_remote = session.remote_function( - dataset=dataset_id_permanent, name=get_rf_name(add) + dataset=dataset_id_permanent, name=get_function_name(add) )(add) # pandas does not support nary functions, so let's create a proxy function @@ -1530,7 +1521,8 @@ def is_long_duration(minutes: int) -> bool: return minutes >= 120 is_long_duration = unordered_session.remote_function( - dataset=dataset_id_permanent, name=get_rf_name(is_long_duration) + dataset=dataset_id_permanent, + name=get_function_name(is_long_duration), )(is_long_duration) method = getattr(df["duration_minutes"], method) @@ -1549,7 +1541,7 @@ def combiner(x: int, y: int) -> int: return x combiner = unordered_session.remote_function( - dataset=dataset_id_permanent, name=get_rf_name(combiner) + dataset=dataset_id_permanent, name=get_function_name(combiner) )(combiner) df = scalars_df_index[["int64_col", "int64_too", "float64_col", "string_col"]] @@ -1565,7 +1557,7 @@ def processor(x: int, y: int, z: float, w: str) -> str: return f"I got x={x}, y={y}, z={z} and w={w}" processor = unordered_session.remote_function( - dataset=dataset_id_permanent, name=get_rf_name(processor) + dataset=dataset_id_permanent, name=get_function_name(processor) )(processor) df = scalars_df_index[["int64_col", "int64_too", "float64_col", "string_col"]] diff --git a/tests/system/utils.py b/tests/system/utils.py index 0772468085..fd8feb0eeb 100644 --- a/tests/system/utils.py +++ b/tests/system/utils.py @@ -383,3 +383,44 @@ def delete_cloud_function( def get_first_file_from_wildcard(path): return path.replace("*", "000000000000") + + +def cleanup_function_assets( + bigframes_func, + bigquery_client, + cloudfunctions_client=None, + ignore_failures=True, +) -> None: + """Clean up the GCP assets behind a bigframes function.""" + + # Clean up bigframes function. + try: + bigquery_client.delete_routine(bigframes_func.bigframes_bigquery_function) + except Exception: + # By default don't raise exception in cleanup.
+ if not ignore_failures: + raise + + # Clean up cloud function + try: + delete_cloud_function( + cloudfunctions_client, bigframes_func.bigframes_cloud_function + ) + except Exception: + # By default don't raise exception in cleanup. + if not ignore_failures: + raise + + +def get_function_name(func, package_requirements=None, is_row_processor=False): + """Get a bigframes function name for testing given a udf.""" + # Augment user package requirements with any internal package + # requirements. + package_requirements = bff_utils._get_updated_package_requirements( + package_requirements, is_row_processor + ) + + # Compute a unique hash representing the user code. + function_hash = bff_utils._get_hash(func, package_requirements) + + return f"bigframes_{function_hash}"
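Taken together, `get_function_name` and `cleanup_function_assets` bracket a bigframes function's lifecycle in a test. Below is a hedged sketch of how the new managed-function (`udf`) path could use them, reusing the `session`, `dataset_id_permanent`, and `scalars_dfs` fixtures from the tests in this change; `double` and the test name are hypothetical, and the `finally` block is illustrative only, since the permanent-dataset tests above deliberately leave routines in place for reuse.

```python
import bigframes.pandas as bpd
from tests.system.utils import cleanup_function_assets, get_function_name

bpd.options.experiments.udf = True  # managed functions are still experimental


def double(x: int) -> int:
    return x + x


def test_managed_function_sketch(session, dataset_id_permanent, scalars_dfs):
    # The name is a hash of the function and its package requirements, so
    # reruns reuse the same BigQuery routine instead of redeploying it.
    double_mf = session.udf(
        input_types=[int],
        output_type=int,
        dataset=dataset_id_permanent,
        name=get_function_name(double),
    )(double)
    try:
        scalars_df, _ = scalars_dfs
        scalars_df["int64_too"].apply(double_mf).to_pandas()
    finally:
        # A managed function has no Cloud Function behind it, so only the
        # BigQuery routine is deleted; passing no Cloud Functions client is
        # tolerated because cleanup errors are ignored by default.
        cleanup_function_assets(double_mf, session.bqclient)
```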