@@ -150,12 +150,22 @@ protected boolean receive(Envelope envelope) {
private void commit(boolean partialCommit) {
try {
doCommit(partialCommit);
} catch (Exception e) {
LOG.warn(
"Coordinator {} failed to commit for commit {}, will try again next cycle",
taskId,
commitState.currentCommitId(),
e);
} catch (RuntimeException e) {

Two small things on this catch block:

1. SLF4J prints the message twice. Both LOG calls pass e.getMessage() as a {} argument and e as the trailing throwable. When the last argument is a Throwable, SLF4J already logs the exception's message along with the full stack trace, so the message is rendered twice for every failure.

2. taskId asymmetry. taskId is in the ERROR message but dropped from the WARN. In a multi-task cluster, retried partial commits would lose their task identity.

Both fixed at once:

LOG.warn("Partial commit {} failed for task {}, will retry",
    commitState.currentCommitId(), taskId, e);
...
LOG.error("Commit {} failed for task {}",
    commitState.currentCommitId(), taskId, e);
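
To make the duplication concrete, here is a small self-contained model of the convention described above. It is only a sketch and not SLF4J's code: `render` and `count` are hypothetical helpers, the commit id and task id are made-up values, and the exception is shown via `Throwable.toString()` where a real backend would also print the stack trace.

```java
public class TrailingThrowableDemo {

  // Hypothetical, simplified model of the SLF4J convention (not SLF4J's code):
  // {} placeholders consume arguments left to right, and a trailing Throwable
  // that no placeholder consumed is appended as the exception. Real backends
  // also print the stack trace; this model appends only toString().
  static String render(String pattern, Object... args) {
    StringBuilder out = new StringBuilder();
    int argIdx = 0;
    int pos = 0;
    int hit;
    while (argIdx < args.length && (hit = pattern.indexOf("{}", pos)) >= 0) {
      out.append(pattern, pos, hit).append(args[argIdx++]);
      pos = hit + 2;
    }
    out.append(pattern.substring(pos));
    if (argIdx < args.length && args[args.length - 1] instanceof Throwable) {
      out.append(" | ").append(args[args.length - 1]);
    }
    return out.toString();
  }

  // Counts non-overlapping occurrences of needle in text.
  static int count(String text, String needle) {
    int n = 0;
    int i = text.indexOf(needle);
    while (i >= 0) {
      n++;
      i = text.indexOf(needle, i + needle.length());
    }
    return n;
  }

  public static void main(String[] args) {
    RuntimeException e = new IllegalStateException("boom");

    // Shape used in the diff: message passed as a {} argument and again via e.
    String before = render("Commit {} failed for task {}: {}", "c1", "task-0", e.getMessage(), e);
    // Shape suggested in the review: only the trailing throwable carries the message.
    String after = render("Commit {} failed for task {}", "c1", "task-0", e);

    System.out.println(before + "  -> message count: " + count(before, "boom"));
    System.out.println(after + "  -> message count: " + count(after, "boom"));
  }
}
```

In this model the first call contains the exception message twice and the second contains it once, which is the difference the suggested LOG calls are after.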

if (partialCommit) {
LOG.warn(
"Partial commit {} failed, will retry: {}",
commitState.currentCommitId(),
e.getMessage(),
e);
} else {
LOG.error(
"Commit {} failed for task {}: {}",
commitState.currentCommitId(),
taskId,
e.getMessage(),
e);
throw e;
}
} finally {
commitState.endCurrentCommit();
}
@@ -19,12 +19,16 @@
package org.apache.iceberg.connect.channel;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;

import java.time.OffsetDateTime;
import java.util.List;
import java.util.UUID;
import org.apache.iceberg.AppendFiles;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DataFiles;
import org.apache.iceberg.DataOperations;
@@ -45,6 +49,8 @@
import org.apache.iceberg.connect.events.StartCommit;
import org.apache.iceberg.connect.events.TableReference;
import org.apache.iceberg.connect.events.TopicPartitionOffset;
import org.apache.iceberg.exceptions.CommitFailedException;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.types.Types.StructType;
@@ -135,12 +141,30 @@ public void testCommitError() {
.withRecordCount(5)
.build();

coordinatorTest(ImmutableList.of(badDataFile), ImmutableList.of(), null);

// no commit messages sent
assertThat(producer.history()).hasSize(1);
assertThatThrownBy(

coordinatorTest() calls coordinator.process() directly, so it skips the real production path.

In prod the flow is:

CoordinatorThread.run() catches the exception → marks terminated = true → next CommitterImpl.save() calls processControlEvents() → throws NotRunningException.

This test doesn’t cover that. It would still pass even if CoordinatorThread swallowed the failure.

I think we need an end-to-end test that goes through CoordinatorThread + CommitterImpl.
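
As a rough illustration of what such a test would exercise, here is a self-contained sketch of the flow described above. Every name in it is hypothetical: `WorkerThread` stands in for CoordinatorThread, `Committer` for CommitterImpl, and `IllegalStateException` for NotRunningException. It does not use the real connector classes.

```java
public class TerminatedFlagDemo {

  // Hypothetical stand-in for the background coordinator thread.
  static class WorkerThread extends Thread {
    private final Runnable work;
    private volatile boolean terminated;

    WorkerThread(Runnable work) {
      this.work = work;
    }

    @Override
    public void run() {
      try {
        work.run();
      } catch (RuntimeException e) {
        // The failure never reaches the caller's thread directly;
        // it is only visible through this flag.
        terminated = true;
      }
    }

    boolean isTerminated() {
      return terminated;
    }
  }

  // Hypothetical stand-in for the committer that runs on the task thread.
  static class Committer {
    private final WorkerThread worker;

    Committer(WorkerThread worker) {
      this.worker = worker;
    }

    void save() {
      if (worker.isTerminated()) {
        throw new IllegalStateException("Coordinator unexpectedly terminated");
      }
      // A real committer would process control events and write records here.
    }
  }

  // Runs the work on the worker thread, then reports whether the next save() fails.
  static boolean saveFails(Runnable work) {
    WorkerThread worker = new WorkerThread(work);
    worker.start();
    try {
      worker.join();
    } catch (InterruptedException ie) {
      Thread.currentThread().interrupt();
      throw new RuntimeException(ie);
    }
    try {
      new Committer(worker).save();
      return false;
    } catch (IllegalStateException expected) {
      return true;
    }
  }

  public static void main(String[] args) {
    boolean failing = saveFails(() -> {
      throw new IllegalArgumentException("Cannot find partition spec");
    });
    boolean healthy = saveFails(() -> { });
    System.out.println("save() fails after worker error: " + failing);
    System.out.println("save() fails after healthy run: " + healthy);
  }
}
```

If the catch block in `WorkerThread.run()` stopped setting the flag, only a check that goes through the thread and the committer together would notice. A test that calls the work directly would not.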

() -> coordinatorTest(ImmutableList.of(badDataFile), ImmutableList.of(), null))
.isInstanceOf(IllegalArgumentException.class)
.hasMessageContaining("Cannot find partition spec");

The version of this test from two revisions back also asserted producer.history().hasSize(1) (only the StartCommit event was sent, no CommitToTable) and table.snapshots().isEmpty() (no phantom commit landed). v4 drops both.

The test now only proves that an exception is thrown. It would still pass if a future regression sent a CommitToTable event before failing, or somehow committed and then threw.

Maybe keep the side-effect guards:

assertThatThrownBy(...)
    .isInstanceOf(IllegalArgumentException.class)
    .hasMessageContaining("Cannot find partition spec");
assertThat(producer.history()).hasSize(1);
assertThat(table.snapshots()).isEmpty();

}

assertThat(table.snapshots()).isEmpty();
@Test
public void testCommitFailedExceptionPropagates() {
Table spiedTable = spy(table);
AppendFiles spiedAppend = spy(table.newAppend());
doThrow(new CommitFailedException("Glue detected concurrent update"))
.when(spiedAppend)
.commit();
when(spiedTable.newAppend()).thenReturn(spiedAppend);
when(catalog.loadTable(TABLE_IDENTIFIER)).thenReturn(spiedTable);

assertThatThrownBy(
() ->
coordinatorTest(
ImmutableList.of(EventTestUtil.createDataFile()),
ImmutableList.of(),
EventTestUtil.now()))
.isInstanceOf(CommitFailedException.class)
.hasMessageContaining("Glue detected concurrent update");
}

private void assertCommitTable(int idx, UUID commitId, OffsetDateTime ts) {
@@ -289,13 +313,14 @@ public void testCoordinatorCommittedOffsetValidation() {
Snapshot firstSnapshot = table.currentSnapshot();
assertThat(firstSnapshot.summary()).containsEntry(OFFSETS_SNAPSHOT_PROP, "{\"0\":7}");

// Trigger commit to the table
coordinatorTest(
ImmutableList.of(EventTestUtil.createDataFile()), ImmutableList.of(), EventTestUtil.now());

// Assert that the table was not updated and offsets remain
table.refresh();
assertThat(table.snapshots()).hasSize(2);
assertThat(firstSnapshot.summary()).containsEntry(OFFSETS_SNAPSHOT_PROP, "{\"0\":7}");
// Trigger commit to the table - should throw ValidationException
assertThatThrownBy(
() ->
coordinatorTest(
ImmutableList.of(EventTestUtil.createDataFile()),
ImmutableList.of(),
EventTestUtil.now()))
.isInstanceOf(ValidationException.class)
.hasMessageContaining("stale offsets");

Same shape as the comment above: the earlier version asserted that, after a stale-offset conflict, the table was not mutated (snapshots().hasSize(2) and the offset still {"0":7}).

Those are the actual correctness guarantees of the optimistic-concurrency guard, and they're no longer checked. The exception-type assertion alone would pass even if a future regression committed the row delta before throwing.

Let's keep the post-throw assertions:

assertThatThrownBy(...)
    .isInstanceOf(ValidationException.class)
    .hasMessageContaining("stale offsets");
table.refresh();
assertThat(table.snapshots()).hasSize(2);
assertThat(table.currentSnapshot().summary())
    .containsEntry(OFFSETS_SNAPSHOT_PROP, "{\"0\":7}");

}
}