Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions rust/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
services:
postgres:
image: postgres:15
environment:
POSTGRES_DB: postgres
POSTGRES_USER: postgres
POSTGRES_PASSWORD: postgres
volumes:
- postgres-data:/var/lib/postgresql/data
ports:
- "5432:5432"
networks:
- app-network

volumes:
postgres-data:

networks:
app-network:
driver: bridge
71 changes: 71 additions & 0 deletions rust/impls/src/postgres_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -695,10 +695,15 @@ where
}

#[cfg(test)]

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should have caught this, we don't need an additional line here

mod tests {
use super::{drop_database, DUMMY_MIGRATION, MIGRATIONS};
use crate::postgres_store::PostgresPlaintextBackend;
use api::define_kv_store_tests;
use api::kv_store::KvStore;
use api::types::{DeleteObjectRequest, GetObjectRequest, KeyValue, PutObjectRequest};

use bytes::Bytes;
use tokio::sync::OnceCell;
use tokio_postgres::NoTls;

Expand Down Expand Up @@ -814,4 +819,70 @@ mod tests {

drop_database(POSTGRES_ENDPOINT, DEFAULT_DB, vss_db, NoTls).await.unwrap();
}

#[tokio::test]
async fn supports_objects_up_to_non_large_object_threshold() {
let vss_db = "supports_objects_up_to_non_large_object_threshold";
let _ = drop_database(POSTGRES_ENDPOINT, DEFAULT_DB, vss_db, NoTls).await;

const MAXIMUM_SUPPORTED_VALUE_SIZE: usize = 1024 * 1024 * 1024;
const PROTOCOL_OVERHEAD_MARGIN: usize = 150;

// Construct entry that's for a field that's the maximum size of a non-"large_object" object
let large_value = vec![0u8; MAXIMUM_SUPPORTED_VALUE_SIZE - PROTOCOL_OVERHEAD_MARGIN];
let kv = KeyValue { key: "k1".into(), version: 0, value: Bytes::from(large_value) };

{
let store =
PostgresPlaintextBackend::new(POSTGRES_ENDPOINT, DEFAULT_DB, vss_db).await.unwrap();
let (start, end) = store.migrate_vss_database(MIGRATIONS).await.unwrap();
assert_eq!(start, MIGRATIONS_START);
assert_eq!(end, MIGRATIONS_END);
assert_eq!(store.get_upgrades_list().await, [MIGRATIONS_START]);
assert_eq!(store.get_schema_version().await, MIGRATIONS_END);

// Round trip with non-large_object of threshold size

store
.put(
"token".to_string(),
PutObjectRequest {
store_id: "store_id".to_string(),
global_version: None,
transaction_items: vec![kv],
delete_items: vec![],
},
)
.await
.unwrap();

let resp_kv = store
.get(
"token".to_string(),
GetObjectRequest { store_id: "store_id".to_string(), key: "k1".to_string() },
)
.await
.unwrap()
.value
.unwrap();
assert_eq!(
resp_kv.value.len(),
MAXIMUM_SUPPORTED_VALUE_SIZE - PROTOCOL_OVERHEAD_MARGIN
);
assert!(resp_kv.value.iter().all(|&b| b == 0));

store
.delete(
"token".to_string(),
DeleteObjectRequest {
store_id: "store_id".to_string(),
key_value: Some(resp_kv),
},
)
.await
.unwrap();
};

drop_database(POSTGRES_ENDPOINT, DEFAULT_DB, vss_db, NoTls).await.unwrap();
}
}
14 changes: 12 additions & 2 deletions rust/server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use auth_impls::jwt::JWTAuthorizer;
use auth_impls::signature::SignatureValidatingAuthorizer;
use impls::postgres_store::{PostgresPlaintextBackend, PostgresTlsBackend};
use util::logger::ServerLogger;
use vss_service::VssService;
use vss_service::{VssService, VssServiceConfig};

mod util;
mod vss_service;
Expand All @@ -42,6 +42,16 @@ fn main() {
eprintln!("Failed to load configuration: {}", e);
std::process::exit(-1);
});
let vss_service_config = match &config.max_request_body_size {
Some(size) => match VssServiceConfig::new(*size) {
Ok(config) => config,
Err(e) => {
eprintln!("Configuration validation error: {}", e);
std::process::exit(-1);
},
},
None => VssServiceConfig::default(),
};

let logger = match ServerLogger::init(config.log_level, &config.log_file) {
Ok(logger) => logger,
Expand Down Expand Up @@ -162,7 +172,7 @@ fn main() {
match res {
Ok((stream, _)) => {
let io_stream = TokioIo::new(stream);
let vss_service = VssService::new(Arc::clone(&store), Arc::clone(&authorizer));
let vss_service = VssService::new(Arc::clone(&store), Arc::clone(&authorizer), vss_service_config);
runtime.spawn(async move {
if let Err(err) = http1::Builder::new().serve_connection(io_stream, vss_service).await {
warn!("Failed to serve connection: {}", err);
Expand Down
20 changes: 19 additions & 1 deletion rust/server/src/util/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use std::net::SocketAddr;
use std::path::PathBuf;

const BIND_ADDR_VAR: &str = "VSS_BIND_ADDRESS";
const MAX_REQUEST_BODY_SIZE_VAR: &str = "VSS_MAX_REQUEST_BODY_SIZE";
const LOG_FILE_VAR: &str = "VSS_LOG_FILE";
const LOG_LEVEL_VAR: &str = "VSS_LOG_LEVEL";
const JWT_RSA_PEM_VAR: &str = "VSS_JWT_RSA_PEM";
Expand All @@ -28,6 +29,7 @@ struct TomlConfig {
#[derive(Deserialize)]
struct ServerConfig {
bind_address: Option<SocketAddr>,
max_request_body_size: Option<usize>,
}

#[derive(Deserialize)]
Expand Down Expand Up @@ -59,6 +61,7 @@ struct LogConfig {
// Encapsulates the result of reading both the environment variables and the config file.
pub(crate) struct Configuration {
pub(crate) bind_address: SocketAddr,
pub(crate) max_request_body_size: Option<usize>,
pub(crate) rsa_pem: Option<String>,
pub(crate) postgresql_prefix: String,
pub(crate) default_db: String,
Expand Down Expand Up @@ -99,6 +102,11 @@ pub(crate) fn load_configuration(config_file_path: Option<&str>) -> Result<Confi
None => TomlConfig::default(), // All fields are set to `None`
};

let (bind_address_config, max_request_body_size_config) = match server_config {
Some(c) => (c.bind_address, c.max_request_body_size),
None => (None, None),
};

let bind_address_env = read_env(BIND_ADDR_VAR)?
.map(|addr| {
addr.parse().map_err(|e| {
Expand All @@ -108,7 +116,7 @@ pub(crate) fn load_configuration(config_file_path: Option<&str>) -> Result<Confi
.transpose()?;
let bind_address = read_config(
bind_address_env,
server_config.and_then(|c| c.bind_address),
bind_address_config,
"VSS server bind address",
BIND_ADDR_VAR,
)?;
Expand Down Expand Up @@ -141,6 +149,15 @@ pub(crate) fn load_configuration(config_file_path: Option<&str>) -> Result<Confi
let log_file_config: Option<PathBuf> = log_config.and_then(|config| config.file);
let log_file = log_file_env.or(log_file_config).unwrap_or(PathBuf::from("vss.log"));

let max_request_body_size_env = read_env(MAX_REQUEST_BODY_SIZE_VAR)?
.map(|mrbs| {
mrbs.parse::<usize>().map_err(|e| {
format!("Unable to parse the maximum request body size environment variable: {}", e)
})
})
.transpose()?;
let max_request_body_size = max_request_body_size_env.or(max_request_body_size_config);

let rsa_pem_env = read_env(JWT_RSA_PEM_VAR)?;
let rsa_pem = rsa_pem_env.or(jwt_auth_config.and_then(|config| config.rsa_pem));

Expand Down Expand Up @@ -199,6 +216,7 @@ pub(crate) fn load_configuration(config_file_path: Option<&str>) -> Result<Confi

Ok(Configuration {
bind_address,
max_request_body_size,
log_file,
log_level,
rsa_pem,
Expand Down
87 changes: 77 additions & 10 deletions rust/server/src/vss_service.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use http_body_util::{BodyExt, Full};
use http_body_util::{BodyExt, Full, Limited};
use hyper::body::{Bytes, Incoming};
use hyper::service::Service;
use hyper::{Request, Response, StatusCode};
Expand All @@ -22,15 +22,44 @@ use log::{debug, trace};

use crate::util::KeyValueVecKeyPrinter;

const MAXIMUM_REQUEST_BODY_SIZE: usize = 1024 * 1024 * 1024;

#[derive(Clone, Copy)]
pub(crate) struct VssServiceConfig {
maximum_request_body_size: usize,
}

impl VssServiceConfig {
pub fn new(maximum_request_body_size: usize) -> Result<Self, String> {
if maximum_request_body_size > MAXIMUM_REQUEST_BODY_SIZE {
return Err(format!(
"Maximum request body size {} exceeds maximum {}",
maximum_request_body_size, MAXIMUM_REQUEST_BODY_SIZE
));
}

Ok(Self { maximum_request_body_size })
}
}

impl Default for VssServiceConfig {
fn default() -> Self {
Self { maximum_request_body_size: MAXIMUM_REQUEST_BODY_SIZE }
}
}

#[derive(Clone)]
pub struct VssService {
store: Arc<dyn KvStore>,
authorizer: Arc<dyn Authorizer>,
config: VssServiceConfig,
}

impl VssService {
pub(crate) fn new(store: Arc<dyn KvStore>, authorizer: Arc<dyn Authorizer>) -> Self {
Self { store, authorizer }
pub(crate) fn new(
store: Arc<dyn KvStore>, authorizer: Arc<dyn Authorizer>, config: VssServiceConfig,
) -> Self {
Self { store, authorizer, config }
}
}

Expand All @@ -45,22 +74,51 @@ impl Service<Request<Incoming>> for VssService {
let store = Arc::clone(&self.store);
let authorizer = Arc::clone(&self.authorizer);
let path = req.uri().path().to_owned();
let maximum_request_body_size = self.config.maximum_request_body_size;

Box::pin(async move {
let prefix_stripped_path = path.strip_prefix(BASE_PATH_PREFIX).unwrap_or_default();

match prefix_stripped_path {
"/getObject" => {
handle_request(store, authorizer, req, handle_get_object_request).await
handle_request(
store,
authorizer,
req,
maximum_request_body_size,
handle_get_object_request,
)
.await
},
"/putObjects" => {
handle_request(store, authorizer, req, handle_put_object_request).await
handle_request(
store,
authorizer,
req,
maximum_request_body_size,
handle_put_object_request,
)
.await
},
"/deleteObject" => {
handle_request(store, authorizer, req, handle_delete_object_request).await
handle_request(
store,
authorizer,
req,
maximum_request_body_size,
handle_delete_object_request,
)
.await
},
"/listKeyVersions" => {
handle_request(store, authorizer, req, handle_list_object_request).await
handle_request(
store,
authorizer,
req,
maximum_request_body_size,
handle_list_object_request,
)
.await
},
_ => {
let error_msg = "Invalid request path.".as_bytes();
Expand Down Expand Up @@ -140,7 +198,7 @@ async fn handle_request<
Fut: Future<Output = Result<R, VssError>> + Send,
>(
store: Arc<dyn KvStore>, authorizer: Arc<dyn Authorizer>, request: Request<Incoming>,
handler: F,
maximum_request_body_size: usize, handler: F,
) -> Result<<VssService as Service<Request<Incoming>>>::Response, hyper::Error> {
let (parts, body) = request.into_parts();
let headers_map = parts
Expand All @@ -155,8 +213,17 @@ async fn handle_request<
Ok(auth_response) => auth_response.user_token,
Err(e) => return Ok(build_error_response(e)),
};
// TODO: we should bound the amount of data we read to avoid allocating too much memory.
let bytes = body.collect().await?.to_bytes();

let limited_body = Limited::new(body, maximum_request_body_size);
let bytes = match limited_body.collect().await {
Ok(body) => body.to_bytes(),
Err(_) => {
return Ok(Response::builder()
.status(StatusCode::PAYLOAD_TOO_LARGE)
.body(Full::new(Bytes::from("Request body too large")))
.unwrap());
},
};
match T::decode(bytes) {
Ok(request) => match handler(store.clone(), user_token, request).await {
Ok(response) => Ok(Response::builder()
Expand Down
4 changes: 3 additions & 1 deletion rust/server/vss-server-config.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
[server_config]
bind_address = "127.0.0.1:8080" # Optional in TOML, can be overridden by env var `VSS_BIND_ADDRESS`

# Maximum request body size in bytes. Can be set here or be overridden by env var 'VSS_MAX_REQUEST_BODY_SIZE'
# Defaults to the maximum possible value of 1 GB if unset.
# max_request_body_size = 1073741824
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hmm sounds like the trailing whitespace is still there ? feel free to use something like git diff --check to check

# Uncomment the table below to verify JWT tokens in the HTTP Authorization header against the given RSA public key,
# can be overridden by env var `VSS_JWT_RSA_PEM`
# [jwt_auth_config]
Expand Down