diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/abilities/SupportsReadingMetadata.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/abilities/SupportsReadingMetadata.java
index 60b6932b61769..0d1c65f0f2f96 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/abilities/SupportsReadingMetadata.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/abilities/SupportsReadingMetadata.java
@@ -25,11 +25,13 @@
import org.apache.flink.table.connector.source.ScanTableSource;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.utils.JoinedRowData;
+import org.apache.flink.table.expressions.ResolvedExpression;
import org.apache.flink.table.factories.Factory;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.utils.LogicalTypeCasts;
+import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
@@ -154,4 +156,71 @@ public interface SupportsReadingMetadata {
default boolean supportsMetadataProjection() {
return true;
}
+
+ /**
+ * Whether this source supports filtering on metadata columns.
+ *
+ *
+ * <p>When this method returns {@code true}, the planner may call {@link
+ * #applyMetadataFilters(List)} during optimization with predicates expressed in metadata key
+ * names (from {@link #listReadableMetadata()}), not SQL column aliases. Sources that do not
+ * override this method will not receive metadata filter predicates.
+ *
+ *
+ * <p>This is independent of {@link SupportsFilterPushDown}, which handles physical column
+ * predicates. A source can implement both to accept filters on physical and metadata columns.
+ */
+ default boolean supportsMetadataFilterPushDown() {
+ return false;
+ }
+
+ /**
+ * Provides a list of metadata filters in conjunctive form. A source can pick filters and return
+ * the accepted and remaining filters. Same contract as {@link
+ * SupportsFilterPushDown#applyFilters(List)}, but for metadata columns.
+ *
+ *
+ * <p>The provided filters reference metadata key names (from {@link #listReadableMetadata()}),
+ * not SQL column aliases. For example, a column declared as {@code msg_offset BIGINT METADATA
+ * FROM 'offset'} will have its predicate expressed as {@code offset >= 1000}, not {@code
+ * msg_offset >= 1000}. The planner handles the alias-to-key translation before calling this
+ * method.
+ */
+ default MetadataFilterResult applyMetadataFilters(List<ResolvedExpression> metadataFilters) {
+ return MetadataFilterResult.of(Collections.emptyList(), metadataFilters);
+ }
+
+ /**
+ * Result of a metadata filter push down. Communicates the source's response to the planner
+ * during optimization.
+ */
+ @PublicEvolving
+ final class MetadataFilterResult {
+ private final List<ResolvedExpression> acceptedFilters;
+ private final List<ResolvedExpression> remainingFilters;
+
+ private MetadataFilterResult(
+ List<ResolvedExpression> acceptedFilters,
+ List<ResolvedExpression> remainingFilters) {
+ this.acceptedFilters = acceptedFilters;
+ this.remainingFilters = remainingFilters;
+ }
+
+ /**
+ * Constructs a metadata filter push-down result.
+ *
+ * @param acceptedFilters filters consumed by the source (best effort)
+ * @param remainingFilters filters that a subsequent operation must still apply at runtime
+ */
+ public static MetadataFilterResult of(
+ List<ResolvedExpression> acceptedFilters,
+ List<ResolvedExpression> remainingFilters) {
+ return new MetadataFilterResult(acceptedFilters, remainingFilters);
+ }
+
+ public List<ResolvedExpression> getAcceptedFilters() {
+ return acceptedFilters;
+ }
+
+ public List<ResolvedExpression> getRemainingFilters() {
+ return remainingFilters;
+ }
+ }
}
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/source/MetadataFilterPushDownSpec.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/source/MetadataFilterPushDownSpec.java
new file mode 100644
index 0000000000000..89baea1aa7a6c
--- /dev/null
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/source/MetadataFilterPushDownSpec.java
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.abilities.source;
+
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.connector.source.abilities.SupportsReadingMetadata;
+import org.apache.flink.table.connector.source.abilities.SupportsReadingMetadata.MetadataFilterResult;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.expressions.ResolvedExpression;
+import org.apache.flink.table.expressions.resolver.ExpressionResolver;
+import org.apache.flink.table.planner.plan.utils.FlinkRexUtil;
+import org.apache.flink.table.planner.plan.utils.RexNodeToExpressionConverter;
+import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
+import org.apache.flink.table.types.logical.RowType;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeName;
+
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.rex.RexNode;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+import scala.Option;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Serializes metadata filter predicates and replays them during compiled plan restoration.
+ *
+ * <p>Predicates are stored with a {@code predicateRowType} that already uses metadata key names
+ * (not SQL aliases). The alias-to-key translation happens once at optimization time, so no
+ * column-to-key mapping needs to be persisted.
+ */
+@JsonIgnoreProperties(ignoreUnknown = true)
+@JsonTypeName("MetadataFilterPushDown")
+public final class MetadataFilterPushDownSpec extends SourceAbilitySpecBase {
+
+ public static final String FIELD_NAME_PREDICATES = "predicates";
+ public static final String FIELD_NAME_PREDICATE_ROW_TYPE = "predicateRowType";
+
+ @JsonProperty(FIELD_NAME_PREDICATES)
+ private final List<RexNode> predicates;
+
+ /**
+ * Row type snapshot using metadata key names. Stored because ProjectPushDownSpec may narrow the
+ * context's row type during restore.
+ */
+ @JsonProperty(FIELD_NAME_PREDICATE_ROW_TYPE)
+ private final RowType predicateRowType;
+
+ @JsonCreator
+ public MetadataFilterPushDownSpec(
+ @JsonProperty(FIELD_NAME_PREDICATES) List<RexNode> predicates,
+ @JsonProperty(FIELD_NAME_PREDICATE_ROW_TYPE) RowType predicateRowType) {
+ this.predicates = new ArrayList<>(checkNotNull(predicates));
+ this.predicateRowType = checkNotNull(predicateRowType);
+ }
+
+ public List<RexNode> getPredicates() {
+ return predicates;
+ }
+
+ @Override
+ public void apply(DynamicTableSource tableSource, SourceAbilityContext context) {
+ // Use stored predicateRowType; context's row type may be narrowed by ProjectPushDownSpec.
+ MetadataFilterResult result =
+ applyMetadataFilters(predicates, predicateRowType, tableSource, context);
+ if (result.getAcceptedFilters().size() != predicates.size()) {
+ throw new TableException("All metadata predicates should be accepted here.");
+ }
+ }
+
+ /**
+ * Converts RexNode predicates to ResolvedExpressions using the given row type and calls
+ * applyMetadataFilters on the source. The row type must already use metadata key names.
+ */
+ public static MetadataFilterResult applyMetadataFilters(
+ List<RexNode> predicates,
+ RowType metadataKeyRowType,
+ DynamicTableSource tableSource,
+ SourceAbilityContext context) {
+ if (!(tableSource instanceof SupportsReadingMetadata)) {
+ throw new TableException(
+ String.format(
+ "%s does not support SupportsReadingMetadata.",
+ tableSource.getClass().getName()));
+ }
+
+ String[] fieldNames = metadataKeyRowType.getFieldNames().toArray(new String[0]);
+
+ RexNodeToExpressionConverter converter =
+ new RexNodeToExpressionConverter(
+ new RexBuilder(context.getTypeFactory()),
+ fieldNames,
+ context.getFunctionCatalog(),
+ context.getCatalogManager(),
+ Option.apply(
+ context.getTypeFactory().buildRelNodeRowType(metadataKeyRowType)));
+
+ List<ResolvedExpression> filters =
+ predicates.stream()
+ .map(
+ p -> {
+ scala.Option<ResolvedExpression> expr = p.accept(converter);
+ if (expr.isDefined()) {
+ return expr.get();
+ } else {
+ throw new TableException(
+ String.format(
+ "%s can not be converted to Expression for metadata filter push-down.",
+ p.toString()));
+ }
+ })
+ .collect(Collectors.toList());
+
+ ExpressionResolver resolver =
+ ExpressionResolver.resolverFor(
+ context.getTableConfig(),
+ context.getClassLoader(),
+ name -> Optional.empty(),
+ context.getFunctionCatalog()
+ .asLookup(
+ str -> {
+ throw new TableException(
+ "We should not need to lookup any expressions at this point");
+ }),
+ context.getCatalogManager().getDataTypeFactory(),
+ (sqlExpression, inputRowType, outputType) -> {
+ throw new TableException(
+ "SQL expression parsing is not supported at this location.");
+ })
+ .build();
+
+ return ((SupportsReadingMetadata) tableSource)
+ .applyMetadataFilters(resolver.resolve(filters));
+ }
+
+ @Override
+ public boolean needAdjustFieldReferenceAfterProjection() {
+ return true;
+ }
+
+ @Override
+ public String getDigests(SourceAbilityContext context) {
+ final List<String> expressionStrs = new ArrayList<>();
+ for (RexNode rexNode : predicates) {
+ expressionStrs.add(
+ FlinkRexUtil.getExpressionString(
+ rexNode,
+ JavaScalaConversionUtil.toScala(predicateRowType.getFieldNames())));
+ }
+
+ return String.format(
+ "metadataFilter=[%s]",
+ expressionStrs.stream()
+ .reduce((l, r) -> String.format("and(%s, %s)", l, r))
+ .orElse(""));
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ if (!super.equals(o)) {
+ return false;
+ }
+ MetadataFilterPushDownSpec that = (MetadataFilterPushDownSpec) o;
+ return Objects.equals(predicates, that.predicates)
+ && Objects.equals(predicateRowType, that.predicateRowType);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(super.hashCode(), predicates, predicateRowType);
+ }
+}
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/source/SourceAbilitySpec.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/source/SourceAbilitySpec.java
index e51328d5e9f25..20e594304a86b 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/source/SourceAbilitySpec.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/source/SourceAbilitySpec.java
@@ -38,6 +38,7 @@
@JsonSubTypes({
@JsonSubTypes.Type(value = FilterPushDownSpec.class),
@JsonSubTypes.Type(value = LimitPushDownSpec.class),
+ @JsonSubTypes.Type(value = MetadataFilterPushDownSpec.class),
@JsonSubTypes.Type(value = PartitionPushDownSpec.class),
@JsonSubTypes.Type(value = ProjectPushDownSpec.class),
@JsonSubTypes.Type(value = ReadingMetadataSpec.class),
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoSourceScanRuleBase.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoSourceScanRuleBase.java
index 76e5a9b4a105f..6e07928a522ec 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoSourceScanRuleBase.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoSourceScanRuleBase.java
@@ -17,23 +17,35 @@
package org.apache.flink.table.planner.plan.rules.logical;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.abilities.SupportsFilterPushDown;
+import org.apache.flink.table.connector.source.abilities.SupportsReadingMetadata;
+import org.apache.flink.table.connector.source.abilities.SupportsReadingMetadata.MetadataFilterResult;
import org.apache.flink.table.expressions.ResolvedExpression;
import org.apache.flink.table.planner.expressions.converter.ExpressionConverter;
import org.apache.flink.table.planner.plan.abilities.source.FilterPushDownSpec;
+import org.apache.flink.table.planner.plan.abilities.source.MetadataFilterPushDownSpec;
import org.apache.flink.table.planner.plan.abilities.source.SourceAbilityContext;
import org.apache.flink.table.planner.plan.abilities.source.SourceAbilitySpec;
import org.apache.flink.table.planner.plan.schema.TableSourceTable;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.RowType.RowField;
import org.apache.calcite.plan.RelOptRule;
import org.apache.calcite.plan.RelOptRuleOperand;
import org.apache.calcite.rel.core.TableScan;
+import org.apache.calcite.rex.RexInputRef;
import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexVisitorImpl;
import org.apache.calcite.tools.RelBuilder;
+import java.util.ArrayList;
import java.util.Arrays;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.stream.Collectors;
import scala.Tuple2;
@@ -60,15 +72,8 @@ protected RexNode createRemainingCondition(
}
/**
- * Resolves filters using the underlying sources {@link SupportsFilterPushDown} and creates a
- * new {@link TableSourceTable} with the supplied predicates.
- *
- * @param convertiblePredicates Predicates to resolve
- * @param oldTableSourceTable TableSourceTable to copy
- * @param scan Underlying table scan to push to
- * @param relBuilder Builder to push the scan to
- * @return A tuple, constituting of the resolved filters and the newly created {@link
- * TableSourceTable}
+ * Resolves filters via {@link SupportsFilterPushDown} and creates a new {@link
+ * TableSourceTable}.
*/
protected Tuple2<SupportsFilterPushDown.Result, TableSourceTable>
resolveFiltersAndCreateTableSourceTable(
@@ -102,17 +107,122 @@ protected RexNode createRemainingCondition(
return new Tuple2<>(result, newTableSourceTable);
}
- /**
- * Determines wether we can pushdown the filter into the source. we can not push filter twice,
- * make sure FilterPushDownSpec has not been assigned as a capability.
- *
- * @param tableSourceTable Table scan to attempt to push into
- * @return Whether we can push or not
- */
+ /** Whether filter push-down is possible and not already assigned. */
protected boolean canPushdownFilter(TableSourceTable tableSourceTable) {
return tableSourceTable != null
&& tableSourceTable.tableSource() instanceof SupportsFilterPushDown
&& Arrays.stream(tableSourceTable.abilitySpecs())
.noneMatch(spec -> spec instanceof FilterPushDownSpec);
}
+
+ /** Whether metadata filter push-down is possible and not already assigned. */
+ protected boolean canPushdownMetadataFilter(TableSourceTable tableSourceTable) {
+ if (tableSourceTable == null) {
+ return false;
+ }
+ DynamicTableSource source = tableSourceTable.tableSource();
+ if (!(source instanceof SupportsReadingMetadata)) {
+ return false;
+ }
+ if (!((SupportsReadingMetadata) source).supportsMetadataFilterPushDown()) {
+ return false;
+ }
+ return Arrays.stream(tableSourceTable.abilitySpecs())
+ .noneMatch(spec -> spec instanceof MetadataFilterPushDownSpec);
+ }
+
+ /**
+ * True if predicate references metadata columns exclusively (no physical columns).
+ *
+ * <p>A predicate like {@code OR(physical_pred, metadata_pred)} returns false because it
+ * references both physical and metadata columns. Mixed predicates remain as runtime filters.
+ */
+ protected boolean referencesOnlyMetadataColumns(RexNode predicate, int physicalColumnCount) {
+ boolean[] saw = new boolean[2]; // [0] = sawPhysical, [1] = sawMetadata
+ predicate.accept(
+ new RexVisitorImpl<Void>(true) {
+ @Override
+ public Void visitInputRef(RexInputRef inputRef) {
+ if (inputRef.getIndex() >= physicalColumnCount) {
+ saw[1] = true;
+ } else {
+ saw[0] = true;
+ }
+ return null;
+ }
+ });
+ return saw[1] && !saw[0];
+ }
+
+ /** Number of physical columns in the scan's schema. */
+ protected int getPhysicalColumnCount(TableSourceTable tableSourceTable) {
+ ResolvedSchema schema = tableSourceTable.contextResolvedTable().getResolvedSchema();
+ return (int) schema.getColumns().stream().filter(Column::isPhysical).count();
+ }
+
+ /** Maps SQL column names to metadata keys for metadata columns. */
+ protected Map<String, String> buildColumnToMetadataKeyMap(TableSourceTable tableSourceTable) {
+ ResolvedSchema schema = tableSourceTable.contextResolvedTable().getResolvedSchema();
+ Map<String, String> mapping = new HashMap<>();
+ for (Column col : schema.getColumns()) {
+ if (col instanceof Column.MetadataColumn) {
+ Column.MetadataColumn metaCol = (Column.MetadataColumn) col;
+ String sqlName = metaCol.getName();
+ String metadataKey = metaCol.getMetadataKey().orElse(sqlName);
+ mapping.put(sqlName, metadataKey);
+ }
+ }
+ return mapping;
+ }
+
+ /** Resolves metadata filters and creates a new {@link TableSourceTable}. */
+ protected Tuple2<MetadataFilterResult, TableSourceTable>
+ resolveMetadataFiltersAndCreateTableSourceTable(
+ RexNode[] metadataPredicates,
+ TableSourceTable oldTableSourceTable,
+ TableScan scan,
+ RelBuilder relBuilder) {
+ DynamicTableSource newTableSource = oldTableSourceTable.tableSource().copy();
+ Map<String, String> columnToMetadataKey = buildColumnToMetadataKeyMap(oldTableSourceTable);
+ SourceAbilityContext abilityContext = SourceAbilityContext.from(scan);
+
+ // Pre-translate the RowType to use metadata key names instead of SQL aliases.
+ RowType sourceRowType = abilityContext.getSourceRowType();
+ RowType translatedRowType = translateRowType(sourceRowType, columnToMetadataKey);
+
+ MetadataFilterResult result =
+ MetadataFilterPushDownSpec.applyMetadataFilters(
+ Arrays.asList(metadataPredicates),
+ translatedRowType,
+ newTableSource,
+ abilityContext);
+
+ int acceptedCount = result.getAcceptedFilters().size();
+ List<RexNode> acceptedOriginalPredicates = new ArrayList<>();
+ for (int i = 0; i < acceptedCount; i++) {
+ acceptedOriginalPredicates.add(metadataPredicates[i]);
+ }
+ MetadataFilterPushDownSpec metadataSpec =
+ new MetadataFilterPushDownSpec(acceptedOriginalPredicates, translatedRowType);
+
+ TableSourceTable newTableSourceTable =
+ oldTableSourceTable.copy(
+ newTableSource,
+ oldTableSourceTable.getStatistic(),
+ new SourceAbilitySpec[] {metadataSpec});
+
+ return new Tuple2<>(result, newTableSourceTable);
+ }
+
+ /** Replaces SQL alias names with metadata key names in the RowType. */
+ private RowType translateRowType(
+ RowType sourceRowType, Map<String, String> columnToMetadataKey) {
+ List<RowField> translatedFields = new ArrayList<>();
+ for (int i = 0; i < sourceRowType.getFieldCount(); i++) {
+ String originalName = sourceRowType.getFieldNames().get(i);
+ String translatedName = columnToMetadataKey.getOrDefault(originalName, originalName);
+ translatedFields.add(new RowField(translatedName, sourceRowType.getTypeAt(i)));
+ }
+ return new RowType(false, translatedFields);
+ }
}
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoTableSourceScanRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoTableSourceScanRule.java
index eaa6999cdd7fe..57d80b1b2f94c 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoTableSourceScanRule.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoTableSourceScanRule.java
@@ -19,7 +19,8 @@
package org.apache.flink.table.planner.plan.rules.logical;
import org.apache.flink.table.connector.source.abilities.SupportsFilterPushDown;
-import org.apache.flink.table.planner.plan.schema.FlinkPreparingTableBase;
+import org.apache.flink.table.connector.source.abilities.SupportsReadingMetadata;
+import org.apache.flink.table.connector.source.abilities.SupportsReadingMetadata.MetadataFilterResult;
import org.apache.flink.table.planner.plan.schema.TableSourceTable;
import org.apache.flink.table.planner.plan.utils.FlinkRexUtil;
@@ -29,11 +30,15 @@
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.tools.RelBuilder;
+import java.util.ArrayList;
+import java.util.List;
+
import scala.Tuple2;
/**
- * Planner rule that tries to push a filter into a {@link LogicalTableScan}, which table is a {@link
- * TableSourceTable}. And the table source in the table is a {@link SupportsFilterPushDown}.
+ * Pushes filters from a {@link Filter} into a {@link LogicalTableScan}. Physical filters use {@link
+ * SupportsFilterPushDown}; metadata filters use {@link
+ * SupportsReadingMetadata#applyMetadataFilters}.
*/
public class PushFilterIntoTableSourceScanRule extends PushFilterIntoSourceScanRuleBase {
public static final PushFilterIntoTableSourceScanRule INSTANCE =
@@ -59,7 +64,7 @@ public boolean matches(RelOptRuleCall call) {
LogicalTableScan scan = call.rel(1);
TableSourceTable tableSourceTable = scan.getTable().unwrap(TableSourceTable.class);
- return canPushdownFilter(tableSourceTable);
+ return canPushdownFilter(tableSourceTable) || canPushdownMetadataFilter(tableSourceTable);
}
@Override
@@ -74,7 +79,7 @@ private void pushFilterIntoScan(
RelOptRuleCall call,
Filter filter,
LogicalTableScan scan,
- FlinkPreparingTableBase relOptTable) {
+ TableSourceTable tableSourceTable) {
RelBuilder relBuilder = call.builder();
Tuple2<RexNode[], RexNode[]> extractedPredicates =
@@ -87,28 +92,77 @@ private void pushFilterIntoScan(
RexNode[] convertiblePredicates = extractedPredicates._1;
RexNode[] unconvertedPredicates = extractedPredicates._2;
if (convertiblePredicates.length == 0) {
- // no condition can be translated to expression
return;
}
- Tuple2<SupportsFilterPushDown.Result, TableSourceTable> scanAfterPushdownWithResult =
- resolveFiltersAndCreateTableSourceTable(
- convertiblePredicates,
- relOptTable.unwrap(TableSourceTable.class),
- scan,
- relBuilder);
+ boolean supportsPhysicalFilter = canPushdownFilter(tableSourceTable);
+ boolean supportsMetadataFilter = canPushdownMetadataFilter(tableSourceTable);
+ int physicalColumnCount = getPhysicalColumnCount(tableSourceTable);
+
+ // Classify predicates: only separate metadata predicates when the source
+ // actually supports metadata filter push-down. Otherwise, all predicates
+ // go through the physical path to preserve the FilterPushDownSpec guard
+ // that prevents rule re-firing and maintains scan reuse invariants.
+ List<RexNode> physicalPredicates = new ArrayList<>();
+ List<RexNode> metadataPredicates = new ArrayList<>();
+ for (RexNode predicate : convertiblePredicates) {
+ if (supportsMetadataFilter
+ && referencesOnlyMetadataColumns(predicate, physicalColumnCount)) {
+ metadataPredicates.add(predicate);
+ } else {
+ physicalPredicates.add(predicate);
+ }
+ }
+
+ List<RexNode> allRemainingRexNodes = new ArrayList<>();
+ TableSourceTable currentTable = tableSourceTable;
+
+ if (!physicalPredicates.isEmpty() && supportsPhysicalFilter) {
+ Tuple2<SupportsFilterPushDown.Result, TableSourceTable> physicalResult =
+ resolveFiltersAndCreateTableSourceTable(
+ physicalPredicates.toArray(new RexNode[0]),
+ currentTable,
+ scan,
+ relBuilder);
+ currentTable = physicalResult._2;
+ List<RexNode> physicalRemaining =
+ convertExpressionToRexNode(physicalResult._1.getRemainingFilters(), relBuilder);
+ allRemainingRexNodes.addAll(physicalRemaining);
+ } else {
+ allRemainingRexNodes.addAll(physicalPredicates);
+ }
- SupportsFilterPushDown.Result result = scanAfterPushdownWithResult._1;
- TableSourceTable tableSourceTable = scanAfterPushdownWithResult._2;
+ if (!metadataPredicates.isEmpty()) {
+ Tuple2<MetadataFilterResult, TableSourceTable> metadataResult =
+ resolveMetadataFiltersAndCreateTableSourceTable(
+ metadataPredicates.toArray(new RexNode[0]),
+ currentTable,
+ scan,
+ relBuilder);
+ currentTable = metadataResult._2;
+ // Remaining (rejected) metadata predicates stay as a LogicalFilter above
+ // the scan so they are still evaluated at runtime. We use the original
+ // RexNodes (suffix) because the remaining ResolvedExpressions use metadata
+ // key names, not SQL aliases needed by the Filter's row type. The
+ // validation in resolveMetadataFiltersAndCreateTableSourceTable ensures
+ // the partition invariant (accepted prefix + remaining suffix = input).
+ int acceptedCount = metadataResult._1.getAcceptedFilters().size();
+ for (int i = acceptedCount; i < metadataPredicates.size(); i++) {
+ allRemainingRexNodes.add(metadataPredicates.get(i));
+ }
+ }
+
+ for (RexNode unconverted : unconvertedPredicates) {
+ allRemainingRexNodes.add(unconverted);
+ }
LogicalTableScan newScan =
- LogicalTableScan.create(scan.getCluster(), tableSourceTable, scan.getHints());
- if (result.getRemainingFilters().isEmpty() && unconvertedPredicates.length == 0) {
+ LogicalTableScan.create(scan.getCluster(), currentTable, scan.getHints());
+
+ if (allRemainingRexNodes.isEmpty()) {
call.transformTo(newScan);
} else {
- RexNode remainingCondition =
- createRemainingCondition(
- relBuilder, result.getRemainingFilters(), unconvertedPredicates);
+ RexNode remainingCondition = relBuilder.and(allRemainingRexNodes);
RexNode simplifiedRemainingCondition =
FlinkRexUtil.simplify(
relBuilder.getRexBuilder(),
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/DynamicTableSourceSpecSerdeTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/DynamicTableSourceSpecSerdeTest.java
index 6fd5942f19e43..61f545538674b 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/DynamicTableSourceSpecSerdeTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/DynamicTableSourceSpecSerdeTest.java
@@ -42,6 +42,7 @@
import org.apache.flink.table.planner.factories.TestValuesTableFactory;
import org.apache.flink.table.planner.plan.abilities.source.FilterPushDownSpec;
import org.apache.flink.table.planner.plan.abilities.source.LimitPushDownSpec;
+import org.apache.flink.table.planner.plan.abilities.source.MetadataFilterPushDownSpec;
import org.apache.flink.table.planner.plan.abilities.source.PartitionPushDownSpec;
import org.apache.flink.table.planner.plan.abilities.source.ProjectPushDownSpec;
import org.apache.flink.table.planner.plan.abilities.source.ReadingMetadataSpec;
@@ -385,6 +386,52 @@ void testDynamicTableSourceSpecSerdeWithEnrichmentOptions() throws Exception {
.isEqualTo(",");
}
+ @Test
+ void testMetadataFilterPushDownSpecSerde() throws IOException {
+ FlinkTypeFactory factory =
+ new FlinkTypeFactory(
+ Thread.currentThread().getContextClassLoader(), FlinkTypeSystem.INSTANCE);
+ RexBuilder rexBuilder = new RexBuilder(factory);
+
+ // RowType uses metadata key names (already translated from SQL aliases)
+ RowType predicateRowType =
+ RowType.of(
+ new LogicalType[] {new TimestampType(3), new BigIntType()},
+ new String[] {"timestamp", "offset"});
+
+ MetadataFilterPushDownSpec original =
+ new MetadataFilterPushDownSpec(
+ Arrays.asList(
+ // timestamp > '2024-01-01'
+ rexBuilder.makeCall(
+ SqlStdOperatorTable.GREATER_THAN,
+ rexBuilder.makeInputRef(
+ factory.createSqlType(SqlTypeName.TIMESTAMP, 3), 0),
+ rexBuilder.makeTimestampLiteral(
+ new org.apache.calcite.util.TimestampString(
+ "2024-01-01 00:00:00"),
+ 3)),
+ // offset >= 10
+ rexBuilder.makeCall(
+ SqlStdOperatorTable.GREATER_THAN_OR_EQUAL,
+ rexBuilder.makeInputRef(
+ factory.createSqlType(SqlTypeName.BIGINT), 1),
+ rexBuilder.makeExactLiteral(new BigDecimal(10)))),
+ predicateRowType);
+
+ PlannerMocks plannerMocks = PlannerMocks.create();
+ SerdeContext serdeCtx =
+ configuredSerdeContext(
+ plannerMocks.getCatalogManager(), plannerMocks.getTableConfig());
+
+ String json = toJson(serdeCtx, original);
+ MetadataFilterPushDownSpec deserialized =
+ toObject(serdeCtx, json, MetadataFilterPushDownSpec.class);
+
+ assertThat(deserialized.getPredicates()).isEqualTo(original.getPredicates());
+ assertThat(deserialized).isEqualTo(original);
+ }
+
static ResolvedCatalogTable tableWithOnlyPhysicalColumns(Map<String, String> options) {
ResolvedSchema resolvedSchema =
new ResolvedSchema(
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/MetadataFilterInReadingMetadataTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/MetadataFilterInReadingMetadataTest.java
new file mode 100644
index 0000000000000..9d6e5e80801dd
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/MetadataFilterInReadingMetadataTest.java
@@ -0,0 +1,383 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.logical;
+
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.TableDescriptor;
+import org.apache.flink.table.connector.source.abilities.SupportsFilterPushDown;
+import org.apache.flink.table.connector.source.abilities.SupportsReadingMetadata;
+import org.apache.flink.table.connector.source.abilities.SupportsReadingMetadata.MetadataFilterResult;
+import org.apache.flink.table.expressions.ResolvedExpression;
+import org.apache.flink.table.planner.calcite.CalciteConfig;
+import org.apache.flink.table.planner.factories.TableFactoryHarness;
+import org.apache.flink.table.planner.plan.optimize.program.BatchOptimizeContext;
+import org.apache.flink.table.planner.plan.optimize.program.FlinkBatchProgram;
+import org.apache.flink.table.planner.plan.optimize.program.FlinkHepRuleSetProgramBuilder;
+import org.apache.flink.table.planner.plan.optimize.program.HEP_RULES_EXECUTION_TYPE;
+import org.apache.flink.table.planner.utils.BatchTableTestUtil;
+import org.apache.flink.table.planner.utils.TableConfigUtils;
+import org.apache.flink.table.planner.utils.TableTestBase;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.testutils.junit.SharedObjectsExtension;
+import org.apache.flink.testutils.junit.SharedReference;
+
+import org.apache.calcite.plan.hep.HepMatchOrder;
+import org.apache.calcite.rel.rules.CoreRules;
+import org.apache.calcite.tools.RuleSets;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.flink.table.api.DataTypes.INT;
+import static org.apache.flink.table.api.DataTypes.STRING;
+import static org.apache.flink.table.api.DataTypes.TIMESTAMP;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for metadata filter push-down through {@link SupportsReadingMetadata}. */
+class MetadataFilterInReadingMetadataTest extends TableTestBase {
+
+    @RegisterExtension
+    private final SharedObjectsExtension sharedObjects = SharedObjectsExtension.create();
+
+    private BatchTableTestUtil util;
+
+    @BeforeEach
+    void setup() {
+        util = batchTestUtil(TableConfig.getDefault());
+        util.buildBatchProgram(FlinkBatchProgram.DEFAULT_REWRITE());
+        CalciteConfig calciteConfig =
+                TableConfigUtils.getCalciteConfig(util.tableEnv().getConfig());
+        calciteConfig
+                .getBatchProgram()
+                .get()
+                .addLast(
+                        "rules",
+                        FlinkHepRuleSetProgramBuilder.<BatchOptimizeContext>newBuilder()
+                                .setHepRulesExecutionType(
+                                        HEP_RULES_EXECUTION_TYPE.RULE_COLLECTION())
+                                .setHepMatchOrder(HepMatchOrder.BOTTOM_UP)
+                                .add(
+                                        RuleSets.ofList(
+                                                PushFilterIntoTableSourceScanRule.INSTANCE,
+                                                CoreRules.FILTER_PROJECT_TRANSPOSE))
+                                .build());
+    }
+
+    @Test
+    void testMetadataFilterPushDown() {
+        SharedReference<List<ResolvedExpression>> receivedFilters =
+                sharedObjects.add(new ArrayList<>());
+        TableDescriptor descriptor =
+                TableFactoryHarness.newBuilder()
+                        .schema(MetadataFilterSource.SCHEMA)
+                        .source(new MetadataFilterSource(true, receivedFilters))
+                        .build();
+        util.tableEnv().createTable("T1", descriptor);
+
+        util.verifyRelPlan("SELECT id FROM T1 WHERE event_time > TIMESTAMP '2024-01-01 00:00:00'");
+
+        assertThat(receivedFilters.get()).isNotEmpty();
+    }
+
+    @Test
+    void testMetadataFilterNotPushedWhenNotSupported() {
+        SharedReference<List<ResolvedExpression>> receivedFilters =
+                sharedObjects.add(new ArrayList<>());
+        TableDescriptor descriptor =
+                TableFactoryHarness.newBuilder()
+                        .schema(MetadataFilterSource.SCHEMA)
+                        .source(new MetadataFilterSource(false, receivedFilters))
+                        .build();
+        util.tableEnv().createTable("T2", descriptor);
+
+        util.verifyRelPlan("SELECT id FROM T2 WHERE event_time > TIMESTAMP '2024-01-01 00:00:00'");
+
+        // No metadata filters should have been pushed
+        assertThat(receivedFilters.get()).isEmpty();
+    }
+
+    @Test
+    void testAliasedMetadataColumnFilter() {
+        SharedReference<List<ResolvedExpression>> receivedFilters =
+                sharedObjects.add(new ArrayList<>());
+        TableDescriptor descriptor =
+                TableFactoryHarness.newBuilder()
+                        .schema(RenamedMetadataFilterSource.SCHEMA)
+                        .source(new RenamedMetadataFilterSource(receivedFilters))
+                        .build();
+        util.tableEnv().createTable("T3", descriptor);
+
+        // 'event_ts' is the SQL alias for metadata key 'timestamp'
+        util.verifyRelPlan("SELECT id FROM T3 WHERE event_ts > TIMESTAMP '2024-01-01 00:00:00'");
+
+        // The source should receive the filter with metadata key 'timestamp', not 'event_ts'
+        assertThat(receivedFilters.get()).isNotEmpty();
+        assertThat(receivedFilters.get().get(0).toString()).contains("timestamp");
+    }
+
+    @Test
+    void testMixedPhysicalAndMetadataFilters() {
+        SharedReference<List<ResolvedExpression>> metadataFilters =
+                sharedObjects.add(new ArrayList<>());
+        SharedReference<List<ResolvedExpression>> physicalFilters =
+                sharedObjects.add(new ArrayList<>());
+        TableDescriptor descriptor =
+                TableFactoryHarness.newBuilder()
+                        .schema(MixedFilterSource.SCHEMA)
+                        .source(new MixedFilterSource(metadataFilters, physicalFilters))
+                        .build();
+        util.tableEnv().createTable("T4", descriptor);
+
+        util.verifyRelPlan(
+                "SELECT id FROM T4 WHERE id > 10 AND event_time > TIMESTAMP '2024-01-01 00:00:00'");
+
+        // Both paths should receive their respective filters
+        assertThat(physicalFilters.get()).isNotEmpty();
+        assertThat(metadataFilters.get()).isNotEmpty();
+    }
+
+    @Test
+    void testPartialMetadataFilterAcceptance() {
+        SharedReference<List<ResolvedExpression>> receivedFilters =
+                sharedObjects.add(new ArrayList<>());
+        TableDescriptor descriptor =
+                TableFactoryHarness.newBuilder()
+                        .schema(PartialMetadataFilterSource.SCHEMA)
+                        .source(new PartialMetadataFilterSource(receivedFilters))
+                        .build();
+        util.tableEnv().createTable("T6", descriptor);
+
+        // Two metadata filters: the source accepts only the first one
+        util.verifyRelPlan(
+                "SELECT id FROM T6 WHERE event_time > TIMESTAMP '2024-01-01 00:00:00'"
+                        + " AND priority > 5");
+
+        // Source receives both filters but only accepts the first
+        assertThat(receivedFilters.get()).hasSize(2);
+    }
+
+    @Test
+    void testMetadataFilterWithProjection() {
+        SharedReference<List<ResolvedExpression>> receivedFilters =
+                sharedObjects.add(new ArrayList<>());
+        TableDescriptor descriptor =
+                TableFactoryHarness.newBuilder()
+                        .schema(MetadataFilterSource.SCHEMA)
+                        .source(new MetadataFilterSource(true, receivedFilters))
+                        .build();
+        util.tableEnv().createTable("T5", descriptor);
+
+        util.verifyRelPlan(
+                "SELECT id, name FROM T5 WHERE event_time > TIMESTAMP '2024-01-01 00:00:00'");
+
+        assertThat(receivedFilters.get()).isNotEmpty();
+    }
+
+    // -----------------------------------------------------------------------------------------
+    // Test sources
+    // -----------------------------------------------------------------------------------------
+
+    /** Supports metadata filter push-down. */
+    private static class MetadataFilterSource extends TableFactoryHarness.ScanSourceBase
+            implements SupportsReadingMetadata {
+
+        public static final Schema SCHEMA =
+                Schema.newBuilder()
+                        .column("id", INT())
+                        .column("name", STRING())
+                        .columnByMetadata("event_time", TIMESTAMP(3))
+                        .build();
+
+        private final boolean supportsMetadataFilter;
+        private final SharedReference<List<ResolvedExpression>> receivedMetadataFilters;
+
+        MetadataFilterSource(
+                boolean supportsMetadataFilter,
+                SharedReference<List<ResolvedExpression>> receivedMetadataFilters) {
+            this.supportsMetadataFilter = supportsMetadataFilter;
+            this.receivedMetadataFilters = receivedMetadataFilters;
+        }
+
+        @Override
+        public Map<String, DataType> listReadableMetadata() {
+            Map<String, DataType> metadata = new HashMap<>();
+            metadata.put("event_time", org.apache.flink.table.api.DataTypes.TIMESTAMP(3));
+            return metadata;
+        }
+
+        @Override
+        public void applyReadableMetadata(List<String> metadataKeys, DataType producedDataType) {}
+
+        @Override
+        public boolean supportsMetadataFilterPushDown() {
+            return supportsMetadataFilter;
+        }
+
+        @Override
+        public MetadataFilterResult applyMetadataFilters(List<ResolvedExpression> metadataFilters) {
+            receivedMetadataFilters.get().addAll(metadataFilters);
+            return MetadataFilterResult.of(metadataFilters, Collections.emptyList());
+        }
+    }
+
+    /** Tests key translation when SQL alias differs from metadata key. */
+    private static class RenamedMetadataFilterSource extends TableFactoryHarness.ScanSourceBase
+            implements SupportsReadingMetadata {
+
+        public static final Schema SCHEMA =
+                Schema.newBuilder()
+                        .column("id", INT())
+                        .columnByMetadata("event_ts", TIMESTAMP(3), "timestamp")
+                        .build();
+
+        private final SharedReference<List<ResolvedExpression>> receivedMetadataFilters;
+
+        RenamedMetadataFilterSource(
+                SharedReference<List<ResolvedExpression>> receivedMetadataFilters) {
+            this.receivedMetadataFilters = receivedMetadataFilters;
+        }
+
+        @Override
+        public Map<String, DataType> listReadableMetadata() {
+            Map<String, DataType> metadata = new HashMap<>();
+            metadata.put("timestamp", org.apache.flink.table.api.DataTypes.TIMESTAMP(3));
+            return metadata;
+        }
+
+        @Override
+        public void applyReadableMetadata(List<String> metadataKeys, DataType producedDataType) {}
+
+        @Override
+        public boolean supportsMetadataFilterPushDown() {
+            return true;
+        }
+
+        @Override
+        public MetadataFilterResult applyMetadataFilters(List<ResolvedExpression> metadataFilters) {
+            receivedMetadataFilters.get().addAll(metadataFilters);
+            return MetadataFilterResult.of(metadataFilters, Collections.emptyList());
+        }
+    }
+
+    /** Accepts only the first metadata filter; rejected filters remain in plan. */
+    private static class PartialMetadataFilterSource extends TableFactoryHarness.ScanSourceBase
+            implements SupportsReadingMetadata {
+
+        public static final Schema SCHEMA =
+                Schema.newBuilder()
+                        .column("id", INT())
+                        .columnByMetadata("event_time", TIMESTAMP(3))
+                        .columnByMetadata("priority", INT())
+                        .build();
+
+        private final SharedReference<List<ResolvedExpression>> receivedMetadataFilters;
+
+        PartialMetadataFilterSource(
+                SharedReference<List<ResolvedExpression>> receivedMetadataFilters) {
+            this.receivedMetadataFilters = receivedMetadataFilters;
+        }
+
+        @Override
+        public Map<String, DataType> listReadableMetadata() {
+            Map<String, DataType> metadata = new HashMap<>();
+            metadata.put("event_time", org.apache.flink.table.api.DataTypes.TIMESTAMP(3));
+            metadata.put("priority", org.apache.flink.table.api.DataTypes.INT());
+            return metadata;
+        }
+
+        @Override
+        public void applyReadableMetadata(List<String> metadataKeys, DataType producedDataType) {}
+
+        @Override
+        public boolean supportsMetadataFilterPushDown() {
+            return true;
+        }
+
+        @Override
+        public MetadataFilterResult applyMetadataFilters(List<ResolvedExpression> metadataFilters) {
+            receivedMetadataFilters.get().addAll(metadataFilters);
+            // Accept only the first filter
+            List<ResolvedExpression> accepted =
+                    metadataFilters.isEmpty()
+                            ? Collections.emptyList()
+                            : Collections.singletonList(metadataFilters.get(0));
+            List<ResolvedExpression> remaining =
+                    metadataFilters.size() > 1
+                            ? metadataFilters.subList(1, metadataFilters.size())
+                            : Collections.emptyList();
+            return MetadataFilterResult.of(accepted, remaining);
+        }
+    }
+
+    /** Tests mixed physical and metadata filter push-down. */
+    private static class MixedFilterSource extends TableFactoryHarness.ScanSourceBase
+            implements SupportsReadingMetadata, SupportsFilterPushDown {
+
+        public static final Schema SCHEMA =
+                Schema.newBuilder()
+                        .column("id", INT())
+                        .column("name", STRING())
+                        .columnByMetadata("event_time", TIMESTAMP(3))
+                        .build();
+
+        private final SharedReference<List<ResolvedExpression>> receivedMetadataFilters;
+        private final SharedReference<List<ResolvedExpression>> receivedPhysicalFilters;
+
+        MixedFilterSource(
+                SharedReference<List<ResolvedExpression>> receivedMetadataFilters,
+                SharedReference<List<ResolvedExpression>> receivedPhysicalFilters) {
+            this.receivedMetadataFilters = receivedMetadataFilters;
+            this.receivedPhysicalFilters = receivedPhysicalFilters;
+        }
+
+        @Override
+        public Map<String, DataType> listReadableMetadata() {
+            Map<String, DataType> metadata = new HashMap<>();
+            metadata.put("event_time", org.apache.flink.table.api.DataTypes.TIMESTAMP(3));
+            return metadata;
+        }
+
+        @Override
+        public void applyReadableMetadata(List<String> metadataKeys, DataType producedDataType) {}
+
+        @Override
+        public boolean supportsMetadataFilterPushDown() {
+            return true;
+        }
+
+        @Override
+        public MetadataFilterResult applyMetadataFilters(List<ResolvedExpression> metadataFilters) {
+            receivedMetadataFilters.get().addAll(metadataFilters);
+            return MetadataFilterResult.of(metadataFilters, Collections.emptyList());
+        }
+
+        @Override
+        public Result applyFilters(List<ResolvedExpression> filters) {
+            receivedPhysicalFilters.get().addAll(filters);
+            return Result.of(filters, Collections.emptyList());
+        }
+    }
+}
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/MetadataFilterInReadingMetadataTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/MetadataFilterInReadingMetadataTest.xml
new file mode 100644
index 0000000000000..1a2c2f0a32ee5
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/MetadataFilterInReadingMetadataTest.xml
@@ -0,0 +1,135 @@
+<?xml version="1.0" ?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to you under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+<Root>
+  <TestCase name="testAliasedMetadataColumnFilter">
+    <Resource name="sql">
+      <![CDATA[SELECT id FROM T3 WHERE event_ts > TIMESTAMP '2024-01-01 00:00:00']]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(id=[$0])
++- LogicalFilter(condition=[>($1, 2024-01-01 00:00:00)])
+   +- LogicalProject(id=[$0], event_ts=[$1])
+      +- LogicalTableScan(table=[[default_catalog, default_database, T3, metadata=[timestamp]]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+LogicalProject(id=[$0])
++- LogicalTableScan(table=[[default_catalog, default_database, T3, metadata=[timestamp], metadataFilter=[>(timestamp, 2024-01-01 00:00:00)]]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testMetadataFilterNotPushedWhenNotSupported">
+    <Resource name="sql">
+      <![CDATA[SELECT id FROM T2 WHERE event_time > TIMESTAMP '2024-01-01 00:00:00']]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(id=[$0])
++- LogicalFilter(condition=[>($2, 2024-01-01 00:00:00)])
+   +- LogicalProject(id=[$0], name=[$1], event_time=[$2])
+      +- LogicalTableScan(table=[[default_catalog, default_database, T2, metadata=[event_time]]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+LogicalProject(id=[$0])
++- LogicalFilter(condition=[>($2, 2024-01-01 00:00:00)])
+   +- LogicalTableScan(table=[[default_catalog, default_database, T2, metadata=[event_time]]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testMetadataFilterPushDown">
+    <Resource name="sql">
+      <![CDATA[SELECT id FROM T1 WHERE event_time > TIMESTAMP '2024-01-01 00:00:00']]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(id=[$0])
++- LogicalFilter(condition=[>($2, 2024-01-01 00:00:00)])
+   +- LogicalProject(id=[$0], name=[$1], event_time=[$2])
+      +- LogicalTableScan(table=[[default_catalog, default_database, T1, metadata=[event_time]]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+LogicalProject(id=[$0])
++- LogicalTableScan(table=[[default_catalog, default_database, T1, metadata=[event_time], metadataFilter=[>(event_time, 2024-01-01 00:00:00)]]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testMetadataFilterWithProjection">
+    <Resource name="sql">
+      <![CDATA[SELECT id, name FROM T5 WHERE event_time > TIMESTAMP '2024-01-01 00:00:00']]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(id=[$0], name=[$1])
++- LogicalFilter(condition=[>($2, 2024-01-01 00:00:00)])
+   +- LogicalProject(id=[$0], name=[$1], event_time=[$2])
+      +- LogicalTableScan(table=[[default_catalog, default_database, T5, metadata=[event_time]]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+LogicalProject(id=[$0], name=[$1])
++- LogicalTableScan(table=[[default_catalog, default_database, T5, metadata=[event_time], metadataFilter=[>(event_time, 2024-01-01 00:00:00)]]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testMixedPhysicalAndMetadataFilters">
+    <Resource name="sql">
+      <![CDATA[SELECT id FROM T4 WHERE id > 10 AND event_time > TIMESTAMP '2024-01-01 00:00:00']]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(id=[$0])
++- LogicalFilter(condition=[AND(>($0, 10), >($2, 2024-01-01 00:00:00))])
+   +- LogicalProject(id=[$0], name=[$1], event_time=[$2])
+      +- LogicalTableScan(table=[[default_catalog, default_database, T4, metadata=[event_time]]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+LogicalProject(id=[$0])
++- LogicalTableScan(table=[[default_catalog, default_database, T4, metadata=[event_time], filter=[>(id, 10)], metadataFilter=[>(event_time, 2024-01-01 00:00:00)]]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testPartialMetadataFilterAcceptance">
+    <Resource name="sql">
+      <![CDATA[SELECT id FROM T6 WHERE event_time > TIMESTAMP '2024-01-01 00:00:00' AND priority > 5]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(id=[$0])
++- LogicalFilter(condition=[AND(>($1, 2024-01-01 00:00:00), >($2, 5))])
+   +- LogicalProject(id=[$0], event_time=[$2], priority=[$1])
+      +- LogicalTableScan(table=[[default_catalog, default_database, T6, metadata=[priority, event_time]]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+LogicalProject(id=[$0])
++- LogicalFilter(condition=[>($1, 5)])
+   +- LogicalTableScan(table=[[default_catalog, default_database, T6, metadata=[priority, event_time], metadataFilter=[>(event_time, 2024-01-01 00:00:00)]]])
+]]>
+    </Resource>
+  </TestCase>
+</Root>