
[FLINK-38729] Add support for Flink 2.2.0#4294

Open
lvyanquan wants to merge 4 commits into master from FLINK-38729-2


@lvyanquan
Contributor

@lvyanquan lvyanquan commented Mar 2, 2026

Introduction

Existing modules continue to depend on Flink 1.20, and support for Flink 2.x is provided through newly added modules.

Development Plan

I plan to complete full support for Flink 2.x versions through three steps:

  1. The first step is to provide support for Flink 2.x versions in the common/runtime/composer modules,
    and perform integration tests and end-to-end tests on these modules based on a simple values pipeline
    connector to verify correctness.
  2. The second step is to implement a MySQL Pipeline connector that supports Flink 2.x versions, as it is
    our most commonly used CDC connector.
  3. The third step is to add support for Flink 2.x versions to existing source/pipeline connectors, if
    feasible.

This PR will complete the work of the first step.

Topics for Discussion

1. Module Design

Question: Is it necessary to design each module with a structure consisting of a common module, a module with 1.x API, and a module with 2.x API, as Paimon does?
My Answer: This would require creating three modules for every module in the project. I think this introduces too many additional modules. Therefore, I will keep existing modules' dependency on Flink 1.x unchanged, and only add new modules that depend on Flink 2.x. I will rewrite classes that depend on the new API, and use the shade plugin to reduce the number of classes that need to be rewritten in the new modules.

2. Test Coverage

Question: Is it necessary to add tests equivalent to those in the 1.x modules for each newly added 2.x module?
My Answer: This is a difficult decision point. Adding sufficient tests can guarantee the correctness and reliability of 2.x modules, but it would introduce a large amount of duplicate code and also increase the time required for CI runs. To avoid the burden of review, I have only added composer tests and e2e tests in this PR to ensure that the support for Flink 2.x is functional. I plan to add more complete tests in subsequent PRs (if necessary).

The above lists the points that I still consider uncertain during the implementation of this PR. Discussions are welcome.

Member

@yuxiqian yuxiqian left a comment


I just investigated the implementation in apache/fluss#1176. Instead of creating a replica of each file that uses an API that is incompatible between Flink 1.x and 2.x, Hongshun introduces a compatibility layer in the fluss-flink-common package, puts all incompatible API usages inside it, and dispatches to it from fluss-flink-1.x and fluss-flink-2.x. I think if we took this approach, the only module that would need to be versioned is flink-cdc-common.
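
To make the compatibility-layer idea concrete, here is a minimal sketch. It is not taken from Fluss or from this PR: every class name below (`CompatLayerSketch`, `SubtaskIndexAdapter`, `LegacyContext`, `ModernContext`, and so on) is hypothetical, and the context classes are plain stand-ins rather than real Flink types. The subtask-index call is used as the example only because it is one of the API differences mentioned in this thread.

```java
public class CompatLayerSketch {

    // Stand-ins for the two runtime-context shapes; these are NOT the real Flink classes.
    public static final class LegacyContext {
        public int getIndexOfThisSubtask() {
            return 3;
        }
    }

    public static final class TaskInfo {
        public int getIndexOfThisSubtask() {
            return 3;
        }
    }

    public static final class ModernContext {
        public TaskInfo getTaskInfo() {
            return new TaskInfo();
        }
    }

    /** Declared once in the shared module; hides the version-specific call. */
    public interface SubtaskIndexAdapter<C> {
        int subtaskIndex(C context);
    }

    /** Would live in the 1.x-specific module and use the 1.x call shape. */
    public static final class Flink1Adapter implements SubtaskIndexAdapter<LegacyContext> {
        @Override
        public int subtaskIndex(LegacyContext context) {
            return context.getIndexOfThisSubtask();
        }
    }

    /** Would live in the 2.x-specific module and use the 2.x call shape. */
    public static final class Flink2Adapter implements SubtaskIndexAdapter<ModernContext> {
        @Override
        public int subtaskIndex(ModernContext context) {
            return context.getTaskInfo().getIndexOfThisSubtask();
        }
    }

    /** Shared operator logic, written once against the adapter. */
    public static <C> String describeSubtask(SubtaskIndexAdapter<C> adapter, C context) {
        return "subtask-" + adapter.subtaskIndex(context);
    }

    public static void main(String[] args) {
        System.out.println(describeSubtask(new Flink1Adapter(), new LegacyContext()));
        System.out.println(describeSubtask(new Flink2Adapter(), new ModernContext()));
    }
}
```

The point of the shape is that `describeSubtask` never names a version-specific type, so only the two small adapters would need to be duplicated per Flink line.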

The obvious advantage is that we would not need so much code duplication. This PR adds ~20k SLOC (mostly duplicated, which makes it hard to review what has changed), while the Fluss change is merely +856 -36. The two codebases cannot be compared directly, but such a large difference is not negligible. This design also creates a maintenance problem, as any follow-up PR that modifies a related file must remember to update both the 1.x and 2.x implementations and keep them in sync.

I'm not strongly against the approach used in this PR, just wondering if the alternative solution is possible / why it's impossible.

@macdoor

macdoor commented Mar 5, 2026

Hi @yuxiqian,

We've been working on Flink 2.2 compatibility in a fork and have a working implementation following the approach referenced in fluss#1176 — i.e., introducing a version-specific compat module with runtime bridges.

Our branch: https://github.com/macdoor/flink-cdc/tree/feature/opengauss-flink22-compat

Key changes we made to get OpenGauss → Paimon pipelines running on Flink 2.2:

  • flink-cdc-flink-compat module: two sub-modules, flink-cdc-flink-compat-flink1 (Flink 1.x bridge with SinkFunction/SourceFunction) and flink-cdc-flink-compat-flink2 (Flink 2.x stub classes: Sink$InitContext, CatalogFactory, Catalog).
  • DataSinkWriterOperator: reflection-based wrapping of SinkWriterOperator to find compatible constructors; fixed getSubtaskIndexCompat() to use getTaskInfo().getIndexOfThisSubtask() (it previously returned a hardcoded 0, breaking SchemaCoordinator flush synchronization).
  • DataSinkTranslator: uses getMethods() (not getDeclaredMethods()) to detect two-phase commit across superclasses; adds a serializable SupportsCommitter adapter proxy for sinks that declare createCommitter() without implementing the interface.
  • Serializers: added resolveSchemaCompatibility(TypeSerializerSnapshot) (a new abstract method in Flink 2.x) to all custom TypeSerializerSnapshot implementations without @Override, so they compile against both 1.x and 2.x.
  • SourceSplitSerializer: reflection-based LogicalTypeParser.parse() to handle the removed single-arg overload.
  • PreCommitOperator / schema operators: replaced getRuntimeContext().getIndexOfThisSubtask() with getRuntimeContext().getTaskInfo().getIndexOfThisSubtask().
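
As a side note on the getMethods() versus getDeclaredMethods() point, the difference can be shown with plain Java reflection. The sketch below is illustrative only: `BaseSink` and `ConcreteSink` are hypothetical stand-ins, not Flink or Flink CDC classes. `getDeclaredMethods()` only sees methods declared on the class itself, while `getMethods()` also returns public methods inherited from superclasses and interfaces.

```java
import java.util.Arrays;

public class CommitterDetectionSketch {

    // Hypothetical sink hierarchy: createCommitter() is declared on the base class only.
    public static class BaseSink {
        public Object createCommitter() {
            return new Object();
        }
    }

    public static class ConcreteSink extends BaseSink {
        public void write(String element) {
            // no-op; only here so the subclass declares something of its own
        }
    }

    /** True only if the method is declared on this exact class (any visibility). */
    public static boolean declaresDirectly(Class<?> type, String methodName) {
        return Arrays.stream(type.getDeclaredMethods())
                .anyMatch(m -> m.getName().equals(methodName));
    }

    /** True if a public method with this name is declared or inherited. */
    public static boolean exposesPublicly(Class<?> type, String methodName) {
        return Arrays.stream(type.getMethods())
                .anyMatch(m -> m.getName().equals(methodName));
    }

    public static void main(String[] args) {
        System.out.println(declaresDirectly(ConcreteSink.class, "createCommitter"));
        System.out.println(exposesPublicly(ConcreteSink.class, "createCommitter"));
    }
}
```

One caveat with this approach: `getMethods()` returns public methods only, so a non-public `createCommitter()` on a superclass would still be missed.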

The pipeline now runs end-to-end on Flink 2.2.0 with a standalone session cluster (OpenGauss source → Paimon sink, all 4 operator stages visible including Sink Committer).

Happy to share details, open a draft PR, or contribute directly to this effort. Let us know what would be most helpful!

@lvyanquan
Contributor Author

Hi @macdoor, I pulled your branch, but the code in this branch doesn't seem to be complete (it doesn't even compile). Did I miss something? Of course, your implementation in the PR is more concise. If you can get it to compile and run on Flink 2.2, feel free to submit a PR first so we can verify that the tests pass.

@macdoor

macdoor commented Mar 6, 2026

Hi @lvyanquan,

Thanks for trying the branch. We’ve since opened a dedicated PR with a complete, compilable implementation:
PR: #4307 feat: Flink 2.2 compatibility and dual-dist packaging

  • Build: two-step Maven commands (Java 17) to produce both flink-cdc-dist--1.20.jar and flink-cdc-dist--2.2.jar
  • Deployment: where to put each dist JAR and how to reuse the same connector JARs on Flink 1.20 and 2.2

You can clone/build from branch macdoor:feature/flink22-compat and run the described steps to verify; we’ve already run a full build and conflict resolution on that branch. If anything doesn’t work on your side, we can iterate in that PR.

Co-authored-by: xiaoxiong.duan@zznode.com <xiaoxiong.duan@zznode.com>
@lvyanquan lvyanquan requested a review from Copilot March 9, 2026 03:38
@lvyanquan lvyanquan marked this pull request as ready for review March 9, 2026 03:43
Contributor

Copilot AI left a comment


Pull request overview

This PR introduces Flink 2.2.0 compatibility by keeping existing Flink 1.20-based modules intact and adding targeted Flink 2.x support via new modules/profiles, reflection-based shims, and dedicated CI coverage (unit + e2e) for the Flink 2.x build.

Changes:

  • Adds a flink2 Maven profile and new Flink 2.x modules (notably values connector + pipeline e2e tests) while defaulting the build to Flink 1.20.
  • Introduces runtime compatibility utilities and adapts operators/serializers/translators to compile/run across Flink 1.x and 2.x API differences (often via reflection).
  • Extends CI workflows to compile/test against Flink 2.2.0 and run a new Flink 2.x pipeline e2e suite.
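
For readers unfamiliar with the reflection-based shim pattern mentioned above, the following is a rough sketch of the two building blocks such utilities typically rely on: resolving whichever class exists on the classpath, and invoking a method only if it is present. It is not the code from FlinkCompatibilityUtils; the class and method names (`ReflectionShimSketch`, `firstAvailableClass`, `invokeOrDefault`) are hypothetical, and JDK classes stand in for Flink ones so the example runs on its own.

```java
import java.lang.reflect.Method;
import java.util.Optional;

public class ReflectionShimSketch {

    /** Returns the first candidate class that can be loaded, in the order given. */
    public static Optional<Class<?>> firstAvailableClass(String... candidateNames) {
        for (String name : candidateNames) {
            try {
                return Optional.of(Class.forName(name));
            } catch (ClassNotFoundException | LinkageError e) {
                // Not present (or not loadable) in this runtime; try the next candidate.
            }
        }
        return Optional.empty();
    }

    /**
     * Invokes a public no-arg method by name, or returns the fallback value
     * when the target's class has no such method.
     */
    public static Object invokeOrDefault(Object target, String methodName, Object fallback) {
        try {
            Method method = target.getClass().getMethod(methodName);
            return method.invoke(target);
        } catch (NoSuchMethodException e) {
            return fallback;
        } catch (ReflectiveOperationException e) {
            // The method exists but could not be called; surface this instead of hiding it.
            throw new IllegalStateException("Failed to invoke " + methodName, e);
        }
    }

    public static void main(String[] args) {
        // "com.example.DoesNotExist" is a made-up name that is expected to be absent.
        System.out.println(firstAvailableClass("com.example.DoesNotExist", "java.util.ArrayList"));
        System.out.println(invokeOrDefault("abc", "length", -1));
        System.out.println(invokeOrDefault("abc", "noSuchMethod", -1));
    }
}
```

A trade-off worth keeping in mind is that these lookups move API mismatches from compile time to run time, so each shim needs test coverage on every supported Flink version.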

Reviewed changes

Copilot reviewed 83 out of 83 changed files in this pull request and generated 5 comments.

| File | Description |
| --- | --- |
| pom.xml | Adds Flink 1.x/2.x version properties, shaded guava handling, and a flink2 profile hook for tests. |
| flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/testutils/operators/RegularEventOperatorTestHarness.java | Uses reflection/proxy output and compatibility setupOperator for Flink 2.x access changes. |
| flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/testutils/operators/DistributedEventOperatorTestHarness.java | Same as above for distributed harness. |
| flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/serializer/SerializerTestBase.java | Routes schema compatibility checks through compatibility utility. |
| flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/serializer/EnumSerializerTest.java | Updates schema compatibility calls for Flink 1.x/2.x. |
| flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/sink/DataSinkOperatorAdapter.java | Reflection-based chaining strategy + subtask index API update for Flink 2.x. |
| flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/utils/FlinkCompatibilityUtils.java | New helper for cross-version reflection (setup, chaining strategy, serializer compatibility). |
| flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/typeutils/PartitioningEventTypeInfo.java | Adds Flink 2.x SerializerConfig serializer creation overload. |
| flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/typeutils/EventTypeInfo.java | Same as above for Event type info. |
| flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/data/ZonedTimestampDataSerializer.java | Adds Flink 2.x snapshot-compat method variant. |
| flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/data/TimestampDataSerializer.java | Adds Flink 2.x snapshot-compat method variant. |
| flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/data/MapDataSerializer.java | Adds Flink 2.x snapshot-compat method variant. |
| flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/data/LocalZonedTimestampDataSerializer.java | Adds Flink 2.x snapshot-compat method variant. |
| flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/data/DecimalDataSerializer.java | Adds Flink 2.x snapshot-compat method variant. |
| flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/data/ArrayDataSerializer.java | Adds Flink 2.x snapshot-compat method variant. |
| flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/NullableSerializerWrapper.java | Adds Flink 2.x snapshot-compat method variant. |
| flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/NestedSerializersSnapshotDelegate.java | Uses reflection to resolve schema compatibility across Flink 1.x/2.x. |
| flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/EnumSerializer.java | Adds Flink 2.x snapshot-compat method variant. |
| flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/partitioning/RegularPrePartitionOperator.java | Uses compatibility chaining-strategy setter. |
| flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/partitioning/DistributedPrePartitionOperator.java | Uses compatibility chaining-strategy setter + updates subtask index access. |
| flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/partitioning/BatchRegularPrePartitionOperator.java | Uses compatibility chaining-strategy setter. |
| flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformOperator.java | Uses compatibility chaining-strategy setter + avoids wrapping chained-operator exceptions. |
| flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/sink/DataSinkWriterOperatorFactory.java | Removes Flink-1-only factory interface and switches mailbox executor sourcing for compatibility. |
| flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/sink/DataSinkWriterOperator.java | Heavy reflection to invoke wrapped operator methods + Flink 2.2 committable emission workaround. |
| flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/sink/DataSinkFunctionOperator.java | Uses compatibility chaining-strategy setter + updates subtask index access. |
| flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/sink/BatchDataSinkWriterOperator.java | Mirrors reflection invocation approach for batch writer operator. |
| flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/sink/BatchDataSinkFunctionOperator.java | Uses compatibility chaining-strategy setter. |
| flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/regular/SchemaOperator.java | Uses compatibility chaining-strategy setter + updates subtask index access. |
| flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/regular/BatchSchemaOperator.java | Uses compatibility chaining-strategy setter. |
| flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/distributed/SchemaOperator.java | Updates subtask index access for Flink 2.x. |
| flink-cdc-runtime/pom.xml | Adds flink2 profile to switch Flink version and exclude legacy SinkFunction operators. |
| flink-cdc-e2e-tests/pom.xml | Adds new Flink 2.x pipeline e2e tests module. |
| flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/java/.../FlinkContainerTestEnvironment.java | Switches to project-owned ValidationException. |
| flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/.../PipelineTestEnvironment.java | Switches to project-owned ValidationException. |
| flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/.../MySqlToHudiE2eITCase.java | Switches to project-owned ValidationException. |
| flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/.../MaxComputeE2eITCase.java | Switches to project-owned ValidationException. |
| flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests-2.x/src/test/resources/log4j2-test.properties | Adds logging config for Flink 2.x e2e module. |
| flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests-2.x/src/test/resources/docker/mysql/setup.sql | Adds MySQL init SQL for Flink 2.x e2e module. |
| flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests-2.x/src/test/resources/docker/mysql/my.cnf | Adds MySQL config for Flink 2.x e2e module. |
| flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests-2.x/src/test/java/.../TarballFetcher.java | New helper to fetch CDC tarballs/jars into containers for Flink 2.x e2e runs. |
| flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests-2.x/src/test/java/.../PipelineTestEnvironment.java | New Flink 2.x container-based pipeline test environment. |
| flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests-2.x/src/test/java/.../ValuesE2eITCase.java | Adds Flink 2.x values-pipeline end-to-end tests. |
| flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests-2.x/pom.xml | New Flink 2.x pipeline e2e module build, deps, and failsafe execution. |
| flink-cdc-dist/pom.xml | Adds flink2 profile dependency adjustment for shaded guava. |
| flink-cdc-connect/flink-cdc-pipeline-connectors/pom.xml | Adds flink-cdc-pipeline-connector-values-2.x module. |
| flink-cdc-connect/.../flink-cdc-pipeline-connector-values/.../ValuesDataSinkFunction.java | Removes legacy SinkFunction-based values sink implementation. |
| flink-cdc-connect/.../flink-cdc-pipeline-connector-values/.../ValuesDataSink.java | Forces values sink to Sink V2 and adds writer init compatibility overload. |
| flink-cdc-connect/.../flink-cdc-pipeline-connector-values-2.x/src/main/resources/.../Factory | Registers values connector factory for the Flink 2.x variant module. |
| flink-cdc-connect/.../flink-cdc-pipeline-connector-values-2.x/.../ValuesDataSink.java | Flink 2.x-specific values sink implementation. |
| flink-cdc-connect/.../flink-cdc-pipeline-connector-values-2.x/pom.xml | Adds Flink 2.x values connector module with shading configuration. |
| flink-cdc-connect/.../flink-cdc-pipeline-connector-starrocks/.../StarRocksDataSinkFactoryTest.java | Switches to project-owned ValidationException. |
| flink-cdc-connect/.../flink-cdc-pipeline-connector-postgres/.../PostgresDataSourceFactoryTest.java | Switches to project-owned ValidationException. |
| flink-cdc-connect/.../flink-cdc-pipeline-connector-postgres/.../PostgresDataSourceFactory.java | Switches to project-owned ValidationException. |
| flink-cdc-connect/.../flink-cdc-pipeline-connector-paimon/.../PaimonDataSinkFactoryTest.java | Switches to project-owned ValidationException. |
| flink-cdc-connect/.../flink-cdc-pipeline-connector-oracle/.../OracleDataSourceFactory.java | Switches to project-owned ValidationException. |
| flink-cdc-connect/.../flink-cdc-pipeline-connector-mysql/.../MySqlDataSourceFactoryTest.java | Switches to project-owned ValidationException. |
| flink-cdc-connect/.../flink-cdc-pipeline-connector-mysql/.../MySqlDataSourceFactory.java | Switches to project-owned ValidationException. |
| flink-cdc-connect/.../flink-cdc-pipeline-connector-kafka/.../KafkaDataSinkFactoryTest.java | Switches to project-owned ValidationException. |
| flink-cdc-connect/.../flink-cdc-pipeline-connector-iceberg/.../IcebergDataSinkFactoryTest.java | Switches to project-owned ValidationException. |
| flink-cdc-connect/.../flink-cdc-pipeline-connector-hudi/.../HudiDataSinkFactoryTest.java | Switches to project-owned ValidationException. |
| flink-cdc-connect/.../flink-cdc-pipeline-connector-fluss/.../FlussDataSinkFactoryTest.java | Switches to project-owned ValidationException. |
| flink-cdc-connect/.../flink-cdc-pipeline-connector-elasticsearch/.../ElasticsearchDataSinkFactoryTest.java | Switches to project-owned ValidationException. |
| flink-cdc-connect/.../flink-cdc-pipeline-connector-elasticsearch/.../ElasticsearchDataSinkFactory.java | Switches to project-owned ValidationException. |
| flink-cdc-composer/src/test/java/.../DistributedSourceFunction.java | Removes old SourceFunction-based test source. |
| flink-cdc-composer/src/test/java/.../DistributedSource.java | Adds FLIP-27 Source API test source implementation. |
| flink-cdc-composer/src/test/java/.../DistributedDataSource.java | Switches test datasource to new Source API provider. |
| flink-cdc-composer/src/main/java/.../DataSourceTranslator.java | Supports legacy SourceFunction providers via reflection (Flink 1.x), defaults to new Source API. |
| flink-cdc-composer/src/main/java/.../DataSinkTranslator.java | Supports legacy SinkFunction path via reflection and adapts commit topology checks for Flink 2.x. |
| flink-cdc-composer/pom.xml | Switches values connector artifact by profile (flink2) and excludes a Flink-1-only test. |
| flink-cdc-common/src/test/java/.../FactoryHelperTests.java | Switches to project-owned ValidationException. |
| flink-cdc-common/src/main/java/.../source/EventSourceProvider.java | Removes compile-time dependency on Flink SourceFunction. |
| flink-cdc-common/src/main/java/.../sink/EventSinkProvider.java | Removes compile-time dependency on Flink SinkFunction. |
| flink-cdc-common/src/main/java/.../factories/FactoryHelper.java | Switches to project-owned ValidationException. |
| flink-cdc-common/src/main/java/.../exceptions/ValidationException.java | Adds project-owned validation exception to avoid Flink Table API dependency. |
| flink-cdc-common/pom.xml | Adds flink2 profile to switch Flink version and exclude legacy provider classes. |
| flink-cdc-cli/src/test/java/.../CliFrontendTest.java | Adjusts savepoint restore mode assertions to support different Flink enums via reflection. |
| flink-cdc-cli/src/main/java/.../utils/ConfigurationUtils.java | Resolves Flink restore/claim mode class across Flink 1.x/2.x via reflection. |
| flink-cdc-cli/src/main/java/.../CliFrontend.java | Reworks default restore mode selection to avoid direct SavepointConfigOptions dependency. |
| flink-cdc-cli/pom.xml | Adds flink2 profile to switch Flink version. |
| .github/workflows/modules.py | Adds module grouping for Flink 2.x builds and new pipeline e2e 2.x module wiring. |
| .github/workflows/flink_cdc_ci_nightly.yml | Adds Flink 2.x unit tests and Flink 2.x pipeline e2e jobs. |
| .github/workflows/flink_cdc_ci.yml | Adds Flink 2.x unit tests and Flink 2.x pipeline e2e jobs. |
| .github/workflows/flink_cdc_base.yml | Ensures the compile/install phase also runs with custom Maven params (e.g., -Pflink2). |


@lvyanquan
Contributor Author

Hi @macdoor.
I reviewed your code and found an issue: we cannot directly compile and test the entire project using the flink2 profile (even though you verified the feasibility of the changes in a Flink session). This raises concerns about the reliability of the code. I picked the key changes from your code (I will add you as a collaborator), and on that basis, I implemented it so that it can compile and test with both Flink 1.20 and Flink 2.2 (by specifying the flink2 profile).

I've added the corresponding tests in CI. Perhaps @macdoor and @yuxiqian can continue reviewing the code now.

@leonardBang leonardBang self-requested a review March 9, 2026 09:05
@yuxiqian
Member

yuxiqian commented Mar 9, 2026

I pulled the branch and ran the pipeline e2e tests with Flink 1.19.3, and the following exception occurred:

java.lang.NoSuchMethodError: 'org.apache.flink.api.common.operators.MailboxExecutor org.apache.flink.streaming.api.operators.StreamOperatorParameters.getMailboxExecutor()'
	at org.apache.flink.cdc.runtime.operators.sink.DataSinkWriterOperatorFactory.createStreamOperator(DataSinkWriterOperatorFactory.java:67) ~[blob_p-f8e1c56ff9f3c6b9b7d335d0a2b43d642854d16f-a7c482c63d7a2f66cc89f366c31083a1:3.6-SNAPSHOT]
	at org.apache.flink.streaming.api.operators.StreamOperatorFactoryUtil.createOperator(StreamOperatorFactoryUtil.java:81) ~[flink-dist-1.19.3.jar:1.19.3]
	at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOperator(OperatorChain.java:868) ~[flink-dist-1.19.3.jar:1.19.3]
	at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOperatorChain(OperatorChain.java:836) ~[flink-dist-1.19.3.jar:1.19.3]
	at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:732) ~[flink-dist-1.19.3.jar:1.19.3]
	at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:202) ~[flink-dist-1.19.3.jar:1.19.3]
	at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.<init>(RegularOperatorChain.java:60) ~[flink-dist-1.19.3.jar:1.19.3]
	at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:731) ~[flink-dist-1.19.3.jar:1.19.3]
	at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:713) ~[flink-dist-1.19.3.jar:1.19.3]
	at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958) ~[flink-dist-1.19.3.jar:1.19.3]
	at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927) [flink-dist-1.19.3.jar:1.19.3]
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:751) [flink-dist-1.19.3.jar:1.19.3]
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566) [flink-dist-1.19.3.jar:1.19.3]
