From c0a2665a60ab53feac6a1e47214da910b13f9701 Mon Sep 17 00:00:00 2001 From: Pritam Kumar Mishra Date: Sat, 9 Aug 2025 11:27:29 +0530 Subject: [PATCH 1/7] added metadata and data path in case of dynamic routing --- committer/build.gradle | 36 ++ .../java/org/apache/iceberg/Coordinator.java | 315 ++++++++++++++++++ .../main/java/org/apache/iceberg/Main.java | 7 + .../iceberg/connect/IcebergSinkConfig.java | 34 +- .../iceberg/connect/TableSinkConfig.java | 13 +- .../connect/data/IcebergWriterFactory.java | 11 +- 6 files changed, 413 insertions(+), 3 deletions(-) create mode 100644 committer/build.gradle create mode 100644 committer/src/main/java/org/apache/iceberg/Coordinator.java create mode 100644 committer/src/main/java/org/apache/iceberg/Main.java diff --git a/committer/build.gradle b/committer/build.gradle new file mode 100644 index 000000000000..0de61912d850 --- /dev/null +++ b/committer/build.gradle @@ -0,0 +1,36 @@ +plugins { + id 'java-library' +} + +group = 'org.apache.iceberg' +version = '1.7.0-SNAPSHOT' + +repositories { + mavenCentral() +} + +dependencies { + api project(':iceberg-api') + implementation project(':iceberg-kafka-connect') + implementation project(':iceberg-core') + implementation project(':iceberg-common') + implementation project(path: ':iceberg-bundled-guava', configuration: 'shadow') + implementation project(':iceberg-data') + implementation project(':iceberg-kafka-connect:iceberg-kafka-connect-events') + implementation platform(libs.jackson.bom) + implementation libs.jackson.core + implementation libs.jackson.databind + implementation libs.avro.avro + + compileOnly libs.kafka.clients + compileOnly libs.kafka.connect.api + compileOnly libs.kafka.connect.json + + testImplementation libs.hadoop3.client + testRuntimeOnly project(':iceberg-parquet') + testRuntimeOnly project(':iceberg-orc') +} + +test { + useJUnitPlatform() +} diff --git a/committer/src/main/java/org/apache/iceberg/Coordinator.java 
b/committer/src/main/java/org/apache/iceberg/Coordinator.java new file mode 100644 index 000000000000..6a260d91bb80 --- /dev/null +++ b/committer/src/main/java/org/apache/iceberg/Coordinator.java @@ -0,0 +1,315 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import java.io.IOException; +import java.io.UncheckedIOException; +import java.nio.channels.Channel; +import java.time.Duration; +import java.time.OffsetDateTime; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.function.Function; +import java.util.function.Predicate; +import java.util.stream.Collectors; +import org.apache.iceberg.catalog.Catalog; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.connect.IcebergSinkConfig; +import org.apache.iceberg.connect.channel.CommitState; +import org.apache.iceberg.connect.events.CommitComplete; +import org.apache.iceberg.connect.events.CommitToTable; +import org.apache.iceberg.connect.events.DataWritten; +import org.apache.iceberg.connect.events.Event; +import org.apache.iceberg.connect.events.StartCommit; +import org.apache.iceberg.connect.events.TableReference; +import org.apache.iceberg.exceptions.NoSuchTableException; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.util.Tasks; +import org.apache.iceberg.util.ThreadPools; +import org.apache.kafka.clients.admin.MemberDescription; +import org.apache.kafka.connect.errors.ConnectException; +import org.apache.kafka.connect.sink.SinkTaskContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +class Coordinator { + + private static final Logger LOG = LoggerFactory.getLogger(Coordinator.class); + private static final ObjectMapper MAPPER = new ObjectMapper(); + private static final String COMMIT_ID_SNAPSHOT_PROP = "kafka.connect.commit-id"; + private static final String VALID_THROUGH_TS_SNAPSHOT_PROP = "kafka.connect.valid-through-ts"; + private 
static final Duration POLL_DURATION = Duration.ofSeconds(1); + + private final Catalog catalog; + private final IcebergSinkConfig config; + private final int totalPartitionCount; + private final String snapshotOffsetsProp; + private final ExecutorService exec; + private final CommitState commitState; + private volatile boolean terminated; + + Coordinator( + Catalog catalog, + IcebergSinkConfig config, + Collection members, + KafkaClientFactory clientFactory, + SinkTaskContext context) { + // pass consumer group ID to which we commit low watermark offsets + super("coordinator", config.connectGroupId() + "-coord", config, clientFactory, context); + + this.catalog = catalog; + this.config = config; + this.totalPartitionCount = + members.stream().mapToInt(desc -> desc.assignment().topicPartitions().size()).sum(); + this.snapshotOffsetsProp = + String.format( + "kafka.connect.offsets.%s.%s", config.controlTopic(), config.connectGroupId()); + this.exec = ThreadPools.newFixedThreadPool("iceberg-committer", config.commitThreads()); + this.commitState = new CommitState(config); + } + + void process() { + if (commitState.isCommitIntervalReached()) { + // send out begin commit + commitState.startNewCommit(); + Event event = + new Event(config.connectGroupId(), new StartCommit(commitState.currentCommitId())); + send(event); + LOG.info("Commit {} initiated", commitState.currentCommitId()); + } + + consumeAvailable(POLL_DURATION); + + if (commitState.isCommitTimedOut()) { + commit(true); + } + } + + @Override + protected boolean receive(Envelope envelope) { + switch (envelope.event().payload().type()) { + case DATA_WRITTEN: + commitState.addResponse(envelope); + return true; + case DATA_COMPLETE: + commitState.addReady(envelope); + if (commitState.isCommitReady(totalPartitionCount)) { + commit(false); + } + return true; + } + return false; + } + + private void commit(boolean partialCommit) { + try { + doCommit(partialCommit); + } catch (Exception e) { + LOG.warn("Commit failed, 
will try again next cycle", e); + } finally { + commitState.endCurrentCommit(); + } + } + + private void doCommit(boolean partialCommit) { + Map> commitMap = commitState.tableCommitMap(); + + String offsetsJson = offsetsJson(); + OffsetDateTime validThroughTs = commitState.validThroughTs(partialCommit); + + Tasks.foreach(commitMap.entrySet()) + .executeWith(exec) + .stopOnFailure() + .run( + entry -> { + commitToTable(entry.getKey(), entry.getValue(), offsetsJson, validThroughTs); + }); + + // we should only get here if all tables committed successfully... + commitConsumerOffsets(); + commitState.clearResponses(); + + Event event = + new Event( + config.connectGroupId(), + new CommitComplete(commitState.currentCommitId(), validThroughTs)); + send(event); + + LOG.info( + "Commit {} complete, committed to {} table(s), valid-through {}", + commitState.currentCommitId(), + commitMap.size(), + validThroughTs); + } + + private String offsetsJson() { + try { + return MAPPER.writeValueAsString(controlTopicOffsets()); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + + private void commitToTable( + TableReference tableReference, + List envelopeList, + String offsetsJson, + OffsetDateTime validThroughTs) { + TableIdentifier tableIdentifier = tableReference.identifier(); + Table table; + try { + table = catalog.loadTable(tableIdentifier); + } catch (NoSuchTableException e) { + LOG.warn("Table not found, skipping commit: {}", tableIdentifier, e); + return; + } + + String branch = config.tableConfig(tableIdentifier.toString()).commitBranch(); + + Map committedOffsets = lastCommittedOffsetsForTable(table, branch); + + List payloads = + envelopeList.stream() + .filter( + envelope -> { + Long minOffset = committedOffsets.get(envelope.partition()); + return minOffset == null || envelope.offset() >= minOffset; + }) + .map(envelope -> (DataWritten) envelope.event().payload()) + .collect(Collectors.toList()); + + List dataFiles = + payloads.stream() + 
.filter(payload -> payload.dataFiles() != null) + .flatMap(payload -> payload.dataFiles().stream()) + .filter(dataFile -> dataFile.recordCount() > 0) + .filter(distinctByKey(ContentFile::location)) + .collect(Collectors.toList()); + + List deleteFiles = + payloads.stream() + .filter(payload -> payload.deleteFiles() != null) + .flatMap(payload -> payload.deleteFiles().stream()) + .filter(deleteFile -> deleteFile.recordCount() > 0) + .filter(distinctByKey(ContentFile::location)) + .collect(Collectors.toList()); + + if (terminated) { + throw new ConnectException("Coordinator is terminated, commit aborted"); + } + + if (dataFiles.isEmpty() && deleteFiles.isEmpty()) { + LOG.info("Nothing to commit to table {}, skipping", tableIdentifier); + } else { + if (deleteFiles.isEmpty()) { + AppendFiles appendOp = table.newAppend(); + if (branch != null) { + appendOp.toBranch(branch); + } + appendOp.set(snapshotOffsetsProp, offsetsJson); + appendOp.set(COMMIT_ID_SNAPSHOT_PROP, commitState.currentCommitId().toString()); + if (validThroughTs != null) { + appendOp.set(VALID_THROUGH_TS_SNAPSHOT_PROP, validThroughTs.toString()); + } + dataFiles.forEach(appendOp::appendFile); + appendOp.commit(); + } else { + RowDelta deltaOp = table.newRowDelta(); + if (branch != null) { + deltaOp.toBranch(branch); + } + deltaOp.set(snapshotOffsetsProp, offsetsJson); + deltaOp.set(COMMIT_ID_SNAPSHOT_PROP, commitState.currentCommitId().toString()); + if (validThroughTs != null) { + deltaOp.set(VALID_THROUGH_TS_SNAPSHOT_PROP, validThroughTs.toString()); + } + dataFiles.forEach(deltaOp::addRows); + deleteFiles.forEach(deltaOp::addDeletes); + deltaOp.commit(); + } + + Long snapshotId = latestSnapshot(table, branch).snapshotId(); + Event event = + new Event( + config.connectGroupId(), + new CommitToTable( + commitState.currentCommitId(), tableReference, snapshotId, validThroughTs)); + send(event); + + LOG.info( + "Commit complete to table {}, snapshot {}, commit ID {}, valid-through {}", + tableIdentifier, 
+ snapshotId, + commitState.currentCommitId(), + validThroughTs); + } + } + + private Predicate distinctByKey(Function keyExtractor) { + Map seen = Maps.newConcurrentMap(); + return t -> seen.putIfAbsent(keyExtractor.apply(t), Boolean.TRUE) == null; + } + + private Snapshot latestSnapshot(Table table, String branch) { + if (branch == null) { + return table.currentSnapshot(); + } + return table.snapshot(branch); + } + + private Map lastCommittedOffsetsForTable(Table table, String branch) { + Snapshot snapshot = latestSnapshot(table, branch); + while (snapshot != null) { + Map summary = snapshot.summary(); + String value = summary.get(snapshotOffsetsProp); + if (value != null) { + TypeReference> typeRef = new TypeReference>() {}; + try { + return MAPPER.readValue(value, typeRef); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + Long parentSnapshotId = snapshot.parentId(); + snapshot = parentSnapshotId != null ? table.snapshot(parentSnapshotId) : null; + } + return ImmutableMap.of(); + } + + void terminate() { + this.terminated = true; + + exec.shutdownNow(); + + // wait for coordinator termination, else cause the sink task to fail + try { + if (!exec.awaitTermination(1, TimeUnit.MINUTES)) { + throw new ConnectException("Timed out waiting for coordinator shutdown"); + } + } catch (InterruptedException e) { + throw new ConnectException("Interrupted while waiting for coordinator shutdown", e); + } + } +} diff --git a/committer/src/main/java/org/apache/iceberg/Main.java b/committer/src/main/java/org/apache/iceberg/Main.java new file mode 100644 index 000000000000..3a6b1ae495ae --- /dev/null +++ b/committer/src/main/java/org/apache/iceberg/Main.java @@ -0,0 +1,7 @@ +package org.apache.iceberg; + +public class Main { + public static void main(String[] args) { + System.out.println("Hello world!"); + } +} \ No newline at end of file diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkConfig.java 
b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkConfig.java index 9650ce16270c..f6dd7cf3c0a6 100644 --- a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkConfig.java +++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkConfig.java @@ -28,6 +28,7 @@ import java.util.regex.Pattern; import java.util.stream.Collectors; import org.apache.iceberg.IcebergBuild; +import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.base.Splitter; @@ -56,6 +57,7 @@ public class IcebergSinkConfig extends AbstractConfig { private static final String ID_COLUMNS = "id-columns"; private static final String PARTITION_BY = "partition-by"; private static final String COMMIT_BRANCH = "commit-branch"; + private static final String DYNAMIC_ROUTE_DATA_METADATA_PREFIX = "iceberg.dynamic-route-data-metadata-prefix"; private static final String CATALOG_PROP_PREFIX = "iceberg.catalog."; private static final String HADOOP_PROP_PREFIX = "iceberg.hadoop."; @@ -235,6 +237,13 @@ private static ConfigDef newConfigDef() { 120000L, Importance.LOW, "config to control coordinator executor keep alive time"); + configDef.define( + DYNAMIC_ROUTE_DATA_METADATA_PREFIX, + ConfigDef.Type.STRING, + "", + Importance.HIGH, + "prefix for creation of metadata path and data path in case of dynamic routing" + ); return configDef; } @@ -375,7 +384,20 @@ public TableSinkConfig tableConfig(String tableName) { String commitBranch = tableConfig.getOrDefault(COMMIT_BRANCH, tablesDefaultCommitBranch()); - return new TableSinkConfig(routeRegex, idColumns, partitionBy, commitBranch); + String metadataPath = "", dataPath = ""; + + if (dynamicTablesEnabled()) { + TableIdentifier tableIdentifier = TableIdentifier.parse(tableName); + if 
(originalProps.containsKey("iceberg.catalog.warehouse")) { + metadataPath = originalProps.get("iceberg.catalog.warehouse") + tableIdentifier.namespace() + "/" + tableIdentifier.name() + "/metadata"; + dataPath = originalProps.get("iceberg.catalog.warehouse") + tableIdentifier.namespace() + "/" + tableIdentifier.name() + "/data"; + } else { + metadataPath = tableConfig.getOrDefault("write.metadata.path", defaultMetadataPath(tableName)); + dataPath = tableConfig.getOrDefault("write.data.path", defaultDataPath(tableName)); + } + } + + return new TableSinkConfig(routeRegex, idColumns, partitionBy, commitBranch, dataPath, metadataPath); }); } @@ -388,6 +410,16 @@ static List stringToList(String value, String regex) { return Arrays.stream(value.split(regex)).map(String::trim).collect(Collectors.toList()); } + private String defaultDataPath(String tableName) { + TableIdentifier tableIdentifier = TableIdentifier.parse(tableName); + return getString(DYNAMIC_ROUTE_DATA_METADATA_PREFIX) + "/" + connectorName() + "/" + tableIdentifier.namespace() + "/" + tableIdentifier.name() + "/data"; + } + + private String defaultMetadataPath(String tableName) { + TableIdentifier tableIdentifier = TableIdentifier.parse(tableName); + return getString(DYNAMIC_ROUTE_DATA_METADATA_PREFIX) + "/" + connectorName() + "/" + tableIdentifier.namespace() + "/" + tableIdentifier.name() + "/metadata"; + } + public String controlTopic() { return getString(CONTROL_TOPIC_PROP); } diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/TableSinkConfig.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/TableSinkConfig.java index 0ecde1f7dd0b..28fdd4f36d15 100644 --- a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/TableSinkConfig.java +++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/TableSinkConfig.java @@ -27,13 +27,24 @@ public class TableSinkConfig { private final List idColumns; private final List 
partitionBy; private final String commitBranch; + private final String dataPath, metadataPath; public TableSinkConfig( - Pattern routeRegex, List idColumns, List partitionBy, String commitBranch) { + Pattern routeRegex, List idColumns, List partitionBy, String commitBranch, String dataPath, String metadataPath) { this.routeRegex = routeRegex; this.idColumns = idColumns; this.partitionBy = partitionBy; this.commitBranch = commitBranch; + this.dataPath = dataPath; + this.metadataPath = metadataPath; + } + + public String getDataPath() { + return dataPath; + } + + public String getMetadataPath() { + return metadataPath; } public Pattern routeRegex() { diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/IcebergWriterFactory.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/IcebergWriterFactory.java index 92f5af2d7a87..5de1039e77b0 100644 --- a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/IcebergWriterFactory.java +++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/IcebergWriterFactory.java @@ -20,6 +20,7 @@ import java.util.Arrays; import java.util.List; +import java.util.Map; import java.util.concurrent.atomic.AtomicReference; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Table; @@ -89,6 +90,14 @@ Table autoCreateTable(String tableName, SinkRecord sample) { createNamespaceIfNotExist(catalog, identifier.namespace()); List partitionBy = config.tableConfig(tableName).partitionBy(); + + Map tableAutoCreateProps = config.autoCreateProps(); + + if (config.dynamicTablesEnabled()) { + tableAutoCreateProps.put("write.metadata.path", config.tableConfig(tableName).getMetadataPath()); + tableAutoCreateProps.put("write.data.path", config.tableConfig(tableName).getDataPath()); + } + PartitionSpec spec; try { spec = SchemaUtils.createPartitionSpec(schema, partitionBy); @@ -110,7 +119,7 @@ Table autoCreateTable(String tableName, SinkRecord 
sample) { try { result.set( catalog.createTable( - identifier, schema, partitionSpec, config.autoCreateProps())); + identifier, schema, partitionSpec, tableAutoCreateProps)); } catch (AlreadyExistsException e) { result.set(catalog.loadTable(identifier)); } From 67619ec140a65c7768af074178c95408d76e605d Mon Sep 17 00:00:00 2001 From: Pritam Kumar Mishra Date: Sat, 9 Aug 2025 11:28:07 +0530 Subject: [PATCH 2/7] spotless --- .../iceberg/connect/IcebergSinkConfig.java | 44 +++++++++++++++---- .../iceberg/connect/TableSinkConfig.java | 7 ++- .../connect/data/IcebergWriterFactory.java | 6 +-- 3 files changed, 44 insertions(+), 13 deletions(-) diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkConfig.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkConfig.java index f6dd7cf3c0a6..8339cf7f89ac 100644 --- a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkConfig.java +++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkConfig.java @@ -57,7 +57,8 @@ public class IcebergSinkConfig extends AbstractConfig { private static final String ID_COLUMNS = "id-columns"; private static final String PARTITION_BY = "partition-by"; private static final String COMMIT_BRANCH = "commit-branch"; - private static final String DYNAMIC_ROUTE_DATA_METADATA_PREFIX = "iceberg.dynamic-route-data-metadata-prefix"; + private static final String DYNAMIC_ROUTE_DATA_METADATA_PREFIX = + "iceberg.dynamic-route-data-metadata-prefix"; private static final String CATALOG_PROP_PREFIX = "iceberg.catalog."; private static final String HADOOP_PROP_PREFIX = "iceberg.hadoop."; @@ -242,8 +243,7 @@ private static ConfigDef newConfigDef() { ConfigDef.Type.STRING, "", Importance.HIGH, - "prefix for creation of metadata path and data path in case of dynamic routing" - ); + "prefix for creation of metadata path and data path in case of dynamic routing"); return configDef; } @@ 
-389,15 +389,27 @@ public TableSinkConfig tableConfig(String tableName) { if (dynamicTablesEnabled()) { TableIdentifier tableIdentifier = TableIdentifier.parse(tableName); if (originalProps.containsKey("iceberg.catalog.warehouse")) { - metadataPath = originalProps.get("iceberg.catalog.warehouse") + tableIdentifier.namespace() + "/" + tableIdentifier.name() + "/metadata"; - dataPath = originalProps.get("iceberg.catalog.warehouse") + tableIdentifier.namespace() + "/" + tableIdentifier.name() + "/data"; + metadataPath = + originalProps.get("iceberg.catalog.warehouse") + + tableIdentifier.namespace() + + "/" + + tableIdentifier.name() + + "/metadata"; + dataPath = + originalProps.get("iceberg.catalog.warehouse") + + tableIdentifier.namespace() + + "/" + + tableIdentifier.name() + + "/data"; } else { - metadataPath = tableConfig.getOrDefault("write.metadata.path", defaultMetadataPath(tableName)); + metadataPath = + tableConfig.getOrDefault("write.metadata.path", defaultMetadataPath(tableName)); dataPath = tableConfig.getOrDefault("write.data.path", defaultDataPath(tableName)); } } - return new TableSinkConfig(routeRegex, idColumns, partitionBy, commitBranch, dataPath, metadataPath); + return new TableSinkConfig( + routeRegex, idColumns, partitionBy, commitBranch, dataPath, metadataPath); }); } @@ -412,12 +424,26 @@ static List stringToList(String value, String regex) { private String defaultDataPath(String tableName) { TableIdentifier tableIdentifier = TableIdentifier.parse(tableName); - return getString(DYNAMIC_ROUTE_DATA_METADATA_PREFIX) + "/" + connectorName() + "/" + tableIdentifier.namespace() + "/" + tableIdentifier.name() + "/data"; + return getString(DYNAMIC_ROUTE_DATA_METADATA_PREFIX) + + "/" + + connectorName() + + "/" + + tableIdentifier.namespace() + + "/" + + tableIdentifier.name() + + "/data"; } private String defaultMetadataPath(String tableName) { TableIdentifier tableIdentifier = TableIdentifier.parse(tableName); - return 
getString(DYNAMIC_ROUTE_DATA_METADATA_PREFIX) + "/" + connectorName() + "/" + tableIdentifier.namespace() + "/" + tableIdentifier.name() + "/metadata"; + return getString(DYNAMIC_ROUTE_DATA_METADATA_PREFIX) + + "/" + + connectorName() + + "/" + + tableIdentifier.namespace() + + "/" + + tableIdentifier.name() + + "/metadata"; } public String controlTopic() { diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/TableSinkConfig.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/TableSinkConfig.java index 28fdd4f36d15..36879bf6929a 100644 --- a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/TableSinkConfig.java +++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/TableSinkConfig.java @@ -30,7 +30,12 @@ public class TableSinkConfig { private final String dataPath, metadataPath; public TableSinkConfig( - Pattern routeRegex, List idColumns, List partitionBy, String commitBranch, String dataPath, String metadataPath) { + Pattern routeRegex, + List idColumns, + List partitionBy, + String commitBranch, + String dataPath, + String metadataPath) { this.routeRegex = routeRegex; this.idColumns = idColumns; this.partitionBy = partitionBy; diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/IcebergWriterFactory.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/IcebergWriterFactory.java index 5de1039e77b0..df87e8d2a958 100644 --- a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/IcebergWriterFactory.java +++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/IcebergWriterFactory.java @@ -94,7 +94,8 @@ Table autoCreateTable(String tableName, SinkRecord sample) { Map tableAutoCreateProps = config.autoCreateProps(); if (config.dynamicTablesEnabled()) { - tableAutoCreateProps.put("write.metadata.path", config.tableConfig(tableName).getMetadataPath()); + 
tableAutoCreateProps.put( + "write.metadata.path", config.tableConfig(tableName).getMetadataPath()); tableAutoCreateProps.put("write.data.path", config.tableConfig(tableName).getDataPath()); } @@ -118,8 +119,7 @@ Table autoCreateTable(String tableName, SinkRecord sample) { notUsed -> { try { result.set( - catalog.createTable( - identifier, schema, partitionSpec, tableAutoCreateProps)); + catalog.createTable(identifier, schema, partitionSpec, tableAutoCreateProps)); } catch (AlreadyExistsException e) { result.set(catalog.loadTable(identifier)); } From 6b15ae402deded39a34843f85f5342dbb9ae28a0 Mon Sep 17 00:00:00 2001 From: Pritam Kumar Mishra Date: Sat, 9 Aug 2025 17:54:24 +0530 Subject: [PATCH 3/7] Revert "spotless" This reverts commit 67619ec140a65c7768af074178c95408d76e605d. --- .../iceberg/connect/IcebergSinkConfig.java | 44 ++++--------------- .../iceberg/connect/TableSinkConfig.java | 7 +-- .../connect/data/IcebergWriterFactory.java | 6 +-- 3 files changed, 13 insertions(+), 44 deletions(-) diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkConfig.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkConfig.java index 8339cf7f89ac..f6dd7cf3c0a6 100644 --- a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkConfig.java +++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkConfig.java @@ -57,8 +57,7 @@ public class IcebergSinkConfig extends AbstractConfig { private static final String ID_COLUMNS = "id-columns"; private static final String PARTITION_BY = "partition-by"; private static final String COMMIT_BRANCH = "commit-branch"; - private static final String DYNAMIC_ROUTE_DATA_METADATA_PREFIX = - "iceberg.dynamic-route-data-metadata-prefix"; + private static final String DYNAMIC_ROUTE_DATA_METADATA_PREFIX = "iceberg.dynamic-route-data-metadata-prefix"; private static final String CATALOG_PROP_PREFIX = "iceberg.catalog."; private 
static final String HADOOP_PROP_PREFIX = "iceberg.hadoop."; @@ -243,7 +242,8 @@ private static ConfigDef newConfigDef() { ConfigDef.Type.STRING, "", Importance.HIGH, - "prefix for creation of metadata path and data path in case of dynamic routing"); + "prefix for creation of metadata path and data path in case of dynamic routing" + ); return configDef; } @@ -389,27 +389,15 @@ public TableSinkConfig tableConfig(String tableName) { if (dynamicTablesEnabled()) { TableIdentifier tableIdentifier = TableIdentifier.parse(tableName); if (originalProps.containsKey("iceberg.catalog.warehouse")) { - metadataPath = - originalProps.get("iceberg.catalog.warehouse") - + tableIdentifier.namespace() - + "/" - + tableIdentifier.name() - + "/metadata"; - dataPath = - originalProps.get("iceberg.catalog.warehouse") - + tableIdentifier.namespace() - + "/" - + tableIdentifier.name() - + "/data"; + metadataPath = originalProps.get("iceberg.catalog.warehouse") + tableIdentifier.namespace() + "/" + tableIdentifier.name() + "/metadata"; + dataPath = originalProps.get("iceberg.catalog.warehouse") + tableIdentifier.namespace() + "/" + tableIdentifier.name() + "/data"; } else { - metadataPath = - tableConfig.getOrDefault("write.metadata.path", defaultMetadataPath(tableName)); + metadataPath = tableConfig.getOrDefault("write.metadata.path", defaultMetadataPath(tableName)); dataPath = tableConfig.getOrDefault("write.data.path", defaultDataPath(tableName)); } } - return new TableSinkConfig( - routeRegex, idColumns, partitionBy, commitBranch, dataPath, metadataPath); + return new TableSinkConfig(routeRegex, idColumns, partitionBy, commitBranch, dataPath, metadataPath); }); } @@ -424,26 +412,12 @@ static List stringToList(String value, String regex) { private String defaultDataPath(String tableName) { TableIdentifier tableIdentifier = TableIdentifier.parse(tableName); - return getString(DYNAMIC_ROUTE_DATA_METADATA_PREFIX) - + "/" - + connectorName() - + "/" - + tableIdentifier.namespace() - + "/" - 
+ tableIdentifier.name() - + "/data"; + return getString(DYNAMIC_ROUTE_DATA_METADATA_PREFIX) + "/" + connectorName() + "/" + tableIdentifier.namespace() + "/" + tableIdentifier.name() + "/data"; } private String defaultMetadataPath(String tableName) { TableIdentifier tableIdentifier = TableIdentifier.parse(tableName); - return getString(DYNAMIC_ROUTE_DATA_METADATA_PREFIX) - + "/" - + connectorName() - + "/" - + tableIdentifier.namespace() - + "/" - + tableIdentifier.name() - + "/metadata"; + return getString(DYNAMIC_ROUTE_DATA_METADATA_PREFIX) + "/" + connectorName() + "/" + tableIdentifier.namespace() + "/" + tableIdentifier.name() + "/metadata"; } public String controlTopic() { diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/TableSinkConfig.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/TableSinkConfig.java index 36879bf6929a..28fdd4f36d15 100644 --- a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/TableSinkConfig.java +++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/TableSinkConfig.java @@ -30,12 +30,7 @@ public class TableSinkConfig { private final String dataPath, metadataPath; public TableSinkConfig( - Pattern routeRegex, - List idColumns, - List partitionBy, - String commitBranch, - String dataPath, - String metadataPath) { + Pattern routeRegex, List idColumns, List partitionBy, String commitBranch, String dataPath, String metadataPath) { this.routeRegex = routeRegex; this.idColumns = idColumns; this.partitionBy = partitionBy; diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/IcebergWriterFactory.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/IcebergWriterFactory.java index df87e8d2a958..5de1039e77b0 100644 --- a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/IcebergWriterFactory.java +++ 
b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/IcebergWriterFactory.java @@ -94,8 +94,7 @@ Table autoCreateTable(String tableName, SinkRecord sample) { Map tableAutoCreateProps = config.autoCreateProps(); if (config.dynamicTablesEnabled()) { - tableAutoCreateProps.put( - "write.metadata.path", config.tableConfig(tableName).getMetadataPath()); + tableAutoCreateProps.put("write.metadata.path", config.tableConfig(tableName).getMetadataPath()); tableAutoCreateProps.put("write.data.path", config.tableConfig(tableName).getDataPath()); } @@ -119,7 +118,8 @@ Table autoCreateTable(String tableName, SinkRecord sample) { notUsed -> { try { result.set( - catalog.createTable(identifier, schema, partitionSpec, tableAutoCreateProps)); + catalog.createTable( + identifier, schema, partitionSpec, tableAutoCreateProps)); } catch (AlreadyExistsException e) { result.set(catalog.loadTable(identifier)); } From 8398e4c2675f5e319850c6fe1555007c153ec143 Mon Sep 17 00:00:00 2001 From: Pritam Kumar Mishra Date: Sat, 9 Aug 2025 17:54:36 +0530 Subject: [PATCH 4/7] Revert "added metadat and data path in case of dynamic routing" This reverts commit c0a2665a60ab53feac6a1e47214da910b13f9701. 
--- committer/build.gradle | 36 -- .../java/org/apache/iceberg/Coordinator.java | 315 ------------------ .../main/java/org/apache/iceberg/Main.java | 7 - .../iceberg/connect/IcebergSinkConfig.java | 34 +- .../iceberg/connect/TableSinkConfig.java | 13 +- .../connect/data/IcebergWriterFactory.java | 11 +- 6 files changed, 3 insertions(+), 413 deletions(-) delete mode 100644 committer/build.gradle delete mode 100644 committer/src/main/java/org/apache/iceberg/Coordinator.java delete mode 100644 committer/src/main/java/org/apache/iceberg/Main.java diff --git a/committer/build.gradle b/committer/build.gradle deleted file mode 100644 index 0de61912d850..000000000000 --- a/committer/build.gradle +++ /dev/null @@ -1,36 +0,0 @@ -plugins { - id 'java-library' -} - -group = 'org.apache.iceberg' -version = '1.7.0-SNAPSHOT' - -repositories { - mavenCentral() -} - -dependencies { - api project(':iceberg-api') - implementation project(':iceberg-kafka-connect') - implementation project(':iceberg-core') - implementation project(':iceberg-common') - implementation project(path: ':iceberg-bundled-guava', configuration: 'shadow') - implementation project(':iceberg-data') - implementation project(':iceberg-kafka-connect:iceberg-kafka-connect-events') - implementation platform(libs.jackson.bom) - implementation libs.jackson.core - implementation libs.jackson.databind - implementation libs.avro.avro - - compileOnly libs.kafka.clients - compileOnly libs.kafka.connect.api - compileOnly libs.kafka.connect.json - - testImplementation libs.hadoop3.client - testRuntimeOnly project(':iceberg-parquet') - testRuntimeOnly project(':iceberg-orc') -} - -test { - useJUnitPlatform() -} diff --git a/committer/src/main/java/org/apache/iceberg/Coordinator.java b/committer/src/main/java/org/apache/iceberg/Coordinator.java deleted file mode 100644 index 6a260d91bb80..000000000000 --- a/committer/src/main/java/org/apache/iceberg/Coordinator.java +++ /dev/null @@ -1,315 +0,0 @@ -/* - * Licensed to the Apache 
Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.iceberg; - -import com.fasterxml.jackson.core.type.TypeReference; -import com.fasterxml.jackson.databind.ObjectMapper; -import java.io.IOException; -import java.io.UncheckedIOException; -import java.nio.channels.Channel; -import java.time.Duration; -import java.time.OffsetDateTime; -import java.util.Collection; -import java.util.List; -import java.util.Map; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.TimeUnit; -import java.util.function.Function; -import java.util.function.Predicate; -import java.util.stream.Collectors; -import org.apache.iceberg.catalog.Catalog; -import org.apache.iceberg.catalog.TableIdentifier; -import org.apache.iceberg.connect.IcebergSinkConfig; -import org.apache.iceberg.connect.channel.CommitState; -import org.apache.iceberg.connect.events.CommitComplete; -import org.apache.iceberg.connect.events.CommitToTable; -import org.apache.iceberg.connect.events.DataWritten; -import org.apache.iceberg.connect.events.Event; -import org.apache.iceberg.connect.events.StartCommit; -import org.apache.iceberg.connect.events.TableReference; -import org.apache.iceberg.exceptions.NoSuchTableException; -import 
org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; -import org.apache.iceberg.relocated.com.google.common.collect.Maps; -import org.apache.iceberg.util.Tasks; -import org.apache.iceberg.util.ThreadPools; -import org.apache.kafka.clients.admin.MemberDescription; -import org.apache.kafka.connect.errors.ConnectException; -import org.apache.kafka.connect.sink.SinkTaskContext; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -class Coordinator { - - private static final Logger LOG = LoggerFactory.getLogger(Coordinator.class); - private static final ObjectMapper MAPPER = new ObjectMapper(); - private static final String COMMIT_ID_SNAPSHOT_PROP = "kafka.connect.commit-id"; - private static final String VALID_THROUGH_TS_SNAPSHOT_PROP = "kafka.connect.valid-through-ts"; - private static final Duration POLL_DURATION = Duration.ofSeconds(1); - - private final Catalog catalog; - private final IcebergSinkConfig config; - private final int totalPartitionCount; - private final String snapshotOffsetsProp; - private final ExecutorService exec; - private final CommitState commitState; - private volatile boolean terminated; - - Coordinator( - Catalog catalog, - IcebergSinkConfig config, - Collection members, - KafkaClientFactory clientFactory, - SinkTaskContext context) { - // pass consumer group ID to which we commit low watermark offsets - super("coordinator", config.connectGroupId() + "-coord", config, clientFactory, context); - - this.catalog = catalog; - this.config = config; - this.totalPartitionCount = - members.stream().mapToInt(desc -> desc.assignment().topicPartitions().size()).sum(); - this.snapshotOffsetsProp = - String.format( - "kafka.connect.offsets.%s.%s", config.controlTopic(), config.connectGroupId()); - this.exec = ThreadPools.newFixedThreadPool("iceberg-committer", config.commitThreads()); - this.commitState = new CommitState(config); - } - - void process() { - if (commitState.isCommitIntervalReached()) { - // send out begin commit - 
commitState.startNewCommit(); - Event event = - new Event(config.connectGroupId(), new StartCommit(commitState.currentCommitId())); - send(event); - LOG.info("Commit {} initiated", commitState.currentCommitId()); - } - - consumeAvailable(POLL_DURATION); - - if (commitState.isCommitTimedOut()) { - commit(true); - } - } - - @Override - protected boolean receive(Envelope envelope) { - switch (envelope.event().payload().type()) { - case DATA_WRITTEN: - commitState.addResponse(envelope); - return true; - case DATA_COMPLETE: - commitState.addReady(envelope); - if (commitState.isCommitReady(totalPartitionCount)) { - commit(false); - } - return true; - } - return false; - } - - private void commit(boolean partialCommit) { - try { - doCommit(partialCommit); - } catch (Exception e) { - LOG.warn("Commit failed, will try again next cycle", e); - } finally { - commitState.endCurrentCommit(); - } - } - - private void doCommit(boolean partialCommit) { - Map> commitMap = commitState.tableCommitMap(); - - String offsetsJson = offsetsJson(); - OffsetDateTime validThroughTs = commitState.validThroughTs(partialCommit); - - Tasks.foreach(commitMap.entrySet()) - .executeWith(exec) - .stopOnFailure() - .run( - entry -> { - commitToTable(entry.getKey(), entry.getValue(), offsetsJson, validThroughTs); - }); - - // we should only get here if all tables committed successfully... 
- commitConsumerOffsets(); - commitState.clearResponses(); - - Event event = - new Event( - config.connectGroupId(), - new CommitComplete(commitState.currentCommitId(), validThroughTs)); - send(event); - - LOG.info( - "Commit {} complete, committed to {} table(s), valid-through {}", - commitState.currentCommitId(), - commitMap.size(), - validThroughTs); - } - - private String offsetsJson() { - try { - return MAPPER.writeValueAsString(controlTopicOffsets()); - } catch (IOException e) { - throw new UncheckedIOException(e); - } - } - - private void commitToTable( - TableReference tableReference, - List envelopeList, - String offsetsJson, - OffsetDateTime validThroughTs) { - TableIdentifier tableIdentifier = tableReference.identifier(); - Table table; - try { - table = catalog.loadTable(tableIdentifier); - } catch (NoSuchTableException e) { - LOG.warn("Table not found, skipping commit: {}", tableIdentifier, e); - return; - } - - String branch = config.tableConfig(tableIdentifier.toString()).commitBranch(); - - Map committedOffsets = lastCommittedOffsetsForTable(table, branch); - - List payloads = - envelopeList.stream() - .filter( - envelope -> { - Long minOffset = committedOffsets.get(envelope.partition()); - return minOffset == null || envelope.offset() >= minOffset; - }) - .map(envelope -> (DataWritten) envelope.event().payload()) - .collect(Collectors.toList()); - - List dataFiles = - payloads.stream() - .filter(payload -> payload.dataFiles() != null) - .flatMap(payload -> payload.dataFiles().stream()) - .filter(dataFile -> dataFile.recordCount() > 0) - .filter(distinctByKey(ContentFile::location)) - .collect(Collectors.toList()); - - List deleteFiles = - payloads.stream() - .filter(payload -> payload.deleteFiles() != null) - .flatMap(payload -> payload.deleteFiles().stream()) - .filter(deleteFile -> deleteFile.recordCount() > 0) - .filter(distinctByKey(ContentFile::location)) - .collect(Collectors.toList()); - - if (terminated) { - throw new 
ConnectException("Coordinator is terminated, commit aborted"); - } - - if (dataFiles.isEmpty() && deleteFiles.isEmpty()) { - LOG.info("Nothing to commit to table {}, skipping", tableIdentifier); - } else { - if (deleteFiles.isEmpty()) { - AppendFiles appendOp = table.newAppend(); - if (branch != null) { - appendOp.toBranch(branch); - } - appendOp.set(snapshotOffsetsProp, offsetsJson); - appendOp.set(COMMIT_ID_SNAPSHOT_PROP, commitState.currentCommitId().toString()); - if (validThroughTs != null) { - appendOp.set(VALID_THROUGH_TS_SNAPSHOT_PROP, validThroughTs.toString()); - } - dataFiles.forEach(appendOp::appendFile); - appendOp.commit(); - } else { - RowDelta deltaOp = table.newRowDelta(); - if (branch != null) { - deltaOp.toBranch(branch); - } - deltaOp.set(snapshotOffsetsProp, offsetsJson); - deltaOp.set(COMMIT_ID_SNAPSHOT_PROP, commitState.currentCommitId().toString()); - if (validThroughTs != null) { - deltaOp.set(VALID_THROUGH_TS_SNAPSHOT_PROP, validThroughTs.toString()); - } - dataFiles.forEach(deltaOp::addRows); - deleteFiles.forEach(deltaOp::addDeletes); - deltaOp.commit(); - } - - Long snapshotId = latestSnapshot(table, branch).snapshotId(); - Event event = - new Event( - config.connectGroupId(), - new CommitToTable( - commitState.currentCommitId(), tableReference, snapshotId, validThroughTs)); - send(event); - - LOG.info( - "Commit complete to table {}, snapshot {}, commit ID {}, valid-through {}", - tableIdentifier, - snapshotId, - commitState.currentCommitId(), - validThroughTs); - } - } - - private Predicate distinctByKey(Function keyExtractor) { - Map seen = Maps.newConcurrentMap(); - return t -> seen.putIfAbsent(keyExtractor.apply(t), Boolean.TRUE) == null; - } - - private Snapshot latestSnapshot(Table table, String branch) { - if (branch == null) { - return table.currentSnapshot(); - } - return table.snapshot(branch); - } - - private Map lastCommittedOffsetsForTable(Table table, String branch) { - Snapshot snapshot = latestSnapshot(table, branch); - 
while (snapshot != null) { - Map summary = snapshot.summary(); - String value = summary.get(snapshotOffsetsProp); - if (value != null) { - TypeReference> typeRef = new TypeReference>() {}; - try { - return MAPPER.readValue(value, typeRef); - } catch (IOException e) { - throw new UncheckedIOException(e); - } - } - Long parentSnapshotId = snapshot.parentId(); - snapshot = parentSnapshotId != null ? table.snapshot(parentSnapshotId) : null; - } - return ImmutableMap.of(); - } - - void terminate() { - this.terminated = true; - - exec.shutdownNow(); - - // wait for coordinator termination, else cause the sink task to fail - try { - if (!exec.awaitTermination(1, TimeUnit.MINUTES)) { - throw new ConnectException("Timed out waiting for coordinator shutdown"); - } - } catch (InterruptedException e) { - throw new ConnectException("Interrupted while waiting for coordinator shutdown", e); - } - } -} diff --git a/committer/src/main/java/org/apache/iceberg/Main.java b/committer/src/main/java/org/apache/iceberg/Main.java deleted file mode 100644 index 3a6b1ae495ae..000000000000 --- a/committer/src/main/java/org/apache/iceberg/Main.java +++ /dev/null @@ -1,7 +0,0 @@ -package org.apache.iceberg; - -public class Main { - public static void main(String[] args) { - System.out.println("Hello world!"); - } -} \ No newline at end of file diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkConfig.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkConfig.java index f6dd7cf3c0a6..9650ce16270c 100644 --- a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkConfig.java +++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkConfig.java @@ -28,7 +28,6 @@ import java.util.regex.Pattern; import java.util.stream.Collectors; import org.apache.iceberg.IcebergBuild; -import org.apache.iceberg.catalog.TableIdentifier; import 
org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.base.Splitter; @@ -57,7 +56,6 @@ public class IcebergSinkConfig extends AbstractConfig { private static final String ID_COLUMNS = "id-columns"; private static final String PARTITION_BY = "partition-by"; private static final String COMMIT_BRANCH = "commit-branch"; - private static final String DYNAMIC_ROUTE_DATA_METADATA_PREFIX = "iceberg.dynamic-route-data-metadata-prefix"; private static final String CATALOG_PROP_PREFIX = "iceberg.catalog."; private static final String HADOOP_PROP_PREFIX = "iceberg.hadoop."; @@ -237,13 +235,6 @@ private static ConfigDef newConfigDef() { 120000L, Importance.LOW, "config to control coordinator executor keep alive time"); - configDef.define( - DYNAMIC_ROUTE_DATA_METADATA_PREFIX, - ConfigDef.Type.STRING, - "", - Importance.HIGH, - "prefix for creation of metadata path and data path in case of dynamic routing" - ); return configDef; } @@ -384,20 +375,7 @@ public TableSinkConfig tableConfig(String tableName) { String commitBranch = tableConfig.getOrDefault(COMMIT_BRANCH, tablesDefaultCommitBranch()); - String metadataPath = "", dataPath = ""; - - if (dynamicTablesEnabled()) { - TableIdentifier tableIdentifier = TableIdentifier.parse(tableName); - if (originalProps.containsKey("iceberg.catalog.warehouse")) { - metadataPath = originalProps.get("iceberg.catalog.warehouse") + tableIdentifier.namespace() + "/" + tableIdentifier.name() + "/metadata"; - dataPath = originalProps.get("iceberg.catalog.warehouse") + tableIdentifier.namespace() + "/" + tableIdentifier.name() + "/data"; - } else { - metadataPath = tableConfig.getOrDefault("write.metadata.path", defaultMetadataPath(tableName)); - dataPath = tableConfig.getOrDefault("write.data.path", defaultDataPath(tableName)); - } - } - - return new TableSinkConfig(routeRegex, idColumns, 
partitionBy, commitBranch, dataPath, metadataPath); + return new TableSinkConfig(routeRegex, idColumns, partitionBy, commitBranch); }); } @@ -410,16 +388,6 @@ static List stringToList(String value, String regex) { return Arrays.stream(value.split(regex)).map(String::trim).collect(Collectors.toList()); } - private String defaultDataPath(String tableName) { - TableIdentifier tableIdentifier = TableIdentifier.parse(tableName); - return getString(DYNAMIC_ROUTE_DATA_METADATA_PREFIX) + "/" + connectorName() + "/" + tableIdentifier.namespace() + "/" + tableIdentifier.name() + "/data"; - } - - private String defaultMetadataPath(String tableName) { - TableIdentifier tableIdentifier = TableIdentifier.parse(tableName); - return getString(DYNAMIC_ROUTE_DATA_METADATA_PREFIX) + "/" + connectorName() + "/" + tableIdentifier.namespace() + "/" + tableIdentifier.name() + "/metadata"; - } - public String controlTopic() { return getString(CONTROL_TOPIC_PROP); } diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/TableSinkConfig.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/TableSinkConfig.java index 28fdd4f36d15..0ecde1f7dd0b 100644 --- a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/TableSinkConfig.java +++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/TableSinkConfig.java @@ -27,24 +27,13 @@ public class TableSinkConfig { private final List idColumns; private final List partitionBy; private final String commitBranch; - private final String dataPath, metadataPath; public TableSinkConfig( - Pattern routeRegex, List idColumns, List partitionBy, String commitBranch, String dataPath, String metadataPath) { + Pattern routeRegex, List idColumns, List partitionBy, String commitBranch) { this.routeRegex = routeRegex; this.idColumns = idColumns; this.partitionBy = partitionBy; this.commitBranch = commitBranch; - this.dataPath = dataPath; - this.metadataPath = metadataPath; - } - - 
public String getDataPath() { - return dataPath; - } - - public String getMetadataPath() { - return metadataPath; } public Pattern routeRegex() { diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/IcebergWriterFactory.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/IcebergWriterFactory.java index 5de1039e77b0..92f5af2d7a87 100644 --- a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/IcebergWriterFactory.java +++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/IcebergWriterFactory.java @@ -20,7 +20,6 @@ import java.util.Arrays; import java.util.List; -import java.util.Map; import java.util.concurrent.atomic.AtomicReference; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Table; @@ -90,14 +89,6 @@ Table autoCreateTable(String tableName, SinkRecord sample) { createNamespaceIfNotExist(catalog, identifier.namespace()); List partitionBy = config.tableConfig(tableName).partitionBy(); - - Map tableAutoCreateProps = config.autoCreateProps(); - - if (config.dynamicTablesEnabled()) { - tableAutoCreateProps.put("write.metadata.path", config.tableConfig(tableName).getMetadataPath()); - tableAutoCreateProps.put("write.data.path", config.tableConfig(tableName).getDataPath()); - } - PartitionSpec spec; try { spec = SchemaUtils.createPartitionSpec(schema, partitionBy); @@ -119,7 +110,7 @@ Table autoCreateTable(String tableName, SinkRecord sample) { try { result.set( catalog.createTable( - identifier, schema, partitionSpec, tableAutoCreateProps)); + identifier, schema, partitionSpec, config.autoCreateProps())); } catch (AlreadyExistsException e) { result.set(catalog.loadTable(identifier)); } From 166e7750b17e889774e0c29642f652f22d6bf567 Mon Sep 17 00:00:00 2001 From: Pritam Kumar Mishra Date: Thu, 20 Nov 2025 13:54:41 +0530 Subject: [PATCH 5/7] committer should not fail if not topic partition is assigned and should wait for the kafka connect 
to give a proper and valid call --- .../connect/channel/CommitterImpl.java | 79 ++++++++++++------- .../iceberg/connect/channel/Coordinator.java | 26 +++--- 2 files changed, 66 insertions(+), 39 deletions(-) diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/CommitterImpl.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/CommitterImpl.java index 04602a66a5e1..cae757332656 100644 --- a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/CommitterImpl.java +++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/CommitterImpl.java @@ -30,7 +30,6 @@ import org.apache.kafka.clients.admin.ConsumerGroupDescription; import org.apache.kafka.clients.admin.MemberDescription; import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.connect.errors.ConnectException; import org.apache.kafka.connect.sink.SinkRecord; import org.apache.kafka.connect.sink.SinkTaskContext; import org.slf4j.Logger; @@ -48,6 +47,7 @@ public class CommitterImpl implements Committer { private KafkaClientFactory clientFactory; private Collection membersWhenWorkerIsCoordinator; private final AtomicBoolean isInitialized = new AtomicBoolean(false); + private String identifier; private void initialize( Catalog icebergCatalog, @@ -58,6 +58,7 @@ private void initialize( this.config = icebergSinkConfig; this.context = sinkTaskContext; this.clientFactory = new KafkaClientFactory(config.kafkaProps()); + this.identifier = config.connectorName() + "-" + config.taskId(); } } @@ -92,16 +93,44 @@ boolean hasLeaderPartition(Collection currentAssignedPartitions) @VisibleForTesting boolean containsFirstPartition( Collection members, Collection partitions) { - // there should only be one task assigned partition 0 of the first topic, - // so elect that one the leader - TopicPartition firstTopicPartition = - members.stream() - .flatMap(member -> 
member.assignment().topicPartitions().stream()) - .min(new TopicPartitionComparator()) - .orElseThrow( - () -> new ConnectException("No partitions assigned, cannot determine leader")); - - return partitions.contains(firstTopicPartition); + // Determine the first partition across all members to elect the leader + TopicPartition firstTopicPartition = findFirstTopicPartition(members); + + if (firstTopicPartition == null) { + LOG.warn( + "Committer {} found no partitions assigned across all members, cannot determine leader", + identifier); + return false; + } + + boolean containsFirst = partitions.contains(firstTopicPartition); + if (containsFirst) { + LOG.info( + "Committer {} contains the first partition {}, this task is the leader", + identifier, + firstTopicPartition); + } else { + LOG.debug( + "Committer {} does not contain the first partition {}, not the leader", + identifier, + firstTopicPartition); + } + + return containsFirst; + } + + /** + * Finds the first (minimum) topic partition across all consumer group members. + * + * @param members the collection of consumer group members + * @return the first topic partition, or null if no partitions are assigned + */ + @VisibleForTesting + TopicPartition findFirstTopicPartition(Collection members) { + return members.stream() + .flatMap(member -> member.assignment().topicPartitions().stream()) + .min(new TopicPartitionComparator()) + .orElse(null); } @Override @@ -122,7 +151,7 @@ public void open( Collection addedPartitions) { initialize(icebergCatalog, icebergSinkConfig, sinkTaskContext); if (hasLeaderPartition(addedPartitions)) { - LOG.info("Committer received leader partition. Starting Coordinator."); + LOG.info("Committer {} received leader partition. Starting Coordinator.", identifier); startCoordinator(); } } @@ -141,31 +170,26 @@ public void close(Collection closedPartitions) { // Defensive: close called without prior initialization (should not happen). 
if (!isInitialized.get()) { - LOG.warn("Close unexpectedly called without partition assignment"); + LOG.warn( + "Close unexpectedly called on committer {} without partition assignment", identifier); return; } // Empty partitions → task was stopped explicitly. Stop coordinator if running. if (closedPartitions.isEmpty()) { - LOG.info("Task stopped. Closing coordinator."); + LOG.info("Committer {} stopped. Closing coordinator.", identifier); stopCoordinator(); return; } // Normal close: if leader partition is lost, stop coordinator. if (hasLeaderPartition(closedPartitions)) { - LOG.info( - "Committer {}-{} lost leader partition. Stopping coordinator.", - config.connectorName(), - config.taskId()); + LOG.info("Committer {} lost leader partition. Stopping coordinator.", identifier); stopCoordinator(); } // Reset offsets to last committed to avoid data loss. - LOG.info( - "Seeking to last committed offsets for worker {}-{}.", - config.connectorName(), - config.taskId()); + LOG.info("Seeking to last committed offsets for worker {}.", identifier); KafkaUtils.seekToLastCommittedOffsets(context); } @@ -181,9 +205,7 @@ public void save(Collection sinkRecords) { private void processControlEvents() { if (coordinatorThread != null && coordinatorThread.isTerminated()) { throw new NotRunningException( - String.format( - "Coordinator unexpectedly terminated on committer %s-%s", - config.connectorName(), config.taskId())); + String.format("Coordinator unexpectedly terminated on committer %s", identifier)); } if (worker != null) { worker.process(); @@ -192,7 +214,7 @@ private void processControlEvents() { private void startWorker() { if (null == this.worker) { - LOG.info("Starting commit worker {}-{}", config.connectorName(), config.taskId()); + LOG.info("Starting commit worker {}", identifier); SinkWriter sinkWriter = new SinkWriter(catalog, config); worker = new Worker(config, clientFactory, sinkWriter, context); worker.start(); @@ -201,10 +223,7 @@ private void startWorker() { 
private void startCoordinator() { if (null == this.coordinatorThread) { - LOG.info( - "Task {}-{} elected leader, starting commit coordinator", - config.connectorName(), - config.taskId()); + LOG.info("Task {} elected leader, starting commit coordinator", identifier); Coordinator coordinator = new Coordinator(catalog, config, membersWhenWorkerIsCoordinator, clientFactory, context); coordinatorThread = new CoordinatorThread(coordinator); diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/Coordinator.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/Coordinator.java index 5c516311bd4c..f37ccedc6e9f 100644 --- a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/Coordinator.java +++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/Coordinator.java @@ -70,7 +70,7 @@ class Coordinator extends Channel { private static final Logger LOG = LoggerFactory.getLogger(Coordinator.class); private static final ObjectMapper MAPPER = new ObjectMapper(); private static final String COMMIT_ID_SNAPSHOT_PROP = "kafka.connect.commit-id"; - private static final String TASK_ID_SNAPSHOT_PROP = "kafka.connect.task-id"; + private static final String COORDINATOR_ID_SNAPSHOT_PROP = "kafka.connect.coordinator-id"; private static final String VALID_THROUGH_TS_SNAPSHOT_PROP = "kafka.connect.valid-through-ts"; private static final Duration POLL_DURATION = Duration.ofSeconds(1); @@ -81,6 +81,7 @@ class Coordinator extends Channel { private final ExecutorService exec; private final CommitState commitState; private volatile boolean terminated; + private final String coordinatorId; Coordinator( Catalog catalog, @@ -110,6 +111,7 @@ class Coordinator extends Channel { .setNameFormat("iceberg-committer" + "-%d") .build()); this.commitState = new CommitState(config); + this.coordinatorId = config.connectorName() + "-" + config.taskId(); } void process() { @@ -119,7 +121,7 @@ void 
process() { Event event = new Event(config.connectGroupId(), new StartCommit(commitState.currentCommitId())); send(event); - LOG.info("Commit {} initiated", commitState.currentCommitId()); + LOG.info("Coordinator {} initiated commit {}", coordinatorId, commitState.currentCommitId()); } consumeAvailable(POLL_DURATION); @@ -149,7 +151,11 @@ private void commit(boolean partialCommit) { try { doCommit(partialCommit); } catch (Exception e) { - LOG.warn("Commit failed, will try again next cycle", e); + LOG.warn( + "Coordinator {} failed to commit for commit {}, will try again next cycle", + coordinatorId, + commitState.currentCommitId(), + e); } finally { commitState.endCurrentCommit(); } @@ -179,7 +185,8 @@ private void doCommit(boolean partialCommit) { send(event); LOG.info( - "Commit {} complete, committed to {} table(s), valid-through {}", + "Coordinator {} completed commit {}, committed to {} table(s), valid-through {}", + coordinatorId, commitState.currentCommitId(), commitMap.size(), validThroughTs); @@ -246,13 +253,13 @@ private void commitToTable( .collect(Collectors.toList()); if (terminated) { - throw new ConnectException("Coordinator is terminated, commit aborted"); + throw new ConnectException( + String.format("Coordinator %s is terminated, commit aborted", coordinatorId)); } if (dataFiles.isEmpty() && deleteFiles.isEmpty()) { LOG.info("Nothing to commit to table {}, skipping", tableIdentifier); } else { - String taskId = String.format("%s-%s", config.connectorName(), config.taskId()); if (deleteFiles.isEmpty()) { AppendFiles appendOp = table.newAppend().validateWith(offsetValidator(tableIdentifier, committedOffsets)); @@ -261,7 +268,7 @@ private void commitToTable( } appendOp.set(snapshotOffsetsProp, offsetsJson); appendOp.set(COMMIT_ID_SNAPSHOT_PROP, commitState.currentCommitId().toString()); - appendOp.set(TASK_ID_SNAPSHOT_PROP, taskId); + appendOp.set(COORDINATOR_ID_SNAPSHOT_PROP, coordinatorId); if (validThroughTs != null) { 
appendOp.set(VALID_THROUGH_TS_SNAPSHOT_PROP, validThroughTs.toString()); } @@ -275,7 +282,7 @@ private void commitToTable( } deltaOp.set(snapshotOffsetsProp, offsetsJson); deltaOp.set(COMMIT_ID_SNAPSHOT_PROP, commitState.currentCommitId().toString()); - deltaOp.set(TASK_ID_SNAPSHOT_PROP, taskId); + deltaOp.set(COORDINATOR_ID_SNAPSHOT_PROP, coordinatorId); if (validThroughTs != null) { deltaOp.set(VALID_THROUGH_TS_SNAPSHOT_PROP, validThroughTs.toString()); } @@ -293,7 +300,8 @@ private void commitToTable( send(event); LOG.info( - "Commit complete to table {}, snapshot {}, commit ID {}, valid-through {}", + "Coordinator {} completed commit to table {}, snapshot {}, commit ID {}, valid-through {}", + coordinatorId, tableIdentifier, snapshotId, commitState.currentCommitId(), From 5fbb0393c9dc76e2100af2e372fc5003c972461c Mon Sep 17 00:00:00 2001 From: Pritam Kumar Mishra Date: Fri, 21 Nov 2025 11:26:52 +0530 Subject: [PATCH 6/7] fixed integration tests --- .../java/org/apache/iceberg/connect/IntegrationTestBase.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kafka-connect/kafka-connect-runtime/src/integration/java/org/apache/iceberg/connect/IntegrationTestBase.java b/kafka-connect/kafka-connect-runtime/src/integration/java/org/apache/iceberg/connect/IntegrationTestBase.java index 8b5b1ddea34b..4ec8d2f4ed01 100644 --- a/kafka-connect/kafka-connect-runtime/src/integration/java/org/apache/iceberg/connect/IntegrationTestBase.java +++ b/kafka-connect/kafka-connect-runtime/src/integration/java/org/apache/iceberg/connect/IntegrationTestBase.java @@ -130,7 +130,7 @@ public boolean matches(String str) { } }); assertThat(props).containsKey("kafka.connect.commit-id"); - assertThat(props).containsKey("kafka.connect.task-id"); + assertThat(props).containsKey("kafka.connect.coordinator-id"); } protected List dataFiles(TableIdentifier tableIdentifier, String branch) { From 412cedf37ce1197e09930ec05fcf0961e559c22f Mon Sep 17 00:00:00 2001 From: kumarpritam863 
<148938310+kumarpritam863@users.noreply.github.com> Date: Mon, 2 Feb 2026 11:39:11 +0530 Subject: [PATCH 7/7] Update kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/CommitterImpl.java Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- .../org/apache/iceberg/connect/channel/CommitterImpl.java | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/CommitterImpl.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/CommitterImpl.java index cae757332656..4b8cef9b3db8 100644 --- a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/CommitterImpl.java +++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/CommitterImpl.java @@ -122,8 +122,13 @@ boolean containsFirstPartition( /** * Finds the first (minimum) topic partition across all consumer group members. * + *

The "first" partition is determined using {@link TopicPartitionComparator}, which orders + * {@link TopicPartition} instances lexicographically by topic name and, for equal topics, by + * ascending partition number. + * * @param members the collection of consumer group members - * @return the first topic partition, or null if no partitions are assigned + * @return the first topic partition according to {@link TopicPartitionComparator}, or null if no + * partitions are assigned */ @VisibleForTesting TopicPartition findFirstTopicPartition(Collection members) {