[FLINK-39261][table] Add FROM_CHANGELOG built-in process table function #27901

Open
raminqaf wants to merge 7 commits into apache:master from raminqaf:FLINK-39261

@raminqaf (Contributor) commented Apr 7, 2026

What is the purpose of the change

Implement the FROM_CHANGELOG built-in process table function as specified in FLIP-564, section 4.1.3.1 (append-only stream to upsert stream, flat mode).

FROM_CHANGELOG converts an append-only table with an explicit operation code column (e.g., Debezium's 'c', 'u', 'd') into a dynamic table backed by a Flink upsert stream ({+I, +U, -D}). This is the reverse of TO_CHANGELOG. The implementation is stateless: each input record with a mapped operation code becomes exactly one output record with the corresponding RowKind.

SELECT * FROM FROM_CHANGELOG(
    input => TABLE cdc_stream PARTITION BY id,
    op => DESCRIPTOR(__op),
    op_mapping => MAP['c, r', 'INSERT', 'u', 'UPDATE_AFTER', 'd', 'DELETE']
)
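
To illustrate the stateless mapping described above, here is a minimal plain-Java sketch. It is not the PR's implementation: `SketchRowKind` and `OpMappingSketch` are hypothetical names standing in for Flink's `RowKind` and the runtime function. The sketch assumes that a comma-separated key such as 'c, r' lists several op codes that share one RowKind, and that records with unmapped codes are dropped, as the test list in this PR suggests.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Hypothetical stand-in for Flink's RowKind, limited to the upsert kinds named in this PR.
enum SketchRowKind { INSERT, UPDATE_AFTER, DELETE }

final class OpMappingSketch {

    // Expands keys such as "c, r" into one lookup entry per code: c -> INSERT, r -> INSERT.
    // Assumption: the comma separates several op codes that share one RowKind.
    static Map<String, SketchRowKind> expand(Map<String, String> opMapping) {
        Map<String, SketchRowKind> lookup = new HashMap<>();
        for (Map.Entry<String, String> entry : opMapping.entrySet()) {
            // valueOf throws IllegalArgumentException for names outside the enum.
            SketchRowKind kind = SketchRowKind.valueOf(entry.getValue().trim());
            for (String code : entry.getKey().split(",")) {
                lookup.put(code.trim(), kind);
            }
        }
        return lookup;
    }

    // Stateless per-record step: a mapped code yields its kind,
    // an unmapped code yields empty (the record would be dropped).
    static Optional<SketchRowKind> toRowKind(Map<String, SketchRowKind> lookup, String opCode) {
        return Optional.ofNullable(lookup.get(opCode));
    }

    public static void main(String[] args) {
        Map<String, SketchRowKind> lookup =
                expand(Map.of("c, r", "INSERT", "u", "UPDATE_AFTER", "d", "DELETE"));
        for (String code : List.of("c", "r", "u", "d", "x")) {
            String result = toRowKind(lookup, code).map(Enum::name).orElse("(dropped)");
            System.out.println(code + " -> " + result);
        }
    }
}
```

Because the lookup is built once from `op_mapping` and each record is handled independently, no per-key state is needed in this sketch.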

Brief change log

  • Add optional ChangelogFunction delegate to BuiltInFunctionDefinition so built-in PTFs can declare their output changelog mode
  • Update FlinkChangelogModeInferenceProgram to handle the new delegate
  • Add FromChangelogTypeStrategy with input validation (op column existence, STRING type, op_mapping value validation) and output type inference (removes op column from output)
  • Add FROM_CHANGELOG built-in function definition with ChangelogMode.upsert(false) output
  • Add FromChangelogFunction runtime implementation using ProjectedRowData for zero-copy projection
  • Add fromChangelog() convenience method to PartitionedTable Table API
  • Add documentation for FROM_CHANGELOG in changelog.md
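
The validation and output-type bullets above can be sketched in plain Java as follows. This is a hedged illustration only: `FromChangelogValidationSketch`, its `Kind` enum, and the use of type names as strings are hypothetical simplifications, not the API of `FromChangelogTypeStrategy`. It assumes that "duplicate RowKind" means two mapping entries pointing at the same RowKind, and that UPDATE_BEFORE is rejected because the output is an upsert stream.

```java
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

final class FromChangelogValidationSketch {

    // Hypothetical stand-in for Flink's RowKind.
    enum Kind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    // Validates the op column and the op_mapping values, then returns the
    // output column names with the op column removed.
    // inputColumns maps column name -> type name (simplified to strings here).
    static List<String> validateAndProject(
            Map<String, String> inputColumns, String opColumn, Map<String, String> opMapping) {
        String opType = inputColumns.get(opColumn);
        if (opType == null) {
            throw new IllegalArgumentException("op column not found: " + opColumn);
        }
        if (!opType.equals("STRING")) {
            throw new IllegalArgumentException("op column must be STRING but was " + opType);
        }
        Set<Kind> seen = EnumSet.noneOf(Kind.class);
        for (String value : opMapping.values()) {
            Kind kind;
            try {
                kind = Kind.valueOf(value);
            } catch (IllegalArgumentException e) {
                throw new IllegalArgumentException("invalid RowKind: " + value);
            }
            if (kind == Kind.UPDATE_BEFORE) {
                throw new IllegalArgumentException("UPDATE_BEFORE is not allowed for upsert output");
            }
            if (!seen.add(kind)) {
                throw new IllegalArgumentException("duplicate RowKind: " + kind);
            }
        }
        List<String> outputColumns = new ArrayList<>(inputColumns.keySet());
        outputColumns.remove(opColumn);
        return outputColumns;
    }

    public static void main(String[] args) {
        Map<String, String> columns = new LinkedHashMap<>();
        columns.put("id", "INT");
        columns.put("__op", "STRING");
        columns.put("name", "STRING");
        // prints [id, name]
        System.out.println(validateAndProject(
                columns, "__op", Map.of("c, r", "INSERT", "u", "UPDATE_AFTER", "d", "DELETE")));
    }
}
```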

Verifying this change

This change added tests and can be verified as follows:

  • Added 7 unit tests for FromChangelogTypeStrategy input validation (valid mapping, op column not found, wrong type, invalid descriptor, invalid RowKind, UPDATE_BEFORE rejected, duplicate RowKind)
  • Added 11 semantic tests covering: default op_mapping, Debezium-style mapping, custom op column name, unmapped codes dropped, Table API convenience method, and 6 error validation tests
  • Added 2 plan tests verifying changelog mode propagation (changelogMode=[I,UA,D] for PTF output, changelogMode=[I] for source input)

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): no
  • The public API, i.e., is any changed class annotated with @Public(Evolving): yes (PartitionedTable.fromChangelog())
  • The serializers: no
  • The runtime per-record code paths (performance sensitive): no
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
  • The S3 file system connector: no

Documentation

  • Does this pull request introduce a new feature? yes
  • If yes, how is the feature documented? docs / JavaDocs

@flinkbot (Collaborator) commented Apr 7, 2026

CI report:

Bot commands: the @flinkbot bot supports the following commands:
  • @flinkbot run azure: re-run the last Azure build

@gustavodemorais (Contributor) left a comment

Thanks for the PR, @raminqaf! In general, it looks very consistent with to_changelog.

Here is my first set of reviews. In this one, I think there are some minor details we have to decide on and two bigger things:

  • Going with retract by default as the output mode
  • Finding a nice way of expressing the changelog mode with BuiltInFunctionDefinition

See below for more details.

SELECT * FROM FROM_CHANGELOG(
input => TABLE source_table PARTITION BY key_col,
[op => DESCRIPTOR(op_column_name),]
[op_mapping => MAP['c, r', 'INSERT', 'u', 'UPDATE_AFTER', 'd', 'DELETE']]

@gustavodemorais (Contributor) commented:

The invalid_op_handling param would already be relevant here, but we can add it in a follow-up PR.

@raminqaf (Contributor, Author) replied:

Good point and good idea. Let's keep this PR from growing too big.

@github-actions bot added the community-reviewed label Apr 7, 2026
@gustavodemorais (Contributor) left a comment

Thanks for the updates, Ramin! I added some comments; take a look.


```sql
SELECT * FROM FROM_CHANGELOG(
input => TABLE source_table PARTITION BY key_col,
```

@gustavodemorais (Contributor) commented:
We're moving to row semantics as the first default version, with no PARTITION BY. I'm finishing a new PR that should help you see the changes you'll have to make.

@gustavodemorais (Contributor) commented Apr 10, 2026

Here is the PR #27911

FromChangelogTestPrograms.DEBEZIUM_MAPPING,
FromChangelogTestPrograms.UNMAPPED_CODES_DROPPED,
FromChangelogTestPrograms.TABLE_API_DEFAULT,
FromChangelogTestPrograms.MISSING_PARTITION_BY);

@gustavodemorais (Contributor) commented:

I think there are two good tests we could add here

@raminqaf (Contributor, Author) replied:

The tests are added. The round-trip test is commented out; I will uncomment it as soon as we merge the PR.

Labels

community-reviewed: PR has been reviewed by the community.
