Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 47 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -428,6 +428,8 @@ tokio-openssl = "0.6.5"
openssl = "0.10.73"
openssl-sys = "0.9.109"
webpki-roots = "0.26.0"
hyper-util = { version = "0.1", default-features = false }
hyper-rustls = { version = "0.26", default-features = false }
itertools = "0.14.0"
base64 = "0.22.1"
termcolor = "1.4.1"
Expand Down
4 changes: 3 additions & 1 deletion crates/wasi-http/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ all-features = true

[features]
default = ["default-send-request"]
default-send-request = ["dep:tokio-rustls", "dep:rustls", "dep:webpki-roots"]
default-send-request = ["dep:tokio-rustls", "dep:rustls", "dep:webpki-roots", "dep:hyper-util", "dep:hyper-rustls"]
p3 = ["wasmtime-wasi/p3", "dep:tokio-util"]
component-model-async = ["futures/alloc", "wasmtime/component-model-async"]

Expand All @@ -41,6 +41,8 @@ wasmtime = { workspace = true, features = ['component-model'] }
tokio-rustls = { workspace = true, optional = true }
rustls = { workspace = true, optional = true }
webpki-roots = { workspace = true, optional = true }
hyper-util = { workspace = true, features = ["http1", "client-legacy", "tokio"], optional = true }
hyper-rustls = { workspace = true, features = ["http1", "webpki-roots"], optional = true }

[dev-dependencies]
wasmtime-wasi-http = { path = ".", features = ['default-send-request'] }
Expand Down
77 changes: 69 additions & 8 deletions crates/wasi-http/src/body.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
//! Implementation of the `wasi:http/types` interface's various body types.

use crate::bindings::http::types;
use crate::types::FieldMap;
use crate::types::{ConnWorkerErrorReceiver, FieldMap};
use bytes::Bytes;
use http_body::{Body, Frame};
use http_body_util::BodyExt;
Expand Down Expand Up @@ -30,6 +30,12 @@ pub struct HostIncomingBody {
/// This ensures that if the parent of this body is dropped before the body
/// then the backing data behind this worker is kept alive.
worker: Option<AbortOnDropJoinHandle<()>>,
/// Receives errors from the connection worker task, if any.
worker_error_receiver: Option<ConnWorkerErrorReceiver>,
/// Connection concurrency permits held while this body is being read.
/// Released when the body is dropped, allowing queued requests to proceed.
#[cfg(feature = "default-send-request")]
connection_permits: Option<crate::types::ConnectionPermits>,
}

impl HostIncomingBody {
Expand All @@ -39,18 +45,42 @@ impl HostIncomingBody {
between_bytes_timeout: Duration,
field_size_limit: usize,
) -> HostIncomingBody {
let body = BodyWithTimeout::new(body, between_bytes_timeout);
let body = BodyWithTimeout::new(body, between_bytes_timeout, None);
HostIncomingBody {
body: IncomingBodyState::Start(body),
field_size_limit,
worker: None,
worker_error_receiver: None,
#[cfg(feature = "default-send-request")]
connection_permits: None,
}
}

/// Retain a worker task that needs to be kept alive while this body is being read.
pub fn retain_worker(&mut self, worker: AbortOnDropJoinHandle<()>) {
///
/// If a `worker_error_receiver` is provided, connection worker errors will
/// be surfaced through the body stream during reads.
pub fn retain_worker(
&mut self,
worker: AbortOnDropJoinHandle<()>,
worker_error_receiver: Option<ConnWorkerErrorReceiver>,
) {
assert!(self.worker.is_none());
self.worker = Some(worker);
if let Some(rx) = worker_error_receiver {
self.worker_error_receiver = Some(rx.clone());
// Also propagate to the body if it hasn't been taken yet
if let IncomingBodyState::Start(body) = &mut self.body {
body.worker_error_receiver = Some(rx);
}
}
}

/// Retain connection concurrency permits that should be held while this body
/// is being read. The permits are released when the body is dropped.
#[cfg(feature = "default-send-request")]
pub fn retain_connection_permits(&mut self, permits: Option<crate::types::ConnectionPermits>) {
self.connection_permits = permits;
}

/// Create a new `HostIncomingBody` that always fails with the given error.
Expand All @@ -59,6 +89,9 @@ impl HostIncomingBody {
body: IncomingBodyState::Failing(Arc::from(error)),
field_size_limit: 0,
worker: None,
worker_error_receiver: None,
#[cfg(feature = "default-send-request")]
connection_permits: None,
}
}

Expand Down Expand Up @@ -118,17 +151,24 @@ struct BodyWithTimeout {
/// Maximal duration between when a frame is first requested and when it's
/// allowed to arrive.
between_bytes_timeout: Duration,
/// Receives errors from the connection worker task.
worker_error_receiver: Option<ConnWorkerErrorReceiver>,
}

impl BodyWithTimeout {
fn new(inner: HyperIncomingBody, between_bytes_timeout: Duration) -> BodyWithTimeout {
fn new(
inner: HyperIncomingBody,
between_bytes_timeout: Duration,
worker_error_receiver: Option<ConnWorkerErrorReceiver>,
) -> BodyWithTimeout {
BodyWithTimeout {
inner,
between_bytes_timeout,
reset_sleep: true,
timeout: Box::pin(wasmtime_wasi::runtime::with_ambient_tokio_runtime(|| {
tokio::time::sleep(Duration::new(0, 0))
})),
worker_error_receiver,
}
}
}
Expand Down Expand Up @@ -163,6 +203,25 @@ impl Body for BodyWithTimeout {
// arrives then the sleep timer will be reset on the next frame.
let result = Pin::new(&mut me.inner).poll_frame(cx);
me.reset_sleep = result.is_ready();

// At end-of-stream (EOF or trailers), check if the connection worker
// reported an error. This surfaces connection-level failures that
// might not otherwise propagate through the body stream (e.g. the
// connection was reset after all data frames were sent but before
// a clean shutdown).
let is_end_of_stream = match &result {
Poll::Ready(None) => true,
Poll::Ready(Some(Ok(frame))) if !frame.is_data() => true,
_ => false,
};
if is_end_of_stream {
if let Some(rx) = me.worker_error_receiver.take() {
if let Some(err) = rx.borrow().as_ref() {
return Poll::Ready(Some(Err(err.as_ref().clone())));
}
}
}

result
}
}
Expand Down Expand Up @@ -224,11 +283,13 @@ impl HostIncomingBodyStream {
self.state = IncomingBodyStreamState::Closed;
}

// No more frames are going to be received again, so drop the `body`
// and the `tx` channel we'd send the body back onto because it's
// not needed as frames are done.
// No more frames are going to be received again, so send an
// explicit EOF (no trailers) and close the stream.
None => {
self.state = IncomingBodyStreamState::Closed;
let prev = mem::replace(&mut self.state, IncomingBodyStreamState::Closed);
if let IncomingBodyStreamState::Open { body: _, tx } = prev {
let _ = tx.send(StreamEnd::Trailers(None));
}
}
}
}
Expand Down
7 changes: 5 additions & 2 deletions crates/wasi-http/src/http_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ where
request_id: Resource<HostOutgoingRequest>,
options: Option<Resource<types::RequestOptions>>,
) -> crate::HttpResult<Resource<HostFutureIncomingResponse>> {
let opts = options.and_then(|opts| self.table().get(&opts).ok());
let opts = options.map(|opts| self.table().get(&opts)).transpose()?;

let connect_timeout = opts
.and_then(|opts| opts.connect_timeout)
Expand Down Expand Up @@ -65,7 +65,10 @@ where
Scheme::Other(_) => return Err(types::ErrorCode::HttpProtocolError.into()),
};

let authority = req.authority.unwrap_or_else(String::new);
let authority = match req.authority {
Some(a) if !a.is_empty() => a,
_ => return Err(types::ErrorCode::HttpRequestUriInvalid.into()),
};

builder = builder.header(hyper::header::HOST, &authority);

Expand Down
3 changes: 3 additions & 0 deletions crates/wasi-http/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,9 @@ pub use crate::types::{
DEFAULT_OUTGOING_BODY_BUFFER_CHUNKS, DEFAULT_OUTGOING_BODY_CHUNK_SIZE, WasiHttpCtx,
WasiHttpImpl, WasiHttpView,
};
#[cfg(feature = "default-send-request")]
#[doc(inline)]
pub use crate::types::{HttpConnectionPool, HttpConnectionPoolConfig};
use http::header::CONTENT_LENGTH;
use wasmtime::component::HasData;

Expand Down
Loading