Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions async/src/async_channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,14 @@ impl ChanIO<'_> {
) -> Result<()> {
poll_fn(|cx| self.sunset.poll_term_window_change(cx, self.num, &winch)).await
}

/// Send an EOF to indicate no more data will be sent on this channel.
///
/// This should be called when the application has finished writing data
/// to the channel but may still want to read incoming data.
pub async fn send_eof(&self) -> Result<()> {
poll_fn(|cx| self.sunset.poll_send_eof(cx, self.num)).await
}
}

impl Drop for ChanIO<'_> {
Expand Down Expand Up @@ -173,6 +181,14 @@ impl<'g> ChanOut<'g> {
) -> Result<()> {
self.0.term_window_change(winch).await
}

/// Send an EOF to indicate no more data will be sent on this channel.
///
/// This should be called when the application has finished writing data
/// to the channel but may still want to read incoming data.
pub async fn send_eof(&self) -> Result<()> {
self.0.send_eof().await
}
}

/// A bidirectional SSH channel.
Expand Down Expand Up @@ -223,6 +239,14 @@ impl<'g> ChanInOut<'g> {
) -> Result<()> {
self.0.term_window_change(winch).await
}

/// Send an EOF to indicate no more data will be sent on this channel.
///
/// This should be called when the application has finished writing data
/// to the channel but may still want to read incoming data.
pub async fn send_eof(&self) -> Result<()> {
self.0.send_eof().await
}
}

impl Drop for ChanInOut<'_> {
Expand Down
21 changes: 18 additions & 3 deletions async/src/async_sunset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -478,6 +478,8 @@ pub(crate) trait ChanCore: MaybeSend {
buf: &[u8],
) -> Poll<Result<usize>>;

fn poll_send_eof(&self, cx: &mut Context, num: ChanNum) -> Poll<Result<()>>;

// Client only
fn poll_term_window_change(
&self,
Expand Down Expand Up @@ -603,14 +605,27 @@ impl<'a, CS: CliServ> ChanCore for AsyncSunset<'a, CS> {
// 0 bytes written, pending
trace!("write ch {num:?} dt {dt:?} pending");
runner.set_channel_read_waker(h, dt, cx.waker());
Poll::Pending
Pending
} else {
trace!("write ready ch {num:?} dt {dt:?} {l:?}");
self.wake_progress();
Poll::Ready(l)
Ready(l)
}
}

fn poll_send_eof(&self, cx: &mut Context, num: ChanNum) -> Poll<Result<()>> {
let i = self.inner.lock();
let i = pin!(i);
let Ready(mut inner) = i.poll(cx) else {
return Pending;
};

let (runner, h) = inner.fetch(num)?;
let result = runner.channel_send_eof(h);
self.wake_progress();
Ready(result)
}

fn poll_term_window_change(
&self,
cx: &mut Context,
Expand All @@ -624,7 +639,7 @@ impl<'a, CS: CliServ> ChanCore for AsyncSunset<'a, CS> {
return Pending;
};
let (runner, h) = inner.fetch(num)?;
Poll::Ready(runner.term_window_change(h, winch))
Ready(runner.term_window_change(h, winch))
}
}

Expand Down
79 changes: 65 additions & 14 deletions src/channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,10 @@ impl Channels {
self.get(num).is_ok_and(|c| c.is_closed())
}

pub(crate) fn can_send_eof(&self, num: ChanNum) -> bool {
self.get(num).is_ok_and(|c| !c.sent_eof && !c.is_closed())
}

pub(crate) fn send_allowed(&self, num: ChanNum) -> Option<usize> {
self.get(num).map_or(Some(0), |c| c.send_allowed())
}
Expand Down Expand Up @@ -277,6 +281,32 @@ impl Channels {
}
}

pub(crate) fn send_eof(&mut self, num: ChanNum, s: &mut TrafSend) -> Result<()> {
let ch = self.get_mut(num)?;
if ch.sent_eof || ch.is_closed() {
return Ok(());
}
s.send(packets::ChannelEof { num: ch.send_num()? })?;
ch.sent_eof = true;
Ok(())
}

pub(crate) fn close(&mut self, num: ChanNum, s: &mut TrafSend) -> Result<()> {
let ch = self.get_mut(num)?;
if ch.state == ChanState::RecvClose {
return Ok(());
}
if !ch.sent_eof {
s.send(packets::ChannelEof { num: ch.send_num()? })?;
ch.sent_eof = true;
}
if !ch.sent_close {
s.send(packets::ChannelClose { num: ch.send_num()? })?;
ch.sent_close = true;
}
Ok(())
}

fn dispatch_open(
&mut self,
p: &ChannelOpen<'_>,
Expand Down Expand Up @@ -708,7 +738,7 @@ struct ChanDir {
window: usize,
}

#[derive(Debug)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ChanState {
/// An incoming channel open request that has not yet been responded to.
///
Expand All @@ -729,8 +759,12 @@ enum ChanState {
pub(crate) struct Channel {
ty: ChanType,
state: ChanState,
/// Whether we've sent CHANNEL_EOF
sent_eof: bool,
/// Whether we've sent CHANNEL_CLOSE
sent_close: bool,
/// Whether we've received CHANNEL_EOF
recv_eof: bool,

recv: ChanDir,
/// populated in all states except `Opening`
Expand Down Expand Up @@ -761,6 +795,7 @@ impl Channel {
state: ChanState::Opening,
sent_close: false,
sent_eof: false,
recv_eof: false,
recv: ChanDir {
num: num.0,
// TODO these should depend on SSH rx buffer size minus overhead
Expand Down Expand Up @@ -966,39 +1001,51 @@ impl Channel {
}
}

fn handle_eof(&mut self, s: &mut TrafSend, is_client: bool) -> Result<()> {
//TODO: check existing state?
if !self.sent_eof {
s.send(packets::ChannelEof { num: self.send_num()? })?;
self.sent_eof = true;
fn handle_eof(&mut self, _s: &mut TrafSend, is_client: bool) -> Result<()> {
if self.recv_eof || self.state == ChanState::RecvClose {
return Ok(());
}

// Wake readers on EOF
self.recv_eof = true;

self.wake_read(ChanData::Normal, is_client);
if is_client {
self.wake_read(ChanData::Stderr, is_client);
}

self.state = ChanState::RecvEof;
// todo!();
match self.state {
ChanState::Normal => self.state = ChanState::RecvEof,
_ => (),
}
Ok(())
}

fn handle_close(&mut self, s: &mut TrafSend, is_client: bool) -> Result<()> {
//TODO: check existing state?
if self.state == ChanState::RecvClose {
return Ok(());
}

self.state = ChanState::RecvClose;

// If we haven't already sent EOF, send it now
if !self.sent_eof {
s.send(packets::ChannelEof { num: self.send_num()? })?;
self.sent_eof = true;
}

// Send close if we haven't already
if !self.sent_close {
s.send(packets::ChannelClose { num: self.send_num()? })?;
self.sent_close = true;
}

// Wake readers and writers on EOF
// Wake readers and writers on close
self.wake_read(ChanData::Normal, is_client);
if is_client {
self.wake_read(ChanData::Stderr, is_client);
}
self.wake_write(None, is_client);

self.state = ChanState::RecvClose;
Ok(())
}

Expand All @@ -1007,15 +1054,19 @@ impl Channel {
}

fn have_recv_eof(&self) -> bool {
matches!(self.state, ChanState::RecvEof | ChanState::RecvClose)
self.recv_eof
|| matches!(self.state, ChanState::RecvEof | ChanState::RecvClose)
}

fn is_closed(&self) -> bool {
matches!(self.state, ChanState::RecvClose)
}

// None on close
// None on close or EOF sent
fn send_allowed(&self) -> Option<usize> {
if self.sent_eof || self.is_closed() {
return None;
}
let r = self.send.as_ref().map(|s| usize::min(s.window, s.max_packet));
trace!("send_allowed {r:?}");
r
Expand Down
54 changes: 53 additions & 1 deletion src/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -591,7 +591,7 @@ impl<'a, CS: CliServ> Runner<'a, CS> {

/// Returns the maximum data that may be sent to a channel
///
/// Returns `Ok(None)` on channel closed.
/// Returns `Ok(None)` on channel closed or EOF already sent.
///
/// May fail with `BadChannelData` if dt is invalid for this session.
pub fn write_channel_ready(
Expand Down Expand Up @@ -638,6 +638,18 @@ impl<'a, CS: CliServ> Runner<'a, CS> {
///
/// Channel numbers will not be re-used without calling this, so
/// failing to call this may result in running out of channels.
///
/// ## Channel Shutdown Sequence
///
/// Per RFC 4254, SSH channels have independent send and receive directions.
/// The recommended shutdown sequence is:
///
/// 1. When done writing, call `channel_send_eof()` to send CHANNEL_EOF
/// 2. Continue reading until `is_channel_eof()` returns true (peer sent EOF)
/// 3. Call `channel_done()` to mark the channel as finished
///
/// Alternatively, `channel_close()` can be called to send both EOF and CLOSE,
/// but this is less graceful than the above sequence.
pub fn channel_done(&mut self, chan: ChanHandle) -> Result<()> {
self.conn.channels.done(chan.0)?;
// Prevent giving any already-received data for this channel.
Expand All @@ -646,6 +658,46 @@ impl<'a, CS: CliServ> Runner<'a, CS> {
Ok(())
}

/// Send a CHANNEL_EOF to indicate no more data will be sent on this channel.
///
/// Per RFC 4254, this should be called when the application has finished
/// writing data to the channel but may still read incoming data.
/// This is the proper way to shut down one direction of a bidirectional
/// channel.
///
/// After sending EOF:
/// - No more data can be written to the channel (`write_channel_ready` returns `None`)
/// - Data can still be read until the peer sends EOF
/// - Eventually call `channel_done()` when reading is complete
pub fn channel_send_eof(&mut self, chan: &ChanHandle) -> Result<()> {
let mut s = self.traf_out.sender(&mut self.keys);
self.conn.channels.send_eof(chan.0, &mut s)?;
self.wake();
Ok(())
}

/// Send a CHANNEL_CLOSE to close the channel.
///
/// This sends both EOF and CLOSE if not already sent, properly shutting down
/// both directions of the channel.
///
/// Note: This is a more abrupt shutdown than the recommended sequence of
/// calling `channel_send_eof()` first. Use this when you need to close both
/// directions immediately.
pub fn channel_close(&mut self, chan: &ChanHandle) -> Result<()> {
let mut s = self.traf_out.sender(&mut self.keys);
self.conn.channels.close(chan.0, &mut s)?;
self.wake();
Ok(())
}

/// Check whether an EOF can be sent on this channel.
///
/// Returns false if EOF has already been sent or the channel is closed.
pub fn can_channel_send_eof(&self, chan: &ChanHandle) -> bool {
self.conn.channels.can_send_eof(chan.0)
}

pub fn set_channel_read_waker(
&mut self,
ch: &ChanHandle,
Expand Down