[Store] Add client bandwidth metrics for real and dummy clients #1874
stmatengss wants to merge 6 commits into kvcache-ai:main
Conversation
Code Review
This pull request implements bandwidth and throughput reporting in the ClientMetric system, enabling performance tracking for read and write operations. It instruments DummyClient to record metrics across multiple RPC methods and adds environment variable control for bandwidth summaries. The review feedback recommends refactoring the logging construction for clarity and utilizing a template helper in DummyClient to reduce the code duplication introduced by the repeated timing and observation logic.
```cpp
LOG(INFO) << "Client Metrics Report:\n"
          << summary
          << (bandwidth_report.empty()
                  ? ""
                  : std::string("\n") + bandwidth_report);
```
The construction of the log message using a ternary operator within the stream is a bit difficult to read. For better clarity and maintainability, consider building the full report string before logging it. For example:

```cpp
std::string report = "Client Metrics Report:\n" + summary;
if (!bandwidth_report.empty()) {
    report += "\n" + bandwidth_report;
}
LOG(INFO) << report;
```

```diff
 int DummyClient::put(const std::string& key, std::span<const char> value,
                      const ReplicateConfig& config) {
-    return to_py_ret(invoke_rpc<&RealClient::put_dummy_helper, void>(
-        key, value, config, client_id_));
+    const auto start_time = std::chrono::steady_clock::now();
+    auto result =
+        invoke_rpc<&RealClient::put_dummy_helper, void>(key, value, config,
+                                                        client_id_);
+    if (result.has_value()) {
+        ObserveWriteMetric(value.size_bytes(), elapsed_us_since(start_time),
+                           false);
+    }
+    return to_py_ret(result);
 }

 int DummyClient::put_batch(const std::vector<std::string>& keys,
                            const std::vector<std::span<const char>>& values,
                            const ReplicateConfig& config) {
-    return to_py_ret(invoke_rpc<&RealClient::put_batch_dummy_helper, void>(
-        keys, values, config, client_id_));
+    const auto start_time = std::chrono::steady_clock::now();
+    auto result = invoke_rpc<&RealClient::put_batch_dummy_helper, void>(
+        keys, values, config, client_id_);
+    if (result.has_value()) {
+        ObserveWriteMetric(sum_value_sizes(values), elapsed_us_since(start_time),
+                           true);
+    }
+    return to_py_ret(result);
 }

 int DummyClient::put_parts(const std::string& key,
                            std::vector<std::span<const char>> values,
                            const ReplicateConfig& config) {
-    return to_py_ret(invoke_rpc<&RealClient::put_parts_dummy_helper, void>(
-        key, values, config, client_id_));
+    const auto start_time = std::chrono::steady_clock::now();
+    auto result = invoke_rpc<&RealClient::put_parts_dummy_helper, void>(
+        key, values, config, client_id_);
+    if (result.has_value()) {
+        ObserveWriteMetric(sum_value_sizes(values), elapsed_us_since(start_time),
+                           false);
+    }
+    return to_py_ret(result);
 }

 int DummyClient::upsert(const std::string& key, std::span<const char> value,
                         const ReplicateConfig& config) {
-    return to_py_ret(invoke_rpc<&RealClient::upsert_dummy_helper, void>(
-        key, value, config, client_id_));
+    const auto start_time = std::chrono::steady_clock::now();
+    auto result = invoke_rpc<&RealClient::upsert_dummy_helper, void>(
+        key, value, config, client_id_);
+    if (result.has_value()) {
+        ObserveWriteMetric(value.size_bytes(), elapsed_us_since(start_time),
+                           false);
+    }
+    return to_py_ret(result);
 }
```
There's a recurring pattern of timing an RPC call and observing a write metric upon success. This pattern is repeated across put, put_batch, put_parts, upsert, and upsert_parts, leading to significant code duplication.
To improve maintainability and reduce redundancy, consider refactoring this into a private helper method. For example, you could create a template helper function that wraps the RPC invocation and metric observation logic.
Here's an example of how such a helper could look and how put could be simplified:
Helper method in DummyClient:

```cpp
template <auto ServiceMethod, typename... Args>
int invoke_write_rpc_and_observe(size_t bytes, bool is_batch, Args&&... args) {
    const auto start_time = std::chrono::steady_clock::now();
    auto result = invoke_rpc<ServiceMethod, void>(std::forward<Args>(args)...);
    if (result.has_value()) {
        ObserveWriteMetric(bytes, elapsed_us_since(start_time), is_batch);
    }
    return to_py_ret(result);
}
```

Refactored put method:

```cpp
int DummyClient::put(const std::string& key, std::span<const char> value,
                     const ReplicateConfig& config) {
    return invoke_write_rpc_and_observe<&RealClient::put_dummy_helper>(
        value.size_bytes(), false, key, value, config, client_id_);
}
```

Applying this pattern to the other similar methods would make the code more concise and easier to maintain.
```cpp
            value == "on" || value == "enable");
}

bool parseBoolEnv(const char* env_name, bool default_value) {
```
parseBoolEnv is a useful utility, but it lives in an anonymous namespace inside client_metric.cpp, so it can't be reused elsewhere. Once PR #1538 lands and a GetEnvBoolOr (or equivalent) is available in utils.h, this function and parseMetricsEnabled can both be removed in a follow-up.
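Based on the fragment visible in the hunk above, such a helper might look like the following sketch. The truthy-token set is taken from the visible `parseMetricsEnabled` comparison; the real implementation in client_metric.cpp may differ in detail.

```cpp
#include <cassert>
#include <cstdlib>
#include <string>

// Hypothetical sketch of the env-parsing helper; not the PR's exact code.
bool parseBoolEnv(const char* env_name, bool default_value) {
    const char* raw = std::getenv(env_name);
    if (raw == nullptr) {
        // Variable not set: fall back to the caller-supplied default.
        return default_value;
    }
    const std::string value(raw);
    // Accept the same truthy tokens the metrics parser uses.
    return (value == "1" || value == "true" || value == "on" ||
            value == "enable");
}
```

A future `GetEnvBoolOr` in utils.h would presumably have the same shape, making the anonymous-namespace copy here removable.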
```cpp
// Ascend physical device id for dummy-real RPC to real, set in setup_dummy
int32_t device_id_ = 0;

std::unique_ptr<ClientMetric> metrics_;
```
DummyClient now owns a ClientMetric that contains master_client_metric, but DummyClient's RPCs don't go through MasterClient, so rpc_count and rpc_latency are never updated. In practice this doesn't produce noisy output (ylt skips serializing zero-value metrics), but the field is semantically dead for DummyClient. Worth tracking as a follow-up: either wire up RPC observation in DummyClient::invoke_rpc, or introduce a slimmer metrics struct for DummyClient that omits MasterClientMetric.
```cpp
ObserveWriteMetric(value.size_bytes(), elapsed_us_since(start_time),
                   false);
```
Nit: there's a subtle inconsistency in how latency is observed for single vs. batch operations.

Single ops (e.g. `put`, `upsert`) only call `ObserveWriteMetric` when the result is successful:

```cpp
if (result.has_value()) {
    ObserveWriteMetric(value.size_bytes(), elapsed_us_since(start_time), false);
}
```

Batch ops (e.g. `batch_upsert_from`, `batch_put_from`) always call `ObserveWriteMetric` regardless of per-key success; `sum_successful_sizes` zeroes out the bytes for failed keys, but the elapsed time is still recorded into the latency histogram:

```cpp
ObserveWriteMetric(sum_successful_sizes(results, sizes),
                   elapsed_us_since(start_time), true);  // called even if all keys failed
```

This means the batch latency histogram captures "time for the batch RPC to complete" (including partial failures), while the single-op histogram only captures "latency of successful operations". The two distributions aren't directly comparable.

Either approach is defensible: recording all latencies (including failures) gives better tail-latency visibility, while recording only successes gives cleaner throughput-correlated data.
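The divergence between the two policies can be made concrete with a small simulation. The `KeyResult` and `LatencyHistogram` types below are stand-ins for the PR's real types, chosen only to illustrate the observation logic:

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Hypothetical stand-ins: a per-key batch result, and a "histogram"
// that just collects raw latency samples.
struct KeyResult { bool ok; size_t bytes; };

struct LatencyHistogram {
    std::vector<uint64_t> samples;
    void observe(uint64_t us) { samples.push_back(us); }
};

// Policy A (current batch behavior): always record latency; failed keys
// only zero out the byte count.
size_t observe_always(const std::vector<KeyResult>& results, uint64_t elapsed_us,
                      LatencyHistogram& hist) {
    size_t bytes = 0;
    for (const auto& r : results) if (r.ok) bytes += r.bytes;
    hist.observe(elapsed_us);  // recorded even if every key failed
    return bytes;
}

// Policy B (current single-op behavior): record latency only on success.
size_t observe_on_success(const std::vector<KeyResult>& results, uint64_t elapsed_us,
                          LatencyHistogram& hist) {
    size_t bytes = 0;
    for (const auto& r : results) if (r.ok) bytes += r.bytes;
    if (bytes > 0) hist.observe(elapsed_us);
    return bytes;
}
```

For an all-failed batch, policy A still contributes one sample to the latency distribution while policy B contributes none, which is exactly why the two histograms aren't directly comparable.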
@copilot resolve the merge conflicts in this pull request

```
# Conflicts:
#   mooncake-store/include/client_metric.h
#   mooncake-store/src/client_metric.cpp
```

@00fish0 Please check again for the fixes.
Pull request overview
Note: Copilot was unable to run its full agentic suite in this review.
Adds client-side bandwidth/throughput and per-interface operation metrics for both real and dummy clients, with flags to control reporting and RPC metric inclusion.
Changes:
- Add bandwidth (average + interval) reporting with an env flag (`MC_STORE_CLIENT_METRIC_BANDWIDTH`) and make master RPC metrics optionally disabled.
- Introduce per-interface transfer operation metrics and wire observations into real/dummy client APIs.
- Extend unit tests to cover new summary sections and toggles.
Reviewed changes
Copilot reviewed 7 out of 7 changed files in this pull request and generated 4 comments.
| File | Description |
|---|---|
| mooncake-store/include/client_metric.h | Adds bandwidth formatting/helpers, timed-operation wrapper, and per-operation transfer metrics. |
| mooncake-store/src/client_metric.cpp | Adds env parsing, toggles for bandwidth + master RPC metrics, and interval bandwidth reporting. |
| mooncake-store/include/client_service.h | Exposes a Client::ObserveTransferOperation hook to record per-interface metrics. |
| mooncake-store/src/real_client.cpp | Instruments real client read/write APIs to observe bytes + latency per interface operation. |
| mooncake-store/src/dummy_client.cpp | Adds dummy-client metrics collection and instruments dummy RPC wrappers and read/write APIs. |
| mooncake-store/include/dummy_client.h | Adds metrics member and an observed RPC helper to reduce instrumentation boilerplate. |
| mooncake-store/tests/client_metrics_test.cpp | Extends tests for bandwidth strings, interface op summary, and new flags. |
```cpp
int64_t p95_target = (total_count * 95) / 100;
int64_t cumulative = 0;
double p95_bucket = 0;
for (size_t i = 0;
     i < bucket_counts.size() && i < kLatencyBucket.size(); ++i) {
    cumulative += bucket_counts[i]->value(label);
    if (cumulative >= p95_target && p95_bucket == 0) {
        p95_bucket = kLatencyBucket[i];
        break;
    }
}
```
The p95 target calculation can become 0 for small counts (e.g., total_count=1), causing p95 to always resolve to the first bucket. Compute the rank using a ceil-like formula and clamp to at least 1 (e.g., (total_count*95+99)/100) so p95 reflects at least one observation.
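A sketch of the suggested fix, reduced to a free function over plain bucket counts. The `kLatencyBucket` boundaries here are illustrative, not the project's real values:

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Illustrative bucket upper bounds in microseconds (not the real kLatencyBucket).
static const std::vector<double> kLatencyBucket = {100, 500, 1000, 5000};

double p95_from_buckets(const std::vector<int64_t>& bucket_counts) {
    int64_t total_count = 0;
    for (int64_t c : bucket_counts) total_count += c;
    if (total_count == 0) return 0;
    // Ceil-like rank, clamped to at least 1, so p95 reflects at least one
    // observation even for tiny counts. With the old (total_count * 95) / 100
    // formula, total_count == 1 gives a target of 0 and the loop always
    // resolves to the first bucket.
    int64_t p95_target = std::max<int64_t>(1, (total_count * 95 + 99) / 100);
    int64_t cumulative = 0;
    for (size_t i = 0; i < bucket_counts.size() && i < kLatencyBucket.size(); ++i) {
        cumulative += bucket_counts[i];
        if (cumulative >= p95_target) return kLatencyBucket[i];
    }
    return kLatencyBucket.back();
}
```

With a single observation landing in the third bucket, the clamped rank maps p95 to that bucket rather than collapsing to the first one.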
```cpp
void ObserveTransferOperation(TransferOperationKind kind,
                              const std::string& op_name, uint64_t bytes,
                              uint64_t latency_us) {
    if (metrics_ != nullptr) {
        metrics_->ObserveTransferOperation(kind, op_name, bytes,
                                           latency_us);
    }
}
```
Taking op_name as const std::string& forces callers that pass string literals (as in this PR) to construct a temporary std::string, which can add overhead on hot paths. Prefer std::string_view (or const char*) in this API, and forward to the metric implementation without allocating.
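A sketch of the suggested signature change. `TransferOperationKind` and the `MetricsSink` type below are simplified stand-ins for the PR's real types, kept only to show the `std::string_view` pass-through:

```cpp
#include <cassert>
#include <cstdint>
#include <string>
#include <string_view>

enum class TransferOperationKind { kRead, kWrite };

// Stand-in for the underlying metric implementation.
struct MetricsSink {
    std::string last_op;
    uint64_t last_bytes = 0;
    void ObserveTransferOperation(TransferOperationKind, std::string_view op_name,
                                  uint64_t bytes, uint64_t /*latency_us*/) {
        // Copy happens only here, at the sink, if the metric needs to own the name.
        last_op.assign(op_name.data(), op_name.size());
        last_bytes = bytes;
    }
};

// Taking std::string_view means a caller passing a literal like "put" does not
// construct a temporary std::string on the hot path; the view is forwarded
// without allocating.
void ObserveTransferOperation(MetricsSink* metrics, TransferOperationKind kind,
                              std::string_view op_name, uint64_t bytes,
                              uint64_t latency_us) {
    if (metrics != nullptr) {
        metrics->ObserveTransferOperation(kind, op_name, bytes, latency_us);
    }
}
```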
```cpp
size_t sum_value_sizes(const std::vector<std::span<const char>> &values) {
    size_t total = 0;
    for (const auto &value : values) {
        total += value.size_bytes();
    }
    return total;
}
```
These helper functions are defined at global namespace scope in a .cpp file with external linkage. To avoid accidental symbol collisions across translation units, move them into an unnamed namespace or mark them static (and consider consolidating shared helpers used by both real_client.cpp and dummy_client.cpp into a single internal utility to reduce duplicated logic).
```cpp
TEST_F(ClientMetricsTest, BandwidthSummaryRespectsEnvFlag) {
    setenv("MC_STORE_CLIENT_METRIC_BANDWIDTH", "0", 1);
    auto metrics = ClientMetric::Create();
    ASSERT_NE(metrics, nullptr);

    metrics->transfer_metric.total_read_bytes.inc(1024);
    std::string summary = metrics->summary_metrics();
    EXPECT_TRUE(summary.find("Average Read Throughput:") == std::string::npos);

    unsetenv("MC_STORE_CLIENT_METRIC_BANDWIDTH");
}
```
This test mutates a process-wide environment variable but does not restore any pre-existing value if it was already set, which can make the test order-dependent. Capture the prior value (if any) and restore it in teardown (or use an RAII env-guard) so the test is hermetic.
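A sketch of such an RAII env-guard (hypothetical; not part of the PR). It sets the variable for the lifetime of a scope and restores the previous state on destruction, including the "was unset" case:

```cpp
#include <cassert>
#include <cstdlib>
#include <optional>
#include <string>

// Hypothetical RAII guard for a process environment variable.
class ScopedEnv {
  public:
    ScopedEnv(std::string name, const char* value) : name_(std::move(name)) {
        if (const char* old = std::getenv(name_.c_str())) {
            old_value_ = old;  // copy before we overwrite it
        }
        setenv(name_.c_str(), value, /*overwrite=*/1);
    }
    ~ScopedEnv() {
        if (old_value_) {
            setenv(name_.c_str(), old_value_->c_str(), 1);
        } else {
            unsetenv(name_.c_str());  // variable was unset before the guard
        }
    }
    ScopedEnv(const ScopedEnv&) = delete;
    ScopedEnv& operator=(const ScopedEnv&) = delete;

  private:
    std::string name_;
    std::optional<std::string> old_value_;
};
```

In the test above, `ScopedEnv guard("MC_STORE_CLIENT_METRIC_BANDWIDTH", "0");` at the top of the test body would replace both the `setenv` and the trailing `unsetenv`, making the test hermetic regardless of the process's prior environment.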
```cpp
    bool has_any_valid_fragment = false;
};

size_t sum_value_sizes(const std::vector<std::span<const char>> &values) {
```
Nit: `sum_value_sizes` / `sum_sizes` / `sum_successful_sizes` / `sum_successful_nested_sizes` / `sum_positive_results` / `sum_positive_ranges` are copied verbatim in both dummy_client.cpp:26-95 and real_client.cpp:65-108. Suggest lifting them into a `metric_detail` nested namespace in client_metric.h so the two copies can't drift in the future.
Description

Module
- mooncake-transfer-engine
- mooncake-store
- mooncake-ep
- mooncake-integration
- mooncake-p2p-store
- mooncake-wheel
- mooncake-pg
- mooncake-rl

Type of Change

How Has This Been Tested?

Checklist
- Run `./scripts/code_format.sh` before submitting.