Skip to content
Closed
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ public boolean matches(String str) {
}
});
assertThat(props).containsKey("kafka.connect.commit-id");
assertThat(props).containsKey("kafka.connect.task-id");
assertThat(props).containsKey("kafka.connect.coordinator-id");
}

protected List<DataFile> dataFiles(TableIdentifier tableIdentifier, String branch) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import org.apache.kafka.clients.admin.ConsumerGroupDescription;
import org.apache.kafka.clients.admin.MemberDescription;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTaskContext;
import org.slf4j.Logger;
Expand All @@ -48,6 +47,7 @@ public class CommitterImpl implements Committer {
private KafkaClientFactory clientFactory;
private Collection<MemberDescription> membersWhenWorkerIsCoordinator;
private final AtomicBoolean isInitialized = new AtomicBoolean(false);
private String identifier;
Comment thread
kumarpritam863 marked this conversation as resolved.

private void initialize(
Catalog icebergCatalog,
Expand All @@ -58,6 +58,7 @@ private void initialize(
this.config = icebergSinkConfig;
this.context = sinkTaskContext;
this.clientFactory = new KafkaClientFactory(config.kafkaProps());
this.identifier = config.connectorName() + "-" + config.taskId();
}
}

Expand Down Expand Up @@ -92,16 +93,49 @@ boolean hasLeaderPartition(Collection<TopicPartition> currentAssignedPartitions)
@VisibleForTesting
boolean containsFirstPartition(
    Collection<MemberDescription> members, Collection<TopicPartition> partitions) {
  // Leader election rule: whichever task owns the globally-smallest assigned
  // partition (per TopicPartitionComparator) acts as the commit coordinator.
  TopicPartition leaderPartition = findFirstTopicPartition(members);

  if (leaderPartition == null) {
    // No member reports any assignment; without a reference partition there is
    // no way to pick a leader, so this task declines the role.
    LOG.warn(
        "Committer {} found no partitions assigned across all members, cannot determine leader",
        identifier);
    return false;
  }

  if (partitions.contains(leaderPartition)) {
    LOG.info(
        "Committer {} contains the first partition {}, this task is the leader",
        identifier,
        leaderPartition);
    return true;
  }

  LOG.debug(
      "Committer {} does not contain the first partition {}, not the leader",
      identifier,
      leaderPartition);
  return false;
}

/**
 * Returns the minimum topic partition assigned across every consumer group member.
 *
 * <p>Ordering follows {@link TopicPartitionComparator}: topic name first (lexicographic),
 * then partition number ascending for equal topics.
 *
 * @param members consumer group members whose assignments are inspected
 * @return the smallest assigned {@link TopicPartition}, or null when no member has any
 *     partitions assigned
 */
@VisibleForTesting
TopicPartition findFirstTopicPartition(Collection<MemberDescription> members) {
  TopicPartitionComparator comparator = new TopicPartitionComparator();
  TopicPartition smallest = null;
  for (MemberDescription member : members) {
    for (TopicPartition candidate : member.assignment().topicPartitions()) {
      // Strict "<" keeps the first-encountered partition among equals, matching
      // the tie behavior of a sequential Stream.min over the same elements.
      if (smallest == null || comparator.compare(candidate, smallest) < 0) {
        smallest = candidate;
      }
    }
  }
  return smallest;
}

@Override
Expand All @@ -122,7 +156,7 @@ public void open(
Collection<TopicPartition> addedPartitions) {
initialize(icebergCatalog, icebergSinkConfig, sinkTaskContext);
if (hasLeaderPartition(addedPartitions)) {
LOG.info("Committer received leader partition. Starting Coordinator.");
LOG.info("Committer {} received leader partition. Starting Coordinator.", identifier);
startCoordinator();
}
}
Expand All @@ -141,31 +175,26 @@ public void close(Collection<TopicPartition> closedPartitions) {

// Defensive: close called without prior initialization (should not happen).
if (!isInitialized.get()) {
LOG.warn("Close unexpectedly called without partition assignment");
LOG.warn(
"Close unexpectedly called on committer {} without partition assignment", identifier);
return;
}

// Empty partitions → task was stopped explicitly. Stop coordinator if running.
if (closedPartitions.isEmpty()) {
LOG.info("Task stopped. Closing coordinator.");
LOG.info("Committer {} stopped. Closing coordinator.", identifier);
stopCoordinator();
return;
}

// Normal close: if leader partition is lost, stop coordinator.
if (hasLeaderPartition(closedPartitions)) {
LOG.info(
"Committer {}-{} lost leader partition. Stopping coordinator.",
config.connectorName(),
config.taskId());
LOG.info("Committer {} lost leader partition. Stopping coordinator.", identifier);
stopCoordinator();
}

// Reset offsets to last committed to avoid data loss.
LOG.info(
"Seeking to last committed offsets for worker {}-{}.",
config.connectorName(),
config.taskId());
LOG.info("Seeking to last committed offsets for worker {}.", identifier);
KafkaUtils.seekToLastCommittedOffsets(context);
}

Expand All @@ -181,9 +210,7 @@ public void save(Collection<SinkRecord> sinkRecords) {
private void processControlEvents() {
if (coordinatorThread != null && coordinatorThread.isTerminated()) {
throw new NotRunningException(
String.format(
"Coordinator unexpectedly terminated on committer %s-%s",
config.connectorName(), config.taskId()));
String.format("Coordinator unexpectedly terminated on committer %s", identifier));
}
if (worker != null) {
worker.process();
Expand All @@ -192,7 +219,7 @@ private void processControlEvents() {

private void startWorker() {
if (null == this.worker) {
LOG.info("Starting commit worker {}-{}", config.connectorName(), config.taskId());
LOG.info("Starting commit worker {}", identifier);
SinkWriter sinkWriter = new SinkWriter(catalog, config);
worker = new Worker(config, clientFactory, sinkWriter, context);
worker.start();
Expand All @@ -201,10 +228,7 @@ private void startWorker() {

private void startCoordinator() {
if (null == this.coordinatorThread) {
LOG.info(
"Task {}-{} elected leader, starting commit coordinator",
config.connectorName(),
config.taskId());
LOG.info("Task {} elected leader, starting commit coordinator", identifier);
Coordinator coordinator =
new Coordinator(catalog, config, membersWhenWorkerIsCoordinator, clientFactory, context);
coordinatorThread = new CoordinatorThread(coordinator);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ class Coordinator extends Channel {
private static final Logger LOG = LoggerFactory.getLogger(Coordinator.class);
private static final ObjectMapper MAPPER = new ObjectMapper();
private static final String COMMIT_ID_SNAPSHOT_PROP = "kafka.connect.commit-id";
private static final String TASK_ID_SNAPSHOT_PROP = "kafka.connect.task-id";
private static final String COORDINATOR_ID_SNAPSHOT_PROP = "kafka.connect.coordinator-id";
private static final String VALID_THROUGH_TS_SNAPSHOT_PROP = "kafka.connect.valid-through-ts";
private static final Duration POLL_DURATION = Duration.ofSeconds(1);

Expand All @@ -81,6 +81,7 @@ class Coordinator extends Channel {
private final ExecutorService exec;
private final CommitState commitState;
private volatile boolean terminated;
private final String coordinatorId;

Coordinator(
Catalog catalog,
Expand Down Expand Up @@ -110,6 +111,7 @@ class Coordinator extends Channel {
.setNameFormat("iceberg-committer" + "-%d")
.build());
this.commitState = new CommitState(config);
this.coordinatorId = config.connectorName() + "-" + config.taskId();
}

void process() {
Expand All @@ -119,7 +121,7 @@ void process() {
Event event =
new Event(config.connectGroupId(), new StartCommit(commitState.currentCommitId()));
send(event);
LOG.info("Commit {} initiated", commitState.currentCommitId());
LOG.info("Coordinator {} initiated commit {}", coordinatorId, commitState.currentCommitId());
}

consumeAvailable(POLL_DURATION);
Expand Down Expand Up @@ -149,7 +151,11 @@ private void commit(boolean partialCommit) {
try {
doCommit(partialCommit);
} catch (Exception e) {
LOG.warn("Commit failed, will try again next cycle", e);
LOG.warn(
"Coordinator {} failed to commit for commit {}, will try again next cycle",
coordinatorId,
commitState.currentCommitId(),
e);
} finally {
commitState.endCurrentCommit();
}
Expand Down Expand Up @@ -179,7 +185,8 @@ private void doCommit(boolean partialCommit) {
send(event);

LOG.info(
"Commit {} complete, committed to {} table(s), valid-through {}",
"Coordinator {} completed commit {}, committed to {} table(s), valid-through {}",
coordinatorId,
commitState.currentCommitId(),
commitMap.size(),
validThroughTs);
Expand Down Expand Up @@ -246,13 +253,13 @@ private void commitToTable(
.collect(Collectors.toList());

if (terminated) {
throw new ConnectException("Coordinator is terminated, commit aborted");
throw new ConnectException(
String.format("Coordinator %s is terminated, commit aborted", coordinatorId));
}

if (dataFiles.isEmpty() && deleteFiles.isEmpty()) {
LOG.info("Nothing to commit to table {}, skipping", tableIdentifier);
} else {
String taskId = String.format("%s-%s", config.connectorName(), config.taskId());
if (deleteFiles.isEmpty()) {
AppendFiles appendOp =
table.newAppend().validateWith(offsetValidator(tableIdentifier, committedOffsets));
Expand All @@ -261,7 +268,7 @@ private void commitToTable(
}
appendOp.set(snapshotOffsetsProp, offsetsJson);
appendOp.set(COMMIT_ID_SNAPSHOT_PROP, commitState.currentCommitId().toString());
appendOp.set(TASK_ID_SNAPSHOT_PROP, taskId);
appendOp.set(COORDINATOR_ID_SNAPSHOT_PROP, coordinatorId);
if (validThroughTs != null) {
appendOp.set(VALID_THROUGH_TS_SNAPSHOT_PROP, validThroughTs.toString());
}
Expand All @@ -275,7 +282,7 @@ private void commitToTable(
}
deltaOp.set(snapshotOffsetsProp, offsetsJson);
deltaOp.set(COMMIT_ID_SNAPSHOT_PROP, commitState.currentCommitId().toString());
deltaOp.set(TASK_ID_SNAPSHOT_PROP, taskId);
deltaOp.set(COORDINATOR_ID_SNAPSHOT_PROP, coordinatorId);
if (validThroughTs != null) {
deltaOp.set(VALID_THROUGH_TS_SNAPSHOT_PROP, validThroughTs.toString());
}
Expand All @@ -293,7 +300,8 @@ private void commitToTable(
send(event);

LOG.info(
"Commit complete to table {}, snapshot {}, commit ID {}, valid-through {}",
"Coordinator {} completed commit to table {}, snapshot {}, commit ID {}, valid-through {}",
coordinatorId,
tableIdentifier,
snapshotId,
commitState.currentCommitId(),
Expand Down
Loading