[WIP][SPARK-40193][SQL] Merge subplans with different filter conditions#55298

Draft
peter-toth wants to merge 1 commit into apache:master from peter-toth:SPARK-40193-merge-filters-2
@peter-toth peter-toth commented Apr 10, 2026

@peter-toth peter-toth force-pushed the SPARK-40193-merge-filters-2 branch 2 times, most recently from 99cdfb1 to 219b19f Compare April 12, 2026 18:20
### What changes were proposed in this pull request?

`PlanMerger` is extended to merge non-correlated, non-grouping aggregate
subplans that differ only in their `WHERE` filter conditions.

Filter merging follows the same recursive plan-matching logic as the rest of
`PlanMerger` and handles three cases:

- `(np: Filter, cp: Filter)` with different conditions: both conditions are
  aliased as boolean attributes in a `Project`, a merged `Filter(OR(f0, f1))`
  is introduced, and the aliases are propagated up to the enclosing `Aggregate`
  where each side's expressions receive a `FILTER (WHERE ...)` clause.
- `(np: Filter, cp)` or `(np, cp: Filter)`: only one side has a filter; the
  condition is exposed as a `Project` attribute and propagated up so only that
  side's aggregate expressions receive a `FILTER` clause.
- Equal filter conditions pass through unchanged.

When plans also differ in intermediate `Project` expressions above a `Filter`,
those expressions are wrapped with `If(filterAttr, expr, null)` to avoid
computing them for rows that do not match that side's filter condition. Plain
attribute references are never wrapped since reading a column value is free.

**Example**

```
// Input plans
Aggregate [sum(a) AS sum_a]         Aggregate [max(d) AS max_d]
+- Filter (a < 1)                   +- Project [udf(a) AS d]
   +- Scan t                           +- Filter (a > 1)
                                          +- Scan t

// Merged plan
Aggregate [sum(a) FILTER (WHERE f0) AS sum_a, max(d0) FILTER (WHERE f1) AS max_d]
+- Project [a, If(f1, udf(a), null) AS d0, f0, f1]
   +- Filter (f0 OR f1)
      +- Project [a, (a < 1) AS f0, (a > 1) AS f1]
         +- Scan t
```

**Benefit**: a single scan of `t` computes both aggregates, which is typically
cheaper than two separate scans. The `If` wrapping ensures `udf(a)` is only
evaluated for rows that match `a > 1`.
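
The equivalence claimed by the example can be checked with a small toy model. This is a minimal sketch in plain Python, not Spark code: the sample rows, the `udf` stand-in, and the helper names `separate_plans` and `merged_plan` are all assumptions made for illustration. It mirrors the merged plan node by node and counts how often the expensive expression runs.

```python
# Toy model of the example plans above; plain Python, not Spark code.
rows = [{"a": a} for a in (-2, 0, 1, 2, 5)]   # hypothetical contents of t
calls = {"udf": 0}                            # counts evaluations of udf

def udf(a):
    """Stand-in for an expensive expression."""
    calls["udf"] += 1
    return a * 10

def separate_plans(rows):
    # Two independent passes over t, one per original subplan.
    sum_a = sum(r["a"] for r in rows if r["a"] < 1)
    max_d = max(udf(r["a"]) for r in rows if r["a"] > 1)
    return sum_a, max_d

def merged_plan(rows):
    sum_a, max_d = None, None
    for r in rows:                        # single scan of t
        a = r["a"]
        f0, f1 = a < 1, a > 1             # Project [a, (a < 1) AS f0, (a > 1) AS f1]
        if not (f0 or f1):                # Filter (f0 OR f1)
            continue
        d0 = udf(a) if f1 else None       # If(f1, udf(a), null) AS d0
        if f0:                            # sum(a) FILTER (WHERE f0)
            sum_a = a if sum_a is None else sum_a + a
        if f1:                            # max(d0) FILTER (WHERE f1)
            max_d = d0 if max_d is None else max(max_d, d0)
    return sum_a, max_d

# Both strategies agree on the sample rows.
assert separate_plans(rows) == merged_plan(rows) == (-2, 50)
```

In this model the row with `a = 1` matches neither condition and is dropped by the merged filter, and `udf` runs only for the two rows with `a > 1` in either strategy.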

**Drawback** (symmetric case only): the merged `Filter(f0 OR f1)` is less
selective than each individual filter, which may reduce IO pruning such as
partition or file skipping. On heavily partitioned or file-pruned tables the
extra IO can outweigh the scan-deduplication benefit. The asymmetric case
(`(np: Filter, cp)` or `(np, cp: Filter)`) is always beneficial because the
unfiltered side would have read all the data anyway.

**Configs**:

- `spark.sql.planMerge.filterPropagation.enabled` (internal, default `true`):
  master switch; disabling it turns off all filter-based merging.
- `spark.sql.planMerge.symmetricFilterPropagation.enabled` (default `true`):
  controls the symmetric `(Filter, Filter)` case specifically, so users on
  IO-pruning-sensitive workloads can disable only that path while keeping the
  always-beneficial asymmetric merging.
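
How the two switches interact with the three filter cases can be summarised as a small decision function. This is a hedged sketch of the behavior described above, not the `PlanMerger` implementation: the function names are hypothetical, conditions are compared as plain strings (an assumption), and it assumes that equal conditions keep merging even with the master switch off because no filter propagation is involved.

```python
from typing import Optional

def filter_merge_case(np_cond: Optional[str], cp_cond: Optional[str]) -> str:
    """Classify a node pair by filter condition; None means the side has no Filter."""
    if np_cond == cp_cond:
        return "equal"         # passes through unchanged
    if np_cond is not None and cp_cond is not None:
        return "symmetric"     # (np: Filter, cp: Filter) with different conditions
    return "asymmetric"        # only one side has a Filter

def may_merge(np_cond: Optional[str], cp_cond: Optional[str],
              filter_propagation: bool = True,
              symmetric_propagation: bool = True) -> bool:
    """Toy model of the two configs gating filter-based merging."""
    case = filter_merge_case(np_cond, cp_cond)
    if case == "equal":
        return True            # assumption: unaffected by either config
    if not filter_propagation:
        return False           # master switch off: no filter-based merging
    if case == "symmetric":
        return symmetric_propagation
    return True                # asymmetric case stays enabled

# Disabling only the symmetric path keeps asymmetric merging available.
assert may_merge("a < 1", "a > 1", symmetric_propagation=False) is False
assert may_merge("a < 1", None, symmetric_propagation=False) is True
```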

`MergeResult.outputMap` is changed from `AttributeMap[Attribute]` to
`AttributeMap[Int]`, mapping each input plan attribute to its positional index
in the merged output. Positional indices remain stable across subsequent
`PlanMerger.merge` calls (outputs are only ever appended), whereas retained
`Attribute` values can become stale when filter merging replaces expressions
with new aliases. This also simplifies the two call sites in `MergeSubplans`.
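
The stability argument can be illustrated with a toy model in which the merged output is a list that later merges only append to. The attribute names and ids below are made up for illustration, and the dictionaries merely stand in for `AttributeMap`; this is a sketch of the idea, not the actual data structures.

```python
# Merged output after the first merge (hypothetical attribute names).
merged_output = ["sum_a#10"]
output_map_by_attr = {"sum_a#1": "sum_a#10"}   # old style: input attr -> merged attr
output_map_by_index = {"sum_a#1": 0}           # new style: input attr -> position

# A later merge re-aliases the existing expression and appends a new output.
merged_output[0] = "sum_a#20"
merged_output.append("max_d#21")
output_map_by_index["max_d#2"] = 1

# The retained attribute no longer exists in the merged output...
stale = output_map_by_attr["sum_a#1"] not in merged_output
# ...while the positional index still resolves to the current attribute.
resolved = merged_output[output_map_by_index["sum_a#1"]]

assert stale is True
assert resolved == "sum_a#20"
```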

### Why are the changes needed?

Computing aggregates over the same table with different `WHERE` clauses is a
common analytical pattern (e.g. conditional sums or counts for different
predicates). Without this change each subquery forces a separate full scan;
merging them reduces scan count and overall query cost.

### Does this PR introduce _any_ user-facing change?

Yes. A new config `spark.sql.planMerge.symmetricFilterPropagation.enabled`
(default `true`) is added. The optimization is otherwise transparent: queries
produce the same results, and both configs can be set to `false` to restore
the previous behavior.

### How was this patch tested?

New unit tests in `MergeSubplansSuite` and new end-to-end tests in
`PlanMergeSuite` covering the basic two-subplan cases, three-subplan merging,
disabled configs, grouping aggregates (not merged), asymmetric filters, stacked
filters, and reversed filter ordering.

### Was this patch authored or co-authored using generative AI tooling?

Generated-by: Claude Sonnet 4.6 (Anthropic)
@peter-toth peter-toth force-pushed the SPARK-40193-merge-filters-2 branch from 219b19f to 66f7683 Compare April 12, 2026 18:38