Add start and end offset for each source topic partition in Iceberg snapshot as a property #15206
Conversation
Pull request overview
This PR enhances the Kafka Connect sink to track offset ranges (start and end offsets) for each topic partition in Iceberg snapshots, improving data lineage and traceability capabilities.
Changes:
- Modified core data models (`Offset` and `TopicPartitionOffset`) to track both start and end offsets instead of a single offset value
- Updated offset tracking logic in `SinkWriter` to maintain offset ranges across multiple records in the same commit cycle
- Added snapshot metadata property `kafka.connect.topic-partition-offsets` containing JSON-serialized offset ranges for all partitions
Reviewed changes
Copilot reviewed 10 out of 10 changed files in this pull request and generated no comments.
| File | Description |
|---|---|
| kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/Offset.java | Changed from single offset to startOffset/endOffset fields with backward compatibility method |
| kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/SinkWriter.java | Enhanced to track offset ranges per partition using compute() to preserve startOffset |
| kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/Worker.java | Updated to pass both startOffset and endOffset when creating TopicPartitionOffset objects |
| kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/CommitState.java | Added topicPartitionOffsets() method to extract offset information for current commit |
| kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/Coordinator.java | Added JSON serialization of offset ranges and storage in snapshot metadata properties |
| kafka-connect/kafka-connect-events/src/main/java/org/apache/iceberg/connect/events/TopicPartitionOffset.java | Updated schema to include start_offset and end_offset fields with adjusted field IDs |
| kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/TestSinkWriter.java | Updated test assertions to verify both startOffset and endOffset |
| kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/channel/TestWorker.java | Updated test to use new Offset constructor with startOffset and endOffset |
| kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/channel/TestCoordinator.java | Updated test to use new TopicPartitionOffset constructor with offset range |
| kafka-connect/kafka-connect-events/src/test/java/org/apache/iceberg/connect/events/TestEventSerialization.java | Updated test data to include both startOffset and endOffset values |
This pull request has been marked as stale due to 30 days of inactivity. It will be closed in 1 week if no further activity occurs. If you think that's incorrect or this pull request requires a review, please simply write any comment. If closed, you can revive the PR at any time and @mention a reviewer or discuss it on the dev@iceberg.apache.org list. Thank you for your contributions.
This pull request has been closed due to lack of activity. This is not a judgement on the merit of the PR in any way. It is just a way of keeping the PR queue manageable. If you think that is incorrect, or the pull request requires review, you can revive the PR at any time.
Summary
This PR enhances the Kafka Connect sink to track offset ranges (start and end offsets) for each topic partition, providing better traceability of which source data was included in each Iceberg
snapshot.
Motivation
Previously, the connector only tracked a single offset value per partition, which represented the next offset to consume. This made it difficult to determine the exact range of data that was committed in a particular snapshot. By tracking both start and end offsets, we can now determine exactly which source records were included in each snapshot, improving data lineage and traceability.
Changes
Core Data Models
`TopicPartitionOffset` class:
- Changed the single `offset` field to `startOffset` and `endOffset` fields
- Adjusted schema field IDs: `START_OFFSET = 10_702`, `END_OFFSET = 10_703`, `TIMESTAMP = 10_704`
- Added accessors `startOffset()` and `endOffset()`

`Offset` class:
- Stores `startOffset` and `endOffset` instead of a single offset
- Retains a backward-compatible `offset()` that returns `endOffset`
- New constructor: `Offset(Long startOffset, Long endOffset, OffsetDateTime timestamp)`
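A minimal sketch of the reworked `Offset` value class described above. Field and method names follow the PR summary; everything else (final fields, null handling, the `main` demo) is an assumption for illustration, not the actual implementation.

```java
import java.time.OffsetDateTime;

// Sketch of the reworked Offset value class: holds an offset range
// plus a timestamp instead of a single offset value.
public class Offset {
  private final Long startOffset;
  private final Long endOffset;
  private final OffsetDateTime timestamp;

  public Offset(Long startOffset, Long endOffset, OffsetDateTime timestamp) {
    this.startOffset = startOffset;
    this.endOffset = endOffset;
    this.timestamp = timestamp;
  }

  public Long startOffset() {
    return startOffset;
  }

  public Long endOffset() {
    return endOffset;
  }

  public OffsetDateTime timestamp() {
    return timestamp;
  }

  // Backward-compatible accessor: the old single offset was the next
  // offset to consume, which is now the end offset.
  public Long offset() {
    return endOffset;
  }

  public static void main(String[] args) {
    Offset o = new Offset(100L, 250L, OffsetDateTime.parse("2024-01-15T10:30:00Z"));
    System.out.println(o.offset()); // prints 250
  }
}
```

Keeping `offset()` as an alias for `endOffset` lets existing callers that expected "the next offset to consume" keep working unchanged.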
Offset Tracking
`SinkWriter` class:
- Updated the `save()` method to track offset ranges per partition
- First record for a partition: `startOffset = currentOffset`, `endOffset = currentOffset + 1`
- Subsequent records: preserves `startOffset`, updates `endOffset = currentOffset + 1`

`Worker` class:
- Passes both `startOffset` and `endOffset` when creating `TopicPartitionOffset` objects
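The per-partition range tracking described for `SinkWriter.save()` can be sketched with `Map.compute()`, which the PR summary says is used to preserve `startOffset`. This is a standalone sketch, not the PR's code: the key is simplified to a `String` (the real sink keys on `org.apache.kafka.common.TopicPartition`), and the `Range` record is hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of offset-range tracking across records in one commit cycle:
// the first record fixes startOffset, every record advances endOffset
// to currentOffset + 1 (the next offset to consume).
public class OffsetRangeTracker {
  record Range(long startOffset, long endOffset) {}

  private final Map<String, Range> offsets = new HashMap<>();

  void record(String topicPartition, long currentOffset) {
    offsets.compute(topicPartition, (tp, existing) ->
        existing == null
            ? new Range(currentOffset, currentOffset + 1)      // first record
            : new Range(existing.startOffset(), currentOffset + 1)); // keep start
  }

  Range range(String topicPartition) {
    return offsets.get(topicPartition);
  }

  public static void main(String[] args) {
    OffsetRangeTracker tracker = new OffsetRangeTracker();
    tracker.record("events-0", 100);
    tracker.record("events-0", 101);
    tracker.record("events-0", 102);
    Range r = tracker.range("events-0");
    System.out.println(r.startOffset() + ".." + r.endOffset()); // prints 100..103
  }
}
```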
Snapshot Metadata
`CommitState` class:
- Added a `topicPartitionOffsets()` method to extract all topic partition offsets from the ready buffer

`Coordinator` class:
- Writes the snapshot metadata property `kafka.connect.topic-partition-offsets`
- Added `topicPartitionOffsetsToJson()` to serialize offset ranges to JSON
- Serializes `topic`, `partition`, `startOffset`, `endOffset`, and `timestamp` for each partition
- Updated `commitToTable()` to store topic partition offsets in both append and delta operations
Example Snapshot Metadata
After this change, Iceberg snapshots will include metadata like:
```json
{
  "kafka.connect.topic-partition-offsets": [
    {
      "topic": "events",
      "partition": 0,
      "startOffset": 100,
      "endOffset": 250,
      "timestamp": "2024-01-15T10:30:00Z"
    },
    {
      "topic": "events",
      "partition": 1,
      "startOffset": 50,
      "endOffset": 175,
      "timestamp": "2024-01-15T10:30:05Z"
    }
  ]
}
```
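The serialization done by `topicPartitionOffsetsToJson()` is not shown in this summary; the real code presumably uses a JSON library. A dependency-free sketch of producing the per-partition shape shown above (the `TpOffset` record and hand-built string are assumptions, purely to illustrate the field layout):

```java
import java.util.List;

// Sketch of serializing per-partition offset ranges to the JSON shape
// stored in the snapshot property. Hand-built for illustration only.
public class OffsetJson {
  record TpOffset(String topic, int partition, long startOffset, long endOffset, String timestamp) {}

  static String toJson(List<TpOffset> offsets) {
    StringBuilder sb = new StringBuilder("[");
    for (int i = 0; i < offsets.size(); i++) {
      TpOffset o = offsets.get(i);
      if (i > 0) {
        sb.append(",");
      }
      sb.append(String.format(
          "{\"topic\":\"%s\",\"partition\":%d,\"startOffset\":%d,\"endOffset\":%d,\"timestamp\":\"%s\"}",
          o.topic(), o.partition(), o.startOffset(), o.endOffset(), o.timestamp()));
    }
    return sb.append("]").toString();
  }

  public static void main(String[] args) {
    System.out.println(toJson(List.of(
        new TpOffset("events", 0, 100, 250, "2024-01-15T10:30:00Z"))));
  }
}
```

A consumer can then read the property back from a snapshot's summary map (for example via Iceberg's `snapshot.summary().get("kafka.connect.topic-partition-offsets")`) and parse it to recover the exact record range each commit covered.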