[Store] Add LOCAL_DISK replica support in Python binding#1847
[Store] Add LOCAL_DISK replica support in Python binding#1847hnts03-moreh wants to merge 7 commits intokvcache-ai:mainfrom
Conversation
Add enable_offload to setup_real() and Python setup() so callers can enable LOCAL_DISK offload via FileStorage without patching C++ code.
Start a coro_rpc_server in setup_internal() with batch_get_offload_object and release_offload_buffer handlers. This enables LOCAL_DISK offload reads when using Python binding (previously only worked with standalone real_client_main). The RPC server runs on local_rpc_addr in a background jthread, following the same pattern as the existing IPC server.
TransferRead previously fell through to get_disk_descriptor() for any non-MEMORY replica, which would crash on LOCAL_DISK replicas (wrong variant access). Add explicit is_disk_replica() and is_local_disk_replica() branches matching the pattern used in client_buffer.cpp:calculate_total_size(). Currently LOCAL_DISK replicas are routed through the separate offload read path (batch_get_into_offload_object_internal) and do not reach TransferRead, but this defensive fix prevents crashes if routing changes in the future.
There was a problem hiding this comment.
Code Review
This pull request introduces an embedded RPC server within the client to support offload object reads, particularly for Python bindings. It includes updates to the setup_real interface across various client implementations to support an enable_offload flag and logic for automatic RPC port assignment. Feedback focuses on improving the robustness of the RPC server startup by using async_start() to avoid potential race conditions from manual thread management and sleeps. Additionally, it is recommended to extract duplicated port assignment logic into a helper method to improve maintainability.
| // Start in background thread (start() blocks) | ||
| client_rpc_thread_ = std::jthread([this]() { | ||
| LOG(INFO) << "Client RPC server starting on " | ||
| << this->local_rpc_addr; | ||
| auto ec = client_rpc_server_->start(); | ||
| if (ec) { | ||
| LOG(ERROR) << "Client RPC server stopped with error"; | ||
| } | ||
| }); | ||
|
|
||
| // Brief wait to confirm binding | ||
| std::this_thread::sleep_for(std::chrono::milliseconds(100)); | ||
| LOG(INFO) << "Client RPC server started on " << this->local_rpc_addr; | ||
| return 0; |
There was a problem hiding this comment.
The current implementation of starting the RPC server in a background thread and using sleep_for to wait for it to bind is not robust and can lead to race conditions. The server might fail to bind after the sleep duration, or the sleep might be unnecessarily long.
A better approach is to use async_start(), similar to how http_server_ is started. This avoids manual thread management and unreliable sleeps.
This would involve:
- Removing
client_rpc_thread_fromreal_client.h. - Refactoring
start_client_rpc_serverandstop_client_rpc_serveras follows:
// In real_client.cpp
int RealClient::start_client_rpc_server() {
if (this->local_rpc_addr.empty()) {
LOG(WARNING) << "local_rpc_addr is empty, skip client RPC server";
return -1;
}
// Parse host:port from local_rpc_addr
size_t colon = this->local_rpc_addr.find_last_of(':');
if (colon == std::string::npos) {
LOG(ERROR) << "Invalid local_rpc_addr (no port): "
<< this->local_rpc_addr;
return -1;
}
std::string rpc_host = this->local_rpc_addr.substr(0, colon);
int rpc_port = std::stoi(this->local_rpc_addr.substr(colon + 1));
try {
client_rpc_server_ =
std::make_unique<coro_rpc::coro_rpc_server>(1, rpc_port, rpc_host);
// Register offload-related handlers
client_rpc_server_
->register_handler<&RealClient::batch_get_offload_object>(this);
client_rpc_server_
->register_handler<&RealClient::release_offload_buffer>(this);
client_rpc_server_
->register_handler<&RealClient::service_ready_internal>(this);
auto ec = client_rpc_server_->async_start();
if (ec.has_value()) {
LOG(ERROR) << "Failed to start client RPC server: " << ec.value().message();
client_rpc_server_.reset();
return -1;
}
LOG(INFO) << "Client RPC server started on " << this->local_rpc_addr;
return 0;
} catch (const std::exception &e) {
LOG(ERROR) << "Failed to start client RPC server: " << e.what();
client_rpc_server_.reset();
return -1;
}
}
void RealClient::stop_client_rpc_server() {
if (client_rpc_server_) {
client_rpc_server_->stop();
client_rpc_server_.reset();
LOG(INFO) << "Client RPC server stopped";
}
}| if (local_rpc_port > 0) { | ||
| this->local_rpc_addr = hostname.substr(0, colon_pos + 1) + | ||
| std::to_string(local_rpc_port); | ||
| } else { | ||
| auto rpc_binder = std::make_unique<AutoPortBinder>(); | ||
| int rpc_auto = rpc_binder->getPort(); | ||
| int specified_port = std::stoi(hostname.substr(colon_pos + 1)); | ||
| if (rpc_auto > 0 && rpc_auto != specified_port) { | ||
| this->local_rpc_addr = hostname.substr(0, colon_pos + 1) + | ||
| std::to_string(rpc_auto); | ||
| rpc_port_binder_ = std::move(rpc_binder); | ||
| } else { | ||
| this->local_rpc_addr = hostname.substr(0, colon_pos + 1) + | ||
| std::to_string(specified_port); | ||
| } | ||
| } |
There was a problem hiding this comment.
The logic for auto-assigning an RPC port when local_rpc_port <= 0 is duplicated here and later in the function (lines 405-419). This could be extracted into a private helper method to improve maintainability and reduce code duplication.
For example:
void RealClient::auto_assign_rpc_address(const std::string& base_hostname, int te_port) {
auto rpc_binder = std::make_unique<AutoPortBinder>();
int rpc_auto_port = rpc_binder->getPort();
if (rpc_auto_port > 0 && rpc_auto_port != te_port) {
this->local_rpc_addr = base_hostname + ":" + std::to_string(rpc_auto_port);
rpc_port_binder_ = std::move(rpc_binder);
} else {
LOG(WARNING) << "Could not auto-assign a separate RPC port, "
<< "falling back to the Transfer Engine port " << te_port;
this->local_rpc_addr = base_hostname + ":" + std::to_string(te_port);
}
}Then you could replace the duplicated blocks with calls to this new helper method.
… helper - Replace manual jthread + sleep with async_start() for client RPC server startup, matching the existing http_server_ pattern. - Remove client_rpc_thread_ member as it is no longer needed. - Extract duplicated auto-port assignment logic into auto_assign_rpc_address() private helper method.
|
Codecov Report❌ Patch coverage is 📢 Thoughts on this report? Let us know! |
There was a problem hiding this comment.
Pull request overview
Adds Python-binding support for LOCAL_DISK replicas by enabling offload in MooncakeDistributedStore, starting an embedded client RPC server for offload reads, and fixing TransferRead() sizing logic for LOCAL_DISK descriptors.
Changes:
- Exposes
enable_offloadin the PythonMooncakeDistributedStore.setup()positional overload and threads it intoRealClient::setup_real. - Adds embedded
coro_rpc_serverlifecycle inRealClientand auto-assigns a client RPC address/port when requested. - Fixes
Client::TransferRead()to handle LOCAL_DISK replica descriptors explicitly.
Reviewed changes
Copilot reviewed 6 out of 6 changed files in this pull request and generated 4 comments.
Show a summary per file
| File | Description |
|---|---|
| mooncake-store/src/real_client.cpp | Adds RPC address auto-assignment and starts/stops an embedded client RPC server during setup/teardown. |
| mooncake-store/src/client_service.cpp | Fixes TransferRead() size determination for LOCAL_DISK replicas. |
| mooncake-store/include/real_client.h | Declares new RPC server helpers/state and extends setup_real signature. |
| mooncake-store/include/pyclient.h | Extends PyClient::setup_real signature to include enable_offload. |
| mooncake-store/include/dummy_client.h | Updates DummyClient::setup_real signature to match interface change. |
| mooncake-integration/store/store_py.cpp | Exposes enable_offload in the Python positional setup() overload. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| // Release RPC port binder so the client RPC server can bind the same port. | ||
| // rpc_port_binder_ holds the port via bind() to prevent reuse; we must | ||
| // release it before coro_rpc_server tries to bind. | ||
| if (rpc_port_binder_) { | ||
| rpc_port_binder_.reset(); | ||
| } | ||
|
|
||
| // Start embedded Client RPC server for offload object reads. | ||
| // This is needed when using Python binding (no standalone server). | ||
| if (start_client_rpc_server() != 0) { | ||
| LOG(WARNING) << "Failed to start client RPC server on " | ||
| << this->local_rpc_addr | ||
| << ". LOCAL_DISK offload reads may not work."; |
There was a problem hiding this comment.
setup_internal unconditionally starts the embedded client RPC server on local_rpc_addr. This will conflict with the standalone real-client service (real_client_main.cpp), which also binds a coro_rpc_server on the same port after calling setup_internal, causing the main server bind to fail. Consider gating start_client_rpc_server() so it only runs in the embedded/Python case (e.g., when local_rpc_port==0 or another explicit signal), and likely only when enable_offload is true.
| // Release RPC port binder so the client RPC server can bind the same port. | |
| // rpc_port_binder_ holds the port via bind() to prevent reuse; we must | |
| // release it before coro_rpc_server tries to bind. | |
| if (rpc_port_binder_) { | |
| rpc_port_binder_.reset(); | |
| } | |
| // Start embedded Client RPC server for offload object reads. | |
| // This is needed when using Python binding (no standalone server). | |
| if (start_client_rpc_server() != 0) { | |
| LOG(WARNING) << "Failed to start client RPC server on " | |
| << this->local_rpc_addr | |
| << ". LOCAL_DISK offload reads may not work."; | |
| const bool has_reserved_rpc_port = static_cast<bool>(rpc_port_binder_); | |
| // Release RPC port binder so the eventual RPC server can bind the same | |
| // port. rpc_port_binder_ holds the port via bind() to prevent reuse; we | |
| // must release it before any coro_rpc_server tries to bind. | |
| if (rpc_port_binder_) { | |
| rpc_port_binder_.reset(); | |
| } | |
| // Start the embedded Client RPC server only for the embedded/offload case. | |
| // When the RPC port was pre-reserved, a standalone real-client service is | |
| // expected to bind local_rpc_addr after setup_internal(), so starting the | |
| // embedded server here would conflict on the same port. | |
| if (enable_offload && !has_reserved_rpc_port) { | |
| if (start_client_rpc_server() != 0) { | |
| LOG(WARNING) << "Failed to start client RPC server on " | |
| << this->local_rpc_addr | |
| << ". LOCAL_DISK offload reads may not work."; | |
| } |
There was a problem hiding this comment.
Addressed in 3b94d01.
Gated start_client_rpc_server() on local_rpc_port == 0 && enable_offload — standalone passes FLAGS_port (non-zero) and runs its own server separately, so the embedded one no longer collides. The handlers we register are also offload-only, so skipping the server when offload is disabled is the right behavior.
| size_t colon_pos = hostname.find(':'); | ||
| bool user_specified_port = (colon_pos != std::string::npos); | ||
|
|
||
| if (user_specified_port) { | ||
| // User specified port, no retry needed | ||
| this->local_hostname = local_hostname; | ||
| this->local_rpc_addr = | ||
| hostname.substr(0, colon_pos + 1) + std::to_string(local_rpc_port); | ||
| int specified_port = std::stoi(hostname.substr(colon_pos + 1)); | ||
| auto_assign_rpc_address(hostname.substr(0, colon_pos + 1), | ||
| local_rpc_port, specified_port); |
There was a problem hiding this comment.
Port parsing for local_hostname uses hostname.find(':') plus std::stoi(hostname.substr(colon_pos + 1)), which will throw for bracketed IPv6 endpoints like [::1]:17813 (and also makes host_prefix incorrect). This breaks the existing IPv6 endpoint format used in tests/docs. Use the shared endpoint parsing helpers (e.g., parseHostNameWithPort / extractIPv6HostAndPort) or at least rfind(':') with IPv6-aware handling, and avoid std::stoi throwing on malformed inputs.
There was a problem hiding this comment.
Addressed in 3b94d01.
Switched to parseHostNameWithPort() from mooncake-transfer-engine/include/common.h (already used elsewhere in the codebase, e.g. client_service.cpp:411). It strips IPv6 brackets correctly and returns port 0 on malformed input without throwing.
| // Parse host:port from local_rpc_addr | ||
| size_t colon = this->local_rpc_addr.find_last_of(':'); | ||
| if (colon == std::string::npos) { | ||
| LOG(ERROR) << "Invalid local_rpc_addr (no port): " | ||
| << this->local_rpc_addr; | ||
| return -1; | ||
| } | ||
| std::string rpc_host = this->local_rpc_addr.substr(0, colon); | ||
| int rpc_port = std::stoi(this->local_rpc_addr.substr(colon + 1)); | ||
|
|
||
| try { | ||
| client_rpc_server_ = | ||
| std::make_unique<coro_rpc::coro_rpc_server>(1, rpc_port, rpc_host); | ||
|
|
There was a problem hiding this comment.
start_client_rpc_server() derives rpc_host by taking the substring before the last ':' from local_rpc_addr and passes it directly to coro_rpc_server. For IPv6 endpoints, local_rpc_addr is typically bracketed (e.g. [::1]:port), so rpc_host will include brackets, which is not a valid bind address for most socket APIs. Also std::stoi is executed outside the try/catch and can terminate the process on invalid ports. Parse with parseHostNameWithPort (to strip brackets and validate) and keep all parsing inside the exception boundary (or use non-throwing parsing).
| // Parse host:port from local_rpc_addr | |
| size_t colon = this->local_rpc_addr.find_last_of(':'); | |
| if (colon == std::string::npos) { | |
| LOG(ERROR) << "Invalid local_rpc_addr (no port): " | |
| << this->local_rpc_addr; | |
| return -1; | |
| } | |
| std::string rpc_host = this->local_rpc_addr.substr(0, colon); | |
| int rpc_port = std::stoi(this->local_rpc_addr.substr(colon + 1)); | |
| try { | |
| client_rpc_server_ = | |
| std::make_unique<coro_rpc::coro_rpc_server>(1, rpc_port, rpc_host); | |
| try { | |
| auto rpc_endpoint = parseHostNameWithPort(this->local_rpc_addr); | |
| if (!rpc_endpoint.has_value()) { | |
| LOG(ERROR) << "Invalid local_rpc_addr: " << this->local_rpc_addr; | |
| return -1; | |
| } | |
| const auto &[rpc_host, rpc_port] = *rpc_endpoint; | |
| client_rpc_server_ = std::make_unique<coro_rpc::coro_rpc_server>( | |
| 1, rpc_port, rpc_host); |
There was a problem hiding this comment.
This was already addressed earlier in 445dd55 based on @gemini-code-assist's original review.
The duplicated port assignmentlogic was extracted into the private helper auto_assign_rpc_address() in real_client.h / real_client.cpp.
| "setup", | ||
| [](MooncakeStorePyWrapper &self, const std::string &local_hostname, | ||
| const std::string &metadata_server, | ||
| size_t global_segment_size = 1024 * 1024 * 16, | ||
| size_t local_buffer_size = 1024 * 1024 * 16, | ||
| const std::string &protocol = "tcp", | ||
| const std::string &rdma_devices = "", | ||
| const std::string &master_server_addr = "127.0.0.1:50051", | ||
| const py::object &engine = py::none()) { | ||
| const py::object &engine = py::none(), | ||
| bool enable_offload = false) { | ||
| auto real_client = self.init_real_client(); | ||
| std::shared_ptr<mooncake::TransferEngine> transfer_engine = | ||
| nullptr; | ||
| if (!engine.is_none()) { | ||
| transfer_engine = | ||
| engine.cast<std::shared_ptr<TransferEngine>>(); | ||
| } | ||
| return real_client->setup_real( | ||
| local_hostname, metadata_server, global_segment_size, | ||
| local_buffer_size, protocol, rdma_devices, | ||
| master_server_addr, transfer_engine, ""); | ||
| master_server_addr, transfer_engine, "", enable_offload); | ||
| }, | ||
| py::arg("local_hostname"), py::arg("metadata_server"), | ||
| py::arg("global_segment_size"), py::arg("local_buffer_size"), | ||
| py::arg("protocol"), py::arg("rdma_devices"), | ||
| py::arg("master_server_addr"), py::arg("engine") = py::none()) | ||
| py::arg("master_server_addr"), py::arg("engine") = py::none(), | ||
| py::arg("enable_offload") = false) | ||
| .def( |
There was a problem hiding this comment.
The positional-args setup() overload now supports enable_offload, but the setup(config_dict) overload (and its docstring of supported keys) does not. This creates two different configuration surfaces for the same feature and makes it easy for Python callers to think offload is enabled when using the dict-based setup. Consider supporting an enable_offload key in the dict overload (and documenting it), or clearly documenting that offload is only available via the positional overload.
There was a problem hiding this comment.
Addressed in 6ab73aa.
- Added
CONFIG_KEY_ENABLE_OFFLOAD = "enable_offload"to types.h. - Added a
get_config_bool()helper mirroring the existingget_config_size(); accepts the common boolean spellings (true/false, 1/0, yes/no, on/off) case-insensitively and logs a warning on malformed values. - setup_internal(ConfigDict) now reads enable_offload and forwards it to the positional-args setup_internal(...) so both overloads share the same code path.
- Updated the store_py pybind docstring for the dict overload to document the new key.
…ent_rpc_server Address Copilot AI review comments on PR kvcache-ai#1847: 1. Conflict with standalone real_client_main server The embedded client RPC server was started unconditionally inside setup_internal(), which conflicts with real_client_main.cpp that binds its own coro_rpc_server on the same FLAGS_port right after setup_internal() returns. Gate the embedded server start on: - local_rpc_port == 0 (Python auto-assign signal) - enable_offload == true (the registered handlers batch_get_offload_object / release_offload_buffer / service_ready_internal are offload-only) Standalone still runs its own server unchanged. 2. IPv6 endpoint parsing bug in start_client_rpc_server local_rpc_addr like "[::1]:17813" was parsed with find_last_of(':') and substr, leaving rpc_host = "[::1]" with brackets (invalid for most socket APIs). std::stoi also sat outside the try/catch, so a malformed port string would terminate the process. Switch to the shared parseHostNameWithPort() helper, which strips brackets and returns port 0 on malformed input without throwing.
…ent_rpc_server Address Copilot AI review comments on PR kvcache-ai#1847: 1. Conflict with standalone real_client_main server The embedded client RPC server was started unconditionally inside setup_internal(), which conflicts with real_client_main.cpp that binds its own coro_rpc_server on the same FLAGS_port right after setup_internal() returns. Gate the embedded server start on: - local_rpc_port == 0 (Python auto-assign signal) - enable_offload == true (the registered handlers batch_get_offload_object / release_offload_buffer / service_ready_internal are offload-only) Standalone still runs its own server unchanged. 2. IPv6 endpoint parsing bug in start_client_rpc_server local_rpc_addr like "[::1]:17813" was parsed with find_last_of(':') and substr, leaving rpc_host = "[::1]" with brackets (invalid for most socket APIs). std::stoi also sat outside the try/catch, so a malformed port string would terminate the process. Switch to the shared parseHostNameWithPort() helper, which strips brackets and returns port 0 on malformed input without throwing.
Address Copilot AI review comment on PR kvcache-ai#1847: the positional-args setup() overload supported enable_offload, but the ConfigDict-based setup(config_dict) overload did not. Users configuring via a Python dict had no way to turn offload on. Changes: - Add CONFIG_KEY_ENABLE_OFFLOAD = "enable_offload" to types.h alongside the other setup config keys. - Add a get_config_bool() helper that parses the usual boolean string spellings (true/false, 1/0, yes/no, on/off), case-insensitive, with a logged warning + default fallback on malformed values. Mirrors the existing get_config_size() helper. - Read the enable_offload key inside setup_internal(ConfigDict) and forward it to the positional-args setup_internal(...) so both overloads share the same path. - Document the new key in the store_py setup(dict) pybind docstring.
Address Copilot AI review comment on PR kvcache-ai#1847: the positional-args setup() overload supported enable_offload, but the ConfigDict-based setup(config_dict) overload did not. Users configuring via a Python dict had no way to turn offload on. Changes: - Add CONFIG_KEY_ENABLE_OFFLOAD = "enable_offload" to types.h alongside the other setup config keys. - Add a get_config_bool() helper that parses the usual boolean string spellings (true/false, 1/0, yes/no, on/off), case-insensitive, with a logged warning + default fallback on malformed values. Mirrors the existing get_config_size() helper. - Read the enable_offload key inside setup_internal(ConfigDict) and forward it to the positional-args setup_internal(...) so both overloads share the same path. - Document the new key in the store_py setup(dict) pybind docstring.
|
@stmatengss I'll close this PR. Thanks |
Description
Enable LOCAL_DISK replica support in the Python binding (
MooncakeDistributedStore).Previously, the Python binding could only operate on MEMORY (DRAM) replicas. This PR adds three changes to support the full LOCAL_DISK offload lifecycle:
enable_offloadparameter: Exposesenable_offloadinsetup()so FileStorage and the heartbeat-based offload mechanism can be activated from Python. Includeslocal_rpc_port=0auto-assignment viaAutoPortBinder.coro_rpc_serverin a background thread duringsetup(), registeringbatch_get_offload_object,release_offload_buffer, andservice_ready_internalhandlers. Without this, anybatch_get_intofor LOCAL_DISK replicas fails with connection refused.TransferRead()previously fell through toget_disk_descriptor()for all non-MEMORY replicas, which would crash on LOCAL_DISK descriptors. Added explicitis_disk_replica()/is_local_disk_replica()branching, matching the pattern already used inclient_buffer.cpp.Module
mooncake-store)mooncake-integration)Type of Change
How Has This Been Tested?
enable_offload=Trueactivates FileStorage and creates LOCAL_DISK replicas via heartbeatbatch_replica_clear+ LOCAL_DISK read path with Client RPC serverclang-format-20Checklist
./scripts/code_format.shbefore submitting.