diff --git a/docs/docs/flink-writes.md b/docs/docs/flink-writes.md
index 03795b5beed0..cc20e4ff6598 100644
--- a/docs/docs/flink-writes.md
+++ b/docs/docs/flink-writes.md
@@ -487,10 +487,10 @@ We need the following information (DynamicRecord) for every record:
 | `Schema` | The schema of the record. |
 | `Spec` | The expected partitioning specification for the record. |
 | `RowData` | The actual row data to be written. |
-| `DistributionMode` | The distribution mode for writing the record (NONE, HASH or `null`). When `null`, the record won't be shuffled at all. |
+| `DistributionMode` | The distribution mode for writing the record (NONE, HASH or `null`). When `null`, the record won't be shuffled at all — except when the record resolves to a non-empty equality-field set, in which case the dynamic sink falls back to hash distribution to keep records sharing the same equality key on the same writer. |
 | `Parallelism` | The maximum number of parallel writers for a given table/branch/schema/spec (WriteTarget). |
 | `UpsertMode` | Overrides this table's write.upsert.enabled (optional). |
-| `EqualityFields` | The equality fields for the table(optional). |
+| `EqualityFields` | The equality fields for the table (optional). When unset, the dynamic sink falls back to the schema's identifier fields for both distribution (records sharing a key route to the same writer) and equality-delete inference. Identifier fields do not auto-enable upsert mode — that flag remains opt-in. |
 
 ### Schema Evolution
 
diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecordProcessor.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecordProcessor.java
index c752b8e9b8d9..ba2d816ae9ea 100644
--- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecordProcessor.java
+++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecordProcessor.java
@@ -129,7 +129,7 @@ public void processElement(T element, Context ctx, Collector<DynamicRecordInternal> out)
diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicSinkUtil.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicSinkUtil.java
--- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicSinkUtil.java
+++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicSinkUtil.java
@@ ... @@ static Set<Integer> getEqualityFieldIds(Set<String> equalityFields, Schema schema) {
     return equalityFieldIds;
   }
 
+  /**
+   * Resolves the effective equality field names. Returns the user-supplied set when non-empty,
+   * otherwise falls back to the names of {@link Schema#identifierFieldIds()}. Mirrors {@link
+   * #getEqualityFieldIds} so distribution and write-side equality-field inference stay aligned.
+   */
+  static Set<String> resolveEqualityFieldNames(
+      @Nullable Set<String> equalityFields, Schema schema) {
+    if (equalityFields != null && !equalityFields.isEmpty()) {
+      return equalityFields;
+    }
+
+    return schema.identifierFieldNames();
+  }
+
   static int safeAbs(int input) {
     if (input >= 0) {
       return input;
diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/HashKeyGenerator.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/HashKeyGenerator.java
index 61a850212bf4..4472de67b421 100644
--- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/HashKeyGenerator.java
+++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/HashKeyGenerator.java
@@ -20,7 +20,6 @@
 
 import static org.apache.iceberg.TableProperties.WRITE_DISTRIBUTION_MODE;
 
-import java.util.Collections;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
@@ -79,6 +78,9 @@ int generateKey(
       @Nullable PartitionSpec tableSpec,
       @Nullable RowData overrideRowData) {
     String tableIdent = dynamicRecord.tableIdentifier().toString();
+    Schema effectiveSchema = MoreObjects.firstNonNull(tableSchema, dynamicRecord.schema());
+    Set<String> effectiveEqualityFields =
+        DynamicSinkUtil.resolveEqualityFieldNames(dynamicRecord.equalityFields(), effectiveSchema);
     SelectorKey cacheKey =
         new SelectorKey(
             tableIdent,
@@ -87,8 +89,8 @@ int generateKey(
             tableSpec != null ? tableSpec.specId() : null,
             dynamicRecord.schema(),
             dynamicRecord.spec(),
-            dynamicRecord.equalityFields(),
-            dynamicRecord.distributionMode(),
+            effectiveEqualityFields,
+            MoreObjects.firstNonNull(dynamicRecord.distributionMode(), DistributionMode.NONE),
             Math.min(dynamicRecord.writeParallelism(), maxWriteParallelism));
     KeySelector<RowData, Integer> keySelector =
         keySelectorCache.computeIfAbsent(
@@ -96,11 +98,11 @@ int generateKey(
             k ->
                 getKeySelector(
                     tableIdent,
-                    MoreObjects.firstNonNull(tableSchema, dynamicRecord.schema()),
+                    effectiveSchema,
                     MoreObjects.firstNonNull(tableSpec, dynamicRecord.spec()),
-                    dynamicRecord.distributionMode(),
                     MoreObjects.firstNonNull(
-                        dynamicRecord.equalityFields(), Collections.emptySet()),
+                        dynamicRecord.distributionMode(), DistributionMode.NONE),
+                    effectiveEqualityFields,
                     Math.min(dynamicRecord.writeParallelism(), maxWriteParallelism)));
     try {
       return keySelector.getKey(
diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicRecordProcessor.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicRecordProcessor.java
new file mode 100644
index 000000000000..298a338ea9d0
--- /dev/null
+++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicRecordProcessor.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.sink.dynamic;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.util.Collections;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.iceberg.DistributionMode;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.SnapshotRef;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.types.Types;
+import org.junit.jupiter.api.Test;
+
+class TestDynamicRecordProcessor {
+
+  private static final Schema SCHEMA =
+      new Schema(
+          Types.NestedField.required(1, "id", Types.IntegerType.get()),
+          Types.NestedField.required(2, "data", Types.StringType.get()));
+
+  private static final Schema SCHEMA_WITH_IDENTIFIER =
+      new Schema(
+          Lists.newArrayList(
+              Types.NestedField.required(1, "id", Types.IntegerType.get()),
+              Types.NestedField.required(2, "data", Types.StringType.get())),
+          Sets.newHashSet(1));
+
+  private static final TableIdentifier TABLE_IDENTIFIER = TableIdentifier.of("default", "table");
+  private static final String BRANCH = SnapshotRef.MAIN_BRANCH;
+
+  @Test
+  void testForwardEligibleWhenNoEqualityFields() {
+    DynamicRecord record = recordWithDistributionMode(SCHEMA, null);
+
+    assertThat(DynamicRecordProcessor.isForwardEligible(record)).isTrue();
+  }
+
+  @Test
+  void testNotForwardEligibleWhenDistributionModeSet() {
+    DynamicRecord record = recordWithDistributionMode(SCHEMA, DistributionMode.NONE);
+
+    assertThat(DynamicRecordProcessor.isForwardEligible(record)).isFalse();
+  }
+
+  @Test
+  void testNotForwardEligibleWhenUserEqualityFieldsSet() {
+    DynamicRecord record = recordWithDistributionMode(SCHEMA, null);
+    record.setEqualityFields(Collections.singleton("id"));
+
+    assertThat(DynamicRecordProcessor.isForwardEligible(record)).isFalse();
+  }
+
+  @Test
+  void testNotForwardEligibleWhenSchemaHasIdentifierFields() {
+    DynamicRecord record = recordWithDistributionMode(SCHEMA_WITH_IDENTIFIER, null);
+
+    assertThat(DynamicRecordProcessor.isForwardEligible(record)).isFalse();
+  }
+
+  @Test
+  void testNotForwardEligibleWhenEmptyEqualityFieldsButIdentifierFieldsPresent() {
+    DynamicRecord record = recordWithDistributionMode(SCHEMA_WITH_IDENTIFIER, null);
+    record.setEqualityFields(Collections.emptySet());
+
+    assertThat(DynamicRecordProcessor.isForwardEligible(record)).isFalse();
+  }
+
+  private static DynamicRecord recordWithDistributionMode(
+      Schema schema, DistributionMode distributionMode) {
+    return new DynamicRecord(
+        TABLE_IDENTIFIER,
+        BRANCH,
+        schema,
+        GenericRowData.of(1, StringData.fromString("foo")),
+        PartitionSpec.unpartitioned(),
+        distributionMode,
+        2);
+  }
+}
diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestHashKeyGenerator.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestHashKeyGenerator.java
index 9a485fafaf47..4181f00d0a0a 100644
--- a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestHashKeyGenerator.java
+++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestHashKeyGenerator.java
@@ -38,6 +38,7 @@
 import org.apache.iceberg.flink.FlinkWriteConf;
 import org.apache.iceberg.flink.FlinkWriteOptions;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.apache.iceberg.types.Types;
 import org.junit.jupiter.api.Test;
@@ -294,6 +295,102 @@ void testCapAtMaxWriteParallelism() throws Exception {
         .isEqualTo(maxWriteParallelism);
   }
 
+  @Test
+  void testIdentifierFieldsResolvedAsEqualityFields() throws Exception {
+    int writeParallelism = 2;
+    int maxWriteParallelism = 8;
+    HashKeyGenerator generator = new HashKeyGenerator(16, maxWriteParallelism);
+
+    Schema schemaWithIdentifier =
+        new Schema(
+            Lists.newArrayList(
+                Types.NestedField.required(1, "id", Types.IntegerType.get()),
+                Types.NestedField.required(2, "data", Types.StringType.get())),
+            Sets.newHashSet(1));
+    PartitionSpec unpartitioned = PartitionSpec.unpartitioned();
+
+    GenericRowData row1 = GenericRowData.of(1, StringData.fromString("foo"));
+    GenericRowData row2 = GenericRowData.of(1, StringData.fromString("bar"));
+    GenericRowData row3 = GenericRowData.of(2, StringData.fromString("baz"));
+
+    DynamicRecord record1 =
+        new DynamicRecord(
+            TABLE_IDENTIFIER,
+            BRANCH,
+            schemaWithIdentifier,
+            row1,
+            unpartitioned,
+            DistributionMode.NONE,
+            writeParallelism);
+    DynamicRecord record2 =
+        new DynamicRecord(
+            TABLE_IDENTIFIER,
+            BRANCH,
+            schemaWithIdentifier,
+            row2,
+            unpartitioned,
+            DistributionMode.NONE,
+            writeParallelism);
+    DynamicRecord record3 =
+        new DynamicRecord(
+            TABLE_IDENTIFIER,
+            BRANCH,
+            schemaWithIdentifier,
+            row3,
+            unpartitioned,
+            DistributionMode.NONE,
+            writeParallelism);
+
+    int writeKey1 = generator.generateKey(record1);
+    int writeKey2 = generator.generateKey(record2);
+    int writeKey3 = generator.generateKey(record3);
+
+    assertThat(writeKey1).isEqualTo(writeKey2);
+    assertThat(writeKey3).isNotEqualTo(writeKey1);
+  }
+
+  @Test
+  void testForwardRecordWithIdentifierFieldsRoutesByEqualityFields() throws Exception {
+    int writeParallelism = 2;
+    int maxWriteParallelism = 8;
+    HashKeyGenerator generator = new HashKeyGenerator(16, maxWriteParallelism);
+
+    Schema schemaWithIdentifier =
+        new Schema(
+            Lists.newArrayList(
+                Types.NestedField.required(1, "id", Types.IntegerType.get()),
+                Types.NestedField.required(2, "data", Types.StringType.get())),
+            Sets.newHashSet(1));
+    PartitionSpec unpartitioned = PartitionSpec.unpartitioned();
+
+    GenericRowData rowSameKey1 = GenericRowData.of(1, StringData.fromString("foo"));
+    GenericRowData rowSameKey2 = GenericRowData.of(1, StringData.fromString("bar"));
+    GenericRowData rowDifferentKey = GenericRowData.of(2, StringData.fromString("baz"));
+
+    // Forward records (null distribution mode) with identifier fields on the schema must still
+    // route by equality fields, so HashKeyGenerator can handle them when the processor falls back
+    // off the forward path.
+    DynamicRecord record1 =
+        new DynamicRecord(
+            TABLE_IDENTIFIER, BRANCH, schemaWithIdentifier, rowSameKey1, unpartitioned);
+    DynamicRecord record2 =
+        new DynamicRecord(
+            TABLE_IDENTIFIER, BRANCH, schemaWithIdentifier, rowSameKey2, unpartitioned);
+    DynamicRecord record3 =
+        new DynamicRecord(
+            TABLE_IDENTIFIER, BRANCH, schemaWithIdentifier, rowDifferentKey, unpartitioned);
+    record1.writeParallelism(writeParallelism);
+    record2.writeParallelism(writeParallelism);
+    record3.writeParallelism(writeParallelism);
+
+    int writeKey1 = generator.generateKey(record1);
+    int writeKey2 = generator.generateKey(record2);
+    int writeKey3 = generator.generateKey(record3);
+
+    assertThat(writeKey1).isEqualTo(writeKey2);
+    assertThat(writeKey3).isNotEqualTo(writeKey1);
+  }
+
   @Test
   void testHashModeWithoutEqualityFieldsFallsBackToNone() throws Exception {
     int writeParallelism = 2;
diff --git a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecordProcessor.java b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecordProcessor.java
index c752b8e9b8d9..ba2d816ae9ea 100644
--- a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecordProcessor.java
+++ b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecordProcessor.java
@@ -129,7 +129,7 @@ public void processElement(T element, Context ctx, Collector<DynamicRecordInternal> out)
diff --git a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicSinkUtil.java b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicSinkUtil.java
--- a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicSinkUtil.java
+++ b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicSinkUtil.java
@@ ... @@ static Set<Integer> getEqualityFieldIds(Set<String> equalityFields, Schema schema) {
     return equalityFieldIds;
   }
 
+  /**
+   * Resolves the effective equality field names. Returns the user-supplied set when non-empty,
+   * otherwise falls back to the names of {@link Schema#identifierFieldIds()}. Mirrors {@link
+   * #getEqualityFieldIds} so distribution and write-side equality-field inference stay aligned.
+   */
+  static Set<String> resolveEqualityFieldNames(
+      @Nullable Set<String> equalityFields, Schema schema) {
+    if (equalityFields != null && !equalityFields.isEmpty()) {
+      return equalityFields;
+    }
+
+    return schema.identifierFieldNames();
+  }
+
   static int safeAbs(int input) {
     if (input >= 0) {
       return input;
diff --git a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/HashKeyGenerator.java b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/HashKeyGenerator.java
index 61a850212bf4..4472de67b421 100644
--- a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/HashKeyGenerator.java
+++ b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/HashKeyGenerator.java
@@ -20,7 +20,6 @@
 
 import static org.apache.iceberg.TableProperties.WRITE_DISTRIBUTION_MODE;
 
-import java.util.Collections;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
@@ -79,6 +78,9 @@ int generateKey(
       @Nullable PartitionSpec tableSpec,
       @Nullable RowData overrideRowData) {
     String tableIdent = dynamicRecord.tableIdentifier().toString();
+    Schema effectiveSchema = MoreObjects.firstNonNull(tableSchema, dynamicRecord.schema());
+    Set<String> effectiveEqualityFields =
+        DynamicSinkUtil.resolveEqualityFieldNames(dynamicRecord.equalityFields(), effectiveSchema);
     SelectorKey cacheKey =
         new SelectorKey(
             tableIdent,
@@ -87,8 +89,8 @@ int generateKey(
             tableSpec != null ? tableSpec.specId() : null,
             dynamicRecord.schema(),
             dynamicRecord.spec(),
-            dynamicRecord.equalityFields(),
-            dynamicRecord.distributionMode(),
+            effectiveEqualityFields,
+            MoreObjects.firstNonNull(dynamicRecord.distributionMode(), DistributionMode.NONE),
             Math.min(dynamicRecord.writeParallelism(), maxWriteParallelism));
     KeySelector<RowData, Integer> keySelector =
         keySelectorCache.computeIfAbsent(
@@ -96,11 +98,11 @@ int generateKey(
             k ->
                 getKeySelector(
                     tableIdent,
-                    MoreObjects.firstNonNull(tableSchema, dynamicRecord.schema()),
+                    effectiveSchema,
                     MoreObjects.firstNonNull(tableSpec, dynamicRecord.spec()),
-                    dynamicRecord.distributionMode(),
                     MoreObjects.firstNonNull(
-                        dynamicRecord.equalityFields(), Collections.emptySet()),
+                        dynamicRecord.distributionMode(), DistributionMode.NONE),
+                    effectiveEqualityFields,
                     Math.min(dynamicRecord.writeParallelism(), maxWriteParallelism)));
     try {
       return keySelector.getKey(
diff --git a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicRecordProcessor.java b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicRecordProcessor.java
new file mode 100644
index 000000000000..298a338ea9d0
--- /dev/null
+++ b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicRecordProcessor.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.sink.dynamic;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.util.Collections;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.iceberg.DistributionMode;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.SnapshotRef;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.types.Types;
+import org.junit.jupiter.api.Test;
+
+class TestDynamicRecordProcessor {
+
+  private static final Schema SCHEMA =
+      new Schema(
+          Types.NestedField.required(1, "id", Types.IntegerType.get()),
+          Types.NestedField.required(2, "data", Types.StringType.get()));
+
+  private static final Schema SCHEMA_WITH_IDENTIFIER =
+      new Schema(
+          Lists.newArrayList(
+              Types.NestedField.required(1, "id", Types.IntegerType.get()),
+              Types.NestedField.required(2, "data", Types.StringType.get())),
+          Sets.newHashSet(1));
+
+  private static final TableIdentifier TABLE_IDENTIFIER = TableIdentifier.of("default", "table");
+  private static final String BRANCH = SnapshotRef.MAIN_BRANCH;
+
+  @Test
+  void testForwardEligibleWhenNoEqualityFields() {
+    DynamicRecord record = recordWithDistributionMode(SCHEMA, null);
+
+    assertThat(DynamicRecordProcessor.isForwardEligible(record)).isTrue();
+  }
+
+  @Test
+  void testNotForwardEligibleWhenDistributionModeSet() {
+    DynamicRecord record = recordWithDistributionMode(SCHEMA, DistributionMode.NONE);
+
+    assertThat(DynamicRecordProcessor.isForwardEligible(record)).isFalse();
+  }
+
+  @Test
+  void testNotForwardEligibleWhenUserEqualityFieldsSet() {
+    DynamicRecord record = recordWithDistributionMode(SCHEMA, null);
+    record.setEqualityFields(Collections.singleton("id"));
+
+    assertThat(DynamicRecordProcessor.isForwardEligible(record)).isFalse();
+  }
+
+  @Test
+  void testNotForwardEligibleWhenSchemaHasIdentifierFields() {
+    DynamicRecord record = recordWithDistributionMode(SCHEMA_WITH_IDENTIFIER, null);
+
+    assertThat(DynamicRecordProcessor.isForwardEligible(record)).isFalse();
+  }
+
+  @Test
+  void testNotForwardEligibleWhenEmptyEqualityFieldsButIdentifierFieldsPresent() {
+    DynamicRecord record = recordWithDistributionMode(SCHEMA_WITH_IDENTIFIER, null);
+    record.setEqualityFields(Collections.emptySet());
+
+    assertThat(DynamicRecordProcessor.isForwardEligible(record)).isFalse();
+  }
+
+  private static DynamicRecord recordWithDistributionMode(
+      Schema schema, DistributionMode distributionMode) {
+    return new DynamicRecord(
+        TABLE_IDENTIFIER,
+        BRANCH,
+        schema,
+        GenericRowData.of(1, StringData.fromString("foo")),
+        PartitionSpec.unpartitioned(),
+        distributionMode,
+        2);
+  }
+}
diff --git a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestHashKeyGenerator.java b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestHashKeyGenerator.java
index 9a485fafaf47..4181f00d0a0a 100644
--- a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestHashKeyGenerator.java
+++ b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestHashKeyGenerator.java
@@ -38,6 +38,7 @@
 import org.apache.iceberg.flink.FlinkWriteConf;
 import org.apache.iceberg.flink.FlinkWriteOptions;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.apache.iceberg.types.Types;
 import org.junit.jupiter.api.Test;
@@ -294,6 +295,102 @@ void testCapAtMaxWriteParallelism() throws Exception {
         .isEqualTo(maxWriteParallelism);
   }
 
+  @Test
+  void testIdentifierFieldsResolvedAsEqualityFields() throws Exception {
+    int writeParallelism = 2;
+    int maxWriteParallelism = 8;
+    HashKeyGenerator generator = new HashKeyGenerator(16, maxWriteParallelism);
+
+    Schema schemaWithIdentifier =
+        new Schema(
+            Lists.newArrayList(
+                Types.NestedField.required(1, "id", Types.IntegerType.get()),
+                Types.NestedField.required(2, "data", Types.StringType.get())),
+            Sets.newHashSet(1));
+    PartitionSpec unpartitioned = PartitionSpec.unpartitioned();
+
+    GenericRowData row1 = GenericRowData.of(1, StringData.fromString("foo"));
+    GenericRowData row2 = GenericRowData.of(1, StringData.fromString("bar"));
+    GenericRowData row3 = GenericRowData.of(2, StringData.fromString("baz"));
+
+    DynamicRecord record1 =
+        new DynamicRecord(
+            TABLE_IDENTIFIER,
+            BRANCH,
+            schemaWithIdentifier,
+            row1,
+            unpartitioned,
+            DistributionMode.NONE,
+            writeParallelism);
+    DynamicRecord record2 =
+        new DynamicRecord(
+            TABLE_IDENTIFIER,
+            BRANCH,
+            schemaWithIdentifier,
+            row2,
+            unpartitioned,
+            DistributionMode.NONE,
+            writeParallelism);
+    DynamicRecord record3 =
+        new DynamicRecord(
+            TABLE_IDENTIFIER,
+            BRANCH,
+            schemaWithIdentifier,
+            row3,
+            unpartitioned,
+            DistributionMode.NONE,
+            writeParallelism);
+
+    int writeKey1 = generator.generateKey(record1);
+    int writeKey2 = generator.generateKey(record2);
+    int writeKey3 = generator.generateKey(record3);
+
+    assertThat(writeKey1).isEqualTo(writeKey2);
+    assertThat(writeKey3).isNotEqualTo(writeKey1);
+  }
+
+  @Test
+  void testForwardRecordWithIdentifierFieldsRoutesByEqualityFields() throws Exception {
+    int writeParallelism = 2;
+    int maxWriteParallelism = 8;
+    HashKeyGenerator generator = new HashKeyGenerator(16, maxWriteParallelism);
+
+    Schema schemaWithIdentifier =
+        new Schema(
+            Lists.newArrayList(
+                Types.NestedField.required(1, "id", Types.IntegerType.get()),
+                Types.NestedField.required(2, "data", Types.StringType.get())),
+            Sets.newHashSet(1));
+    PartitionSpec unpartitioned = PartitionSpec.unpartitioned();
+
+    GenericRowData rowSameKey1 = GenericRowData.of(1, StringData.fromString("foo"));
+    GenericRowData rowSameKey2 = GenericRowData.of(1, StringData.fromString("bar"));
+    GenericRowData rowDifferentKey = GenericRowData.of(2, StringData.fromString("baz"));
+
+    // Forward records (null distribution mode) with identifier fields on the schema must still
+    // route by equality fields, so HashKeyGenerator can handle them when the processor falls back
+    // off the forward path.
+    DynamicRecord record1 =
+        new DynamicRecord(
+            TABLE_IDENTIFIER, BRANCH, schemaWithIdentifier, rowSameKey1, unpartitioned);
+    DynamicRecord record2 =
+        new DynamicRecord(
+            TABLE_IDENTIFIER, BRANCH, schemaWithIdentifier, rowSameKey2, unpartitioned);
+    DynamicRecord record3 =
+        new DynamicRecord(
+            TABLE_IDENTIFIER, BRANCH, schemaWithIdentifier, rowDifferentKey, unpartitioned);
+    record1.writeParallelism(writeParallelism);
+    record2.writeParallelism(writeParallelism);
+    record3.writeParallelism(writeParallelism);
+
+    int writeKey1 = generator.generateKey(record1);
+    int writeKey2 = generator.generateKey(record2);
+    int writeKey3 = generator.generateKey(record3);
+
+    assertThat(writeKey1).isEqualTo(writeKey2);
+    assertThat(writeKey3).isNotEqualTo(writeKey1);
+  }
+
   @Test
   void testHashModeWithoutEqualityFieldsFallsBackToNone() throws Exception {
     int writeParallelism = 2;
diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecordProcessor.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecordProcessor.java
index c752b8e9b8d9..ba2d816ae9ea 100644
--- a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecordProcessor.java
+++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecordProcessor.java
@@ -129,7 +129,7 @@ public void processElement(T element, Context ctx, Collector<DynamicRecordInternal> out)
diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicSinkUtil.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicSinkUtil.java
--- a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicSinkUtil.java
+++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicSinkUtil.java
@@ ... @@ static Set<Integer> getEqualityFieldIds(Set<String> equalityFields, Schema schema) {
     return equalityFieldIds;
   }
 
+  /**
+   * Resolves the effective equality field names. Returns the user-supplied set when non-empty,
+   * otherwise falls back to the names of {@link Schema#identifierFieldIds()}. Mirrors {@link
+   * #getEqualityFieldIds} so distribution and write-side equality-field inference stay aligned.
+   */
+  static Set<String> resolveEqualityFieldNames(
+      @Nullable Set<String> equalityFields, Schema schema) {
+    if (equalityFields != null && !equalityFields.isEmpty()) {
+      return equalityFields;
+    }
+
+    return schema.identifierFieldNames();
+  }
+
   static int safeAbs(int input) {
     if (input >= 0) {
       return input;
diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/HashKeyGenerator.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/HashKeyGenerator.java
index 61a850212bf4..4472de67b421 100644
--- a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/HashKeyGenerator.java
+++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/HashKeyGenerator.java
@@ -20,7 +20,6 @@
 
 import static org.apache.iceberg.TableProperties.WRITE_DISTRIBUTION_MODE;
 
-import java.util.Collections;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
@@ -79,6 +78,9 @@ int generateKey(
       @Nullable PartitionSpec tableSpec,
       @Nullable RowData overrideRowData) {
     String tableIdent = dynamicRecord.tableIdentifier().toString();
+    Schema effectiveSchema = MoreObjects.firstNonNull(tableSchema, dynamicRecord.schema());
+    Set<String> effectiveEqualityFields =
+        DynamicSinkUtil.resolveEqualityFieldNames(dynamicRecord.equalityFields(), effectiveSchema);
     SelectorKey cacheKey =
         new SelectorKey(
             tableIdent,
@@ -87,8 +89,8 @@ int generateKey(
             tableSpec != null ? tableSpec.specId() : null,
             dynamicRecord.schema(),
             dynamicRecord.spec(),
-            dynamicRecord.equalityFields(),
-            dynamicRecord.distributionMode(),
+            effectiveEqualityFields,
+            MoreObjects.firstNonNull(dynamicRecord.distributionMode(), DistributionMode.NONE),
             Math.min(dynamicRecord.writeParallelism(), maxWriteParallelism));
     KeySelector<RowData, Integer> keySelector =
         keySelectorCache.computeIfAbsent(
@@ -96,11 +98,11 @@ int generateKey(
             k ->
                 getKeySelector(
                     tableIdent,
-                    MoreObjects.firstNonNull(tableSchema, dynamicRecord.schema()),
+                    effectiveSchema,
                     MoreObjects.firstNonNull(tableSpec, dynamicRecord.spec()),
-                    dynamicRecord.distributionMode(),
                     MoreObjects.firstNonNull(
-                        dynamicRecord.equalityFields(), Collections.emptySet()),
+                        dynamicRecord.distributionMode(), DistributionMode.NONE),
+                    effectiveEqualityFields,
                     Math.min(dynamicRecord.writeParallelism(), maxWriteParallelism)));
     try {
       return keySelector.getKey(
diff --git a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicRecordProcessor.java b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicRecordProcessor.java
new file mode 100644
index 000000000000..298a338ea9d0
--- /dev/null
+++ b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicRecordProcessor.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.sink.dynamic;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.util.Collections;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.iceberg.DistributionMode;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.SnapshotRef;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.types.Types;
+import org.junit.jupiter.api.Test;
+
+class TestDynamicRecordProcessor {
+
+  private static final Schema SCHEMA =
+      new Schema(
+          Types.NestedField.required(1, "id", Types.IntegerType.get()),
+          Types.NestedField.required(2, "data", Types.StringType.get()));
+
+  private static final Schema SCHEMA_WITH_IDENTIFIER =
+      new Schema(
+          Lists.newArrayList(
+              Types.NestedField.required(1, "id", Types.IntegerType.get()),
+              Types.NestedField.required(2, "data", Types.StringType.get())),
+          Sets.newHashSet(1));
+
+  private static final TableIdentifier TABLE_IDENTIFIER = TableIdentifier.of("default", "table");
+  private static final String BRANCH = SnapshotRef.MAIN_BRANCH;
+
+  @Test
+  void testForwardEligibleWhenNoEqualityFields() {
+    DynamicRecord record = recordWithDistributionMode(SCHEMA, null);
+
+    assertThat(DynamicRecordProcessor.isForwardEligible(record)).isTrue();
+  }
+
+  @Test
+  void testNotForwardEligibleWhenDistributionModeSet() {
+    DynamicRecord record = recordWithDistributionMode(SCHEMA, DistributionMode.NONE);
+
+    assertThat(DynamicRecordProcessor.isForwardEligible(record)).isFalse();
+  }
+
+  @Test
+  void testNotForwardEligibleWhenUserEqualityFieldsSet() {
+    DynamicRecord record = recordWithDistributionMode(SCHEMA, null);
+    record.setEqualityFields(Collections.singleton("id"));
+
+    assertThat(DynamicRecordProcessor.isForwardEligible(record)).isFalse();
+  }
+
+  @Test
+  void testNotForwardEligibleWhenSchemaHasIdentifierFields() {
+    DynamicRecord record = recordWithDistributionMode(SCHEMA_WITH_IDENTIFIER, null);
+
+    assertThat(DynamicRecordProcessor.isForwardEligible(record)).isFalse();
+  }
+
+  @Test
+  void testNotForwardEligibleWhenEmptyEqualityFieldsButIdentifierFieldsPresent() {
+    DynamicRecord record = recordWithDistributionMode(SCHEMA_WITH_IDENTIFIER, null);
+    record.setEqualityFields(Collections.emptySet());
+
+    assertThat(DynamicRecordProcessor.isForwardEligible(record)).isFalse();
+  }
+
+  private static DynamicRecord recordWithDistributionMode(
+      Schema schema, DistributionMode distributionMode) {
+    return new DynamicRecord(
+        TABLE_IDENTIFIER,
+        BRANCH,
+        schema,
+        GenericRowData.of(1, StringData.fromString("foo")),
+        PartitionSpec.unpartitioned(),
+        distributionMode,
+        2);
+  }
+}
diff --git a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestHashKeyGenerator.java b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestHashKeyGenerator.java
index 9a485fafaf47..4181f00d0a0a 100644
--- a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestHashKeyGenerator.java
+++ b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestHashKeyGenerator.java
@@ -38,6 +38,7 @@
 import org.apache.iceberg.flink.FlinkWriteConf;
 import org.apache.iceberg.flink.FlinkWriteOptions;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.apache.iceberg.types.Types;
 import org.junit.jupiter.api.Test;
@@ -294,6 +295,102 @@ void testCapAtMaxWriteParallelism() throws Exception {
         .isEqualTo(maxWriteParallelism);
   }
 
+  @Test
+  void testIdentifierFieldsResolvedAsEqualityFields() throws Exception {
+    int writeParallelism = 2;
+    int maxWriteParallelism = 8;
+    HashKeyGenerator generator = new HashKeyGenerator(16, maxWriteParallelism);
+
+    Schema schemaWithIdentifier =
+        new Schema(
+            Lists.newArrayList(
+                Types.NestedField.required(1, "id", Types.IntegerType.get()),
+                Types.NestedField.required(2, "data", Types.StringType.get())),
+            Sets.newHashSet(1));
+    PartitionSpec unpartitioned = PartitionSpec.unpartitioned();
+
+    GenericRowData row1 = GenericRowData.of(1, StringData.fromString("foo"));
+    GenericRowData row2 = GenericRowData.of(1, StringData.fromString("bar"));
+    GenericRowData row3 = GenericRowData.of(2, StringData.fromString("baz"));
+
+    DynamicRecord record1 =
+        new DynamicRecord(
+            TABLE_IDENTIFIER,
+            BRANCH,
+            schemaWithIdentifier,
+            row1,
+            unpartitioned,
+            DistributionMode.NONE,
+            writeParallelism);
+    DynamicRecord record2 =
+        new DynamicRecord(
+            TABLE_IDENTIFIER,
+            BRANCH,
+            schemaWithIdentifier,
+            row2,
+            unpartitioned,
+            DistributionMode.NONE,
+            writeParallelism);
+    DynamicRecord record3 =
+        new DynamicRecord(
+            TABLE_IDENTIFIER,
+            BRANCH,
+            schemaWithIdentifier,
+            row3,
+            unpartitioned,
+            DistributionMode.NONE,
+            writeParallelism);
+
+    int writeKey1 = generator.generateKey(record1);
+    int writeKey2 = generator.generateKey(record2);
+    int writeKey3 = generator.generateKey(record3);
+
+    assertThat(writeKey1).isEqualTo(writeKey2);
+    assertThat(writeKey3).isNotEqualTo(writeKey1);
+  }
+
+  @Test
+  void testForwardRecordWithIdentifierFieldsRoutesByEqualityFields() throws Exception {
+    int writeParallelism = 2;
+    int maxWriteParallelism = 8;
+    HashKeyGenerator generator = new HashKeyGenerator(16, maxWriteParallelism);
+
+    Schema schemaWithIdentifier =
+        new Schema(
+            Lists.newArrayList(
+                Types.NestedField.required(1, "id", Types.IntegerType.get()),
+                Types.NestedField.required(2, "data", Types.StringType.get())),
+            Sets.newHashSet(1));
+    PartitionSpec unpartitioned = PartitionSpec.unpartitioned();
+
+    GenericRowData rowSameKey1 = GenericRowData.of(1, StringData.fromString("foo"));
+    GenericRowData rowSameKey2 = GenericRowData.of(1, StringData.fromString("bar"));
+    GenericRowData rowDifferentKey = GenericRowData.of(2, StringData.fromString("baz"));
+
+    // Forward records (null distribution mode) with identifier fields on the schema must still
+    // route by equality fields, so HashKeyGenerator can handle them when the processor falls back
+    // off the forward path.
+    DynamicRecord record1 =
+        new DynamicRecord(
+            TABLE_IDENTIFIER, BRANCH, schemaWithIdentifier, rowSameKey1, unpartitioned);
+    DynamicRecord record2 =
+        new DynamicRecord(
+            TABLE_IDENTIFIER, BRANCH, schemaWithIdentifier, rowSameKey2, unpartitioned);
+    DynamicRecord record3 =
+        new DynamicRecord(
+            TABLE_IDENTIFIER, BRANCH, schemaWithIdentifier, rowDifferentKey, unpartitioned);
+    record1.writeParallelism(writeParallelism);
+    record2.writeParallelism(writeParallelism);
+    record3.writeParallelism(writeParallelism);
+
+    int writeKey1 = generator.generateKey(record1);
+    int writeKey2 = generator.generateKey(record2);
+    int writeKey3 = generator.generateKey(record3);
+
+    assertThat(writeKey1).isEqualTo(writeKey2);
+    assertThat(writeKey3).isNotEqualTo(writeKey1);
+  }
+
   @Test
   void testHashModeWithoutEqualityFieldsFallsBackToNone() throws Exception {
     int writeParallelism = 2;
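For reviewers, here is a condensed, test-style sketch of the end-to-end behavior this patch introduces. It is hypothetical and not part of the patch: it lives in the `org.apache.iceberg.flink.sink.dynamic` package because, as in the tests above, `HashKeyGenerator.generateKey` and `DynamicRecordProcessor.isForwardEligible` are package-private; it reuses only constructors and calls that appear in the diff, and the generator arguments `(16, 8)` are illustrative values copied from the tests, not a recommended configuration.

```java
package org.apache.iceberg.flink.sink.dynamic;

import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.StringData;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SnapshotRef;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.types.Types;

// Hypothetical sketch, not part of the patch.
class IdentifierFieldFallbackSketch {

  static void demo() throws Exception {
    // Schema that declares field 1 ("id") as an identifier field.
    Schema schema =
        new Schema(
            Lists.newArrayList(
                Types.NestedField.required(1, "id", Types.IntegerType.get()),
                Types.NestedField.required(2, "data", Types.StringType.get())),
            Sets.newHashSet(1));

    // No DistributionMode and no explicit equality fields supplied.
    DynamicRecord record =
        new DynamicRecord(
            TableIdentifier.of("db", "t"),
            SnapshotRef.MAIN_BRANCH,
            schema,
            GenericRowData.of(1, StringData.fromString("a")),
            PartitionSpec.unpartitioned());

    // Before this patch the record was forward-eligible; the identifier
    // fields now take it off the forward path ...
    boolean eligible = DynamicRecordProcessor.isForwardEligible(record);
    System.out.println("forward eligible: " + eligible); // false

    // ... and the key generator routes it by hash of the resolved equality
    // fields ("id"), so rows with the same id land on the same writer.
    HashKeyGenerator generator = new HashKeyGenerator(16, 8);
    int key = generator.generateKey(record);
    System.out.println("write key: " + key);
  }
}
```

Rows that share `id` produce equal write keys regardless of the other columns, which is what keeps equality deletes and their matching inserts on one writer subtask even when the caller never sets equality fields explicitly.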