From f1ff6c6eb88916e76ddc4b57a5e7b128f95f0787 Mon Sep 17 00:00:00 2001 From: benthecarman Date: Fri, 23 Jan 2026 13:47:22 -0600 Subject: [PATCH 1/2] Use dedicated storage types instead of proto defintions Previously we were storing payments and forwarded payments just using our proto definitions as the encoding. While this works, this will make updating the storage and/or the proto defintion harder as we have to respect backwards compatability for both. Splitting these out into two separate defintions will make this easier and safer in the future. --- Cargo.lock | 1 + ldk-server/Cargo.toml | 1 + ldk-server/src/api/list_forwarded_payments.rs | 53 +++--------- ldk-server/src/api/list_payments.rs | 45 ++-------- ldk-server/src/io/persist/mod.rs | 84 +++++++++++++++++++ ldk-server/src/io/persist/types.rs | 58 +++++++++++++ ldk-server/src/main.rs | 50 +++++++---- ldk-server/src/util/proto_adapter.rs | 49 +++++++++++ 8 files changed, 250 insertions(+), 91 deletions(-) create mode 100644 ldk-server/src/io/persist/types.rs diff --git a/Cargo.lock b/Cargo.lock index 538c9774..1e456cdd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1753,6 +1753,7 @@ dependencies = [ "lapin", "ldk-node", "ldk-server-protos", + "lightning-macros", "log", "prost", "rand 0.8.5", diff --git a/ldk-server/Cargo.toml b/ldk-server/Cargo.toml index 965c9604..cd0cf900 100644 --- a/ldk-server/Cargo.toml +++ b/ldk-server/Cargo.toml @@ -5,6 +5,7 @@ edition = "2021" [dependencies] ldk-node = { git = "https://github.com/lightningdevkit/ldk-node", rev = "d1bbf978c8b7abe87ae2e40793556c1fe4e7ea49" } +lightning-macros = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "5236dba053a3f4f01cf0c32ce42b609a93738891" } serde = { version = "1.0.203", default-features = false, features = ["derive"] } hyper = { version = "1", default-features = false, features = ["server", "http1"] } http-body-util = { version = "0.1", default-features = false } diff --git a/ldk-server/src/api/list_forwarded_payments.rs b/ldk-server/src/api/list_forwarded_payments.rs index 78ce3dc4..4004f330 100644 --- a/ldk-server/src/api/list_forwarded_payments.rs +++ b/ldk-server/src/api/list_forwarded_payments.rs @@ -7,30 +7,21 @@ // You may not use this file except in accordance with one or both of these // licenses. -use bytes::Bytes; use ldk_server_protos::api::{ListForwardedPaymentsRequest, ListForwardedPaymentsResponse}; use ldk_server_protos::types::{ForwardedPayment, PageToken}; -use prost::Message; use crate::api::error::LdkServerError; use crate::api::error::LdkServerErrorCode::InternalServerError; -use crate::io::persist::{ - FORWARDED_PAYMENTS_PERSISTENCE_PRIMARY_NAMESPACE, - FORWARDED_PAYMENTS_PERSISTENCE_SECONDARY_NAMESPACE, -}; +use crate::io::persist; use crate::service::Context; +use crate::util::proto_adapter::stored_forwarded_payment_to_proto; pub(crate) fn handle_list_forwarded_payments_request( context: Context, request: ListForwardedPaymentsRequest, ) -> Result { let page_token = request.page_token.map(|p| (p.token, p.index)); - let list_response = context - .paginated_kv_store - .list( - FORWARDED_PAYMENTS_PERSISTENCE_PRIMARY_NAMESPACE, - FORWARDED_PAYMENTS_PERSISTENCE_SECONDARY_NAMESPACE, - page_token, - ) + + let list_response = persist::list_forwarded_payments(context.paginated_kv_store, page_token) .map_err(|e| { LdkServerError::new( InternalServerError, @@ -38,36 +29,16 @@ pub(crate) fn handle_list_forwarded_payments_request( ) })?; - let mut forwarded_payments: Vec = - Vec::with_capacity(list_response.keys.len()); - for key in list_response.keys { - let forwarded_payment_bytes = context - .paginated_kv_store - .read( - FORWARDED_PAYMENTS_PERSISTENCE_PRIMARY_NAMESPACE, - FORWARDED_PAYMENTS_PERSISTENCE_SECONDARY_NAMESPACE, - &key, - ) - .map_err(|e| { - LdkServerError::new( - InternalServerError, - format!("Failed to read forwarded payment data: {}", e), - ) - })?; - let forwarded_payment = ForwardedPayment::decode(Bytes::from(forwarded_payment_bytes)) - .map_err(|e| { - LdkServerError::new( - InternalServerError, - format!("Failed to decode forwarded payment: {}", e), - ) - })?; - forwarded_payments.push(forwarded_payment); - } - let response = ListForwardedPaymentsResponse { + let forwarded_payments: Vec = list_response + .forwarded_payments + .into_iter() + .map(stored_forwarded_payment_to_proto) + .collect(); + + Ok(ListForwardedPaymentsResponse { forwarded_payments, next_page_token: list_response .next_page_token .map(|(token, index)| PageToken { token, index }), - }; - Ok(response) + }) } diff --git a/ldk-server/src/api/list_payments.rs b/ldk-server/src/api/list_payments.rs index fbaf7c55..9543a144 100644 --- a/ldk-server/src/api/list_payments.rs +++ b/ldk-server/src/api/list_payments.rs @@ -7,58 +7,31 @@ // You may not use this file except in accordance with one or both of these // licenses. -use bytes::Bytes; use ldk_server_protos::api::{ListPaymentsRequest, ListPaymentsResponse}; use ldk_server_protos::types::{PageToken, Payment}; -use prost::Message; use crate::api::error::LdkServerError; use crate::api::error::LdkServerErrorCode::InternalServerError; -use crate::io::persist::{ - PAYMENTS_PERSISTENCE_PRIMARY_NAMESPACE, PAYMENTS_PERSISTENCE_SECONDARY_NAMESPACE, -}; +use crate::io::persist; use crate::service::Context; +use crate::util::proto_adapter::payment_to_proto; pub(crate) fn handle_list_payments_request( context: Context, request: ListPaymentsRequest, ) -> Result { let page_token = request.page_token.map(|p| (p.token, p.index)); - let list_response = context - .paginated_kv_store - .list( - PAYMENTS_PERSISTENCE_PRIMARY_NAMESPACE, - PAYMENTS_PERSISTENCE_SECONDARY_NAMESPACE, - page_token, - ) - .map_err(|e| { + + let list_response = + persist::list_payments(context.paginated_kv_store, page_token).map_err(|e| { LdkServerError::new(InternalServerError, format!("Failed to list payments: {}", e)) })?; - let mut payments: Vec = Vec::with_capacity(list_response.keys.len()); - for key in list_response.keys { - let payment_bytes = context - .paginated_kv_store - .read( - PAYMENTS_PERSISTENCE_PRIMARY_NAMESPACE, - PAYMENTS_PERSISTENCE_SECONDARY_NAMESPACE, - &key, - ) - .map_err(|e| { - LdkServerError::new( - InternalServerError, - format!("Failed to read payment data: {}", e), - ) - })?; - let payment = Payment::decode(Bytes::from(payment_bytes)).map_err(|e| { - LdkServerError::new(InternalServerError, format!("Failed to decode payment: {}", e)) - })?; - payments.push(payment); - } - let response = ListPaymentsResponse { + let payments: Vec = list_response.payments.into_iter().map(payment_to_proto).collect(); + + Ok(ListPaymentsResponse { payments, next_page_token: list_response .next_page_token .map(|(token, index)| PageToken { token, index }), - }; - Ok(response) + }) } diff --git a/ldk-server/src/io/persist/mod.rs b/ldk-server/src/io/persist/mod.rs index 6c01795b..81125c3e 100644 --- a/ldk-server/src/io/persist/mod.rs +++ b/ldk-server/src/io/persist/mod.rs @@ -9,6 +9,16 @@ pub(crate) mod paginated_kv_store; pub(crate) mod sqlite_store; +pub(crate) mod types; + +use std::io; +use std::sync::Arc; + +use ldk_node::lightning::util::ser::Readable; +use ldk_node::payment::PaymentDetails; + +use paginated_kv_store::PaginatedKVStore; +use types::StoredForwardedPayment; /// The forwarded payments will be persisted under this prefix. pub(crate) const FORWARDED_PAYMENTS_PERSISTENCE_PRIMARY_NAMESPACE: &str = "forwarded_payments"; @@ -17,3 +27,77 @@ pub(crate) const FORWARDED_PAYMENTS_PERSISTENCE_SECONDARY_NAMESPACE: &str = ""; /// The payments will be persisted under this prefix. pub(crate) const PAYMENTS_PERSISTENCE_PRIMARY_NAMESPACE: &str = "payments"; pub(crate) const PAYMENTS_PERSISTENCE_SECONDARY_NAMESPACE: &str = ""; + +/// Response from listing payments. +pub(crate) struct ListPaymentsResponse { + pub payments: Vec, + pub next_page_token: Option<(String, i64)>, +} + +/// Response from listing forwarded payments. +pub(crate) struct ListForwardedPaymentsResponse { + pub forwarded_payments: Vec, + pub next_page_token: Option<(String, i64)>, +} + +/// List and deserialize payments from the store. +pub(crate) fn list_payments( + store: Arc, page_token: Option<(String, i64)>, +) -> Result { + let list_response = store.list( + PAYMENTS_PERSISTENCE_PRIMARY_NAMESPACE, + PAYMENTS_PERSISTENCE_SECONDARY_NAMESPACE, + page_token, + )?; + + let mut payments = Vec::with_capacity(list_response.keys.len()); + for key in list_response.keys { + let payment_bytes = store.read( + PAYMENTS_PERSISTENCE_PRIMARY_NAMESPACE, + PAYMENTS_PERSISTENCE_SECONDARY_NAMESPACE, + &key, + )?; + + let mut cursor = io::Cursor::new(&payment_bytes); + let payment = PaymentDetails::read(&mut cursor).map_err(|e| { + io::Error::new(io::ErrorKind::InvalidData, format!("Failed to decode payment: {}", e)) + })?; + payments.push(payment); + } + + Ok(ListPaymentsResponse { payments, next_page_token: list_response.next_page_token }) +} + +/// List and deserialize forwarded payments from the store. +pub(crate) fn list_forwarded_payments( + store: Arc, page_token: Option<(String, i64)>, +) -> Result { + let list_response = store.list( + FORWARDED_PAYMENTS_PERSISTENCE_PRIMARY_NAMESPACE, + FORWARDED_PAYMENTS_PERSISTENCE_SECONDARY_NAMESPACE, + page_token, + )?; + + let mut forwarded_payments = Vec::with_capacity(list_response.keys.len()); + for key in list_response.keys { + let payment_bytes = store.read( + FORWARDED_PAYMENTS_PERSISTENCE_PRIMARY_NAMESPACE, + FORWARDED_PAYMENTS_PERSISTENCE_SECONDARY_NAMESPACE, + &key, + )?; + + let mut cursor = io::Cursor::new(&payment_bytes); + let payment = StoredForwardedPayment::read(&mut cursor).map_err(|e| { + io::Error::new( + io::ErrorKind::InvalidData, + format!("Failed to decode forwarded payment: {}", e), + ) + })?; + forwarded_payments.push(payment); + } + + Ok(ListForwardedPaymentsResponse { + forwarded_payments, + next_page_token: list_response.next_page_token, + }) +} diff --git a/ldk-server/src/io/persist/types.rs b/ldk-server/src/io/persist/types.rs new file mode 100644 index 00000000..78df334f --- /dev/null +++ b/ldk-server/src/io/persist/types.rs @@ -0,0 +1,58 @@ +// This file is Copyright its original authors, visible in version control +// history. +// +// This file is licensed under the Apache License, Version 2.0 or the MIT license +// , at your option. +// You may not use this file except in accordance with one or both of these +// licenses. + +//! Storage types for persisting payment data. +//! +//! These types are separate from the proto definitions to decouple the storage format +//! from the API format. This allows the storage schema to evolve independently and +//! provides better control over backwards compatibility. + +use ldk_node::lightning::impl_writeable_tlv_based; +use ldk_node::lightning::routing::gossip::NodeId; + +/// A forwarded payment stored in the database. +/// +/// This type is needed because ldk-node doesn't persist forwarded payment events - +/// it only emits them. We need our own storage type to track forwarding history. +#[derive(Clone, Debug, PartialEq, Eq)] +pub struct StoredForwardedPayment { + /// The channel id of the incoming channel. + pub prev_channel_id: [u8; 32], + /// The channel id of the outgoing channel. + pub next_channel_id: [u8; 32], + /// The user_channel_id of the incoming channel. + pub prev_user_channel_id: u128, + /// The user_channel_id of the outgoing channel. + pub next_user_channel_id: Option, + /// The node id of the previous node. + pub prev_node_id: NodeId, + /// The node id of the next node. + pub next_node_id: NodeId, + /// The total fee earned in millisatoshis. + pub total_fee_earned_msat: Option, + /// The skimmed fee in millisatoshis. + pub skimmed_fee_msat: Option, + /// Whether the payment was claimed from an on-chain transaction. + pub claim_from_onchain_tx: bool, + /// The outbound amount forwarded in millisatoshis. + pub outbound_amount_forwarded_msat: Option, +} + +impl_writeable_tlv_based!(StoredForwardedPayment, { + (0, prev_channel_id, required), + (2, next_channel_id, required), + (4, prev_user_channel_id, required), + (6, next_user_channel_id, option), + (8, prev_node_id, required), + (10, next_node_id, required), + (12, total_fee_earned_msat, option), + (14, skimmed_fee_msat, option), + (16, claim_from_onchain_tx, required), + (18, outbound_amount_forwarded_msat, option), +}); diff --git a/ldk-server/src/main.rs b/ldk-server/src/main.rs index ba0731c9..ac1fc5a7 100644 --- a/ldk-server/src/main.rs +++ b/ldk-server/src/main.rs @@ -25,12 +25,12 @@ use ldk_node::bitcoin::Network; use ldk_node::config::Config; use ldk_node::entropy::NodeEntropy; use ldk_node::lightning::ln::channelmanager::PaymentId; +use ldk_node::lightning::util::ser::Writeable; use ldk_node::{Builder, Event, Node}; use ldk_server_protos::events; use ldk_server_protos::events::{event_envelope, EventEnvelope}; use ldk_server_protos::types::Payment; use log::{debug, error, info}; -use prost::Message; use rand::Rng; use tokio::net::TcpListener; use tokio::select; @@ -50,7 +50,9 @@ use crate::io::persist::{ use crate::service::NodeService; use crate::util::config::{load_config, ChainSource}; use crate::util::logger::ServerLogger; -use crate::util::proto_adapter::{forwarded_payment_to_proto, payment_to_proto}; +use crate::util::proto_adapter::{ + forwarded_payment_to_proto, forwarded_payment_to_stored, payment_to_proto, +}; use crate::util::tls::get_or_generate_tls_config; const DEFAULT_CONFIG_FILE: &str = "config.toml"; @@ -369,8 +371,7 @@ fn main() { }, Event::PaymentClaimable {payment_id, ..} => { if let Some(payment_details) = event_node.payment(&payment_id) { - let payment = payment_to_proto(payment_details); - upsert_payment_details(&event_node, Arc::clone(&paginated_store), &payment); + upsert_payment_details(&event_node, Arc::clone(&paginated_store), payment_details); } else { error!("Unable to find payment with paymentId: {payment_id}"); } @@ -392,7 +393,22 @@ fn main() { outbound_amount_forwarded_msat.unwrap_or(0), total_fee_earned_msat.unwrap_or(0), prev_channel_id, next_channel_id ); - let forwarded_payment = forwarded_payment_to_proto( + // Create proto for event publishing + let forwarded_payment_proto = forwarded_payment_to_proto( + prev_channel_id, + next_channel_id, + prev_user_channel_id, + next_user_channel_id, + prev_node_id, + next_node_id, + total_fee_earned_msat, + skimmed_fee_msat, + claim_from_onchain_tx, + outbound_amount_forwarded_msat + ); + + // Create storage type for persistence + let forwarded_payment_stored = forwarded_payment_to_stored( prev_channel_id, next_channel_id, prev_user_channel_id, @@ -415,7 +431,7 @@ fn main() { match event_publisher.publish(EventEnvelope { event: Some(event_envelope::Event::PaymentForwarded(events::PaymentForwarded { - forwarded_payment: Some(forwarded_payment.clone()), + forwarded_payment: Some(forwarded_payment_proto), })), }).await { Ok(_) => {}, @@ -425,10 +441,12 @@ fn main() { } }; - match paginated_store.write(FORWARDED_PAYMENTS_PERSISTENCE_PRIMARY_NAMESPACE,FORWARDED_PAYMENTS_PERSISTENCE_SECONDARY_NAMESPACE, + match paginated_store.write( + FORWARDED_PAYMENTS_PERSISTENCE_PRIMARY_NAMESPACE, + FORWARDED_PAYMENTS_PERSISTENCE_SECONDARY_NAMESPACE, &forwarded_payment_id.to_lower_hex_string(), forwarded_payment_creation_time, - &forwarded_payment.encode_to_vec(), + &forwarded_payment_stored.encode(), ) { Ok(_) => { if let Err(e) = event_node.event_handled() { @@ -494,9 +512,10 @@ async fn publish_event_and_upsert_payment( paginated_store: Arc, ) { if let Some(payment_details) = event_node.payment(payment_id) { - let payment = payment_to_proto(payment_details); + // Create proto for event publishing + let payment_proto = payment_to_proto(payment_details.clone()); - let event = payment_to_event(&payment); + let event = payment_to_event(&payment_proto); let event_name = get_event_name(&event); match event_publisher.publish(EventEnvelope { event: Some(event) }).await { Ok(_) => {}, @@ -506,24 +525,27 @@ async fn publish_event_and_upsert_payment( }, }; - upsert_payment_details(event_node, Arc::clone(&paginated_store), &payment); + upsert_payment_details(event_node, Arc::clone(&paginated_store), payment_details); } else { error!("Unable to find payment with paymentId: {payment_id}"); } } fn upsert_payment_details( - event_node: &Node, paginated_store: Arc, payment: &Payment, + event_node: &Node, paginated_store: Arc, + payment_details: ldk_node::payment::PaymentDetails, ) { let time = SystemTime::now().duration_since(UNIX_EPOCH).expect("Time must be > 1970").as_secs() as i64; + let payment_id = payment_details.id.to_string(); + match paginated_store.write( PAYMENTS_PERSISTENCE_PRIMARY_NAMESPACE, PAYMENTS_PERSISTENCE_SECONDARY_NAMESPACE, - &payment.id, + &payment_id, time, - &payment.encode_to_vec(), + &payment_details.encode(), ) { Ok(_) => { if let Err(e) = event_node.event_handled() { diff --git a/ldk-server/src/util/proto_adapter.rs b/ldk-server/src/util/proto_adapter.rs index 2eece481..08027b47 100644 --- a/ldk-server/src/util/proto_adapter.rs +++ b/ldk-server/src/util/proto_adapter.rs @@ -14,6 +14,7 @@ use ldk_node::bitcoin::hashes::sha256; use ldk_node::bitcoin::secp256k1::PublicKey; use ldk_node::config::{ChannelConfig, MaxDustHTLCExposure}; use ldk_node::lightning::ln::types::ChannelId; +use ldk_node::lightning::routing::gossip::NodeId; use ldk_node::lightning_invoice::{Bolt11InvoiceDescription, Description, Sha256}; use ldk_node::payment::{ ConfirmationStatus, PaymentDetails, PaymentDirection, PaymentKind, PaymentStatus, @@ -39,6 +40,7 @@ use crate::api::error::LdkServerError; use crate::api::error::LdkServerErrorCode::{ AuthError, InternalServerError, InvalidRequestError, LightningError, }; +use crate::io::persist::types::StoredForwardedPayment; pub(crate) fn channel_to_proto(channel: ChannelDetails) -> Channel { Channel { @@ -460,3 +462,50 @@ pub(crate) fn to_error_response(ldk_error: LdkServerError) -> (ErrorResponse, St (error_response, status) } + +/// Convert event parameters to a StoredForwardedPayment for persistence. +#[allow(clippy::too_many_arguments)] +pub(crate) fn forwarded_payment_to_stored( + prev_channel_id: ChannelId, next_channel_id: ChannelId, + prev_user_channel_id: Option, next_user_channel_id: Option, + prev_node_id: Option, next_node_id: Option, + total_fee_earned_msat: Option, skimmed_fee_msat: Option, claim_from_onchain_tx: bool, + outbound_amount_forwarded_msat: Option, +) -> StoredForwardedPayment { + StoredForwardedPayment { + prev_channel_id: prev_channel_id.0, + next_channel_id: next_channel_id.0, + prev_user_channel_id: prev_user_channel_id + .expect("prev_user_channel_id expected for ldk-server >=0.1") + .0, + next_user_channel_id: next_user_channel_id.map(|u| u.0), + prev_node_id: NodeId::from_pubkey( + &prev_node_id.expect("prev_node_id expected for ldk-server >=0.1"), + ), + next_node_id: NodeId::from_pubkey( + &next_node_id.expect("next_node_id expected for ldk-node >=0.1"), + ), + total_fee_earned_msat, + skimmed_fee_msat, + claim_from_onchain_tx, + outbound_amount_forwarded_msat, + } +} + +/// Convert a StoredForwardedPayment to a proto ForwardedPayment for API responses. +pub(crate) fn stored_forwarded_payment_to_proto( + stored: StoredForwardedPayment, +) -> ForwardedPayment { + ForwardedPayment { + prev_channel_id: stored.prev_channel_id.to_lower_hex_string(), + next_channel_id: stored.next_channel_id.to_lower_hex_string(), + prev_user_channel_id: stored.prev_user_channel_id.to_string(), + next_user_channel_id: stored.next_user_channel_id.map(|u| u.to_string()), + prev_node_id: stored.prev_node_id.as_slice().to_lower_hex_string(), + next_node_id: stored.next_node_id.as_slice().to_lower_hex_string(), + total_fee_earned_msat: stored.total_fee_earned_msat, + skimmed_fee_msat: stored.skimmed_fee_msat, + claim_from_onchain_tx: stored.claim_from_onchain_tx, + outbound_amount_forwarded_msat: stored.outbound_amount_forwarded_msat, + } +} From ac45a9fe11ecfb523805e9700e2c6501da52d181 Mon Sep 17 00:00:00 2001 From: benthecarman Date: Fri, 23 Jan 2026 14:00:56 -0600 Subject: [PATCH 2/2] Allow clippy::drop_non_drop in CI The impl_writeable_tlv_based macro always creates this warning so we ignore it in CI --- .github/workflows/build.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index b6f8a9fa..73aeb5df 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -40,7 +40,7 @@ jobs: run: cargo build --verbose --color always - name: Check clippy if on msrv if: matrix.msrv - run: cargo clippy --all-features -- -D warnings + run: cargo clippy --all-features -- -D warnings -A clippy::drop_non_drop - name: Test on Rust ${{ matrix.toolchain }} run: cargo test - name: Cargo check release on Rust ${{ matrix.toolchain }}