From 204a688242d3b99c729ea083598cf49b3c58a630 Mon Sep 17 00:00:00 2001 From: Ramin Gharib Date: Thu, 2 Apr 2026 16:12:29 +0200 Subject: [PATCH 1/3] [FLINK-39261][table] Add FROM_CHANGELOG built-in process table function --- .../docs/sql/reference/queries/changelog.md | 103 +++++++- .../org/apache/flink/table/api/Table.java | 27 ++ .../flink/table/api/internal/TableImpl.java | 5 + .../functions/BuiltInFunctionDefinition.java | 35 ++- .../functions/BuiltInFunctionDefinitions.java | 27 ++ .../table/functions/ChangelogFunction.java | 17 +- .../strategies/FromChangelogTypeStrategy.java | 247 ++++++++++++++++++ .../SpecificInputTypeStrategies.java | 4 + .../strategies/SpecificTypeStrategies.java | 4 + .../FromChangelogInputTypeStrategyTest.java | 147 +++++++++++ .../FlinkChangelogModeInferenceProgram.scala | 22 +- .../stream/FromChangelogSemanticTests.java | 49 ++++ .../stream/FromChangelogTestPrograms.java | 239 +++++++++++++++++ .../stream/ProcessTableFunctionTestUtils.java | 13 + .../plan/stream/sql/FromChangelogTest.java | 77 ++++++ .../stream/sql/ProcessTableFunctionTest.java | 11 +- .../plan/stream/sql/FromChangelogTest.xml | 57 ++++ .../stream/sql/ProcessTableFunctionTest.xml | 11 + .../functions/ptf/FromChangelogFunction.java | 146 +++++++++++ 19 files changed, 1229 insertions(+), 12 deletions(-) create mode 100644 flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/FromChangelogTypeStrategy.java create mode 100644 flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/strategies/FromChangelogInputTypeStrategyTest.java create mode 100644 flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/FromChangelogSemanticTests.java create mode 100644 flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/FromChangelogTestPrograms.java create mode 100644 
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/FromChangelogTest.java create mode 100644 flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/FromChangelogTest.xml create mode 100644 flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/ptf/FromChangelogFunction.java diff --git a/docs/content/docs/sql/reference/queries/changelog.md b/docs/content/docs/sql/reference/queries/changelog.md index 61843ab5c2f45..3c6a9dad9a0a9 100644 --- a/docs/content/docs/sql/reference/queries/changelog.md +++ b/docs/content/docs/sql/reference/queries/changelog.md @@ -30,9 +30,110 @@ Flink SQL provides built-in process table functions (PTFs) for working with chan | Function | Description | |:---------|:------------| +| [FROM_CHANGELOG](#from_changelog) | Converts an append-only table with operation codes into a dynamic table | | [TO_CHANGELOG](#to_changelog) | Converts a dynamic table into an append-only table with explicit operation codes | - +## FROM_CHANGELOG + +The `FROM_CHANGELOG` PTF converts an append-only table with an explicit operation code column into a dynamic table (i.e. an updating table). Each input row is expected to have a string column that indicates the change operation. The op column is removed from the output and the row is emitted with the corresponding change operation. + +This is useful when consuming Change Data Capture (CDC) streams from systems like Debezium, Maxwell, or Canal, where events arrive as flat append-only records with an explicit operation field. It's also useful to be used in combination with the TO_CHANGELOG function, when the user wants to turn the append-only table back into an updating table after doing some specific transformation to the events. 
+ +### Syntax + +```sql +SELECT * FROM FROM_CHANGELOG( + input => TABLE source_table, + [op => DESCRIPTOR(op_column_name),] + [op_mapping => MAP['c, r', 'INSERT', 'd', 'DELETE']] +) +``` + +### Parameters + +| Parameter | Required | Description | +|:-------------|:---------|:------------| +| `input` | Yes | The input table. Must be append-only. | +| `op` | No | A `DESCRIPTOR` with a single column name for the operation code column. Defaults to `op`. The column must exist in the input table and be of type STRING. | +| `op_mapping` | No | A `MAP` mapping user-defined operation codes to change operation names. Keys are user codes (e.g., `'c'`, `'u'`, `'d'`), values are change operation names (`INSERT`, `UPDATE_BEFORE`, `UPDATE_AFTER`, `DELETE`). Keys can contain comma-separated codes to map multiple codes to the same operation (e.g., `'c, r'`). When provided, only mapped codes are forwarded - unmapped codes are dropped. Each change operation may appear at most once across all entries. | + +#### Default op_mapping + +When `op_mapping` is omitted, the following standard names are used: + +| Input code | Change operation | +|:-------------------|:------------------| +| `'INSERT'` | INSERT | +| `'UPDATE_BEFORE'` | UPDATE_BEFORE | +| `'UPDATE_AFTER'` | UPDATE_AFTER | +| `'DELETE'` | DELETE | + +### Output Schema + +The output columns are ordered as: + +``` +[all_input_columns_without_op] +``` + +The op column is removed from the output. Output rows carry the appropriate change operation (INSERT, UPDATE_BEFORE, UPDATE_AFTER, or DELETE). 
+ +### Examples + +#### Basic usage with standard op names + +```sql +-- Input (append-only): +-- +I[id:1, op:'INSERT', name:'Alice'] +-- +I[id:1, op:'UPDATE_BEFORE', name:'Alice'] +-- +I[id:1, op:'UPDATE_AFTER', name:'Alice2'] +-- +I[id:2, op:'DELETE', name:'Bob'] + +SELECT * FROM FROM_CHANGELOG( + input => TABLE cdc_stream +) + +-- Output (updating table): +-- +I[id:1, name:'Alice'] +-- -U[id:1, name:'Alice'] +-- +U[id:1, name:'Alice2'] +-- -D[id:2, name:'Bob'] + +-- Table state after all events: +-- | id | name | +-- |----|--------| +-- | 1 | Alice2 | +``` + +#### Custom op_mapping with filtering + +```sql +SELECT * FROM FROM_CHANGELOG( + input => TABLE cdc_stream, + op => DESCRIPTOR(__op), + op_mapping => MAP['c, r', 'INSERT', 'd', 'DELETE'] +) +-- 'c' (create) and 'r' (read/snapshot) both produce INSERT +-- 'd' produces DELETE +-- All other op codes are silently dropped +``` + +#### Custom operation column name + +```sql +SELECT * FROM FROM_CHANGELOG( + input => TABLE cdc_stream, + op => DESCRIPTOR(operation) +) +-- The operation column named 'operation' is used instead of 'op' +``` + +#### Table API + +```java +// Default: reads 'op' column with standard change operation names +Table result = cdcStream.fromChangelog(); +``` ## TO_CHANGELOG diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Table.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Table.java index ef1785aecddf8..97c81710c7bf4 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Table.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Table.java @@ -1323,6 +1323,33 @@ default TableResult executeInsert( */ PartitionedTable partitionBy(Expression... fields); + /** + * Converts this append-only table with an explicit operation code column into a dynamic table + * using the built-in {@code FROM_CHANGELOG} process table function. + * + *

Each input row is expected to have a string operation code column (default: {@code "op"}) + * that indicates the change operation (e.g., INSERT, UPDATE_AFTER, UPDATE_BEFORE, DELETE). The + * output table is a dynamic table backed by a changelog stream. + * + *

Optional arguments can be passed using named expressions: + * + *

{@code
+     * // Default: reads 'op' column with standard change operation names
+     * table.fromChangelog();
+     *
+     * // Custom op column name and mapping (Debezium-style codes)
+     * table.fromChangelog(
+     *     descriptor("__op").asArgument("op"),
+     *     map("c, r", "INSERT", "u", "UPDATE_AFTER", "d", "DELETE").asArgument("op_mapping")
+     * );
+     * }
+ * + * @param arguments optional named arguments for {@code op} and {@code op_mapping} + * @return a dynamic {@link Table} with the op column removed and proper change operation + * semantics + */ + Table fromChangelog(Expression... arguments); + /** * Converts this table object into a named argument. * diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableImpl.java index 73d70a83dfdad..38d9c01e881f3 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableImpl.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableImpl.java @@ -497,6 +497,11 @@ public PartitionedTable partitionBy(Expression... fields) { return new PartitionedTableImpl(this, Arrays.asList(fields)); } + @Override + public Table fromChangelog(Expression... arguments) { + return process(BuiltInFunctionDefinitions.FROM_CHANGELOG.getName(), (Object[]) arguments); + } + @Override public ApiExpression asArgument(String name) { return createArgumentExpression(operationTree, tableEnvironment, name); diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinition.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinition.java index 77cfaf7166771..2a100eb3e12cb 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinition.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinition.java @@ -21,6 +21,7 @@ import org.apache.flink.annotation.Internal; import org.apache.flink.table.api.TableException; import org.apache.flink.table.catalog.DataTypeFactory; +import org.apache.flink.table.connector.ChangelogMode; import org.apache.flink.table.types.DataType; import 
org.apache.flink.table.types.inference.InputTypeStrategy; import org.apache.flink.table.types.inference.StaticArgument; @@ -33,6 +34,7 @@ import java.util.Arrays; import java.util.Locale; import java.util.Optional; +import java.util.function.Function; import java.util.regex.Pattern; import static org.apache.flink.util.Preconditions.checkArgument; @@ -52,7 +54,7 @@ *

Equality is defined by reference equality. */ @Internal -public final class BuiltInFunctionDefinition implements SpecializedFunction { +public final class BuiltInFunctionDefinition implements SpecializedFunction, ChangelogFunction { private final String name; @@ -72,6 +74,8 @@ public final class BuiltInFunctionDefinition implements SpecializedFunction { private final SqlCallSyntax sqlCallSyntax; + private final @Nullable Function changelogModeResolver; + private final String sqlName; private BuiltInFunctionDefinition( @@ -84,7 +88,8 @@ private BuiltInFunctionDefinition( boolean isDeterministic, boolean isRuntimeProvided, String runtimeClass, - boolean isInternal) { + boolean isInternal, + @Nullable Function changelogModeResolver) { this.name = checkNotNull(name, "Name must not be null."); this.sqlName = sqlName; this.version = isInternal ? null : version; @@ -95,6 +100,7 @@ private BuiltInFunctionDefinition( this.runtimeClass = runtimeClass; this.isInternal = isInternal; this.sqlCallSyntax = sqlCallSyntax; + this.changelogModeResolver = changelogModeResolver; validateFunction(this.name, this.version, this.isInternal); } @@ -225,6 +231,14 @@ public static String qualifyFunctionName(String name, int version) { return String.format(INTERNAL_NAME_FORMAT, name.toUpperCase(Locale.ROOT), version); } + @Override + public ChangelogMode getChangelogMode(final ChangelogContext changelogContext) { + if (changelogModeResolver != null) { + return changelogModeResolver.apply(changelogContext); + } + return ChangelogMode.insertOnly(); + } + // -------------------------------------------------------------------------------------------- // Builder // -------------------------------------------------------------------------------------------- @@ -253,6 +267,8 @@ public static final class Builder { private SqlCallSyntax sqlCallSyntax = SqlCallSyntax.FUNCTION; + private @Nullable Function changelogModeResolver; + public Builder() { // default constructor to allow a fluent definition } 
@@ -399,6 +415,18 @@ public Builder sqlName(String name) { return this; } + /** + * Sets a resolver that dynamically determines the output {@link ChangelogMode} for this + * built-in PTF. The resolver receives the {@link ChangelogContext} and can inspect function + * arguments (e.g., op_mapping) to adapt the changelog mode. Only needed for PTFs that emit + * updates (e.g., FROM_CHANGELOG). + */ + public Builder changelogModeResolver( + Function changelogModeResolver) { + this.changelogModeResolver = changelogModeResolver; + return this; + } + public BuiltInFunctionDefinition build() { return new BuiltInFunctionDefinition( name, @@ -410,7 +438,8 @@ public BuiltInFunctionDefinition build() { isDeterministic, isRuntimeProvided, runtimeClass, - isInternal); + isInternal, + changelogModeResolver); } } } diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java index 0f2f35a056a49..ca36de8e074b0 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java @@ -27,6 +27,7 @@ import org.apache.flink.table.api.JsonType; import org.apache.flink.table.api.JsonValueOnEmptyOrError; import org.apache.flink.table.api.TableException; +import org.apache.flink.table.connector.ChangelogMode; import org.apache.flink.table.expressions.TimeIntervalUnit; import org.apache.flink.table.expressions.TimePointUnit; import org.apache.flink.table.expressions.ValueLiteralExpression; @@ -106,6 +107,7 @@ import static org.apache.flink.table.types.inference.TypeStrategies.varyingString; import static org.apache.flink.table.types.inference.strategies.SpecificInputTypeStrategies.ARRAY_ELEMENT_ARG; import static 
org.apache.flink.table.types.inference.strategies.SpecificInputTypeStrategies.ARRAY_FULLY_COMPARABLE; +import static org.apache.flink.table.types.inference.strategies.SpecificInputTypeStrategies.FROM_CHANGELOG_INPUT_TYPE_STRATEGY; import static org.apache.flink.table.types.inference.strategies.SpecificInputTypeStrategies.INDEX; import static org.apache.flink.table.types.inference.strategies.SpecificInputTypeStrategies.JSON_ARGUMENT; import static org.apache.flink.table.types.inference.strategies.SpecificInputTypeStrategies.ML_PREDICT_INPUT_TYPE_STRATEGY; @@ -115,6 +117,7 @@ import static org.apache.flink.table.types.inference.strategies.SpecificInputTypeStrategies.percentage; import static org.apache.flink.table.types.inference.strategies.SpecificInputTypeStrategies.percentageArray; import static org.apache.flink.table.types.inference.strategies.SpecificTypeStrategies.ARRAY_APPEND_PREPEND; +import static org.apache.flink.table.types.inference.strategies.SpecificTypeStrategies.FROM_CHANGELOG_OUTPUT_TYPE_STRATEGY; import static org.apache.flink.table.types.inference.strategies.SpecificTypeStrategies.ML_PREDICT_OUTPUT_TYPE_STRATEGY; import static org.apache.flink.table.types.inference.strategies.SpecificTypeStrategies.TO_CHANGELOG_OUTPUT_TYPE_STRATEGY; @@ -809,6 +812,30 @@ ANY, and(logical(LogicalTypeRoot.BOOLEAN), LITERAL) "org.apache.flink.table.runtime.functions.ptf.ToChangelogFunction") .build(); + public static final BuiltInFunctionDefinition FROM_CHANGELOG = + BuiltInFunctionDefinition.newBuilder() + .name("FROM_CHANGELOG") + .kind(PROCESS_TABLE) + .staticArguments( + StaticArgument.table( + "input", + Row.class, + false, + EnumSet.of( + StaticArgumentTrait.TABLE, + StaticArgumentTrait.ROW_SEMANTIC_TABLE)), + StaticArgument.scalar("op", DataTypes.DESCRIPTOR(), true), + StaticArgument.scalar( + "op_mapping", + DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING()), + true)) + .inputTypeStrategy(FROM_CHANGELOG_INPUT_TYPE_STRATEGY) + 
.outputTypeStrategy(FROM_CHANGELOG_OUTPUT_TYPE_STRATEGY) + .changelogModeResolver(ctx -> ChangelogMode.all()) + .runtimeClass( + "org.apache.flink.table.runtime.functions.ptf.FromChangelogFunction") + .build(); + public static final BuiltInFunctionDefinition GREATEST = BuiltInFunctionDefinition.newBuilder() .name("GREATEST") diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/ChangelogFunction.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/ChangelogFunction.java index ad6e56d09d924..3ac0a3e55522e 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/ChangelogFunction.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/ChangelogFunction.java @@ -23,6 +23,8 @@ import org.apache.flink.table.connector.ChangelogMode; import org.apache.flink.types.RowKind; +import java.util.Optional; + /** * An extension that allows a process table function (PTF) to emit results with changelog semantics. * @@ -61,8 +63,10 @@ * * *

Emitting changelogs is only valid for PTFs that take table arguments with set semantics (see - * {@link ArgumentTrait#SET_SEMANTIC_TABLE}). In case of upserts, the upsert key must be equal to - * the PARTITION BY key. + * {@link ArgumentTrait#SET_SEMANTIC_TABLE}). When using {@code OPTIONAL_PARTITION_BY}, the + * PARTITION BY clause can be omitted for retract mode (with {@link RowKind#UPDATE_BEFORE}), since + * the stream is self-describing. In case of upserts, the upsert key must be equal to the PARTITION + * BY key. * *

It is perfectly valid for a {@link ChangelogFunction} implementation to return a fixed {@link * ChangelogMode}, regardless of the {@link ChangelogContext}. This approach may be appropriate when @@ -105,5 +109,14 @@ interface ChangelogContext { * are required and {@link ChangelogMode#keyOnlyDeletes()} are supported. */ ChangelogMode getRequiredChangelogMode(); + + /** + * Returns the value of a scalar argument at the given position. + * + * @param pos the argument position + * @param clazz the expected class of the argument value + * @return the argument value, or empty if the argument is null or not available + */ + Optional getArgumentValue(int pos, Class clazz); } } diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/FromChangelogTypeStrategy.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/FromChangelogTypeStrategy.java new file mode 100644 index 0000000000000..69176251b3d37 --- /dev/null +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/FromChangelogTypeStrategy.java @@ -0,0 +1,247 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.types.inference.strategies; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.api.DataTypes.Field; +import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.functions.FunctionDefinition; +import org.apache.flink.table.functions.TableSemantics; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.inference.ArgumentCount; +import org.apache.flink.table.types.inference.CallContext; +import org.apache.flink.table.types.inference.ConstantArgumentCount; +import org.apache.flink.table.types.inference.InputTypeStrategy; +import org.apache.flink.table.types.inference.Signature; +import org.apache.flink.table.types.inference.Signature.Argument; +import org.apache.flink.table.types.inference.TypeStrategy; +import org.apache.flink.table.types.logical.LogicalTypeFamily; +import org.apache.flink.types.ColumnList; + +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; + +/** Type strategies for the {@code FROM_CHANGELOG} process table function. 
*/ +@Internal +public final class FromChangelogTypeStrategy { + + private static final String DEFAULT_OP_COLUMN_NAME = "op"; + + private static final Set VALID_ROW_KIND_NAMES = + Set.of("INSERT", "UPDATE_BEFORE", "UPDATE_AFTER", "DELETE"); + + // -------------------------------------------------------------------------------------------- + // Input validation + // -------------------------------------------------------------------------------------------- + + public static final InputTypeStrategy INPUT_TYPE_STRATEGY = + new InputTypeStrategy() { + @Override + public ArgumentCount getArgumentCount() { + return ConstantArgumentCount.between(1, 3); + } + + @Override + public Optional> inferInputTypes( + final CallContext callContext, final boolean throwOnFailure) { + return validateInputs(callContext, throwOnFailure); + } + + @Override + public List getExpectedSignatures(final FunctionDefinition definition) { + return List.of( + Signature.of(Argument.of("input", "TABLE")), + Signature.of( + Argument.of("input", "TABLE"), Argument.of("op", "DESCRIPTOR")), + Signature.of( + Argument.of("input", "TABLE"), + Argument.of("op", "DESCRIPTOR"), + Argument.of("op_mapping", "MAP"))); + } + }; + + // -------------------------------------------------------------------------------------------- + // Output type inference + // -------------------------------------------------------------------------------------------- + + public static final TypeStrategy OUTPUT_TYPE_STRATEGY = + callContext -> { + final TableSemantics tableSemantics = + callContext + .getTableSemantics(0) + .orElseThrow( + () -> + new ValidationException( + "First argument must be a table for FROM_CHANGELOG.")); + + final String opColumnName = resolveOpColumnName(callContext); + + final List outputFields = buildOutputFields(tableSemantics, opColumnName); + + return Optional.of(DataTypes.ROW(outputFields).notNull()); + }; + + // 
-------------------------------------------------------------------------------------------- + // Helpers + // -------------------------------------------------------------------------------------------- + + @SuppressWarnings("rawtypes") + private static Optional> validateInputs( + final CallContext callContext, final boolean throwOnFailure) { + final boolean isMissingTableArg = callContext.getTableSemantics(0).isEmpty(); + if (isMissingTableArg) { + return callContext.fail( + throwOnFailure, "First argument must be a table for FROM_CHANGELOG."); + } + + final Optional opDescriptor = callContext.getArgumentValue(1, ColumnList.class); + final boolean hasInvalidOpDescriptor = + opDescriptor.isPresent() && opDescriptor.get().getNames().size() != 1; + if (hasInvalidOpDescriptor) { + return callContext.fail( + throwOnFailure, + "The descriptor for argument 'op' must contain exactly one column name."); + } + + // Validate that the op column exists in the input schema and is of STRING type + final TableSemantics tableSemantics = callContext.getTableSemantics(0).get(); + final String opColumnName = resolveOpColumnName(callContext); + final List inputFields = DataType.getFields(tableSemantics.dataType()); + final Optional opField = + inputFields.stream().filter(f -> f.getName().equals(opColumnName)).findFirst(); + if (opField.isEmpty()) { + return callContext.fail( + throwOnFailure, + String.format( + "The op column '%s' does not exist in the input schema.", + opColumnName)); + } + if (!opField.get().getDataType().getLogicalType().is(LogicalTypeFamily.CHARACTER_STRING)) { + return callContext.fail( + throwOnFailure, + String.format( + "The op column '%s' must be of STRING type, but was '%s'.", + opColumnName, opField.get().getDataType().getLogicalType())); + } + + final boolean hasMappingArgProvided = !callContext.isArgumentNull(2); + final boolean isMappingArgLiteral = callContext.isArgumentLiteral(2); + if (hasMappingArgProvided && !isMappingArgLiteral) { + return 
callContext.fail( + throwOnFailure, "The 'op_mapping' argument must be a constant MAP literal."); + } + + final Optional opMapping = callContext.getArgumentValue(2, Map.class); + if (opMapping.isPresent()) { + final Optional> validationError = + validateOpMappingValues(callContext, opMapping.get(), throwOnFailure); + if (validationError.isPresent()) { + return validationError; + } + + // Retract mode requires UPDATE_BEFORE in the mapping + final boolean hasUpdateBefore = + opMapping.get().values().stream() + .anyMatch( + v -> + v instanceof String + && "UPDATE_BEFORE".equals(((String) v).trim())); + if (!hasUpdateBefore) { + return callContext.fail( + throwOnFailure, + "The 'op_mapping' must include UPDATE_BEFORE for retract mode. " + + "Upsert mode (without UPDATE_BEFORE) is not supported in this version."); + } + } + + return Optional.of(callContext.getArgumentDataTypes()); + } + + /** + * Validates op_mapping values. Values must be valid RowKind names from {INSERT, UPDATE_AFTER, + * DELETE}. Keys are arbitrary user strings (e.g., 'c', 'u', 'd') and may be comma-separated to + * map multiple user codes to the same RowKind. Each RowKind name must appear at most once + * across all entries. 
+ */ + private static Optional> validateOpMappingValues( + final CallContext callContext, + final Map opMapping, + final boolean throwOnFailure) { + final Set allRowKindsSeen = new HashSet<>(); + + for (final Entry entry : opMapping.entrySet()) { + if (!(entry.getKey() instanceof String)) { + return callContext.fail( + throwOnFailure, "Invalid target mapping for argument 'op_mapping'."); + } + final Object value = entry.getValue(); + if (!(value instanceof String)) { + return callContext.fail( + throwOnFailure, "Invalid target mapping for argument 'op_mapping'."); + } + final String rowKindName = ((String) value).trim(); + if (!VALID_ROW_KIND_NAMES.contains(rowKindName)) { + return callContext.fail( + throwOnFailure, + String.format( + "Invalid target mapping for argument 'op_mapping'. " + + "Unknown change operation: '%s'. Valid values are: %s.", + rowKindName, VALID_ROW_KIND_NAMES)); + } + final boolean isDuplicate = !allRowKindsSeen.add(rowKindName); + if (isDuplicate) { + return callContext.fail( + throwOnFailure, + String.format( + "Invalid target mapping for argument 'op_mapping'. " + + "Duplicate change operation: '%s'. 
" + + "Use comma-separated keys to map multiple codes to the same operation " + + "(e.g., MAP['c, r', 'INSERT']).", + rowKindName)); + } + } + return Optional.empty(); + } + + private static String resolveOpColumnName(final CallContext callContext) { + return callContext + .getArgumentValue(1, ColumnList.class) + .filter(cl -> !cl.getNames().isEmpty()) + .map(cl -> cl.getNames().get(0)) + .orElse(DEFAULT_OP_COLUMN_NAME); + } + + private static List buildOutputFields( + final TableSemantics tableSemantics, final String opColumnName) { + final List inputFields = DataType.getFields(tableSemantics.dataType()); + + // Exclude the op column (becomes RowKind), keep all other columns + return inputFields.stream() + .filter(f -> !f.getName().equals(opColumnName)) + .collect(Collectors.toList()); + } + + private FromChangelogTypeStrategy() {} +} diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/SpecificInputTypeStrategies.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/SpecificInputTypeStrategies.java index 4455b083ce2e9..b4cd96e0349e2 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/SpecificInputTypeStrategies.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/SpecificInputTypeStrategies.java @@ -126,6 +126,10 @@ public static InputTypeStrategy windowTimeIndicator() { public static final InputTypeStrategy TO_CHANGELOG_INPUT_TYPE_STRATEGY = ToChangelogTypeStrategy.INPUT_TYPE_STRATEGY; + /** Input strategy for {@link BuiltInFunctionDefinitions#FROM_CHANGELOG}. */ + public static final InputTypeStrategy FROM_CHANGELOG_INPUT_TYPE_STRATEGY = + FromChangelogTypeStrategy.INPUT_TYPE_STRATEGY; + /** See {@link ExtractInputTypeStrategy}. 
*/ public static final InputTypeStrategy EXTRACT = new ExtractInputTypeStrategy(); diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/SpecificTypeStrategies.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/SpecificTypeStrategies.java index c1b854cad7c3b..06b6cbb2d3bf3 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/SpecificTypeStrategies.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/SpecificTypeStrategies.java @@ -202,6 +202,10 @@ public final class SpecificTypeStrategies { public static final TypeStrategy TO_CHANGELOG_OUTPUT_TYPE_STRATEGY = ToChangelogTypeStrategy.OUTPUT_TYPE_STRATEGY; + /** Type strategy specific for {@link BuiltInFunctionDefinitions#FROM_CHANGELOG}. */ + public static final TypeStrategy FROM_CHANGELOG_OUTPUT_TYPE_STRATEGY = + FromChangelogTypeStrategy.OUTPUT_TYPE_STRATEGY; + private SpecificTypeStrategies() { // no instantiation } diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/strategies/FromChangelogInputTypeStrategyTest.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/strategies/FromChangelogInputTypeStrategyTest.java new file mode 100644 index 0000000000000..ec21522626f08 --- /dev/null +++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/strategies/FromChangelogInputTypeStrategyTest.java @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.types.inference.strategies; + +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.inference.InputTypeStrategiesTestBase; +import org.apache.flink.table.types.inference.utils.TableSemanticsMock; +import org.apache.flink.types.ColumnList; + +import java.util.List; +import java.util.Map; +import java.util.stream.Stream; + +import static org.apache.flink.table.types.inference.strategies.SpecificInputTypeStrategies.FROM_CHANGELOG_INPUT_TYPE_STRATEGY; + +/** Tests for {@link FromChangelogTypeStrategy#INPUT_TYPE_STRATEGY}. 
*/ +class FromChangelogInputTypeStrategyTest extends InputTypeStrategiesTestBase { + + private static final DataType TABLE_TYPE = + DataTypes.ROW( + DataTypes.FIELD("id", DataTypes.INT()), + DataTypes.FIELD("op", DataTypes.STRING()), + DataTypes.FIELD("name", DataTypes.STRING())); + + private static final DataType DESCRIPTOR_TYPE = DataTypes.DESCRIPTOR(); + + private static final DataType MAP_TYPE = DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING()); + + @Override + protected Stream testData() { + return Stream.of( + // Valid: custom mapping with all change operations + TestSpec.forStrategy( + "Valid with custom mapping", FROM_CHANGELOG_INPUT_TYPE_STRATEGY) + .calledWithArgumentTypes(TABLE_TYPE, DESCRIPTOR_TYPE, MAP_TYPE) + .calledWithTableSemanticsAt(0, new TableSemanticsMock(TABLE_TYPE)) + .calledWithLiteralAt(1, ColumnList.of(List.of("op"))) + .calledWithLiteralAt( + 2, + Map.of( + "c", "INSERT", + "ub", "UPDATE_BEFORE", + "ua", "UPDATE_AFTER", + "d", "DELETE")) + .expectArgumentTypes(TABLE_TYPE, DESCRIPTOR_TYPE, MAP_TYPE), + + // Valid: retract-style mapping with UPDATE_BEFORE + TestSpec.forStrategy("Valid with UPDATE_BEFORE", FROM_CHANGELOG_INPUT_TYPE_STRATEGY) + .calledWithArgumentTypes(TABLE_TYPE, DESCRIPTOR_TYPE, MAP_TYPE) + .calledWithTableSemanticsAt(0, new TableSemanticsMock(TABLE_TYPE)) + .calledWithLiteralAt(1, ColumnList.of(List.of("op"))) + .calledWithLiteralAt( + 2, + Map.of( + "c", "INSERT", + "ub", "UPDATE_BEFORE", + "ua", "UPDATE_AFTER", + "d", "DELETE")) + .expectArgumentTypes(TABLE_TYPE, DESCRIPTOR_TYPE, MAP_TYPE), + + // Error: op column not found + TestSpec.forStrategy( + "Op column not found in schema", FROM_CHANGELOG_INPUT_TYPE_STRATEGY) + .calledWithArgumentTypes(TABLE_TYPE, DESCRIPTOR_TYPE) + .calledWithTableSemanticsAt(0, new TableSemanticsMock(TABLE_TYPE)) + .calledWithLiteralAt(1, ColumnList.of(List.of("nonexistent"))) + .expectErrorMessage("The op column 'nonexistent' does not exist"), + + // Error: op column is not STRING + 
TestSpec.forStrategy("Op column wrong type", FROM_CHANGELOG_INPUT_TYPE_STRATEGY) + .calledWithArgumentTypes( + DataTypes.ROW( + DataTypes.FIELD("id", DataTypes.INT()), + DataTypes.FIELD("op", DataTypes.INT()), + DataTypes.FIELD("name", DataTypes.STRING())), + DESCRIPTOR_TYPE) + .calledWithTableSemanticsAt( + 0, + new TableSemanticsMock( + DataTypes.ROW( + DataTypes.FIELD("id", DataTypes.INT()), + DataTypes.FIELD("op", DataTypes.INT()), + DataTypes.FIELD("name", DataTypes.STRING())))) + .calledWithLiteralAt(1, ColumnList.of(List.of("op"))) + .expectErrorMessage("must be of STRING type"), + + // Error: multi-column descriptor + TestSpec.forStrategy( + "Descriptor with multiple columns", + FROM_CHANGELOG_INPUT_TYPE_STRATEGY) + .calledWithArgumentTypes(TABLE_TYPE, DESCRIPTOR_TYPE) + .calledWithTableSemanticsAt(0, new TableSemanticsMock(TABLE_TYPE)) + .calledWithLiteralAt(1, ColumnList.of(List.of("a", "b"))) + .expectErrorMessage("must contain exactly one column name"), + + // Error: invalid RowKind in op_mapping value + TestSpec.forStrategy( + "Invalid RowKind in mapping value", + FROM_CHANGELOG_INPUT_TYPE_STRATEGY) + .calledWithArgumentTypes(TABLE_TYPE, DESCRIPTOR_TYPE, MAP_TYPE) + .calledWithTableSemanticsAt(0, new TableSemanticsMock(TABLE_TYPE)) + .calledWithLiteralAt(1, ColumnList.of(List.of("op"))) + .calledWithLiteralAt(2, Map.of("c", "INVALID_KIND")) + .expectErrorMessage("Unknown change operation: 'INVALID_KIND'"), + + // Error: duplicate RowKind across entries + TestSpec.forStrategy( + "Duplicate RowKind in mapping values", + FROM_CHANGELOG_INPUT_TYPE_STRATEGY) + .calledWithArgumentTypes(TABLE_TYPE, DESCRIPTOR_TYPE, MAP_TYPE) + .calledWithTableSemanticsAt(0, new TableSemanticsMock(TABLE_TYPE)) + .calledWithLiteralAt(1, ColumnList.of(List.of("op"))) + .calledWithLiteralAt(2, Map.of("c", "INSERT", "r", "INSERT")) + .expectErrorMessage("Duplicate change operation: 'INSERT'"), + + // Error: upsert mapping (no UPDATE_BEFORE) not supported + TestSpec.forStrategy( + 
"Upsert mapping not supported", FROM_CHANGELOG_INPUT_TYPE_STRATEGY) + .calledWithArgumentTypes(TABLE_TYPE, DESCRIPTOR_TYPE, MAP_TYPE) + .calledWithTableSemanticsAt(0, new TableSemanticsMock(TABLE_TYPE)) + .calledWithLiteralAt(1, ColumnList.of(List.of("op"))) + .calledWithLiteralAt( + 2, + Map.of( + "c", "INSERT", + "u", "UPDATE_AFTER", + "d", "DELETE")) + .expectErrorMessage("must include UPDATE_BEFORE")); + } +} diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala index 56c9cb1262a48..f5bb3d2b16af5 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala @@ -1680,8 +1680,6 @@ class FlinkChangelogModeInferenceProgram extends FlinkOptimizeProgram[StreamOpti inputChangelogModes, outputChangelogMode) - // Expose a simplified context to let users focus on important characteristics. - // If necessary, we can expose the full CallContext in the future. 
new ChangelogContext { override def getTableChangelogMode(pos: Int): ChangelogMode = { val tableSemantics = callContext.getTableSemantics(pos).orElse(null) @@ -1694,6 +1692,10 @@ class FlinkChangelogModeInferenceProgram extends FlinkOptimizeProgram[StreamOpti override def getRequiredChangelogMode: ChangelogMode = { callContext.getOutputChangelogMode.orElse(null) } + + override def getArgumentValue[T](pos: Int, clazz: Class[T]): java.util.Optional[T] = { + callContext.getArgumentValue(pos, clazz) + } } } @@ -1711,7 +1713,10 @@ class FlinkChangelogModeInferenceProgram extends FlinkOptimizeProgram[StreamOpti val changelogContext = toPtfChangelogContext(process, inputChangelogModes, requiredChangelogMode) val changelogMode = changelogFunction.getChangelogMode(changelogContext) - if (!changelogMode.containsOnly(RowKind.INSERT)) { + if ( + !changelogMode.containsOnly(RowKind.INSERT) && + !changelogMode.contains(RowKind.UPDATE_BEFORE) + ) { verifyPtfTableArgsForUpdates(call) } toTraitSet(changelogMode) @@ -1720,6 +1725,15 @@ class FlinkChangelogModeInferenceProgram extends FlinkOptimizeProgram[StreamOpti } } + /** + * Verifies that PTFs with upsert output (without UPDATE_BEFORE) use set semantics. + * + * Retract mode (with UPDATE_BEFORE) is self-describing — each update carries both the old and new + * value, so downstream can process it without a key. Row semantics is safe. + * + * Upsert mode (without UPDATE_BEFORE) requires a key to look up previous values, so set semantics + * with PARTITION BY is required. + */ private def verifyPtfTableArgsForUpdates(call: RexCall): Unit = { StreamPhysicalProcessTableFunction .getProvidedInputArgs(call) @@ -1728,7 +1742,7 @@ class FlinkChangelogModeInferenceProgram extends FlinkOptimizeProgram[StreamOpti tableArg => if (tableArg.is(StaticArgumentTrait.ROW_SEMANTIC_TABLE)) { throw new ValidationException( - s"PTFs that take table arguments with row semantics don't support updating output. 
" + + s"PTFs that take table arguments with row semantics don't support upsert output. " + s"Table argument '${tableArg.getName}' of function '${call.getOperator.toString}' " + s"must use set semantics.") } diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/FromChangelogSemanticTests.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/FromChangelogSemanticTests.java new file mode 100644 index 0000000000000..a8227dcc5ec93 --- /dev/null +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/FromChangelogSemanticTests.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.plan.nodes.exec.stream; + +import org.apache.flink.table.api.TableConfig; +import org.apache.flink.table.api.config.OptimizerConfigOptions; +import org.apache.flink.table.planner.plan.nodes.exec.testutils.SemanticTestBase; +import org.apache.flink.table.test.program.TableTestProgram; + +import java.util.List; + +/** Semantic tests for the built-in FROM_CHANGELOG process table function. 
*/ +public class FromChangelogSemanticTests extends SemanticTestBase { + + @Override + protected void applyDefaultEnvironmentOptions(TableConfig config) { + super.applyDefaultEnvironmentOptions(config); + config.set( + OptimizerConfigOptions.TABLE_OPTIMIZER_NONDETERMINISTIC_UPDATE_STRATEGY, + OptimizerConfigOptions.NonDeterministicUpdateStrategy.IGNORE); + } + + @Override + public List programs() { + return List.of( + FromChangelogTestPrograms.DEFAULT_OP_MAPPING, + FromChangelogTestPrograms.DEBEZIUM_MAPPING, + FromChangelogTestPrograms.UNMAPPED_CODES_DROPPED, + FromChangelogTestPrograms.CUSTOM_OP_NAME, + FromChangelogTestPrograms.TABLE_API_DEFAULT, + FromChangelogTestPrograms.ROUND_TRIP); + } +} diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/FromChangelogTestPrograms.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/FromChangelogTestPrograms.java new file mode 100644 index 0000000000000..b47f4b531b8cf --- /dev/null +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/FromChangelogTestPrograms.java @@ -0,0 +1,239 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.plan.nodes.exec.stream; + +import org.apache.flink.table.connector.ChangelogMode; +import org.apache.flink.table.test.program.SinkTestStep; +import org.apache.flink.table.test.program.SourceTestStep; +import org.apache.flink.table.test.program.TableTestProgram; +import org.apache.flink.types.Row; +import org.apache.flink.types.RowKind; + +/** {@link TableTestProgram} definitions for testing the built-in FROM_CHANGELOG PTF. */ +public class FromChangelogTestPrograms { + + private static final SourceTestStep SIMPLE_CDC_SOURCE = + SourceTestStep.newBuilder("cdc_stream") + .addSchema("id INT", "op STRING", "name STRING") + .producedValues(Row.of(1, "INSERT", "Alice")) + .build(); + + // -------------------------------------------------------------------------------------------- + // SQL tests + // -------------------------------------------------------------------------------------------- + + public static final TableTestProgram DEFAULT_OP_MAPPING = + TableTestProgram.of( + "from-changelog-default-op-mapping", + "default mapping with standard op names") + .setupTableSource( + SourceTestStep.newBuilder("cdc_stream") + .addSchema("id INT", "op STRING", "name STRING") + .producedValues( + Row.of(1, "INSERT", "Alice"), + Row.of(2, "INSERT", "Bob"), + Row.of(1, "UPDATE_BEFORE", "Alice"), + Row.of(1, "UPDATE_AFTER", "Alice2"), + Row.of(2, "DELETE", "Bob")) + .build()) + .setupTableSink( + SinkTestStep.newBuilder("sink") + .addSchema( + "id INT NOT NULL", + "name STRING", + "PRIMARY KEY (id) NOT ENFORCED") + .consumedValues( + Row.ofKind(RowKind.INSERT, 1, "Alice"), + Row.ofKind(RowKind.INSERT, 2, "Bob"), + Row.ofKind(RowKind.UPDATE_BEFORE, 1, "Alice"), + Row.ofKind(RowKind.UPDATE_AFTER, 1, "Alice2"), + Row.ofKind(RowKind.DELETE, 2, "Bob")) + .build()) + .runSql( + "INSERT INTO sink SELECT * FROM FROM_CHANGELOG(" + + "input 
=> TABLE cdc_stream)") + .build(); + + public static final TableTestProgram DEBEZIUM_MAPPING = + TableTestProgram.of( + "from-changelog-custom-mapping", + "custom op_mapping with comma-separated keys") + .setupTableSource( + SourceTestStep.newBuilder("cdc_stream") + .addSchema("id INT", "__op STRING", "name STRING") + .producedValues( + Row.of(1, "c", "Alice"), + Row.of(2, "r", "Bob"), + Row.of(1, "ub", "Alice"), + Row.of(1, "ua", "Alice2"), + Row.of(2, "d", "Bob")) + .build()) + .setupTableSink( + SinkTestStep.newBuilder("sink") + .addSchema( + "id INT NOT NULL", + "name STRING", + "PRIMARY KEY (id) NOT ENFORCED") + .consumedValues( + Row.ofKind(RowKind.INSERT, 1, "Alice"), + Row.ofKind(RowKind.INSERT, 2, "Bob"), + Row.ofKind(RowKind.UPDATE_BEFORE, 1, "Alice"), + Row.ofKind(RowKind.UPDATE_AFTER, 1, "Alice2"), + Row.ofKind(RowKind.DELETE, 2, "Bob")) + .build()) + .runSql( + "INSERT INTO sink SELECT * FROM FROM_CHANGELOG(" + + "input => TABLE cdc_stream, " + + "op => DESCRIPTOR(__op), " + + "op_mapping => MAP['c, r', 'INSERT', 'ub', 'UPDATE_BEFORE', 'ua', 'UPDATE_AFTER', 'd', 'DELETE'])") + .build(); + + public static final TableTestProgram UNMAPPED_CODES_DROPPED = + TableTestProgram.of( + "from-changelog-unmapped-codes-dropped", + "unmapped op codes are silently dropped") + .setupTableSource( + SourceTestStep.newBuilder("cdc_stream") + .addSchema("id INT", "op STRING", "name STRING") + .producedValues( + Row.of(1, "INSERT", "Alice"), + Row.of(2, "INSERT", "Bob"), + Row.of(1, "UNKNOWN", "Alice2"), + Row.of(2, "DELETE", "Bob")) + .build()) + .setupTableSink( + SinkTestStep.newBuilder("sink") + .addSchema( + "id INT NOT NULL", + "name STRING", + "PRIMARY KEY (id) NOT ENFORCED") + .consumedValues( + Row.ofKind(RowKind.INSERT, 1, "Alice"), + Row.ofKind(RowKind.INSERT, 2, "Bob"), + Row.ofKind(RowKind.DELETE, 2, "Bob")) + .build()) + .runSql( + "INSERT INTO sink SELECT * FROM FROM_CHANGELOG(" + + "input => TABLE cdc_stream)") + .build(); + + /** Custom op column name via 
DESCRIPTOR. */ + public static final TableTestProgram CUSTOM_OP_NAME = + TableTestProgram.of( + "from-changelog-custom-op-name", "custom op column name via DESCRIPTOR") + .setupTableSource( + SourceTestStep.newBuilder("cdc_stream") + .addSchema("id INT", "operation STRING", "name STRING") + .producedValues( + Row.of(1, "INSERT", "Alice"), + Row.of(1, "UPDATE_BEFORE", "Alice"), + Row.of(1, "UPDATE_AFTER", "Alice2")) + .build()) + .setupTableSink( + SinkTestStep.newBuilder("sink") + .addSchema( + "id INT NOT NULL", + "name STRING", + "PRIMARY KEY (id) NOT ENFORCED") + .consumedValues( + Row.ofKind(RowKind.INSERT, 1, "Alice"), + Row.ofKind(RowKind.UPDATE_BEFORE, 1, "Alice"), + Row.ofKind(RowKind.UPDATE_AFTER, 1, "Alice2")) + .build()) + .runSql( + "INSERT INTO sink SELECT * FROM FROM_CHANGELOG(" + + "input => TABLE cdc_stream, " + + "op => DESCRIPTOR(operation))") + .build(); + + // -------------------------------------------------------------------------------------------- + // Table API test + // -------------------------------------------------------------------------------------------- + + public static final TableTestProgram TABLE_API_DEFAULT = + TableTestProgram.of( + "from-changelog-table-api-default", + "Table.fromChangelog() convenience method") + .setupTableSource( + SourceTestStep.newBuilder("cdc_stream") + .addSchema("id INT", "op STRING", "name STRING") + .producedValues( + Row.of(1, "INSERT", "Alice"), + Row.of(2, "INSERT", "Bob")) + .build()) + .setupTableSink( + SinkTestStep.newBuilder("sink") + .addSchema( + "id INT NOT NULL", + "name STRING", + "PRIMARY KEY (id) NOT ENFORCED") + .consumedValues( + Row.ofKind(RowKind.INSERT, 1, "Alice"), + Row.ofKind(RowKind.INSERT, 2, "Bob")) + .build()) + .runTableApi(env -> env.from("cdc_stream").fromChangelog(), "sink") + .build(); + + // -------------------------------------------------------------------------------------------- + // Round-trip test: FROM_CHANGELOG(TO_CHANGELOG(table)) + // 
-------------------------------------------------------------------------------------------- + + /** + * Verifies that FROM_CHANGELOG(TO_CHANGELOG(table)) recovers the original dynamic table. + * Requires TO_CHANGELOG with row semantics (PR #27911). + */ + public static final TableTestProgram ROUND_TRIP = + TableTestProgram.of( + "from-changelog-round-trip", + "FROM_CHANGELOG(TO_CHANGELOG(table)) recovers original table") + .setupTableSource( + SourceTestStep.newBuilder("orders") + .addSchema( + "id INT NOT NULL", + "name STRING", + "PRIMARY KEY (id) NOT ENFORCED") + .addMode(ChangelogMode.all()) + .producedValues( + Row.ofKind(RowKind.INSERT, 1, "Alice"), + Row.ofKind(RowKind.INSERT, 2, "Bob"), + Row.ofKind(RowKind.UPDATE_BEFORE, 1, "Alice"), + Row.ofKind(RowKind.UPDATE_AFTER, 1, "Alice2"), + Row.ofKind(RowKind.DELETE, 2, "Bob")) + .build()) + .setupTableSink( + SinkTestStep.newBuilder("sink") + .addSchema( + "id INT NOT NULL", + "name STRING", + "PRIMARY KEY (id) NOT ENFORCED") + .consumedValues( + Row.ofKind(RowKind.INSERT, 1, "Alice"), + Row.ofKind(RowKind.INSERT, 2, "Bob"), + Row.ofKind(RowKind.UPDATE_BEFORE, 1, "Alice"), + Row.ofKind(RowKind.UPDATE_AFTER, 1, "Alice2"), + Row.ofKind(RowKind.DELETE, 2, "Bob")) + .build()) + .setupSql( + "CREATE VIEW changelog_view AS " + + "SELECT * FROM TO_CHANGELOG(input => TABLE orders)") + .runSql( + "INSERT INTO sink SELECT * FROM FROM_CHANGELOG(" + + "input => TABLE changelog_view)") + .build(); +} diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionTestUtils.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionTestUtils.java index cab5268805037..cd30617e68daa 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionTestUtils.java +++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionTestUtils.java @@ -918,6 +918,19 @@ public void eval(Context ctx, @ArgumentHint({ROW_SEMANTIC_TABLE, SUPPORT_UPDATES collectUpdate(ctx, r); } + @Override + public ChangelogMode getChangelogMode(ChangelogContext changelogContext) { + return ChangelogMode.upsert(false); + } + } + + /** Testing function that uses row semantics with retract mode (valid). */ + public static class UpdatingRetractRowSemanticFunction + extends ChangelogProcessTableFunctionBase { + public void eval(Context ctx, @ArgumentHint({ROW_SEMANTIC_TABLE, SUPPORT_UPDATES}) Row r) { + collectUpdate(ctx, r); + } + @Override public ChangelogMode getChangelogMode(ChangelogContext changelogContext) { return ChangelogMode.all(); diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/FromChangelogTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/FromChangelogTest.java new file mode 100644 index 0000000000000..33e163d9f4193 --- /dev/null +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/FromChangelogTest.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.plan.stream.sql; + +import org.apache.flink.table.api.ExplainDetail; +import org.apache.flink.table.api.TableConfig; +import org.apache.flink.table.planner.utils.TableTestBase; +import org.apache.flink.table.planner.utils.TableTestUtil; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.util.Collections; +import java.util.List; + +/** + * Plan tests for the FROM_CHANGELOG built-in process table function. Uses {@link + * ExplainDetail#CHANGELOG_MODE} to verify changelog mode propagation through the plan. + */ +public class FromChangelogTest extends TableTestBase { + + private static final List CHANGELOG_MODE = + Collections.singletonList(ExplainDetail.CHANGELOG_MODE); + + private TableTestUtil util; + + @BeforeEach + void setup() { + util = streamTestUtil(TableConfig.getDefault()); + } + + @Test + void testInsertOnlySource() { + util.tableEnv() + .executeSql( + "CREATE TABLE cdc_stream (" + + " id INT," + + " op STRING," + + " name STRING" + + ") WITH ('connector' = 'values')"); + util.verifyRelPlan( + "SELECT * FROM FROM_CHANGELOG(input => TABLE cdc_stream)", CHANGELOG_MODE); + } + + @Test + void testCustomOpMapping() { + util.tableEnv() + .executeSql( + "CREATE TABLE cdc_stream (" + + " id INT," + + " __op STRING," + + " name STRING" + + ") WITH ('connector' = 'values')"); + util.verifyRelPlan( + "SELECT * FROM FROM_CHANGELOG(" + + "input => TABLE cdc_stream, " + + "op => DESCRIPTOR(__op), " + + "op_mapping => MAP['c, r', 'INSERT', 'ub', 'UPDATE_BEFORE', 'ua', 'UPDATE_AFTER', 'd', 'DELETE'])", + CHANGELOG_MODE); + } +} diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/ProcessTableFunctionTest.java 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/ProcessTableFunctionTest.java index 659ee47cb9788..411e49f562862 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/ProcessTableFunctionTest.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/ProcessTableFunctionTest.java @@ -42,6 +42,7 @@ import org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctionTestUtils.SetSemanticTablePassThroughFunction; import org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctionTestUtils.TypedRowSemanticTableFunction; import org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctionTestUtils.TypedSetSemanticTableFunction; +import org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctionTestUtils.UpdatingRetractRowSemanticFunction; import org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctionTestUtils.UpdatingUpsertFunction; import org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctionTestUtils.User; import org.apache.flink.table.planner.utils.TableTestBase; @@ -272,6 +273,12 @@ void testRowSemanticTableOptionalUid() { .addInsertSql("INSERT INTO t_sink SELECT * FROM f(r => TABLE t, i => 42)")); } + @Test + void testRetractModeWithRowSemantics() { + util.addTemporarySystemFunction("f", UpdatingRetractRowSemanticFunction.class); + util.verifyRelPlan("SELECT * FROM f(r => TABLE t)"); + } + @ParameterizedTest @MethodSource("errorSpecs") void testErrorBehavior(ErrorSpec spec) { @@ -410,10 +417,10 @@ private static Stream errorSpecs() { "SELECT * FROM f()", "Table arguments must not be optional."), ErrorSpec.ofSelect( - "no changelog support for tables with row semantics", + "no upsert support for tables with row semantics", InvalidUpdatingSemanticsFunction.class, "SELECT * FROM f(r => TABLE t)", - "PTFs that take table arguments with 
row semantics don't support updating output. " + "PTFs that take table arguments with row semantics don't support upsert output. " + "Table argument 'r' of function 'f' must use set semantics."), ErrorSpec.ofSelect( "on_time is not supported on updating output", diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/FromChangelogTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/FromChangelogTest.xml new file mode 100644 index 0000000000000..2eed936fc6ef4 --- /dev/null +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/FromChangelogTest.xml @@ -0,0 +1,57 @@ + + + + + + TABLE cdc_stream, op => DESCRIPTOR(__op), op_mapping => MAP['c, r', 'INSERT', 'ub', 'UPDATE_BEFORE', 'ua', 'UPDATE_AFTER', 'd', 'DELETE'])]]> + + + + + + + + + + + TABLE cdc_stream)]]> + + + + + + + + + diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/ProcessTableFunctionTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/ProcessTableFunctionTest.xml index 55fa321f1ebad..5538ec707d608 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/ProcessTableFunctionTest.xml +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/ProcessTableFunctionTest.xml @@ -123,6 +123,17 @@ LogicalProject(out=[$0], rowtime=[$1]) ProcessTableFunction(invocation=[f(TABLE(#0), DESCRIPTOR(_UTF-16LE'ts'), DEFAULT())], uid=[null], select=[out,rowtime], rowType=[RecordType(VARCHAR(2147483647) out, TIMESTAMP_LTZ(3) *ROWTIME* rowtime)]) +- WatermarkAssigner(rowtime=[ts], watermark=[ts]) +- TableSourceScan(table=[[default_catalog, default_database, t_watermarked]], fields=[name, score, ts]) +]]> + + + + + TABLE t)]]> + + + diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/ptf/FromChangelogFunction.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/ptf/FromChangelogFunction.java new file mode 100644 index 0000000000000..d4e7905692045 --- /dev/null +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/ptf/FromChangelogFunction.java @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.runtime.functions.ptf; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.data.MapData; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.StringData; +import org.apache.flink.table.data.utils.ProjectedRowData; +import org.apache.flink.table.functions.BuiltInFunctionDefinitions; +import org.apache.flink.table.functions.FunctionContext; +import org.apache.flink.table.functions.SpecializedFunction.SpecializedContext; +import org.apache.flink.table.functions.TableSemantics; +import org.apache.flink.table.types.inference.CallContext; +import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.types.ColumnList; +import org.apache.flink.types.RowKind; + +import javax.annotation.Nullable; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +/** + * Runtime implementation of {@link BuiltInFunctionDefinitions#FROM_CHANGELOG}. + * + *

Converts each append-only input row (which contains an operation code column) back into a + * changelog stream with proper {@link RowKind} annotations. The output schema excludes the + * operation code column and partition key columns (which are prepended by the framework + * automatically). + * + *

This is the reverse operation of {@link ToChangelogFunction}. + */ +@Internal +public class FromChangelogFunction extends BuiltInProcessTableFunction { + + private static final long serialVersionUID = 1L; + + private static final Map DEFAULT_OP_MAPPING = + Map.of( + "INSERT", RowKind.INSERT, + "UPDATE_BEFORE", RowKind.UPDATE_BEFORE, + "UPDATE_AFTER", RowKind.UPDATE_AFTER, + "DELETE", RowKind.DELETE); + + private final Map rawOpMap; + private final int opColumnIndex; + private final int[] outputIndices; + + private transient HashMap opMap; + private transient ProjectedRowData projectedOutput; + + public FromChangelogFunction(final SpecializedContext context) { + super(BuiltInFunctionDefinitions.FROM_CHANGELOG, context); + final CallContext callContext = context.getCallContext(); + + final TableSemantics tableSemantics = + callContext + .getTableSemantics(0) + .orElseThrow(() -> new IllegalStateException("Table argument expected.")); + + final RowType inputType = (RowType) tableSemantics.dataType().getLogicalType(); + final String opColumnName = resolveOpColumnName(callContext); + this.opColumnIndex = inputType.getFieldNames().indexOf(opColumnName); + + // Exclude only the op column from output — all other columns pass through + this.outputIndices = + IntStream.range(0, inputType.getFieldCount()) + .filter(i -> i != opColumnIndex) + .toArray(); + + this.rawOpMap = buildOpMap(callContext); + } + + @Override + public void open(final FunctionContext context) throws Exception { + super.open(context); + opMap = new HashMap<>(); + rawOpMap.forEach((code, kind) -> opMap.put(StringData.fromString(code), kind)); + projectedOutput = ProjectedRowData.from(outputIndices); + } + + private static String resolveOpColumnName(final CallContext callContext) { + return callContext + .getArgumentValue(1, ColumnList.class) + .map(cl -> cl.getNames().get(0)) + .orElse("op"); + } + + /** + * Builds a String-to-RowKind map. 
Keys in the provided mapping may be comma-separated (e.g., + * "INSERT, UPDATE_AFTER") to map multiple input codes to the same RowKind. + */ + private static Map buildOpMap(CallContext callContext) { + return callContext + .getArgumentValue(2, Map.class) + .map(FromChangelogFunction::parseOpMapping) + .orElse(DEFAULT_OP_MAPPING); + } + + private static Map parseOpMapping(Map opMapping) { + return opMapping.entrySet().stream() + .flatMap( + e -> { + final RowKind kind = RowKind.valueOf(e.getValue().trim()); + return Arrays.stream(e.getKey().split(",")) + .map(code -> Map.entry(code.trim(), kind)); + }) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + } + + public void eval( + final Context ctx, + final RowData input, + @Nullable final ColumnList op, + @Nullable final MapData opMapping) { + final StringData opCode = input.getString(opColumnIndex); + final RowKind rowKind = opMap.get(opCode); + if (rowKind == null) { + return; + } + + projectedOutput.replaceRow(input); + projectedOutput.setRowKind(rowKind); + collect(projectedOutput); + } +} From 20f42b7414984b0522964ca6a6d69b5c0f1aaddb Mon Sep 17 00:00:00 2001 From: Ramin Gharib Date: Tue, 14 Apr 2026 14:48:09 +0200 Subject: [PATCH 2/3] [FLINK-39261][table] Update docs --- .../docs/sql/reference/queries/changelog.md | 45 ++++++++----------- 1 file changed, 19 insertions(+), 26 deletions(-) diff --git a/docs/content/docs/sql/reference/queries/changelog.md b/docs/content/docs/sql/reference/queries/changelog.md index 3c6a9dad9a0a9..5103c43b0828d 100644 --- a/docs/content/docs/sql/reference/queries/changelog.md +++ b/docs/content/docs/sql/reference/queries/changelog.md @@ -30,14 +30,16 @@ Flink SQL provides built-in process table functions (PTFs) for working with chan | Function | Description | |:---------|:------------| -| [FROM_CHANGELOG](#from_changelog) | Converts an append-only table with operation codes into a dynamic table | +| [FROM_CHANGELOG](#from_changelog) | Converts an append-only 
table with operation codes into a (potentially updating) dynamic table | | [TO_CHANGELOG](#to_changelog) | Converts a dynamic table into an append-only table with explicit operation codes | ## FROM_CHANGELOG -The `FROM_CHANGELOG` PTF converts an append-only table with an explicit operation code column into a dynamic table (i.e. an updating table). Each input row is expected to have a string column that indicates the change operation. The op column is removed from the output and the row is emitted with the corresponding change operation. +The `FROM_CHANGELOG` PTF converts an append-only table with an explicit operation code column into a (potentially updating) dynamic table. Each input row is expected to have a string column that indicates the change operation. The operation column is interpreted by the engine and removed from the output. -This is useful when consuming Change Data Capture (CDC) streams from systems like Debezium, Maxwell, or Canal, where events arrive as flat append-only records with an explicit operation field. It's also useful to be used in combination with the TO_CHANGELOG function, when the user wants to turn the append-only table back into an updating table after doing some specific transformation to the events. +This is useful when consuming Change Data Capture (CDC) streams from systems like Debezium where events arrive as flat append-only records with an explicit operation field. It can also be used in combination with the TO_CHANGELOG function when converting the append-only table back into an updating table after applying specific transformations to the events. + +Note: This version requires that your CDC data encodes updates using a full image (i.e. providing separate events for before and after the update). Please double-check whether your source provides both UPDATE_BEFORE and UPDATE_AFTER events.
FROM_CHANGELOG is a very powerful function but might produce incorrect results in subsequent operations and tables, if not configured correctly. ### Syntax @@ -45,21 +47,26 @@ This is useful when consuming Change Data Capture (CDC) streams from systems lik SELECT * FROM FROM_CHANGELOG( input => TABLE source_table, [op => DESCRIPTOR(op_column_name),] - [op_mapping => MAP['c, r', 'INSERT', 'd', 'DELETE']] + [op_mapping => MAP[ + 'c, r', 'INSERT', + 'ub', 'UPDATE_BEFORE', + 'ua', 'UPDATE_AFTER', + 'd', 'DELETE' + ]] ) ``` ### Parameters -| Parameter | Required | Description | -|:-------------|:---------|:------------| -| `input` | Yes | The input table. Must be append-only. | -| `op` | No | A `DESCRIPTOR` with a single column name for the operation code column. Defaults to `op`. The column must exist in the input table and be of type STRING. | -| `op_mapping` | No | A `MAP` mapping user-defined operation codes to change operation names. Keys are user codes (e.g., `'c'`, `'u'`, `'d'`), values are change operation names (`INSERT`, `UPDATE_BEFORE`, `UPDATE_AFTER`, `DELETE`). Keys can contain comma-separated codes to map multiple codes to the same operation (e.g., `'c, r'`). When provided, only mapped codes are forwarded - unmapped codes are dropped. Each change operation may appear at most once across all entries. | +| Parameter | Required | Description | +|:-------------|:---------|:------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| `input` | Yes | The input table. Must be append-only. 
| +| `op` | No | A `DESCRIPTOR` with a single column name for the operation code column. Defaults to `op`. The column must exist in the input table and be of type STRING. | +| `op_mapping` | No | A `MAP` mapping user-defined codes to Flink change operation names. Keys are user-defined codes (e.g., `'c'`, `'u'`, `'d'`), values are Flink change operation names (`INSERT`, `UPDATE_BEFORE`, `UPDATE_AFTER`, `DELETE`). Keys can contain comma-separated codes to map multiple codes to the same operation (e.g., `'c, r'`). When provided, only mapped codes are forwarded - unmapped codes are dropped. Each change operation may appear at most once across all entries. | #### Default op_mapping -When `op_mapping` is omitted, the following standard names are used: +When `op_mapping` is omitted, the following standard names are used. They allow a reverse conversion from TO_CHANGELOG by default. | Input code | Change operation | |:-------------------|:------------------| @@ -70,14 +77,12 @@ When `op_mapping` is omitted, the following standard names are used: ### Output Schema -The output columns are ordered as: +The output contains all input columns except the operation code (e.g., op) column, which is interpreted by Flink's SQL engine and removed. Each output row carries the appropriate change operation (INSERT, UPDATE_BEFORE, UPDATE_AFTER, or DELETE). ``` [all_input_columns_without_op] ``` -The op column is removed from the output. Output rows carry the appropriate change operation (INSERT, UPDATE_BEFORE, UPDATE_AFTER, or DELETE). 
- ### Examples #### Basic usage with standard op names @@ -105,22 +110,10 @@ SELECT * FROM FROM_CHANGELOG( -- | 1 | Alice2 | ``` -#### Custom op_mapping with filtering - -```sql -SELECT * FROM FROM_CHANGELOG( - input => TABLE cdc_stream, - op => DESCRIPTOR(__op), - op_mapping => MAP['c, r', 'INSERT', 'd', 'DELETE'] -) --- 'c' (create) and 'r' (read/snapshot) both produce INSERT --- 'd' produces DELETE --- All other op codes are silently dropped -``` - #### Custom operation column name ```sql +-- Source schema: id INT, operation STRING, name STRING SELECT * FROM FROM_CHANGELOG( input => TABLE cdc_stream, op => DESCRIPTOR(operation) From df99efe1ed8d83eedec54c4bba9ce060620a8a6f Mon Sep 17 00:00:00 2001 From: Ramin Gharib Date: Tue, 14 Apr 2026 15:48:36 +0200 Subject: [PATCH 3/3] [FLINK-39261][table] Address improvement feedbacks --- .../org/apache/flink/table/api/Table.java | 54 +++++++++--------- .../functions/BuiltInFunctionDefinition.java | 25 ++++----- .../functions/BuiltInFunctionDefinitions.java | 2 +- .../table/functions/ChangelogFunction.java | 17 +----- .../strategies/FromChangelogTypeStrategy.java | 55 +++---------------- .../ValidationOnlyInputTypeStrategy.java | 49 +++++++++++++++++ .../FlinkChangelogModeInferenceProgram.scala | 27 +++++---- .../functions/ptf/FromChangelogFunction.java | 3 +- 8 files changed, 117 insertions(+), 115 deletions(-) create mode 100644 flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ValidationOnlyInputTypeStrategy.java diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Table.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Table.java index 97c81710c7bf4..172adfe1354b1 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Table.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Table.java @@ -1323,33 +1323,6 @@ default TableResult executeInsert( */ 
PartitionedTable partitionBy(Expression... fields); - /** - * Converts this append-only table with an explicit operation code column into a dynamic table - * using the built-in {@code FROM_CHANGELOG} process table function. - * - *

Each input row is expected to have a string operation code column (default: {@code "op"}) - * that indicates the change operation (e.g., INSERT, UPDATE_AFTER, UPDATE_BEFORE, DELETE). The - * output table is a dynamic table backed by a changelog stream. - * - *

Optional arguments can be passed using named expressions: - * - *

{@code
-     * // Default: reads 'op' column with standard change operation names
-     * table.fromChangelog();
-     *
-     * // Custom op column name and mapping (Debezium-style codes)
-     * table.fromChangelog(
-     *     descriptor("__op").asArgument("op"),
-     *     map("c, r", "INSERT", "u", "UPDATE_AFTER", "d", "DELETE").asArgument("op_mapping")
-     * );
-     * }
- * - * @param arguments optional named arguments for {@code op} and {@code op_mapping} - * @return a dynamic {@link Table} with the op column removed and proper change operation - * semantics - */ - Table fromChangelog(Expression... arguments); - /** * Converts this table object into a named argument. * @@ -1481,4 +1454,31 @@ default TableResult executeInsert( * @return an append-only {@link Table} with an {@code op} column prepended to the input columns */ Table toChangelog(Expression... arguments); + + /** + * Converts this append-only table with an explicit operation code column into a dynamic table + * using the built-in {@code FROM_CHANGELOG} process table function. + * + *

Each input row is expected to have a string operation code column (default: {@code "op"}) + * that indicates the change operation (e.g., INSERT, UPDATE_AFTER, UPDATE_BEFORE, DELETE). The + * output table is a dynamic table backed by a changelog stream. + * + *

Optional arguments can be passed using named expressions: + * + *

{@code
+     * // Default: reads 'op' column with standard change operation names
+     * table.fromChangelog();
+     *
+     * // Custom op column name and mapping (Debezium-style codes)
+     * table.fromChangelog(
+     *     descriptor("__op").asArgument("op"),
+     *     map("c, r", "INSERT", "u", "UPDATE_AFTER", "d", "DELETE").asArgument("op_mapping")
+     * );
+     * }
+ * + * @param arguments optional named arguments for {@code op} and {@code op_mapping} + * @return a dynamic {@link Table} with the op column removed and proper change operation + * semantics + */ + Table fromChangelog(Expression... arguments); } diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinition.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinition.java index 2a100eb3e12cb..a32105a22f9b2 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinition.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinition.java @@ -22,6 +22,7 @@ import org.apache.flink.table.api.TableException; import org.apache.flink.table.catalog.DataTypeFactory; import org.apache.flink.table.connector.ChangelogMode; +import org.apache.flink.table.functions.ChangelogFunction.ChangelogContext; import org.apache.flink.table.types.DataType; import org.apache.flink.table.types.inference.InputTypeStrategy; import org.apache.flink.table.types.inference.StaticArgument; @@ -54,7 +55,7 @@ *

Equality is defined by reference equality. */ @Internal -public final class BuiltInFunctionDefinition implements SpecializedFunction, ChangelogFunction { +public final class BuiltInFunctionDefinition implements SpecializedFunction { private final String name; @@ -137,6 +138,14 @@ public boolean isInternal() { return isInternal; } + /** + * Returns the optional changelog mode resolver for built-in PTFs that emit updates (e.g., + * FROM_CHANGELOG). The planner uses this to determine the output changelog mode. + */ + public Optional> getChangelogModeResolver() { + return Optional.ofNullable(changelogModeResolver); + } + public String getQualifiedName() { if (isInternal) { return name; @@ -231,14 +240,6 @@ public static String qualifyFunctionName(String name, int version) { return String.format(INTERNAL_NAME_FORMAT, name.toUpperCase(Locale.ROOT), version); } - @Override - public ChangelogMode getChangelogMode(final ChangelogContext changelogContext) { - if (changelogModeResolver != null) { - return changelogModeResolver.apply(changelogContext); - } - return ChangelogMode.insertOnly(); - } - // -------------------------------------------------------------------------------------------- // Builder // -------------------------------------------------------------------------------------------- @@ -416,10 +417,8 @@ public Builder sqlName(String name) { } /** - * Sets a resolver that dynamically determines the output {@link ChangelogMode} for this - * built-in PTF. The resolver receives the {@link ChangelogContext} and can inspect function - * arguments (e.g., op_mapping) to adapt the changelog mode. Only needed for PTFs that emit - * updates (e.g., FROM_CHANGELOG). + * Sets a resolver that determines the output {@link ChangelogMode} for this built-in PTF. + * Only needed for PTFs that emit updates (e.g., FROM_CHANGELOG). 
*/ public Builder changelogModeResolver( Function changelogModeResolver) { diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java index ca36de8e074b0..d4c1e0be02398 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java @@ -829,9 +829,9 @@ ANY, and(logical(LogicalTypeRoot.BOOLEAN), LITERAL) "op_mapping", DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING()), true)) + .changelogModeResolver(ctx -> ChangelogMode.all()) .inputTypeStrategy(FROM_CHANGELOG_INPUT_TYPE_STRATEGY) .outputTypeStrategy(FROM_CHANGELOG_OUTPUT_TYPE_STRATEGY) - .changelogModeResolver(ctx -> ChangelogMode.all()) .runtimeClass( "org.apache.flink.table.runtime.functions.ptf.FromChangelogFunction") .build(); diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/ChangelogFunction.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/ChangelogFunction.java index 3ac0a3e55522e..ad6e56d09d924 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/ChangelogFunction.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/ChangelogFunction.java @@ -23,8 +23,6 @@ import org.apache.flink.table.connector.ChangelogMode; import org.apache.flink.types.RowKind; -import java.util.Optional; - /** * An extension that allows a process table function (PTF) to emit results with changelog semantics. * @@ -63,10 +61,8 @@ * * *

Emitting changelogs is only valid for PTFs that take table arguments with set semantics (see - * {@link ArgumentTrait#SET_SEMANTIC_TABLE}). When using {@code OPTIONAL_PARTITION_BY}, the - * PARTITION BY clause can be omitted for retract mode (with {@link RowKind#UPDATE_BEFORE}), since - * the stream is self-describing. In case of upserts, the upsert key must be equal to the PARTITION - * BY key. + * {@link ArgumentTrait#SET_SEMANTIC_TABLE}). In case of upserts, the upsert key must be equal to + * the PARTITION BY key. * *

It is perfectly valid for a {@link ChangelogFunction} implementation to return a fixed {@link * ChangelogMode}, regardless of the {@link ChangelogContext}. This approach may be appropriate when @@ -109,14 +105,5 @@ interface ChangelogContext { * are required and {@link ChangelogMode#keyOnlyDeletes()} are supported. */ ChangelogMode getRequiredChangelogMode(); - - /** - * Returns the value of a scalar argument at the given position. - * - * @param pos the argument position - * @param clazz the expected class of the argument value - * @return the argument value, or empty if the argument is null or not available - */ - Optional getArgumentValue(int pos, Class clazz); } } diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/FromChangelogTypeStrategy.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/FromChangelogTypeStrategy.java index 69176251b3d37..5081b546d0309 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/FromChangelogTypeStrategy.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/FromChangelogTypeStrategy.java @@ -22,15 +22,10 @@ import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.api.DataTypes.Field; import org.apache.flink.table.api.ValidationException; -import org.apache.flink.table.functions.FunctionDefinition; import org.apache.flink.table.functions.TableSemantics; import org.apache.flink.table.types.DataType; -import org.apache.flink.table.types.inference.ArgumentCount; import org.apache.flink.table.types.inference.CallContext; -import org.apache.flink.table.types.inference.ConstantArgumentCount; import org.apache.flink.table.types.inference.InputTypeStrategy; -import org.apache.flink.table.types.inference.Signature; -import org.apache.flink.table.types.inference.Signature.Argument; import 
org.apache.flink.table.types.inference.TypeStrategy; import org.apache.flink.table.types.logical.LogicalTypeFamily; import org.apache.flink.types.ColumnList; @@ -38,7 +33,6 @@ import java.util.HashSet; import java.util.List; import java.util.Map; -import java.util.Map.Entry; import java.util.Optional; import java.util.Set; import java.util.stream.Collectors; @@ -57,29 +51,12 @@ public final class FromChangelogTypeStrategy { // -------------------------------------------------------------------------------------------- public static final InputTypeStrategy INPUT_TYPE_STRATEGY = - new InputTypeStrategy() { - @Override - public ArgumentCount getArgumentCount() { - return ConstantArgumentCount.between(1, 3); - } - + new ValidationOnlyInputTypeStrategy() { @Override public Optional> inferInputTypes( final CallContext callContext, final boolean throwOnFailure) { return validateInputs(callContext, throwOnFailure); } - - @Override - public List getExpectedSignatures(final FunctionDefinition definition) { - return List.of( - Signature.of(Argument.of("input", "TABLE")), - Signature.of( - Argument.of("input", "TABLE"), Argument.of("op", "DESCRIPTOR")), - Signature.of( - Argument.of("input", "TABLE"), - Argument.of("op", "DESCRIPTOR"), - Argument.of("op_mapping", "MAP"))); - } }; // -------------------------------------------------------------------------------------------- @@ -155,19 +132,16 @@ private static Optional> validateInputs( final Optional opMapping = callContext.getArgumentValue(2, Map.class); if (opMapping.isPresent()) { + final Map mapping = opMapping.get(); final Optional> validationError = - validateOpMappingValues(callContext, opMapping.get(), throwOnFailure); + validateOpMappingValues(callContext, mapping, throwOnFailure); if (validationError.isPresent()) { return validationError; } // Retract mode requires UPDATE_BEFORE in the mapping final boolean hasUpdateBefore = - opMapping.get().values().stream() - .anyMatch( - v -> - v instanceof String - && 
"UPDATE_BEFORE".equals(((String) v).trim())); + mapping.values().stream().anyMatch(v -> "UPDATE_BEFORE".equals(v.trim())); if (!hasUpdateBefore) { return callContext.fail( throwOnFailure, @@ -180,28 +154,17 @@ private static Optional> validateInputs( } /** - * Validates op_mapping values. Values must be valid RowKind names from {INSERT, UPDATE_AFTER, - * DELETE}. Keys are arbitrary user strings (e.g., 'c', 'u', 'd') and may be comma-separated to - * map multiple user codes to the same RowKind. Each RowKind name must appear at most once - * across all entries. + * Validates op_mapping values. Values must be valid Flink change operation names. Each name + * must appear at most once across all entries. */ private static Optional> validateOpMappingValues( final CallContext callContext, - final Map opMapping, + final Map opMapping, final boolean throwOnFailure) { final Set allRowKindsSeen = new HashSet<>(); - for (final Entry entry : opMapping.entrySet()) { - if (!(entry.getKey() instanceof String)) { - return callContext.fail( - throwOnFailure, "Invalid target mapping for argument 'op_mapping'."); - } - final Object value = entry.getValue(); - if (!(value instanceof String)) { - return callContext.fail( - throwOnFailure, "Invalid target mapping for argument 'op_mapping'."); - } - final String rowKindName = ((String) value).trim(); + for (final String value : opMapping.values()) { + final String rowKindName = value.trim(); if (!VALID_ROW_KIND_NAMES.contains(rowKindName)) { return callContext.fail( throwOnFailure, diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ValidationOnlyInputTypeStrategy.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ValidationOnlyInputTypeStrategy.java new file mode 100644 index 0000000000000..0744076c164e3 --- /dev/null +++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ValidationOnlyInputTypeStrategy.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.types.inference.strategies; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.functions.FunctionDefinition; +import org.apache.flink.table.types.inference.ArgumentCount; +import org.apache.flink.table.types.inference.ConstantArgumentCount; +import org.apache.flink.table.types.inference.InputTypeStrategy; +import org.apache.flink.table.types.inference.Signature; +import org.apache.flink.table.types.inference.StaticArgument; + +import java.util.List; + +/** + * Base class for input type strategies that only perform validation. Argument count and signatures + * are handled by {@link StaticArgument}s in the function definition. + * + *

Subclasses only need to implement {@link #inferInputTypes} for custom validation logic. + */ +@Internal +public abstract class ValidationOnlyInputTypeStrategy implements InputTypeStrategy { + + @Override + public ArgumentCount getArgumentCount() { + return ConstantArgumentCount.any(); + } + + @Override + public List getExpectedSignatures(FunctionDefinition definition) { + return List.of(Signature.of()); + } +} diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala index f5bb3d2b16af5..cf48daadaac04 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala @@ -23,7 +23,7 @@ import org.apache.flink.table.api.InsertConflictStrategy.ConflictBehavior import org.apache.flink.table.api.config.ExecutionConfigOptions import org.apache.flink.table.api.config.ExecutionConfigOptions.UpsertMaterialize import org.apache.flink.table.connector.ChangelogMode -import org.apache.flink.table.functions.ChangelogFunction +import org.apache.flink.table.functions.{BuiltInFunctionDefinition, ChangelogFunction} import org.apache.flink.table.functions.ChangelogFunction.ChangelogContext import org.apache.flink.table.planner.calcite.{FlinkTypeFactory, RexTableArgCall} import org.apache.flink.table.planner.plan.`trait`._ @@ -1692,10 +1692,6 @@ class FlinkChangelogModeInferenceProgram extends FlinkOptimizeProgram[StreamOpti override def getRequiredChangelogMode: ChangelogMode = { callContext.getOutputChangelogMode.orElse(null) } - - override def getArgumentValue[T](pos: Int, clazz: Class[T]): java.util.Optional[T] = { - 
callContext.getArgumentValue(pos, clazz) - } } } @@ -1713,12 +1709,14 @@ class FlinkChangelogModeInferenceProgram extends FlinkOptimizeProgram[StreamOpti val changelogContext = toPtfChangelogContext(process, inputChangelogModes, requiredChangelogMode) val changelogMode = changelogFunction.getChangelogMode(changelogContext) - if ( - !changelogMode.containsOnly(RowKind.INSERT) && - !changelogMode.contains(RowKind.UPDATE_BEFORE) - ) { - verifyPtfTableArgsForUpdates(call) - } + verifyPtfTableArgsForUpdates(call, changelogMode) + toTraitSet(changelogMode) + case builtIn: BuiltInFunctionDefinition if builtIn.getChangelogModeResolver.isPresent => + val inputChangelogModes = children.map(toChangelogMode(_, None, None)) + val changelogContext = + toPtfChangelogContext(process, inputChangelogModes, requiredChangelogMode) + val changelogMode = builtIn.getChangelogModeResolver.get().apply(changelogContext) + verifyPtfTableArgsForUpdates(call, changelogMode) toTraitSet(changelogMode) case _ => defaultTraitSet @@ -1734,7 +1732,12 @@ class FlinkChangelogModeInferenceProgram extends FlinkOptimizeProgram[StreamOpti * Upsert mode (without UPDATE_BEFORE) requires a key to look up previous values, so set semantics * with PARTITION BY is required. 
*/ - private def verifyPtfTableArgsForUpdates(call: RexCall): Unit = { + private def verifyPtfTableArgsForUpdates(call: RexCall, changelogMode: ChangelogMode): Unit = { + if ( + changelogMode.containsOnly(RowKind.INSERT) || changelogMode.contains(RowKind.UPDATE_BEFORE) + ) { + return + } StreamPhysicalProcessTableFunction .getProvidedInputArgs(call) .map(_.e) diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/ptf/FromChangelogFunction.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/ptf/FromChangelogFunction.java index d4e7905692045..38249aa02bd19 100644 --- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/ptf/FromChangelogFunction.java +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/ptf/FromChangelogFunction.java @@ -55,6 +55,7 @@ public class FromChangelogFunction extends BuiltInProcessTableFunction private static final long serialVersionUID = 1L; + private static final String DEFAULT_OP_COLUMN_NAME = "op"; private static final Map DEFAULT_OP_MAPPING = Map.of( "INSERT", RowKind.INSERT, @@ -103,7 +104,7 @@ private static String resolveOpColumnName(final CallContext callContext) { return callContext .getArgumentValue(1, ColumnList.class) .map(cl -> cl.getNames().get(0)) - .orElse("op"); + .orElse(DEFAULT_OP_COLUMN_NAME); } /**