diff --git a/kafka-connect/kafka-connect-runtime/src/integration/java/org/apache/iceberg/connect/IntegrationTestBase.java b/kafka-connect/kafka-connect-runtime/src/integration/java/org/apache/iceberg/connect/IntegrationTestBase.java
index 8b5b1ddea34b..4ec8d2f4ed01 100644
--- a/kafka-connect/kafka-connect-runtime/src/integration/java/org/apache/iceberg/connect/IntegrationTestBase.java
+++ b/kafka-connect/kafka-connect-runtime/src/integration/java/org/apache/iceberg/connect/IntegrationTestBase.java
@@ -130,7 +130,7 @@ public boolean matches(String str) {
               }
             });
     assertThat(props).containsKey("kafka.connect.commit-id");
-    assertThat(props).containsKey("kafka.connect.task-id");
+    assertThat(props).containsKey("kafka.connect.coordinator-id");
   }
 
   protected List<DataFile> dataFiles(TableIdentifier tableIdentifier, String branch) {
diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/CommitterImpl.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/CommitterImpl.java
index 04602a66a5e1..4b8cef9b3db8 100644
--- a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/CommitterImpl.java
+++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/CommitterImpl.java
@@ -30,7 +30,6 @@
 import org.apache.kafka.clients.admin.ConsumerGroupDescription;
 import org.apache.kafka.clients.admin.MemberDescription;
 import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.sink.SinkRecord;
 import org.apache.kafka.connect.sink.SinkTaskContext;
 import org.slf4j.Logger;
@@ -48,6 +47,7 @@ public class CommitterImpl implements Committer {
   private KafkaClientFactory clientFactory;
   private Collection<MemberDescription> membersWhenWorkerIsCoordinator;
   private final AtomicBoolean isInitialized = new AtomicBoolean(false);
+  private String identifier;
 
   private void initialize(
       Catalog icebergCatalog,
@@ -58,6 +58,7 @@ private void initialize(
       this.config = icebergSinkConfig;
       this.context = sinkTaskContext;
       this.clientFactory = new KafkaClientFactory(config.kafkaProps());
+      this.identifier = config.connectorName() + "-" + config.taskId();
     }
   }
 
@@ -92,16 +93,49 @@ boolean hasLeaderPartition(Collection<TopicPartition> currentAssignedPartitions)
   @VisibleForTesting
   boolean containsFirstPartition(
       Collection<MemberDescription> members, Collection<TopicPartition> partitions) {
-    // there should only be one task assigned partition 0 of the first topic,
-    // so elect that one the leader
-    TopicPartition firstTopicPartition =
-        members.stream()
-            .flatMap(member -> member.assignment().topicPartitions().stream())
-            .min(new TopicPartitionComparator())
-            .orElseThrow(
-                () -> new ConnectException("No partitions assigned, cannot determine leader"));
-
-    return partitions.contains(firstTopicPartition);
+    // Determine the first partition across all members to elect the leader
+    TopicPartition firstTopicPartition = findFirstTopicPartition(members);
+
+    if (firstTopicPartition == null) {
+      LOG.warn(
+          "Committer {} found no partitions assigned across all members, cannot determine leader",
+          identifier);
+      return false;
+    }
+
+    boolean containsFirst = partitions.contains(firstTopicPartition);
+    if (containsFirst) {
+      LOG.info(
+          "Committer {} contains the first partition {}, this task is the leader",
+          identifier,
+          firstTopicPartition);
+    } else {
+      LOG.debug(
+          "Committer {} does not contain the first partition {}, not the leader",
+          identifier,
+          firstTopicPartition);
+    }
+
+    return containsFirst;
+  }
+
+  /**
+   * Finds the first (minimum) topic partition across all consumer group members.
+   *
+   * <p>The "first" partition is determined using {@link TopicPartitionComparator}, which orders
+   * {@link TopicPartition} instances lexicographically by topic name and, for equal topics, by
+   * ascending partition number.
+   *
+   * @param members the collection of consumer group members
+   * @return the first topic partition according to {@link TopicPartitionComparator}, or null if no
+   *     partitions are assigned
+   */
+  @VisibleForTesting
+  TopicPartition findFirstTopicPartition(Collection<MemberDescription> members) {
+    return members.stream()
+        .flatMap(member -> member.assignment().topicPartitions().stream())
+        .min(new TopicPartitionComparator())
+        .orElse(null);
   }
 
   @Override
@@ -122,7 +156,7 @@ public void open(
       Collection<TopicPartition> addedPartitions) {
     initialize(icebergCatalog, icebergSinkConfig, sinkTaskContext);
     if (hasLeaderPartition(addedPartitions)) {
-      LOG.info("Committer received leader partition. Starting Coordinator.");
+      LOG.info("Committer {} received leader partition. Starting Coordinator.", identifier);
       startCoordinator();
     }
   }
@@ -141,31 +175,26 @@ public void close(Collection<TopicPartition> closedPartitions) {
     // Defensive: close called without prior initialization (should not happen).
     if (!isInitialized.get()) {
-      LOG.warn("Close unexpectedly called without partition assignment");
+      LOG.warn(
+          "Close unexpectedly called on committer {} without partition assignment", identifier);
       return;
     }
 
     // Empty partitions → task was stopped explicitly. Stop coordinator if running.
     if (closedPartitions.isEmpty()) {
-      LOG.info("Task stopped. Closing coordinator.");
+      LOG.info("Committer {} stopped. Closing coordinator.", identifier);
       stopCoordinator();
       return;
     }
 
     // Normal close: if leader partition is lost, stop coordinator.
     if (hasLeaderPartition(closedPartitions)) {
-      LOG.info(
-          "Committer {}-{} lost leader partition. Stopping coordinator.",
-          config.connectorName(),
-          config.taskId());
+      LOG.info("Committer {} lost leader partition. Stopping coordinator.", identifier);
       stopCoordinator();
     }
 
     // Reset offsets to last committed to avoid data loss.
-    LOG.info(
-        "Seeking to last committed offsets for worker {}-{}.",
-        config.connectorName(),
-        config.taskId());
+    LOG.info("Seeking to last committed offsets for worker {}.", identifier);
     KafkaUtils.seekToLastCommittedOffsets(context);
   }
@@ -181,9 +210,7 @@ public void save(Collection<SinkRecord> sinkRecords) {
   private void processControlEvents() {
     if (coordinatorThread != null && coordinatorThread.isTerminated()) {
       throw new NotRunningException(
-          String.format(
-              "Coordinator unexpectedly terminated on committer %s-%s",
-              config.connectorName(), config.taskId()));
+          String.format("Coordinator unexpectedly terminated on committer %s", identifier));
     }
     if (worker != null) {
       worker.process();
@@ -192,7 +219,7 @@ private void startWorker() {
     if (null == this.worker) {
-      LOG.info("Starting commit worker {}-{}", config.connectorName(), config.taskId());
+      LOG.info("Starting commit worker {}", identifier);
       SinkWriter sinkWriter = new SinkWriter(catalog, config);
       worker = new Worker(config, clientFactory, sinkWriter, context);
       worker.start();
@@ -201,10 +228,7 @@ private void startWorker() {
   private void startCoordinator() {
     if (null == this.coordinatorThread) {
-      LOG.info(
-          "Task {}-{} elected leader, starting commit coordinator",
-          config.connectorName(),
-          config.taskId());
+      LOG.info("Task {} elected leader, starting commit coordinator", identifier);
       Coordinator coordinator =
           new Coordinator(catalog, config, membersWhenWorkerIsCoordinator, clientFactory, context);
       coordinatorThread = new CoordinatorThread(coordinator);
diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/Coordinator.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/Coordinator.java
index 5c516311bd4c..f37ccedc6e9f 100644
--- a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/Coordinator.java
+++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/Coordinator.java
@@ -70,7 +70,7 @@ class Coordinator extends Channel {
   private static final Logger LOG = LoggerFactory.getLogger(Coordinator.class);
   private static final ObjectMapper MAPPER = new ObjectMapper();
   private static final String COMMIT_ID_SNAPSHOT_PROP = "kafka.connect.commit-id";
-  private static final String TASK_ID_SNAPSHOT_PROP = "kafka.connect.task-id";
+  private static final String COORDINATOR_ID_SNAPSHOT_PROP = "kafka.connect.coordinator-id";
   private static final String VALID_THROUGH_TS_SNAPSHOT_PROP = "kafka.connect.valid-through-ts";
   private static final Duration POLL_DURATION = Duration.ofSeconds(1);
 
@@ -81,6 +81,7 @@ class Coordinator extends Channel {
   private final ExecutorService exec;
   private final CommitState commitState;
   private volatile boolean terminated;
+  private final String coordinatorId;
 
   Coordinator(
       Catalog catalog,
@@ -110,6 +111,7 @@ class Coordinator extends Channel {
                 .setNameFormat("iceberg-committer" + "-%d")
                 .build());
     this.commitState = new CommitState(config);
+    this.coordinatorId = config.connectorName() + "-" + config.taskId();
   }
 
   void process() {
@@ -119,7 +121,7 @@ void process() {
       Event event =
          new Event(config.connectGroupId(), new StartCommit(commitState.currentCommitId()));
       send(event);
-      LOG.info("Commit {} initiated", commitState.currentCommitId());
+      LOG.info("Coordinator {} initiated commit {}", coordinatorId, commitState.currentCommitId());
     }
 
     consumeAvailable(POLL_DURATION);
@@ -149,7 +151,11 @@ private void commit(boolean partialCommit) {
     try {
       doCommit(partialCommit);
     } catch (Exception e) {
-      LOG.warn("Commit failed, will try again next cycle", e);
+      LOG.warn(
+          "Coordinator {} failed to commit for commit {}, will try again next cycle",
+          coordinatorId,
+          commitState.currentCommitId(),
+          e);
     } finally {
       commitState.endCurrentCommit();
     }
@@ -179,7 +185,8 @@ private void doCommit(boolean partialCommit) {
     send(event);
 
     LOG.info(
-        "Commit {} complete, committed to {} table(s), valid-through {}",
+        "Coordinator {} completed commit {}, committed to {} table(s), valid-through {}",
+        coordinatorId,
         commitState.currentCommitId(),
         commitMap.size(),
         validThroughTs);
@@ -246,13 +253,13 @@ private void commitToTable(
             .collect(Collectors.toList());
 
     if (terminated) {
-      throw new ConnectException("Coordinator is terminated, commit aborted");
+      throw new ConnectException(
+          String.format("Coordinator %s is terminated, commit aborted", coordinatorId));
     }
 
     if (dataFiles.isEmpty() && deleteFiles.isEmpty()) {
       LOG.info("Nothing to commit to table {}, skipping", tableIdentifier);
     } else {
-      String taskId = String.format("%s-%s", config.connectorName(), config.taskId());
       if (deleteFiles.isEmpty()) {
         AppendFiles appendOp =
             table.newAppend().validateWith(offsetValidator(tableIdentifier, committedOffsets));
@@ -261,7 +268,7 @@ private void commitToTable(
         }
         appendOp.set(snapshotOffsetsProp, offsetsJson);
         appendOp.set(COMMIT_ID_SNAPSHOT_PROP, commitState.currentCommitId().toString());
-        appendOp.set(TASK_ID_SNAPSHOT_PROP, taskId);
+        appendOp.set(COORDINATOR_ID_SNAPSHOT_PROP, coordinatorId);
         if (validThroughTs != null) {
           appendOp.set(VALID_THROUGH_TS_SNAPSHOT_PROP, validThroughTs.toString());
         }
@@ -275,7 +282,7 @@ private void commitToTable(
         }
         deltaOp.set(snapshotOffsetsProp, offsetsJson);
         deltaOp.set(COMMIT_ID_SNAPSHOT_PROP, commitState.currentCommitId().toString());
-        deltaOp.set(TASK_ID_SNAPSHOT_PROP, taskId);
+        deltaOp.set(COORDINATOR_ID_SNAPSHOT_PROP, coordinatorId);
         if (validThroughTs != null) {
           deltaOp.set(VALID_THROUGH_TS_SNAPSHOT_PROP, validThroughTs.toString());
         }
@@ -293,7 +300,8 @@ private void commitToTable(
       send(event);
 
       LOG.info(
-          "Commit complete to table {}, snapshot {}, commit ID {}, valid-through {}",
+          "Coordinator {} completed commit to table {}, snapshot {}, commit ID {}, valid-through {}",
+          coordinatorId,
          tableIdentifier,
          snapshotId,
          commitState.currentCommitId(),