From 346d713248c188036083be55d3be59a0274569d7 Mon Sep 17 00:00:00 2001
From: Yicong-Huang <17627829+Yicong-Huang@users.noreply.github.com>
Date: Thu, 9 Apr 2026 20:07:50 +0000
Subject: [PATCH] [SPARK-56424][PYTHON] Benchmark SQL_SCALAR_PANDAS_UDF

### What changes were proposed in this pull request?

Add an ASV microbenchmark for the `SQL_SCALAR_PANDAS_UDF` eval type in
`bench_eval_type.py`.

### Why are the changes needed?

Part of SPARK-55724 (Micro-benchmark PySpark Eval Types). This guards
against performance regressions during the serializer refactor by
measuring the full Arrow-to-Pandas-to-Arrow round-trip for scalar
Pandas UDFs.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Verified that all 27 scenario/UDF combinations (9 scenarios x 3 UDFs)
run successfully.

---
 python/benchmarks/bench_eval_type.py | 70 ++++++++++++++++++++++++++++
 1 file changed, 70 insertions(+)

diff --git a/python/benchmarks/bench_eval_type.py b/python/benchmarks/bench_eval_type.py
index ce4d421b6cfe..1a5395e04354 100644
--- a/python/benchmarks/bench_eval_type.py
+++ b/python/benchmarks/bench_eval_type.py
@@ -1098,6 +1098,76 @@ class ScalarArrowIterUDFPeakmemBench(_ScalarArrowIterBenchMixin, _PeakmemBenchBase):
     pass
 
 
+# -- SQL_SCALAR_PANDAS_UDF --------------------------------------------------
+# UDF receives ``pandas.Series`` columns, returns ``pandas.Series``.
+# Measures the full Arrow-to-Pandas-to-Arrow round-trip.
+
+
+class _ScalarPandasBenchMixin:
+    """Mixin for SQL_SCALAR_PANDAS_UDF benchmarks."""
+
+    def _scalar_pandas_sort(s):
+        return s.sort_values().reset_index(drop=True)
+
+    def _scalar_pandas_nullcheck(s):
+        return s.notna()
+
+    _scenario_configs = {
+        "sm_batch_few_col": ("mixed", 500_000, 5, 1_000),
+        "sm_batch_many_col": ("mixed", 50_000, 50, 1_000),
+        "lg_batch_few_col": ("mixed", 5_000_000, 5, 10_000),
+        "lg_batch_many_col": ("mixed", 500_000, 50, 10_000),
+        "pure_ints": ("pure_ints", 1_000_000, 10, 5_000),
+        "pure_floats": ("pure_floats", 1_000_000, 10, 5_000),
+        "pure_strings": ("pure_strings", 1_000_000, 10, 5_000),
+        "pure_ts": ("pure_ts", 1_000_000, 10, 5_000),
+        "mixed_types": ("mixed", 1_000_000, 10, 5_000),
+    }
+
+    @staticmethod
+    def _build_scenario(name):
+        """Build a single scenario by name."""
+        np.random.seed(42)
+        type_key, num_rows, num_cols, batch_size = _ScalarPandasBenchMixin._scenario_configs[name]
+        pool = MockDataFactory.NAMED_TYPE_POOLS[type_key]
+        return MockDataFactory.make_batches(
+            num_rows=num_rows,
+            num_cols=num_cols,
+            spark_type_pool=pool,
+            batch_size=batch_size,
+        )
+
+    _eval_type = PythonEvalType.SQL_SCALAR_PANDAS_UDF
+    # ret_type=None means "use schema.fields[0].dataType from the scenario"
+    _udfs = {
+        "identity_udf": (lambda s: s, None, [0]),
+        "sort_udf": (_scalar_pandas_sort, None, [0]),
+        "nullcheck_udf": (_scalar_pandas_nullcheck, BooleanType(), [0]),
+    }
+    params = [list(_scenario_configs), list(_udfs)]
+    param_names = ["scenario", "udf"]
+
+    def _write_scenario(self, scenario, udf_name, buf):
+        batches, schema = self._build_scenario(scenario)
+        udf_func, ret_type, arg_offsets = self._udfs[udf_name]
+        if ret_type is None:
+            ret_type = schema.fields[0].dataType
+        MockProtocolWriter.write_worker_input(
+            self._eval_type,
+            lambda b: MockProtocolWriter.write_udf_payload(udf_func, ret_type, arg_offsets, b),
+            lambda b: MockProtocolWriter.write_data_payload(iter(batches), b),
+            buf,
+        )
+
+
+class ScalarPandasUDFTimeBench(_ScalarPandasBenchMixin, _TimeBenchBase):
+    pass
+
+
+class ScalarPandasUDFPeakmemBench(_ScalarPandasBenchMixin, _PeakmemBenchBase):
+    pass
+
+
 # -- SQL_WINDOW_AGG_ARROW_UDF ------------------------------------------------
 # UDF receives ``pa.Array`` columns for the entire window partition, returns scalar.
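
Note for context (not part of the patch): the "Arrow-to-Pandas-to-Arrow round-trip" measured above is the conversion the Python worker performs around a scalar Pandas UDF. Below is a minimal, self-contained sketch of that round-trip using only `pyarrow`, independent of the `MockDataFactory`/`MockProtocolWriter` harness in this patch; it reuses the `nullcheck_udf` body as the example UDF, and the variable names are illustrative only:

```python
import pyarrow as pa

# One Arrow record batch, standing in for a batch streamed to the Python worker.
batch = pa.RecordBatch.from_arrays(
    [pa.array([3, 1, None, 2], type=pa.int64())], names=["v"]
)

# Arrow -> pandas: each input column becomes a pandas.Series before the
# scalar Pandas UDF is invoked on it.
series = batch.column(0).to_pandas()

# The UDF body (here: the null-check UDF from the benchmark above).
result = series.notna()

# pandas -> Arrow: the UDF result is converted back to Arrow for the
# return trip to the JVM.
out = pa.RecordBatch.from_arrays([pa.Array.from_pandas(result)], names=["v"])
print(out.column(0))  # -> [true, true, false, true]
```

The benchmark classes exercise this same conversion path through the worker protocol rather than calling the conversions directly, so serializer regressions surface in both the timing and peak-memory results.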