Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,13 @@
import org.apache.flink.table.connector.source.ScanTableSource;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.utils.JoinedRowData;
import org.apache.flink.table.expressions.ResolvedExpression;
import org.apache.flink.table.factories.Factory;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.utils.LogicalTypeCasts;

import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
Expand Down Expand Up @@ -154,4 +156,71 @@ public interface SupportsReadingMetadata {
default boolean supportsMetadataProjection() {
    // By default, sources let the planner project away unused metadata columns.
    return true;
}

/**
 * Whether this source supports filtering on metadata columns.
 *
 * <p>When this method returns {@code true}, the planner may call {@link
 * #applyMetadataFilters(List)} during optimization with predicates expressed in metadata key
 * names (from {@link #listReadableMetadata()}), not SQL column aliases. Sources that do not
 * override this method will not receive metadata filter predicates.
 *
 * <p>This is independent of {@link SupportsFilterPushDown}, which handles physical column
 * predicates. A source can implement both to accept filters on physical and metadata columns.
 *
 * @return {@code true} if the planner may offer metadata predicates via {@link
 *     #applyMetadataFilters(List)}; {@code false} (the default) to opt out of metadata filter
 *     push-down entirely
 */
default boolean supportsMetadataFilterPushDown() {
    return false;
}

/**
 * Offers conjunctive metadata predicates to the source and reports which of them the source
 * will evaluate itself. The contract mirrors {@link SupportsFilterPushDown#applyFilters(List)}
 * but applies to metadata columns only.
 *
 * <p>Field references inside the given expressions use metadata key names as returned by
 * {@link #listReadableMetadata()}, never the SQL column aliases. For example, a column declared
 * as {@code msg_offset BIGINT METADATA FROM 'offset'} receives its predicate as {@code offset
 * >= 1000}, not {@code msg_offset >= 1000}; the planner performs the alias-to-key translation
 * before invoking this method.
 *
 * @param metadataFilters conjunctive predicates over metadata keys
 * @return by default, a result that accepts nothing and declares every filter as remaining
 */
default MetadataFilterResult applyMetadataFilters(List<ResolvedExpression> metadataFilters) {
    // Default behavior: consume no predicates; all of them must still be applied downstream.
    final List<ResolvedExpression> accepted = Collections.emptyList();
    return MetadataFilterResult.of(accepted, metadataFilters);
}

/**
 * Result of a metadata filter push down. Communicates the source's response to the planner
 * during optimization.
 *
 * <p>Instances are immutable: the given filter lists are defensively copied on construction and
 * the getters return unmodifiable views, so later mutation by the caller cannot change the
 * planner's view of the push-down outcome.
 */
@PublicEvolving
final class MetadataFilterResult {

    /** Filters the source promised to evaluate itself (best effort). */
    private final List<ResolvedExpression> acceptedFilters;

    /** Filters a subsequent runtime operation must still apply. */
    private final List<ResolvedExpression> remainingFilters;

    private MetadataFilterResult(
            List<ResolvedExpression> acceptedFilters,
            List<ResolvedExpression> remainingFilters) {
        // Defensive copies: reject nulls eagerly and freeze the lists so the result stays
        // immutable even if the source mutates the lists it passed in.
        this.acceptedFilters =
                Collections.unmodifiableList(
                        new ArrayList<>(
                                Objects.requireNonNull(acceptedFilters, "acceptedFilters")));
        this.remainingFilters =
                Collections.unmodifiableList(
                        new ArrayList<>(
                                Objects.requireNonNull(remainingFilters, "remainingFilters")));
    }

    /**
     * Constructs a metadata filter push-down result.
     *
     * @param acceptedFilters filters consumed by the source (best effort)
     * @param remainingFilters filters that a subsequent operation must still apply at runtime
     */
    public static MetadataFilterResult of(
            List<ResolvedExpression> acceptedFilters,
            List<ResolvedExpression> remainingFilters) {
        return new MetadataFilterResult(acceptedFilters, remainingFilters);
    }

    /** Returns an unmodifiable view of the filters the source accepted. */
    public List<ResolvedExpression> getAcceptedFilters() {
        return acceptedFilters;
    }

    /** Returns an unmodifiable view of the filters that must still be applied at runtime. */
    public List<ResolvedExpression> getRemainingFilters() {
        return remainingFilters;
    }
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,204 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.planner.plan.abilities.source;

import org.apache.flink.table.api.TableException;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.abilities.SupportsReadingMetadata;
import org.apache.flink.table.connector.source.abilities.SupportsReadingMetadata.MetadataFilterResult;
import org.apache.flink.table.expressions.Expression;
import org.apache.flink.table.expressions.ResolvedExpression;
import org.apache.flink.table.expressions.resolver.ExpressionResolver;
import org.apache.flink.table.planner.plan.utils.FlinkRexUtil;
import org.apache.flink.table.planner.plan.utils.RexNodeToExpressionConverter;
import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
import org.apache.flink.table.types.logical.RowType;

import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeName;

import org.apache.calcite.rex.RexBuilder;
import org.apache.calcite.rex.RexNode;

import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;

import scala.Option;

import static org.apache.flink.util.Preconditions.checkNotNull;

/**
 * Serializes metadata filter predicates and replays them during compiled plan restoration.
 *
 * <p>Predicates are stored with a {@code predicateRowType} that already uses metadata key names
 * (not SQL aliases). The alias-to-key translation happens once at optimization time, so no
 * column-to-key mapping needs to be persisted.
 */
@JsonIgnoreProperties(ignoreUnknown = true)
@JsonTypeName("MetadataFilterPushDown")
public final class MetadataFilterPushDownSpec extends SourceAbilitySpecBase {

    // JSON field names used by the compiled-plan (de)serialization below.
    public static final String FIELD_NAME_PREDICATES = "predicates";
    public static final String FIELD_NAME_PREDICATE_ROW_TYPE = "predicateRowType";

    /** Conjunctive metadata predicates, persisted into the compiled plan. */
    @JsonProperty(FIELD_NAME_PREDICATES)
    private final List<RexNode> predicates;

    /**
     * Row type snapshot using metadata key names. Stored because ProjectPushDownSpec may narrow the
     * context's row type during restore.
     */
    @JsonProperty(FIELD_NAME_PREDICATE_ROW_TYPE)
    private final RowType predicateRowType;

    /**
     * Creates the spec; also serves as the Jackson entry point when restoring a compiled plan.
     *
     * @param predicates conjunctive predicates whose field references use metadata key names
     * @param predicateRowType row type the predicates' field references resolve against
     */
    @JsonCreator
    public MetadataFilterPushDownSpec(
            @JsonProperty(FIELD_NAME_PREDICATES) List<RexNode> predicates,
            @JsonProperty(FIELD_NAME_PREDICATE_ROW_TYPE) RowType predicateRowType) {
        // Copy to decouple from the caller's list; both fields are mandatory.
        this.predicates = new ArrayList<>(checkNotNull(predicates));
        this.predicateRowType = checkNotNull(predicateRowType);
    }

    /** Returns the stored metadata predicates. */
    public List<RexNode> getPredicates() {
        return predicates;
    }

    /**
     * Re-applies the stored predicates to the restored table source.
     *
     * @throws TableException if the source does not accept every previously pushed predicate —
     *     the compiled plan was built under the assumption that all of them were consumed
     */
    @Override
    public void apply(DynamicTableSource tableSource, SourceAbilityContext context) {
        // Use stored predicateRowType; context's row type may be narrowed by ProjectPushDownSpec.
        MetadataFilterResult result =
                applyMetadataFilters(predicates, predicateRowType, tableSource, context);
        if (result.getAcceptedFilters().size() != predicates.size()) {
            throw new TableException("All metadata predicates should be accepted here.");
        }
    }

    /**
     * Converts RexNode predicates to ResolvedExpressions using the given row type and calls
     * applyMetadataFilters on the source. The row type must already use metadata key names.
     */
    public static MetadataFilterResult applyMetadataFilters(
            List<RexNode> predicates,
            RowType metadataKeyRowType,
            DynamicTableSource tableSource,
            SourceAbilityContext context) {
        if (!(tableSource instanceof SupportsReadingMetadata)) {
            throw new TableException(
                    String.format(
                            "%s does not support SupportsReadingMetadata.",
                            tableSource.getClass().getName()));
        }

        // Field names of the row type double as the metadata keys referenced by the predicates.
        String[] fieldNames = metadataKeyRowType.getFieldNames().toArray(new String[0]);

        // Translates Calcite RexNodes back into Table API expressions; field indices are
        // resolved against metadataKeyRowType, hence the requirement that it uses key names.
        RexNodeToExpressionConverter converter =
                new RexNodeToExpressionConverter(
                        new RexBuilder(context.getTypeFactory()),
                        fieldNames,
                        context.getFunctionCatalog(),
                        context.getCatalogManager(),
                        Option.apply(
                                context.getTypeFactory().buildRelNodeRowType(metadataKeyRowType)));

        List<Expression> filters =
                predicates.stream()
                        .map(
                                p -> {
                                    scala.Option<ResolvedExpression> expr = p.accept(converter);
                                    if (expr.isDefined()) {
                                        return expr.get();
                                    } else {
                                        // Conversion is all-or-nothing: an untranslatable
                                        // predicate would otherwise be silently dropped.
                                        throw new TableException(
                                                String.format(
                                                        "%s can not be converted to Expression for metadata filter push-down.",
                                                        p.toString()));
                                    }
                                })
                        .collect(Collectors.toList());

        // The expressions are already fully resolved; every lookup/parsing hook therefore
        // throws, which surfaces any unexpected resolution attempt as a hard error.
        ExpressionResolver resolver =
                ExpressionResolver.resolverFor(
                                context.getTableConfig(),
                                context.getClassLoader(),
                                name -> Optional.empty(),
                                context.getFunctionCatalog()
                                        .asLookup(
                                                str -> {
                                                    throw new TableException(
                                                            "We should not need to lookup any expressions at this point");
                                                }),
                                context.getCatalogManager().getDataTypeFactory(),
                                (sqlExpression, inputRowType, outputType) -> {
                                    throw new TableException(
                                            "SQL expression parsing is not supported at this location.");
                                })
                        .build();

        return ((SupportsReadingMetadata) tableSource)
                .applyMetadataFilters(resolver.resolve(filters));
    }

    /**
     * Signals that the predicates' field references must be adjusted after a projection changes
     * the source's produced row type (see also the note on {@link #predicateRowType}).
     */
    @Override
    public boolean needAdjustFieldReferenceAfterProjection() {
        return true;
    }

    /**
     * Returns a human-readable digest of the pushed predicates, e.g. {@code
     * metadataFilter=[and(p1, p2)]}, rendered with the stored field names.
     */
    @Override
    public String getDigests(SourceAbilityContext context) {
        final List<String> expressionStrs = new ArrayList<>();
        for (RexNode rexNode : predicates) {
            expressionStrs.add(
                    FlinkRexUtil.getExpressionString(
                            rexNode,
                            JavaScalaConversionUtil.toScala(predicateRowType.getFieldNames())));
        }

        // Left-fold into a nested and(...) string; empty predicate list renders as "".
        return String.format(
                "metadataFilter=[%s]",
                expressionStrs.stream()
                        .reduce((l, r) -> String.format("and(%s, %s)", l, r))
                        .orElse(""));
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }
        // Include the base class's state in the comparison.
        if (!super.equals(o)) {
            return false;
        }
        MetadataFilterPushDownSpec that = (MetadataFilterPushDownSpec) o;
        return Objects.equals(predicates, that.predicates)
                && Objects.equals(predicateRowType, that.predicateRowType);
    }

    @Override
    public int hashCode() {
        return Objects.hash(super.hashCode(), predicates, predicateRowType);
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
@JsonSubTypes({
@JsonSubTypes.Type(value = FilterPushDownSpec.class),
@JsonSubTypes.Type(value = LimitPushDownSpec.class),
@JsonSubTypes.Type(value = MetadataFilterPushDownSpec.class),
@JsonSubTypes.Type(value = PartitionPushDownSpec.class),
@JsonSubTypes.Type(value = ProjectPushDownSpec.class),
@JsonSubTypes.Type(value = ReadingMetadataSpec.class),
Expand Down
Loading