Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 42 additions & 0 deletions lightning-background-processor/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1152,6 +1152,11 @@ where

let mut futures = Joiner::new();

// Capture the number of pending monitor writes before persisting the channel manager.
// We'll only flush this many writes after the manager is persisted, to avoid flushing
// monitor updates that arrived after the manager state was captured.
let pending_monitor_writes = chain_monitor.pending_monitor_operation_count();

if channel_manager.get_cm().get_and_clear_needs_persistence() {
log_trace!(logger, "Persisting ChannelManager...");

Expand Down Expand Up @@ -1349,6 +1354,13 @@ where
res?;
}

// Flush the monitor writes that were pending before we persisted the channel manager.
// Any writes that arrived after are left in the queue for the next iteration.
if pending_monitor_writes > 0 {
log_trace!(logger, "Flushing {} monitor writes", pending_monitor_writes);
chain_monitor.flush(pending_monitor_writes);
}

match check_and_reset_sleeper(&mut last_onion_message_handler_call, || {
sleeper(ONION_MESSAGE_HANDLER_TIMER)
}) {
Expand Down Expand Up @@ -1413,6 +1425,14 @@ where
channel_manager.get_cm().encode(),
)
.await?;

// Flush all pending monitor writes after final channel manager persistence.
let pending_monitor_writes = chain_monitor.pending_monitor_operation_count();
if pending_monitor_writes > 0 {
log_trace!(logger, "Flushing {} monitor writes on shutdown", pending_monitor_writes);
chain_monitor.flush(pending_monitor_writes);
}

if let Some(ref scorer) = scorer {
kv_store
.write(
Expand Down Expand Up @@ -1722,6 +1742,10 @@ impl BackgroundProcessor {
channel_manager.get_cm().timer_tick_occurred();
last_freshness_call = Instant::now();
}

// Capture the number of pending monitor writes before persisting the channel manager.
let pending_monitor_writes = chain_monitor.pending_monitor_operation_count();

if channel_manager.get_cm().get_and_clear_needs_persistence() {
log_trace!(logger, "Persisting ChannelManager...");
(kv_store.write(
Expand All @@ -1733,6 +1757,12 @@ impl BackgroundProcessor {
log_trace!(logger, "Done persisting ChannelManager.");
}

// Flush the monitor writes that were pending before we persisted the channel manager.
if pending_monitor_writes > 0 {
log_trace!(logger, "Flushing {} monitor writes", pending_monitor_writes);
chain_monitor.flush(pending_monitor_writes);
}

if let Some(liquidity_manager) = liquidity_manager.as_ref() {
log_trace!(logger, "Persisting LiquidityManager...");
let _ = liquidity_manager.get_lm().persist().map_err(|e| {
Expand Down Expand Up @@ -1853,6 +1883,18 @@ impl BackgroundProcessor {
CHANNEL_MANAGER_PERSISTENCE_KEY,
channel_manager.get_cm().encode(),
)?;

// Flush all pending monitor writes after final channel manager persistence.
let pending_monitor_writes = chain_monitor.pending_monitor_operation_count();
if pending_monitor_writes > 0 {
log_trace!(
logger,
"Flushing {} monitor writes on shutdown",
pending_monitor_writes
);
chain_monitor.flush(pending_monitor_writes);
}

if let Some(ref scorer) = scorer {
kv_store.write(
SCORER_PERSISTENCE_PRIMARY_NAMESPACE,
Expand Down
Loading
Loading