From 6e3325f348cbb45125c06832993a14d0138e07e9 Mon Sep 17 00:00:00 2001 From: Anupam Yadav Date: Sun, 17 May 2026 05:26:06 +0000 Subject: [PATCH 1/3] Kafka Connect: Surface commit failures instead of silently swallowing them Narrow the catch around doCommit() and rethrow on full-commit failures. Partial-commit failures (triggered by commit timeout) are logged at WARN and swallowed since the coordinator will retry on the next cycle. This ensures commit failures surface to operators by terminating the coordinator thread, which transitions the Connect task to FAILED. Fixes #15878 --- .../iceberg/connect/channel/Coordinator.java | 17 ++++--- .../connect/channel/TestCoordinator.java | 48 +++++++++++++++---- 2 files changed, 51 insertions(+), 14 deletions(-) diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/Coordinator.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/Coordinator.java index c986f8afc2eb..4b46b941f4f6 100644 --- a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/Coordinator.java +++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/Coordinator.java @@ -150,12 +150,17 @@ protected boolean receive(Envelope envelope) { private void commit(boolean partialCommit) { try { doCommit(partialCommit); - } catch (Exception e) { - LOG.warn( - "Coordinator {} failed to commit for commit {}, will try again next cycle", - taskId, - commitState.currentCommitId(), - e); + } catch (RuntimeException e) { + if (partialCommit) { + LOG.warn( + "Partial commit {} failed for task {}, will retry", + commitState.currentCommitId(), + taskId, + e); + } else { + LOG.error("Commit {} failed for task {}", commitState.currentCommitId(), taskId, e); + throw e; + } } finally { commitState.endCurrentCommit(); } diff --git a/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/channel/TestCoordinator.java b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/channel/TestCoordinator.java index ed370fcdad35..0b5553e127ef 100644 --- a/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/channel/TestCoordinator.java +++ b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/channel/TestCoordinator.java @@ -19,12 +19,16 @@ package org.apache.iceberg.connect.channel; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; import static org.mockito.Mockito.when; import java.time.OffsetDateTime; import java.util.List; import java.util.UUID; +import org.apache.iceberg.AppendFiles; import org.apache.iceberg.DataFile; import org.apache.iceberg.DataFiles; import org.apache.iceberg.DataOperations; @@ -45,6 +49,8 @@ import org.apache.iceberg.connect.events.StartCommit; import org.apache.iceberg.connect.events.TableReference; import org.apache.iceberg.connect.events.TopicPartitionOffset; +import org.apache.iceberg.exceptions.CommitFailedException; +import org.apache.iceberg.exceptions.ValidationException; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.types.Types.StructType; @@ -135,14 +141,35 @@ public void testCommitError() { .withRecordCount(5) .build(); - coordinatorTest(ImmutableList.of(badDataFile), ImmutableList.of(), null); + assertThatThrownBy( + () -> coordinatorTest(ImmutableList.of(badDataFile), ImmutableList.of(), null)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("Cannot find partition spec"); - // no commit messages sent assertThat(producer.history()).hasSize(1); - assertThat(table.snapshots()).isEmpty(); } + @Test + public void testCommitFailedExceptionPropagates() { + Table spiedTable = spy(table); + AppendFiles spiedAppend = spy(table.newAppend()); + doThrow(new CommitFailedException("Glue detected concurrent update")) + .when(spiedAppend) + .commit(); + when(spiedTable.newAppend()).thenReturn(spiedAppend); + when(catalog.loadTable(TABLE_IDENTIFIER)).thenReturn(spiedTable); + + assertThatThrownBy( + () -> + coordinatorTest( + ImmutableList.of(EventTestUtil.createDataFile()), + ImmutableList.of(), + EventTestUtil.now())) + .isInstanceOf(CommitFailedException.class) + .hasMessageContaining("Glue detected concurrent update"); + } + private void assertCommitTable(int idx, UUID commitId, OffsetDateTime ts) { byte[] bytes = producer.history().get(idx).value(); Event commitTable = AvroUtil.decode(bytes); @@ -289,13 +316,18 @@ public void testCoordinatorCommittedOffsetValidation() { Snapshot firstSnapshot = table.currentSnapshot(); assertThat(firstSnapshot.summary()).containsEntry(OFFSETS_SNAPSHOT_PROP, "{\"0\":7}"); - // Trigger commit to the table - coordinatorTest( - ImmutableList.of(EventTestUtil.createDataFile()), ImmutableList.of(), EventTestUtil.now()); + // Trigger commit to the table - should throw ValidationException + assertThatThrownBy( + () -> + coordinatorTest( + ImmutableList.of(EventTestUtil.createDataFile()), + ImmutableList.of(), + EventTestUtil.now())) + .isInstanceOf(ValidationException.class) + .hasMessageContaining("stale offsets"); - // Assert that the table was not updated and offsets remain table.refresh(); assertThat(table.snapshots()).hasSize(2); - assertThat(firstSnapshot.summary()).containsEntry(OFFSETS_SNAPSHOT_PROP, "{\"0\":7}"); + assertThat(table.currentSnapshot().summary()).containsEntry(OFFSETS_SNAPSHOT_PROP, "{\"0\":7}"); } } From ece6f34124416dd18f20d8b7b3934f953a6685a8 Mon Sep 17 00:00:00 2001 From: Anupam Yadav Date: Sun, 17 May 2026 20:15:30 +0000 Subject: [PATCH 2/3] Retrigger CI From 4a11304af00a4270b1157809ebdaa28433251c09 Mon Sep 17 00:00:00 2001 From: Anupam Yadav Date: Sun, 17 May 2026 20:21:01 +0000 Subject: [PATCH 3/3] Retrigger CI (attempt 2)