[SPARK-56034][SQL][FOLLOWUP] Disable PushDownJoinThroughUnion by default; add non-deterministic guard and move after Join Reorder #55325
Conversation
Followup to apache#54865. This PR makes four improvements to the `PushDownJoinThroughUnion` rule:

1. **Add a config gate** `spark.sql.optimizer.pushDownJoinThroughUnion.enabled` (default `false`) so the rule can be evaluated and ramped up gradually.
2. **Add a non-deterministic guard**: reject right subtrees containing non-deterministic expressions. Duplicating a non-deterministic plan across Union branches changes query semantics (each copy could produce different results).
3. **Move the batch after Join Reorder** so that `CostBasedJoinReorder` can optimize the full join graph before this rule breaks it into per-Union-branch joins.
4. **Pass SQLConf explicitly** to the rule (change from `object` to `case class`) instead of relying on thread-local `SQLConf.get`, improving testability.
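The rewrite itself can be sketched in plain Python (an illustrative model using lists of dicts in place of Spark plans, not the actual Catalyst API): a join above a union becomes a union of per-branch joins.

```python
# Illustrative sketch of the PushDownJoinThroughUnion rewrite, using plain
# Python lists of dicts to stand in for Spark plans (not the Catalyst API).

def inner_join(left, right, key):
    """Naive nested-loop inner join on a single key column."""
    return [{**l, **r} for l in left for r in right if l[key] == r[key]]

def join_over_union(branches, right, key):
    """Original plan shape: Join(Union(branches), right)."""
    union = [row for branch in branches for row in branch]
    return inner_join(union, right, key)

def pushed_down(branches, right, key):
    """Rewritten shape: Union(Join(branch, right) for each branch)."""
    return [row for branch in branches
            for row in inner_join(branch, right, key)]
```

When the right side is deterministic, both shapes produce the same rows, which is exactly the equivalence the new guard preserves.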
LuciferYang
left a comment
Thank you for making this change. @cloud-fan
nit: can we add a test case in PushDownJoinThroughUnionSuite that verifies the non-deterministic guard — e.g., a right side containing rand() should not be pushed down. This prevents regressions on the new guard.
dongjoon-hyun
left a comment
Thank you, @cloud-fan and @LuciferYang .
cc @yaooqinn , too.
> @cloud-fan, how can this happen? Don't we build the hash table on the driver before broadcast and reuse that relation? Or is this some kind of bug?

> Isn't the problem that we add the size of a broadcast relation to the task metric without taking into account whether it is reused or not?

@peter-toth Thanks for digging into this! You're right: the peak memory claim in the original description was inaccurate. The broadcast hash table is shared in memory and I've updated the PR description with the actual overhead: since …
Co-authored-by: Isaac
What changes were proposed in this pull request?
Followup to #54865.
This PR makes four improvements to the `PushDownJoinThroughUnion` rule:

1. **Add a config gate** `spark.sql.optimizer.pushDownJoinThroughUnion.enabled` (default `false`) so the rule can be enabled explicitly when desired.
2. **Add a non-deterministic guard**: reject right subtrees containing non-deterministic expressions, since duplicating such a plan across Union branches changes query semantics.
3. **Move the batch after Join Reorder** so that `CostBasedJoinReorder` can optimize the full join graph before this rule breaks it into per-Union-branch joins.
4. **Pass SQLConf explicitly** to the rule (change from `object` to `case class`) instead of relying on thread-local `SQLConf.get`, improving testability.

Why are the changes needed?
- **Config gate**: The optimization is not universally beneficial. Pushing the join into each Union branch duplicates the `BroadcastHashJoinExec` node across branches. Since `UnionExec` does not implement `CodegenSupport`, each branch gets its own `WholeStageCodegenExec` stage, producing a distinct compiled Java class even when the join logic is identical (the stage ID embedded in the class name causes cache misses). With `SQLPartitioningAwareUnionRDD`, a single task processes all branches, so all N generated classes must be compiled and loaded in the same task's JVM. This multiplies compilation time, classloader memory, and JIT work. A config allows easy disabling when this trade-off is unfavorable.
- **Non-deterministic guard**: The existing guard only excludes subqueries (for `DeduplicateRelations` correctness), but duplicating any non-deterministic expression (e.g., `rand()`, `monotonically_increasing_id()`) changes semantics, so the new check is strictly broader and more correct.
- **Batch ordering**: Running before Join Reorder prevents cost-based reordering from seeing the full join graph above unions.
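A minimal sketch of why the guard matters (plain Python stand-ins, not Spark's `Expression` API): a non-deterministic subtree evaluated once per Union branch yields different rows in each copy, so the rewritten plan is no longer equivalent to the original.

```python
import random

# Stand-in for a plan subtree containing rand(): every evaluation produces
# fresh values, so two copies of the "same" subtree disagree on their rows.
def evaluate_nondeterministic_subtree():
    return [{"id": i, "r": random.random()} for i in range(3)]

# Sketch of the guard: push the join down only when every expression in the
# subtree that would be duplicated is deterministic.
def can_push_down(subtree_exprs):
    return all(expr["deterministic"] for expr in subtree_exprs)
```

Under this model, `can_push_down` rejects any subtree carrying a `rand()`-like expression, which mirrors the behavior of the new guard.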
Does this PR introduce any user-facing change?
Yes. `PushDownJoinThroughUnion` is now disabled by default. Users who want the optimization must set `spark.sql.optimizer.pushDownJoinThroughUnion.enabled=true`. TPC-DS plan stability golden files are reverted to their pre-SPARK-56034 state since the rule no longer fires by default.
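For users who do want the optimization, opting in looks like the following fragment, assuming an active PySpark `SparkSession` bound to `spark`:

```python
# Fragment: assumes an existing SparkSession named `spark` (e.g., the pyspark
# shell). The rule is off by default after this change; opt in explicitly:
spark.conf.set("spark.sql.optimizer.pushDownJoinThroughUnion.enabled", "true")
```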
Existing tests in `PushDownJoinThroughUnionSuite` (catalyst and core) are updated:

- The catalyst suite passes a `SQLConf` instance that has the feature enabled.
- The core suite enables the feature via a `sparkConf` override.
- A new test verifies that plans containing non-deterministic expressions (e.g., `rand()`) are not pushed down.
Generated-by: Claude Code 4.6