Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 70 additions & 0 deletions python/benchmarks/bench_eval_type.py
Original file line number Diff line number Diff line change
Expand Up @@ -1098,6 +1098,76 @@ class ScalarArrowIterUDFPeakmemBench(_ScalarArrowIterBenchMixin, _PeakmemBenchBa
pass


# -- SQL_SCALAR_PANDAS_UDF --------------------------------------------------
# UDF receives ``pandas.Series`` columns, returns ``pandas.Series``.
# Measures the full Arrow-to-Pandas-to-Arrow round-trip.


class _ScalarPandasBenchMixin:
    """Mixin supplying scenarios and UDFs for SQL_SCALAR_PANDAS_UDF benchmarks.

    Subclasses mix this with a timing or peak-memory base class; the mixin
    provides the eval type, the scenario/UDF parameter grid, and the hook
    that serializes a complete worker input into a buffer.
    """

    # NOTE: these are class-body helpers, not methods.  They are captured in
    # the ``_udfs`` dict below as plain functions, so at call time they
    # receive a single pandas.Series positional argument (no ``self``).
    def _scalar_pandas_sort(series):
        return series.sort_values().reset_index(drop=True)

    def _scalar_pandas_nullcheck(series):
        return series.notna()

    # scenario name -> (type-pool key, num_rows, num_cols, batch_size)
    _scenario_configs = {
        "sm_batch_few_col": ("mixed", 500_000, 5, 1_000),
        "sm_batch_many_col": ("mixed", 50_000, 50, 1_000),
        "lg_batch_few_col": ("mixed", 5_000_000, 5, 10_000),
        "lg_batch_many_col": ("mixed", 500_000, 50, 10_000),
        "pure_ints": ("pure_ints", 1_000_000, 10, 5_000),
        "pure_floats": ("pure_floats", 1_000_000, 10, 5_000),
        "pure_strings": ("pure_strings", 1_000_000, 10, 5_000),
        "pure_ts": ("pure_ts", 1_000_000, 10, 5_000),
        "mixed_types": ("mixed", 1_000_000, 10, 5_000),
    }

    @staticmethod
    def _build_scenario(name):
        """Materialize the mock batches and schema for scenario *name*."""
        np.random.seed(42)  # deterministic data across benchmark runs
        config = _ScalarPandasBenchMixin._scenario_configs[name]
        type_key, rows, cols, batch = config
        return MockDataFactory.make_batches(
            num_rows=rows,
            num_cols=cols,
            spark_type_pool=MockDataFactory.NAMED_TYPE_POOLS[type_key],
            batch_size=batch,
        )

    _eval_type = PythonEvalType.SQL_SCALAR_PANDAS_UDF
    # A return type of ``None`` means: substitute ``schema.fields[0].dataType``
    # from the generated scenario at write time.
    _udfs = {
        "identity_udf": (lambda series: series, None, [0]),
        "sort_udf": (_scalar_pandas_sort, None, [0]),
        "nullcheck_udf": (_scalar_pandas_nullcheck, BooleanType(), [0]),
    }
    params = [list(_scenario_configs), list(_udfs)]
    param_names = ["scenario", "udf"]

    def _write_scenario(self, scenario, udf_name, buf):
        """Serialize the full worker input (UDF payload + data) into *buf*."""
        batches, schema = self._build_scenario(scenario)
        func, return_type, offsets = self._udfs[udf_name]
        if return_type is None:
            # Fall back to the first column's type from the scenario schema.
            return_type = schema.fields[0].dataType

        def write_udf(stream):
            MockProtocolWriter.write_udf_payload(func, return_type, offsets, stream)

        def write_data(stream):
            MockProtocolWriter.write_data_payload(iter(batches), stream)

        MockProtocolWriter.write_worker_input(self._eval_type, write_udf, write_data, buf)


class ScalarPandasUDFTimeBench(_ScalarPandasBenchMixin, _TimeBenchBase):
    """Wall-clock timing benchmark for SQL_SCALAR_PANDAS_UDF scenarios."""

    pass


class ScalarPandasUDFPeakmemBench(_ScalarPandasBenchMixin, _PeakmemBenchBase):
    """Peak-memory benchmark for SQL_SCALAR_PANDAS_UDF scenarios."""

    pass


# -- SQL_WINDOW_AGG_ARROW_UDF ------------------------------------------------
# UDF receives ``pa.Array`` columns for the entire window partition, returns scalar.

Expand Down