[SPARK-56034][SQL][FOLLOWUP] Disable PushDownJoinThroughUnion by default; add non-deterministic guard and move after Join Reorder #55325

Open
cloud-fan wants to merge 2 commits into apache:master from cloud-fan:SPARK-56034-followup

Conversation

Contributor

@cloud-fan cloud-fan commented Apr 13, 2026

What changes were proposed in this pull request?

Followup to #54865.

This PR makes four improvements to the PushDownJoinThroughUnion rule:

  1. Disable by default with a new config spark.sql.optimizer.pushDownJoinThroughUnion.enabled (default false) so the rule can be enabled explicitly when desired.
  2. Add a non-deterministic guard: reject right subtrees containing non-deterministic expressions. Duplicating a non-deterministic plan across Union branches changes query semantics (each copy could produce different results).
  3. Move the batch after Join Reorder so that CostBasedJoinReorder can optimize the full join graph before this rule breaks it into per-Union-branch joins.
  4. Pass SQLConf explicitly to the rule (change from object to case class) instead of relying on thread-local SQLConf.get, improving testability.
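
Points 1 and 4 together amount to the following pattern. This is a self-contained toy sketch with placeholder Conf and plan types (here just a String), not the actual Catalyst rule or SQLConf API:

```scala
// Toy sketch of points 1 and 4 (hypothetical names, not the real Catalyst
// API): the rule is a case class that receives its conf at construction
// time instead of reading the thread-local SQLConf.get, and it no-ops
// unless the feature flag is enabled.
final case class Conf(pushDownJoinThroughUnionEnabled: Boolean)

final case class PushDownJoinThroughUnion(conf: Conf) {
  // `plan` is a stand-in for a Catalyst LogicalPlan.
  def apply(plan: String): String =
    if (!conf.pushDownJoinThroughUnionEnabled) plan // disabled: leave the plan unchanged
    else s"pushedThroughUnion($plan)"               // placeholder for the actual rewrite
}
```

In tests, this shape makes it trivial to instantiate the rule with the flag on or off, without mutating global or thread-local state.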

Why are the changes needed?

Config gate: The optimization is not universally beneficial. Pushing the join into each Union branch duplicates the BroadcastHashJoinExec node across branches. Since UnionExec does not implement CodegenSupport, each branch gets its own WholeStageCodegenExec stage, producing a distinct compiled Java class — even when the join logic is identical (the stage ID embedded in the class name causes cache misses). With SQLPartitioningAwareUnionRDD, a single task processes all branches, so all N generated classes must be compiled and loaded in the same task's JVM. This multiplies compilation time, classloader memory, and JIT work. A config allows easy disabling when this trade-off is unfavorable.

Non-deterministic guard: The existing guard only excludes subqueries (for DeduplicateRelations correctness), but duplicating any non-deterministic expression (e.g., rand(), monotonically_increasing_id()) changes semantics — a strictly broader and more correct check.
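
The semantic hazard can be seen with a toy model in plain Scala (no Spark): the counter below stands in for a non-deterministic source such as rand(), returning different data on every evaluation:

```scala
// Plain-Scala illustration of why the guard is needed: if the right side
// is non-deterministic, pushing the join below the Union duplicates it,
// and each copy can produce different rows. The mutable counter stands in
// for a non-deterministic expression such as rand().
object NonDeterministicDuplication {
  private var calls = 0
  // Each evaluation returns different data, like a non-deterministic scan.
  def rightSide(): Seq[Int] = { calls += 1; Seq.fill(3)(calls) }

  def main(args: Array[String]): Unit = {
    // Join kept above the Union: the right side is evaluated once and shared.
    val shared = rightSide()

    // Join pushed into each Union branch: two independent evaluations.
    val branch1 = rightSide()
    val branch2 = rightSide()

    println(branch1 == branch2) // false: the duplicated copies disagree
    println(shared == shared)   // true: a single evaluation is self-consistent
  }
}
```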

Batch ordering: Running before Join Reorder prevents cost-based reordering from seeing the full join graph above unions.

Does this PR introduce any user-facing change?

Yes. PushDownJoinThroughUnion is now disabled by default. Users who want the optimization must set spark.sql.optimizer.pushDownJoinThroughUnion.enabled=true. TPC-DS plan stability golden files are reverted to their pre-SPARK-56034 state since the rule no longer fires by default.
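
For example, re-enabling the rule for a session could look like this (the config key is the one introduced by this PR; the spark-sql invocation is one of several equivalent ways to set it):

```shell
# Re-enable the rule when launching a spark-sql session:
spark-sql --conf spark.sql.optimizer.pushDownJoinThroughUnion.enabled=true

# ...or from SQL inside an already-running session:
#   SET spark.sql.optimizer.pushDownJoinThroughUnion.enabled=true;
```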

How was this patch tested?

Existing tests in PushDownJoinThroughUnionSuite (catalyst and core) are updated:

  • Catalyst tests construct the rule with a SQLConf instance that has the feature enabled.
  • Core integration tests enable the config via sparkConf override.
  • Added a test verifying non-deterministic right sides (e.g., rand()) are not pushed down.

Was this patch authored or co-authored using generative AI tooling?

Generated-by: Claude Code 4.6

…g, add non-deterministic guard, move after Join Reorder

Followup to apache#54865.

This PR makes four improvements to the `PushDownJoinThroughUnion` rule:

1. **Add a config gate** `spark.sql.optimizer.pushDownJoinThroughUnion.enabled` (default `false`) so the rule can be evaluated and ramped up gradually.
2. **Add a non-deterministic guard**: reject right subtrees containing non-deterministic expressions. Duplicating a non-deterministic plan across Union branches changes query semantics (each copy could produce different results).
3. **Move the batch after Join Reorder** so that `CostBasedJoinReorder` can optimize the full join graph before this rule breaks it into per-Union-branch joins.
4. **Pass SQLConf explicitly** to the rule (change from `object` to `case class`) instead of relying on thread-local `SQLConf.get`, improving testability.
@cloud-fan
Contributor Author

@LuciferYang @peter-toth

Contributor

@LuciferYang LuciferYang left a comment


Thank you for making this change. @cloud-fan

nit: can we add a test case in PushDownJoinThroughUnionSuite that verifies the non-deterministic guard? E.g., a right side containing rand() should not be pushed down. This prevents regressions on the new guard.

Member

@dongjoon-hyun dongjoon-hyun left a comment


Thank you, @cloud-fan and @LuciferYang .

cc @yaooqinn , too.

Contributor

peter-toth commented Apr 13, 2026

The optimization is not universally beneficial. Even though the broadcast exchange is reused across Union branches, each branch still builds its own hash table, increasing peak memory. A config allows easy disabling when this trade-off is unfavorable.

@cloud-fan, how can this happen? Don't we build the hash table on the driver before broadcast and reuse that relation? Or is this some kind of bug?

Contributor

peter-toth commented Apr 13, 2026

Isn't the problem that we add the size of a broadcast relation to the task metric without taking into account whether it is reused or not?
E.g. if we have a union (SQLPartitioningAwareUnionRDD), in which corresponding child partitions are effectively concatenated, and we have a BroadcastHashJoinExec with the original exchange in the first union child and another BroadcastHashJoinExec with a reused version of the original exchange in the second child, then we add the size of the same relation to the peak metric twice in each task of the union:

```scala
streamedPlan.execute().mapPartitions { streamedIter =>
  val hashed = broadcastRelation.value.asReadOnlyCopy()
  TaskContext.get().taskMetrics().incPeakExecutionMemory(hashed.estimatedSize)
  join(streamedIter, hashed, numOutputRows)
}
```

@cloud-fan
Contributor Author

@peter-toth Thanks for digging into this! You're right — the peak memory claim in the original description was inaccurate. The broadcast hash table is shared in memory and incPeakExecutionMemory is just a metric accounting issue as you identified.

I've updated the PR description with the actual overhead: since UnionExec doesn't implement CodegenSupport, each Union branch gets its own WholeStageCodegenExec stage, producing a distinct compiled Java class (the stage ID in the class name prevents cache hits). With SQLPartitioningAwareUnionRDD, a single task processes all branches, so all N generated classes must be compiled and loaded in the same JVM — multiplying compilation time, classloader memory, and JIT work.
