From 34f374f23de36c038063cc924ba331a40b8abb10 Mon Sep 17 00:00:00 2001 From: brainstorm Date: Wed, 1 Apr 2026 22:39:10 +0200 Subject: [PATCH 1/2] Fix EOF issues with sunset's channels --- async/src/async_channel.rs | 24 ++++++++++++ async/src/async_sunset.rs | 21 ++++++++-- src/channel.rs | 78 +++++++++++++++++++++++++++++++------- src/runner.rs | 54 +++++++++++++++++++++++++- 4 files changed, 159 insertions(+), 18 deletions(-) diff --git a/async/src/async_channel.rs b/async/src/async_channel.rs index 4db80865..2b7f4c8a 100644 --- a/async/src/async_channel.rs +++ b/async/src/async_channel.rs @@ -55,6 +55,14 @@ impl ChanIO<'_> { ) -> Result<()> { poll_fn(|cx| self.sunset.poll_term_window_change(cx, self.num, &winch)).await } + + /// Send an EOF to indicate no more data will be sent on this channel. + /// + /// This should be called when the application has finished writing data + /// to the channel but may still want to read incoming data. + pub async fn send_eof(&self) -> Result<()> { + poll_fn(|cx| self.sunset.poll_send_eof(cx, self.num)).await + } } impl Drop for ChanIO<'_> { @@ -173,6 +181,14 @@ impl<'g> ChanOut<'g> { ) -> Result<()> { self.0.term_window_change(winch).await } + + /// Send an EOF to indicate no more data will be sent on this channel. + /// + /// This should be called when the application has finished writing data + /// to the channel but may still want to read incoming data. + pub async fn send_eof(&self) -> Result<()> { + self.0.send_eof().await + } } /// A bidirectional SSH channel. @@ -223,6 +239,14 @@ impl<'g> ChanInOut<'g> { ) -> Result<()> { self.0.term_window_change(winch).await } + + /// Send an EOF to indicate no more data will be sent on this channel. + /// + /// This should be called when the application has finished writing data + /// to the channel but may still want to read incoming data. + pub async fn send_eof(&self) -> Result<()> { + self.0.send_eof().await + } } impl Drop for ChanInOut<'_> { diff --git a/async/src/async_sunset.rs b/async/src/async_sunset.rs index 9e7bd6a4..f5d717a1 100644 --- a/async/src/async_sunset.rs +++ b/async/src/async_sunset.rs @@ -478,6 +478,8 @@ pub(crate) trait ChanCore: MaybeSend { buf: &[u8], ) -> Poll>; + fn poll_send_eof(&self, cx: &mut Context, num: ChanNum) -> Poll>; + // Client only fn poll_term_window_change( &self, @@ -603,14 +605,27 @@ impl<'a, CS: CliServ> ChanCore for AsyncSunset<'a, CS> { // 0 bytes written, pending trace!("write ch {num:?} dt {dt:?} pending"); runner.set_channel_read_waker(h, dt, cx.waker()); - Poll::Pending + Pending } else { trace!("write ready ch {num:?} dt {dt:?} {l:?}"); self.wake_progress(); - Poll::Ready(l) + Ready(l) } } + fn poll_send_eof(&self, cx: &mut Context, num: ChanNum) -> Poll> { + let i = self.inner.lock(); + let i = pin!(i); + let Ready(mut inner) = i.poll(cx) else { + return Pending; + }; + + let (runner, h) = inner.fetch(num)?; + let result = runner.channel_send_eof(h); + self.wake_progress(); + Ready(result) + } + fn poll_term_window_change( &self, cx: &mut Context, @@ -624,7 +639,7 @@ impl<'a, CS: CliServ> ChanCore for AsyncSunset<'a, CS> { return Pending; }; let (runner, h) = inner.fetch(num)?; - Poll::Ready(runner.term_window_change(h, winch)) + Ready(runner.term_window_change(h, winch)) } } diff --git a/src/channel.rs b/src/channel.rs index 10c66142..400e868d 100644 --- a/src/channel.rs +++ b/src/channel.rs @@ -224,6 +224,10 @@ impl Channels { self.get(num).is_ok_and(|c| c.is_closed()) } + pub(crate) fn can_send_eof(&self, num: ChanNum) -> bool { + self.get(num).is_ok_and(|c| !c.sent_eof && !c.is_closed()) + } + pub(crate) fn send_allowed(&self, num: ChanNum) -> Option { self.get(num).map_or(Some(0), |c| c.send_allowed()) } @@ -277,6 +281,32 @@ impl Channels { } } + pub(crate) fn send_eof(&mut self, num: ChanNum, s: &mut TrafSend) -> Result<()> { + let ch = self.get_mut(num)?; + if ch.sent_eof || ch.is_closed() { + return Ok(()); + } + s.send(packets::ChannelEof { num: ch.send_num()? })?; + ch.sent_eof = true; + Ok(()) + } + + pub(crate) fn close(&mut self, num: ChanNum, s: &mut TrafSend) -> Result<()> { + let ch = self.get_mut(num)?; + if ch.state == ChanState::RecvClose { + return Ok(()); + } + if !ch.sent_eof { + s.send(packets::ChannelEof { num: ch.send_num()? })?; + ch.sent_eof = true; + } + if !ch.sent_close { + s.send(packets::ChannelClose { num: ch.send_num()? })?; + ch.sent_close = true; + } + Ok(()) + } + fn dispatch_open( &mut self, p: &ChannelOpen<'_>, @@ -708,7 +738,7 @@ struct ChanDir { window: usize, } -#[derive(Debug)] +#[derive(Debug, Clone, Copy, PartialEq, Eq)] enum ChanState { /// An incoming channel open request that has not yet been responded to. /// @@ -729,8 +759,12 @@ enum ChanState { pub(crate) struct Channel { ty: ChanType, state: ChanState, + /// Whether we've sent CHANNEL_EOF sent_eof: bool, + /// Whether we've sent CHANNEL_CLOSE sent_close: bool, + /// Whether we've received CHANNEL_EOF + recv_eof: bool, recv: ChanDir, /// populated in all states except `Opening` @@ -761,6 +795,7 @@ impl Channel { state: ChanState::Opening, sent_close: false, sent_eof: false, + recv_eof: false, recv: ChanDir { num: num.0, // TODO these should depend on SSH rx buffer size minus overhead @@ -966,39 +1001,50 @@ impl Channel { } } - fn handle_eof(&mut self, s: &mut TrafSend, is_client: bool) -> Result<()> { - //TODO: check existing state? - if !self.sent_eof { - s.send(packets::ChannelEof { num: self.send_num()? })?; - self.sent_eof = true; + fn handle_eof(&mut self, _s: &mut TrafSend, is_client: bool) -> Result<()> { + if self.recv_eof || self.state == ChanState::RecvClose { + return Ok(()); } - // Wake readers on EOF + self.recv_eof = true; + self.wake_read(ChanData::Normal, is_client); if is_client { self.wake_read(ChanData::Stderr, is_client); } - self.state = ChanState::RecvEof; - // todo!(); + match self.state { + ChanState::Normal => self.state = ChanState::RecvEof, + _ => (), + } Ok(()) } fn handle_close(&mut self, s: &mut TrafSend, is_client: bool) -> Result<()> { - //TODO: check existing state? + if self.state == ChanState::RecvClose { + return Ok(()); + } + + self.state = ChanState::RecvClose; + + // If we haven't already sent EOF, send it now + if !self.sent_eof { + s.send(packets::ChannelEof { num: self.send_num()? })?; + self.sent_eof = true; + } + + // Send close if we haven't already if !self.sent_close { s.send(packets::ChannelClose { num: self.send_num()? })?; self.sent_close = true; } - // Wake readers and writers on EOF self.wake_read(ChanData::Normal, is_client); if is_client { self.wake_read(ChanData::Stderr, is_client); } self.wake_write(None, is_client); - self.state = ChanState::RecvClose; Ok(()) } @@ -1007,15 +1053,19 @@ impl Channel { } fn have_recv_eof(&self) -> bool { - matches!(self.state, ChanState::RecvEof | ChanState::RecvClose) + self.recv_eof + || matches!(self.state, ChanState::RecvEof | ChanState::RecvClose) } fn is_closed(&self) -> bool { matches!(self.state, ChanState::RecvClose) } - // None on close + // None on close or EOF sent fn send_allowed(&self) -> Option { + if self.sent_eof || self.is_closed() { + return None; + } let r = self.send.as_ref().map(|s| usize::min(s.window, s.max_packet)); trace!("send_allowed {r:?}"); r diff --git a/src/runner.rs b/src/runner.rs index fb65c2d0..f210c473 100644 --- a/src/runner.rs +++ b/src/runner.rs @@ -591,7 +591,7 @@ impl<'a, CS: CliServ> Runner<'a, CS> { /// Returns the maximum data that may be sent to a channel /// - /// Returns `Ok(None)` on channel closed. + /// Returns `Ok(None)` on channel closed or EOF already sent. /// /// May fail with `BadChannelData` if dt is invalid for this session. pub fn write_channel_ready( @@ -638,6 +638,18 @@ impl<'a, CS: CliServ> Runner<'a, CS> { /// /// Channel numbers will not be re-used without calling this, so /// failing to call this may result in running out of channels. + /// + /// ## Channel Shutdown Sequence + /// + /// Per RFC 4254, SSH channels have independent send and receive directions. + /// The recommended shutdown sequence is: + /// + /// 1. When done writing, call `channel_send_eof()` to send CHANNEL_EOF + /// 2. Continue reading until `is_channel_eof()` returns true (peer sent EOF) + /// 3. Call `channel_done()` to mark the channel as finished + /// + /// Alternatively, `channel_close()` can be called to send both EOF and CLOSE, + /// but this is less graceful than the above sequence. pub fn channel_done(&mut self, chan: ChanHandle) -> Result<()> { self.conn.channels.done(chan.0)?; // Prevent giving any already-received data for this channel. @@ -646,6 +658,46 @@ impl<'a, CS: CliServ> Runner<'a, CS> { Ok(()) } + /// Send a CHANNEL_EOF to indicate no more data will be sent on this channel. + /// + /// Per RFC 4254, this should be called when the application has finished + /// writing data to the channel but may still read incoming data. + /// This is the proper way to shut down one direction of a bidirectional + /// channel. + /// + /// After sending EOF: + /// - No more data can be written to the channel (`write_channel_ready` returns `None`) + /// - Data can still be read until the peer sends EOF + /// - Eventually call `channel_done()` when reading is complete + pub fn channel_send_eof(&mut self, chan: &ChanHandle) -> Result<()> { + let mut s = self.traf_out.sender(&mut self.keys); + self.conn.channels.send_eof(chan.0, &mut s)?; + self.wake(); + Ok(()) + } + + /// Send a CHANNEL_CLOSE to close the channel. + /// + /// This sends both EOF and CLOSE if not already sent, properly shutting down + /// both directions of the channel. + /// + /// Note: This is a more abrupt shutdown than the recommended sequence of + /// calling `channel_send_eof()` first. Use this when you need to close both + /// directions immediately. + pub fn channel_close(&mut self, chan: &ChanHandle) -> Result<()> { + let mut s = self.traf_out.sender(&mut self.keys); + self.conn.channels.close(chan.0, &mut s)?; + self.wake(); + Ok(()) + } + + /// Check whether an EOF can be sent on this channel. + /// + /// Returns false if EOF has already been sent or the channel is closed. + pub fn can_channel_send_eof(&self, chan: &ChanHandle) -> bool { + self.conn.channels.can_send_eof(chan.0) + } + pub fn set_channel_read_waker( &mut self, ch: &ChanHandle, From b9af6cf766290e274376cc248e483da0a93a6850 Mon Sep 17 00:00:00 2001 From: brainstorm Date: Sun, 19 Apr 2026 10:43:45 +0200 Subject: [PATCH 2/2] [ci skip] Put back useful comment where it was --- src/channel.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/channel.rs b/src/channel.rs index 400e868d..1c85fc1b 100644 --- a/src/channel.rs +++ b/src/channel.rs @@ -1039,6 +1039,7 @@ impl Channel { self.sent_close = true; } + // Wake readers and writers on close self.wake_read(ChanData::Normal, is_client); if is_client { self.wake_read(ChanData::Stderr, is_client);