From ebd6a823f940f8e25c5348072160a0c18766c629 Mon Sep 17 00:00:00 2001
From: Jordan Epstein
Date: Thu, 7 May 2026 11:24:32 -0500
Subject: [PATCH] Flink: Honor schema identifier fields in dynamic-sink record routing

DynamicSinkUtil.getEqualityFieldIds and DynamicWriter.getEqualityFields both
fall back to the schema's identifierFieldIds when the user-supplied equality
fields are empty, but two routing decisions in the dynamic sink ignored that
fallback:

1. HashKeyGenerator distributed identifier-only records round-robin, so two
   rows sharing an identifier-derived key could land on different writer
   subtasks while the writer still emitted equality deletes keyed by those
   identifier fields - breaking equality-delete correctness.

2. DynamicRecordProcessor forwarded any record with a null distributionMode
   straight to the writer, even when the record resolved to a non-empty
   equality-field set. Forward-mode records sharing an equality key could
   likewise split across writers and leave duplicates behind.

Centralize the resolution in DynamicSinkUtil.resolveEqualityFieldNames and
use it in both call sites so distribution and write-side equality-field
inference stay aligned. Document the carve-out in flink-writes.md and add
unit tests covering both paths across v2.1, v2.0 and v1.20.
---
 docs/docs/flink-writes.md                     |   4 +-
 .../sink/dynamic/DynamicRecordProcessor.java  |  15 ++-
 .../flink/sink/dynamic/DynamicSinkUtil.java   |  15 +++
 .../flink/sink/dynamic/HashKeyGenerator.java  |  14 +--
 .../dynamic/TestDynamicRecordProcessor.java   | 101 ++++++++++++++++++
 .../sink/dynamic/TestHashKeyGenerator.java    |  97 +++++++++++++++++
 .../sink/dynamic/DynamicRecordProcessor.java  |  15 ++-
 .../flink/sink/dynamic/DynamicSinkUtil.java   |  15 +++
 .../flink/sink/dynamic/HashKeyGenerator.java  |  14 +--
 .../dynamic/TestDynamicRecordProcessor.java   | 101 ++++++++++++++++++
 .../sink/dynamic/TestHashKeyGenerator.java    |  97 +++++++++++++++++
 .../sink/dynamic/DynamicRecordProcessor.java  |  15 ++-
 .../flink/sink/dynamic/DynamicSinkUtil.java   |  15 +++
 .../flink/sink/dynamic/HashKeyGenerator.java  |  14 +--
 .../dynamic/TestDynamicRecordProcessor.java   | 101 ++++++++++++++++++
 .../sink/dynamic/TestHashKeyGenerator.java    |  97 +++++++++++++++++
 16 files changed, 707 insertions(+), 23 deletions(-)
 create mode 100644 flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicRecordProcessor.java
 create mode 100644 flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicRecordProcessor.java
 create mode 100644 flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicRecordProcessor.java

diff --git a/docs/docs/flink-writes.md b/docs/docs/flink-writes.md
index 03795b5beed0..cc20e4ff6598 100644
--- a/docs/docs/flink-writes.md
+++ b/docs/docs/flink-writes.md
@@ -487,10 +487,10 @@ We need the following information (DynamicRecord) for every record:
 | `Schema` | The schema of the record. |
 | `Spec` | The expected partitioning specification for the record. |
 | `RowData` | The actual row data to be written. |
-| `DistributionMode` | The distribution mode for writing the record (NONE, HASH or `null`). When `null`, the record won't be shuffled at all. |
+| `DistributionMode` | The distribution mode for writing the record (NONE, HASH or `null`). When `null`, the record won't be shuffled at all — except when the record resolves to a non-empty equality-field set, in which case the dynamic sink falls back to hash distribution to keep records sharing the same equality key on the same writer. |
 | `Parallelism` | The maximum number of parallel writers for a given table/branch/schema/spec (WriteTarget). |
 | `UpsertMode` | Overrides this table's write.upsert.enabled (optional). |
-| `EqualityFields` | The equality fields for the table(optional). |
+| `EqualityFields` | The equality fields for the table (optional). When unset, the dynamic sink falls back to the schema's identifier fields for both distribution (records sharing a key route to the same writer) and equality-delete inference. Identifier fields do not auto-enable upsert mode — that flag remains opt-in. |

 ### Schema Evolution
diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecordProcessor.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecordProcessor.java
index c752b8e9b8d9..ba2d816ae9ea 100644
--- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecordProcessor.java
+++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecordProcessor.java
@@ -129,7 +129,7 @@ public void processElement(T element, Context ctx, Collector getEqualityFieldIds(Set equalityFields, Schema schem
     return equalityFieldIds;
   }
+
+  /**
+   * Resolves the effective equality field names. Returns the user-supplied set when non-empty,
+   * otherwise falls back to the names of {@link Schema#identifierFieldIds()}. Mirrors {@link
+   * #getEqualityFieldIds} so distribution and write-side equality-field inference stay aligned.
+   */
+  static Set<String> resolveEqualityFieldNames(
+      @Nullable Set<String> equalityFields, Schema schema) {
+    if (equalityFields != null && !equalityFields.isEmpty()) {
+      return equalityFields;
+    }
+
+    return schema.identifierFieldNames();
+  }
+
   static int safeAbs(int input) {
     if (input >= 0) {
       return input;
diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/HashKeyGenerator.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/HashKeyGenerator.java
index 61a850212bf4..4472de67b421 100644
--- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/HashKeyGenerator.java
+++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/HashKeyGenerator.java
@@ -20,7 +20,6 @@
 import static org.apache.iceberg.TableProperties.WRITE_DISTRIBUTION_MODE;

-import java.util.Collections;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
@@ -79,6 +78,9 @@ int generateKey(
       @Nullable PartitionSpec tableSpec,
       @Nullable RowData overrideRowData) {
     String tableIdent = dynamicRecord.tableIdentifier().toString();
+    Schema effectiveSchema = MoreObjects.firstNonNull(tableSchema, dynamicRecord.schema());
+    Set<String> effectiveEqualityFields =
+        DynamicSinkUtil.resolveEqualityFieldNames(dynamicRecord.equalityFields(), effectiveSchema);
     SelectorKey cacheKey =
         new SelectorKey(
             tableIdent,
@@ -87,8 +89,8 @@ int generateKey(
             tableSpec != null ?
tableSpec.specId() : null, dynamicRecord.schema(), dynamicRecord.spec(), - dynamicRecord.equalityFields(), - dynamicRecord.distributionMode(), + effectiveEqualityFields, + MoreObjects.firstNonNull(dynamicRecord.distributionMode(), DistributionMode.NONE), Math.min(dynamicRecord.writeParallelism(), maxWriteParallelism)); KeySelector keySelector = keySelectorCache.computeIfAbsent( @@ -96,11 +98,11 @@ int generateKey( k -> getKeySelector( tableIdent, - MoreObjects.firstNonNull(tableSchema, dynamicRecord.schema()), + effectiveSchema, MoreObjects.firstNonNull(tableSpec, dynamicRecord.spec()), - dynamicRecord.distributionMode(), MoreObjects.firstNonNull( - dynamicRecord.equalityFields(), Collections.emptySet()), + dynamicRecord.distributionMode(), DistributionMode.NONE), + effectiveEqualityFields, Math.min(dynamicRecord.writeParallelism(), maxWriteParallelism))); try { return keySelector.getKey( diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicRecordProcessor.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicRecordProcessor.java new file mode 100644 index 000000000000..298a338ea9d0 --- /dev/null +++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicRecordProcessor.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.flink.sink.dynamic; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.util.Collections; +import org.apache.flink.table.data.GenericRowData; +import org.apache.flink.table.data.StringData; +import org.apache.iceberg.DistributionMode; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.SnapshotRef; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.apache.iceberg.types.Types; +import org.junit.jupiter.api.Test; + +class TestDynamicRecordProcessor { + + private static final Schema SCHEMA = + new Schema( + Types.NestedField.required(1, "id", Types.IntegerType.get()), + Types.NestedField.required(2, "data", Types.StringType.get())); + + private static final Schema SCHEMA_WITH_IDENTIFIER = + new Schema( + Lists.newArrayList( + Types.NestedField.required(1, "id", Types.IntegerType.get()), + Types.NestedField.required(2, "data", Types.StringType.get())), + Sets.newHashSet(1)); + + private static final TableIdentifier TABLE_IDENTIFIER = TableIdentifier.of("default", "table"); + private static final String BRANCH = SnapshotRef.MAIN_BRANCH; + + @Test + void testForwardEligibleWhenNoEqualityFields() { + DynamicRecord record = recordWithDistributionMode(SCHEMA, null); + + assertThat(DynamicRecordProcessor.isForwardEligible(record)).isTrue(); + } + + @Test + void testNotForwardEligibleWhenDistributionModeSet() { + DynamicRecord record = recordWithDistributionMode(SCHEMA, DistributionMode.NONE); + + assertThat(DynamicRecordProcessor.isForwardEligible(record)).isFalse(); + } + + @Test + void testNotForwardEligibleWhenUserEqualityFieldsSet() { + DynamicRecord record = recordWithDistributionMode(SCHEMA, null); + record.setEqualityFields(Collections.singleton("id")); + + assertThat(DynamicRecordProcessor.isForwardEligible(record)).isFalse(); + } + + @Test + void testNotForwardEligibleWhenSchemaHasIdentifierFields() { + DynamicRecord record = recordWithDistributionMode(SCHEMA_WITH_IDENTIFIER, null); + + assertThat(DynamicRecordProcessor.isForwardEligible(record)).isFalse(); + } + + @Test + void testNotForwardEligibleWhenEmptyEqualityFieldsButIdentifierFieldsPresent() { + DynamicRecord record = recordWithDistributionMode(SCHEMA_WITH_IDENTIFIER, null); + record.setEqualityFields(Collections.emptySet()); + + assertThat(DynamicRecordProcessor.isForwardEligible(record)).isFalse(); + } + + private static DynamicRecord recordWithDistributionMode( + Schema schema, DistributionMode distributionMode) { + return new DynamicRecord( + TABLE_IDENTIFIER, + BRANCH, + schema, + GenericRowData.of(1, StringData.fromString("foo")), + PartitionSpec.unpartitioned(), + distributionMode, + 2); + } +} diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestHashKeyGenerator.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestHashKeyGenerator.java index 9a485fafaf47..4181f00d0a0a 100644 --- a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestHashKeyGenerator.java +++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestHashKeyGenerator.java @@ -38,6 +38,7 @@ import org.apache.iceberg.flink.FlinkWriteConf; import org.apache.iceberg.flink.FlinkWriteOptions; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import 
org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.collect.Sets; import org.apache.iceberg.types.Types; import org.junit.jupiter.api.Test; @@ -294,6 +295,102 @@ void testCapAtMaxWriteParallelism() throws Exception { .isEqualTo(maxWriteParallelism); } + @Test + void testIdentifierFieldsResolvedAsEqualityFields() throws Exception { + int writeParallelism = 2; + int maxWriteParallelism = 8; + HashKeyGenerator generator = new HashKeyGenerator(16, maxWriteParallelism); + + Schema schemaWithIdentifier = + new Schema( + Lists.newArrayList( + Types.NestedField.required(1, "id", Types.IntegerType.get()), + Types.NestedField.required(2, "data", Types.StringType.get())), + Sets.newHashSet(1)); + PartitionSpec unpartitioned = PartitionSpec.unpartitioned(); + + GenericRowData row1 = GenericRowData.of(1, StringData.fromString("foo")); + GenericRowData row2 = GenericRowData.of(1, StringData.fromString("bar")); + GenericRowData row3 = GenericRowData.of(2, StringData.fromString("baz")); + + DynamicRecord record1 = + new DynamicRecord( + TABLE_IDENTIFIER, + BRANCH, + schemaWithIdentifier, + row1, + unpartitioned, + DistributionMode.NONE, + writeParallelism); + DynamicRecord record2 = + new DynamicRecord( + TABLE_IDENTIFIER, + BRANCH, + schemaWithIdentifier, + row2, + unpartitioned, + DistributionMode.NONE, + writeParallelism); + DynamicRecord record3 = + new DynamicRecord( + TABLE_IDENTIFIER, + BRANCH, + schemaWithIdentifier, + row3, + unpartitioned, + DistributionMode.NONE, + writeParallelism); + + int writeKey1 = generator.generateKey(record1); + int writeKey2 = generator.generateKey(record2); + int writeKey3 = generator.generateKey(record3); + + assertThat(writeKey1).isEqualTo(writeKey2); + assertThat(writeKey3).isNotEqualTo(writeKey1); + } + + @Test + void testForwardRecordWithIdentifierFieldsRoutesByEqualityFields() throws Exception { + int writeParallelism = 2; + int maxWriteParallelism = 8; + HashKeyGenerator generator = new HashKeyGenerator(16, maxWriteParallelism); + + Schema schemaWithIdentifier = + new Schema( + Lists.newArrayList( + Types.NestedField.required(1, "id", Types.IntegerType.get()), + Types.NestedField.required(2, "data", Types.StringType.get())), + Sets.newHashSet(1)); + PartitionSpec unpartitioned = PartitionSpec.unpartitioned(); + + GenericRowData rowSameKey1 = GenericRowData.of(1, StringData.fromString("foo")); + GenericRowData rowSameKey2 = GenericRowData.of(1, StringData.fromString("bar")); + GenericRowData rowDifferentKey = GenericRowData.of(2, StringData.fromString("baz")); + + // Forward records (null distribution mode) with identifier fields on the schema must still + // route by equality fields, so HashKeyGenerator can handle them when the processor falls back + // off the forward path. 
+ DynamicRecord record1 = + new DynamicRecord( + TABLE_IDENTIFIER, BRANCH, schemaWithIdentifier, rowSameKey1, unpartitioned); + DynamicRecord record2 = + new DynamicRecord( + TABLE_IDENTIFIER, BRANCH, schemaWithIdentifier, rowSameKey2, unpartitioned); + DynamicRecord record3 = + new DynamicRecord( + TABLE_IDENTIFIER, BRANCH, schemaWithIdentifier, rowDifferentKey, unpartitioned); + record1.writeParallelism(writeParallelism); + record2.writeParallelism(writeParallelism); + record3.writeParallelism(writeParallelism); + + int writeKey1 = generator.generateKey(record1); + int writeKey2 = generator.generateKey(record2); + int writeKey3 = generator.generateKey(record3); + + assertThat(writeKey1).isEqualTo(writeKey2); + assertThat(writeKey3).isNotEqualTo(writeKey1); + } + @Test void testHashModeWithoutEqualityFieldsFallsBackToNone() throws Exception { int writeParallelism = 2; diff --git a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecordProcessor.java b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecordProcessor.java index c752b8e9b8d9..ba2d816ae9ea 100644 --- a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecordProcessor.java +++ b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecordProcessor.java @@ -129,7 +129,7 @@ public void processElement(T element, Context ctx, Collector getEqualityFieldIds(Set equalityFields, Schema schem return equalityFieldIds; } + /** + * Resolves the effective equality field names. Returns the user-supplied set when non-empty, + * otherwise falls back to the names of {@link Schema#identifierFieldIds()}. Mirrors {@link + * #getEqualityFieldIds} so distribution and write-side equality-field inference stay aligned. + */ + static Set resolveEqualityFieldNames( + @Nullable Set equalityFields, Schema schema) { + if (equalityFields != null && !equalityFields.isEmpty()) { + return equalityFields; + } + + return schema.identifierFieldNames(); + } + static int safeAbs(int input) { if (input >= 0) { return input; diff --git a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/HashKeyGenerator.java b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/HashKeyGenerator.java index 61a850212bf4..4472de67b421 100644 --- a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/HashKeyGenerator.java +++ b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/HashKeyGenerator.java @@ -20,7 +20,6 @@ import static org.apache.iceberg.TableProperties.WRITE_DISTRIBUTION_MODE; -import java.util.Collections; import java.util.Map; import java.util.Objects; import java.util.Set; @@ -79,6 +78,9 @@ int generateKey( @Nullable PartitionSpec tableSpec, @Nullable RowData overrideRowData) { String tableIdent = dynamicRecord.tableIdentifier().toString(); + Schema effectiveSchema = MoreObjects.firstNonNull(tableSchema, dynamicRecord.schema()); + Set effectiveEqualityFields = + DynamicSinkUtil.resolveEqualityFieldNames(dynamicRecord.equalityFields(), effectiveSchema); SelectorKey cacheKey = new SelectorKey( tableIdent, @@ -87,8 +89,8 @@ int generateKey( tableSpec != null ? 
tableSpec.specId() : null, dynamicRecord.schema(), dynamicRecord.spec(), - dynamicRecord.equalityFields(), - dynamicRecord.distributionMode(), + effectiveEqualityFields, + MoreObjects.firstNonNull(dynamicRecord.distributionMode(), DistributionMode.NONE), Math.min(dynamicRecord.writeParallelism(), maxWriteParallelism)); KeySelector keySelector = keySelectorCache.computeIfAbsent( @@ -96,11 +98,11 @@ int generateKey( k -> getKeySelector( tableIdent, - MoreObjects.firstNonNull(tableSchema, dynamicRecord.schema()), + effectiveSchema, MoreObjects.firstNonNull(tableSpec, dynamicRecord.spec()), - dynamicRecord.distributionMode(), MoreObjects.firstNonNull( - dynamicRecord.equalityFields(), Collections.emptySet()), + dynamicRecord.distributionMode(), DistributionMode.NONE), + effectiveEqualityFields, Math.min(dynamicRecord.writeParallelism(), maxWriteParallelism))); try { return keySelector.getKey( diff --git a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicRecordProcessor.java b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicRecordProcessor.java new file mode 100644 index 000000000000..298a338ea9d0 --- /dev/null +++ b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicRecordProcessor.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.flink.sink.dynamic; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.util.Collections; +import org.apache.flink.table.data.GenericRowData; +import org.apache.flink.table.data.StringData; +import org.apache.iceberg.DistributionMode; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.SnapshotRef; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.apache.iceberg.types.Types; +import org.junit.jupiter.api.Test; + +class TestDynamicRecordProcessor { + + private static final Schema SCHEMA = + new Schema( + Types.NestedField.required(1, "id", Types.IntegerType.get()), + Types.NestedField.required(2, "data", Types.StringType.get())); + + private static final Schema SCHEMA_WITH_IDENTIFIER = + new Schema( + Lists.newArrayList( + Types.NestedField.required(1, "id", Types.IntegerType.get()), + Types.NestedField.required(2, "data", Types.StringType.get())), + Sets.newHashSet(1)); + + private static final TableIdentifier TABLE_IDENTIFIER = TableIdentifier.of("default", "table"); + private static final String BRANCH = SnapshotRef.MAIN_BRANCH; + + @Test + void testForwardEligibleWhenNoEqualityFields() { + DynamicRecord record = recordWithDistributionMode(SCHEMA, null); + + assertThat(DynamicRecordProcessor.isForwardEligible(record)).isTrue(); + } + + @Test + void testNotForwardEligibleWhenDistributionModeSet() { + DynamicRecord record = recordWithDistributionMode(SCHEMA, DistributionMode.NONE); + + assertThat(DynamicRecordProcessor.isForwardEligible(record)).isFalse(); + } + + @Test + void testNotForwardEligibleWhenUserEqualityFieldsSet() { + DynamicRecord record = recordWithDistributionMode(SCHEMA, null); + record.setEqualityFields(Collections.singleton("id")); + + assertThat(DynamicRecordProcessor.isForwardEligible(record)).isFalse(); + } + + @Test + void testNotForwardEligibleWhenSchemaHasIdentifierFields() { + DynamicRecord record = recordWithDistributionMode(SCHEMA_WITH_IDENTIFIER, null); + + assertThat(DynamicRecordProcessor.isForwardEligible(record)).isFalse(); + } + + @Test + void testNotForwardEligibleWhenEmptyEqualityFieldsButIdentifierFieldsPresent() { + DynamicRecord record = recordWithDistributionMode(SCHEMA_WITH_IDENTIFIER, null); + record.setEqualityFields(Collections.emptySet()); + + assertThat(DynamicRecordProcessor.isForwardEligible(record)).isFalse(); + } + + private static DynamicRecord recordWithDistributionMode( + Schema schema, DistributionMode distributionMode) { + return new DynamicRecord( + TABLE_IDENTIFIER, + BRANCH, + schema, + GenericRowData.of(1, StringData.fromString("foo")), + PartitionSpec.unpartitioned(), + distributionMode, + 2); + } +} diff --git a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestHashKeyGenerator.java b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestHashKeyGenerator.java index 9a485fafaf47..4181f00d0a0a 100644 --- a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestHashKeyGenerator.java +++ b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestHashKeyGenerator.java @@ -38,6 +38,7 @@ import org.apache.iceberg.flink.FlinkWriteConf; import org.apache.iceberg.flink.FlinkWriteOptions; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import 
org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.collect.Sets; import org.apache.iceberg.types.Types; import org.junit.jupiter.api.Test; @@ -294,6 +295,102 @@ void testCapAtMaxWriteParallelism() throws Exception { .isEqualTo(maxWriteParallelism); } + @Test + void testIdentifierFieldsResolvedAsEqualityFields() throws Exception { + int writeParallelism = 2; + int maxWriteParallelism = 8; + HashKeyGenerator generator = new HashKeyGenerator(16, maxWriteParallelism); + + Schema schemaWithIdentifier = + new Schema( + Lists.newArrayList( + Types.NestedField.required(1, "id", Types.IntegerType.get()), + Types.NestedField.required(2, "data", Types.StringType.get())), + Sets.newHashSet(1)); + PartitionSpec unpartitioned = PartitionSpec.unpartitioned(); + + GenericRowData row1 = GenericRowData.of(1, StringData.fromString("foo")); + GenericRowData row2 = GenericRowData.of(1, StringData.fromString("bar")); + GenericRowData row3 = GenericRowData.of(2, StringData.fromString("baz")); + + DynamicRecord record1 = + new DynamicRecord( + TABLE_IDENTIFIER, + BRANCH, + schemaWithIdentifier, + row1, + unpartitioned, + DistributionMode.NONE, + writeParallelism); + DynamicRecord record2 = + new DynamicRecord( + TABLE_IDENTIFIER, + BRANCH, + schemaWithIdentifier, + row2, + unpartitioned, + DistributionMode.NONE, + writeParallelism); + DynamicRecord record3 = + new DynamicRecord( + TABLE_IDENTIFIER, + BRANCH, + schemaWithIdentifier, + row3, + unpartitioned, + DistributionMode.NONE, + writeParallelism); + + int writeKey1 = generator.generateKey(record1); + int writeKey2 = generator.generateKey(record2); + int writeKey3 = generator.generateKey(record3); + + assertThat(writeKey1).isEqualTo(writeKey2); + assertThat(writeKey3).isNotEqualTo(writeKey1); + } + + @Test + void testForwardRecordWithIdentifierFieldsRoutesByEqualityFields() throws Exception { + int writeParallelism = 2; + int maxWriteParallelism = 8; + HashKeyGenerator generator = new HashKeyGenerator(16, maxWriteParallelism); + + Schema schemaWithIdentifier = + new Schema( + Lists.newArrayList( + Types.NestedField.required(1, "id", Types.IntegerType.get()), + Types.NestedField.required(2, "data", Types.StringType.get())), + Sets.newHashSet(1)); + PartitionSpec unpartitioned = PartitionSpec.unpartitioned(); + + GenericRowData rowSameKey1 = GenericRowData.of(1, StringData.fromString("foo")); + GenericRowData rowSameKey2 = GenericRowData.of(1, StringData.fromString("bar")); + GenericRowData rowDifferentKey = GenericRowData.of(2, StringData.fromString("baz")); + + // Forward records (null distribution mode) with identifier fields on the schema must still + // route by equality fields, so HashKeyGenerator can handle them when the processor falls back + // off the forward path. 
+ DynamicRecord record1 = + new DynamicRecord( + TABLE_IDENTIFIER, BRANCH, schemaWithIdentifier, rowSameKey1, unpartitioned); + DynamicRecord record2 = + new DynamicRecord( + TABLE_IDENTIFIER, BRANCH, schemaWithIdentifier, rowSameKey2, unpartitioned); + DynamicRecord record3 = + new DynamicRecord( + TABLE_IDENTIFIER, BRANCH, schemaWithIdentifier, rowDifferentKey, unpartitioned); + record1.writeParallelism(writeParallelism); + record2.writeParallelism(writeParallelism); + record3.writeParallelism(writeParallelism); + + int writeKey1 = generator.generateKey(record1); + int writeKey2 = generator.generateKey(record2); + int writeKey3 = generator.generateKey(record3); + + assertThat(writeKey1).isEqualTo(writeKey2); + assertThat(writeKey3).isNotEqualTo(writeKey1); + } + @Test void testHashModeWithoutEqualityFieldsFallsBackToNone() throws Exception { int writeParallelism = 2; diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecordProcessor.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecordProcessor.java index c752b8e9b8d9..ba2d816ae9ea 100644 --- a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecordProcessor.java +++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecordProcessor.java @@ -129,7 +129,7 @@ public void processElement(T element, Context ctx, Collector getEqualityFieldIds(Set equalityFields, Schema schem return equalityFieldIds; } + /** + * Resolves the effective equality field names. Returns the user-supplied set when non-empty, + * otherwise falls back to the names of {@link Schema#identifierFieldIds()}. Mirrors {@link + * #getEqualityFieldIds} so distribution and write-side equality-field inference stay aligned. + */ + static Set resolveEqualityFieldNames( + @Nullable Set equalityFields, Schema schema) { + if (equalityFields != null && !equalityFields.isEmpty()) { + return equalityFields; + } + + return schema.identifierFieldNames(); + } + static int safeAbs(int input) { if (input >= 0) { return input; diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/HashKeyGenerator.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/HashKeyGenerator.java index 61a850212bf4..4472de67b421 100644 --- a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/HashKeyGenerator.java +++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/HashKeyGenerator.java @@ -20,7 +20,6 @@ import static org.apache.iceberg.TableProperties.WRITE_DISTRIBUTION_MODE; -import java.util.Collections; import java.util.Map; import java.util.Objects; import java.util.Set; @@ -79,6 +78,9 @@ int generateKey( @Nullable PartitionSpec tableSpec, @Nullable RowData overrideRowData) { String tableIdent = dynamicRecord.tableIdentifier().toString(); + Schema effectiveSchema = MoreObjects.firstNonNull(tableSchema, dynamicRecord.schema()); + Set effectiveEqualityFields = + DynamicSinkUtil.resolveEqualityFieldNames(dynamicRecord.equalityFields(), effectiveSchema); SelectorKey cacheKey = new SelectorKey( tableIdent, @@ -87,8 +89,8 @@ int generateKey( tableSpec != null ? 
tableSpec.specId() : null, dynamicRecord.schema(), dynamicRecord.spec(), - dynamicRecord.equalityFields(), - dynamicRecord.distributionMode(), + effectiveEqualityFields, + MoreObjects.firstNonNull(dynamicRecord.distributionMode(), DistributionMode.NONE), Math.min(dynamicRecord.writeParallelism(), maxWriteParallelism)); KeySelector keySelector = keySelectorCache.computeIfAbsent( @@ -96,11 +98,11 @@ int generateKey( k -> getKeySelector( tableIdent, - MoreObjects.firstNonNull(tableSchema, dynamicRecord.schema()), + effectiveSchema, MoreObjects.firstNonNull(tableSpec, dynamicRecord.spec()), - dynamicRecord.distributionMode(), MoreObjects.firstNonNull( - dynamicRecord.equalityFields(), Collections.emptySet()), + dynamicRecord.distributionMode(), DistributionMode.NONE), + effectiveEqualityFields, Math.min(dynamicRecord.writeParallelism(), maxWriteParallelism))); try { return keySelector.getKey( diff --git a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicRecordProcessor.java b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicRecordProcessor.java new file mode 100644 index 000000000000..298a338ea9d0 --- /dev/null +++ b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicRecordProcessor.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.flink.sink.dynamic; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.util.Collections; +import org.apache.flink.table.data.GenericRowData; +import org.apache.flink.table.data.StringData; +import org.apache.iceberg.DistributionMode; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.SnapshotRef; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.apache.iceberg.types.Types; +import org.junit.jupiter.api.Test; + +class TestDynamicRecordProcessor { + + private static final Schema SCHEMA = + new Schema( + Types.NestedField.required(1, "id", Types.IntegerType.get()), + Types.NestedField.required(2, "data", Types.StringType.get())); + + private static final Schema SCHEMA_WITH_IDENTIFIER = + new Schema( + Lists.newArrayList( + Types.NestedField.required(1, "id", Types.IntegerType.get()), + Types.NestedField.required(2, "data", Types.StringType.get())), + Sets.newHashSet(1)); + + private static final TableIdentifier TABLE_IDENTIFIER = TableIdentifier.of("default", "table"); + private static final String BRANCH = SnapshotRef.MAIN_BRANCH; + + @Test + void testForwardEligibleWhenNoEqualityFields() { + DynamicRecord record = recordWithDistributionMode(SCHEMA, null); + + assertThat(DynamicRecordProcessor.isForwardEligible(record)).isTrue(); + } + + @Test + void testNotForwardEligibleWhenDistributionModeSet() { + DynamicRecord record = recordWithDistributionMode(SCHEMA, DistributionMode.NONE); + + assertThat(DynamicRecordProcessor.isForwardEligible(record)).isFalse(); + } + + @Test + void testNotForwardEligibleWhenUserEqualityFieldsSet() { + DynamicRecord record = recordWithDistributionMode(SCHEMA, null); + record.setEqualityFields(Collections.singleton("id")); + + assertThat(DynamicRecordProcessor.isForwardEligible(record)).isFalse(); + } + + @Test + void testNotForwardEligibleWhenSchemaHasIdentifierFields() { + DynamicRecord record = recordWithDistributionMode(SCHEMA_WITH_IDENTIFIER, null); + + assertThat(DynamicRecordProcessor.isForwardEligible(record)).isFalse(); + } + + @Test + void testNotForwardEligibleWhenEmptyEqualityFieldsButIdentifierFieldsPresent() { + DynamicRecord record = recordWithDistributionMode(SCHEMA_WITH_IDENTIFIER, null); + record.setEqualityFields(Collections.emptySet()); + + assertThat(DynamicRecordProcessor.isForwardEligible(record)).isFalse(); + } + + private static DynamicRecord recordWithDistributionMode( + Schema schema, DistributionMode distributionMode) { + return new DynamicRecord( + TABLE_IDENTIFIER, + BRANCH, + schema, + GenericRowData.of(1, StringData.fromString("foo")), + PartitionSpec.unpartitioned(), + distributionMode, + 2); + } +} diff --git a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestHashKeyGenerator.java b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestHashKeyGenerator.java index 9a485fafaf47..4181f00d0a0a 100644 --- a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestHashKeyGenerator.java +++ b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestHashKeyGenerator.java @@ -38,6 +38,7 @@ import org.apache.iceberg.flink.FlinkWriteConf; import org.apache.iceberg.flink.FlinkWriteOptions; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import 
org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.collect.Sets; import org.apache.iceberg.types.Types; import org.junit.jupiter.api.Test; @@ -294,6 +295,102 @@ void testCapAtMaxWriteParallelism() throws Exception { .isEqualTo(maxWriteParallelism); } + @Test + void testIdentifierFieldsResolvedAsEqualityFields() throws Exception { + int writeParallelism = 2; + int maxWriteParallelism = 8; + HashKeyGenerator generator = new HashKeyGenerator(16, maxWriteParallelism); + + Schema schemaWithIdentifier = + new Schema( + Lists.newArrayList( + Types.NestedField.required(1, "id", Types.IntegerType.get()), + Types.NestedField.required(2, "data", Types.StringType.get())), + Sets.newHashSet(1)); + PartitionSpec unpartitioned = PartitionSpec.unpartitioned(); + + GenericRowData row1 = GenericRowData.of(1, StringData.fromString("foo")); + GenericRowData row2 = GenericRowData.of(1, StringData.fromString("bar")); + GenericRowData row3 = GenericRowData.of(2, StringData.fromString("baz")); + + DynamicRecord record1 = + new DynamicRecord( + TABLE_IDENTIFIER, + BRANCH, + schemaWithIdentifier, + row1, + unpartitioned, + DistributionMode.NONE, + writeParallelism); + DynamicRecord record2 = + new DynamicRecord( + TABLE_IDENTIFIER, + BRANCH, + schemaWithIdentifier, + row2, + unpartitioned, + DistributionMode.NONE, + writeParallelism); + DynamicRecord record3 = + new DynamicRecord( + TABLE_IDENTIFIER, + BRANCH, + schemaWithIdentifier, + row3, + unpartitioned, + DistributionMode.NONE, + writeParallelism); + + int writeKey1 = generator.generateKey(record1); + int writeKey2 = generator.generateKey(record2); + int writeKey3 = generator.generateKey(record3); + + assertThat(writeKey1).isEqualTo(writeKey2); + assertThat(writeKey3).isNotEqualTo(writeKey1); + } + + @Test + void testForwardRecordWithIdentifierFieldsRoutesByEqualityFields() throws Exception { + int writeParallelism = 2; + int maxWriteParallelism = 8; + HashKeyGenerator generator = new HashKeyGenerator(16, maxWriteParallelism); + + Schema schemaWithIdentifier = + new Schema( + Lists.newArrayList( + Types.NestedField.required(1, "id", Types.IntegerType.get()), + Types.NestedField.required(2, "data", Types.StringType.get())), + Sets.newHashSet(1)); + PartitionSpec unpartitioned = PartitionSpec.unpartitioned(); + + GenericRowData rowSameKey1 = GenericRowData.of(1, StringData.fromString("foo")); + GenericRowData rowSameKey2 = GenericRowData.of(1, StringData.fromString("bar")); + GenericRowData rowDifferentKey = GenericRowData.of(2, StringData.fromString("baz")); + + // Forward records (null distribution mode) with identifier fields on the schema must still + // route by equality fields, so HashKeyGenerator can handle them when the processor falls back + // off the forward path. 
+ DynamicRecord record1 = + new DynamicRecord( + TABLE_IDENTIFIER, BRANCH, schemaWithIdentifier, rowSameKey1, unpartitioned); + DynamicRecord record2 = + new DynamicRecord( + TABLE_IDENTIFIER, BRANCH, schemaWithIdentifier, rowSameKey2, unpartitioned); + DynamicRecord record3 = + new DynamicRecord( + TABLE_IDENTIFIER, BRANCH, schemaWithIdentifier, rowDifferentKey, unpartitioned); + record1.writeParallelism(writeParallelism); + record2.writeParallelism(writeParallelism); + record3.writeParallelism(writeParallelism); + + int writeKey1 = generator.generateKey(record1); + int writeKey2 = generator.generateKey(record2); + int writeKey3 = generator.generateKey(record3); + + assertThat(writeKey1).isEqualTo(writeKey2); + assertThat(writeKey3).isNotEqualTo(writeKey1); + } + @Test void testHashModeWithoutEqualityFieldsFallsBackToNone() throws Exception { int writeParallelism = 2;