-
Notifications
You must be signed in to change notification settings - Fork 3.2k
Kafka Connect: Surface commit failures instead of silently swallowing them #16237
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -19,12 +19,16 @@ | |
| package org.apache.iceberg.connect.channel; | ||
|
|
||
| import static org.assertj.core.api.Assertions.assertThat; | ||
| import static org.assertj.core.api.Assertions.assertThatThrownBy; | ||
| import static org.mockito.Mockito.doThrow; | ||
| import static org.mockito.Mockito.mock; | ||
| import static org.mockito.Mockito.spy; | ||
| import static org.mockito.Mockito.when; | ||
|
|
||
| import java.time.OffsetDateTime; | ||
| import java.util.List; | ||
| import java.util.UUID; | ||
| import org.apache.iceberg.AppendFiles; | ||
| import org.apache.iceberg.DataFile; | ||
| import org.apache.iceberg.DataFiles; | ||
| import org.apache.iceberg.DataOperations; | ||
|
|
@@ -45,6 +49,8 @@ | |
| import org.apache.iceberg.connect.events.StartCommit; | ||
| import org.apache.iceberg.connect.events.TableReference; | ||
| import org.apache.iceberg.connect.events.TopicPartitionOffset; | ||
| import org.apache.iceberg.exceptions.CommitFailedException; | ||
| import org.apache.iceberg.exceptions.ValidationException; | ||
| import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; | ||
| import org.apache.iceberg.relocated.com.google.common.collect.Lists; | ||
| import org.apache.iceberg.types.Types.StructType; | ||
|
|
@@ -135,12 +141,30 @@ public void testCommitError() { | |
| .withRecordCount(5) | ||
| .build(); | ||
|
|
||
| coordinatorTest(ImmutableList.of(badDataFile), ImmutableList.of(), null); | ||
|
|
||
| // no commit messages sent | ||
| assertThat(producer.history()).hasSize(1); | ||
| assertThatThrownBy( | ||
|
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.
In prod the flow is:
This test doesn’t cover that. It would still pass even if I think we need an end-to-end test that goes through |
||
| () -> coordinatorTest(ImmutableList.of(badDataFile), ImmutableList.of(), null)) | ||
| .isInstanceOf(IllegalArgumentException.class) | ||
| .hasMessageContaining("Cannot find partition spec"); | ||
|
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. The prev-prev version of this test also asserted `assertThat(producer.history()).hasSize(1)` and `assertThat(table.snapshots()).isEmpty()`. The test now only proves an exception is thrown; it would still pass if a future regression sent a commit message before throwing. Maybe keep side-effect guards: assertThatThrownBy(...)
.isInstanceOf(IllegalArgumentException.class)
.hasMessageContaining("Cannot find partition spec");
assertThat(producer.history()).hasSize(1);
assertThat(table.snapshots()).isEmpty(); |
||
| } | ||
|
|
||
| assertThat(table.snapshots()).isEmpty(); | ||
| @Test | ||
| public void testCommitFailedExceptionPropagates() { | ||
| Table spiedTable = spy(table); | ||
| AppendFiles spiedAppend = spy(table.newAppend()); | ||
| doThrow(new CommitFailedException("Glue detected concurrent update")) | ||
| .when(spiedAppend) | ||
| .commit(); | ||
| when(spiedTable.newAppend()).thenReturn(spiedAppend); | ||
| when(catalog.loadTable(TABLE_IDENTIFIER)).thenReturn(spiedTable); | ||
|
|
||
| assertThatThrownBy( | ||
| () -> | ||
| coordinatorTest( | ||
| ImmutableList.of(EventTestUtil.createDataFile()), | ||
| ImmutableList.of(), | ||
| EventTestUtil.now())) | ||
| .isInstanceOf(CommitFailedException.class) | ||
| .hasMessageContaining("Glue detected concurrent update"); | ||
| } | ||
|
|
||
| private void assertCommitTable(int idx, UUID commitId, OffsetDateTime ts) { | ||
|
|
@@ -289,13 +313,14 @@ public void testCoordinatorCommittedOffsetValidation() { | |
| Snapshot firstSnapshot = table.currentSnapshot(); | ||
| assertThat(firstSnapshot.summary()).containsEntry(OFFSETS_SNAPSHOT_PROP, "{\"0\":7}"); | ||
|
|
||
| // Trigger commit to the table | ||
| coordinatorTest( | ||
| ImmutableList.of(EventTestUtil.createDataFile()), ImmutableList.of(), EventTestUtil.now()); | ||
|
|
||
| // Assert that the table was not updated and offsets remain | ||
| table.refresh(); | ||
| assertThat(table.snapshots()).hasSize(2); | ||
| assertThat(firstSnapshot.summary()).containsEntry(OFFSETS_SNAPSHOT_PROP, "{\"0\":7}"); | ||
| // Trigger commit to the table - should throw ValidationException | ||
| assertThatThrownBy( | ||
| () -> | ||
| coordinatorTest( | ||
| ImmutableList.of(EventTestUtil.createDataFile()), | ||
| ImmutableList.of(), | ||
| EventTestUtil.now())) | ||
| .isInstanceOf(ValidationException.class) | ||
| .hasMessageContaining("stale offsets"); | ||
|
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Same shape as the comment above: on a stale-offset conflict, the previous version of this test also asserted that the table was not mutated (`table.snapshots()` still has size 2) and that the committed offsets remain `{"0":7}`. Those are the actual correctness guarantees of the optimistic-concurrency guard, and they're no longer checked. The exception-type assertion alone would pass even if a future regression committed the row delta before throwing. Let's keep the post-throw assertions: assertThatThrownBy(...)
.isInstanceOf(ValidationException.class)
.hasMessageContaining("stale offsets");
table.refresh();
assertThat(table.snapshots()).hasSize(2);
assertThat(table.currentSnapshot().summary())
.containsEntry(OFFSETS_SNAPSHOT_PROP, "{\"0\":7}"); |
||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Two small things on this catch block:
SLF4J double-printing the message. Both LOG calls pass `e.getMessage()` as a `{}` arg and `e` as the trailing throwable. SLF4J already prints the exception's message (and full stack) when the last arg is a `Throwable`, so the message ends up rendered twice in every log line.
`taskId` asymmetry. It's in the ERROR but dropped from the WARN. In a multi-task cluster you'd lose task identity for retried partial commits.
Both fixed at once: