diff --git a/python/benchmarks/bench_eval_type.py b/python/benchmarks/bench_eval_type.py
index ce4d421b6cfe..1a5395e04354 100644
--- a/python/benchmarks/bench_eval_type.py
+++ b/python/benchmarks/bench_eval_type.py
@@ -1098,6 +1098,76 @@ class ScalarArrowIterUDFPeakmemBench(_ScalarArrowIterBenchMixin, _PeakmemBenchBase):
     pass
 
 
+# -- SQL_SCALAR_PANDAS_UDF --------------------------------------------------
+# UDF receives ``pandas.Series`` columns, returns ``pandas.Series``.
+# Measures the full Arrow-to-Pandas-to-Arrow round-trip.
+
+
+class _ScalarPandasBenchMixin:
+    """Mixin for SQL_SCALAR_PANDAS_UDF benchmarks."""
+
+    def _scalar_pandas_sort(s):
+        return s.sort_values().reset_index(drop=True)
+
+    def _scalar_pandas_nullcheck(s):
+        return s.notna()
+
+    _scenario_configs = {
+        "sm_batch_few_col": ("mixed", 500_000, 5, 1_000),
+        "sm_batch_many_col": ("mixed", 50_000, 50, 1_000),
+        "lg_batch_few_col": ("mixed", 5_000_000, 5, 10_000),
+        "lg_batch_many_col": ("mixed", 500_000, 50, 10_000),
+        "pure_ints": ("pure_ints", 1_000_000, 10, 5_000),
+        "pure_floats": ("pure_floats", 1_000_000, 10, 5_000),
+        "pure_strings": ("pure_strings", 1_000_000, 10, 5_000),
+        "pure_ts": ("pure_ts", 1_000_000, 10, 5_000),
+        "mixed_types": ("mixed", 1_000_000, 10, 5_000),
+    }
+
+    @staticmethod
+    def _build_scenario(name):
+        """Build a single scenario by name."""
+        np.random.seed(42)
+        type_key, num_rows, num_cols, batch_size = _ScalarPandasBenchMixin._scenario_configs[name]
+        pool = MockDataFactory.NAMED_TYPE_POOLS[type_key]
+        return MockDataFactory.make_batches(
+            num_rows=num_rows,
+            num_cols=num_cols,
+            spark_type_pool=pool,
+            batch_size=batch_size,
+        )
+
+    _eval_type = PythonEvalType.SQL_SCALAR_PANDAS_UDF
+    # ret_type=None means "use schema.fields[0].dataType from the scenario"
+    _udfs = {
+        "identity_udf": (lambda s: s, None, [0]),
+        "sort_udf": (_scalar_pandas_sort, None, [0]),
+        "nullcheck_udf": (_scalar_pandas_nullcheck, BooleanType(), [0]),
+    }
+    params = [list(_scenario_configs), list(_udfs)]
+    param_names = ["scenario", "udf"]
+
+    def _write_scenario(self, scenario, udf_name, buf):
+        batches, schema = self._build_scenario(scenario)
+        udf_func, ret_type, arg_offsets = self._udfs[udf_name]
+        if ret_type is None:
+            ret_type = schema.fields[0].dataType
+        MockProtocolWriter.write_worker_input(
+            self._eval_type,
+            lambda b: MockProtocolWriter.write_udf_payload(udf_func, ret_type, arg_offsets, b),
+            lambda b: MockProtocolWriter.write_data_payload(iter(batches), b),
+            buf,
+        )
+
+
+class ScalarPandasUDFTimeBench(_ScalarPandasBenchMixin, _TimeBenchBase):
+    pass
+
+
+class ScalarPandasUDFPeakmemBench(_ScalarPandasBenchMixin, _PeakmemBenchBase):
+    pass
+
+
 # -- SQL_WINDOW_AGG_ARROW_UDF ------------------------------------------------
 # UDF receives ``pa.Array`` columns for the entire window partition, returns scalar.