diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index b6f8a9fa..73aeb5df 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -40,7 +40,7 @@ jobs: run: cargo build --verbose --color always - name: Check clippy if on msrv if: matrix.msrv - run: cargo clippy --all-features -- -D warnings + run: cargo clippy --all-features -- -D warnings -A clippy::drop_non_drop - name: Test on Rust ${{ matrix.toolchain }} run: cargo test - name: Cargo check release on Rust ${{ matrix.toolchain }} diff --git a/Cargo.lock b/Cargo.lock index 538c9774..1e456cdd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1753,6 +1753,7 @@ dependencies = [ "lapin", "ldk-node", "ldk-server-protos", + "lightning-macros", "log", "prost", "rand 0.8.5", diff --git a/ldk-server/Cargo.toml b/ldk-server/Cargo.toml index 965c9604..cd0cf900 100644 --- a/ldk-server/Cargo.toml +++ b/ldk-server/Cargo.toml @@ -5,6 +5,7 @@ edition = "2021" [dependencies] ldk-node = { git = "https://github.com/lightningdevkit/ldk-node", rev = "d1bbf978c8b7abe87ae2e40793556c1fe4e7ea49" } +lightning-macros = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "5236dba053a3f4f01cf0c32ce42b609a93738891" } serde = { version = "1.0.203", default-features = false, features = ["derive"] } hyper = { version = "1", default-features = false, features = ["server", "http1"] } http-body-util = { version = "0.1", default-features = false } diff --git a/ldk-server/src/api/list_forwarded_payments.rs b/ldk-server/src/api/list_forwarded_payments.rs index 78ce3dc4..4004f330 100644 --- a/ldk-server/src/api/list_forwarded_payments.rs +++ b/ldk-server/src/api/list_forwarded_payments.rs @@ -7,30 +7,21 @@ // You may not use this file except in accordance with one or both of these // licenses. -use bytes::Bytes; use ldk_server_protos::api::{ListForwardedPaymentsRequest, ListForwardedPaymentsResponse}; use ldk_server_protos::types::{ForwardedPayment, PageToken}; -use prost::Message; use crate::api::error::LdkServerError; use crate::api::error::LdkServerErrorCode::InternalServerError; -use crate::io::persist::{ - FORWARDED_PAYMENTS_PERSISTENCE_PRIMARY_NAMESPACE, - FORWARDED_PAYMENTS_PERSISTENCE_SECONDARY_NAMESPACE, -}; +use crate::io::persist; use crate::service::Context; +use crate::util::proto_adapter::stored_forwarded_payment_to_proto; pub(crate) fn handle_list_forwarded_payments_request( context: Context, request: ListForwardedPaymentsRequest, ) -> Result { let page_token = request.page_token.map(|p| (p.token, p.index)); - let list_response = context - .paginated_kv_store - .list( - FORWARDED_PAYMENTS_PERSISTENCE_PRIMARY_NAMESPACE, - FORWARDED_PAYMENTS_PERSISTENCE_SECONDARY_NAMESPACE, - page_token, - ) + + let list_response = persist::list_forwarded_payments(context.paginated_kv_store, page_token) .map_err(|e| { LdkServerError::new( InternalServerError, @@ -38,36 +29,16 @@ pub(crate) fn handle_list_forwarded_payments_request( ) })?; - let mut forwarded_payments: Vec = - Vec::with_capacity(list_response.keys.len()); - for key in list_response.keys { - let forwarded_payment_bytes = context - .paginated_kv_store - .read( - FORWARDED_PAYMENTS_PERSISTENCE_PRIMARY_NAMESPACE, - FORWARDED_PAYMENTS_PERSISTENCE_SECONDARY_NAMESPACE, - &key, - ) - .map_err(|e| { - LdkServerError::new( - InternalServerError, - format!("Failed to read forwarded payment data: {}", e), - ) - })?; - let forwarded_payment = ForwardedPayment::decode(Bytes::from(forwarded_payment_bytes)) - .map_err(|e| { - LdkServerError::new( - InternalServerError, - format!("Failed to decode forwarded payment: {}", e), - ) - })?; - forwarded_payments.push(forwarded_payment); - } - let response = ListForwardedPaymentsResponse { + let forwarded_payments: Vec = list_response + .forwarded_payments + .into_iter() + .map(stored_forwarded_payment_to_proto) + .collect(); + + Ok(ListForwardedPaymentsResponse { forwarded_payments, next_page_token: list_response .next_page_token .map(|(token, index)| PageToken { token, index }), - }; - Ok(response) + }) } diff --git a/ldk-server/src/api/list_payments.rs b/ldk-server/src/api/list_payments.rs index fbaf7c55..9543a144 100644 --- a/ldk-server/src/api/list_payments.rs +++ b/ldk-server/src/api/list_payments.rs @@ -7,58 +7,31 @@ // You may not use this file except in accordance with one or both of these // licenses. -use bytes::Bytes; use ldk_server_protos::api::{ListPaymentsRequest, ListPaymentsResponse}; use ldk_server_protos::types::{PageToken, Payment}; -use prost::Message; use crate::api::error::LdkServerError; use crate::api::error::LdkServerErrorCode::InternalServerError; -use crate::io::persist::{ - PAYMENTS_PERSISTENCE_PRIMARY_NAMESPACE, PAYMENTS_PERSISTENCE_SECONDARY_NAMESPACE, -}; +use crate::io::persist; use crate::service::Context; +use crate::util::proto_adapter::payment_to_proto; pub(crate) fn handle_list_payments_request( context: Context, request: ListPaymentsRequest, ) -> Result { let page_token = request.page_token.map(|p| (p.token, p.index)); - let list_response = context - .paginated_kv_store - .list( - PAYMENTS_PERSISTENCE_PRIMARY_NAMESPACE, - PAYMENTS_PERSISTENCE_SECONDARY_NAMESPACE, - page_token, - ) - .map_err(|e| { + + let list_response = + persist::list_payments(context.paginated_kv_store, page_token).map_err(|e| { LdkServerError::new(InternalServerError, format!("Failed to list payments: {}", e)) })?; - let mut payments: Vec = Vec::with_capacity(list_response.keys.len()); - for key in list_response.keys { - let payment_bytes = context - .paginated_kv_store - .read( - PAYMENTS_PERSISTENCE_PRIMARY_NAMESPACE, - PAYMENTS_PERSISTENCE_SECONDARY_NAMESPACE, - &key, - ) - .map_err(|e| { - LdkServerError::new( - InternalServerError, - format!("Failed to read payment data: {}", e), - ) - })?; - let payment = Payment::decode(Bytes::from(payment_bytes)).map_err(|e| { - LdkServerError::new(InternalServerError, format!("Failed to decode payment: {}", e)) - })?; - payments.push(payment); - } - let response = ListPaymentsResponse { + let payments: Vec = list_response.payments.into_iter().map(payment_to_proto).collect(); + + Ok(ListPaymentsResponse { payments, next_page_token: list_response .next_page_token .map(|(token, index)| PageToken { token, index }), - }; - Ok(response) + }) } diff --git a/ldk-server/src/io/persist/mod.rs b/ldk-server/src/io/persist/mod.rs index 6c01795b..81125c3e 100644 --- a/ldk-server/src/io/persist/mod.rs +++ b/ldk-server/src/io/persist/mod.rs @@ -9,6 +9,16 @@ pub(crate) mod paginated_kv_store; pub(crate) mod sqlite_store; +pub(crate) mod types; + +use std::io; +use std::sync::Arc; + +use ldk_node::lightning::util::ser::Readable; +use ldk_node::payment::PaymentDetails; + +use paginated_kv_store::PaginatedKVStore; +use types::StoredForwardedPayment; /// The forwarded payments will be persisted under this prefix. pub(crate) const FORWARDED_PAYMENTS_PERSISTENCE_PRIMARY_NAMESPACE: &str = "forwarded_payments"; @@ -17,3 +27,77 @@ pub(crate) const FORWARDED_PAYMENTS_PERSISTENCE_SECONDARY_NAMESPACE: &str = ""; /// The payments will be persisted under this prefix. pub(crate) const PAYMENTS_PERSISTENCE_PRIMARY_NAMESPACE: &str = "payments"; pub(crate) const PAYMENTS_PERSISTENCE_SECONDARY_NAMESPACE: &str = ""; + +/// Response from listing payments. +pub(crate) struct ListPaymentsResponse { + pub payments: Vec, + pub next_page_token: Option<(String, i64)>, +} + +/// Response from listing forwarded payments. +pub(crate) struct ListForwardedPaymentsResponse { + pub forwarded_payments: Vec, + pub next_page_token: Option<(String, i64)>, +} + +/// List and deserialize payments from the store. +pub(crate) fn list_payments( + store: Arc, page_token: Option<(String, i64)>, +) -> Result { + let list_response = store.list( + PAYMENTS_PERSISTENCE_PRIMARY_NAMESPACE, + PAYMENTS_PERSISTENCE_SECONDARY_NAMESPACE, + page_token, + )?; + + let mut payments = Vec::with_capacity(list_response.keys.len()); + for key in list_response.keys { + let payment_bytes = store.read( + PAYMENTS_PERSISTENCE_PRIMARY_NAMESPACE, + PAYMENTS_PERSISTENCE_SECONDARY_NAMESPACE, + &key, + )?; + + let mut cursor = io::Cursor::new(&payment_bytes); + let payment = PaymentDetails::read(&mut cursor).map_err(|e| { + io::Error::new(io::ErrorKind::InvalidData, format!("Failed to decode payment: {}", e)) + })?; + payments.push(payment); + } + + Ok(ListPaymentsResponse { payments, next_page_token: list_response.next_page_token }) +} + +/// List and deserialize forwarded payments from the store. +pub(crate) fn list_forwarded_payments( + store: Arc, page_token: Option<(String, i64)>, +) -> Result { + let list_response = store.list( + FORWARDED_PAYMENTS_PERSISTENCE_PRIMARY_NAMESPACE, + FORWARDED_PAYMENTS_PERSISTENCE_SECONDARY_NAMESPACE, + page_token, + )?; + + let mut forwarded_payments = Vec::with_capacity(list_response.keys.len()); + for key in list_response.keys { + let payment_bytes = store.read( + FORWARDED_PAYMENTS_PERSISTENCE_PRIMARY_NAMESPACE, + FORWARDED_PAYMENTS_PERSISTENCE_SECONDARY_NAMESPACE, + &key, + )?; + + let mut cursor = io::Cursor::new(&payment_bytes); + let payment = StoredForwardedPayment::read(&mut cursor).map_err(|e| { + io::Error::new( + io::ErrorKind::InvalidData, + format!("Failed to decode forwarded payment: {}", e), + ) + })?; + forwarded_payments.push(payment); + } + + Ok(ListForwardedPaymentsResponse { + forwarded_payments, + next_page_token: list_response.next_page_token, + }) +} diff --git a/ldk-server/src/io/persist/types.rs b/ldk-server/src/io/persist/types.rs new file mode 100644 index 00000000..78df334f --- /dev/null +++ b/ldk-server/src/io/persist/types.rs @@ -0,0 +1,58 @@ +// This file is Copyright its original authors, visible in version control +// history. +// +// This file is licensed under the Apache License, Version 2.0 or the MIT license +// , at your option. +// You may not use this file except in accordance with one or both of these +// licenses. + +//! Storage types for persisting payment data. +//! +//! These types are separate from the proto definitions to decouple the storage format +//! from the API format. This allows the storage schema to evolve independently and +//! provides better control over backwards compatibility. + +use ldk_node::lightning::impl_writeable_tlv_based; +use ldk_node::lightning::routing::gossip::NodeId; + +/// A forwarded payment stored in the database. +/// +/// This type is needed because ldk-node doesn't persist forwarded payment events - +/// it only emits them. We need our own storage type to track forwarding history. +#[derive(Clone, Debug, PartialEq, Eq)] +pub struct StoredForwardedPayment { + /// The channel id of the incoming channel. + pub prev_channel_id: [u8; 32], + /// The channel id of the outgoing channel. + pub next_channel_id: [u8; 32], + /// The user_channel_id of the incoming channel. + pub prev_user_channel_id: u128, + /// The user_channel_id of the outgoing channel. + pub next_user_channel_id: Option, + /// The node id of the previous node. + pub prev_node_id: NodeId, + /// The node id of the next node. + pub next_node_id: NodeId, + /// The total fee earned in millisatoshis. + pub total_fee_earned_msat: Option, + /// The skimmed fee in millisatoshis. + pub skimmed_fee_msat: Option, + /// Whether the payment was claimed from an on-chain transaction. + pub claim_from_onchain_tx: bool, + /// The outbound amount forwarded in millisatoshis. + pub outbound_amount_forwarded_msat: Option, +} + +impl_writeable_tlv_based!(StoredForwardedPayment, { + (0, prev_channel_id, required), + (2, next_channel_id, required), + (4, prev_user_channel_id, required), + (6, next_user_channel_id, option), + (8, prev_node_id, required), + (10, next_node_id, required), + (12, total_fee_earned_msat, option), + (14, skimmed_fee_msat, option), + (16, claim_from_onchain_tx, required), + (18, outbound_amount_forwarded_msat, option), +}); diff --git a/ldk-server/src/main.rs b/ldk-server/src/main.rs index ba0731c9..ac1fc5a7 100644 --- a/ldk-server/src/main.rs +++ b/ldk-server/src/main.rs @@ -25,12 +25,12 @@ use ldk_node::bitcoin::Network; use ldk_node::config::Config; use ldk_node::entropy::NodeEntropy; use ldk_node::lightning::ln::channelmanager::PaymentId; +use ldk_node::lightning::util::ser::Writeable; use ldk_node::{Builder, Event, Node}; use ldk_server_protos::events; use ldk_server_protos::events::{event_envelope, EventEnvelope}; use ldk_server_protos::types::Payment; use log::{debug, error, info}; -use prost::Message; use rand::Rng; use tokio::net::TcpListener; use tokio::select; @@ -50,7 +50,9 @@ use crate::io::persist::{ use crate::service::NodeService; use crate::util::config::{load_config, ChainSource}; use crate::util::logger::ServerLogger; -use crate::util::proto_adapter::{forwarded_payment_to_proto, payment_to_proto}; +use crate::util::proto_adapter::{ + forwarded_payment_to_proto, forwarded_payment_to_stored, payment_to_proto, +}; use crate::util::tls::get_or_generate_tls_config; const DEFAULT_CONFIG_FILE: &str = "config.toml"; @@ -369,8 +371,7 @@ fn main() { }, Event::PaymentClaimable {payment_id, ..} => { if let Some(payment_details) = event_node.payment(&payment_id) { - let payment = payment_to_proto(payment_details); - upsert_payment_details(&event_node, Arc::clone(&paginated_store), &payment); + upsert_payment_details(&event_node, Arc::clone(&paginated_store), payment_details); } else { error!("Unable to find payment with paymentId: {payment_id}"); } @@ -392,7 +393,22 @@ fn main() { outbound_amount_forwarded_msat.unwrap_or(0), total_fee_earned_msat.unwrap_or(0), prev_channel_id, next_channel_id ); - let forwarded_payment = forwarded_payment_to_proto( + // Create proto for event publishing + let forwarded_payment_proto = forwarded_payment_to_proto( + prev_channel_id, + next_channel_id, + prev_user_channel_id, + next_user_channel_id, + prev_node_id, + next_node_id, + total_fee_earned_msat, + skimmed_fee_msat, + claim_from_onchain_tx, + outbound_amount_forwarded_msat + ); + + // Create storage type for persistence + let forwarded_payment_stored = forwarded_payment_to_stored( prev_channel_id, next_channel_id, prev_user_channel_id, @@ -415,7 +431,7 @@ fn main() { match event_publisher.publish(EventEnvelope { event: Some(event_envelope::Event::PaymentForwarded(events::PaymentForwarded { - forwarded_payment: Some(forwarded_payment.clone()), + forwarded_payment: Some(forwarded_payment_proto), })), }).await { Ok(_) => {}, @@ -425,10 +441,12 @@ fn main() { } }; - match paginated_store.write(FORWARDED_PAYMENTS_PERSISTENCE_PRIMARY_NAMESPACE,FORWARDED_PAYMENTS_PERSISTENCE_SECONDARY_NAMESPACE, + match paginated_store.write( + FORWARDED_PAYMENTS_PERSISTENCE_PRIMARY_NAMESPACE, + FORWARDED_PAYMENTS_PERSISTENCE_SECONDARY_NAMESPACE, &forwarded_payment_id.to_lower_hex_string(), forwarded_payment_creation_time, - &forwarded_payment.encode_to_vec(), + &forwarded_payment_stored.encode(), ) { Ok(_) => { if let Err(e) = event_node.event_handled() { @@ -494,9 +512,10 @@ async fn publish_event_and_upsert_payment( paginated_store: Arc, ) { if let Some(payment_details) = event_node.payment(payment_id) { - let payment = payment_to_proto(payment_details); + // Create proto for event publishing + let payment_proto = payment_to_proto(payment_details.clone()); - let event = payment_to_event(&payment); + let event = payment_to_event(&payment_proto); let event_name = get_event_name(&event); match event_publisher.publish(EventEnvelope { event: Some(event) }).await { Ok(_) => {}, @@ -506,24 +525,27 @@ async fn publish_event_and_upsert_payment( }, }; - upsert_payment_details(event_node, Arc::clone(&paginated_store), &payment); + upsert_payment_details(event_node, Arc::clone(&paginated_store), payment_details); } else { error!("Unable to find payment with paymentId: {payment_id}"); } } fn upsert_payment_details( - event_node: &Node, paginated_store: Arc, payment: &Payment, + event_node: &Node, paginated_store: Arc, + payment_details: ldk_node::payment::PaymentDetails, ) { let time = SystemTime::now().duration_since(UNIX_EPOCH).expect("Time must be > 1970").as_secs() as i64; + let payment_id = payment_details.id.to_string(); + match paginated_store.write( PAYMENTS_PERSISTENCE_PRIMARY_NAMESPACE, PAYMENTS_PERSISTENCE_SECONDARY_NAMESPACE, - &payment.id, + &payment_id, time, - &payment.encode_to_vec(), + &payment_details.encode(), ) { Ok(_) => { if let Err(e) = event_node.event_handled() { diff --git a/ldk-server/src/util/proto_adapter.rs b/ldk-server/src/util/proto_adapter.rs index 2eece481..08027b47 100644 --- a/ldk-server/src/util/proto_adapter.rs +++ b/ldk-server/src/util/proto_adapter.rs @@ -14,6 +14,7 @@ use ldk_node::bitcoin::hashes::sha256; use ldk_node::bitcoin::secp256k1::PublicKey; use ldk_node::config::{ChannelConfig, MaxDustHTLCExposure}; use ldk_node::lightning::ln::types::ChannelId; +use ldk_node::lightning::routing::gossip::NodeId; use ldk_node::lightning_invoice::{Bolt11InvoiceDescription, Description, Sha256}; use ldk_node::payment::{ ConfirmationStatus, PaymentDetails, PaymentDirection, PaymentKind, PaymentStatus, @@ -39,6 +40,7 @@ use crate::api::error::LdkServerError; use crate::api::error::LdkServerErrorCode::{ AuthError, InternalServerError, InvalidRequestError, LightningError, }; +use crate::io::persist::types::StoredForwardedPayment; pub(crate) fn channel_to_proto(channel: ChannelDetails) -> Channel { Channel { @@ -460,3 +462,50 @@ pub(crate) fn to_error_response(ldk_error: LdkServerError) -> (ErrorResponse, St (error_response, status) } + +/// Convert event parameters to a StoredForwardedPayment for persistence. +#[allow(clippy::too_many_arguments)] +pub(crate) fn forwarded_payment_to_stored( + prev_channel_id: ChannelId, next_channel_id: ChannelId, + prev_user_channel_id: Option, next_user_channel_id: Option, + prev_node_id: Option, next_node_id: Option, + total_fee_earned_msat: Option, skimmed_fee_msat: Option, claim_from_onchain_tx: bool, + outbound_amount_forwarded_msat: Option, +) -> StoredForwardedPayment { + StoredForwardedPayment { + prev_channel_id: prev_channel_id.0, + next_channel_id: next_channel_id.0, + prev_user_channel_id: prev_user_channel_id + .expect("prev_user_channel_id expected for ldk-server >=0.1") + .0, + next_user_channel_id: next_user_channel_id.map(|u| u.0), + prev_node_id: NodeId::from_pubkey( + &prev_node_id.expect("prev_node_id expected for ldk-server >=0.1"), + ), + next_node_id: NodeId::from_pubkey( + &next_node_id.expect("next_node_id expected for ldk-node >=0.1"), + ), + total_fee_earned_msat, + skimmed_fee_msat, + claim_from_onchain_tx, + outbound_amount_forwarded_msat, + } +} + +/// Convert a StoredForwardedPayment to a proto ForwardedPayment for API responses. +pub(crate) fn stored_forwarded_payment_to_proto( + stored: StoredForwardedPayment, +) -> ForwardedPayment { + ForwardedPayment { + prev_channel_id: stored.prev_channel_id.to_lower_hex_string(), + next_channel_id: stored.next_channel_id.to_lower_hex_string(), + prev_user_channel_id: stored.prev_user_channel_id.to_string(), + next_user_channel_id: stored.next_user_channel_id.map(|u| u.to_string()), + prev_node_id: stored.prev_node_id.as_slice().to_lower_hex_string(), + next_node_id: stored.next_node_id.as_slice().to_lower_hex_string(), + total_fee_earned_msat: stored.total_fee_earned_msat, + skimmed_fee_msat: stored.skimmed_fee_msat, + claim_from_onchain_tx: stored.claim_from_onchain_tx, + outbound_amount_forwarded_msat: stored.outbound_amount_forwarded_msat, + } +}