Skip to content

feat: create deploy_remote_function and deploy_udf functions to immediately deploy functions to BigQuery #1832

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Jun 24, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions bigframes/core/global_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,8 +110,8 @@ def get_global_session():
_T = TypeVar("_T")


def with_default_session(func: Callable[..., _T], *args, **kwargs) -> _T:
return func(get_global_session(), *args, **kwargs)
def with_default_session(func_: Callable[..., _T], *args, **kwargs) -> _T:
return func_(get_global_session(), *args, **kwargs)


class _GlobalSessionContext:
Expand Down
50 changes: 50 additions & 0 deletions bigframes/functions/_function_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -668,6 +668,30 @@ def wrapper(func):

return wrapper

def deploy_remote_function(
self,
func,
**kwargs,
):
"""Orchestrates the creation of a BigQuery remote function that deploys immediately.

This method ensures that the remote function is created and available for
use in BigQuery as soon as this call is made.

Args:
kwargs:
All arguments are passed directly to
:meth:`~bigframes.session.Session.remote_function`. Please see
its docstring for parameter details.

Returns:
A wrapped remote function, usable in
:meth:`~bigframes.series.Series.apply`.
"""
# TODO(tswast): If we update remote_function to defer deployment, update
# this method to deploy immediately.
return self.remote_function(**kwargs)(func)

def udf(
self,
input_types: Union[None, type, Sequence[type]] = None,
Expand Down Expand Up @@ -866,6 +890,32 @@ def wrapper(func):

return wrapper

def deploy_udf(
self,
func,
**kwargs,
):
"""Orchestrates the creation of a BigQuery UDF that deploys immediately.

This method ensures that the UDF is created and available for
use in BigQuery as soon as this call is made.

Args:
func:
Function to deploy.
kwargs:
All arguments are passed directly to
:meth:`~bigframes.session.Session.udf`. Please see
its docstring for parameter details.

Returns:
A wrapped Python user defined function, usable in
:meth:`~bigframes.series.Series.apply`.
"""
# TODO(tswast): If we update udf to defer deployment, update this method
# to deploy immediately.
return self.udf(**kwargs)(func)


def _convert_row_processor_sig(
signature: inspect.Signature,
Expand Down
32 changes: 32 additions & 0 deletions bigframes/pandas/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,22 @@ def remote_function(
remote_function.__doc__ = inspect.getdoc(bigframes.session.Session.remote_function)


def deploy_remote_function(
func,
**kwargs,
):
return global_session.with_default_session(
bigframes.session.Session.deploy_remote_function,
func=func,
**kwargs,
)


deploy_remote_function.__doc__ = inspect.getdoc(
bigframes.session.Session.deploy_remote_function
)


def udf(
*,
input_types: Union[None, type, Sequence[type]] = None,
Expand All @@ -140,6 +156,20 @@ def udf(
udf.__doc__ = inspect.getdoc(bigframes.session.Session.udf)


def deploy_udf(
func,
**kwargs,
):
return global_session.with_default_session(
bigframes.session.Session.deploy_udf,
func=func,
**kwargs,
)


deploy_udf.__doc__ = inspect.getdoc(bigframes.session.Session.deploy_udf)


@typing.overload
def to_datetime(
arg: Union[
Expand Down Expand Up @@ -330,6 +360,8 @@ def reset_session():
clean_up_by_session_id,
concat,
cut,
deploy_remote_function,
deploy_udf,
get_default_session_id,
get_dummies,
merge,
Expand Down
78 changes: 76 additions & 2 deletions bigframes/session/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -1343,6 +1343,40 @@ def _check_file_size(self, filepath: str):
"for large files to avoid loading the file into local memory."
)

def deploy_remote_function(
self,
func,
**kwargs,
):
"""Orchestrates the creation of a BigQuery remote function that deploys immediately.

This method ensures that the remote function is created and available for
use in BigQuery as soon as this call is made.

Args:
func:
Function to deploy.
kwargs:
All arguments are passed directly to
:meth:`~bigframes.session.Session.remote_function`. Please see
its docstring for parameter details.

Returns:
A wrapped remote function, usable in
:meth:`~bigframes.series.Series.apply`.
"""
return self._function_session.deploy_remote_function(
func,
# Session-provided arguments.
session=self,
bigquery_client=self._clients_provider.bqclient,
bigquery_connection_client=self._clients_provider.bqconnectionclient,
cloud_functions_client=self._clients_provider.cloudfunctionsclient,
resource_manager_client=self._clients_provider.resourcemanagerclient,
# User-provided arguments.
**kwargs,
)

def remote_function(
self,
# Make sure that the input/output types, and dataset can be used
Expand Down Expand Up @@ -1565,9 +1599,15 @@ def remote_function(
`bigframes_remote_function` - The bigquery remote function capable of calling into `bigframes_cloud_function`.
"""
return self._function_session.remote_function(
# Session-provided arguments.
session=self,
bigquery_client=self._clients_provider.bqclient,
bigquery_connection_client=self._clients_provider.bqconnectionclient,
cloud_functions_client=self._clients_provider.cloudfunctionsclient,
resource_manager_client=self._clients_provider.resourcemanagerclient,
# User-provided arguments.
input_types=input_types,
output_type=output_type,
session=self,
dataset=dataset,
bigquery_connection=bigquery_connection,
reuse=reuse,
Expand All @@ -1585,6 +1625,37 @@ def remote_function(
cloud_build_service_account=cloud_build_service_account,
)

def deploy_udf(
self,
func,
**kwargs,
):
"""Orchestrates the creation of a BigQuery UDF that deploys immediately.

This method ensures that the UDF is created and available for
use in BigQuery as soon as this call is made.

Args:
func:
Function to deploy.
kwargs:
All arguments are passed directly to
:meth:`~bigframes.session.Session.udf`. Please see
its docstring for parameter details.

Returns:
A wrapped Python user defined function, usable in
:meth:`~bigframes.series.Series.apply`.
"""
return self._function_session.deploy_udf(
func,
# Session-provided arguments.
session=self,
bigquery_client=self._clients_provider.bqclient,
# User-provided arguments.
**kwargs,
)

def udf(
self,
*,
Expand Down Expand Up @@ -1726,9 +1797,12 @@ def udf(
deployed for the user defined code.
"""
return self._function_session.udf(
# Session-provided arguments.
session=self,
bigquery_client=self._clients_provider.bqclient,
# User-provided arguments.
input_types=input_types,
output_type=output_type,
session=self,
dataset=dataset,
bigquery_connection=bigquery_connection,
name=name,
Expand Down
54 changes: 54 additions & 0 deletions tests/unit/functions/test_remote_function.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,3 +89,57 @@ def function_without_return_annotation(myparam: int):
match="'output_type' was not set .* missing a return type annotation",
):
remote_function_decorator(function_without_return_annotation)


def test_deploy_remote_function():
session = mocks.create_bigquery_session()

def my_remote_func(x: int) -> int:
return x * 2

deployed = session.deploy_remote_function(
my_remote_func, cloud_function_service_account="[email protected]"
)

# Test that the function would have been deployed somewhere.
assert deployed.bigframes_bigquery_function


def test_deploy_remote_function_with_name():
session = mocks.create_bigquery_session()

def my_remote_func(x: int) -> int:
return x * 2

deployed = session.deploy_remote_function(
my_remote_func,
name="my_custom_name",
cloud_function_service_account="[email protected]",
)

# Test that the function would have been deployed somewhere.
assert "my_custom_name" in deployed.bigframes_bigquery_function


def test_deploy_udf():
session = mocks.create_bigquery_session()

def my_remote_func(x: int) -> int:
return x * 2

deployed = session.deploy_udf(my_remote_func)

# Test that the function would have been deployed somewhere.
assert deployed.bigframes_bigquery_function


def test_deploy_udf_with_name():
session = mocks.create_bigquery_session()

def my_remote_func(x: int) -> int:
return x * 2

deployed = session.deploy_udf(my_remote_func, name="my_custom_name")

# Test that the function would have been deployed somewhere.
assert "my_custom_name" in deployed.bigframes_bigquery_function