[WIP][SPARK-40193][SQL] Merge subplans with different filter conditions#55298
Draft
peter-toth wants to merge 1 commit intoapache:masterfrom
Draft
[WIP][SPARK-40193][SQL] Merge subplans with different filter conditions#55298peter-toth wants to merge 1 commit intoapache:masterfrom
peter-toth wants to merge 1 commit intoapache:masterfrom
Conversation
99cdfb1 to
219b19f
Compare
### What changes were proposed in this pull request?
`PlanMerger` is extended to merge non-correlated non-grouping aggregate subplans
that differ only in their `WHERE` filter conditions.
Filter merging follows the same recursive plan-matching logic as the rest of
`PlanMerger` and handles three cases:
- `(np: Filter, cp: Filter)` with different conditions: both conditions are
aliased as boolean attributes in a `Project`, a merged `Filter(OR(f0, f1))`
is introduced, and the aliases are propagated up to the enclosing `Aggregate`
where each side's expressions receive a `FILTER (WHERE ...)` clause.
- `(np: Filter, cp)` or `(np, cp: Filter)`: only one side has a filter; the
condition is exposed as a `Project` attribute and propagated up so only that
side's aggregate expressions receive a `FILTER` clause.
- Equal filter conditions pass through unchanged.
When plans also differ in intermediate `Project` expressions above a `Filter`,
those expressions are wrapped with `If(filterAttr, expr, null)` to avoid
computing them for rows that do not match that side's filter condition. Plain
attribute references are never wrapped since reading a column value is free.
**Example**
```
// Input plans
Aggregate [sum(a) AS sum_a] Aggregate [max(d) AS max_d]
+- Filter (a < 1) +- Project [udf(a) AS d]
+- Scan t +- Filter (a > 1)
+- Scan t
// Merged plan
Aggregate [sum(a) FILTER f0 AS sum_a, max(d0) FILTER f1 AS max_d]
+- Project [a, If(f1, udf(a), null) AS d0, f0, f1]
+- Filter (f0 OR f1)
+- Project [a, (a < 1) AS f0, (a > 1) AS f1]
+- Scan t
```
**Benefit**: a single scan of `t` computes both aggregates, which is typically
cheaper than two separate scans. The `If` wrapping ensures `udf(a)` is only
evaluated for rows that match `a > 1`.
**Drawback** (symmetric case only): the merged `Filter(f0 OR f1)` is less
selective than each individual filter, which may reduce IO pruning such as
partition or file skipping. On heavily partitioned or file-pruned tables the
extra IO can outweigh the scan-deduplication benefit. The asymmetric case
(`(np: Filter, cp)`) is always beneficial because the unfiltered side would
have read all the data anyway.
**Configs**:
- `spark.sql.planMerge.filterPropagation.enabled` (internal, default `true`):
master switch; disabling it turns off all filter-based merging.
- `spark.sql.planMerge.symmetricFilterPropagation.enabled` (default `true`):
controls the symmetric `(Filter, Filter)` case specifically, so users on
IO-pruning-sensitive workloads can disable only that path while keeping the
always-beneficial asymmetric merging.
`MergeResult.outputMap` is changed from `AttributeMap[Attribute]` to
`AttributeMap[Int]`, mapping each input plan attribute to its positional index
in the merged output. Positional indices remain stable across subsequent
`PlanMerger.merge` calls (outputs are only ever appended), whereas retained
`Attribute` values can become stale when filter merging replaces expressions
with new aliases. This also simplifies the two call sites in `MergeSubplans`.
### Why are the changes needed?
Computing aggregates over the same table with different `WHERE` clauses is a
common analytical pattern (e.g. conditional sums or counts for different
predicates). Without this change each subquery forces a separate full scan;
merging them reduces scan count and overall query cost.
### Does this PR introduce _any_ user-facing change?
Yes. A new config `spark.sql.planMerge.symmetricFilterPropagation.enabled`
(default `true`) is added. The optimization is otherwise transparent: queries
produce the same results, and both configs can be set to `false` to restore
the previous behavior.
### How was this patch tested?
New unit tests in `MergeSubplansSuite` and new end-to-end tests in
`PlanMergeSuite` covering the basic two-subplan cases, three-subplan merging,
disabled configs, grouping aggregates (not merged), asymmetric filters, stacked
filters, and reversed filter ordering.
### Was this patch authored or co-authored using generative AI tooling?
Generated-by: Claude Sonnet 4.6 (Anthropic)
219b19f to
66f7683
Compare
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
What changes were proposed in this pull request?
PlanMergeris extended to merge non-correlated non-grouping aggregate subplans that differ only in theirWHEREfilter conditions.Filter merging follows the same recursive plan-matching logic as the rest of
PlanMergerand handles three cases:(np: Filter, cp: Filter)with different conditions: both conditions are aliased as boolean attributes in aProject, a mergedFilter(OR(f0, f1))is introduced, and the aliases are propagated up to the enclosingAggregatewhere each side's expressions receive aFILTER (WHERE ...)clause.(np: Filter, cp)or(np, cp: Filter): only one side has a filter; the condition is exposed as aProjectattribute and propagated up so only that side's aggregate expressions receive aFILTERclause.When plans also differ in intermediate
Projectexpressions above aFilter, those expressions are wrapped withIf(filterAttr, expr, null)to avoid computing them for rows that do not match that side's filter condition.Example
Benefit: a single scan of
tcomputes both aggregates, which is typically cheaper than two separate scans.Drawback (symmetric case only): the merged
Filter(f0 OR f1)is less selective than each individual filter, which may reduce IO pruning such as partition or file skipping. On heavily partitioned or file-pruned tables the extra IO can outweigh the scan-deduplication benefit. The asymmetric case ((np: Filter, cp)) is always beneficial because the unfiltered side would have read all the data anyway.Configs:
spark.sql.planMerge.filterPropagation.enabled(internal, defaulttrue): master switch; disabling it turns off all filter-based merging.spark.sql.planMerge.symmetricFilterPropagation.enabled(defaulttrue): controls the symmetric(Filter, Filter)case specifically, so users on IO-pruning-sensitive workloads can disable only that path while keeping the always-beneficial asymmetric merging.MergeResult.outputMapis changed fromAttributeMap[Attribute]toAttributeMap[Int], mapping each input plan attribute to its positional index in the merged output. Positional indices remain stable across subsequentPlanMerger.mergecalls (outputs are only ever appended), whereas retainedAttributevalues can become stale when filter merging replaces expressions with new aliases. This also simplifies the two call sites inMergeSubplans.Why are the changes needed?
Computing aggregates over the same table with different
WHEREclauses is a common analytical pattern (e.g. conditional sums or counts for different predicates). Without this change each subquery forces a separate full scan; merging them reduces scan count and overall query cost.Does this PR introduce any user-facing change?
Yes. A new config
spark.sql.planMerge.symmetricFilterPropagation.enabled(defaulttrue) is added. The optimization is otherwise transparent: queries produce the same results, and both configs can be set tofalseto restore the previous behavior.How was this patch tested?
New unit tests in
MergeSubplansSuiteand new end-to-end tests inPlanMergeSuitecovering the basic two-subplan cases, three-subplan merging, disabled configs, grouping aggregates (not merged), asymmetric filters, stacked filters, and reversed filter ordering.Was this patch authored or co-authored using generative AI tooling?
Generated-by: Claude Sonnet 4.6