diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/abilities/SupportsReadingMetadata.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/abilities/SupportsReadingMetadata.java index 60b6932b61769..0d1c65f0f2f96 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/abilities/SupportsReadingMetadata.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/abilities/SupportsReadingMetadata.java @@ -25,11 +25,13 @@ import org.apache.flink.table.connector.source.ScanTableSource; import org.apache.flink.table.data.RowData; import org.apache.flink.table.data.utils.JoinedRowData; +import org.apache.flink.table.expressions.ResolvedExpression; import org.apache.flink.table.factories.Factory; import org.apache.flink.table.types.DataType; import org.apache.flink.table.types.logical.LogicalType; import org.apache.flink.table.types.logical.utils.LogicalTypeCasts; +import java.util.Collections; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; @@ -154,4 +156,71 @@ public interface SupportsReadingMetadata { default boolean supportsMetadataProjection() { return true; } + + /** + * Whether this source supports filtering on metadata columns. + * + *
<p>
When this method returns {@code true}, the planner may call {@link + * #applyMetadataFilters(List)} during optimization with predicates expressed in metadata key + * names (from {@link #listReadableMetadata()}), not SQL column aliases. Sources that do not + * override this method will not receive metadata filter predicates. + * + *
<p>
This is independent of {@link SupportsFilterPushDown}, which handles physical column + * predicates. A source can implement both to accept filters on physical and metadata columns. + */ + default boolean supportsMetadataFilterPushDown() { + return false; + } + + /** + * Provides a list of metadata filters in conjunctive form. A source can pick filters and return + * the accepted and remaining filters. Same contract as {@link + * SupportsFilterPushDown#applyFilters(List)}, but for metadata columns. + * + *
<p>
The provided filters reference metadata key names (from {@link #listReadableMetadata()}), + * not SQL column aliases. For example, a column declared as {@code msg_offset BIGINT METADATA + * FROM 'offset'} will have its predicate expressed as {@code offset >= 1000}, not {@code + * msg_offset >= 1000}. The planner handles the alias-to-key translation before calling this + * method. + */ + default MetadataFilterResult applyMetadataFilters(List metadataFilters) { + return MetadataFilterResult.of(Collections.emptyList(), metadataFilters); + } + + /** + * Result of a metadata filter push down. Communicates the source's response to the planner + * during optimization. + */ + @PublicEvolving + final class MetadataFilterResult { + private final List acceptedFilters; + private final List remainingFilters; + + private MetadataFilterResult( + List acceptedFilters, + List remainingFilters) { + this.acceptedFilters = acceptedFilters; + this.remainingFilters = remainingFilters; + } + + /** + * Constructs a metadata filter push-down result. 
+ * + * @param acceptedFilters filters consumed by the source (best effort) + * @param remainingFilters filters that a subsequent operation must still apply at runtime + */ + public static MetadataFilterResult of( + List acceptedFilters, + List remainingFilters) { + return new MetadataFilterResult(acceptedFilters, remainingFilters); + } + + public List getAcceptedFilters() { + return acceptedFilters; + } + + public List getRemainingFilters() { + return remainingFilters; + } + } } diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/source/MetadataFilterPushDownSpec.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/source/MetadataFilterPushDownSpec.java new file mode 100644 index 0000000000000..89baea1aa7a6c --- /dev/null +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/source/MetadataFilterPushDownSpec.java @@ -0,0 +1,204 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.planner.plan.abilities.source; + +import org.apache.flink.table.api.TableException; +import org.apache.flink.table.connector.source.DynamicTableSource; +import org.apache.flink.table.connector.source.abilities.SupportsReadingMetadata; +import org.apache.flink.table.connector.source.abilities.SupportsReadingMetadata.MetadataFilterResult; +import org.apache.flink.table.expressions.Expression; +import org.apache.flink.table.expressions.ResolvedExpression; +import org.apache.flink.table.expressions.resolver.ExpressionResolver; +import org.apache.flink.table.planner.plan.utils.FlinkRexUtil; +import org.apache.flink.table.planner.plan.utils.RexNodeToExpressionConverter; +import org.apache.flink.table.planner.utils.JavaScalaConversionUtil; +import org.apache.flink.table.types.logical.RowType; + +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeName; + +import org.apache.calcite.rex.RexBuilder; +import org.apache.calcite.rex.RexNode; + +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; +import java.util.Optional; +import java.util.stream.Collectors; + +import scala.Option; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * Serializes metadata filter predicates and replays them during compiled plan restoration. + * + *
<p>
Predicates are stored with a {@code predicateRowType} that already uses metadata key names + * (not SQL aliases). The alias-to-key translation happens once at optimization time, so no + * column-to-key mapping needs to be persisted. + */ +@JsonIgnoreProperties(ignoreUnknown = true) +@JsonTypeName("MetadataFilterPushDown") +public final class MetadataFilterPushDownSpec extends SourceAbilitySpecBase { + + public static final String FIELD_NAME_PREDICATES = "predicates"; + public static final String FIELD_NAME_PREDICATE_ROW_TYPE = "predicateRowType"; + + @JsonProperty(FIELD_NAME_PREDICATES) + private final List predicates; + + /** + * Row type snapshot using metadata key names. Stored because ProjectPushDownSpec may narrow the + * context's row type during restore. + */ + @JsonProperty(FIELD_NAME_PREDICATE_ROW_TYPE) + private final RowType predicateRowType; + + @JsonCreator + public MetadataFilterPushDownSpec( + @JsonProperty(FIELD_NAME_PREDICATES) List predicates, + @JsonProperty(FIELD_NAME_PREDICATE_ROW_TYPE) RowType predicateRowType) { + this.predicates = new ArrayList<>(checkNotNull(predicates)); + this.predicateRowType = checkNotNull(predicateRowType); + } + + public List getPredicates() { + return predicates; + } + + @Override + public void apply(DynamicTableSource tableSource, SourceAbilityContext context) { + // Use stored predicateRowType; context's row type may be narrowed by ProjectPushDownSpec. + MetadataFilterResult result = + applyMetadataFilters(predicates, predicateRowType, tableSource, context); + if (result.getAcceptedFilters().size() != predicates.size()) { + throw new TableException("All metadata predicates should be accepted here."); + } + } + + /** + * Converts RexNode predicates to ResolvedExpressions using the given row type and calls + * applyMetadataFilters on the source. The row type must already use metadata key names. 
+ */ + public static MetadataFilterResult applyMetadataFilters( + List predicates, + RowType metadataKeyRowType, + DynamicTableSource tableSource, + SourceAbilityContext context) { + if (!(tableSource instanceof SupportsReadingMetadata)) { + throw new TableException( + String.format( + "%s does not support SupportsReadingMetadata.", + tableSource.getClass().getName())); + } + + String[] fieldNames = metadataKeyRowType.getFieldNames().toArray(new String[0]); + + RexNodeToExpressionConverter converter = + new RexNodeToExpressionConverter( + new RexBuilder(context.getTypeFactory()), + fieldNames, + context.getFunctionCatalog(), + context.getCatalogManager(), + Option.apply( + context.getTypeFactory().buildRelNodeRowType(metadataKeyRowType))); + + List filters = + predicates.stream() + .map( + p -> { + scala.Option expr = p.accept(converter); + if (expr.isDefined()) { + return expr.get(); + } else { + throw new TableException( + String.format( + "%s can not be converted to Expression for metadata filter push-down.", + p.toString())); + } + }) + .collect(Collectors.toList()); + + ExpressionResolver resolver = + ExpressionResolver.resolverFor( + context.getTableConfig(), + context.getClassLoader(), + name -> Optional.empty(), + context.getFunctionCatalog() + .asLookup( + str -> { + throw new TableException( + "We should not need to lookup any expressions at this point"); + }), + context.getCatalogManager().getDataTypeFactory(), + (sqlExpression, inputRowType, outputType) -> { + throw new TableException( + "SQL expression parsing is not supported at this location."); + }) + .build(); + + return ((SupportsReadingMetadata) tableSource) + .applyMetadataFilters(resolver.resolve(filters)); + } + + @Override + public boolean needAdjustFieldReferenceAfterProjection() { + return true; + } + + @Override + public String getDigests(SourceAbilityContext context) { + final List expressionStrs = new ArrayList<>(); + for (RexNode rexNode : predicates) { + expressionStrs.add( + 
FlinkRexUtil.getExpressionString( + rexNode, + JavaScalaConversionUtil.toScala(predicateRowType.getFieldNames()))); + } + + return String.format( + "metadataFilter=[%s]", + expressionStrs.stream() + .reduce((l, r) -> String.format("and(%s, %s)", l, r)) + .orElse("")); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + if (!super.equals(o)) { + return false; + } + MetadataFilterPushDownSpec that = (MetadataFilterPushDownSpec) o; + return Objects.equals(predicates, that.predicates) + && Objects.equals(predicateRowType, that.predicateRowType); + } + + @Override + public int hashCode() { + return Objects.hash(super.hashCode(), predicates, predicateRowType); + } +} diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/source/SourceAbilitySpec.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/source/SourceAbilitySpec.java index e51328d5e9f25..20e594304a86b 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/source/SourceAbilitySpec.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/source/SourceAbilitySpec.java @@ -38,6 +38,7 @@ @JsonSubTypes({ @JsonSubTypes.Type(value = FilterPushDownSpec.class), @JsonSubTypes.Type(value = LimitPushDownSpec.class), + @JsonSubTypes.Type(value = MetadataFilterPushDownSpec.class), @JsonSubTypes.Type(value = PartitionPushDownSpec.class), @JsonSubTypes.Type(value = ProjectPushDownSpec.class), @JsonSubTypes.Type(value = ReadingMetadataSpec.class), diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoSourceScanRuleBase.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoSourceScanRuleBase.java index 
76e5a9b4a105f..6e07928a522ec 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoSourceScanRuleBase.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoSourceScanRuleBase.java @@ -17,23 +17,35 @@ package org.apache.flink.table.planner.plan.rules.logical; +import org.apache.flink.table.catalog.Column; +import org.apache.flink.table.catalog.ResolvedSchema; import org.apache.flink.table.connector.source.DynamicTableSource; import org.apache.flink.table.connector.source.abilities.SupportsFilterPushDown; +import org.apache.flink.table.connector.source.abilities.SupportsReadingMetadata; +import org.apache.flink.table.connector.source.abilities.SupportsReadingMetadata.MetadataFilterResult; import org.apache.flink.table.expressions.ResolvedExpression; import org.apache.flink.table.planner.expressions.converter.ExpressionConverter; import org.apache.flink.table.planner.plan.abilities.source.FilterPushDownSpec; +import org.apache.flink.table.planner.plan.abilities.source.MetadataFilterPushDownSpec; import org.apache.flink.table.planner.plan.abilities.source.SourceAbilityContext; import org.apache.flink.table.planner.plan.abilities.source.SourceAbilitySpec; import org.apache.flink.table.planner.plan.schema.TableSourceTable; +import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.table.types.logical.RowType.RowField; import org.apache.calcite.plan.RelOptRule; import org.apache.calcite.plan.RelOptRuleOperand; import org.apache.calcite.rel.core.TableScan; +import org.apache.calcite.rex.RexInputRef; import org.apache.calcite.rex.RexNode; +import org.apache.calcite.rex.RexVisitorImpl; import org.apache.calcite.tools.RelBuilder; +import java.util.ArrayList; import java.util.Arrays; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.stream.Collectors; import scala.Tuple2; @@ -60,15 
+72,8 @@ protected RexNode createRemainingCondition( } /** - * Resolves filters using the underlying sources {@link SupportsFilterPushDown} and creates a - * new {@link TableSourceTable} with the supplied predicates. - * - * @param convertiblePredicates Predicates to resolve - * @param oldTableSourceTable TableSourceTable to copy - * @param scan Underlying table scan to push to - * @param relBuilder Builder to push the scan to - * @return A tuple, constituting of the resolved filters and the newly created {@link - * TableSourceTable} + * Resolves filters via {@link SupportsFilterPushDown} and creates a new {@link + * TableSourceTable}. */ protected Tuple2 resolveFiltersAndCreateTableSourceTable( @@ -102,17 +107,122 @@ protected RexNode createRemainingCondition( return new Tuple2<>(result, newTableSourceTable); } - /** - * Determines wether we can pushdown the filter into the source. we can not push filter twice, - * make sure FilterPushDownSpec has not been assigned as a capability. - * - * @param tableSourceTable Table scan to attempt to push into - * @return Whether we can push or not - */ + /** Whether filter push-down is possible and not already assigned. */ protected boolean canPushdownFilter(TableSourceTable tableSourceTable) { return tableSourceTable != null && tableSourceTable.tableSource() instanceof SupportsFilterPushDown && Arrays.stream(tableSourceTable.abilitySpecs()) .noneMatch(spec -> spec instanceof FilterPushDownSpec); } + + /** Whether metadata filter push-down is possible and not already assigned. 
*/ + protected boolean canPushdownMetadataFilter(TableSourceTable tableSourceTable) { + if (tableSourceTable == null) { + return false; + } + DynamicTableSource source = tableSourceTable.tableSource(); + if (!(source instanceof SupportsReadingMetadata)) { + return false; + } + if (!((SupportsReadingMetadata) source).supportsMetadataFilterPushDown()) { + return false; + } + return Arrays.stream(tableSourceTable.abilitySpecs()) + .noneMatch(spec -> spec instanceof MetadataFilterPushDownSpec); + } + + /** + * True if predicate references metadata columns exclusively (no physical columns). + * + *
<p>
A predicate like {@code OR(physical_pred, metadata_pred)} returns false because it + * references both physical and metadata columns. Mixed predicates remain as runtime filters. + */ + protected boolean referencesOnlyMetadataColumns(RexNode predicate, int physicalColumnCount) { + boolean[] saw = new boolean[2]; // [0] = sawPhysical, [1] = sawMetadata + predicate.accept( + new RexVisitorImpl(true) { + @Override + public Void visitInputRef(RexInputRef inputRef) { + if (inputRef.getIndex() >= physicalColumnCount) { + saw[1] = true; + } else { + saw[0] = true; + } + return null; + } + }); + return saw[1] && !saw[0]; + } + + /** Number of physical columns in the scan's schema. */ + protected int getPhysicalColumnCount(TableSourceTable tableSourceTable) { + ResolvedSchema schema = tableSourceTable.contextResolvedTable().getResolvedSchema(); + return (int) schema.getColumns().stream().filter(Column::isPhysical).count(); + } + + /** Maps SQL column names to metadata keys for metadata columns. */ + protected Map buildColumnToMetadataKeyMap(TableSourceTable tableSourceTable) { + ResolvedSchema schema = tableSourceTable.contextResolvedTable().getResolvedSchema(); + Map mapping = new HashMap<>(); + for (Column col : schema.getColumns()) { + if (col instanceof Column.MetadataColumn) { + Column.MetadataColumn metaCol = (Column.MetadataColumn) col; + String sqlName = metaCol.getName(); + String metadataKey = metaCol.getMetadataKey().orElse(sqlName); + mapping.put(sqlName, metadataKey); + } + } + return mapping; + } + + /** Resolves metadata filters and creates a new {@link TableSourceTable}. 
*/ + protected Tuple2 + resolveMetadataFiltersAndCreateTableSourceTable( + RexNode[] metadataPredicates, + TableSourceTable oldTableSourceTable, + TableScan scan, + RelBuilder relBuilder) { + DynamicTableSource newTableSource = oldTableSourceTable.tableSource().copy(); + Map columnToMetadataKey = buildColumnToMetadataKeyMap(oldTableSourceTable); + SourceAbilityContext abilityContext = SourceAbilityContext.from(scan); + + // Pre-translate the RowType to use metadata key names instead of SQL aliases. + RowType sourceRowType = abilityContext.getSourceRowType(); + RowType translatedRowType = translateRowType(sourceRowType, columnToMetadataKey); + + MetadataFilterResult result = + MetadataFilterPushDownSpec.applyMetadataFilters( + Arrays.asList(metadataPredicates), + translatedRowType, + newTableSource, + abilityContext); + + int acceptedCount = result.getAcceptedFilters().size(); + List acceptedOriginalPredicates = new ArrayList<>(); + for (int i = 0; i < acceptedCount; i++) { + acceptedOriginalPredicates.add(metadataPredicates[i]); + } + MetadataFilterPushDownSpec metadataSpec = + new MetadataFilterPushDownSpec(acceptedOriginalPredicates, translatedRowType); + + TableSourceTable newTableSourceTable = + oldTableSourceTable.copy( + newTableSource, + oldTableSourceTable.getStatistic(), + new SourceAbilitySpec[] {metadataSpec}); + + return new Tuple2<>(result, newTableSourceTable); + } + + /** Replaces SQL alias names with metadata key names in the RowType. 
*/ + private RowType translateRowType( + RowType sourceRowType, Map columnToMetadataKey) { + List translatedFields = new ArrayList<>(); + for (int i = 0; i < sourceRowType.getFieldCount(); i++) { + String originalName = sourceRowType.getFieldNames().get(i); + String translatedName = columnToMetadataKey.getOrDefault(originalName, originalName); + translatedFields.add(new RowField(translatedName, sourceRowType.getTypeAt(i))); + } + return new RowType(false, translatedFields); + } } diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoTableSourceScanRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoTableSourceScanRule.java index eaa6999cdd7fe..57d80b1b2f94c 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoTableSourceScanRule.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoTableSourceScanRule.java @@ -19,7 +19,8 @@ package org.apache.flink.table.planner.plan.rules.logical; import org.apache.flink.table.connector.source.abilities.SupportsFilterPushDown; -import org.apache.flink.table.planner.plan.schema.FlinkPreparingTableBase; +import org.apache.flink.table.connector.source.abilities.SupportsReadingMetadata; +import org.apache.flink.table.connector.source.abilities.SupportsReadingMetadata.MetadataFilterResult; import org.apache.flink.table.planner.plan.schema.TableSourceTable; import org.apache.flink.table.planner.plan.utils.FlinkRexUtil; @@ -29,11 +30,15 @@ import org.apache.calcite.rex.RexNode; import org.apache.calcite.tools.RelBuilder; +import java.util.ArrayList; +import java.util.List; + import scala.Tuple2; /** - * Planner rule that tries to push a filter into a {@link LogicalTableScan}, which table is a {@link - * TableSourceTable}. 
And the table source in the table is a {@link SupportsFilterPushDown}. + * Pushes filters from a {@link Filter} into a {@link LogicalTableScan}. Physical filters use {@link + * SupportsFilterPushDown}; metadata filters use {@link + * SupportsReadingMetadata#applyMetadataFilters}. */ public class PushFilterIntoTableSourceScanRule extends PushFilterIntoSourceScanRuleBase { public static final PushFilterIntoTableSourceScanRule INSTANCE = @@ -59,7 +64,7 @@ public boolean matches(RelOptRuleCall call) { LogicalTableScan scan = call.rel(1); TableSourceTable tableSourceTable = scan.getTable().unwrap(TableSourceTable.class); - return canPushdownFilter(tableSourceTable); + return canPushdownFilter(tableSourceTable) || canPushdownMetadataFilter(tableSourceTable); } @Override @@ -74,7 +79,7 @@ private void pushFilterIntoScan( RelOptRuleCall call, Filter filter, LogicalTableScan scan, - FlinkPreparingTableBase relOptTable) { + TableSourceTable tableSourceTable) { RelBuilder relBuilder = call.builder(); Tuple2 extractedPredicates = @@ -87,28 +92,77 @@ private void pushFilterIntoScan( RexNode[] convertiblePredicates = extractedPredicates._1; RexNode[] unconvertedPredicates = extractedPredicates._2; if (convertiblePredicates.length == 0) { - // no condition can be translated to expression return; } - Tuple2 scanAfterPushdownWithResult = - resolveFiltersAndCreateTableSourceTable( - convertiblePredicates, - relOptTable.unwrap(TableSourceTable.class), - scan, - relBuilder); + boolean supportsPhysicalFilter = canPushdownFilter(tableSourceTable); + boolean supportsMetadataFilter = canPushdownMetadataFilter(tableSourceTable); + int physicalColumnCount = getPhysicalColumnCount(tableSourceTable); + + // Classify predicates: only separate metadata predicates when the source + // actually supports metadata filter push-down. 
Otherwise, all predicates + // go through the physical path to preserve the FilterPushDownSpec guard + // that prevents rule re-firing and maintains scan reuse invariants. + List physicalPredicates = new ArrayList<>(); + List metadataPredicates = new ArrayList<>(); + for (RexNode predicate : convertiblePredicates) { + if (supportsMetadataFilter + && referencesOnlyMetadataColumns(predicate, physicalColumnCount)) { + metadataPredicates.add(predicate); + } else { + physicalPredicates.add(predicate); + } + } + + List allRemainingRexNodes = new ArrayList<>(); + TableSourceTable currentTable = tableSourceTable; + + if (!physicalPredicates.isEmpty() && supportsPhysicalFilter) { + Tuple2 physicalResult = + resolveFiltersAndCreateTableSourceTable( + physicalPredicates.toArray(new RexNode[0]), + currentTable, + scan, + relBuilder); + currentTable = physicalResult._2; + List physicalRemaining = + convertExpressionToRexNode(physicalResult._1.getRemainingFilters(), relBuilder); + allRemainingRexNodes.addAll(physicalRemaining); + } else { + allRemainingRexNodes.addAll(physicalPredicates); + } - SupportsFilterPushDown.Result result = scanAfterPushdownWithResult._1; - TableSourceTable tableSourceTable = scanAfterPushdownWithResult._2; + if (!metadataPredicates.isEmpty()) { + Tuple2 metadataResult = + resolveMetadataFiltersAndCreateTableSourceTable( + metadataPredicates.toArray(new RexNode[0]), + currentTable, + scan, + relBuilder); + currentTable = metadataResult._2; + // Remaining (rejected) metadata predicates stay as a LogicalFilter above + // the scan so they are still evaluated at runtime. We use the original + // RexNodes (suffix) because the remaining ResolvedExpressions use metadata + // key names, not SQL aliases needed by the Filter's row type. The + // validation in resolveMetadataFiltersAndCreateTableSourceTable ensures + // the partition invariant (accepted prefix + remaining suffix = input). 
+ int acceptedCount = metadataResult._1.getAcceptedFilters().size(); + for (int i = acceptedCount; i < metadataPredicates.size(); i++) { + allRemainingRexNodes.add(metadataPredicates.get(i)); + } + } + + for (RexNode unconverted : unconvertedPredicates) { + allRemainingRexNodes.add(unconverted); + } LogicalTableScan newScan = - LogicalTableScan.create(scan.getCluster(), tableSourceTable, scan.getHints()); - if (result.getRemainingFilters().isEmpty() && unconvertedPredicates.length == 0) { + LogicalTableScan.create(scan.getCluster(), currentTable, scan.getHints()); + + if (allRemainingRexNodes.isEmpty()) { call.transformTo(newScan); } else { - RexNode remainingCondition = - createRemainingCondition( - relBuilder, result.getRemainingFilters(), unconvertedPredicates); + RexNode remainingCondition = relBuilder.and(allRemainingRexNodes); RexNode simplifiedRemainingCondition = FlinkRexUtil.simplify( relBuilder.getRexBuilder(), diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/DynamicTableSourceSpecSerdeTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/DynamicTableSourceSpecSerdeTest.java index 6fd5942f19e43..61f545538674b 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/DynamicTableSourceSpecSerdeTest.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/DynamicTableSourceSpecSerdeTest.java @@ -42,6 +42,7 @@ import org.apache.flink.table.planner.factories.TestValuesTableFactory; import org.apache.flink.table.planner.plan.abilities.source.FilterPushDownSpec; import org.apache.flink.table.planner.plan.abilities.source.LimitPushDownSpec; +import org.apache.flink.table.planner.plan.abilities.source.MetadataFilterPushDownSpec; import org.apache.flink.table.planner.plan.abilities.source.PartitionPushDownSpec; import 
org.apache.flink.table.planner.plan.abilities.source.ProjectPushDownSpec; import org.apache.flink.table.planner.plan.abilities.source.ReadingMetadataSpec; @@ -385,6 +386,52 @@ void testDynamicTableSourceSpecSerdeWithEnrichmentOptions() throws Exception { .isEqualTo(","); } + @Test + void testMetadataFilterPushDownSpecSerde() throws IOException { + FlinkTypeFactory factory = + new FlinkTypeFactory( + Thread.currentThread().getContextClassLoader(), FlinkTypeSystem.INSTANCE); + RexBuilder rexBuilder = new RexBuilder(factory); + + // RowType uses metadata key names (already translated from SQL aliases) + RowType predicateRowType = + RowType.of( + new LogicalType[] {new TimestampType(3), new BigIntType()}, + new String[] {"timestamp", "offset"}); + + MetadataFilterPushDownSpec original = + new MetadataFilterPushDownSpec( + Arrays.asList( + // timestamp > '2024-01-01' + rexBuilder.makeCall( + SqlStdOperatorTable.GREATER_THAN, + rexBuilder.makeInputRef( + factory.createSqlType(SqlTypeName.TIMESTAMP, 3), 0), + rexBuilder.makeTimestampLiteral( + new org.apache.calcite.util.TimestampString( + "2024-01-01 00:00:00"), + 3)), + // offset >= 10 + rexBuilder.makeCall( + SqlStdOperatorTable.GREATER_THAN_OR_EQUAL, + rexBuilder.makeInputRef( + factory.createSqlType(SqlTypeName.BIGINT), 1), + rexBuilder.makeExactLiteral(new BigDecimal(10)))), + predicateRowType); + + PlannerMocks plannerMocks = PlannerMocks.create(); + SerdeContext serdeCtx = + configuredSerdeContext( + plannerMocks.getCatalogManager(), plannerMocks.getTableConfig()); + + String json = toJson(serdeCtx, original); + MetadataFilterPushDownSpec deserialized = + toObject(serdeCtx, json, MetadataFilterPushDownSpec.class); + + assertThat(deserialized.getPredicates()).isEqualTo(original.getPredicates()); + assertThat(deserialized).isEqualTo(original); + } + static ResolvedCatalogTable tableWithOnlyPhysicalColumns(Map options) { ResolvedSchema resolvedSchema = new ResolvedSchema( diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/MetadataFilterInReadingMetadataTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/MetadataFilterInReadingMetadataTest.java new file mode 100644 index 0000000000000..9d6e5e80801dd --- /dev/null +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/MetadataFilterInReadingMetadataTest.java @@ -0,0 +1,383 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.planner.plan.rules.logical; + +import org.apache.flink.table.api.Schema; +import org.apache.flink.table.api.TableConfig; +import org.apache.flink.table.api.TableDescriptor; +import org.apache.flink.table.connector.source.abilities.SupportsFilterPushDown; +import org.apache.flink.table.connector.source.abilities.SupportsReadingMetadata; +import org.apache.flink.table.connector.source.abilities.SupportsReadingMetadata.MetadataFilterResult; +import org.apache.flink.table.expressions.ResolvedExpression; +import org.apache.flink.table.planner.calcite.CalciteConfig; +import org.apache.flink.table.planner.factories.TableFactoryHarness; +import org.apache.flink.table.planner.plan.optimize.program.BatchOptimizeContext; +import org.apache.flink.table.planner.plan.optimize.program.FlinkBatchProgram; +import org.apache.flink.table.planner.plan.optimize.program.FlinkHepRuleSetProgramBuilder; +import org.apache.flink.table.planner.plan.optimize.program.HEP_RULES_EXECUTION_TYPE; +import org.apache.flink.table.planner.utils.BatchTableTestUtil; +import org.apache.flink.table.planner.utils.TableConfigUtils; +import org.apache.flink.table.planner.utils.TableTestBase; +import org.apache.flink.table.types.DataType; +import org.apache.flink.testutils.junit.SharedObjectsExtension; +import org.apache.flink.testutils.junit.SharedReference; + +import org.apache.calcite.plan.hep.HepMatchOrder; +import org.apache.calcite.rel.rules.CoreRules; +import org.apache.calcite.tools.RuleSets; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.apache.flink.table.api.DataTypes.INT; +import static org.apache.flink.table.api.DataTypes.STRING; +import static org.apache.flink.table.api.DataTypes.TIMESTAMP; +import static 
org.assertj.core.api.Assertions.assertThat; + +/** Tests for metadata filter push-down through {@link SupportsReadingMetadata}. */ +class MetadataFilterInReadingMetadataTest extends TableTestBase { + + @RegisterExtension + private final SharedObjectsExtension sharedObjects = SharedObjectsExtension.create(); + + private BatchTableTestUtil util; + + @BeforeEach + void setup() { + util = batchTestUtil(TableConfig.getDefault()); + util.buildBatchProgram(FlinkBatchProgram.DEFAULT_REWRITE()); + CalciteConfig calciteConfig = + TableConfigUtils.getCalciteConfig(util.tableEnv().getConfig()); + calciteConfig + .getBatchProgram() + .get() + .addLast( + "rules", + FlinkHepRuleSetProgramBuilder.newBuilder() + .setHepRulesExecutionType( + HEP_RULES_EXECUTION_TYPE.RULE_COLLECTION()) + .setHepMatchOrder(HepMatchOrder.BOTTOM_UP) + .add( + RuleSets.ofList( + PushFilterIntoTableSourceScanRule.INSTANCE, + CoreRules.FILTER_PROJECT_TRANSPOSE)) + .build()); + } + + @Test + void testMetadataFilterPushDown() { + SharedReference> receivedFilters = + sharedObjects.add(new ArrayList<>()); + TableDescriptor descriptor = + TableFactoryHarness.newBuilder() + .schema(MetadataFilterSource.SCHEMA) + .source(new MetadataFilterSource(true, receivedFilters)) + .build(); + util.tableEnv().createTable("T1", descriptor); + + util.verifyRelPlan("SELECT id FROM T1 WHERE event_time > TIMESTAMP '2024-01-01 00:00:00'"); + + assertThat(receivedFilters.get()).isNotEmpty(); + } + + @Test + void testMetadataFilterNotPushedWhenNotSupported() { + SharedReference> receivedFilters = + sharedObjects.add(new ArrayList<>()); + TableDescriptor descriptor = + TableFactoryHarness.newBuilder() + .schema(MetadataFilterSource.SCHEMA) + .source(new MetadataFilterSource(false, receivedFilters)) + .build(); + util.tableEnv().createTable("T2", descriptor); + + util.verifyRelPlan("SELECT id FROM T2 WHERE event_time > TIMESTAMP '2024-01-01 00:00:00'"); + + // No metadata filters should have been pushed + 
assertThat(receivedFilters.get()).isEmpty(); + } + + @Test + void testAliasedMetadataColumnFilter() { + SharedReference> receivedFilters = + sharedObjects.add(new ArrayList<>()); + TableDescriptor descriptor = + TableFactoryHarness.newBuilder() + .schema(RenamedMetadataFilterSource.SCHEMA) + .source(new RenamedMetadataFilterSource(receivedFilters)) + .build(); + util.tableEnv().createTable("T3", descriptor); + + // 'event_ts' is the SQL alias for metadata key 'timestamp' + util.verifyRelPlan("SELECT id FROM T3 WHERE event_ts > TIMESTAMP '2024-01-01 00:00:00'"); + + // The source should receive the filter with metadata key 'timestamp', not 'event_ts' + assertThat(receivedFilters.get()).isNotEmpty(); + assertThat(receivedFilters.get().get(0).toString()).contains("timestamp"); + } + + @Test + void testMixedPhysicalAndMetadataFilters() { + SharedReference> metadataFilters = + sharedObjects.add(new ArrayList<>()); + SharedReference> physicalFilters = + sharedObjects.add(new ArrayList<>()); + TableDescriptor descriptor = + TableFactoryHarness.newBuilder() + .schema(MixedFilterSource.SCHEMA) + .source(new MixedFilterSource(metadataFilters, physicalFilters)) + .build(); + util.tableEnv().createTable("T4", descriptor); + + util.verifyRelPlan( + "SELECT id FROM T4 WHERE id > 10 AND event_time > TIMESTAMP '2024-01-01 00:00:00'"); + + // Both paths should receive their respective filters + assertThat(physicalFilters.get()).isNotEmpty(); + assertThat(metadataFilters.get()).isNotEmpty(); + } + + @Test + void testPartialMetadataFilterAcceptance() { + SharedReference> receivedFilters = + sharedObjects.add(new ArrayList<>()); + TableDescriptor descriptor = + TableFactoryHarness.newBuilder() + .schema(PartialMetadataFilterSource.SCHEMA) + .source(new PartialMetadataFilterSource(receivedFilters)) + .build(); + util.tableEnv().createTable("T6", descriptor); + + // Two metadata filters: the source accepts only the first one + util.verifyRelPlan( + "SELECT id FROM T6 WHERE event_time > 
TIMESTAMP '2024-01-01 00:00:00'" + + " AND priority > 5"); + + // Source receives both filters but only accepts the first + assertThat(receivedFilters.get()).hasSize(2); + } + + @Test + void testMetadataFilterWithProjection() { + SharedReference> receivedFilters = + sharedObjects.add(new ArrayList<>()); + TableDescriptor descriptor = + TableFactoryHarness.newBuilder() + .schema(MetadataFilterSource.SCHEMA) + .source(new MetadataFilterSource(true, receivedFilters)) + .build(); + util.tableEnv().createTable("T5", descriptor); + + util.verifyRelPlan( + "SELECT id, name FROM T5 WHERE event_time > TIMESTAMP '2024-01-01 00:00:00'"); + + assertThat(receivedFilters.get()).isNotEmpty(); + } + + // ----------------------------------------------------------------------------------------- + // Test sources + // ----------------------------------------------------------------------------------------- + + /** Supports metadata filter push-down. */ + private static class MetadataFilterSource extends TableFactoryHarness.ScanSourceBase + implements SupportsReadingMetadata { + + public static final Schema SCHEMA = + Schema.newBuilder() + .column("id", INT()) + .column("name", STRING()) + .columnByMetadata("event_time", TIMESTAMP(3)) + .build(); + + private final boolean supportsMetadataFilter; + private final SharedReference> receivedMetadataFilters; + + MetadataFilterSource( + boolean supportsMetadataFilter, + SharedReference> receivedMetadataFilters) { + this.supportsMetadataFilter = supportsMetadataFilter; + this.receivedMetadataFilters = receivedMetadataFilters; + } + + @Override + public Map listReadableMetadata() { + Map metadata = new HashMap<>(); + metadata.put("event_time", org.apache.flink.table.api.DataTypes.TIMESTAMP(3)); + return metadata; + } + + @Override + public void applyReadableMetadata(List metadataKeys, DataType producedDataType) {} + + @Override + public boolean supportsMetadataFilterPushDown() { + return supportsMetadataFilter; + } + + @Override + public 
MetadataFilterResult applyMetadataFilters(List metadataFilters) { + receivedMetadataFilters.get().addAll(metadataFilters); + return MetadataFilterResult.of(metadataFilters, Collections.emptyList()); + } + } + + /** Tests key translation when SQL alias differs from metadata key. */ + private static class RenamedMetadataFilterSource extends TableFactoryHarness.ScanSourceBase + implements SupportsReadingMetadata { + + public static final Schema SCHEMA = + Schema.newBuilder() + .column("id", INT()) + .columnByMetadata("event_ts", TIMESTAMP(3), "timestamp") + .build(); + + private final SharedReference> receivedMetadataFilters; + + RenamedMetadataFilterSource( + SharedReference> receivedMetadataFilters) { + this.receivedMetadataFilters = receivedMetadataFilters; + } + + @Override + public Map listReadableMetadata() { + Map metadata = new HashMap<>(); + metadata.put("timestamp", org.apache.flink.table.api.DataTypes.TIMESTAMP(3)); + return metadata; + } + + @Override + public void applyReadableMetadata(List metadataKeys, DataType producedDataType) {} + + @Override + public boolean supportsMetadataFilterPushDown() { + return true; + } + + @Override + public MetadataFilterResult applyMetadataFilters(List metadataFilters) { + receivedMetadataFilters.get().addAll(metadataFilters); + return MetadataFilterResult.of(metadataFilters, Collections.emptyList()); + } + } + + /** Accepts only the first metadata filter; rejected filters remain in plan. 
*/ + private static class PartialMetadataFilterSource extends TableFactoryHarness.ScanSourceBase + implements SupportsReadingMetadata { + + public static final Schema SCHEMA = + Schema.newBuilder() + .column("id", INT()) + .columnByMetadata("event_time", TIMESTAMP(3)) + .columnByMetadata("priority", INT()) + .build(); + + private final SharedReference> receivedMetadataFilters; + + PartialMetadataFilterSource( + SharedReference> receivedMetadataFilters) { + this.receivedMetadataFilters = receivedMetadataFilters; + } + + @Override + public Map listReadableMetadata() { + Map metadata = new HashMap<>(); + metadata.put("event_time", org.apache.flink.table.api.DataTypes.TIMESTAMP(3)); + metadata.put("priority", org.apache.flink.table.api.DataTypes.INT()); + return metadata; + } + + @Override + public void applyReadableMetadata(List metadataKeys, DataType producedDataType) {} + + @Override + public boolean supportsMetadataFilterPushDown() { + return true; + } + + @Override + public MetadataFilterResult applyMetadataFilters(List metadataFilters) { + receivedMetadataFilters.get().addAll(metadataFilters); + // Accept only the first filter + List accepted = + metadataFilters.isEmpty() + ? Collections.emptyList() + : Collections.singletonList(metadataFilters.get(0)); + List remaining = + metadataFilters.size() > 1 + ? metadataFilters.subList(1, metadataFilters.size()) + : Collections.emptyList(); + return MetadataFilterResult.of(accepted, remaining); + } + } + + /** Tests mixed physical and metadata filter push-down. 
*/ + private static class MixedFilterSource extends TableFactoryHarness.ScanSourceBase + implements SupportsReadingMetadata, SupportsFilterPushDown { + + public static final Schema SCHEMA = + Schema.newBuilder() + .column("id", INT()) + .column("name", STRING()) + .columnByMetadata("event_time", TIMESTAMP(3)) + .build(); + + private final SharedReference> receivedMetadataFilters; + private final SharedReference> receivedPhysicalFilters; + + MixedFilterSource( + SharedReference> receivedMetadataFilters, + SharedReference> receivedPhysicalFilters) { + this.receivedMetadataFilters = receivedMetadataFilters; + this.receivedPhysicalFilters = receivedPhysicalFilters; + } + + @Override + public Map listReadableMetadata() { + Map metadata = new HashMap<>(); + metadata.put("event_time", org.apache.flink.table.api.DataTypes.TIMESTAMP(3)); + return metadata; + } + + @Override + public void applyReadableMetadata(List metadataKeys, DataType producedDataType) {} + + @Override + public boolean supportsMetadataFilterPushDown() { + return true; + } + + @Override + public MetadataFilterResult applyMetadataFilters(List metadataFilters) { + receivedMetadataFilters.get().addAll(metadataFilters); + return MetadataFilterResult.of(metadataFilters, Collections.emptyList()); + } + + @Override + public Result applyFilters(List filters) { + receivedPhysicalFilters.get().addAll(filters); + return Result.of(filters, Collections.emptyList()); + } + } +} diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/MetadataFilterInReadingMetadataTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/MetadataFilterInReadingMetadataTest.xml new file mode 100644 index 0000000000000..1a2c2f0a32ee5 --- /dev/null +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/MetadataFilterInReadingMetadataTest.xml @@ -0,0 +1,141 @@ + + + + + + 
<?xml version="1.0" ?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements.  See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to you under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License.  You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<!-- NOTE(review): element tags were stripped from this resource during extraction; the
     structure below is reconstructed around the surviving plan text. Regenerate via the
     test harness to confirm the exact expected plans. -->
<Root>
  <TestCase name="testAliasedMetadataColumnFilter">
    <Resource name="sql">
      <![CDATA[SELECT id FROM T3 WHERE event_ts > TIMESTAMP '2024-01-01 00:00:00']]>
    </Resource>
    <Resource name="ast">
      <![CDATA[
LogicalProject(id=[$0])
+- LogicalFilter(condition=[>($1, 2024-01-01 00:00:00)])
   +- LogicalProject(id=[$0], event_ts=[$1])
      +- LogicalTableScan(table=[[default_catalog, default_database, T3, metadata=[timestamp]]])
]]>
    </Resource>
    <Resource name="optimized rel plan">
      <![CDATA[
LogicalProject(id=[$0])
+- LogicalTableScan(table=[[default_catalog, default_database, T3, metadata=[timestamp], metadataFilter=[>(timestamp, 2024-01-01 00:00:00)]]])
]]>
    </Resource>
  </TestCase>
  <TestCase name="testMetadataFilterNotPushedWhenNotSupported">
    <Resource name="sql">
      <![CDATA[SELECT id FROM T2 WHERE event_time > TIMESTAMP '2024-01-01 00:00:00']]>
    </Resource>
    <Resource name="ast">
      <![CDATA[
LogicalProject(id=[$0])
+- LogicalFilter(condition=[>($2, 2024-01-01 00:00:00)])
   +- LogicalProject(id=[$0], name=[$1], event_time=[$2])
      +- LogicalTableScan(table=[[default_catalog, default_database, T2, metadata=[event_time]]])
]]>
    </Resource>
    <Resource name="optimized rel plan">
      <![CDATA[
LogicalProject(id=[$0])
+- LogicalFilter(condition=[>($2, 2024-01-01 00:00:00)])
   +- LogicalTableScan(table=[[default_catalog, default_database, T2, metadata=[event_time]]])
]]>
    </Resource>
  </TestCase>
  <TestCase name="testMetadataFilterPushDown">
    <Resource name="sql">
      <![CDATA[SELECT id FROM T1 WHERE event_time > TIMESTAMP '2024-01-01 00:00:00']]>
    </Resource>
    <Resource name="ast">
      <![CDATA[
LogicalProject(id=[$0])
+- LogicalFilter(condition=[>($2, 2024-01-01 00:00:00)])
   +- LogicalProject(id=[$0], name=[$1], event_time=[$2])
      +- LogicalTableScan(table=[[default_catalog, default_database, T1, metadata=[event_time]]])
]]>
    </Resource>
    <Resource name="optimized rel plan">
      <![CDATA[
LogicalProject(id=[$0])
+- LogicalTableScan(table=[[default_catalog, default_database, T1, metadata=[event_time], metadataFilter=[>(event_time, 2024-01-01 00:00:00)]]])
]]>
    </Resource>
  </TestCase>
  <TestCase name="testMetadataFilterWithProjection">
    <Resource name="sql">
      <![CDATA[SELECT id, name FROM T5 WHERE event_time > TIMESTAMP '2024-01-01 00:00:00']]>
    </Resource>
    <Resource name="ast">
      <![CDATA[
LogicalProject(id=[$0], name=[$1])
+- LogicalFilter(condition=[>($2, 2024-01-01 00:00:00)])
   +- LogicalProject(id=[$0], name=[$1], event_time=[$2])
      +- LogicalTableScan(table=[[default_catalog, default_database, T5, metadata=[event_time]]])
]]>
    </Resource>
    <Resource name="optimized rel plan">
      <![CDATA[
LogicalProject(id=[$0], name=[$1])
+- LogicalTableScan(table=[[default_catalog, default_database, T5, metadata=[event_time], metadataFilter=[>(event_time, 2024-01-01 00:00:00)]]])
]]>
    </Resource>
  </TestCase>
  <TestCase name="testMixedPhysicalAndMetadataFilters">
    <Resource name="sql">
      <![CDATA[SELECT id FROM T4 WHERE id > 10 AND event_time > TIMESTAMP '2024-01-01 00:00:00']]>
    </Resource>
    <Resource name="ast">
      <![CDATA[
LogicalProject(id=[$0])
+- LogicalFilter(condition=[AND(>($0, 10), >($2, 2024-01-01 00:00:00))])
   +- LogicalProject(id=[$0], name=[$1], event_time=[$2])
      +- LogicalTableScan(table=[[default_catalog, default_database, T4, metadata=[event_time]]])
]]>
    </Resource>
    <Resource name="optimized rel plan">
      <![CDATA[
LogicalProject(id=[$0])
+- LogicalTableScan(table=[[default_catalog, default_database, T4, metadata=[event_time], filter=[>(id, 10)], metadataFilter=[>(event_time, 2024-01-01 00:00:00)]]])
]]>
    </Resource>
  </TestCase>
  <TestCase name="testPartialMetadataFilterAcceptance">
    <Resource name="sql">
      <![CDATA[SELECT id FROM T6 WHERE event_time > TIMESTAMP '2024-01-01 00:00:00' AND priority > 5]]>
    </Resource>
    <Resource name="ast">
      <![CDATA[
LogicalProject(id=[$0])
+- LogicalFilter(condition=[AND(>($1, 2024-01-01 00:00:00), >($2, 5))])
   +- LogicalProject(id=[$0], event_time=[$2], priority=[$1])
      +- LogicalTableScan(table=[[default_catalog, default_database, T6, metadata=[priority, event_time]]])
]]>
    </Resource>
    <Resource name="optimized rel plan">
      <![CDATA[
LogicalProject(id=[$0])
+- LogicalFilter(condition=[>($1, 5)])
   +- LogicalTableScan(table=[[default_catalog, default_database, T6, metadata=[priority, event_time], metadataFilter=[>(event_time, 2024-01-01 00:00:00)]]])
]]>
    </Resource>
  </TestCase>
</Root>