[FLINK-39261][table] Add FROM_CHANGELOG built-in process table function#27901
[FLINK-39261][table] Add FROM_CHANGELOG built-in process table function#27901raminqaf wants to merge 7 commits intoapache:masterfrom
Conversation
There was a problem hiding this comment.
Thanks for the PR, @raminqaf! In general, it looks very consistent with to_changelog.
Here is my first set of reviews. In this one, I think there are some minor details we have to decide on and two bigger things:
- Going with retract by default as the output mode
- Finding a nice way of expressing the changelog mode with the BuiltInFunctionsDefinition
See below for more details
| SELECT * FROM FROM_CHANGELOG( | ||
| input => TABLE source_table PARTITION BY key_col, | ||
| [op => DESCRIPTOR(op_column_name),] | ||
| [op_mapping => MAP['c, r', 'INSERT', 'u', 'UPDATE_AFTER', 'd', 'DELETE']] |
There was a problem hiding this comment.
The invalid_op_handling param would already be relevant here but we can add it in a follow up PR
There was a problem hiding this comment.
Good point and good idea, let's keep this PR not grow too big.
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/PartitionedTable.java
Outdated
Show resolved
Hide resolved
...-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
Outdated
Show resolved
Hide resolved
...k-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinition.java
Outdated
Show resolved
Hide resolved
gustavodemorais
left a comment
There was a problem hiding this comment.
Thanks for the updates, Ramin! Added some comments, take a look
...k-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinition.java
Outdated
Show resolved
Hide resolved
...rg/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala
Outdated
Show resolved
Hide resolved
|
|
||
| ```sql | ||
| SELECT * FROM FROM_CHANGELOG( | ||
| input => TABLE source_table PARTITION BY key_col, |
There was a problem hiding this comment.
We're moving to row semantics as the first default version - no partition by. I'm finishing a new PR that should help you see the changes that you'll have to make
...k-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinition.java
Outdated
Show resolved
Hide resolved
...c/main/java/org/apache/flink/table/types/inference/strategies/FromChangelogTypeStrategy.java
Outdated
Show resolved
Hide resolved
...rg/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala
Outdated
Show resolved
Hide resolved
...c/main/java/org/apache/flink/table/types/inference/strategies/FromChangelogTypeStrategy.java
Outdated
Show resolved
Hide resolved
| FromChangelogTestPrograms.DEBEZIUM_MAPPING, | ||
| FromChangelogTestPrograms.UNMAPPED_CODES_DROPPED, | ||
| FromChangelogTestPrograms.TABLE_API_DEFAULT, | ||
| FromChangelogTestPrograms.MISSING_PARTITION_BY); |
There was a problem hiding this comment.
I think there are two good tests we could add here
- round-trip FROM_CHANGELOG(TO_CHANGELOG(table)) (important, might work better when we merge [FLINK-39419][table] Switch TO_CHANGELOG to row semantics with full deletes + require update before #27911 which is approved)
- custom op column name - There's no test for op => DESCRIPTOR(operation)
There was a problem hiding this comment.
Tests are added. The round-trip test is commented out, I will uncomment it as soon as we merge the PR.
What is the purpose of the change
Implement the FROM_CHANGELOG built-in process table function as specified in FLIP-564, section 4.1.3.1 (append-only stream to upsert stream, flat mode).
FROM_CHANGELOG converts an append-only table with an explicit operation code column (e.g., Debezium's
'c','u','d') into a dynamic table backed by a Flink upsert stream ({+I, +U, -D}). This is the reverse of TO_CHANGELOG. The implementation is stateless — each input record maps directly to one output record with the appropriate RowKind.Brief change log
ChangelogFunctiondelegate toBuiltInFunctionDefinitionso built-in PTFs can declare their output changelog modeFlinkChangelogModeInferenceProgramto handle the new delegateFromChangelogTypeStrategywith input validation (op column existence, STRING type, op_mapping value validation) and output type inference (removes op column from output)FROM_CHANGELOGbuilt-in function definition withChangelogMode.upsert(false)outputFromChangelogFunctionruntime implementation usingProjectedRowDatafor zero-copy projectionfromChangelog()convenience method toPartitionedTableTable APIVerifying this change
This change added tests and can be verified as follows:
FromChangelogTypeStrategyinput validation (valid mapping, op column not found, wrong type, invalid descriptor, invalid RowKind, UPDATE_BEFORE rejected, duplicate RowKind)changelogMode=[I,UA,D]for PTF output,changelogMode=[I]for source input)Does this pull request potentially affect one of the following parts:
@Public(Evolving): yes (PartitionedTable.fromChangelog())Documentation