diff --git a/packages/bigframes/bigframes/core/blocks.py b/packages/bigframes/bigframes/core/blocks.py index b9a246fc0360..33f5aaab5c7d 100644 --- a/packages/bigframes/bigframes/core/blocks.py +++ b/packages/bigframes/bigframes/core/blocks.py @@ -1091,14 +1091,14 @@ def multi_apply_window_op( def multi_apply_unary_op( self, - op: Union[ops.UnaryOp, ex.Expression], + op: Union[ops.UnaryOp, ops.NaryOp, ex.Expression], ) -> Block: - if isinstance(op, ops.UnaryOp): + if isinstance(op, (ops.UnaryOp, ops.NaryOp)): input_varname = guid.generate_guid() expr = op.as_expr(ex.free_var(input_varname)) else: input_varnames = op.free_variables - assert len(input_varnames) == 1 + assert len(set(input_varnames)) == 1 expr = op input_varname = input_varnames[0] diff --git a/packages/bigframes/bigframes/core/compile/ibis_compiler/scalar_op_registry.py b/packages/bigframes/bigframes/core/compile/ibis_compiler/scalar_op_registry.py index e9e8435ea1a1..5172d1e7c602 100644 --- a/packages/bigframes/bigframes/core/compile/ibis_compiler/scalar_op_registry.py +++ b/packages/bigframes/bigframes/core/compile/ibis_compiler/scalar_op_registry.py @@ -1034,44 +1034,8 @@ def timedelta_floor_op_impl(x: ibis_types.NumericValue): return ibis_api.case().when(x > ibis.literal(0), x.floor()).else_(x.ceil()).end() -@scalar_op_compiler.register_unary_op(ops.RemoteFunctionOp, pass_op=True) -def remote_function_op_impl(x: ibis_types.Value, op: ops.RemoteFunctionOp): - udf_sig = op.function_def.signature - assert not udf_sig.is_virtual # should have been devirtualized in lowering pass - ibis_py_sig = (tuple(arg.py_type for arg in udf_sig.inputs), udf_sig.output.py_type) - - @ibis_udf.scalar.builtin( - name=str(op.function_def.routine_ref), signature=ibis_py_sig - ) - def udf(input): ... - - x_transformed = udf(x) - if not op.apply_on_null: - return ibis_api.case().when(x.isnull(), x).else_(x_transformed).end() - return x_transformed - - -@scalar_op_compiler.register_binary_op(ops.BinaryRemoteFunctionOp, pass_op=True) -def binary_remote_function_op_impl( - x: ibis_types.Value, y: ibis_types.Value, op: ops.BinaryRemoteFunctionOp -): - udf_sig = op.function_def.signature - assert not udf_sig.is_virtual # should have been devirtualized in lowering pass - ibis_py_sig = (tuple(arg.py_type for arg in udf_sig.inputs), udf_sig.output.py_type) - - @ibis_udf.scalar.builtin( - name=str(op.function_def.routine_ref), signature=ibis_py_sig - ) - def udf(input1, input2): ... - - x_transformed = udf(x, y) - return x_transformed - - -@scalar_op_compiler.register_nary_op(ops.NaryRemoteFunctionOp, pass_op=True) -def nary_remote_function_op_impl( - *operands: ibis_types.Value, op: ops.NaryRemoteFunctionOp -): +@scalar_op_compiler.register_nary_op(ops.RemoteFunctionOp, pass_op=True) +def remote_function_op_impl(*values: ibis_types.Value, op: ops.RemoteFunctionOp): udf_sig = op.function_def.signature assert not udf_sig.is_virtual # should have been devirtualized in lowering pass ibis_py_sig = (tuple(arg.py_type for arg in udf_sig.inputs), udf_sig.output.py_type) @@ -1084,8 +1048,7 @@ def nary_remote_function_op_impl( ) def udf(*inputs): ... - result = udf(*operands) - return result + return udf(*values) @scalar_op_compiler.register_unary_op(ops.MapOp, pass_op=True) diff --git a/packages/bigframes/bigframes/core/compile/sqlglot/expressions/generic_ops.py b/packages/bigframes/bigframes/core/compile/sqlglot/expressions/generic_ops.py index 644e7bc365f7..d78c18f01499 100644 --- a/packages/bigframes/bigframes/core/compile/sqlglot/expressions/generic_ops.py +++ b/packages/bigframes/bigframes/core/compile/sqlglot/expressions/generic_ops.py @@ -186,33 +186,9 @@ def _get_remote_function_name(op): ) -@register_unary_op(ops.RemoteFunctionOp, pass_op=True) -def _(expr: TypedExpr, op: ops.RemoteFunctionOp) -> sge.Expression: - func_name = _get_remote_function_name(op) - func = sge.func(func_name, expr.expr) - - if not op.apply_on_null: - return sge.If( - this=sge.Is(this=expr.expr, expression=sge.Null()), - true=expr.expr, - false=func, - ) - - return func - - -@register_binary_op(ops.BinaryRemoteFunctionOp, pass_op=True) -def _( - left: TypedExpr, right: TypedExpr, op: ops.BinaryRemoteFunctionOp -) -> sge.Expression: - func_name = _get_remote_function_name(op) - return sge.func(func_name, left.expr, right.expr) - - -@register_nary_op(ops.NaryRemoteFunctionOp, pass_op=True) -def _(*operands: TypedExpr, op: ops.NaryRemoteFunctionOp) -> sge.Expression: - func_name = _get_remote_function_name(op) - return sge.func(func_name, *(operand.expr for operand in operands)) +@register_nary_op(ops.RemoteFunctionOp, pass_op=True) +def _(*values: TypedExpr, op: ops.RemoteFunctionOp) -> sge.Expression: + return sge.func(_get_remote_function_name(op), *(value.expr for value in values)) @register_nary_op(ops.case_when_op) diff --git a/packages/bigframes/bigframes/core/rewrite/udfs.py b/packages/bigframes/bigframes/core/rewrite/udfs.py index 284ac4217c09..286a9d9d9401 100644 --- a/packages/bigframes/bigframes/core/rewrite/udfs.py +++ b/packages/bigframes/bigframes/core/rewrite/udfs.py @@ -32,7 +32,6 @@ def lower(self, expr: expression.OpExpression) -> expression.Expression: func_def = expr.op.function_def devirtualized_expr = ops.RemoteFunctionOp( func_def.with_devirtualize(), - apply_on_null=expr.op.apply_on_null, ).as_expr(*expr.children) if isinstance(func_def.signature.output, udf_def.VirtualListTypeV1): return func_def.signature.output.out_expr(devirtualized_expr) @@ -40,47 +39,7 @@ def lower(self, expr: expression.OpExpression) -> expression.Expression: return devirtualized_expr -@dataclasses.dataclass -class LowerBinaryRemoteFunctionRule(op_lowering.OpLoweringRule): - @property - def op(self) -> type[ops.ScalarOp]: - return ops.BinaryRemoteFunctionOp - - def lower(self, expr: expression.OpExpression) -> expression.Expression: - assert isinstance(expr.op, ops.BinaryRemoteFunctionOp) - func_def = expr.op.function_def - devirtualized_expr = ops.BinaryRemoteFunctionOp( - func_def.with_devirtualize(), - ).as_expr(*expr.children) - if isinstance(func_def.signature.output, udf_def.VirtualListTypeV1): - return func_def.signature.output.out_expr(devirtualized_expr) - else: - return devirtualized_expr - - -@dataclasses.dataclass -class LowerNaryRemoteFunctionRule(op_lowering.OpLoweringRule): - @property - def op(self) -> type[ops.ScalarOp]: - return ops.NaryRemoteFunctionOp - - def lower(self, expr: expression.OpExpression) -> expression.Expression: - assert isinstance(expr.op, ops.NaryRemoteFunctionOp) - func_def = expr.op.function_def - devirtualized_expr = ops.NaryRemoteFunctionOp( - func_def.with_devirtualize(), - ).as_expr(*expr.children) - if isinstance(func_def.signature.output, udf_def.VirtualListTypeV1): - return func_def.signature.output.out_expr(devirtualized_expr) - else: - return devirtualized_expr - - -UDF_LOWERING_RULES = ( - LowerRemoteFunctionRule(), - LowerBinaryRemoteFunctionRule(), - LowerNaryRemoteFunctionRule(), -) +UDF_LOWERING_RULES = (LowerRemoteFunctionRule(),) def lower_udfs(root: bigframe_node.BigFrameNode) -> bigframe_node.BigFrameNode: diff --git a/packages/bigframes/bigframes/dataframe.py b/packages/bigframes/bigframes/dataframe.py index 33ca3b0a4ce9..960aab25c355 100644 --- a/packages/bigframes/bigframes/dataframe.py +++ b/packages/bigframes/bigframes/dataframe.py @@ -4678,12 +4678,14 @@ def map(self, func, na_action: Optional[str] = None) -> DataFrame: if na_action not in {None, "ignore"}: raise ValueError(f"na_action={na_action} not supported") - # TODO(shobs): Support **kwargs - return self._apply_unary_op( - ops.RemoteFunctionOp( - function_def=func.udf_def, apply_on_null=(na_action is None) + expr = ops.func_to_op(func).as_expr(ex.free_var("input")) + if na_action == "ignore": + # True case, predicate, False case + expr = ops.where_op.as_expr( + expr, ops.notnull_op.as_expr(ex.free_var("input")), ex.const(None) ) - ) + + return DataFrame(self._block.multi_apply_unary_op(expr)) def apply(self, func, *, axis=0, args: typing.Tuple = (), **kwargs): # In Bigframes BigQuery function, DataFrame '.apply' method is specifically @@ -4754,17 +4756,11 @@ def apply(self, func, *, axis=0, args: typing.Tuple = (), **kwargs): ) # Apply the function - if args: - result_series = rows_as_json_series._apply_nary_op( - ops.NaryRemoteFunctionOp(function_def=func.udf_def), - list(args), - ) - else: - result_series = rows_as_json_series._apply_unary_op( - ops.RemoteFunctionOp( - function_def=func.udf_def, apply_on_null=True - ) - ) + result_series = rows_as_json_series._apply_nary_op( + ops.func_to_op(func), + list(args), + ) + else: # This is a special case where we are providing not-pandas-like # extension. If the bigquery function can take one or more @@ -4822,7 +4818,7 @@ def apply(self, func, *, axis=0, args: typing.Tuple = (), **kwargs): series_list = [self[col] for col in self.columns] op_list = series_list[1:] + list(args) result_series = series_list[0]._apply_nary_op( - ops.NaryRemoteFunctionOp(function_def=func.udf_def), op_list + ops.func_to_op(func), op_list ) result_series.name = None diff --git a/packages/bigframes/bigframes/functions/_function_session.py b/packages/bigframes/bigframes/functions/_function_session.py index 186ecbcf3e1f..281785fbea67 100644 --- a/packages/bigframes/bigframes/functions/_function_session.py +++ b/packages/bigframes/bigframes/functions/_function_session.py @@ -20,6 +20,7 @@ import inspect import sys import threading +import time import warnings from typing import ( TYPE_CHECKING, @@ -64,6 +65,9 @@ def __init__(self): # Lock to synchronize the update of the session artifacts self._artifacts_lock = threading.Lock() + self._deployed_routines: set[bytes] = set() + self._deploying_routines: set[bytes] = set() + def _resolve_session(self, session: Optional[Session]) -> Session: """Resolves the BigFrames session.""" import bigframes.pandas as bpd @@ -191,6 +195,80 @@ def _update_temp_artifacts(self, bqrf_routine: str, gcf_path: str): with self._artifacts_lock: self._temp_artifacts[bqrf_routine] = gcf_path + def _deploy_udf( + self, + session: Session, + bq_udf: udf_def.PythonUdf, + ) -> udf_def.BigqueryUdf: + """Deploys a UDF to BigQuery if not already deployed.""" + udf_hash = bq_udf.stable_hash() + + bigquery_client = self._resolve_bigquery_client(session, None) + bq_connection_manager = session.bqconnectionmanager + dataset_ref = self._resolve_dataset_reference(session, bigquery_client, None) + bq_location, _ = _utils.get_remote_function_locations(bigquery_client.location) + + managed_function_client = _function_client.FunctionClient( + dataset_ref.project, + bq_location, + dataset_ref.dataset_id, + bigquery_client, + bq_connection_manager, + session=session, + ) + + config = bq_udf.to_managed_function_config() + bq_function_name = _function_client.get_managed_function_name( + config, session.session_id + ) + full_rf_name = ( + managed_function_client.get_remote_function_fully_qualilfied_name( + bq_function_name + ) + ) + routine_ref = bigquery.RoutineReference.from_string(full_rf_name) + + with self._artifacts_lock: + if udf_hash in self._deployed_routines: + return udf_def.BigqueryUdf( + routine_ref=routine_ref, + signature=bq_udf.signature, + ) + + while True: + with self._artifacts_lock: + if udf_hash in self._deployed_routines: + return udf_def.BigqueryUdf( + routine_ref=routine_ref, + signature=bq_udf.signature, + ) + + if udf_hash not in self._deploying_routines: + self._deploying_routines.add(udf_hash) + break + + time.sleep(0.2) + + try: + managed_function_client.provision_bq_managed_function( + name=bq_function_name, + config=config, + ) + except Exception: + with self._artifacts_lock: + self._deploying_routines.discard(udf_hash) + raise + + with self._artifacts_lock: + self._deploying_routines.discard(udf_hash) + self._deployed_routines.add(udf_hash) + self._temp_artifacts[full_rf_name] = "" + + return udf_def.BigqueryUdf( + routine_ref=routine_ref, + signature=bq_udf.signature, + ) + def clean_up( self, bqclient: bigquery.Client, @@ -679,6 +757,8 @@ def udf( max_batching_rows: Optional[int] = None, container_cpu: Optional[float] = None, container_memory: Optional[str] = None, + *, + _force_deploy: bool = False, ): """Decorator to turn a Python user defined function (udf) into a BigQuery managed function. @@ -835,33 +915,49 @@ def wrapper(func): capture_references=False, ) - bq_function_name = managed_function_client.provision_bq_managed_function( - name=name, - config=config, - ) - full_rf_name = ( - managed_function_client.get_remote_function_fully_qualilfied_name( - bq_function_name - ) - ) - - udf_definition = udf_def.BigqueryUdf( - routine_ref=bigquery.RoutineReference.from_string(full_rf_name), - signature=udf_sig, + requirements = udf_def.RuntimeRequirements( + container_cpu=container_cpu, + container_memory=container_memory, + bq_connection_id=bq_connection_id, + max_batching_rows=max_batching_rows, + packages=tuple(packages) if packages else (), ) - if udf_sig.is_row_processor: msg = bfe.format_message("input_types=Series is in preview.") warnings.warn(msg, stacklevel=1, category=bfe.PreviewWarning) - if not name: # session-owned resource - will be cleaned up automatically - self._update_temp_artifacts(full_rf_name, "") + if ( + not name and not _force_deploy + ): # session-owned resource - deferred deployment + udf_definition = udf_def.PythonUdf( + signature=udf_sig, + code=code_def, + requirements=requirements, + ) return bq_functions.UdfRoutine(func=func, _udf_def=udf_definition) + else: # deploy immediately + bq_function_name = ( + managed_function_client.provision_bq_managed_function( + name=name, + config=config, + ) + ) + full_rf_name = ( + managed_function_client.get_remote_function_fully_qualilfied_name( + bq_function_name + ) + ) + rf_def = udf_def.BigqueryUdf( + routine_ref=bigquery.RoutineReference.from_string(full_rf_name), + signature=udf_sig, + ) + if name is None: + # Null name means anonymous, session-owned resource with force deploy. + # Unnamed resources are owned by the session and will be cleaned up automatically. + self._update_temp_artifacts(full_rf_name, "") - # user-managed permanent resource - will not be cleaned up automatically - else: return bq_functions.BigqueryCallableRoutine( - udf_definition, session, local_func=func, is_managed=True + rf_def, session, local_func=func, is_managed=True ) return wrapper @@ -888,9 +984,7 @@ def deploy_udf( A wrapped Python user defined function, usable in :meth:`~bigframes.series.Series.apply`. """ - # TODO(tswast): If we update udf to defer deployment, update this method - # to deploy immediately. - return self.udf(**kwargs)(func) + return self.udf(_force_deploy=True, **kwargs)(func) def _resolve_signature( diff --git a/packages/bigframes/bigframes/functions/function.py b/packages/bigframes/bigframes/functions/function.py index dc0fa55c8e7b..e9a40f415324 100644 --- a/packages/bigframes/bigframes/functions/function.py +++ b/packages/bigframes/bigframes/functions/function.py @@ -16,7 +16,7 @@ import dataclasses import logging -from typing import TYPE_CHECKING, Callable, Optional, Protocol, runtime_checkable +from typing import TYPE_CHECKING, Callable, Optional, Protocol, Union, runtime_checkable import google.api_core.exceptions from google.cloud import bigquery @@ -162,7 +162,7 @@ class Udf(Protocol): """ @property - def udf_def(self) -> udf_def.BigqueryUdf: ... + def udf_def(self) -> Union[udf_def.BigqueryUdf, udf_def.PythonUdf]: ... class BigqueryCallableRoutine: @@ -242,11 +242,11 @@ class UdfRoutine: func: Callable # Try not to depend on this, bq managed function creation will be deferred later # And this ref will be replaced with requirements rather to support lazy creation - _udf_def: udf_def.BigqueryUdf + _udf_def: Union[udf_def.BigqueryUdf, udf_def.PythonUdf] def __call__(self, *args, **kwargs): return self.func(*args, **kwargs) @property - def udf_def(self) -> udf_def.BigqueryUdf: + def udf_def(self) -> Union[udf_def.BigqueryUdf, udf_def.PythonUdf]: return self._udf_def diff --git a/packages/bigframes/bigframes/functions/udf_def.py b/packages/bigframes/bigframes/functions/udf_def.py index fbe000f608fd..b95dafc4253b 100644 --- a/packages/bigframes/bigframes/functions/udf_def.py +++ b/packages/bigframes/bigframes/functions/udf_def.py @@ -371,6 +371,30 @@ def stable_hash(self) -> bytes: return hash_val.digest() +@dataclasses.dataclass(frozen=True) +class RuntimeRequirements: + container_cpu: Optional[float] = None + container_memory: Optional[str] = None + bq_connection_id: Optional[str] = None + max_batching_rows: Optional[int] = None + packages: tuple[str, ...] = () + + def stable_hash(self) -> bytes: + hash_val = google_crc32c.Checksum() + if self.container_cpu is not None: + hash_val.update(str(self.container_cpu).encode()) + if self.container_memory is not None: + hash_val.update(str(self.container_memory).encode()) + if self.bq_connection_id is not None: + hash_val.update(str(self.bq_connection_id).encode()) + if self.max_batching_rows is not None: + hash_val.update(str(self.max_batching_rows).encode()) + if self.packages: + for p in sorted(self.packages): + hash_val.update(p.encode()) + return hash_val.digest() + + @dataclasses.dataclass(frozen=True) class BigqueryUdf: """ @@ -398,6 +422,37 @@ def from_routine( return cls(routine.reference, signature=signature) +@dataclasses.dataclass(frozen=True) +class PythonUdf: + """ + Represents user-requested Python UDF semantics, including the code and runtime requirements. + """ + + signature: UdfSignature + code: CodeDef + requirements: RuntimeRequirements = dataclasses.field( + default_factory=RuntimeRequirements + ) + + def stable_hash(self) -> bytes: + hash_val = google_crc32c.Checksum() + hash_val.update(self.code.stable_hash()) + hash_val.update(self.signature.stable_hash()) + hash_val.update(self.requirements.stable_hash()) + return hash_val.digest() + + def to_managed_function_config(self) -> ManagedFunctionConfig: + return ManagedFunctionConfig( + code=self.code, + signature=self.signature, + max_batching_rows=self.requirements.max_batching_rows, + container_cpu=self.requirements.container_cpu, + container_memory=self.requirements.container_memory, + bq_connection_id=self.requirements.bq_connection_id, + capture_references=False, + ) + + @dataclasses.dataclass(frozen=True) class CodeDef: # Produced by cloudpickle, not compatible across python versions diff --git a/packages/bigframes/bigframes/operations/__init__.py b/packages/bigframes/bigframes/operations/__init__.py index dd036bec5a26..11d0ef28f075 100644 --- a/packages/bigframes/bigframes/operations/__init__.py +++ b/packages/bigframes/bigframes/operations/__init__.py @@ -183,8 +183,7 @@ ) from bigframes.operations.numpy_op_maps import NUMPY_TO_BINOP, NUMPY_TO_OP from bigframes.operations.remote_function_ops import ( - BinaryRemoteFunctionOp, - NaryRemoteFunctionOp, + PythonUdfOp, RemoteFunctionOp, ) from bigframes.operations.string_ops import ( @@ -230,6 +229,7 @@ timestamp_add_op, timestamp_sub_op, ) +from bigframes.operations.to_op import func_to_op __all__ = [ # Base ops @@ -375,9 +375,8 @@ "StructFieldOp", "StructOp", # Remote Functions ops - "BinaryRemoteFunctionOp", - "NaryRemoteFunctionOp", "RemoteFunctionOp", + "PythonUdfOp", # Frequency ops "DatetimeToIntegerLabelOp", "FloorDtOp", @@ -436,6 +435,8 @@ "AIIf", "AIScore", "AISimilarity", + # Helper functions + "func_to_op", # Numpy ops mapping "NUMPY_TO_BINOP", "NUMPY_TO_OP", diff --git a/packages/bigframes/bigframes/operations/remote_function_ops.py b/packages/bigframes/bigframes/operations/remote_function_ops.py index 9c51210df0e7..3ce77d51c615 100644 --- a/packages/bigframes/bigframes/operations/remote_function_ops.py +++ b/packages/bigframes/bigframes/operations/remote_function_ops.py @@ -19,25 +19,10 @@ from bigframes.operations import base_ops -# TODO: Enforce input type constraints from function def @dataclasses.dataclass(frozen=True) -class RemoteFunctionOp(base_ops.UnaryOp): - name: typing.ClassVar[str] = "remote_function" - function_def: udf_def.BigqueryUdf - apply_on_null: bool - - @property - def expensive(self) -> bool: - return True - - def output_type(self, *input_types): - return self.function_def.signature.output.bf_type - - -@dataclasses.dataclass(frozen=True) -class BinaryRemoteFunctionOp(base_ops.BinaryOp): - name: typing.ClassVar[str] = "binary_remote_function" - function_def: udf_def.BigqueryUdf +class PythonUdfOp(base_ops.NaryOp): + name: typing.ClassVar[str] = "python_udf" + function_def: udf_def.PythonUdf @property def expensive(self) -> bool: @@ -48,8 +33,8 @@ def output_type(self, *input_types): @dataclasses.dataclass(frozen=True) -class NaryRemoteFunctionOp(base_ops.NaryOp): - name: typing.ClassVar[str] = "nary_remote_function" +class RemoteFunctionOp(base_ops.NaryOp): + name: typing.ClassVar[str] = "remote_function" function_def: udf_def.BigqueryUdf @property diff --git a/packages/bigframes/bigframes/operations/to_op.py b/packages/bigframes/bigframes/operations/to_op.py new file mode 100644 index 000000000000..c139541470d1 --- /dev/null +++ b/packages/bigframes/bigframes/operations/to_op.py @@ -0,0 +1,41 @@ +# Copyright 2026 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from bigframes.functions import Udf +from bigframes.functions.udf_def import BigqueryUdf, PythonUdf +from bigframes.operations import base_ops, remote_function_ops + + +def func_to_op(op) -> base_ops.NaryOp: + """ + Convert various bigframes, python objects into a bigframes operations. + + This should handle anything that might be passed to eg map, combine, other pandas methods that take a function. + + It should raise a TypeError if the object is not a supported type. + + Args: + op: The object to convert. + + Returns: + A bigframes operations. + """ + # TODO: Handle numpy ufuncs, builtin functions, etc. + if isinstance(op, Udf): + if isinstance(op.udf_def, BigqueryUdf): + return remote_function_ops.RemoteFunctionOp(function_def=op.udf_def) + elif isinstance(op.udf_def, PythonUdf): + return remote_function_ops.PythonUdfOp(function_def=op.udf_def) + else: + raise TypeError(f"Unsupported function type: {op}") diff --git a/packages/bigframes/bigframes/pandas/__init__.py b/packages/bigframes/bigframes/pandas/__init__.py index 34ec3037e92f..082a00438f42 100644 --- a/packages/bigframes/bigframes/pandas/__init__.py +++ b/packages/bigframes/bigframes/pandas/__init__.py @@ -202,7 +202,7 @@ def udf( output_type: Optional[type] = None, dataset: str, bigquery_connection: Optional[str] = None, - name: str, + name: Optional[str] = None, packages: Optional[Sequence[str]] = None, max_batching_rows: Optional[int] = None, container_cpu: Optional[float] = None, diff --git a/packages/bigframes/bigframes/series.py b/packages/bigframes/bigframes/series.py index 87c03395c753..469ddb8ac276 100644 --- a/packages/bigframes/bigframes/series.py +++ b/packages/bigframes/bigframes/series.py @@ -2039,17 +2039,10 @@ def apply( if isinstance(func, bigframes.functions.Udf): # We are working with bigquery function at this point - if args: - result_series = self._apply_nary_op( - ops.NaryRemoteFunctionOp(function_def=func.udf_def), args - ) - # TODO(jialuo): Investigate why `_apply_nary_op` drops the series - # `name`. Manually reassigning it here as a temporary fix. - result_series.name = self.name - else: - result_series = self._apply_unary_op( - ops.RemoteFunctionOp(function_def=func.udf_def, apply_on_null=True) - ) + result_series = self._apply_nary_op(ops.func_to_op(func), args) + # TODO(jialuo): Investigate why `_apply_nary_op` drops the series + # `name`. Manually reassigning it here as a temporary fix. + result_series.name = self.name return result_series @@ -2099,9 +2092,11 @@ def combine( ) if isinstance(func, bigframes.functions.Udf): - result_series = self._apply_binary_op( - other, ops.BinaryRemoteFunctionOp(function_def=func.udf_def) - ) + result_series = self._apply_nary_op(ops.func_to_op(func), (other,)) + if hasattr(other, "name") and other.name != self._name: # type: ignore + result_series.name = None + else: + result_series.name = self.name return result_series bf_op = python_ops.python_callable_to_op(func) diff --git a/packages/bigframes/bigframes/session/__init__.py b/packages/bigframes/bigframes/session/__init__.py index 38e92a60321b..d1bdc3854e46 100644 --- a/packages/bigframes/bigframes/session/__init__.py +++ b/packages/bigframes/bigframes/session/__init__.py @@ -338,6 +338,7 @@ def __init__( enable_polars_execution=context.enable_polars_execution, publisher=self._publisher, labels=tuple(labels.items()), + function_manager=self._function_session, ) def __del__(self): @@ -1958,7 +1959,7 @@ def udf( output_type: Optional[type] = None, dataset: str, bigquery_connection: Optional[str] = None, - name: str, + name: Optional[str] = None, packages: Optional[Sequence[str]] = None, max_batching_rows: Optional[int] = None, container_cpu: Optional[float] = None, diff --git a/packages/bigframes/bigframes/session/bq_caching_executor.py b/packages/bigframes/bigframes/session/bq_caching_executor.py index 9948480d5cac..77723c1bbd2a 100644 --- a/packages/bigframes/bigframes/session/bq_caching_executor.py +++ b/packages/bigframes/bigframes/session/bq_caching_executor.py @@ -35,6 +35,8 @@ import bigframes.core.schema as schemata import bigframes.core.tree_properties as tree_properties import bigframes.dtypes +import bigframes.functions._function_session as bff_session +import bigframes.operations as ops import bigframes.session._io.bigquery as bq_io import bigframes.session.execution_cache as execution_cache import bigframes.session.execution_spec as ex_spec @@ -42,9 +44,18 @@ import bigframes.session.planner import bigframes.session.temporary_storage from bigframes._config import ComputeOptions -from bigframes.core import bq_data, compile, guid, identifiers, local_data, rewrite +from bigframes.core import ( + bq_data, + compile, + expression, + guid, + identifiers, + local_data, + rewrite, +) from bigframes.core.compile.sqlglot import sql as sg_sql from bigframes.core.compile.sqlglot import sqlglot_ir +from bigframes.functions import udf_def from bigframes.session import ( direct_gbq_execution, executor, @@ -121,6 +132,7 @@ def __init__( labels: tuple[tuple[str, str], ...] = (), compiler_name: Literal["ibis", "sqlglot"] = "sqlglot", cache: Optional[execution_cache.ExecutionCache] = None, + function_manager: bff_session.FunctionSession, ): self.bqclient = bqclient self.storage_manager = storage_manager @@ -156,6 +168,7 @@ def __init__( publisher=self._publisher, labels=dict(labels), ) + self._function_manager = function_manager def to_sql( self, @@ -511,12 +524,74 @@ def _prepare_plan_simplify(self, plan: nodes.BigFrameNode) -> nodes.BigFrameNode plan = plan.top_down(rewrite.fold_row_counts) return plan + async def _deploy_undeployed_udfs( + self, plan: nodes.BigFrameNode + ) -> nodes.BigFrameNode: + referenced_udfs = list(set(self._collect_udf_defs(plan))) + session = self.loader._session + deployed_mapping: dict[udf_def.PythonUdf, udf_def.BigqueryUdf] = {} + tasks = [ + asyncio.to_thread( + self._function_manager._deploy_udf, + session, + udf, + ) + for udf in referenced_udfs + ] + results = await asyncio.gather(*tasks) + deployed_mapping = dict(zip(referenced_udfs, results)) + + return self._subsitute_temporary_functions(plan, deployed_mapping) + + def _collect_udf_defs(self, plan: nodes.BigFrameNode) -> list[udf_def.PythonUdf]: + udf_defs: list[udf_def.PythonUdf] = [] + for node in plan.unique_nodes(): + for expr in node._node_expressions: + for sub_expr in expr.walk(): + if isinstance(sub_expr, expression.OpExpression): + op = sub_expr.op + if isinstance(op, ops.PythonUdfOp): + func_def = op.function_def + if isinstance(func_def, udf_def.PythonUdf): + udf_defs.append(func_def) + return udf_defs + + def _subsitute_temporary_functions( + self, + plan: nodes.BigFrameNode, + deployed_mapping: dict[udf_def.PythonUdf, udf_def.BigqueryUdf], + ) -> nodes.BigFrameNode: + def replace_in_expr(expr: expression.Expression) -> expression.Expression: + def replace_step(e: expression.Expression) -> expression.Expression: + if isinstance(e, expression.OpExpression): + op = e.op + if isinstance(op, ops.PythonUdfOp): + func_def = op.function_def + if func_def in deployed_mapping: + deployed_func = deployed_mapping[func_def] + rf_op = ops.RemoteFunctionOp(function_def=deployed_func) + return dataclasses.replace(e, op=rf_op) + raise ValueError( + f"UDF definition {func_def} not found in deployed mapping" + ) + return e + + return expr.bottom_up(replace_step) + + def replace_in_node(node: nodes.BigFrameNode) -> nodes.BigFrameNode: + if hasattr(node, "transform_exprs"): + return node.transform_exprs(replace_in_expr) + return node + + return plan.bottom_up(replace_in_node) + async def _prepare_plan_bq_execution( self, plan: nodes.BigFrameNode, compute_options: Optional[ex_spec.BqComputeOptions] = None, ) -> nodes.BigFrameNode: """Prepare the plan for BigQuery execution by caching subtrees and uploading large local sources.""" + plan = await self._deploy_undeployed_udfs(plan) if compute_options is not None and compute_options.enable_multi_query_execution: await self._simplify_with_caching(plan, compute_options=compute_options) plan = self._prepare_plan_simplify(plan) diff --git a/packages/bigframes/bigframes/session/proxy_executor.py b/packages/bigframes/bigframes/session/proxy_executor.py index c4fe6584bd27..615823822c71 100644 --- a/packages/bigframes/bigframes/session/proxy_executor.py +++ b/packages/bigframes/bigframes/session/proxy_executor.py @@ -23,6 +23,7 @@ import google.cloud.exceptions import bigframes.core +import bigframes.functions._function_session as bff_session from bigframes import exceptions as bfe from bigframes.session import ( bq_caching_executor, @@ -52,6 +53,7 @@ def __init__( metrics: Optional[bigframes.session.metrics.ExecutionMetrics] = None, enable_polars_execution: bool = False, publisher: bigframes.core.events.Publisher, + function_manager: bff_session.FunctionSession, labels: tuple[tuple[str, str], ...] = (), ): self._enable_polars_execution = enable_polars_execution @@ -67,6 +69,7 @@ def __init__( labels=labels, cache=shared_cache, compiler_name="ibis", + function_manager=function_manager, ) self._sqlglot_executor = bq_caching_executor.BigQueryCachingExecutor( bqclient, @@ -79,6 +82,7 @@ def __init__( labels=labels, cache=shared_cache, compiler_name="sqlglot", + function_manager=function_manager, ) def to_sql( diff --git a/packages/bigframes/tests/system/large/functions/test_managed_function.py b/packages/bigframes/tests/system/large/functions/test_managed_function.py index 0c2e3d8fe895..e93d2bb068be 100644 --- a/packages/bigframes/tests/system/large/functions/test_managed_function.py +++ b/packages/bigframes/tests/system/large/functions/test_managed_function.py @@ -1128,3 +1128,76 @@ def foo_list(x: int, y0: float, y1: bytes, y2: bool) -> list[str]: # Ignore any dtype difference. pandas.testing.assert_series_equal(bf_result, pd_result, check_dtype=False) + + +def test_deferred_unnamed_udf_execution(session, dataset_id, scalars_dfs): + import bigframes.functions.udf_def as udf_def + + # Create an unnamed UDF (name=None) + @session.udf(dataset=dataset_id) + def unnamed_multiplier(x: int) -> int: + return x * 3 + + # 1. Assert it is represented as a PythonUdf (not deployed yet) + assert isinstance(unnamed_multiplier.udf_def, udf_def.PythonUdf) + + scalars_df, scalars_pandas_df = scalars_dfs + bf_series = scalars_df["int64_too"] + pd_series = scalars_pandas_df["int64_too"] + + # 2. Applying it triggers deployment behind the scenes! + bf_result = bf_series.apply(unnamed_multiplier).to_pandas() + pd_result = pd_series.apply(lambda x: x * 3) + + pandas.testing.assert_series_equal(bf_result, pd_result, check_dtype=False) + + # 3. Verify that the deployed routine name matches our stable hash and exists in BigQuery + import bigframes.functions._function_client as bff_client + + config = unnamed_multiplier.udf_def.to_managed_function_config() + expected_routine_name = bff_client.get_managed_function_name( + config, session.session_id + ) + routine = session.bqclient.get_routine( + f"{session._anonymous_dataset.project}.{session._anonymous_dataset.dataset_id}.{expected_routine_name}" + ) + assert routine is not None + + +def test_deferred_udf_with_runtime_requirements(session, dataset_id, scalars_dfs): + import bigframes.functions.udf_def as udf_def + + # Create an unnamed UDF with custom options + @session.udf( + dataset=dataset_id, + container_cpu=1, + container_memory="2Gi", + max_batching_rows=25, + ) + def heavy_unnamed_udf(x: int) -> int: + return x + 100 + + assert isinstance(heavy_unnamed_udf.udf_def, udf_def.PythonUdf) + + scalars_df, scalars_pandas_df = scalars_dfs + bf_series = scalars_df["int64_too"] + pd_series = scalars_pandas_df["int64_too"] + + bf_result = bf_series.apply(heavy_unnamed_udf).to_pandas() + pd_result = pd_series.apply(lambda x: x + 100) + + pandas.testing.assert_series_equal(bf_result, pd_result, check_dtype=False) + + # Verify it was deployed with the correct runtime options + import bigframes.functions._function_client as bff_client + + config = heavy_unnamed_udf.udf_def.to_managed_function_config() + expected_routine_name = bff_client.get_managed_function_name( + config, session.session_id + ) + routine = session.bqclient.get_routine( + f"{session._anonymous_dataset.project}.{session._anonymous_dataset.dataset_id}.{expected_routine_name}" + ) + assert routine._properties["externalRuntimeOptions"]["containerCpu"] == 1 + assert routine._properties["externalRuntimeOptions"]["containerMemory"] == "2Gi" + assert routine._properties["externalRuntimeOptions"]["maxBatchingRows"] == "25" diff --git a/packages/bigframes/tests/unit/core/compile/sqlglot/expressions/snapshots/test_generic_ops/test_remote_function_op/out.sql b/packages/bigframes/tests/unit/core/compile/sqlglot/expressions/snapshots/test_generic_ops/test_remote_function_op/out.sql index 1854c0258825..a1977d809f70 100644 --- a/packages/bigframes/tests/unit/core/compile/sqlglot/expressions/snapshots/test_generic_ops/test_remote_function_op/out.sql +++ b/packages/bigframes/tests/unit/core/compile/sqlglot/expressions/snapshots/test_generic_ops/test_remote_function_op/out.sql @@ -1,8 +1,3 @@ SELECT - `my_project`.`my_dataset`.`my_routine`(`int64_col`) AS `apply_on_null_true`, - IF( - `int64_col` IS NULL, - `int64_col`, - `my_project`.`my_dataset`.`my_routine`(`int64_col`) - ) AS `apply_on_null_false` + `my_project`.`my_dataset`.`my_routine`(`int64_col`, `float64_col`, `string_col`) AS `int64_col` FROM `bigframes-dev`.`sqlglot_test`.`scalar_types` AS `bft_0` \ No newline at end of file diff --git a/packages/bigframes/tests/unit/core/compile/sqlglot/expressions/test_generic_ops.py b/packages/bigframes/tests/unit/core/compile/sqlglot/expressions/test_generic_ops.py index a986ca270de0..fb5a9fd7ce84 100644 --- a/packages/bigframes/tests/unit/core/compile/sqlglot/expressions/test_generic_ops.py +++ b/packages/bigframes/tests/unit/core/compile/sqlglot/expressions/test_generic_ops.py @@ -172,63 +172,8 @@ def test_astype_json_invalid( def test_remote_function_op(scalar_types_df: bpd.DataFrame, snapshot): - bf_df = scalar_types_df[["int64_col"]] - function_def = udf_def.BigqueryUdf( - routine_ref=bigquery.RoutineReference.from_string( - "my_project.my_dataset.my_routine" - ), - signature=udf_def.UdfSignature( - inputs=( - udf_def.UdfArg( - "x", - udf_def.DirectScalarType(int), - ), - ), - output=udf_def.DirectScalarType(float), - ), - ) - ops_map = { - "apply_on_null_true": ops.RemoteFunctionOp( - function_def=function_def, apply_on_null=True - ).as_expr("int64_col"), - "apply_on_null_false": ops.RemoteFunctionOp( - function_def=function_def, apply_on_null=False - ).as_expr("int64_col"), - } - sql = utils._apply_ops_to_sql(bf_df, list(ops_map.values()), list(ops_map.keys())) - snapshot.assert_match(sql, "out.sql") - - -def test_binary_remote_function_op(scalar_types_df: bpd.DataFrame, snapshot): - bf_df = scalar_types_df[["int64_col", "float64_col"]] - op = ops.BinaryRemoteFunctionOp( - function_def=udf_def.BigqueryUdf( - routine_ref=bigquery.RoutineReference.from_string( - "my_project.my_dataset.my_routine" - ), - signature=udf_def.UdfSignature( - inputs=( - udf_def.UdfArg( - "x", - udf_def.DirectScalarType(int), - ), - udf_def.UdfArg( - "y", - udf_def.DirectScalarType(float), - ), - ), - output=udf_def.DirectScalarType(float), - ), - ) - ) - sql = utils._apply_binary_op(bf_df, op, "int64_col", "float64_col") - - snapshot.assert_match(sql, "out.sql") - - -def test_nary_remote_function_op(scalar_types_df: bpd.DataFrame, snapshot): bf_df = scalar_types_df[["int64_col", "float64_col", "string_col"]] - op = ops.NaryRemoteFunctionOp( + op = ops.RemoteFunctionOp( function_def=udf_def.BigqueryUdf( routine_ref=bigquery.RoutineReference.from_string( "my_project.my_dataset.my_routine" diff --git a/packages/bigframes/tests/unit/session/test_proxy_executor.py b/packages/bigframes/tests/unit/session/test_proxy_executor.py index a1a8f168995b..7aa549934446 100644 --- a/packages/bigframes/tests/unit/session/test_proxy_executor.py +++ b/packages/bigframes/tests/unit/session/test_proxy_executor.py @@ -33,8 +33,14 @@ def mock_executor(): bqstoragereadclient = mock.Mock() loader = mock.Mock() publisher = mock.Mock() + function_manager = mock.Mock() return DualCompilerProxyExecutor( - bqclient, storage_manager, bqstoragereadclient, loader, publisher=publisher + bqclient, + storage_manager, + bqstoragereadclient, + loader, + publisher=publisher, + function_manager=function_manager, )