docs/content/docs/sql/reference/queries/changelog.md (114 additions, 1 deletion)
@@ -30,9 +30,117 @@ Flink SQL provides built-in process table functions (PTFs) for working with chan

| Function | Description |
|:---------|:------------|
| [FROM_CHANGELOG](#from_changelog) | Converts an append-only table with operation codes into a dynamic table |
| [TO_CHANGELOG](#to_changelog) | Converts a dynamic table into an append-only table with explicit operation codes |

## FROM_CHANGELOG

The `FROM_CHANGELOG` PTF converts an append-only table with an explicit operation code column into a dynamic table (i.e. an updating table). Each input row is expected to have a string column that indicates the change operation. This column is removed from the output, and the row is emitted with the corresponding change operation.

This is useful when consuming Change Data Capture (CDC) streams from systems like Debezium, Maxwell, or Canal, where events arrive as flat append-only records with an explicit operation field. It can also be combined with the `TO_CHANGELOG` function to turn an append-only table back into an updating table after applying transformations to the individual events.

### Syntax

```sql
SELECT * FROM FROM_CHANGELOG(
input => TABLE source_table [PARTITION BY key_col],
[op => DESCRIPTOR(op_column_name),]
[op_mapping => MAP['c, r', 'INSERT', 'u', 'UPDATE_AFTER', 'd', 'DELETE']]
)
```

> Reviewer: The `invalid_op_handling` param would already be relevant here, but we can add it in a follow-up PR.
>
> Author: Good point; let's keep this PR from growing too big.

`PARTITION BY` is optional when the mapping includes `UPDATE_BEFORE` (retract mode). It is required when the mapping produces upsert mode (no `UPDATE_BEFORE`), because downstream operators need a key for updates and deletes. When provided, records are distributed by the partition key for parallel processing.
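The retract-versus-upsert distinction above can be modeled outside of Flink. The sketch below is plain Python and purely illustrative (the function names are invented); it shows why a retract stream is self-describing while an upsert stream needs a key:

```python
# Retract mode: an UPDATE_BEFORE (-U) event carries the full old row,
# so state can be maintained without knowing a key column up front.
def apply_retract(state, events):
    for kind, row in events:
        if kind in ("+I", "+U"):
            state.append(row)
        elif kind in ("-U", "-D"):
            state.remove(row)  # the old row is spelled out in the event
    return state

# Upsert mode: there is no UPDATE_BEFORE, so an update only carries the
# new row; a key is required to find which existing row it replaces.
def apply_upsert(state, events, key):
    for kind, row in events:
        if kind in ("+I", "+U"):
            state[row[key]] = row
        elif kind == "-D":
            state.pop(row[key], None)
    return state

retract = apply_retract([], [
    ("+I", {"id": 1, "name": "Alice"}),
    ("-U", {"id": 1, "name": "Alice"}),
    ("+U", {"id": 1, "name": "Alice2"}),
])
upsert = apply_upsert({}, [
    ("+I", {"id": 1, "name": "Alice"}),
    ("+U", {"id": 1, "name": "Alice2"}),
], key="id")
```

Both runs converge on the same logical row for `id = 1`, but only the upsert variant needed the key argument, which is why `PARTITION BY` becomes mandatory in upsert mode.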

### Parameters

| Parameter | Required | Description |
|:-------------|:---------|:------------|
| `input` | Yes | The input table. Must be append-only. `PARTITION BY` is optional for retract mode (with `UPDATE_BEFORE`) and required for upsert mode (without `UPDATE_BEFORE`). |
| `op` | No | A `DESCRIPTOR` with a single column name for the operation code column. Defaults to `op`. The column must exist in the input table and be of type STRING. |
| `op_mapping` | No | A `MAP<STRING, STRING>` mapping user-defined operation codes to change operation names. Keys are user codes (e.g., `'c'`, `'u'`, `'d'`), values are change operation names (`INSERT`, `UPDATE_BEFORE`, `UPDATE_AFTER`, `DELETE`). Keys can contain comma-separated codes to map multiple codes to the same operation (e.g., `'c, r'`). When provided, only mapped codes are forwarded; unmapped codes are dropped. Each change operation may appear at most once across all entries. |
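The `op_mapping` rules above (comma-separated keys fan out to one operation; each change operation appears at most once) can be sketched as a small validation routine. This is an illustrative Python model, not Flink's implementation; `expand_op_mapping` and `VALID_OPS` are invented names:

```python
VALID_OPS = {"INSERT", "UPDATE_BEFORE", "UPDATE_AFTER", "DELETE"}

def expand_op_mapping(op_mapping):
    """Expand comma-separated keys and enforce the at-most-once rule."""
    expanded, seen_ops = {}, set()
    for codes, op in op_mapping.items():
        if op not in VALID_OPS:
            raise ValueError(f"unknown change operation: {op}")
        if op in seen_ops:
            raise ValueError(f"change operation mapped more than once: {op}")
        seen_ops.add(op)
        for code in codes.split(","):
            expanded[code.strip()] = op  # 'c, r' maps both 'c' and 'r'
    return expanded

mapping = expand_op_mapping({"c, r": "INSERT", "u": "UPDATE_AFTER", "d": "DELETE"})
# mapping: {'c': 'INSERT', 'r': 'INSERT', 'u': 'UPDATE_AFTER', 'd': 'DELETE'}
```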

#### Default op_mapping

When `op_mapping` is omitted, the following standard names are used:

| Input code | Change operation |
|:-------------------|:------------------|
| `'INSERT'` | INSERT |
| `'UPDATE_BEFORE'` | UPDATE_BEFORE |
| `'UPDATE_AFTER'` | UPDATE_AFTER |
| `'DELETE'` | DELETE |

### Output Schema

The output columns are ordered as:

```
[all_input_columns_without_op]
```

The op column is removed from the output. Output rows carry the appropriate change operation (INSERT, UPDATE_BEFORE, UPDATE_AFTER, or DELETE).

### Examples

#### Basic usage with standard op names

```sql
-- Input (append-only):
-- +I[id:1, op:'INSERT', name:'Alice']
-- +I[id:1, op:'UPDATE_BEFORE', name:'Alice']
-- +I[id:1, op:'UPDATE_AFTER', name:'Alice2']
-- +I[id:2, op:'DELETE', name:'Bob']

SELECT * FROM FROM_CHANGELOG(
input => TABLE cdc_stream
)

-- Output (updating table):
-- +I[id:1, name:'Alice']
-- -U[id:1, name:'Alice']
-- +U[id:1, name:'Alice2']
-- -D[id:2, name:'Bob']

-- Table state after all events:
-- | id | name |
-- |----|--------|
-- | 1 | Alice2 |
```
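The final table state shown above follows mechanically from replaying the changelog. Below is a minimal Python model of that replay, keyed by `id` (illustrative only, not Flink code; `materialize` is a made-up helper):

```python
def materialize(changelog):
    """Apply a stream of (row_kind, row) changes to a keyed table."""
    table = {}
    for kind, row in changelog:
        if kind in ("+I", "+U"):
            table[row["id"]] = row
        elif kind in ("-U", "-D"):
            table.pop(row["id"], None)
    return table

state = materialize([
    ("+I", {"id": 1, "name": "Alice"}),
    ("-U", {"id": 1, "name": "Alice"}),
    ("+U", {"id": 1, "name": "Alice2"}),
    ("-D", {"id": 2, "name": "Bob"}),
])
# state -> {1: {'id': 1, 'name': 'Alice2'}}
```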

#### Debezium-style CDC codes

```sql
SELECT * FROM FROM_CHANGELOG(
input => TABLE cdc_stream PARTITION BY id,
op => DESCRIPTOR(__op),
op_mapping => MAP['c, r', 'INSERT', 'u', 'UPDATE_AFTER', 'd', 'DELETE']
)
-- 'c' (create) and 'r' (read/snapshot) both produce INSERT
-- 'u' produces UPDATE_AFTER
-- 'd' produces DELETE
```
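The drop-unmapped-codes behavior described in the parameters table can be sketched as follows (illustrative Python, not Flink code; `from_changelog_model` is a made-up helper mimicking the example above):

```python
# Debezium-style mapping: 'c' and 'r' -> INSERT, 'u' -> UPDATE_AFTER, 'd' -> DELETE
OP_MAPPING = {"c": "INSERT", "r": "INSERT", "u": "UPDATE_AFTER", "d": "DELETE"}

def from_changelog_model(rows, op_col="__op"):
    out = []
    for row in rows:
        op = OP_MAPPING.get(row[op_col])
        if op is None:
            continue  # unmapped codes are dropped, not forwarded
        payload = {k: v for k, v in row.items() if k != op_col}  # op column removed
        out.append((op, payload))
    return out

events = from_changelog_model([
    {"id": 1, "__op": "r", "name": "Alice"},
    {"id": 1, "__op": "u", "name": "Alice2"},
    {"id": 1, "__op": "t", "name": "ignored"},  # 't' is unmapped -> dropped
])
# events -> [('INSERT', {'id': 1, 'name': 'Alice'}),
#            ('UPDATE_AFTER', {'id': 1, 'name': 'Alice2'})]
```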

#### Custom operation column name

```sql
SELECT * FROM FROM_CHANGELOG(
input => TABLE cdc_stream,
op => DESCRIPTOR(operation)
)
-- The operation column named 'operation' is used instead of 'op'
```

#### Table API

```java
// Default (retract mode): reads the 'op' column with standard change operation names
Table retracting = cdcStream.fromChangelog();

// Upsert mode requires PARTITION BY; use the generic process() method
Table upserting = cdcStream.partitionBy($("id")).process("FROM_CHANGELOG",
    map("c, r", "INSERT", "u", "UPDATE_AFTER", "d", "DELETE").asArgument("op_mapping")
);
```

## TO_CHANGELOG

@@ -88,6 +196,11 @@ All output rows have `INSERT` - the table is always append-only.
-- +I[id:1, name:'Alice', cnt:1]
-- +U[id:1, name:'Alice', cnt:2]
-- -D[id:2, name:'Bob', cnt:1]
--
-- Table state after all events:
-- | id | name | cnt |
-- |----|-------|-----|
-- | 1 | Alice | 2 |

SELECT * FROM TO_CHANGELOG(
input => TABLE my_aggregation PARTITION BY id
@@ -1323,6 +1323,33 @@ default TableResult executeInsert(
*/
PartitionedTable partitionBy(Expression... fields);

/**
* Converts this append-only table with an explicit operation code column into a dynamic table
* using the built-in {@code FROM_CHANGELOG} process table function.
*
* <p>Each input row is expected to have a string operation code column (default: {@code "op"})
* that indicates the change operation (e.g., INSERT, UPDATE_AFTER, UPDATE_BEFORE, DELETE). The
* output table is a dynamic table backed by a changelog stream.
*
* <p>Optional arguments can be passed using named expressions:
*
* <pre>{@code
* // Default: reads 'op' column with standard change operation names
* table.fromChangelog();
*
* // Custom op column name and mapping (Debezium-style codes)
* table.fromChangelog(
* descriptor("__op").asArgument("op"),
* map("c, r", "INSERT", "u", "UPDATE_AFTER", "d", "DELETE").asArgument("op_mapping")
* );
* }</pre>
*
* @param arguments optional named arguments for {@code op} and {@code op_mapping}
* @return a dynamic {@link Table} with the op column removed and proper change operation
* semantics
*/
Table fromChangelog(Expression... arguments);

/**
* Converts this table object into a named argument.
*
@@ -497,6 +497,11 @@ public PartitionedTable partitionBy(Expression... fields) {
return new PartitionedTableImpl(this, Arrays.asList(fields));
}

@Override
public Table fromChangelog(Expression... arguments) {
return process(BuiltInFunctionDefinitions.FROM_CHANGELOG.getName(), (Object[]) arguments);
}

@Override
public ApiExpression asArgument(String name) {
return createArgumentExpression(operationTree, tableEnvironment, name);
@@ -21,6 +21,7 @@
import org.apache.flink.annotation.Internal;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.catalog.DataTypeFactory;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.inference.InputTypeStrategy;
import org.apache.flink.table.types.inference.StaticArgument;
@@ -33,6 +34,7 @@
import java.util.Arrays;
import java.util.Locale;
import java.util.Optional;
import java.util.function.Function;
import java.util.regex.Pattern;

import static org.apache.flink.util.Preconditions.checkArgument;
@@ -52,7 +54,7 @@
* <p>Equality is defined by reference equality.
*/
@Internal
public final class BuiltInFunctionDefinition implements SpecializedFunction {
public final class BuiltInFunctionDefinition implements SpecializedFunction, ChangelogFunction {

private final String name;

@@ -72,6 +74,8 @@ public final class BuiltInFunctionDefinition implements SpecializedFunction {

private final SqlCallSyntax sqlCallSyntax;

private final @Nullable Function<ChangelogContext, ChangelogMode> changelogModeResolver;

private final String sqlName;

private BuiltInFunctionDefinition(
Expand All @@ -84,7 +88,8 @@ private BuiltInFunctionDefinition(
boolean isDeterministic,
boolean isRuntimeProvided,
String runtimeClass,
boolean isInternal) {
boolean isInternal,
@Nullable Function<ChangelogContext, ChangelogMode> changelogModeResolver) {
this.name = checkNotNull(name, "Name must not be null.");
this.sqlName = sqlName;
this.version = isInternal ? null : version;
@@ -95,6 +100,7 @@ private BuiltInFunctionDefinition(
this.runtimeClass = runtimeClass;
this.isInternal = isInternal;
this.sqlCallSyntax = sqlCallSyntax;
this.changelogModeResolver = changelogModeResolver;
validateFunction(this.name, this.version, this.isInternal);
}

@@ -225,6 +231,14 @@ public static String qualifyFunctionName(String name, int version) {
return String.format(INTERNAL_NAME_FORMAT, name.toUpperCase(Locale.ROOT), version);
}

@Override
public ChangelogMode getChangelogMode(final ChangelogContext changelogContext) {
if (changelogModeResolver != null) {
return changelogModeResolver.apply(changelogContext);
}
return ChangelogMode.insertOnly();
}

// --------------------------------------------------------------------------------------------
// Builder
// --------------------------------------------------------------------------------------------
@@ -253,6 +267,8 @@ public static final class Builder {

private SqlCallSyntax sqlCallSyntax = SqlCallSyntax.FUNCTION;

private @Nullable Function<ChangelogContext, ChangelogMode> changelogModeResolver;

public Builder() {
// default constructor to allow a fluent definition
}
@@ -399,6 +415,18 @@ public Builder sqlName(String name) {
return this;
}

/**
* Sets a resolver that dynamically determines the output {@link ChangelogMode} for this
* built-in PTF. The resolver receives the {@link ChangelogContext} and can inspect function
* arguments (e.g., op_mapping) to adapt the changelog mode. Only needed for PTFs that emit
* updates (e.g., FROM_CHANGELOG).
*/
public Builder changelogModeResolver(
Function<ChangelogContext, ChangelogMode> changelogModeResolver) {
this.changelogModeResolver = changelogModeResolver;
return this;
}

public BuiltInFunctionDefinition build() {
return new BuiltInFunctionDefinition(
name,
@@ -410,7 +438,8 @@ public BuiltInFunctionDefinition build() {
isDeterministic,
isRuntimeProvided,
runtimeClass,
isInternal);
isInternal,
changelogModeResolver);
}
}
}
@@ -37,6 +37,7 @@
import org.apache.flink.table.types.inference.StaticArgumentTrait;
import org.apache.flink.table.types.inference.TypeStrategies;
import org.apache.flink.table.types.inference.strategies.ArrayOfStringArgumentTypeStrategy;
import org.apache.flink.table.types.inference.strategies.FromChangelogTypeStrategy;
import org.apache.flink.table.types.inference.strategies.SpecificInputTypeStrategies;
import org.apache.flink.table.types.inference.strategies.SpecificTypeStrategies;
import org.apache.flink.table.types.logical.LocalZonedTimestampType;
@@ -106,6 +107,7 @@
import static org.apache.flink.table.types.inference.TypeStrategies.varyingString;
import static org.apache.flink.table.types.inference.strategies.SpecificInputTypeStrategies.ARRAY_ELEMENT_ARG;
import static org.apache.flink.table.types.inference.strategies.SpecificInputTypeStrategies.ARRAY_FULLY_COMPARABLE;
import static org.apache.flink.table.types.inference.strategies.SpecificInputTypeStrategies.FROM_CHANGELOG_INPUT_TYPE_STRATEGY;
import static org.apache.flink.table.types.inference.strategies.SpecificInputTypeStrategies.INDEX;
import static org.apache.flink.table.types.inference.strategies.SpecificInputTypeStrategies.JSON_ARGUMENT;
import static org.apache.flink.table.types.inference.strategies.SpecificInputTypeStrategies.ML_PREDICT_INPUT_TYPE_STRATEGY;
@@ -115,6 +117,7 @@
import static org.apache.flink.table.types.inference.strategies.SpecificInputTypeStrategies.percentage;
import static org.apache.flink.table.types.inference.strategies.SpecificInputTypeStrategies.percentageArray;
import static org.apache.flink.table.types.inference.strategies.SpecificTypeStrategies.ARRAY_APPEND_PREPEND;
import static org.apache.flink.table.types.inference.strategies.SpecificTypeStrategies.FROM_CHANGELOG_OUTPUT_TYPE_STRATEGY;
import static org.apache.flink.table.types.inference.strategies.SpecificTypeStrategies.ML_PREDICT_OUTPUT_TYPE_STRATEGY;
import static org.apache.flink.table.types.inference.strategies.SpecificTypeStrategies.TO_CHANGELOG_OUTPUT_TYPE_STRATEGY;

@@ -802,6 +805,31 @@ ANY, and(logical(LogicalTypeRoot.BOOLEAN), LITERAL)
"org.apache.flink.table.runtime.functions.ptf.ToChangelogFunction")
.build();

public static final BuiltInFunctionDefinition FROM_CHANGELOG =
BuiltInFunctionDefinition.newBuilder()
.name("FROM_CHANGELOG")
.kind(PROCESS_TABLE)
.staticArguments(
StaticArgument.table(
"input",
Row.class,
false,
EnumSet.of(
StaticArgumentTrait.TABLE,
StaticArgumentTrait.SET_SEMANTIC_TABLE,
StaticArgumentTrait.OPTIONAL_PARTITION_BY)),
StaticArgument.scalar("op", DataTypes.DESCRIPTOR(), true),
StaticArgument.scalar(
"op_mapping",
DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING()),
true))
.inputTypeStrategy(FROM_CHANGELOG_INPUT_TYPE_STRATEGY)
.outputTypeStrategy(FROM_CHANGELOG_OUTPUT_TYPE_STRATEGY)
.changelogModeResolver(FromChangelogTypeStrategy::resolveChangelogMode)
.runtimeClass(
"org.apache.flink.table.runtime.functions.ptf.FromChangelogFunction")
.build();

public static final BuiltInFunctionDefinition GREATEST =
BuiltInFunctionDefinition.newBuilder()
.name("GREATEST")
@@ -23,6 +23,8 @@
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.types.RowKind;

import java.util.Optional;

/**
* An extension that allows a process table function (PTF) to emit results with changelog semantics.
*
@@ -61,8 +63,10 @@
* </ol>
*
* <p>Emitting changelogs is only valid for PTFs that take table arguments with set semantics (see
* {@link ArgumentTrait#SET_SEMANTIC_TABLE}). In case of upserts, the upsert key must be equal to
* the PARTITION BY key.
* {@link ArgumentTrait#SET_SEMANTIC_TABLE}). When using {@code OPTIONAL_PARTITION_BY}, the
* PARTITION BY clause can be omitted for retract mode (with {@link RowKind#UPDATE_BEFORE}), since
* the stream is self-describing. In case of upserts, the upsert key must be equal to the PARTITION
* BY key.
*
* <p>It is perfectly valid for a {@link ChangelogFunction} implementation to return a fixed {@link
* ChangelogMode}, regardless of the {@link ChangelogContext}. This approach may be appropriate when
@@ -105,5 +109,14 @@ interface ChangelogContext {
* are required and {@link ChangelogMode#keyOnlyDeletes()} are supported.
*/
ChangelogMode getRequiredChangelogMode();

/**
* Returns the value of a scalar argument at the given position.
*
* @param pos the argument position
* @param clazz the expected class of the argument value
* @return the argument value, or empty if the argument is null or not available
*/
<T> Optional<T> getArgumentValue(int pos, Class<T> clazz);
}
}