Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions docs/cn/auto_concurrency_limiter.md
Original file line number Diff line number Diff line change
Expand Up @@ -154,3 +154,21 @@ netflix中的gradient算法公式为:max_concurrency = min_latency / latency *
* gradient算法中的max_concurrency / latency从概念上和qps有关联(根据little's law),但可能严重脱节。比如在重测
min_latency前,若所有latency都小于min_latency,那么max_concurrency会不断下降甚至到0;但按照本算法,max_qps和min_latency仍然是稳定的,它们计算出的max_concurrency也不会剧烈变动。究其本质,gradient算法在迭代max_concurrency时,latency并不能代表实际并发为max_concurrency时的延时,两者是脱节的,所以max_concurrency / latency的实际物理含义不明,与qps可能差异甚大,最后导致了很大的偏差。
* gradient算法的queue_size推荐为sqrt(max_concurrency),这是不合理的。netflix对queue_size的理解大概是代表各种不可控环节的缓存,比如socket里的,和max_concurrency存在一定的正向关系情有可原。但在我们的理解中,这部分queue_size作用微乎其微,没有或用常量即可。我们关注的queue_size是给concurrency上升留出的探索空间: max_concurrency的更新是有延迟的,在并发从低到高的增长过程中,queue_size的作用就是在max_concurrency更新前不限制qps上升。而当concurrency高时,服务可能已经过载了,queue_size就应该小一点,防止进一步恶化延时。这里的queue_size和并发是反向关系。

## 错误率惩罚阈值

`auto_cl_error_rate_punish_threshold`用于设置错误率"死区",低于该阈值的错误率不会产生惩罚,避免少量错误请求对max_concurrency的过度影响。

| GFlag | 默认值 | 有效范围 | 说明 |
|-------|--------|----------|------|
| auto_cl_error_rate_punish_threshold | 0 | [0, 1) | 错误率惩罚阈值,0表示禁用 |

- **默认值为0**:禁用该功能,保持原有行为
- **设置为有效值(如0.1)**:错误率 ≤ 阈值时惩罚为0;错误率 > 阈值时惩罚线性增长
- **无效值处理**:≥1 的值会被忽略,等同于0

**示例**:
```
# 错误率低于10%时不惩罚,高于10%时线性增加惩罚
--auto_cl_error_rate_punish_threshold=0.1
```
31 changes: 30 additions & 1 deletion src/brpc/policy/auto_concurrency_limiter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,13 @@ DEFINE_int32(auto_cl_latency_fluctuation_correction_factor, 1,
"the value, the higher the tolerance for the fluctuation of the "
"latency. If the value is too large, the latency will be higher "
"when the server is overloaded.");
DEFINE_double(auto_cl_error_rate_punish_threshold, 0,
"Threshold for error-rate-based punishment attenuation. "
"Valid range: [0, 1). 0 (default) disables the feature. "
"Values >= 1 are ignored and treated as 0. "
"e.g. 0.1: error rates below 10%% produce zero punishment; "
"above it the punishment scales linearly from 0 to full strength. "
"Only effective when auto_cl_enable_error_punish is true.");

AutoConcurrencyLimiter::AutoConcurrencyLimiter()
: _max_concurrency(FLAGS_auto_cl_initial_max_concurrency)
Expand Down Expand Up @@ -236,7 +243,29 @@ void AutoConcurrencyLimiter::AdjustMaxConcurrency(int next_max_concurrency) {
void AutoConcurrencyLimiter::UpdateMaxConcurrency(int64_t sampling_time_us) {
int32_t total_succ_req = _total_succ_req.load(butil::memory_order_relaxed);
double failed_punish = _sw.total_failed_us * FLAGS_auto_cl_fail_punish_ratio;
int64_t avg_latency =

// Threshold-based attenuation: when 0 < threshold < 1, attenuate punishment
// based on error rate. Inspired by Sentinel's threshold-based circuit breaker:
// low error rates should not inflate avg_latency. Above threshold, punishment
// scales linearly from 0 to full strength.
// Invalid values (<=0 or >=1) skip this block entirely, preserving original behavior.
if (FLAGS_auto_cl_error_rate_punish_threshold > 0 &&
FLAGS_auto_cl_error_rate_punish_threshold < 1.0 &&
_sw.failed_count > 0) {
double threshold = FLAGS_auto_cl_error_rate_punish_threshold;
double error_rate = static_cast<double>(_sw.failed_count) /
(_sw.succ_count + _sw.failed_count);
if (error_rate <= threshold) {
// Error rate within dead zone, cancel punishment.
failed_punish = 0;
} else {
// Linear ramp: 0 at threshold, 1.0 at 100% error rate.
double punish_factor = (error_rate - threshold) / (1.0 - threshold);
failed_punish *= punish_factor;
}
}

int64_t avg_latency =
std::ceil((failed_punish + _sw.total_succ_us) / _sw.succ_count);
double qps = 1000000.0 * total_succ_req / (sampling_time_us - _sw.start_time_us);
UpdateMinLatency(avg_latency);
Expand Down
13 changes: 13 additions & 0 deletions test/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,19 @@ cc_test(
],
)

cc_test(
name = "brpc_auto_concurrency_limiter_test",
srcs = [
"brpc_auto_concurrency_limiter_unittest.cpp",
],
copts = COPTS,
deps = [
"//:brpc",
"@com_google_googletest//:gtest",
"@com_google_googletest//:gtest_main",
],
)

refresh_compile_commands(
name = "brpc_test_compdb",
# Specify the targets of interest.
Expand Down
168 changes: 168 additions & 0 deletions test/brpc_auto_concurrency_limiter_unittest.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include "brpc/policy/auto_concurrency_limiter.h"
#include "butil/time.h"
#include <gtest/gtest.h>

namespace brpc {
namespace policy {

DECLARE_int32(auto_cl_sample_window_size_ms);
DECLARE_int32(auto_cl_min_sample_count);
DECLARE_int32(auto_cl_max_sample_count);
DECLARE_bool(auto_cl_enable_error_punish);
DECLARE_double(auto_cl_fail_punish_ratio);
DECLARE_double(auto_cl_error_rate_punish_threshold);

} // namespace policy
} // namespace brpc

class AutoConcurrencyLimiterTest : public ::testing::Test {
protected:
void SetUp() override {
// Save original values
orig_sample_window_size_ms_ = brpc::policy::FLAGS_auto_cl_sample_window_size_ms;
orig_min_sample_count_ = brpc::policy::FLAGS_auto_cl_min_sample_count;
orig_max_sample_count_ = brpc::policy::FLAGS_auto_cl_max_sample_count;
orig_enable_error_punish_ = brpc::policy::FLAGS_auto_cl_enable_error_punish;
orig_fail_punish_ratio_ = brpc::policy::FLAGS_auto_cl_fail_punish_ratio;
orig_error_rate_threshold_ = brpc::policy::FLAGS_auto_cl_error_rate_punish_threshold;

// Set test-friendly values
brpc::policy::FLAGS_auto_cl_sample_window_size_ms = 1000;
brpc::policy::FLAGS_auto_cl_min_sample_count = 5;
brpc::policy::FLAGS_auto_cl_max_sample_count = 200;
brpc::policy::FLAGS_auto_cl_enable_error_punish = true;
brpc::policy::FLAGS_auto_cl_fail_punish_ratio = 1.0;
}

void TearDown() override {
// Restore original values
brpc::policy::FLAGS_auto_cl_sample_window_size_ms = orig_sample_window_size_ms_;
brpc::policy::FLAGS_auto_cl_min_sample_count = orig_min_sample_count_;
brpc::policy::FLAGS_auto_cl_max_sample_count = orig_max_sample_count_;
brpc::policy::FLAGS_auto_cl_enable_error_punish = orig_enable_error_punish_;
brpc::policy::FLAGS_auto_cl_fail_punish_ratio = orig_fail_punish_ratio_;
brpc::policy::FLAGS_auto_cl_error_rate_punish_threshold = orig_error_rate_threshold_;
}

private:
int32_t orig_sample_window_size_ms_;
int32_t orig_min_sample_count_;
int32_t orig_max_sample_count_;
bool orig_enable_error_punish_;
double orig_fail_punish_ratio_;
double orig_error_rate_threshold_;
};

// Helper function to add samples and trigger window completion
// Uses synthetic timestamps instead of sleeping for faster, deterministic tests.
// The final successful sample is used as the trigger, so actual counts match
// succ_count/fail_count exactly (preserving intended error rates).
void AddSamplesAndTriggerWindow(brpc::policy::AutoConcurrencyLimiter& limiter,
int succ_count, int64_t succ_latency,
int fail_count, int64_t fail_latency) {
ASSERT_GT(succ_count, 0) << "Need at least 1 success to trigger window";
int64_t now = butil::gettimeofday_us();

// Add successful samples (reserve one for the trigger)
for (int i = 0; i < succ_count - 1; ++i) {
limiter.AddSample(0, succ_latency, now);
}
// Add failed samples
for (int i = 0; i < fail_count; ++i) {
limiter.AddSample(1, fail_latency, now);
}

// Advance timestamp past window expiry instead of sleeping
int64_t after_window = now + brpc::policy::FLAGS_auto_cl_sample_window_size_ms * 1000 + 1000;

// Use the final success sample to trigger window submission
limiter.AddSample(0, succ_latency, after_window);
}

// Test 1: Backward compatibility - threshold=0 preserves original punishment behavior
TEST_F(AutoConcurrencyLimiterTest, ThresholdZeroPreservesOriginalBehavior) {
brpc::policy::FLAGS_auto_cl_error_rate_punish_threshold = 0;
brpc::policy::FLAGS_auto_cl_sample_window_size_ms = 10;

brpc::policy::AutoConcurrencyLimiter limiter;
AddSamplesAndTriggerWindow(limiter, 90, 100, 10, 1000);

// 10% error rate, threshold=0 means full punishment applied
// avg_latency = ceil((10*1000 + 90*100) / 90) = ceil(211.1) = 212us
ASSERT_GT(limiter._min_latency_us, 180);
ASSERT_LT(limiter._min_latency_us, 250);
}

// Test 2: Dead zone - error rate below threshold produces zero punishment
TEST_F(AutoConcurrencyLimiterTest, BelowThresholdZeroPunishment) {
brpc::policy::FLAGS_auto_cl_error_rate_punish_threshold = 0.2; // 20% threshold
brpc::policy::FLAGS_auto_cl_sample_window_size_ms = 10;

brpc::policy::AutoConcurrencyLimiter limiter;
AddSamplesAndTriggerWindow(limiter, 90, 100, 10, 1000);

// 10% error rate < 20% threshold, punishment should be zero
// avg_latency = 90*100 / 90 = 100us (no inflation)
ASSERT_GT(limiter._min_latency_us, 80);
ASSERT_LT(limiter._min_latency_us, 130);
}

// Test 3: Boundary - error rate exactly at threshold produces zero punishment
TEST_F(AutoConcurrencyLimiterTest, ExactlyAtThresholdZeroPunishment) {
brpc::policy::FLAGS_auto_cl_error_rate_punish_threshold = 0.1; // 10% threshold
brpc::policy::FLAGS_auto_cl_sample_window_size_ms = 10;

brpc::policy::AutoConcurrencyLimiter limiter;
AddSamplesAndTriggerWindow(limiter, 90, 100, 10, 1000);

// 10% error rate == 10% threshold, punishment should be zero
// avg_latency = 90*100 / 90 = 100us
ASSERT_GT(limiter._min_latency_us, 80);
ASSERT_LT(limiter._min_latency_us, 130);
}

// Test 4: Linear scaling - above threshold, punishment scales proportionally
TEST_F(AutoConcurrencyLimiterTest, AboveThresholdLinearScaling) {
brpc::policy::FLAGS_auto_cl_error_rate_punish_threshold = 0.1; // 10% threshold
brpc::policy::FLAGS_auto_cl_sample_window_size_ms = 10;

// Case A: 50% error rate
// punish_factor = (0.5 - 0.1) / (1.0 - 0.1) = 4/9 ≈ 0.444
// failed_punish = 50 * 1000 * (4/9) = 22222.2us
// avg_latency = ceil((22222.2 + 50*100) / 50) = ceil(544.4) = 545us
{
brpc::policy::AutoConcurrencyLimiter limiter;
AddSamplesAndTriggerWindow(limiter, 50, 100, 50, 1000);
ASSERT_GT(limiter._min_latency_us, 450);
ASSERT_LT(limiter._min_latency_us, 650);
}

// Case B: 90% error rate (near full punishment)
// punish_factor = (0.9 - 0.1) / (1.0 - 0.1) = 8/9 ≈ 0.889
// failed_punish = 90 * 1000 * (8/9) = 80000us
// avg_latency = ceil((80000 + 10*100) / 10) = ceil(8100) = 8100us
{
brpc::policy::AutoConcurrencyLimiter limiter;
AddSamplesAndTriggerWindow(limiter, 10, 100, 90, 1000);
ASSERT_GT(limiter._min_latency_us, 7000);
ASSERT_LT(limiter._min_latency_us, 9000);
}
}

Loading