feat(table): add support for merge-on-read delete #721
zeroshade merged 14 commits into apache:main
Conversation
laskoviymishka left a comment:
Overall, this looks correct (I mostly compared it with Iceberg Java), but I think a couple of things need attention:
- `panic`/`recover` is used in normal write-path error handling;
- position-delete fanout needs additional tests;
- missing focused local unit tests for invariant-heavy code (`enrichRecordsWithPosDeleteFields`, position-delete fanout).
Regarding the API change: the ToDataFile change is internal (package scope), but the ManifestWriter.ToManifestFile signature change is public and should be explicitly called out in the compatibility/release notes.
I would advocate for not changing this API and instead adding a new one with the extra argument, but this decision, in my opinion, is not critical.
zeroshade left a comment:
This is looking good to me. There's a conflict to resolve in README.md, and I'll wait for @laskoviymishka to give feedback before merging.
Thanks for the in-depth review, @laskoviymishka. Here's the updated status of the TODO list:
laskoviymishka left a comment:
Reviewed the latest changes — the core logic looks correct:
- MoR positional delete pipeline ordering is sound (positions assigned before filtering, remain valid through projection)
- Manifest evaluator fix for partition spec evolution is a genuine correctness improvement, applies to CoW paths too
- `pos` type fix (`Int32` → `Int64`) aligns with the Iceberg spec

Two minor issues noted, but I will address them in follow-up PRs to keep this moving:
- Arrow array refcount leak in `enrichRecordsWithPosDeleteFields`: `NewArray()` results are never `Release()`d after `NewRecordBatch` retains them
- Goroutine leak from `iter.Pull` in the partitioned path of `positionDeleteRecordsToDataFiles`: `stopCount` is not called in the partitioned branch (same pattern as in #718)
Neither blocks correctness for typical use. LGTM to merge.
Thanks to both of you!

Thanks for the review, @laskoviymishka and @zeroshade!
…teFields

Arrays returned by `NewArray()` have refcount 1. `NewRecordBatch` calls `Retain()` on each column, bumping it to 2. Without an explicit `Release()` on the temporary arrays, the count never drops back to 1 when the record batch is released by the caller. Fix by assigning `NewArray()` results to local variables and deferring their `Release()`, so the lifecycle is: `NewArray()` -> refcount 1, `NewRecordBatch` `Retain()` -> refcount 2, deferred `Release()` -> refcount 1 (owned by outData), caller releases outData -> refcount 0 -> freed.

Also extend TestEnrichRecordsWithPosDeleteFields to use `memory.NewCheckedAllocator` with `mem.AssertSize(t, 0)` to catch this class of leak going forward. Fixes leak introduced in apache#721.
related to apache#721
* remove premature decoder close in the constructor so that the reader can actually read the entries
* add explicit close method for resource cleanup
* call close in ReadManifest to prevent leak
* add zstd codec based regression test

Signed-off-by: ferhat elmas <elmas.ferhat@gmail.com>
```go
defer func() {
	_ = dec.Close()
}()
```
I think there is a regression here; the leak fix is tracked in #766.
```go
func (p *positionDeletePartitionedFanoutWriter) partitionPath(partitionContext partitionContext) (string, error) {
	data := partitionRecord(slices.Collect(maps.Values(partitionContext.partitionData)))
	// ...
}
```
Follow-up to apache#721. Test TBD.
Partition record order is expected to match the partition spec, but maps.Values can change it, causing cross-partition writer reuse, deletion of the wrong files, etc.

related to #721

Signed-off-by: ferhat elmas <elmas.ferhat@gmail.com>
 This adds support for merge-on-read deletes. It offers an alternative to the copy-on-write to generate position delete files instead of rewriting existing data files. I'm not very confident in the elegance of my solution as I'm still new to the internals of iceberg-go but the high-level is: * Reuse the classification code from the existing delete implementation to get the list of files of dropped files vs files with partial deletes * Reuse the arrow scanning facilities to filter records from the data files with partial deletes and emit position delete records with file path and position. * This is done by reusing the pipeline code and function and making the first stage in the pipeline one to enrich the `RecordBatch` with the file Path and position before the original position is lost due to filtering. * After filtering, the RecordBatch is projected to the position delete schema (i.e. the original schema fields are dropped) * Once we have filtered PositionDelete records that need to be emitted, we reuse the record to file writing to generate position delete files. ## Testing Integration tests were added to exercise the partitioned and unpartitioned paths and the data is such that it's meant to actually produce a position delete file rather than just go through the quick path that drops an entire file because all records are gone. ## Indirect fixes While working on this change and adding the testing for the partitioned table deletions, I realized that the manifest evaluation when the filter affected a field that was part of a partition spec was not built correctly. It needed to use similar code as what's done during scanning to build projections and build a manifest evaluator per partition id. This is fixed in this PR but this technically also applies to copy-on-write and overwrite paths so the fix goes beyond the scope of the `merge-on-read`. Fixes apache#487.