Conversation
There was a problem hiding this comment.
Just investigated the implementation in apache/fluss#1176. Seems instead of creating a replica for each file that uses incompatible API (between Flink 1.x and 2.x), Hongshun introduces a compatible layer in fluss-flink-common package and puts all incompatible API usages inside, and dispatch it in fluss-flink-1.x and fluss-flink-2.x. I think if we took this approach, the only module that needs to be versioned is flink-cdc-common.
The obvious advantage is we don't need so much code duplication. This PR adds ~20k sloc code (mostly duplicate, and it's hard to review what's changed) while Fluss support is merely +856 -36. Though two codebase could not be compared directly, such huge difference is not negligible. This design also causes maintenance issue, as any following PR modifying related file must remember updating both 1.x and 2.x implementation and keep them in-sync.
I'm not strongly against the approach used in this PR, just wondering if the alternative solution is possible / why it's impossible.
|
Hi @yuxiqian, We've been working on Flink 2.2 compatibility in a fork and have a working implementation following the approach referenced in fluss#1176 — i.e., introducing a version-specific compat module with runtime bridges. Our branch: https://github.com/macdoor/flink-cdc/tree/feature/opengauss-flink22-compat Key changes we made to get OpenGauss → Paimon pipelines running on Flink 2.2: flink-cdc-flink-compat module — two sub-modules: flink-cdc-flink-compat-flink1 (Flink 1.x bridge with SinkFunction/SourceFunction) and flink-cdc-flink-compat-flink2 (Flink 2.x stub classes: Sink$InitContext, CatalogFactory, Catalog). The pipeline now runs end-to-end on Flink 2.2.0 with a standalone session cluster (OpenGauss source → Paimon sink, all 4 operator stages visible including Sink Committer). Happy to share details, open a draft PR, or contribute directly to this effort. Let us know what would be most helpful! |
|
Hi @macdoor, I pulled your branch, but the code in this branch doesn't seem to be complete (it doesn't even compile). Did I miss something? Of course, your implementation in the PR is more concise. If you can get it to compile and run on Flink 2.2, feel free to submit a PR first so we can verify that the tests pass. |
|
Hi @lvyanquan, Thanks for trying the branch. We’ve since opened a dedicated PR with a complete, compilable implementation:
You can clone/build from branch macdoor:feature/flink22-compat and run the described steps to verify; we’ve already run a full build and conflict resolution on that branch. If anything doesn’t work on your side, we can iterate in that PR. |
375a622 to
5b47571
Compare
a814a86 to
acfb3b1
Compare
bbff971 to
abb4dee
Compare
Co-authored-by: xiaoxiong.duan@zznode.com <xiaoxiong.duan@zznode.com>
There was a problem hiding this comment.
Pull request overview
This PR introduces Flink 2.2.0 compatibility by keeping existing Flink 1.20-based modules intact and adding targeted Flink 2.x support via new modules/profiles, reflection-based shims, and dedicated CI coverage (unit + e2e) for the Flink 2.x build.
Changes:
- Adds a
flink2Maven profile and new Flink 2.x modules (notably values connector + pipeline e2e tests) while defaulting the build to Flink 1.20. - Introduces runtime compatibility utilities and adapts operators/serializers/translators to compile/run across Flink 1.x and 2.x API differences (often via reflection).
- Extends CI workflows to compile/test against Flink 2.2.0 and run a new Flink 2.x pipeline e2e suite.
Reviewed changes
Copilot reviewed 83 out of 83 changed files in this pull request and generated 5 comments.
Show a summary per file
| File | Description |
|---|---|
pom.xml |
Adds Flink 1.x/2.x version properties, shaded guava handling, and a flink2 profile hook for tests. |
flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/testutils/operators/RegularEventOperatorTestHarness.java |
Uses reflection/proxy output and compatibility setupOperator for Flink 2.x access changes. |
flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/testutils/operators/DistributedEventOperatorTestHarness.java |
Same as above for distributed harness. |
flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/serializer/SerializerTestBase.java |
Routes schema compatibility checks through compatibility utility. |
flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/serializer/EnumSerializerTest.java |
Updates schema compatibility calls for Flink 1.x/2.x. |
flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/sink/DataSinkOperatorAdapter.java |
Reflection-based chaining strategy + subtask index API update for Flink 2.x. |
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/utils/FlinkCompatibilityUtils.java |
New helper for cross-version reflection (setup, chaining strategy, serializer compatibility). |
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/typeutils/PartitioningEventTypeInfo.java |
Adds Flink 2.x SerializerConfig serializer creation overload. |
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/typeutils/EventTypeInfo.java |
Same as above for Event type info. |
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/data/ZonedTimestampDataSerializer.java |
Adds Flink 2.x snapshot-compat method variant. |
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/data/TimestampDataSerializer.java |
Adds Flink 2.x snapshot-compat method variant. |
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/data/MapDataSerializer.java |
Adds Flink 2.x snapshot-compat method variant. |
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/data/LocalZonedTimestampDataSerializer.java |
Adds Flink 2.x snapshot-compat method variant. |
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/data/DecimalDataSerializer.java |
Adds Flink 2.x snapshot-compat method variant. |
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/data/ArrayDataSerializer.java |
Adds Flink 2.x snapshot-compat method variant. |
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/NullableSerializerWrapper.java |
Adds Flink 2.x snapshot-compat method variant. |
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/NestedSerializersSnapshotDelegate.java |
Uses reflection to resolve schema compatibility across Flink 1.x/2.x. |
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/EnumSerializer.java |
Adds Flink 2.x snapshot-compat method variant. |
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/partitioning/RegularPrePartitionOperator.java |
Uses compatibility chaining-strategy setter. |
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/partitioning/DistributedPrePartitionOperator.java |
Uses compatibility chaining-strategy setter + updates subtask index access. |
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/partitioning/BatchRegularPrePartitionOperator.java |
Uses compatibility chaining-strategy setter. |
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformOperator.java |
Uses compatibility chaining-strategy setter + avoids wrapping chained-operator exceptions. |
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/sink/DataSinkWriterOperatorFactory.java |
Removes Flink-1-only factory interface and switches mailbox executor sourcing for compatibility. |
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/sink/DataSinkWriterOperator.java |
Heavy reflection to invoke wrapped operator methods + Flink 2.2 committable emission workaround. |
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/sink/DataSinkFunctionOperator.java |
Uses compatibility chaining-strategy setter + updates subtask index access. |
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/sink/BatchDataSinkWriterOperator.java |
Mirrors reflection invocation approach for batch writer operator. |
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/sink/BatchDataSinkFunctionOperator.java |
Uses compatibility chaining-strategy setter. |
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/regular/SchemaOperator.java |
Uses compatibility chaining-strategy setter + updates subtask index access. |
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/regular/BatchSchemaOperator.java |
Uses compatibility chaining-strategy setter. |
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/distributed/SchemaOperator.java |
Updates subtask index access for Flink 2.x. |
flink-cdc-runtime/pom.xml |
Adds flink2 profile to switch Flink version and exclude legacy SinkFunction operators. |
flink-cdc-e2e-tests/pom.xml |
Adds new Flink 2.x pipeline e2e tests module. |
flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/java/.../FlinkContainerTestEnvironment.java |
Switches to project-owned ValidationException. |
flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/.../PipelineTestEnvironment.java |
Switches to project-owned ValidationException. |
flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/.../MySqlToHudiE2eITCase.java |
Switches to project-owned ValidationException. |
flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/.../MaxComputeE2eITCase.java |
Switches to project-owned ValidationException. |
flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests-2.x/src/test/resources/log4j2-test.properties |
Adds logging config for Flink 2.x e2e module. |
flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests-2.x/src/test/resources/docker/mysql/setup.sql |
Adds MySQL init SQL for Flink 2.x e2e module. |
flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests-2.x/src/test/resources/docker/mysql/my.cnf |
Adds MySQL config for Flink 2.x e2e module. |
flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests-2.x/src/test/java/.../TarballFetcher.java |
New helper to fetch CDC tarballs/jars into containers for Flink 2.x e2e runs. |
flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests-2.x/src/test/java/.../PipelineTestEnvironment.java |
New Flink 2.x container-based pipeline test environment. |
flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests-2.x/src/test/java/.../ValuesE2eITCase.java |
Adds Flink 2.x values-pipeline end-to-end tests. |
flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests-2.x/pom.xml |
New Flink 2.x pipeline e2e module build, deps, and failsafe execution. |
flink-cdc-dist/pom.xml |
Adds flink2 profile dependency adjustment for shaded guava. |
flink-cdc-connect/flink-cdc-pipeline-connectors/pom.xml |
Adds flink-cdc-pipeline-connector-values-2.x module. |
flink-cdc-connect/.../flink-cdc-pipeline-connector-values/.../ValuesDataSinkFunction.java |
Removes legacy SinkFunction-based values sink implementation. |
flink-cdc-connect/.../flink-cdc-pipeline-connector-values/.../ValuesDataSink.java |
Forces values sink to Sink V2 and adds writer init compatibility overload. |
flink-cdc-connect/.../flink-cdc-pipeline-connector-values-2.x/src/main/resources/.../Factory |
Registers values connector factory for the Flink 2.x variant module. |
flink-cdc-connect/.../flink-cdc-pipeline-connector-values-2.x/.../ValuesDataSink.java |
Flink 2.x-specific values sink implementation. |
flink-cdc-connect/.../flink-cdc-pipeline-connector-values-2.x/pom.xml |
Adds Flink 2.x values connector module with shading configuration. |
flink-cdc-connect/.../flink-cdc-pipeline-connector-starrocks/.../StarRocksDataSinkFactoryTest.java |
Switches to project-owned ValidationException. |
flink-cdc-connect/.../flink-cdc-pipeline-connector-postgres/.../PostgresDataSourceFactoryTest.java |
Switches to project-owned ValidationException. |
flink-cdc-connect/.../flink-cdc-pipeline-connector-postgres/.../PostgresDataSourceFactory.java |
Switches to project-owned ValidationException. |
flink-cdc-connect/.../flink-cdc-pipeline-connector-paimon/.../PaimonDataSinkFactoryTest.java |
Switches to project-owned ValidationException. |
flink-cdc-connect/.../flink-cdc-pipeline-connector-oracle/.../OracleDataSourceFactory.java |
Switches to project-owned ValidationException. |
flink-cdc-connect/.../flink-cdc-pipeline-connector-mysql/.../MySqlDataSourceFactoryTest.java |
Switches to project-owned ValidationException. |
flink-cdc-connect/.../flink-cdc-pipeline-connector-mysql/.../MySqlDataSourceFactory.java |
Switches to project-owned ValidationException. |
flink-cdc-connect/.../flink-cdc-pipeline-connector-kafka/.../KafkaDataSinkFactoryTest.java |
Switches to project-owned ValidationException. |
flink-cdc-connect/.../flink-cdc-pipeline-connector-iceberg/.../IcebergDataSinkFactoryTest.java |
Switches to project-owned ValidationException. |
flink-cdc-connect/.../flink-cdc-pipeline-connector-hudi/.../HudiDataSinkFactoryTest.java |
Switches to project-owned ValidationException. |
flink-cdc-connect/.../flink-cdc-pipeline-connector-fluss/.../FlussDataSinkFactoryTest.java |
Switches to project-owned ValidationException. |
flink-cdc-connect/.../flink-cdc-pipeline-connector-elasticsearch/.../ElasticsearchDataSinkFactoryTest.java |
Switches to project-owned ValidationException. |
flink-cdc-connect/.../flink-cdc-pipeline-connector-elasticsearch/.../ElasticsearchDataSinkFactory.java |
Switches to project-owned ValidationException. |
flink-cdc-composer/src/test/java/.../DistributedSourceFunction.java |
Removes old SourceFunction-based test source. |
flink-cdc-composer/src/test/java/.../DistributedSource.java |
Adds FLIP-27 Source API test source implementation. |
flink-cdc-composer/src/test/java/.../DistributedDataSource.java |
Switches test datasource to new Source API provider. |
flink-cdc-composer/src/main/java/.../DataSourceTranslator.java |
Supports legacy SourceFunction providers via reflection (Flink 1.x), defaults to new Source API. |
flink-cdc-composer/src/main/java/.../DataSinkTranslator.java |
Supports legacy SinkFunction path via reflection and adapts commit topology checks for Flink 2.x. |
flink-cdc-composer/pom.xml |
Switches values connector artifact by profile (flink2) and excludes a Flink-1-only test. |
flink-cdc-common/src/test/java/.../FactoryHelperTests.java |
Switches to project-owned ValidationException. |
flink-cdc-common/src/main/java/.../source/EventSourceProvider.java |
Removes compile-time dependency on Flink SourceFunction. |
flink-cdc-common/src/main/java/.../sink/EventSinkProvider.java |
Removes compile-time dependency on Flink SinkFunction. |
flink-cdc-common/src/main/java/.../factories/FactoryHelper.java |
Switches to project-owned ValidationException. |
flink-cdc-common/src/main/java/.../exceptions/ValidationException.java |
Adds project-owned validation exception to avoid Flink Table API dependency. |
flink-cdc-common/pom.xml |
Adds flink2 profile to switch Flink version and exclude legacy provider classes. |
flink-cdc-cli/src/test/java/.../CliFrontendTest.java |
Adjusts savepoint restore mode assertions to support different Flink enums via reflection. |
flink-cdc-cli/src/main/java/.../utils/ConfigurationUtils.java |
Resolves Flink restore/claim mode class across Flink 1.x/2.x via reflection. |
flink-cdc-cli/src/main/java/.../CliFrontend.java |
Reworks default restore mode selection to avoid direct SavepointConfigOptions dependency. |
flink-cdc-cli/pom.xml |
Adds flink2 profile to switch Flink version. |
.github/workflows/modules.py |
Adds module grouping for Flink 2.x builds and new pipeline e2e 2.x module wiring. |
.github/workflows/flink_cdc_ci_nightly.yml |
Adds Flink 2.x unit tests and Flink 2.x pipeline e2e jobs. |
.github/workflows/flink_cdc_ci.yml |
Adds Flink 2.x unit tests and Flink 2.x pipeline e2e jobs. |
.github/workflows/flink_cdc_base.yml |
Ensures the compile/install phase also runs with custom Maven params (e.g., -Pflink2). |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
...untime/src/main/java/org/apache/flink/cdc/runtime/operators/sink/DataSinkWriterOperator.java
Show resolved
Hide resolved
...omposer/src/main/java/org/apache/flink/cdc/composer/flink/translator/DataSinkTranslator.java
Show resolved
Hide resolved
...nnector-values/src/main/java/org/apache/flink/cdc/connectors/values/sink/ValuesDataSink.java
Show resolved
Hide resolved
...sts-2.x/src/test/java/org/apache/flink/cdc/pipeline/tests/utils/PipelineTestEnvironment.java
Show resolved
Hide resolved
...ne-e2e-tests-2.x/src/test/java/org/apache/flink/cdc/pipeline/tests/utils/TarballFetcher.java
Show resolved
Hide resolved
|
Hi @macdoor. I've added the corresponding tests in CI. Perhaps @macdoor and @yuxiqian can continue reviewing the code now. |
|
Pulled and run pipeline e2e tests with Flink 1.19.3, and the following exceptions occur: |
Introduction
Building upon the foundation where existing modules continue to use Flink 1.20 dependencies, support for Flink 2.x versions is provided through newly added modules.
Development Plan
I plan to complete full support for Flink 2.x versions through three steps:
and perform integration tests and end-to-end tests on these modules based on a simple values pipeline
connector to verify correctness.
our most commonly used CDC connector.
feasible.
This PR will complete the work of the first step.
Topics for Discussion
1. Module Design
Question: Is it necessary to design each module with a structure consisting of a common module, a module with 1.x API, and a module with 2.x API, as Paimon does?
My Answer: This would require creating three modules for every module in the project. I think this introduces too many additional modules. Therefore, I will keep existing modules' dependency on Flink 1.x unchanged, and only add new modules that depend on Flink 2.x. I will rewrite classes that depend on the new API, and use the shade plugin to reduce the number of classes that need to be rewritten in the new modules.
2. Test Coverage
Question: Is it necessary to add tests equivalent to those in the 1.x modules for each newly added 2.x module?
My Answer: This is a difficult decision point. Adding sufficient tests can guarantee the correctness and reliability of 2.x modules, but it would introduce a large amount of duplicate code and also increase the time required for CI runs. To avoid the burden of review, I have only added composer tests and e2e tests in this PR to ensure that the support for Flink 2.x is functional. I plan to add more complete tests in subsequent PRs (if necessary).
The above lists the points that I still consider uncertain during the implementation of this PR. Discussions are welcome.