Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions docs/docs/flink-writes.md
Original file line number Diff line number Diff line change
Expand Up @@ -487,10 +487,10 @@ We need the following information (DynamicRecord) for every record:
| `Schema` | The schema of the record. |
| `Spec` | The expected partitioning specification for the record. |
| `RowData` | The actual row data to be written. |
| `DistributionMode` | The distribution mode for writing the record (NONE, HASH or `null`). When `null`, the record won't be shuffled at all. |
| `DistributionMode` | The distribution mode for writing the record (NONE, HASH or `null`). When `null`, the record won't be shuffled at all — except when the record resolves to a non-empty equality-field set, in which case the dynamic sink falls back to hash distribution to keep records sharing the same equality key on the same writer. |
| `Parallelism` | The maximum number of parallel writers for a given table/branch/schema/spec (WriteTarget). |
| `UpsertMode` | Overrides this table's write.upsert.enabled (optional). |
| `EqualityFields` | The equality fields for the table(optional). |
| `EqualityFields` | The equality fields for the table (optional). When unset, the dynamic sink falls back to the schema's identifier fields for both distribution (records sharing a key route to the same writer) and equality-delete inference. Identifier fields do not auto-enable upsert mode — that flag remains opt-in. |

### Schema Evolution

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ public void processElement(T element, Context ctx, Collector<DynamicRecordIntern
public void collect(DynamicRecord inputData) {
DynamicRecordWithConfig data = dynamicRecordWithConfig.wrap(inputData);

boolean isForward = data.distributionMode() == null;
boolean isForward = isForwardEligible(data);
boolean exists = tableCache.exists(data.tableIdentifier()).f0;
String foundBranch = exists ? tableCache.branch(data.tableIdentifier(), data.branch()) : null;

Expand Down Expand Up @@ -224,4 +224,17 @@ public void close() {
throw new RuntimeException(e);
}
}

/**
 * Decides whether a record may bypass keyed distribution and take the forward path.
 *
 * <p>Forwarding is incompatible with equality fields: rows sharing the same equality key must
 * land on the same writer subtask or equality-delete semantics break. A record is therefore only
 * forward-eligible when the user requested it (null distribution mode) AND the record resolves to
 * an empty equality-field set (no user-supplied fields and no schema identifier fields).
 */
@VisibleForTesting
static boolean isForwardEligible(DynamicRecord data) {
  if (data.distributionMode() != null) {
    // An explicit distribution mode always goes through the keyed path.
    return false;
  }

  return DynamicSinkUtil.resolveEqualityFieldNames(data.equalityFields(), data.schema()).isEmpty();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import java.util.Collections;
import java.util.Set;
import javax.annotation.Nullable;
import org.apache.iceberg.Schema;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
Expand Down Expand Up @@ -49,6 +50,20 @@ static Set<Integer> getEqualityFieldIds(Set<String> equalityFields, Schema schem
return equalityFieldIds;
}

/**
 * Determines the equality field names in effect for a write.
 *
 * <p>A non-empty user-supplied set wins; otherwise the schema's identifier fields (see {@link
 * Schema#identifierFieldIds()}) serve as the fallback. Kept aligned with {@link
 * #getEqualityFieldIds} so distribution-key selection and equality-delete inference agree.
 */
static Set<String> resolveEqualityFieldNames(
    @Nullable Set<String> equalityFields, Schema schema) {
  boolean userProvided = equalityFields != null && !equalityFields.isEmpty();
  return userProvided ? equalityFields : schema.identifierFieldNames();
}

static int safeAbs(int input) {
if (input >= 0) {
return input;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@

import static org.apache.iceberg.TableProperties.WRITE_DISTRIBUTION_MODE;

import java.util.Collections;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
Expand Down Expand Up @@ -79,6 +78,9 @@ int generateKey(
@Nullable PartitionSpec tableSpec,
@Nullable RowData overrideRowData) {
String tableIdent = dynamicRecord.tableIdentifier().toString();
Schema effectiveSchema = MoreObjects.firstNonNull(tableSchema, dynamicRecord.schema());
Set<String> effectiveEqualityFields =
DynamicSinkUtil.resolveEqualityFieldNames(dynamicRecord.equalityFields(), effectiveSchema);
SelectorKey cacheKey =
new SelectorKey(
tableIdent,
Expand All @@ -87,20 +89,20 @@ int generateKey(
tableSpec != null ? tableSpec.specId() : null,
dynamicRecord.schema(),
dynamicRecord.spec(),
dynamicRecord.equalityFields(),
dynamicRecord.distributionMode(),
effectiveEqualityFields,
MoreObjects.firstNonNull(dynamicRecord.distributionMode(), DistributionMode.NONE),
Math.min(dynamicRecord.writeParallelism(), maxWriteParallelism));
KeySelector<RowData, Integer> keySelector =
keySelectorCache.computeIfAbsent(
cacheKey,
k ->
getKeySelector(
tableIdent,
MoreObjects.firstNonNull(tableSchema, dynamicRecord.schema()),
effectiveSchema,
MoreObjects.firstNonNull(tableSpec, dynamicRecord.spec()),
dynamicRecord.distributionMode(),
MoreObjects.firstNonNull(
dynamicRecord.equalityFields(), Collections.emptySet()),
dynamicRecord.distributionMode(), DistributionMode.NONE),
effectiveEqualityFields,
Math.min(dynamicRecord.writeParallelism(), maxWriteParallelism)));
try {
return keySelector.getKey(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.flink.sink.dynamic;

import static org.assertj.core.api.Assertions.assertThat;

import java.util.Collections;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.StringData;
import org.apache.iceberg.DistributionMode;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SnapshotRef;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.types.Types;
import org.junit.jupiter.api.Test;

class TestDynamicRecordProcessor {

  private static final TableIdentifier TABLE_IDENTIFIER = TableIdentifier.of("default", "table");
  private static final String BRANCH = SnapshotRef.MAIN_BRANCH;

  private static final Schema SCHEMA =
      new Schema(
          Types.NestedField.required(1, "id", Types.IntegerType.get()),
          Types.NestedField.required(2, "data", Types.StringType.get()));

  // Same columns as SCHEMA, but with "id" (field id 1) registered as an identifier field.
  private static final Schema SCHEMA_WITH_IDENTIFIER =
      new Schema(
          Lists.newArrayList(
              Types.NestedField.required(1, "id", Types.IntegerType.get()),
              Types.NestedField.required(2, "data", Types.StringType.get())),
          Sets.newHashSet(1));

  @Test
  void testForwardEligibleWhenNoEqualityFields() {
    // No distribution mode, no equality/identifier fields: forwarding is allowed.
    assertThat(DynamicRecordProcessor.isForwardEligible(createRecord(SCHEMA, null))).isTrue();
  }

  @Test
  void testNotForwardEligibleWhenDistributionModeSet() {
    // Any explicit distribution mode (even NONE) disqualifies the forward path.
    DynamicRecord explicitMode = createRecord(SCHEMA, DistributionMode.NONE);

    assertThat(DynamicRecordProcessor.isForwardEligible(explicitMode)).isFalse();
  }

  @Test
  void testNotForwardEligibleWhenUserEqualityFieldsSet() {
    DynamicRecord withUserFields = createRecord(SCHEMA, null);
    withUserFields.setEqualityFields(Collections.singleton("id"));

    assertThat(DynamicRecordProcessor.isForwardEligible(withUserFields)).isFalse();
  }

  @Test
  void testNotForwardEligibleWhenSchemaHasIdentifierFields() {
    // Identifier fields on the schema imply equality fields even without a user-supplied set.
    DynamicRecord inferred = createRecord(SCHEMA_WITH_IDENTIFIER, null);

    assertThat(DynamicRecordProcessor.isForwardEligible(inferred)).isFalse();
  }

  @Test
  void testNotForwardEligibleWhenEmptyEqualityFieldsButIdentifierFieldsPresent() {
    // An explicitly empty set still falls back to the schema's identifier fields.
    DynamicRecord emptySet = createRecord(SCHEMA_WITH_IDENTIFIER, null);
    emptySet.setEqualityFields(Collections.emptySet());

    assertThat(DynamicRecordProcessor.isForwardEligible(emptySet)).isFalse();
  }

  /** Builds a minimal unpartitioned record for the given schema and (nullable) mode. */
  private static DynamicRecord createRecord(Schema schema, DistributionMode distributionMode) {
    GenericRowData row = GenericRowData.of(1, StringData.fromString("foo"));
    return new DynamicRecord(
        TABLE_IDENTIFIER, BRANCH, schema, row, PartitionSpec.unpartitioned(), distributionMode, 2);
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.apache.iceberg.flink.FlinkWriteConf;
import org.apache.iceberg.flink.FlinkWriteOptions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.types.Types;
import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -294,6 +295,102 @@ void testCapAtMaxWriteParallelism() throws Exception {
.isEqualTo(maxWriteParallelism);
}

@Test
void testIdentifierFieldsResolvedAsEqualityFields() throws Exception {
  int writeParallelism = 2;
  int maxWriteParallelism = 8;
  HashKeyGenerator generator = new HashKeyGenerator(16, maxWriteParallelism);

  // "id" (field id 1) is an identifier field, so it is the inferred equality key.
  Schema schemaWithIdentifier =
      new Schema(
          Lists.newArrayList(
              Types.NestedField.required(1, "id", Types.IntegerType.get()),
              Types.NestedField.required(2, "data", Types.StringType.get())),
          Sets.newHashSet(1));
  PartitionSpec unpartitioned = PartitionSpec.unpartitioned();

  DynamicRecord sameKeyA =
      new DynamicRecord(
          TABLE_IDENTIFIER,
          BRANCH,
          schemaWithIdentifier,
          GenericRowData.of(1, StringData.fromString("foo")),
          unpartitioned,
          DistributionMode.NONE,
          writeParallelism);
  DynamicRecord sameKeyB =
      new DynamicRecord(
          TABLE_IDENTIFIER,
          BRANCH,
          schemaWithIdentifier,
          GenericRowData.of(1, StringData.fromString("bar")),
          unpartitioned,
          DistributionMode.NONE,
          writeParallelism);
  DynamicRecord otherKey =
      new DynamicRecord(
          TABLE_IDENTIFIER,
          BRANCH,
          schemaWithIdentifier,
          GenericRowData.of(2, StringData.fromString("baz")),
          unpartitioned,
          DistributionMode.NONE,
          writeParallelism);

  int keyA = generator.generateKey(sameKeyA);
  int keyB = generator.generateKey(sameKeyB);
  int keyOther = generator.generateKey(otherKey);

  // Records sharing id=1 route together; id=2 lands elsewhere.
  assertThat(keyA).isEqualTo(keyB);
  assertThat(keyOther).isNotEqualTo(keyA);
}

@Test
void testForwardRecordWithIdentifierFieldsRoutesByEqualityFields() throws Exception {
  int writeParallelism = 2;
  int maxWriteParallelism = 8;
  HashKeyGenerator generator = new HashKeyGenerator(16, maxWriteParallelism);

  // "id" (field id 1) is an identifier field, so it is the inferred equality key.
  Schema schemaWithIdentifier =
      new Schema(
          Lists.newArrayList(
              Types.NestedField.required(1, "id", Types.IntegerType.get()),
              Types.NestedField.required(2, "data", Types.StringType.get())),
          Sets.newHashSet(1));
  PartitionSpec unpartitioned = PartitionSpec.unpartitioned();

  // Forward records (null distribution mode) with identifier fields on the schema must still
  // route by equality fields, so HashKeyGenerator can handle them when the processor falls back
  // off the forward path.
  DynamicRecord sameKeyA =
      new DynamicRecord(
          TABLE_IDENTIFIER,
          BRANCH,
          schemaWithIdentifier,
          GenericRowData.of(1, StringData.fromString("foo")),
          unpartitioned);
  DynamicRecord sameKeyB =
      new DynamicRecord(
          TABLE_IDENTIFIER,
          BRANCH,
          schemaWithIdentifier,
          GenericRowData.of(1, StringData.fromString("bar")),
          unpartitioned);
  DynamicRecord otherKey =
      new DynamicRecord(
          TABLE_IDENTIFIER,
          BRANCH,
          schemaWithIdentifier,
          GenericRowData.of(2, StringData.fromString("baz")),
          unpartitioned);
  sameKeyA.writeParallelism(writeParallelism);
  sameKeyB.writeParallelism(writeParallelism);
  otherKey.writeParallelism(writeParallelism);

  int keyA = generator.generateKey(sameKeyA);
  int keyB = generator.generateKey(sameKeyB);
  int keyOther = generator.generateKey(otherKey);

  // Records sharing id=1 route together; id=2 lands elsewhere.
  assertThat(keyA).isEqualTo(keyB);
  assertThat(keyOther).isNotEqualTo(keyA);
}

@Test
void testHashModeWithoutEqualityFieldsFallsBackToNone() throws Exception {
int writeParallelism = 2;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ public void processElement(T element, Context ctx, Collector<DynamicRecordIntern
public void collect(DynamicRecord inputData) {
DynamicRecordWithConfig data = dynamicRecordWithConfig.wrap(inputData);

boolean isForward = data.distributionMode() == null;
boolean isForward = isForwardEligible(data);
boolean exists = tableCache.exists(data.tableIdentifier()).f0;
String foundBranch = exists ? tableCache.branch(data.tableIdentifier(), data.branch()) : null;

Expand Down Expand Up @@ -224,4 +224,17 @@ public void close() {
throw new RuntimeException(e);
}
}

/**
 * Decides whether a record may bypass keyed distribution and take the forward path.
 *
 * <p>Forwarding is incompatible with equality fields: rows sharing the same equality key must
 * land on the same writer subtask or equality-delete semantics break. A record is therefore only
 * forward-eligible when the user requested it (null distribution mode) AND the record resolves to
 * an empty equality-field set (no user-supplied fields and no schema identifier fields).
 */
@VisibleForTesting
static boolean isForwardEligible(DynamicRecord data) {
  if (data.distributionMode() != null) {
    // An explicit distribution mode always goes through the keyed path.
    return false;
  }

  return DynamicSinkUtil.resolveEqualityFieldNames(data.equalityFields(), data.schema()).isEmpty();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import java.util.Collections;
import java.util.Set;
import javax.annotation.Nullable;
import org.apache.iceberg.Schema;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
Expand Down Expand Up @@ -49,6 +50,20 @@ static Set<Integer> getEqualityFieldIds(Set<String> equalityFields, Schema schem
return equalityFieldIds;
}

/**
 * Determines the equality field names in effect for a write.
 *
 * <p>A non-empty user-supplied set wins; otherwise the schema's identifier fields (see {@link
 * Schema#identifierFieldIds()}) serve as the fallback. Kept aligned with {@link
 * #getEqualityFieldIds} so distribution-key selection and equality-delete inference agree.
 */
static Set<String> resolveEqualityFieldNames(
    @Nullable Set<String> equalityFields, Schema schema) {
  boolean userProvided = equalityFields != null && !equalityFields.isEmpty();
  return userProvided ? equalityFields : schema.identifierFieldNames();
}

static int safeAbs(int input) {
if (input >= 0) {
return input;
Expand Down
Loading