From 957e9ae075364180098a651a3cdd6a32a44cbb3b Mon Sep 17 00:00:00 2001 From: Benalleng Date: Mon, 9 Mar 2026 11:46:26 -0400 Subject: [PATCH 1/2] Force payjoin-cli resume to choose a random relay for each session This commit ensures that each resumed payjoin-cli session is using a separate instance of the RelayManager to then check the ohttp connection independently. This fixes a bug where resuming would converge all existing sessions to one ohttp relay. --- payjoin-cli/src/app/v2/mod.rs | 108 +++++++++++++++++++------------- payjoin-cli/src/app/v2/ohttp.rs | 19 ++---- 2 files changed, 68 insertions(+), 59 deletions(-) diff --git a/payjoin-cli/src/app/v2/mod.rs b/payjoin-cli/src/app/v2/mod.rs index 73f4dcf85..dc1be23f4 100644 --- a/payjoin-cli/src/app/v2/mod.rs +++ b/payjoin-cli/src/app/v2/mod.rs @@ -1,5 +1,5 @@ use std::fmt; -use std::sync::{Arc, Mutex}; +use std::sync::Arc; use anyhow::{anyhow, Context, Result}; use payjoin::bitcoin::consensus::encode::serialize_hex; @@ -40,7 +40,6 @@ pub(crate) struct App { db: Arc, wallet: BitcoindWallet, interrupt: watch::Receiver<()>, - relay_manager: Arc>, } trait StatusText { @@ -140,11 +139,10 @@ impl fmt::Display for SessionHistoryRow { impl AppTrait for App { async fn new(config: Config) -> Result { let db = Arc::new(Database::create(&config.db_path)?); - let relay_manager = Arc::new(Mutex::new(RelayManager::new())); let (interrupt_tx, interrupt_rx) = watch::channel(()); tokio::spawn(handle_interrupt(interrupt_tx)); let wallet = BitcoindWallet::new(&config.bitcoind).await?; - let app = Self { config, db, wallet, interrupt: interrupt_rx, relay_manager }; + let app = Self { config, db, wallet, interrupt: interrupt_rx }; app.wallet() .network() .context("Failed to connect to bitcoind. Check config RPC connection.")?; @@ -153,7 +151,6 @@ impl AppTrait for App { fn wallet(&self) -> BitcoindWallet { self.wallet.clone() } - #[allow(clippy::incompatible_msrv)] async fn send_payjoin(&self, bip21: &str, fee_rate: FeeRate) -> Result<()> { use payjoin::UriExt; let uri = Uri::try_from(bip21) @@ -254,10 +251,10 @@ impl AppTrait for App { async fn receive_payjoin(&self, amount: Amount) -> Result<()> { let address = self.wallet().get_new_address()?; - let ohttp_keys = - unwrap_ohttp_keys_or_else_fetch(&self.config, None, self.relay_manager.clone()) - .await? - .ohttp_keys; + let mut relay_manager = RelayManager::new(); + let ohttp_keys = unwrap_ohttp_keys_or_else_fetch(&self.config, None, &mut relay_manager) + .await? + .ohttp_keys; let persister = ReceiverPersister::new(self.db.clone())?; let session = ReceiverBuilder::new(address, self.config.v2()?.pj_directory.as_str(), ohttp_keys)? @@ -276,7 +273,6 @@ impl AppTrait for App { Ok(()) } - #[allow(clippy::incompatible_msrv)] async fn resume_payjoins(&self) -> Result<()> { let recv_session_ids = self.db.get_recv_session_ids()?; let send_session_ids = self.db.get_send_session_ids()?; @@ -480,11 +476,12 @@ impl App { session: SendSession, persister: &SenderPersister, ) -> Result<()> { + let mut relay_manager = RelayManager::new(); match session { SendSession::WithReplyKey(context) => - self.post_original_proposal(context, persister).await?, + self.post_original_proposal(context, persister, &mut relay_manager).await?, SendSession::PollingForProposal(context) => - self.get_proposed_payjoin_psbt(context, persister).await?, + self.get_proposed_payjoin_psbt(context, persister, &mut relay_manager).await?, SendSession::Closed(SenderSessionOutcome::Success(proposal)) => { self.process_pj_response(proposal)?; return Ok(()); @@ -498,22 +495,27 @@ impl App { &self, sender: Sender, persister: &SenderPersister, + relay_manager: &mut RelayManager, ) -> Result<()> { let (req, ctx) = sender.create_v2_post_request( - self.unwrap_relay_or_else_fetch(Some(&sender.endpoint())).await?.as_str(), + self.unwrap_relay_or_else_fetch(Some(&sender.endpoint()), relay_manager) + .await? + .as_str(), )?; let response = self.post_request(req).await?; println!("Posted original proposal..."); let sender = sender.process_response(&response.bytes().await?, ctx).save(persister)?; - self.get_proposed_payjoin_psbt(sender, persister).await + self.get_proposed_payjoin_psbt(sender, persister, relay_manager).await } async fn get_proposed_payjoin_psbt( &self, sender: Sender, persister: &SenderPersister, + relay_manager: &mut RelayManager, ) -> Result<()> { - let ohttp_relay = self.unwrap_relay_or_else_fetch(Some(&sender.endpoint())).await?; + let ohttp_relay = + self.unwrap_relay_or_else_fetch(Some(&sender.endpoint()), relay_manager).await?; let mut session = sender.clone(); // Long poll until we get a response loop { @@ -544,9 +546,11 @@ impl App { &self, session: Receiver, persister: &ReceiverPersister, + relay_manager: &mut RelayManager, ) -> Result> { - let ohttp_relay = - self.unwrap_relay_or_else_fetch(Some(&session.pj_uri().extras.endpoint())).await?; + let ohttp_relay = self + .unwrap_relay_or_else_fetch(Some(&session.pj_uri().extras.endpoint()), relay_manager) + .await?; let mut session = session; loop { @@ -575,30 +579,31 @@ impl App { session: ReceiveSession, persister: &ReceiverPersister, ) -> Result<()> { + let mut relay_manager = RelayManager::new(); let res = { match session { ReceiveSession::Initialized(proposal) => - self.read_from_directory(proposal, persister).await, + self.read_from_directory(proposal, persister, &mut relay_manager).await, ReceiveSession::UncheckedOriginalPayload(proposal) => - self.check_proposal(proposal, persister).await, + self.check_proposal(proposal, persister, &mut relay_manager).await, ReceiveSession::MaybeInputsOwned(proposal) => - self.check_inputs_not_owned(proposal, persister).await, + self.check_inputs_not_owned(proposal, persister, &mut relay_manager).await, ReceiveSession::MaybeInputsSeen(proposal) => - self.check_no_inputs_seen_before(proposal, persister).await, + self.check_no_inputs_seen_before(proposal, persister, &mut relay_manager).await, ReceiveSession::OutputsUnknown(proposal) => - self.identify_receiver_outputs(proposal, persister).await, + self.identify_receiver_outputs(proposal, persister, &mut relay_manager).await, ReceiveSession::WantsOutputs(proposal) => - self.commit_outputs(proposal, persister).await, + self.commit_outputs(proposal, persister, &mut relay_manager).await, ReceiveSession::WantsInputs(proposal) => - self.contribute_inputs(proposal, persister).await, + self.contribute_inputs(proposal, persister, &mut relay_manager).await, ReceiveSession::WantsFeeRange(proposal) => - self.apply_fee_range(proposal, persister).await, + self.apply_fee_range(proposal, persister, &mut relay_manager).await, ReceiveSession::ProvisionalProposal(proposal) => - self.finalize_proposal(proposal, persister).await, + self.finalize_proposal(proposal, persister, &mut relay_manager).await, ReceiveSession::PayjoinProposal(proposal) => - self.send_payjoin_proposal(proposal, persister).await, + self.send_payjoin_proposal(proposal, persister, &mut relay_manager).await, ReceiveSession::HasReplyableError(error) => - self.handle_error(error, persister).await, + self.handle_error(error, persister, &mut relay_manager).await, ReceiveSession::Monitor(proposal) => self.monitor_payjoin_proposal(proposal, persister).await, ReceiveSession::Closed(_) => return Err(anyhow!("Session closed")), @@ -612,22 +617,24 @@ impl App { &self, session: Receiver, persister: &ReceiverPersister, + relay_manager: &mut RelayManager, ) -> Result<()> { let mut interrupt = self.interrupt.clone(); let receiver = tokio::select! { - res = self.long_poll_fallback(session, persister) => res, + res = self.long_poll_fallback(session, persister, relay_manager) => res, _ = interrupt.changed() => { println!("Interrupted. Call the `resume` command to resume all sessions."); return Err(anyhow!("Interrupted")); } }?; - self.check_proposal(receiver, persister).await + self.check_proposal(receiver, persister, relay_manager).await } async fn check_proposal( &self, proposal: Receiver, persister: &ReceiverPersister, + relay_manager: &mut RelayManager, ) -> Result<()> { let wallet = self.wallet(); let proposal = proposal @@ -640,13 +647,14 @@ impl App { println!("Fallback transaction received. Consider broadcasting this to get paid if the Payjoin fails:"); println!("{}", serialize_hex(&proposal.extract_tx_to_schedule_broadcast())); - self.check_inputs_not_owned(proposal, persister).await + self.check_inputs_not_owned(proposal, persister, relay_manager).await } async fn check_inputs_not_owned( &self, proposal: Receiver, persister: &ReceiverPersister, + relay_manager: &mut RelayManager, ) -> Result<()> { let wallet = self.wallet(); let proposal = proposal @@ -656,26 +664,28 @@ impl App { .map_err(|e| ImplementationError::from(e.into_boxed_dyn_error())) }) .save(persister)?; - self.check_no_inputs_seen_before(proposal, persister).await + self.check_no_inputs_seen_before(proposal, persister, relay_manager).await } async fn check_no_inputs_seen_before( &self, proposal: Receiver, persister: &ReceiverPersister, + relay_manager: &mut RelayManager, ) -> Result<()> { let proposal = proposal .check_no_inputs_seen_before(&mut |input| { Ok(self.db.insert_input_seen_before(*input)?) }) .save(persister)?; - self.identify_receiver_outputs(proposal, persister).await + self.identify_receiver_outputs(proposal, persister, relay_manager).await } async fn identify_receiver_outputs( &self, proposal: Receiver, persister: &ReceiverPersister, + relay_manager: &mut RelayManager, ) -> Result<()> { let wallet = self.wallet(); let proposal = proposal @@ -685,22 +695,24 @@ impl App { .map_err(|e| ImplementationError::from(e.into_boxed_dyn_error())) }) .save(persister)?; - self.commit_outputs(proposal, persister).await + self.commit_outputs(proposal, persister, relay_manager).await } async fn commit_outputs( &self, proposal: Receiver, persister: &ReceiverPersister, + relay_manager: &mut RelayManager, ) -> Result<()> { let proposal = proposal.commit_outputs().save(persister)?; - self.contribute_inputs(proposal, persister).await + self.contribute_inputs(proposal, persister, relay_manager).await } async fn contribute_inputs( &self, proposal: Receiver, persister: &ReceiverPersister, + relay_manager: &mut RelayManager, ) -> Result<()> { let wallet = self.wallet(); let candidate_inputs = wallet.list_unspent()?; @@ -714,22 +726,24 @@ impl App { let selected_input = proposal.try_preserving_privacy(candidate_inputs)?; let proposal = proposal.contribute_inputs(vec![selected_input])?.commit_inputs().save(persister)?; - self.apply_fee_range(proposal, persister).await + self.apply_fee_range(proposal, persister, relay_manager).await } async fn apply_fee_range( &self, proposal: Receiver, persister: &ReceiverPersister, + relay_manager: &mut RelayManager, ) -> Result<()> { let proposal = proposal.apply_fee_range(None, self.config.max_fee_rate).save(persister)?; - self.finalize_proposal(proposal, persister).await + self.finalize_proposal(proposal, persister, relay_manager).await } async fn finalize_proposal( &self, proposal: Receiver, persister: &ReceiverPersister, + relay_manager: &mut RelayManager, ) -> Result<()> { let wallet = self.wallet(); let proposal = proposal @@ -739,16 +753,19 @@ impl App { .map_err(|e| ImplementationError::from(e.into_boxed_dyn_error())) }) .save(persister)?; - self.send_payjoin_proposal(proposal, persister).await + self.send_payjoin_proposal(proposal, persister, relay_manager).await } async fn send_payjoin_proposal( &self, proposal: Receiver, persister: &ReceiverPersister, + relay_manager: &mut RelayManager, ) -> Result<()> { let (req, ohttp_ctx) = proposal - .create_post_request(self.unwrap_relay_or_else_fetch(None::<&str>).await?.as_str()) + .create_post_request( + self.unwrap_relay_or_else_fetch(None::<&str>, relay_manager).await?.as_str(), + ) .map_err(|e| anyhow!("v2 req extraction failed {}", e))?; let res = self.post_request(req).await?; let payjoin_psbt = proposal.psbt().clone(); @@ -813,14 +830,13 @@ impl App { async fn unwrap_relay_or_else_fetch( &self, directory: Option, + relay_manager: &mut RelayManager, ) -> Result { let directory = directory.map(|url| url.into_url()).transpose()?; - let selected_relay = - self.relay_manager.lock().expect("Lock should not be poisoned").get_selected_relay(); - let ohttp_relay = match selected_relay { + let ohttp_relay = match relay_manager.get_selected_relay() { Some(relay) => relay, None => - unwrap_ohttp_keys_or_else_fetch(&self.config, directory, self.relay_manager.clone()) + unwrap_ohttp_keys_or_else_fetch(&self.config, directory, relay_manager) .await? .relay_url, }; @@ -832,9 +848,11 @@ impl App { &self, session: Receiver, persister: &ReceiverPersister, + relay_manager: &mut RelayManager, ) -> Result<()> { - let (err_req, err_ctx) = session - .create_error_request(self.unwrap_relay_or_else_fetch(None::<&str>).await?.as_str())?; + let (err_req, err_ctx) = session.create_error_request( + self.unwrap_relay_or_else_fetch(None::<&str>, relay_manager).await?.as_str(), + )?; let err_response = match self.post_request(err_req).await { Ok(response) => response, diff --git a/payjoin-cli/src/app/v2/ohttp.rs b/payjoin-cli/src/app/v2/ohttp.rs index 4ee637ecb..cd07e0042 100644 --- a/payjoin-cli/src/app/v2/ohttp.rs +++ b/payjoin-cli/src/app/v2/ohttp.rs @@ -1,5 +1,3 @@ -use std::sync::{Arc, Mutex}; - use anyhow::{anyhow, Result}; use super::Config; @@ -30,7 +28,7 @@ pub(crate) struct ValidatedOhttpKeys { pub(crate) async fn unwrap_ohttp_keys_or_else_fetch( config: &Config, directory: Option, - relay_manager: Arc>, + relay_manager: &mut RelayManager, ) -> Result { if let Some(ohttp_keys) = config.v2()?.ohttp_keys.clone() { println!("Using OHTTP Keys from config"); @@ -47,15 +45,14 @@ pub(crate) async fn unwrap_ohttp_keys_or_else_fetch( async fn fetch_ohttp_keys( config: &Config, directory: Option, - relay_manager: Arc>, + relay_manager: &mut RelayManager, ) -> Result { use payjoin::bitcoin::secp256k1::rand::prelude::SliceRandom; let payjoin_directory = directory.unwrap_or(config.v2()?.pj_directory.clone()); let relays = config.v2()?.ohttp_relays.clone(); loop { - let failed_relays = - relay_manager.lock().expect("Lock should not be poisoned").get_failed_relays(); + let failed_relays = relay_manager.get_failed_relays(); let remaining_relays: Vec<_> = relays.iter().filter(|r| !failed_relays.contains(r)).cloned().collect(); @@ -70,10 +67,7 @@ async fn fetch_ohttp_keys( None => return Err(anyhow!("Failed to select from remaining relays")), }; - relay_manager - .lock() - .expect("Lock should not be poisoned") - .set_selected_relay(selected_relay.clone()); + relay_manager.set_selected_relay(selected_relay.clone()); let ohttp_keys = { #[cfg(feature = "_manual-tls")] @@ -106,10 +100,7 @@ async fn fetch_ohttp_keys( } Err(e) => { tracing::debug!("Failed to connect to relay: {selected_relay}, {e:?}"); - relay_manager - .lock() - .expect("Lock should not be poisoned") - .add_failed_relay(selected_relay); + relay_manager.add_failed_relay(selected_relay); } } } From a9dfc78a3fdb8078c6ae32d25494138a7ea496a9 Mon Sep 17 00:00:00 2001 From: Benalleng Date: Mon, 13 Apr 2026 11:19:53 -0400 Subject: [PATCH 2/2] Add one relay warning during randomized selection --- payjoin-cli/src/app/v2/ohttp.rs | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/payjoin-cli/src/app/v2/ohttp.rs b/payjoin-cli/src/app/v2/ohttp.rs index cd07e0042..20d47b849 100644 --- a/payjoin-cli/src/app/v2/ohttp.rs +++ b/payjoin-cli/src/app/v2/ohttp.rs @@ -51,6 +51,12 @@ async fn fetch_ohttp_keys( let payjoin_directory = directory.unwrap_or(config.v2()?.pj_directory.clone()); let relays = config.v2()?.ohttp_relays.clone(); + if relays.len() < 2 { + tracing::warn!( + "Only one OHTTP relay configured. Add more ohttp_relays to improve privacy." + ); + } + loop { let failed_relays = relay_manager.get_failed_relays();