Skip to content

Conversation

@EarthChen
Copy link
Member

What is the purpose of the change?

Checklist

  • Make sure there is a GitHub_issue field for the change.
  • Write a pull request description that is detailed enough to understand what the pull request does, how, and why.
  • Write necessary unit-test to verify your logic correction. If the new feature or significant change is committed, please remember to add sample in dubbo samples project.
  • Make sure gitHub actions can pass. Why the workflow is failing and how to fix it?

Copy link
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR implements HTTP/2 flow control (backpressure) for the Triple protocol by introducing manual flow control through consumeBytes() callbacks that trigger WINDOW_UPDATE frames. The implementation replaces automatic flow control with a manual pattern where the application explicitly signals when bytes have been processed and more data can be received.

  • Introduces TripleHttp2LocalFlowController to manage local flow control with configurable window update ratios
  • Implements the bytesRead() callback chain from decoders through listeners to flow controllers
  • Removes the custom TriHttp2RemoteFlowController in favor of manual local flow control

Reviewed changes

Copilot reviewed 25 out of 25 changed files in this pull request and generated 16 comments.

Show a summary per file
File Description
TripleHttp2LocalFlowController.java New custom flow controller extending Netty's DefaultHttp2LocalFlowController with configurable window update ratio
TripleHttp2Protocol.java Integrates custom flow controller and disables automatic stream flow control on both client and server
H2StreamChannel.java Adds consumeBytes interface for flow control
NettyH2StreamChannel.java Implements consumeBytes to call Http2LocalFlowController.consumeBytes()
AbstractTripleClientStream.java Adds abstract consumeBytes method for client-side flow control
Http2TripleClientStream.java Implements consumeBytes by delegating to Http2LocalFlowController
Http3TripleClientStream.java Provides no-op consumeBytes implementation (QUIC handles flow control)
TriDecoder.java Adds bytesRead notification after processing each message frame
LengthFieldStreamingDecoder.java Adds bytesRead notification for length-field based decoding
StreamingDecoder.java Adds bytesRead method to FragmentListener interface, removes DefaultFragmentListener
GenericHttp2ServerTransportListener.java Implements DefaultFragmentListener with bytesRead support
GrpcHttp2ServerTransportListener.java Adds bytesRead implementation to DetermineMethodDescriptorListener
ClientCall.java Adds streamingResponse() and setAutoRequestWithInitial() to API
CallStreamObserver.java Adds disableAutoInboundFlowControl() for gRPC compatibility
ClientStreamObserver.java Adds disableAutoRequestWithInitial() method
TripleClientCall.java Implements conditional initial request based on streaming response type
UnaryClientCallListener.java Adds streamingResponse() returning false, removes call.request(2)
ObserverToClientCallListenerAdapter.java Adds streamingResponse() returning true, removes auto-request logic
ClientCallToObserverAdapter.java Adds streamingResponse field and disableAutoRequestWithInitial() implementation
TripleConfig.java Adds windowUpdateRatio configuration with default value of 0.5
NettyHttp2ProtocolSelectorHandler.java Passes Http2Connection to handler constructors for flow control
RecordListener.java Adds empty bytesRead() implementation for tests
TriHttp2RemoteFlowController.java Deleted - replaced by standard Netty flow controller with manual local control

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

return;
}

// todo The current implementation is not optimal, and alternative implementations should be considered.
Copy link

Copilot AI Dec 30, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The comment on line 100 contains a typo: "todo" should be "TODO" (all caps) to follow standard code annotation conventions. This makes it easier for IDEs and tools to recognize it as a task marker.

Suggested change
// todo The current implementation is not optimal, and alternative implementations should be considered.
// TODO The current implementation is not optimal, and alternative implementations should be considered.

Copilot uses AI. Check for mistakes.
Comment on lines 36 to 38
* Whether the response is streaming response.
*
* @return
Copy link

Copilot AI Dec 30, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing Javadoc comment for the ClientCall.Listener.streamingResponse() method. The method should have proper documentation explaining what it returns and when it should return true vs false.

Suggested change
* Whether the response is streaming response.
*
* @return
* Whether the response is a streaming response.
* <p>
* Implementations should return {@code true} when the server is expected to send
* zero or more response messages over time (server-streaming or bidi-streaming
* calls). Return {@code false} when at most a single response message is expected
* from the server (unary or non-streaming calls).
*
* @return {@code true} if the server may send multiple response messages,
* {@code false} if at most one response message is expected

Copilot uses AI. Check for mistakes.
Comment on lines +133 to +139
http2StreamChannel.eventLoop().execute(() -> {
try {
localFlowController.consumeBytes(stream, numBytes);
} catch (Exception e) {
LOGGER.warn(PROTOCOL_FAILED_RESPONSE, "", "", "Failed to consumeBytes for stream " + streamId, e);
}
});
Copy link

Copilot AI Dec 30, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Inconsistent error handling: When in the event loop (line 131), exceptions from consumeBytes are propagated (declared in the method signature), but when executed asynchronously (line 135), exceptions are caught and logged. This inconsistency could lead to different behavior depending on which thread calls the method. Consider catching the exception in both cases or propagating it in both cases for consistency.

Suggested change
http2StreamChannel.eventLoop().execute(() -> {
try {
localFlowController.consumeBytes(stream, numBytes);
} catch (Exception e) {
LOGGER.warn(PROTOCOL_FAILED_RESPONSE, "", "", "Failed to consumeBytes for stream " + streamId, e);
}
});
http2StreamChannel.eventLoop().execute(() -> localFlowController.consumeBytes(stream, numBytes));

Copilot uses AI. Check for mistakes.
Comment on lines +120 to +121
// Get the stream from connection using stream id
int streamId = http2StreamChannel.stream().id();
Copy link

Copilot AI Dec 30, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Potential NullPointerException: If http2StreamChannel.stream() returns null at line 121, the code will throw a NullPointerException. While there's a check for the stream being null after retrieving it from the connection (line 123), the stream().id() call at line 121 could fail first if stream() itself returns null. Consider checking if stream() is null before calling id().

Suggested change
// Get the stream from connection using stream id
int streamId = http2StreamChannel.stream().id();
// Get the stream from Http2StreamChannel and then from connection using stream id
Http2Stream channelStream = http2StreamChannel.stream();
if (channelStream == null) {
LOGGER.debug("Http2StreamChannel.stream() is null, skip consumeBytes");
return;
}
int streamId = channelStream.id();

Copilot uses AI. Check for mistakes.
default void disableAutoRequest() {
disableAutoFlowControl();
}

Copy link

Copilot AI Dec 30, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing Javadoc comment for the disableAutoRequestWithInitial method. This new API method should have proper documentation explaining what it does, the purpose of the request parameter, and how it differs from disableAutoRequest or disableAutoFlowControl.

Suggested change
/**
* Swaps to manual flow control and configures an initial number of messages to request
* when the call starts.
* <p>
* This is similar to {@link #disableAutoRequest()} but allows specifying how many messages
* should be requested immediately once the call is established, as if {@link #request(int)}
* were invoked with the given {@code request} value after disabling automatic flow control.
*
* @param request the number of messages to request initially after auto flow control has been
* disabled; must be a non-negative number.
*/

Copilot uses AI. Check for mistakes.
EarthChen and others added 9 commits December 30, 2025 23:30
…protocol/tri/observer/CallStreamObserver.java

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
…protocol/tri/transport/TripleHttp2LocalFlowController.java

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
…protocol/tri/call/ClientCall.java

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
…leConfig.java

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
…protocol/tri/call/ClientCall.java

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
@EarthChen
Copy link
Member Author

核心概念

为什么需要手动流控?

HTTP/2 协议使用流控窗口来防止接收端被发送端压垮。默认情况下,Netty 会自动管理流控:

  • 收到数据后自动发送 WINDOW_UPDATE
  • 这种模式无法实现背压(Backpressure)控制

手动流控允许应用层控制何时发送 WINDOW_UPDATE

  • 只有当数据被真正处理后才返回窗口
  • 实现真正的端到端背压控制

关键配置

配置项 描述 默认值
windowUpdateRatio 消费字节达到初始窗口的多少比例时发送 WINDOW_UPDATE 0.5 (50%)
initialWindowSize 初始流控窗口大小 8MB
AUTO_STREAM_FLOW_CONTROL Netty 配置,禁用 Http2StreamChannel 自动流控 false

架构图

┌─────────────────────────────────────────────────────────────────────────────┐
│                           HTTP/2 连接层                                      │
│  ┌─────────────────────────────────────────────────────────────────────┐   │
│  │                    Http2FrameCodec                                   │   │
│  │  ┌─────────────────────────────────────────────────────────────┐   │   │
│  │  │              TripleHttp2LocalFlowController                  │   │   │
│  │  │  - receiveFlowControlledFrame(): 减少窗口                    │   │   │
│  │  │  - consumeBytes(): 返回窗口,触发 WINDOW_UPDATE              │   │   │
│  │  │  - windowUpdateRatio: 控制何时发送 WINDOW_UPDATE             │   │   │
│  │  └─────────────────────────────────────────────────────────────┘   │   │
│  └─────────────────────────────────────────────────────────────────────┘   │
│                                    │                                        │
│                       Http2MultiplexHandler                                 │
│                                    │                                        │
│              ┌─────────────────────┴─────────────────────┐                  │
│              │                                           │                  │
│  ┌───────────┴────────────┐               ┌──────────────┴───────────┐     │
│  │   Http2StreamChannel   │               │   Http2StreamChannel     │     │
│  │   (Stream 1)           │               │   (Stream 3)             │     │
│  │   AUTO_FLOW = false    │               │   AUTO_FLOW = false      │     │
│  └────────────────────────┘               └──────────────────────────┘     │
└─────────────────────────────────────────────────────────────────────────────┘

服务端流控机制

数据流转图

sequenceDiagram
    participant Client as 客户端
    participant Netty as Netty Http2FrameCodec
    participant FC as TripleHttp2LocalFlowController
    participant Handler as NettyHttp2FrameHandler
    participant Decoder as StreamingDecoder
    participant Listener as FragmentListener
    participant Channel as H2StreamChannel
    participant App as 业务处理

    Client->>Netty: DATA 帧
    Netty->>FC: receiveFlowControlledFrame()
    Note over FC: 减少窗口大小<br/>跟踪 unconsumedBytes
    Netty->>Handler: onData(ByteBuf)
    Handler->>Decoder: decode(inputStream)
    
    loop 处理每个消息
        Decoder->>Decoder: processBody()
        Note over Decoder: 读取消息字节
        Decoder->>Listener: bytesRead(numBytes)
        Listener->>Channel: consumeBytes(numBytes)
        Channel->>FC: consumeBytes(stream, numBytes)
        Note over FC: 累加返回的字节<br/>检查是否达到阈值
        alt 达到 windowUpdateRatio
            FC->>Client: WINDOW_UPDATE 帧
        end
        Decoder->>Listener: onFragmentMessage(rawMessage)
        Listener->>App: 业务处理
    end
Loading

关键类与职责

1. TripleHttp2Protocol(初始化)

// 服务端创建连接时配置手动流控
private Http2Connection createHttp2ServerConnection(TripleConfig tripleConfig) {
    Http2Connection connection = new DefaultHttp2Connection(true);
    float windowUpdateRatio = tripleConfig.getWindowUpdateRatioOrDefault();
    connection.local().flowController(
        new TripleHttp2LocalFlowController(connection, windowUpdateRatio));
    return connection;
}

// 禁用 Http2StreamChannel 自动流控
protected void initChannel(Http2StreamChannel ch) {
    ch.config().setOption(Http2StreamChannelOption.AUTO_STREAM_FLOW_CONTROL, false);
    // ...
}

2. TripleHttp2LocalFlowController(流控核心)

public class TripleHttp2LocalFlowController extends DefaultHttp2LocalFlowController {
    
    public TripleHttp2LocalFlowController(Http2Connection connection, float windowUpdateRatio) {
        // windowUpdateRatio: 当消费字节达到初始窗口的该比例时发送 WINDOW_UPDATE
        // true: 启用按帧跟踪
        super(connection, windowUpdateRatio, true);
    }
}

3. GenericHttp2ServerTransportListener(服务端入口)

@Override
protected HttpMessageListener buildHttpMessageListener() {
    // 设置 FragmentListener 来处理流控
    streamingDecoder.setFragmentListener(new DefaultFragmentListener(listeningDecoder));
    return new StreamingHttpMessageListener(streamingDecoder);
}

final class DefaultFragmentListener implements FragmentListener {
    @Override
    public void bytesRead(int numBytes) {
        // 数据被读取后,通知流控器返回窗口
        getH2StreamChannel().consumeBytes(numBytes);
    }
}

4. LengthFieldStreamingDecoder / TriDecoder(解码触发)

private void processBody() {
    int totalBytesRead = lengthFieldOffset + lengthFieldLength + requiredLength;
    
    byte[] rawMessage;
    try {
        rawMessage = readRawMessage(accumulate, requiredLength);
    } finally {
        // 在 finally 块中通知 bytesRead,确保流控一定被触发
        listener.bytesRead(totalBytesRead);
    }
    
    // 处理消息
    invokeListener(inputStream);
}

5. NettyH2StreamChannel(执行流控)

@Override
public void consumeBytes(int numBytes) throws Exception {
    Http2LocalFlowController localFlowController = http2Connection.local().flowController();
    int streamId = http2StreamChannel.stream().id();
    Http2Stream stream = http2Connection.stream(streamId);
    
    // 必须在 EventLoop 线程执行
    if (http2StreamChannel.eventLoop().inEventLoop()) {
        localFlowController.consumeBytes(stream, numBytes);
    } else {
        http2StreamChannel.eventLoop().execute(() -> {
            localFlowController.consumeBytes(stream, numBytes);
        });
    }
}

服务端数据流转路径

收到 DATA 帧
    │
    ▼
Http2FrameCodec.onDataRead() → 返回 0,不自动消费
    │
    ▼
TripleHttp2LocalFlowController.receiveFlowControlledFrame()
    │  - 减少流窗口
    │  - 跟踪 unconsumedBytes
    ▼
NettyHttp2FrameHandler.onData()
    │
    ▼
StreamingDecoder.decode()
    │
    ▼
LengthFieldStreamingDecoder.processBody()
    │
    ├──► 读取消息字节
    │
    ├──► finally { listener.bytesRead(numBytes) }  ◄─── 关键!在 finally 块中
    │         │
    │         ▼
    │    DefaultFragmentListener.bytesRead()
    │         │
    │         ▼
    │    H2StreamChannel.consumeBytes()
    │         │
    │         ▼
    │    TripleHttp2LocalFlowController.consumeBytes()
    │         │
    │         ▼
    │    [如果达到 windowUpdateRatio] → 发送 WINDOW_UPDATE
    │
    ▼
listener.onFragmentMessage() → 业务处理

客户端流控机制

数据流转图

sequenceDiagram
    participant Server as 服务端
    participant Netty as Netty Http2FrameCodec
    participant FC as TripleHttp2LocalFlowController
    participant Handler as TripleHttp2ClientResponseHandler
    participant Stream as AbstractTripleClientStream
    participant Decoder as TriDecoder
    participant Listener as TriDecoder.Listener
    participant App as 业务回调

    Server->>Netty: DATA 帧
    Netty->>FC: receiveFlowControlledFrame()
    Note over FC: 减少窗口大小
    Netty->>Handler: onData(ByteBuf)
    Handler->>Stream: H2TransportListener.onData()
    Stream->>Decoder: deframer.deframe(data)
    
    loop 处理每个消息
        Decoder->>Decoder: processBody()
        Note over Decoder: 读取消息字节
        Decoder->>Listener: bytesRead(numBytes)
        Listener->>Stream: consumeBytes(numBytes)
        Stream->>FC: Http2LocalFlowController.consumeBytes()
        Note over FC: 累加返回的字节
        alt 达到 windowUpdateRatio
            FC->>Server: WINDOW_UPDATE 帧
        end
        Decoder->>Listener: onRawMessage(data)
        Listener->>App: 业务处理
    end
Loading

关键类与职责

1. TripleHttp2Protocol(客户端初始化)

@Override
public void configClientPipeline(URL url, ChannelOperator operator, ContextOperator contextOperator) {
    TripleConfig tripleConfig = ConfigManager.getProtocolOrDefault(url).getTripleOrDefault();
    // 创建带有手动流控的连接
    Http2Connection connection = createHttp2ClientConnection(tripleConfig);
    // ...
}

private Http2Connection createHttp2ClientConnection(TripleConfig tripleConfig) {
    Http2Connection connection = new DefaultHttp2Connection(false);
    float windowUpdateRatio = tripleConfig.getWindowUpdateRatioOrDefault();
    connection.local().flowController(
        new TripleHttp2LocalFlowController(connection, windowUpdateRatio));
    return connection;
}

2. Http2TripleClientStream(客户端流)

@Override
protected TripleStreamChannelFuture initStreamChannel(Channel parent) {
    Http2StreamChannelBootstrap bootstrap = new Http2StreamChannelBootstrap(parent);
    // 禁用自动流控
    bootstrap.option(Http2StreamChannelOption.AUTO_STREAM_FLOW_CONTROL, false);
    // ...
}

@Override
protected void consumeBytes(int numBytes) {
    // 从 parent channel 获取 Http2Connection
    Http2Connection http2Connection = getHttp2Connection();
    Http2LocalFlowController localFlowController = http2Connection.local().flowController();
    
    // 调用流控器消费字节
    localFlowController.consumeBytes(stream, numBytes);
}

private Http2Connection getHttp2Connection() {
    ChannelHandlerContext ctx = parent.pipeline().context(Http2FrameCodec.class);
    Http2FrameCodec codec = (Http2FrameCodec) ctx.handler();
    return codec.connection();
}

3. AbstractTripleClientStream.ClientTransportListener(设置解码监听器)

void onHeaderReceived(Http2Headers headers) {
    // 创建解码器监听器
    TriDecoder.Listener listener = new TriDecoder.Listener() {
        @Override
        public void bytesRead(int numBytes) {
            // 调用抽象方法,由子类实现
            consumeBytes(numBytes);
        }

        @Override
        public void onRawMessage(byte[] data) {
            AbstractTripleClientStream.this.listener.onMessage(data, isReturnTriException);
        }
    };
    deframer = new TriDecoder(decompressor, listener);
}

4. TriDecoder(gRPC 解码器)

private void processBody() {
    int totalBytesRead = HEADER_LENGTH + requiredLength;
    
    byte[] stream;
    try {
        stream = compressedFlag ? getCompressedBody() : getUncompressedBody();
    } finally {
        // 在 finally 块中通知流控
        listener.bytesRead(totalBytesRead);
    }
    
    listener.onRawMessage(stream);
}

客户端数据流转路径

收到 DATA 帧(来自服务端响应)
    │
    ▼
Http2FrameCodec.onDataRead() → 返回 0,不自动消费
    │
    ▼
TripleHttp2LocalFlowController.receiveFlowControlledFrame()
    │  - 减少流窗口
    ▼
TripleHttp2ClientResponseHandler.onData()
    │
    ▼
ClientTransportListener.onData()
    │
    ▼
TriDecoder.deframe(data)
    │
    ▼
TriDecoder.processBody()
    │
    ├──► 读取/解压消息
    │
    ├──► finally { listener.bytesRead(numBytes) }  ◄─── 关键!
    │         │
    │         ▼
    │    TriDecoder.Listener.bytesRead()
    │         │
    │         ▼
    │    AbstractTripleClientStream.consumeBytes()
    │         │
    │         ▼
    │    Http2TripleClientStream.consumeBytes()
    │         │
    │         ▼
    │    TripleHttp2LocalFlowController.consumeBytes()
    │         │
    │         ▼
    │    [如果达到 windowUpdateRatio] → 发送 WINDOW_UPDATE
    │
    ▼
listener.onRawMessage() → 业务回调

配置说明

windowUpdateRatio 工作原理

假设 initialWindowSize = 8MBwindowUpdateRatio = 0.5

  1. 初始窗口 = 8MB
  2. 接收数据,窗口减少
  3. 调用 consumeBytes() 返回字节
  4. 当累计返回 4MB (8MB × 0.5) 时,发送 WINDOW_UPDATE
  5. 远端收到后,可以继续发送数据
窗口状态变化:

初始: [████████████████████████████████] 8MB 可用
      
收到 2MB: [████████████████████████░░░░░░░░] 6MB 可用
          consumeBytes(2MB) - 累计 2MB,未达阈值
          
收到 2MB: [████████████████░░░░░░░░░░░░░░░░] 4MB 可用
          consumeBytes(2MB) - 累计 4MB,达到 50%!
          → 发送 WINDOW_UPDATE (4MB)

窗口恢复: [████████████████████████████████] 8MB 可用

设计要点总结

1. 两层禁用自动流控

层级 配置 作用
Http2Connection TripleHttp2LocalFlowController 控制 WINDOW_UPDATE 发送时机
Http2StreamChannel AUTO_STREAM_FLOW_CONTROL=false 禁止 Netty 自动发送 WINDOW_UPDATE

2. bytesRead 在 finally 块中调用

确保即使消息处理失败,流控也能正常工作:

try {
    rawMessage = readRawMessage();
} finally {
    listener.bytesRead(totalBytesRead);  // 必须执行
}

3. EventLoop 线程安全

consumeBytes() 必须在 EventLoop 线程执行:

if (eventLoop().inEventLoop()) {
    localFlowController.consumeBytes(stream, numBytes);
} else {
    eventLoop().execute(() -> localFlowController.consumeBytes(stream, numBytes));
}

类关系图

classDiagram
    class TripleConfig {
        +Float windowUpdateRatio
        +Integer initialWindowSize
        +getWindowUpdateRatioOrDefault() float
    }
    
    class TripleHttp2LocalFlowController {
        +TripleHttp2LocalFlowController(connection, windowUpdateRatio)
        +receiveFlowControlledFrame()
        +consumeBytes(stream, numBytes)
    }
    
    class DefaultHttp2LocalFlowController {
        +consumeBytes(stream, numBytes)
    }
    
    class H2StreamChannel {
        <<interface>>
        +consumeBytes(numBytes)
    }
    
    class NettyH2StreamChannel {
        -Http2Connection http2Connection
        +consumeBytes(numBytes)
    }
    
    class StreamingDecoder {
        <<interface>>
        +setFragmentListener(listener)
    }
    
    class FragmentListener {
        <<interface>>
        +bytesRead(numBytes)
        +onFragmentMessage(rawMessage)
    }
    
    class AbstractTripleClientStream {
        <<abstract>>
        #consumeBytes(numBytes)*
    }
    
    class Http2TripleClientStream {
        +consumeBytes(numBytes)
        -getHttp2Connection()
    }
    
    TripleHttp2LocalFlowController --|> DefaultHttp2LocalFlowController
    NettyH2StreamChannel ..|> H2StreamChannel
    Http2TripleClientStream --|> AbstractTripleClientStream
    
    NettyH2StreamChannel --> TripleHttp2LocalFlowController : uses
    Http2TripleClientStream --> TripleHttp2LocalFlowController : uses
Loading

@codecov-commenter
Copy link

codecov-commenter commented Dec 31, 2025

Codecov Report

❌ Patch coverage is 52.48869% with 105 lines in your changes missing coverage. Please review.
✅ Project coverage is 60.72%. Comparing base (ac1621f) to head (780cce7).

Files with missing lines Patch % Lines
.../dubbo/rpc/protocol/tri/call/TripleClientCall.java 25.00% 18 Missing ⚠️
...rotocol/tri/h12/http2/Http2TripleClientStream.java 60.00% 11 Missing and 5 partials ⚠️
...emoting/http12/netty4/h2/NettyH2StreamChannel.java 48.00% 9 Missing and 4 partials ⚠️
...rotocol/tri/stream/AbstractTripleClientStream.java 23.07% 10 Missing ⚠️
...remoting/http12/h2/Http2ServerChannelObserver.java 0.00% 7 Missing ⚠️
...h12/http2/GenericHttp2ServerTransportListener.java 70.00% 6 Missing ⚠️
...a/org/apache/dubbo/config/nested/TripleConfig.java 16.66% 4 Missing and 1 partial ⚠️
...oting/http12/netty4/h2/NettyHttp2FrameHandler.java 0.00% 3 Missing ⚠️
...he/dubbo/rpc/protocol/tri/TripleHttp2Protocol.java 85.00% 3 Missing ⚠️
...ri/transport/TripleHttp2ClientResponseHandler.java 0.00% 3 Missing ⚠️
... and 15 more
Additional details and impacted files
@@             Coverage Diff              @@
##                3.3   #15957      +/-   ##
============================================
+ Coverage     58.78%   60.72%   +1.93%     
- Complexity       15    11711   +11696     
============================================
  Files          1938     1942       +4     
  Lines         88698    88642      -56     
  Branches      13387    13365      -22     
============================================
+ Hits          52143    53825    +1682     
+ Misses        30901    29317    -1584     
+ Partials       5654     5500     -154     
Flag Coverage Δ
integration-tests-java21 32.21% <40.72%> (?)
integration-tests-java8 32.37% <40.72%> (?)
samples-tests-java21 32.12% <22.62%> (?)
samples-tests-java8 29.66% <22.17%> (?)
unit-tests-java11 58.98% <52.03%> (-0.07%) ⬇️
unit-tests-java17 58.50% <52.03%> (-0.03%) ⬇️
unit-tests-java21 58.48% <52.03%> (-0.03%) ⬇️
unit-tests-java25 58.43% <52.03%> (-0.04%) ⬇️
unit-tests-java8 58.99% <52.03%> (-0.05%) ⬇️

Flags with carried forward coverage won't be shown. Click here to find out more.

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.
  • 📦 JS Bundle Analysis: Save yourself from yourself by tracking and limiting bundle sizes in JS merges.

Copy link
Member

@wangchengming666 wangchengming666 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@wangchengming666
Copy link
Member

sample is apache/dubbo-samples#1278

@zrlw
Copy link
Contributor

zrlw commented Jan 4, 2026

sample is apache/dubbo-samples#1278

lack of PR effectiveness checking assertions.

@EarthChen
Copy link
Member Author

EarthChen commented Jan 4, 2026

应用层背压

应用层背压机制允许应用程序在发送数据时感知底层传输层的状态,避免在传输层缓冲区满时继续发送数据导致内存溢出或性能下降。

核心 API

API 说明
isReady() 检查当前流是否可写,返回 true 表示可以发送更多消息
setOnReadyHandler(Runnable) 设置回调,当流从不可写变为可写时触发

设计参考

本实现完全对齐 gRPC Java 的设计模式:

  • CallStreamObserver.isReady()CallStreamObserver.setOnReadyHandler(Runnable)
  • ClientCall.Listener.onReady()

客户端实现

核心类关系

┌─────────────────────────────────────────────────────────────────────────────┐
│                              用户代码                                        │
│  CallStreamObserver<T> observer = ...;                                      │
│  observer.setOnReadyHandler(() -> { /* 恢复发送 */ });                       │
│  if (observer.isReady()) { observer.onNext(data); }                         │
└─────────────────────────────────────────────────────────────────────────────┘
                                      │
                                      ▼
┌─────────────────────────────────────────────────────────────────────────────┐
│                    ClientCallToObserverAdapter                               │
│  - 实现 CallStreamObserver 接口                                              │
│  - 存储 onReadyHandler                                                       │
│  - isReady() 委托给 ClientCall.isReady()                                     │
└─────────────────────────────────────────────────────────────────────────────┘
                                      │
                    ┌─────────────────┴─────────────────┐
                    ▼                                   ▼
┌──────────────────────────────────┐    ┌──────────────────────────────────────┐
│     TripleClientCall             │    │  ObserverToClientCallListenerAdapter │
│  - 实现 ClientCall               │    │  - 实现 ClientCall.Listener          │
│  - 实现 ClientStream.Listener    │    │  - 持有 ClientCallToObserverAdapter  │
│  - isReady() 委托给 stream       │    │  - onReady() 触发 onReadyHandler     │
└──────────────────────────────────┘    └──────────────────────────────────────┘
                    │
                    ▼
┌─────────────────────────────────────────────────────────────────────────────┐
│                    AbstractTripleClientStream                                │
│  - 实现 ClientStream                                                         │
│  - isReady() 返回 channel.isWritable()                                       │
│  - onWritabilityChanged() 调用 listener.onReady()                            │
└─────────────────────────────────────────────────────────────────────────────┘
                    │
                    ▼
┌─────────────────────────────────────────────────────────────────────────────┐
│                 Netty Http2StreamChannel                                     │
│  - isWritable() 底层可写性检查                                                │
│  - channelWritabilityChanged 事件                                            │
└─────────────────────────────────────────────────────────────────────────────┘

isReady() 调用链

用户: observer.isReady()
        │
        ▼
ClientCallToObserverAdapter.isReady()
        │ return call.isReady()
        ▼
TripleClientCall.isReady()
        │ 检查 canceled/done 状态
        │ return stream.isReady()
        ▼
AbstractTripleClientStream.isReady()
        │ Channel channel = streamChannelFuture.getNow()
        │ return channel.isWritable()
        ▼
Netty: Http2StreamChannel.isWritable()

代码实现:

// ClientCallToObserverAdapter.java
@Override
public boolean isReady() {
    return call.isReady();
}

// TripleClientCall.java
@Override
public boolean isReady() {
    if (canceled) return false;
    if (done) return false;
    return stream.isReady();
}

// AbstractTripleClientStream.java
@Override
public boolean isReady() {
    Channel channel = streamChannelFuture.getNow();
    if (channel == null) return false;
    return channel.isWritable();
}

onReadyHandler 触发链

当 Netty Channel 从不可写变为可写时,触发以下调用链:

Netty: channelWritabilityChanged 事件
        │
        ▼
TripleHttp2ClientResponseHandler.channelWritabilityChanged(ctx)
        │ transportListener.onWritabilityChanged()
        ▼
ClientTransportListener.onWritabilityChanged()  [H2TransportListener 实现]
        │ AbstractTripleClientStream.this.onWritabilityChanged()
        ▼
AbstractTripleClientStream.onWritabilityChanged()
        │ 检查 channel.isWritable()
        │ listener.onReady() [同步调用]
        ▼
TripleClientCall.onReady()  [ClientStream.Listener 实现]
        │ executor.execute(() -> listener.onReady()) [异步执行]
        ▼
ObserverToClientCallListenerAdapter.onReady()  [ClientCall.Listener 实现]
        │ requestAdapter.getOnReadyHandler().run()
        ▼
用户设置的 Runnable 被执行 [在业务 executor 线程中]

代码实现:

// TripleHttp2ClientResponseHandler.java
@Override
public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
    transportListener.onWritabilityChanged();
    super.channelWritabilityChanged(ctx);
}

// AbstractTripleClientStream.ClientTransportListener
@Override
public void onWritabilityChanged() {
    AbstractTripleClientStream.this.onWritabilityChanged();
}

// AbstractTripleClientStream.java
protected void onWritabilityChanged() {
    Channel channel = streamChannelFuture.getNow();
    if (channel != null && channel.isWritable()) {
        // 同步调用,由 TripleClientCall.onReady() 负责异步执行
        listener.onReady();
    }
}

// TripleClientCall.java
@Override
public void onReady() {
    if (listener == null) {
        return;
    }
    // ObserverToClientCallListenerAdapter.onReady() triggers the onReadyHandler
    executor.execute(() -> {
        try {
            listener.onReady();
        } catch (Throwable t) {
            LOGGER.warn(PROTOCOL_STREAM_LISTENER, "", "", 
                "Error executing listener.onReady()", t);
        }
    });
}

// ObserverToClientCallListenerAdapter.java
@Override
public void onReady() {
    if (requestAdapter == null) return;
    Runnable handler = requestAdapter.getOnReadyHandler();
    if (handler == null) return;
    handler.run();
}

setOnReadyHandler 设置流程

用户: observer.setOnReadyHandler(runnable)
        │
        ▼
ClientCallToObserverAdapter.setOnReadyHandler(runnable)
        │ this.onReadyHandler = runnable  // 存储在本地
        │
        └──────────────────────────────────────────────────────
                                                              │
TripleInvoker.streamCall() 中建立关联:                         │
        │                                                     │
        ▼                                                     │
listener.setRequestAdapter(adapter)                           │
        │                                                     │
        ▼                                                     │
ObserverToClientCallListenerAdapter 持有 requestAdapter 引用 ──┘
        │
        ▼ (当 onReady 被调用时)
requestAdapter.getOnReadyHandler().run()

代码实现:

// ClientCallToObserverAdapter.java
private Runnable onReadyHandler;

@Override
public void setOnReadyHandler(Runnable onReadyHandler) {
    this.onReadyHandler = onReadyHandler;
}

public Runnable getOnReadyHandler() {
    return onReadyHandler;
}

// TripleInvoker.java
StreamObserver<Object> streamCall(...) {
    ObserverToClientCallListenerAdapter listener = 
        new ObserverToClientCallListenerAdapter(responseObserver);
    StreamObserver<Object> streamObserver = call.start(metadata, listener);
    
    // 建立关联,让 listener 能够访问 adapter 的 onReadyHandler
    if (streamObserver instanceof ClientCallToObserverAdapter) {
        listener.setRequestAdapter((ClientCallToObserverAdapter<Object>) streamObserver);
    }
    // ...
}

服务端实现

服务端的背压机制通过 H2StreamChannelHttp2ServerChannelObserver 实现。

核心类关系

┌─────────────────────────────────────────────────────────────────────────────┐
│                    Http2ServerChannelObserver                                │
│  - 实现 CallStreamObserver                                                   │
│  - isReady() 委托给 H2StreamChannel.isReady()                                │
│  - setOnReadyHandler() 设置回调                                              │
└─────────────────────────────────────────────────────────────────────────────┘
                                      │
                                      ▼
┌─────────────────────────────────────────────────────────────────────────────┐
│                       NettyH2StreamChannel                                   │
│  - 实现 H2StreamChannel                                                      │
│  - isReady() 返回 http2StreamChannel.isWritable()                            │
│  - onWritabilityChanged() 触发 onReadyHandler                                │
└─────────────────────────────────────────────────────────────────────────────┘
                                      │
                                      ▼
┌─────────────────────────────────────────────────────────────────────────────┐
│                 Netty Http2StreamChannel                                     │
│  - isWritable() 底层可写性检查                                                │
└─────────────────────────────────────────────────────────────────────────────┘

服务端 onWritabilityChanged 触发链

Netty: channelWritabilityChanged 事件
        │
        ▼
NettyHttp2FrameHandler.channelWritabilityChanged(ctx)
        │ transportListener.onWritabilityChanged()
        ▼
Http2TransportListener.onWritabilityChanged()
        │ 由具体实现处理
        ▼
触发服务端的背压回调

isReady() 实现对比:应用层字节计数 vs Netty 可写性

gRPC 和 Dubbo Triple 在判断 isReady() 时采用了不同的策略:

gRPC:应用层字节计数

gRPC 在应用层维护一个发送字节队列计数器 numSentBytesQueued

// io.grpc.internal.AbstractStream.TransportState
@GuardedBy("onReadyLock")
private int numSentBytesQueued;

@GuardedBy("onReadyLock")
private int onReadyThreshold = 32 * 1024;  // 32KB

private boolean isReady() {
    synchronized (onReadyLock) {
        return allocated && numSentBytesQueued < onReadyThreshold && !deallocated;
    }
}

// 发送时增加计数
private void onSendingBytes(int numBytes) {
    synchronized (onReadyLock) {
        numSentBytesQueued += numBytes;
    }
}

// 发送完成后减少计数,必要时触发 onReady
public final void onSentBytes(int numBytes) {
    boolean doNotify;
    synchronized (onReadyLock) {
        boolean belowThresholdBefore = numSentBytesQueued < onReadyThreshold;
        numSentBytesQueued -= numBytes;
        boolean belowThresholdAfter = numSentBytesQueued < onReadyThreshold;
        doNotify = !belowThresholdBefore && belowThresholdAfter;
    }
    if (doNotify) {
        notifyIfReady();  // 触发 listener.onReady()
    }
}

工作流程:

发送 frame (numBytes)
    │
    ▼
onSendingBytes(numBytes)
    │ numSentBytesQueued += numBytes
    │
    ▼
Netty 写入完成回调
    │
    ▼
onSentBytes(numBytes)
    │ numSentBytesQueued -= numBytes
    │ if (crossed threshold from above to below) → notifyIfReady()
    ▼
listener.onReady()

Dubbo Triple:Netty Channel 可写性

Dubbo Triple 直接使用 Netty 的 Channel 可写性判断:

// AbstractTripleClientStream.java
@Override
public boolean isReady() {
    Channel channel = streamChannelFuture.getNow();
    if (channel == null) {
        return false;
    }
    return channel.isWritable();  // 直接依赖 Netty
}

Netty 可写性机制:

  • 基于 ChannelOutboundBuffer 的水位线
  • 高水位线 (WRITE_BUFFER_HIGH_WATER_MARK) 默认 64KB
  • 低水位线 (WRITE_BUFFER_LOW_WATER_MARK) 默认 32KB
  • 超过高水位线 → isWritable() = false
  • 降到低水位线以下 → channelWritabilityChanged() 事件

对比总结

特性 gRPC (应用层字节计数) Dubbo Triple (Netty 可写性)
阈值控制 onReadyThreshold (32KB) Netty 水位线 (32-64KB)
计数精度 精确到每个 frame Netty 内部管理
触发时机 frame 写入完成回调 channelWritabilityChanged
线程安全 synchronized Netty EventLoop 单线程
可配置性 setOnReadyThreshold() Netty ChannelOption
复杂度 较高(需维护计数) 较低(依赖 Netty)

使用示例

客户端使用背压

// 获取请求 StreamObserver
StreamObserver<Request> requestObserver = stub.biStreamCall(responseObserver);

// 转换为 CallStreamObserver 以使用背压 API
CallStreamObserver<Request> callObserver = (CallStreamObserver<Request>) requestObserver;

// 禁用自动流控
callObserver.disableAutoFlowControl();

// 设置 ready 回调
callObserver.setOnReadyHandler(() -> {
    // 当流变为可写时,恢复发送数据
    while (callObserver.isReady() && hasMoreData()) {
        callObserver.onNext(getNextData());
    }
});

// 初始发送(如果可写)
while (callObserver.isReady() && hasMoreData()) {
    callObserver.onNext(getNextData());
}

服务端使用背压

@Override
public StreamObserver<Request> biStreamCall(StreamObserver<Response> responseObserver) {
    // 转换为 ServerStreamObserver
    ServerStreamObserver<Response> serverObserver = 
        (ServerStreamObserver<Response>) responseObserver;
    
    // 设置 ready 回调
    serverObserver.setOnReadyHandler(() -> {
        while (serverObserver.isReady() && hasMoreResponses()) {
            serverObserver.onNext(getNextResponse());
        }
    });
    
    return new StreamObserver<Request>() {
        @Override
        public void onNext(Request request) {
            // 处理请求
        }
        // ...
    };
}

Copy link
Contributor

@oxsean oxsean left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM.

Copy link
Member

@wangchengming666 wangchengming666 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

Copy link
Contributor

@zrlw zrlw left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@zrlw zrlw merged commit 703290d into apache:3.3 Jan 5, 2026
33 checks passed
@EarthChen EarthChen mentioned this pull request Jan 6, 2026
4 tasks
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

7 participants