feat(metadata): Persistent WAL journal with recovery and compaction #2916

krishvishal wants to merge 15 commits into apache:master
Conversation
Force-pushed from 8162f81 to f31d9f2
Codecov Report

❌ Patch coverage is

Additional details and impacted files:

```
@@             Coverage Diff              @@
##             master    #2916      +/-   ##
============================================
- Coverage     71.81%   71.79%   -0.03%
  Complexity      930      930
============================================
  Files          1116     1121       +5
  Lines         92616    93699    +1083
  Branches      70139    71231    +1092
============================================
+ Hits          66512    67270     +758
- Misses        23543    23820     +277
- Partials       2561     2609      +48
```
hmm maybe you can try a blanket super-trait alias like
```rust
/// Advance the snapshot watermark so entries at or below `op` may be
/// evicted from the journal's in-memory index. The default is a no-op
/// for journals that do not require this watermark.
fn set_snapshot_op(&self, _op: u64) {}

/// Number of entries that can be appended before the journal would need
/// to evict un-snapshotted slots. Returns `None` for journals that don't
/// persist to disk.
fn remaining_capacity(&self) -> Option<usize> {
    None
}

/// Remove snapshotted entries from the WAL to reclaim disk space.
/// The default is a no-op for journals that do not persist to disk.
///
/// # Errors
/// Returns an I/O error if compaction fails.
fn compact(&self) -> impl Future<Output = io::Result<()>> {
    async { Ok(()) }
}
```
I think I'd prefer the journal to have some sort of drain method that allows "extracting" a range of items, similarly to how Vec::drain(begin..end) works. This way we don't hack special-case APIs onto the interface just to cover an edge case; instead we create a general-purpose API that can be used to shrink the journal, and we handle the watermark outside of the Journal.
And yeah, a Stream iterator would be perfect, but since AsyncIterator is still unstable and is probably going to replace the Stream trait, we can return a non-async drain iterator and do the disk read for the entire range in one go.
I agree that the drain API is much cleaner. One thing to consider: drain would read and deserialize all removed entries to return them to the caller, but the main consumer today (checkpoint) doesn't need the returned entries; it just wants them removed from the WAL. How do we justify the wasted deserialization cost?
Also, the ops in the journal aren't necessarily in a contiguous range. There can be gaps from pipelined prepares arriving out of order, and slots get overwritten when a new op lands on the same index. So drain(begin..end) is semantically not suitable here.
The drain API would take op as the input, i.e. you would provide an op_range rather than an index_range, so the wrapping of the journal doesn't matter.
The cost of deserialization isn't that big, if not nonexistent in fact, since we store Message<PrepareHeader> as entries in our IggyMetadata journal, so it's just reading opaque bytes from disk and constructing the Message<PrepareHeader> from them (the only cost there is the validation).
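The drain-by-op-range shape discussed above can be sketched as follows. This is a toy in-memory stand-in, not the PR's implementation: `ToyJournal`, `drain_ops`, and the `BTreeMap` index are all illustrative assumptions. It shows the two points made in the thread: the range is keyed by op (not slot index), and the method returns a plain non-async iterator over entries collected in one go.

```rust
use std::collections::BTreeMap;
use std::ops::Range;

/// Toy stand-in for the journal's op -> entry index.
/// Real entries would be Message<PrepareHeader>; here they're raw bytes.
struct ToyJournal {
    entries: BTreeMap<u64, Vec<u8>>,
}

impl ToyJournal {
    /// Drain entries whose *op* falls in `ops` (an op_range, not an
    /// index_range). The "disk read" for the whole range happens up
    /// front, so the returned iterator is plain and non-async.
    fn drain_ops(&mut self, ops: Range<u64>) -> impl Iterator<Item = (u64, Vec<u8>)> {
        let keys: Vec<u64> = self.entries.range(ops).map(|(op, _)| *op).collect();
        let drained: Vec<(u64, Vec<u8>)> = keys
            .into_iter()
            .filter_map(|op| self.entries.remove(&op).map(|e| (op, e)))
            .collect();
        drained.into_iter()
    }
}

fn main() {
    let mut j = ToyJournal { entries: BTreeMap::new() };
    // Ops need not be contiguous: pipelined prepares can leave gaps.
    for op in [3u64, 5, 6, 9] {
        j.entries.insert(op, vec![op as u8]);
    }
    // Drain everything below a hypothetical snapshot watermark of 7.
    let removed: Vec<u64> = j.drain_ops(0..7).map(|(op, _)| op).collect();
    assert_eq!(removed, vec![3, 5, 6]); // op 9 survives past the watermark
    assert!(j.entries.contains_key(&9));
}
```

A caller like checkpoint that doesn't need the entries can simply drop the iterator after it has been materialized, which is where the wasted-deserialization concern from the thread comes in.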
core/metadata/src/impls/metadata.rs (outdated)

```rust
pub snapshot: Option<S>,
/// State machine - lives on all shards
pub mux_stm: M,
/// Root data directory, used by checkpoint to persist snapshots.
```
Maybe it's a good idea to store some sort of snapshot_coordinator struct there that would hide those details away?
core/metadata/src/impls/metadata.rs (outdated)

```rust
/// # Errors
/// Returns `SnapshotError` if snapshotting, persistence, or compaction fails.
#[allow(clippy::future_not_send)]
pub async fn checkpoint(&self, data_dir: &Path, last_op: u64) -> Result<(), SnapshotError>
```
If we'd go with the comment on line R143, then this would be part of the coordinator I mentioned there.
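A minimal sketch of the coordinator shape the reviewer suggests: a struct that owns the data directory and watermark bookkeeping so they don't leak into IggyMetadata's fields and checkpoint signature. Everything here (the `SnapshotCoordinator` name, its fields, and the elided persistence steps) is a hypothetical illustration of the suggestion, not the PR's code.

```rust
use std::io;
use std::path::PathBuf;

/// Hypothetical coordinator: owns the root data directory and the
/// snapshot watermark, hiding both from the metadata struct.
struct SnapshotCoordinator {
    data_dir: PathBuf,
    last_snapshot_op: u64,
}

impl SnapshotCoordinator {
    fn new(data_dir: impl Into<PathBuf>) -> Self {
        Self { data_dir: data_dir.into(), last_snapshot_op: 0 }
    }

    /// Persist a snapshot for `last_op`, then advance the watermark.
    /// The actual snapshot write and journal compaction are elided;
    /// only the division of responsibility is shown.
    fn checkpoint(&mut self, last_op: u64) -> io::Result<()> {
        // 1. write snapshot into self.data_dir        (elided)
        // 2. tell the journal to advance its watermark (elided)
        // 3. trigger compaction                        (elided)
        self.last_snapshot_op = last_op;
        Ok(())
    }
}

fn main() -> io::Result<()> {
    let mut coord = SnapshotCoordinator::new("/tmp/iggy-data");
    coord.checkpoint(100)?;
    assert_eq!(coord.last_snapshot_op, 100);
    assert!(coord.data_dir.ends_with("iggy-data"));
    Ok(())
}
```

With this shape, `checkpoint` on the metadata struct could drop its `data_dir: &Path` parameter and delegate to the coordinator instead.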
pollution. Now the requirement is captured at the creation of IggyMetaData.

- Fix review comments
Force-pushed from 5a99456 to 704207a
CI failed due to flaky tests. It is being addressed here: #2963
Which issue does this PR close?
Closes #2915
Summary
- FileStorage: file-backed Storage impl with positional reads, appends, truncate, fsync.
- MetadataJournal: append-only WAL indexed by a ring buffer (op % SLOT_COUNT). Crash recovery scans forward and truncates partial tail entries. Compaction atomically rewrites the WAL, keeping only entries above the snapshot watermark.
- recover(): startup recovery pipeline that loads the latest snapshot, opens the WAL, and replays entries past the snapshot sequence number through the state machine.
- checkpoint() on IggyMetadata: persists a snapshot, then advances the journal watermark and compacts.
- Journal/Storage traits: io::Result return types; set_snapshot_op, remaining_capacity, and compact default methods.
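The ring-buffer indexing scheme from the summary (op % SLOT_COUNT) can be modeled in a few lines. This toy `RingIndex` and its small `SLOT_COUNT` are illustrative assumptions, but it demonstrates the slot-overwrite behavior discussed in the review thread: a newer op landing on an occupied slot replaces the older entry, which is one reason the ops held by the journal need not form a contiguous range.

```rust
/// Illustrative slot count; the real value is a journal parameter.
const SLOT_COUNT: u64 = 8;

/// Toy model of the WAL's in-memory ring index: slot = op % SLOT_COUNT.
struct RingIndex {
    slots: [Option<u64>; SLOT_COUNT as usize],
}

impl RingIndex {
    fn new() -> Self {
        Self { slots: [None; SLOT_COUNT as usize] }
    }

    /// Record `op` in its slot, returning the op it overwrote, if any.
    fn insert(&mut self, op: u64) -> Option<u64> {
        let slot = (op % SLOT_COUNT) as usize;
        self.slots[slot].replace(op)
    }
}

fn main() {
    let mut idx = RingIndex::new();
    assert_eq!(idx.insert(3), None);
    // 11 % 8 == 3, so op 11 lands on op 3's slot and evicts it.
    assert_eq!(idx.insert(11), Some(3));
}
```

In the real journal, overwriting an un-snapshotted slot would lose data, which is what remaining_capacity guards against before the watermark advances.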