diff --git a/lightning-persister/src/fs_store.rs b/lightning-persister/src/fs_store.rs index 73c24dc6fc0..58cf7eddd97 100644 --- a/lightning-persister/src/fs_store.rs +++ b/lightning-persister/src/fs_store.rs @@ -2,7 +2,9 @@ use crate::utils::{check_namespace_key_validity, is_valid_kvstore_str}; use lightning::types::string::PrintableString; -use lightning::util::persist::{KVStoreSync, MigratableKVStore}; +use lightning::util::persist::{ + KVStoreSync, MigratableKVStore, PaginatedKVStoreSync, PaginatedListResponse, +}; use std::collections::HashMap; use std::fs; @@ -10,11 +12,12 @@ use std::io::{Read, Write}; use std::path::{Path, PathBuf}; use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering}; use std::sync::{Arc, Mutex, RwLock}; +use std::time::SystemTime; #[cfg(feature = "tokio")] use core::future::Future; #[cfg(feature = "tokio")] -use lightning::util::persist::KVStore; +use lightning::util::persist::{KVStore, PaginatedKVStore}; #[cfg(target_os = "windows")] use {std::ffi::OsStr, std::os::windows::ffi::OsStrExt}; @@ -39,6 +42,9 @@ fn path_to_windows_str>(path: &T) -> Vec { // a consistent view and error out. const LIST_DIR_CONSISTENCY_RETRIES: usize = 10; +// The default page size for paginated list operations. +const PAGINATED_LIST_DEFAULT_PAGE_SIZE: usize = 50; + struct FilesystemStoreInner { data_dir: PathBuf, tmp_file_counter: AtomicUsize, @@ -148,6 +154,20 @@ impl KVStoreSync for FilesystemStore { } } +impl PaginatedKVStoreSync for FilesystemStore { + fn list_paginated( + &self, primary_namespace: &str, secondary_namespace: &str, last_key: Option, + ) -> Result { + let path = self.inner.get_checked_dest_file_path( + primary_namespace, + secondary_namespace, + None, + "list_paginated", + )?; + self.inner.list_paginated(path, last_key, PAGINATED_LIST_DEFAULT_PAGE_SIZE) + } +} + impl FilesystemStoreInner { fn get_inner_lock_ref(&self, path: PathBuf) -> Arc> { let mut outer_lock = self.locks.lock().unwrap(); @@ -456,6 +476,74 @@ impl FilesystemStoreInner { Ok(keys) } + + fn list_paginated( + &self, prefixed_dest: PathBuf, last_key: Option, page_size: usize, + ) -> lightning::io::Result { + if !Path::new(&prefixed_dest).exists() { + return Ok(PaginatedListResponse { keys: Vec::new(), last_key: None }); + } + + let mut entries: Vec<(String, SystemTime)> = Vec::with_capacity(page_size); + let mut retries = LIST_DIR_CONSISTENCY_RETRIES; + + 'retry_list: loop { + entries.clear(); + 'skip_entry: for entry in fs::read_dir(&prefixed_dest)? { + let entry = entry?; + let p = entry.path(); + + let res = dir_entry_is_key(&entry); + match res { + Ok(true) => { + let key = get_key_from_dir_entry_path(&p, &prefixed_dest)?; + // Get file creation time, falling back to modified time if unavailable. + let metadata = entry.metadata()?; + let created_time = metadata + .created() + .or_else(|_| metadata.modified()) + .unwrap_or(SystemTime::UNIX_EPOCH); + entries.push((key, created_time)); + }, + Ok(false) => { + continue 'skip_entry; + }, + Err(e) => { + if e.kind() == lightning::io::ErrorKind::NotFound && retries > 0 { + retries -= 1; + continue 'retry_list; + } else { + return Err(e.into()); + } + }, + } + } + break 'retry_list; + } + + if entries.is_empty() { + return Ok(PaginatedListResponse { keys: Vec::new(), last_key: None }); + } + + // Sort by creation time descending (newest first), then by key name for stability. + entries.sort_by(|a, b| b.1.cmp(&a.1).then_with(|| a.0.cmp(&b.0))); + + // Apply pagination: find the first entry AFTER the given key in sort order. + let start_idx = if let Some(ref key) = last_key { + // Find the position of this key and start after it + entries.iter().position(|(k, _)| k == key).map(|pos| pos + 1).unwrap_or(0) + } else { + 0 + }; + + let page_entries: Vec = + entries.into_iter().skip(start_idx).take(page_size).map(|(k, _)| k).collect(); + + let response_last_key = + if page_entries.len() == page_size { page_entries.last().cloned() } else { None }; + + Ok(PaginatedListResponse { keys: page_entries, last_key: response_last_key }) + } } #[cfg(feature = "tokio")] @@ -544,6 +632,35 @@ impl KVStore for FilesystemStore { } } +#[cfg(feature = "tokio")] +impl PaginatedKVStore for FilesystemStore { + fn list_paginated( + &self, primary_namespace: &str, secondary_namespace: &str, last_key: Option, + ) -> impl Future> + 'static + Send + { + let this = Arc::clone(&self.inner); + + let path = this.get_checked_dest_file_path( + primary_namespace, + secondary_namespace, + None, + "list_paginated", + ); + + async move { + let path = match path { + Ok(path) => path, + Err(e) => return Err(e), + }; + tokio::task::spawn_blocking(move || { + this.list_paginated(path, last_key, PAGINATED_LIST_DEFAULT_PAGE_SIZE) + }) + .await + .unwrap_or_else(|e| Err(lightning::io::Error::new(lightning::io::ErrorKind::Other, e))) + } + } +} + fn dir_entry_is_key(dir_entry: &fs::DirEntry) -> Result { let p = dir_entry.path(); if let Some(ext) = p.extension() { @@ -792,6 +909,55 @@ mod tests { assert_eq!(listed_keys.len(), 0); } + #[test] + fn test_list_paginated() { + let mut temp_path = std::env::temp_dir(); + temp_path.push("test_list_paginated"); + let fs_store = FilesystemStore::new(temp_path); + + let primary = "testspace"; + let secondary = "testsubspace"; + + // Write multiple keys with small delays to ensure different creation times + let keys = ["key_a", "key_b", "key_c", "key_d", "key_e"]; + for key in &keys { + KVStoreSync::write(&fs_store, primary, secondary, key, vec![42u8]).unwrap(); + // Small delay to ensure different creation times + std::thread::sleep(std::time::Duration::from_millis(10)); + } + + // Test that all keys are returned (no pagination cursor) + let response = + PaginatedKVStoreSync::list_paginated(&fs_store, primary, secondary, None).unwrap(); + assert_eq!(response.keys.len(), 5); + // Keys should be ordered by creation time descending (newest first) + // The last written key should be first + assert_eq!(response.keys[0], "key_e"); + assert_eq!(response.keys[4], "key_a"); + // No more pages since we have less than page_size (50) + assert!(response.last_key.is_none()); + + // Test pagination with a cursor + // First, get the first page starting from the beginning + let response = + PaginatedKVStoreSync::list_paginated(&fs_store, primary, secondary, None).unwrap(); + // Use one of the middle keys as a cursor to get remaining keys + let cursor = response.keys[2].clone(); // Should be "key_c" + let response2 = + PaginatedKVStoreSync::list_paginated(&fs_store, primary, secondary, Some(cursor)) + .unwrap(); + // Should return the keys after "key_c" in the sorted order + assert_eq!(response2.keys.len(), 2); + assert_eq!(response2.keys[0], "key_b"); + assert_eq!(response2.keys[1], "key_a"); + + // Test with non-existent namespace returns empty + let response = + PaginatedKVStoreSync::list_paginated(&fs_store, "nonexistent", "", None).unwrap(); + assert!(response.keys.is_empty()); + assert!(response.last_key.is_none()); + } + #[test] fn test_data_migration() { let mut source_temp_path = std::env::temp_dir(); diff --git a/lightning/src/util/persist.rs b/lightning/src/util/persist.rs index 2e1e8805d0a..bfa9c6d3561 100644 --- a/lightning/src/util/persist.rs +++ b/lightning/src/util/persist.rs @@ -347,6 +347,174 @@ pub trait KVStore { ) -> impl Future, io::Error>> + 'static + MaybeSend; } +/// Represents the response from a paginated `list` operation. +/// +/// Contains the list of keys and the last key returned, which can be used to retrieve the +/// next page of results. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct PaginatedListResponse { + /// A vector of keys, ordered from most recently created to least recently created. + pub keys: Vec, + + /// The last key in this page, which can be passed to the next call to continue pagination. + /// + /// Is `None` if there are no more pages to retrieve. + pub last_key: Option, +} + +/// Provides an interface that allows storage and retrieval of persisted values that are associated +/// with given keys, with support for pagination. +/// +/// In order to avoid collisions, the key space is segmented based on the given `primary_namespace`s +/// and `secondary_namespace`s. Implementations of this trait are free to handle them in different +/// ways, as long as per-namespace key uniqueness is asserted. +/// +/// Keys and namespaces are required to be valid ASCII strings in the range of +/// [`KVSTORE_NAMESPACE_KEY_ALPHABET`] and no longer than [`KVSTORE_NAMESPACE_KEY_MAX_LEN`]. Empty +/// primary namespaces and secondary namespaces (`""`) are considered valid; however, if +/// `primary_namespace` is empty, `secondary_namespace` must also be empty. This means that concerns +/// should always be separated by primary namespace first, before secondary namespaces are used. +/// While the number of primary namespaces will be relatively small and determined at compile time, +/// there may be many secondary namespaces per primary namespace. Note that per-namespace uniqueness +/// needs to also hold for keys *and* namespaces in any given namespace, i.e., conflicts between keys +/// and equally named primary or secondary namespaces must be avoided. +/// +/// **Note:** This trait extends the functionality of [`KVStoreSync`] by adding support for +/// paginated listing of keys in creation order. This is useful when dealing with a large number +/// of keys that cannot be efficiently retrieved all at once. +/// +/// For an asynchronous version of this trait, see [`PaginatedKVStore`]. +pub trait PaginatedKVStoreSync: KVStoreSync { + /// Returns a paginated list of keys that are stored under the given `secondary_namespace` in + /// `primary_namespace`, ordered from most recently created to least recently created. + /// + /// Implementations must return keys in reverse creation order (newest first). How creation + /// order is tracked is implementation-defined (e.g., storing creation timestamps, using an + /// incrementing ID, or another mechanism). + /// + /// If `last_key` is provided, listing starts after this key in creation order. If `None`, + /// listing starts from the most recently created entry. The `last_key` in the returned + /// [`PaginatedListResponse`] can be passed to subsequent calls to fetch the next page. + /// + /// Returns an empty list if `primary_namespace` or `secondary_namespace` is unknown or if + /// there are no more keys to return. + fn list_paginated( + &self, primary_namespace: &str, secondary_namespace: &str, last_key: Option, + ) -> Result; +} + +/// A wrapper around a [`PaginatedKVStoreSync`] that implements the [`PaginatedKVStore`] trait. +/// It is not necessary to use this type directly. +#[derive(Clone)] +pub struct PaginatedKVStoreSyncWrapper(pub K) +where + K::Target: PaginatedKVStoreSync; + +impl Deref for PaginatedKVStoreSyncWrapper +where + K::Target: PaginatedKVStoreSync, +{ + type Target = Self; + fn deref(&self) -> &Self::Target { + self + } +} + +/// This is not exported to bindings users as async is only supported in Rust. +impl KVStore for PaginatedKVStoreSyncWrapper +where + K::Target: PaginatedKVStoreSync, +{ + fn read( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, + ) -> impl Future, io::Error>> + 'static + MaybeSend { + let res = self.0.read(primary_namespace, secondary_namespace, key); + + async move { res } + } + + fn write( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec, + ) -> impl Future> + 'static + MaybeSend { + let res = self.0.write(primary_namespace, secondary_namespace, key, buf); + + async move { res } + } + + fn remove( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool, + ) -> impl Future> + 'static + MaybeSend { + let res = self.0.remove(primary_namespace, secondary_namespace, key, lazy); + + async move { res } + } + + fn list( + &self, primary_namespace: &str, secondary_namespace: &str, + ) -> impl Future, io::Error>> + 'static + MaybeSend { + let res = self.0.list(primary_namespace, secondary_namespace); + + async move { res } + } +} + +/// This is not exported to bindings users as async is only supported in Rust. +impl PaginatedKVStore for PaginatedKVStoreSyncWrapper +where + K::Target: PaginatedKVStoreSync, +{ + fn list_paginated( + &self, primary_namespace: &str, secondary_namespace: &str, last_key: Option, + ) -> impl Future> + 'static + MaybeSend { + let res = self.0.list_paginated(primary_namespace, secondary_namespace, last_key); + + async move { res } + } +} + +/// Provides an interface that allows storage and retrieval of persisted values that are associated +/// with given keys, with support for pagination. +/// +/// In order to avoid collisions, the key space is segmented based on the given `primary_namespace`s +/// and `secondary_namespace`s. Implementations of this trait are free to handle them in different +/// ways, as long as per-namespace key uniqueness is asserted. +/// +/// Keys and namespaces are required to be valid ASCII strings in the range of +/// [`KVSTORE_NAMESPACE_KEY_ALPHABET`] and no longer than [`KVSTORE_NAMESPACE_KEY_MAX_LEN`]. Empty +/// primary namespaces and secondary namespaces (`""`) are considered valid; however, if +/// `primary_namespace` is empty, `secondary_namespace` must also be empty. This means that concerns +/// should always be separated by primary namespace first, before secondary namespaces are used. +/// While the number of primary namespaces will be relatively small and determined at compile time, +/// there may be many secondary namespaces per primary namespace. Note that per-namespace uniqueness +/// needs to also hold for keys *and* namespaces in any given namespace, i.e., conflicts between keys +/// and equally named primary or secondary namespaces must be avoided. +/// +/// **Note:** This trait extends the functionality of [`KVStore`] by adding support for +/// paginated listing of keys in creation order. This is useful when dealing with a large number +/// of keys that cannot be efficiently retrieved all at once. +/// +/// For a synchronous version of this trait, see [`PaginatedKVStoreSync`]. +/// +/// This is not exported to bindings users as async is only supported in Rust. +pub trait PaginatedKVStore: KVStore { + /// Returns a paginated list of keys that are stored under the given `secondary_namespace` in + /// `primary_namespace`, ordered from most recently created to least recently created. + /// + /// Implementations must return keys in reverse creation order (newest first). How creation + /// order is tracked is implementation-defined (e.g., storing creation timestamps, using an + /// incrementing ID, or another mechanism). + /// + /// If `last_key` is provided, listing starts after this key in creation order. If `None`, + /// listing starts from the most recently created entry. The `last_key` in the returned + /// [`PaginatedListResponse`] can be passed to subsequent calls to fetch the next page. + /// + /// Returns an empty list if `primary_namespace` or `secondary_namespace` is unknown or if + /// there are no more keys to return. + fn list_paginated( + &self, primary_namespace: &str, secondary_namespace: &str, last_key: Option, + ) -> impl Future> + 'static + MaybeSend; +} + /// Provides additional interface methods that are required for [`KVStore`]-to-[`KVStore`] /// data migration. pub trait MigratableKVStore: KVStoreSync { @@ -1565,7 +1733,7 @@ mod tests { use crate::ln::msgs::BaseMessageHandler; use crate::sync::Arc; use crate::util::test_channel_signer::TestChannelSigner; - use crate::util::test_utils::{self, TestStore}; + use crate::util::test_utils::{self, TestPaginatedStore, TestStore}; use bitcoin::hashes::hex::FromHex; use core::cmp; @@ -1975,4 +2143,78 @@ mod tests { let store: Arc = Arc::new(TestStore::new(false)); assert!(persist_fn::<_, TestChannelSigner>(Arc::clone(&store))); } + + #[test] + fn paginated_store_basic_operations() { + let store = TestPaginatedStore::new(10); + + // Write some data + store.write("ns1", "ns2", "key1", vec![1, 2, 3]).unwrap(); + store.write("ns1", "ns2", "key2", vec![4, 5, 6]).unwrap(); + + // Read it back + assert_eq!(KVStoreSync::read(&store, "ns1", "ns2", "key1").unwrap(), vec![1, 2, 3]); + assert_eq!(KVStoreSync::read(&store, "ns1", "ns2", "key2").unwrap(), vec![4, 5, 6]); + + // List should return keys in descending order + let response = store.list_paginated("ns1", "ns2", None).unwrap(); + assert_eq!(response.keys, vec!["key2", "key1"]); + assert!(response.last_key.is_none()); + + // Remove a key + KVStoreSync::remove(&store, "ns1", "ns2", "key1", false).unwrap(); + assert!(KVStoreSync::read(&store, "ns1", "ns2", "key1").is_err()); + } + + #[test] + fn paginated_store_pagination() { + let store = TestPaginatedStore::new(2); + + // Write 5 items with different order values + for i in 0..5i64 { + store.write("ns", "", &format!("key{i}"), vec![i as u8]).unwrap(); + } + + // First page should have 2 items (most recently created first: key4, key3) + let page1 = store.list_paginated("ns", "", None).unwrap(); + assert_eq!(page1.keys.len(), 2); + assert_eq!(page1.keys, vec!["key4", "key3"]); + assert!(page1.last_key.is_some()); + + // Second page + let page2 = store.list_paginated("ns", "", page1.last_key).unwrap(); + assert_eq!(page2.keys.len(), 2); + assert_eq!(page2.keys, vec!["key2", "key1"]); + assert!(page2.last_key.is_some()); + + // Third page (last item) + let page3 = store.list_paginated("ns", "", page2.last_key).unwrap(); + assert_eq!(page3.keys.len(), 1); + assert_eq!(page3.keys, vec!["key0"]); + assert!(page3.last_key.is_none()); + } + + #[test] + fn paginated_store_update_preserves_order() { + let store = TestPaginatedStore::new(10); + + // Write items with specific order values + store.write("ns", "", "key1", vec![1]).unwrap(); + store.write("ns", "", "key2", vec![2]).unwrap(); + store.write("ns", "", "key3", vec![3]).unwrap(); + + // Verify initial order (newest first) + let response = store.list_paginated("ns", "", None).unwrap(); + assert_eq!(response.keys, vec!["key3", "key2", "key1"]); + + // Update key1 with a new order value that would put it first if used + store.write("ns", "", "key1", vec![1, 1]).unwrap(); + + // Verify data was updated + assert_eq!(KVStoreSync::read(&store, "ns", "", "key1").unwrap(), vec![1, 1]); + + // Verify order is unchanged - creation order should have been preserved + let response = store.list_paginated("ns", "", None).unwrap(); + assert_eq!(response.keys, vec!["key3", "key2", "key1"]); + } } diff --git a/lightning/src/util/test_utils.rs b/lightning/src/util/test_utils.rs index 34f5d5fe36e..98dde154c3e 100644 --- a/lightning/src/util/test_utils.rs +++ b/lightning/src/util/test_utils.rs @@ -51,6 +51,7 @@ use crate::sign::{ChannelSigner, PeerStorageKey}; use crate::sync::RwLock; use crate::types::features::{ChannelFeatures, InitFeatures, NodeFeatures}; use crate::util::async_poll::MaybeSend; +use crate::util::atomic_counter::AtomicCounter; use crate::util::config::UserConfig; use crate::util::dyn_signer::{ DynKeysInterface, DynKeysInterfaceTrait, DynPhantomKeysInterface, DynSigner, @@ -58,7 +59,7 @@ use crate::util::dyn_signer::{ use crate::util::logger::{Logger, Record}; #[cfg(feature = "std")] use crate::util::mut_global::MutGlobal; -use crate::util::persist::{KVStore, KVStoreSync, MonitorName}; +use crate::util::persist::{KVStore, KVStoreSync, MonitorName, PaginatedListResponse}; use crate::util::ser::{Readable, ReadableArgs, Writeable, Writer}; use crate::util::test_channel_signer::{EnforcementState, TestChannelSigner}; use crate::util::wakers::Notifier; @@ -1124,6 +1125,121 @@ impl KVStoreSync for TestStore { unsafe impl Sync for TestStore {} unsafe impl Send for TestStore {} +/// A simple in-memory implementation of [`PaginatedKVStoreSync`] for testing. +/// +/// [`PaginatedKVStoreSync`]: crate::util::persist::PaginatedKVStoreSync +pub struct TestPaginatedStore { + data: Mutex)>>, + page_size: usize, + time_counter: AtomicCounter, +} + +impl TestPaginatedStore { + /// Creates a new `TestPaginatedStore` with the given page size. + pub fn new(page_size: usize) -> Self { + Self { data: Mutex::new(new_hash_map()), page_size, time_counter: AtomicCounter::new() } + } +} + +impl KVStoreSync for TestPaginatedStore { + fn read( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, + ) -> Result, io::Error> { + let data = self.data.lock().unwrap(); + data.get(&(primary_namespace.to_string(), secondary_namespace.to_string(), key.to_string())) + .map(|(_, v)| v.clone()) + .ok_or_else(|| io::Error::new(io::ErrorKind::NotFound, "key not found")) + } + + fn write( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec, + ) -> Result<(), io::Error> { + let mut data = self.data.lock().unwrap(); + let order = self.time_counter.next() as i64; + let key_tuple = + (primary_namespace.to_string(), secondary_namespace.to_string(), key.to_string()); + // Only use order for new entries; preserve existing order on updates + let order_to_use = + data.get(&key_tuple).map(|(existing_order, _)| *existing_order).unwrap_or(order); + data.insert(key_tuple, (order_to_use, buf)); + Ok(()) + } + + fn remove( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, _lazy: bool, + ) -> Result<(), io::Error> { + let mut data = self.data.lock().unwrap(); + data.remove(&( + primary_namespace.to_string(), + secondary_namespace.to_string(), + key.to_string(), + )); + Ok(()) + } + + fn list( + &self, primary_namespace: &str, secondary_namespace: &str, + ) -> Result, io::Error> { + let mut all_keys = Vec::new(); + let mut last_key = None; + loop { + let response = crate::util::persist::PaginatedKVStoreSync::list_paginated( + self, + primary_namespace, + secondary_namespace, + last_key, + )?; + all_keys.extend(response.keys); + match response.last_key { + Some(key) => last_key = Some(key), + None => break, + } + } + Ok(all_keys) + } +} + +impl crate::util::persist::PaginatedKVStoreSync for TestPaginatedStore { + fn list_paginated( + &self, primary_namespace: &str, secondary_namespace: &str, last_key: Option, + ) -> Result { + let data = self.data.lock().unwrap(); + let mut entries: Vec<_> = data + .iter() + .filter(|((pn, sn, _), _)| pn == primary_namespace && sn == secondary_namespace) + .map(|((_, _, k), (t, _))| (k.clone(), *t)) + .collect(); + + // Sort by time descending, then by key + entries.sort_by(|a, b| b.1.cmp(&a.1).then_with(|| a.0.cmp(&b.0))); + + // Apply pagination: find the first entry AFTER the given key in sort order. + let start_idx = if let Some(ref key) = last_key { + // Find the position of this key and start after it + entries.iter().position(|(k, _)| k == key).map(|pos| pos + 1).unwrap_or(0) + } else { + 0 + }; + + let page_entries: Vec<_> = + entries.into_iter().skip(start_idx).take(self.page_size).collect(); + + let response_last_key = if page_entries.len() == self.page_size { + page_entries.last().map(|(k, _)| k.clone()) + } else { + None + }; + + Ok(PaginatedListResponse { + keys: page_entries.into_iter().map(|(k, _)| k).collect(), + last_key: response_last_key, + }) + } +} + +unsafe impl Sync for TestPaginatedStore {} +unsafe impl Send for TestPaginatedStore {} + pub struct TestBroadcaster { pub txn_broadcasted: Mutex>, pub blocks: Arc>>,