
feat(metadata): Persistent WAL journal with recovery and compaction #2916

Open

krishvishal wants to merge 15 commits into apache:master from krishvishal:reboot-state

Conversation

@krishvishal
Contributor

Which issue does this PR close?

Closes #2915

Summary

  • Add FileStorage: file-backed Storage impl with positional reads, appends, truncate, fsync.
  • Add MetadataJournal: append-only WAL indexed by a ring buffer (op % SLOT_COUNT). Crash recovery scans forward and truncates partial tail entries. Compaction atomically rewrites the WAL keeping only entries above the snapshot watermark.
  • Add recover(): startup recovery pipeline that loads the latest snapshot, opens the WAL, and replays entries past the snapshot sequence number through the state machine.
  • Add checkpoint() on IggyMetadata: persists a snapshot then advances the journal watermark and compacts.
  • Update Journal / Storage traits: io::Result return types, set_snapshot_op, remaining_capacity, compact default methods.
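Two mechanics from the summary can be sketched in a few lines: the ring-buffer slot index (`op % SLOT_COUNT`) and replaying only entries past the snapshot sequence number. This is an illustrative sketch only; the names and the `SLOT_COUNT` value are hypothetical, not the PR's actual API.

```rust
// Hypothetical slot count for the ring buffer index.
const SLOT_COUNT: u64 = 8;

/// Map a monotonically increasing op number onto a fixed ring of slots,
/// as the MetadataJournal index does with `op % SLOT_COUNT`.
fn slot_index(op: u64) -> usize {
    (op % SLOT_COUNT) as usize
}

/// During recovery, skip WAL entries already covered by the snapshot and
/// replay only those past its sequence number.
fn ops_to_replay(snapshot_op: u64, wal_ops: &[u64]) -> Vec<u64> {
    wal_ops.iter().copied().filter(|&op| op > snapshot_op).collect()
}

fn main() {
    // Op 8 wraps back onto slot 0; op 13 lands on slot 5.
    assert_eq!(slot_index(8), 0);
    assert_eq!(slot_index(13), 5);
    // Snapshot covers through op 5, so only ops 6 and 7 are replayed.
    assert_eq!(ops_to_replay(5, &[4, 5, 6, 7]), vec![6, 7]);
}
```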

@codecov

codecov bot commented Mar 11, 2026

Codecov Report

❌ Patch coverage is 61.03226% with 302 lines in your changes missing coverage. Please review.
✅ Project coverage is 71.79%. Comparing base (f0e8578) to head (01615c1).
⚠️ Report is 4 commits behind head on master.

| Files with missing lines | Patch % | Lines |
|---|---|---|
| core/metadata/src/impls/metadata.rs | 13.55% | 100 Missing and 2 partials ⚠️ |
| core/journal/src/metadata_journal.rs | 84.03% | 40 Missing and 21 partials ⚠️ |
| core/metadata/src/impls/recovery.rs | 70.40% | 31 Missing and 6 partials ⚠️ |
| core/partitions/src/journal.rs | 0.00% | 24 Missing ⚠️ |
| core/simulator/src/deps.rs | 0.00% | 22 Missing ⚠️ |
| core/journal/src/file_storage.rs | 70.58% | 13 Missing and 7 partials ⚠️ |
| core/metadata/src/stm/snapshot.rs | 0.00% | 14 Missing ⚠️ |
| core/journal/src/lib.rs | 0.00% | 10 Missing ⚠️ |
| core/simulator/src/replica.rs | 0.00% | 6 Missing ⚠️ |
| core/partitions/src/iggy_partition.rs | 0.00% | 5 Missing ⚠️ |
| ... and 1 more | | |
Additional details and impacted files
@@             Coverage Diff              @@
##             master    #2916      +/-   ##
============================================
- Coverage     71.81%   71.79%   -0.03%     
  Complexity      930      930              
============================================
  Files          1116     1121       +5     
  Lines         92616    93699    +1083     
  Branches      70139    71231    +1092     
============================================
+ Hits          66512    67270     +758     
- Misses        23543    23820     +277     
- Partials       2561     2609      +48     
| Flag | Coverage Δ | |
|---|---|---|
| csharp | 67.43% <ø> | (-0.21%) ⬇️ |
| go | 36.38% <ø> | (ø) |
| java | 62.08% <ø> | (ø) |
| node | 91.37% <ø> | (-0.15%) ⬇️ |
| python | 81.43% <ø> | (ø) |
| rust | 72.48% <61.03%> | (-0.03%) ⬇️ |

Flags with carried forward coverage won't be shown. Click here to find out more.

| Files with missing lines | Coverage Δ | |
|---|---|---|
| core/partitions/src/log.rs | 0.00% <0.00%> | (ø) |
| core/partitions/src/iggy_partition.rs | 0.00% <0.00%> | (ø) |
| core/simulator/src/replica.rs | 0.00% <0.00%> | (ø) |
| core/journal/src/lib.rs | 0.00% <0.00%> | (ø) |
| core/metadata/src/stm/snapshot.rs | 84.13% <0.00%> | (-6.08%) ⬇️ |
| core/journal/src/file_storage.rs | 70.58% <70.58%> | (ø) |
| core/simulator/src/deps.rs | 0.00% <0.00%> | (ø) |
| core/partitions/src/journal.rs | 0.00% <0.00%> | (ø) |
| core/metadata/src/impls/recovery.rs | 70.40% <70.40%> | (ø) |
| core/journal/src/metadata_journal.rs | 84.03% <84.03%> | (ø) |
| ... and 1 more | | |

... and 14 files with indirect coverage changes


@krishvishal
Contributor Author

The FillSnapshot trait bound has become a bit viral. I'm thinking of ways to reduce that.

@atharvalade
Contributor

> The FillSnapshot trait bound has become a bit viral. I'm thinking of ways to reduce that.

hmm, maybe you can try a super-trait alias like `trait MetadataStm: StateMachine<...> + FillSnapshot<MetadataSnapshot> {}` with a blanket impl. It won't reduce the actual propagation, but it cuts the noise from 4 lines to 1 at each use site. Longer term, pulling checkpoint into a separate component that owns the FillSnapshot concern behind a callback would remove the bound from IggyMetadata's M parameter entirely.
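The super-trait alias pattern above can be sketched like this. The trait bodies here are stand-ins for the real `StateMachine` / `FillSnapshot` bounds, just to show the alias plus blanket impl:

```rust
// Stand-in traits; the real ones carry more associated types and methods.
trait StateMachine {
    fn apply(&mut self, op: u64);
}
trait FillSnapshot {
    fn fill(&self) -> Vec<u8>;
}

// The alias: one bound at each use site instead of repeating both.
trait MetadataStm: StateMachine + FillSnapshot {}

// Blanket impl: anything satisfying both bounds is automatically a MetadataStm.
impl<T: StateMachine + FillSnapshot> MetadataStm for T {}

struct Stm(u64);
impl StateMachine for Stm {
    fn apply(&mut self, op: u64) {
        self.0 = op;
    }
}
impl FillSnapshot for Stm {
    fn fill(&self) -> Vec<u8> {
        self.0.to_le_bytes().to_vec()
    }
}

// Use sites shrink to a single `M: MetadataStm` bound.
fn checkpoint<M: MetadataStm>(m: &M) -> Vec<u8> {
    m.fill()
}

fn main() {
    let mut s = Stm(0);
    s.apply(7);
    assert_eq!(checkpoint(&s), 7u64.to_le_bytes().to_vec());
}
```

As noted, this only hides the propagation; every generic context still needs the `MetadataStm` bound.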

Contributor

@atharvalade atharvalade left a comment

lgtm

Comment on lines +39 to +58

/// Advance the snapshot watermark so entries at or below `op` may be
/// evicted from the journal's in-memory index. The default is a no-op
/// for journals that do not require this watermark.
fn set_snapshot_op(&self, _op: u64) {}

/// Number of entries that can be appended before the journal would need
/// to evict un-snapshotted slots. Returns `None` for journals that don't persist to disk.
fn remaining_capacity(&self) -> Option<usize> {
    None
}

/// Remove snapshotted entries from the WAL to reclaim disk space.
/// The default is a no-op for journals that do not persist to disk.
///
/// # Errors
/// Returns an I/O error if compaction fails.
fn compact(&self) -> impl Future<Output = io::Result<()>> {
    async { Ok(()) }
}
Contributor

I think I'd prefer the journal to have some sort of drain method that allows extracting a range of items, similarly to how Vec::drain(begin..end) works. That way we don't hack special-case APIs onto the interface just to cover an edge case; instead we create a general-purpose API that can be used to shrink the journal, and we handle the watermark outside of the Journal.

And yeah, a Stream-based iterator would be perfect, but since AsyncIterator is still unstable and will probably replace the Stream trait, we can return a non-async drain iterator and do the disk read for the entire range in one go.

Contributor Author

@krishvishal krishvishal Mar 14, 2026

I agree that the drain API is much cleaner. One thing to consider: drain would read and deserialize all removed entries to return them to the caller, but the main consumer today (checkpoint) doesn't need the returned entries; it just wants them removed from the WAL. How should we handle the wasted deserialization cost?

Contributor Author

Also, the ops in the journal aren't necessarily in a contiguous range. There can be gaps from pipelined prepares arriving out of order, or slots overwritten when a new op lands on the same index. So drain(begin..end) is semantically not suitable here.

Contributor

The drain API would take op as the input, e.g. you would provide an op range rather than an index range, so the wrapping of the journal doesn't matter.

The cost of deserialization isn't that big, in fact it may be nonexistent, since we store Message<PrepareHeader> as entries in our IggyMetadata journal, so it's just reading opaque bytes from disk and constructing the Message<PrepareHeader> from them (the only cost there is the validation).
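The op-range drain being discussed could look roughly like the following. This is a hypothetical in-memory sketch (the signature and `Entry` type are illustrative, not the PR's actual trait); the point is that draining by op range tolerates gaps in the sequence, so out-of-order prepares and overwritten slots are not a problem:

```rust
use std::ops::RangeInclusive;

// Illustrative entry type; the real journal stores Message<PrepareHeader>.
struct Entry {
    op: u64,
}

struct Journal {
    entries: Vec<Entry>,
}

impl Journal {
    /// Remove and return every entry whose op falls in `ops`.
    /// Gaps in the op sequence are fine: only matching ops are drained.
    fn drain_ops(&mut self, ops: RangeInclusive<u64>) -> Vec<Entry> {
        let (drained, kept): (Vec<_>, Vec<_>) = std::mem::take(&mut self.entries)
            .into_iter()
            .partition(|e| ops.contains(&e.op));
        self.entries = kept;
        drained
    }
}

fn main() {
    // Ops 3, 7, 5 — note the gap (no op 4 or 6) and the out-of-order layout.
    let mut j = Journal {
        entries: vec![Entry { op: 3 }, Entry { op: 7 }, Entry { op: 5 }],
    };
    let drained = j.drain_ops(1..=5);
    // Ops 3 and 5 are drained; op 7 stays in the journal.
    assert_eq!(drained.iter().map(|e| e.op).collect::<Vec<_>>(), vec![3, 5]);
    assert_eq!(j.entries.len(), 1);
    assert_eq!(j.entries[0].op, 7);
}
```

A disk-backed version would do the positional reads for the whole range in one pass before truncating, matching the non-async, read-once approach suggested above.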

Contributor Author

Done.

pub snapshot: Option<S>,
/// State machine - lives on all shards
pub mux_stm: M,
/// Root data directory, used by checkpoint to persist snapshots.
Contributor

Maybe it's a good idea to store some sort of snapshot_coordinator struct there that would hide those details away?
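A coordinator along those lines might look like this sketch. Everything here is hypothetical (names, fields, and the stubbed-out persistence step); it only shows how the data directory and watermark details could be hidden behind one struct:

```rust
use std::path::PathBuf;

// Hypothetical coordinator owning the checkpoint concerns: where snapshots
// live on disk and how far the journal watermark has advanced.
struct SnapshotCoordinator {
    data_dir: PathBuf,
    watermark: u64,
}

impl SnapshotCoordinator {
    fn new(data_dir: PathBuf) -> Self {
        Self { data_dir, watermark: 0 }
    }

    /// Persist a snapshot covering everything up to `last_op`, then advance
    /// the watermark so the journal may compact entries at or below it.
    fn checkpoint(&mut self, last_op: u64) -> std::io::Result<u64> {
        // Illustrative snapshot path; real persistence would write here.
        let snapshot_path = self.data_dir.join(format!("snapshot-{last_op}"));
        let _ = snapshot_path; // (snapshot write elided in this sketch)
        self.watermark = last_op;
        Ok(self.watermark)
    }
}

fn main() {
    let mut c = SnapshotCoordinator::new(PathBuf::from("/tmp/iggy-data"));
    assert_eq!(c.checkpoint(42).unwrap(), 42);
    assert_eq!(c.watermark, 42);
}
```

With this shape, IggyMetadata would hold a `SnapshotCoordinator` instead of a bare data-dir path plus watermark bookkeeping.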

Contributor Author

Done.

/// # Errors
/// Returns `SnapshotError` if snapshotting, persistence, or compaction fails.
#[allow(clippy::future_not_send)]
pub async fn checkpoint(&self, data_dir: &Path, last_op: u64) -> Result<(), SnapshotError>
Contributor

If we go with the comment on line R143, then this would be part of the coordinator I mentioned there.

Contributor Author

Done.

@krishvishal
Contributor Author

CI failed due to flaky tests. It is being addressed here: #2963



Development

Successfully merging this pull request may close these issues.

State loading from disk during replica bootup

3 participants