@@ -235,9 +235,7 @@ object MergeSubplans extends Rule[LogicalPlan] {

levelFromSubqueries = levelFromSubqueries.max(level + 1)

-        val mergedOutput = mergeResult.outputMap(planWithReferences.output.head)
-        val outputIndex =
-          mergeResult.mergedPlan.plan.output.indexWhere(_.exprId == mergedOutput.exprId)
+        val outputIndex = mergeResult.outputMap(planWithReferences.output.head)
ScalarSubqueryReference(
level,
mergeResult.mergedPlanIndex,
@@ -259,9 +257,7 @@ object MergeSubplans extends Rule[LogicalPlan] {

val mergeResult = getPlanMerger(planMergers, level).merge(aggregateWithReferences, false)

-        val mergedOutput = aggregateWithReferences.output.map(mergeResult.outputMap)
-        val outputIndices =
-          mergedOutput.map(a => mergeResult.mergedPlan.plan.output.indexWhere(_.exprId == a.exprId))
+        val outputIndices = aggregateWithReferences.output.map(mergeResult.outputMap)
val aggregateReference = NonGroupingAggregateReference(
level,
mergeResult.mergedPlanIndex,
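The hunks above change `outputMap` from returning the merged plan's output attribute (whose position then had to be found with a linear `indexWhere` scan per lookup) to returning the output position directly. A toy sketch of the before/after lookup, with made-up attribute names standing in for `ExprId`-keyed attributes:

```python
# Hypothetical stand-ins for the merged plan's output attributes.
merged_output = ["exprA", "exprB", "exprC"]

# Before: outputMap yields the merged attribute; its index is then
# recovered with a linear scan (indexWhere) on every lookup.
output_map_attr = {"origX": "exprB"}
idx_before = merged_output.index(output_map_attr["origX"])

# After: outputMap yields the position in the merged output directly,
# so the repeated linear scans disappear.
output_map_idx = {"origX": 1}
idx_after = output_map_idx["origX"]

assert idx_before == idx_after == 1
```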


@@ -6487,6 +6487,31 @@ object SQLConf {
.booleanConf
.createOptional

+  val PLAN_MERGE_FILTER_PROPAGATION_ENABLED =
+    buildConf("spark.sql.planMerge.filterPropagation.enabled")
+      .internal()
+      .doc("When set to true, subquery plans that differ only in their filter conditions can " +
+        "be merged by propagating filters up to enclosing non-grouping aggregates.")
+      .version("4.2.0")
+      .withBindingPolicy(ConfigBindingPolicy.SESSION)
+      .booleanConf
+      .createWithDefault(true)

+  val PLAN_MERGE_SYMMETRIC_FILTER_PROPAGATION_ENABLED =
+    buildConf("spark.sql.planMerge.symmetricFilterPropagation.enabled")
+      .doc("When set to true, two non-grouping aggregate subplans that both have filter " +
+        "conditions (but with different predicates) can be merged into a single scan using " +
+        "FILTER (WHERE ...) clauses on each aggregate expression. " +
+        "Merging two filtered scans broadens the combined filter to OR(f1, f2), which may " +
+        "reduce IO pruning (e.g. partition or file skipping) compared to the individual " +
+        "filters. In most cases the saving from a single scan outweighs this, but on highly " +
+        "partitioned or file-pruned tables disabling this option can improve performance. " +
+        s"Has no effect when ${PLAN_MERGE_FILTER_PROPAGATION_ENABLED.key} is false.")
+      .version("4.2.0")
+      .withBindingPolicy(ConfigBindingPolicy.SESSION)
+      .booleanConf
+      .createWithDefault(true)

val ERROR_MESSAGE_FORMAT = buildConf("spark.sql.error.messageFormat")
.doc("When PRETTY, the error message consists of textual representation of error class, " +
"message and query context. Stack traces are only shown for internal errors " +
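The rewrites these two flags describe can be illustrated in plain SQL. The sketch below uses Python's `sqlite3` as a stand-in engine, since SQLite's aggregate `FILTER` clause (available from SQLite 3.30) is the same construct the doc string names; the table contents and month-sequence values are invented:

```python
import sqlite3

con = sqlite3.connect(":memory:")
con.execute("CREATE TABLE date_dim (d_week_seq INTEGER, d_month_seq INTEGER)")
con.executemany(
    "INSERT INTO date_dim VALUES (?, ?)",
    [(1, 1181), (2, 1181), (3, 1182), (4, 1182)])

# Filter propagation: an unfiltered and a filtered aggregate over the same
# table merge into one scan, the filter moving into a FILTER clause.
merged1 = con.execute(
    "SELECT count(*), count(*) FILTER (WHERE d_month_seq = 1181) "
    "FROM date_dim").fetchone()

# Symmetric filter propagation: two differently filtered aggregates merge
# into one scan whose pushed-down filter is broadened to OR(f1, f2) --
# the IO-pruning trade-off the doc string warns about.
merged2 = con.execute(
    "SELECT count(*) FILTER (WHERE d_month_seq = 1181), "
    "       count(*) FILTER (WHERE d_month_seq = 1182) "
    "FROM date_dim "
    "WHERE d_month_seq = 1181 OR d_month_seq = 1182").fetchone()

assert merged1 == (4, 2)
assert merged2 == (2, 2)
```

With `spark.sql.planMerge.symmetricFilterPropagation.enabled=false`, only the first kind of merge would apply and two filtered subplans would keep separate scans.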


@@ -22,15 +22,18 @@ TakeOrderedAndProject [s_store_name1,s_store_id1,d_week_seq1,(sun_sales1 / sun_s
WholeStageCodegen (1)
Filter [d_date_sk,d_week_seq]
Subquery #1
-ObjectHashAggregate [buf] [bloom_filter_agg(xxhash64(d_week_seq, 42), 335, 8990, 0, 0),bloomFilter,buf]
-Exchange #3
-ObjectHashAggregate [d_week_seq] [buf,buf]
-WholeStageCodegen (1)
-Project [d_week_seq]
-Filter [d_month_seq,d_week_seq]
-ColumnarToRow
-InputAdapter
-Scan parquet spark_catalog.default.date_dim [d_month_seq,d_week_seq]
+WholeStageCodegen (2)
+Project [bloomFilter,bloomFilter]
+InputAdapter
+ObjectHashAggregate [buf,buf] [bloom_filter_agg(xxhash64(d_week_seq, 42), 335, 8990, 0, 0),bloom_filter_agg(xxhash64(d_week_seq, 42), 335, 8990, 0, 0),bloomFilter,bloomFilter,buf,buf]
+Exchange #3
+ObjectHashAggregate [d_week_seq,propagatedFilter_1,propagatedFilter_2] [buf,buf,buf,buf]
+WholeStageCodegen (1)
+Project [d_week_seq,d_month_seq]
+Filter [d_month_seq,d_week_seq]
+ColumnarToRow
+InputAdapter
+Scan parquet spark_catalog.default.date_dim [d_month_seq,d_week_seq]
ColumnarToRow
InputAdapter
Scan parquet spark_catalog.default.date_dim [d_date_sk,d_week_seq,d_day_name]
@@ -71,28 +74,19 @@ TakeOrderedAndProject [s_store_name1,s_store_id1,d_week_seq1,(sun_sales1 / sun_s
BroadcastExchange #8
WholeStageCodegen (5)
Filter [d_date_sk,d_week_seq]
-Subquery #2
-ObjectHashAggregate [buf] [bloom_filter_agg(xxhash64(d_week_seq, 42), 335, 8990, 0, 0),bloomFilter,buf]
-Exchange #9
-ObjectHashAggregate [d_week_seq] [buf,buf]
-WholeStageCodegen (1)
-Project [d_week_seq]
-Filter [d_month_seq,d_week_seq]
-ColumnarToRow
-InputAdapter
-Scan parquet spark_catalog.default.date_dim [d_month_seq,d_week_seq]
+ReusedSubquery [mergedValue] #1
ColumnarToRow
InputAdapter
Scan parquet spark_catalog.default.date_dim [d_date_sk,d_week_seq,d_day_name]
InputAdapter
-BroadcastExchange #10
+BroadcastExchange #9
WholeStageCodegen (7)
Filter [s_store_sk,s_store_id]
ColumnarToRow
InputAdapter
Scan parquet spark_catalog.default.store [s_store_sk,s_store_id]
InputAdapter
-BroadcastExchange #11
+BroadcastExchange #10
WholeStageCodegen (8)
Project [d_week_seq]
Filter [d_month_seq,d_week_seq]