-
Notifications
You must be signed in to change notification settings - Fork 26.6k
Support Tri backpress #15957
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Support Tri backpress #15957
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull request overview
This PR implements HTTP/2 flow control (backpressure) for the Triple protocol by introducing manual flow control through consumeBytes() callbacks that trigger WINDOW_UPDATE frames. The implementation replaces automatic flow control with a manual pattern where the application explicitly signals when bytes have been processed and more data can be received.
- Introduces
TripleHttp2LocalFlowControllerto manage local flow control with configurable window update ratios - Implements the
bytesRead()callback chain from decoders through listeners to flow controllers - Removes the custom
TriHttp2RemoteFlowControllerin favor of manual local flow control
Reviewed changes
Copilot reviewed 25 out of 25 changed files in this pull request and generated 16 comments.
Show a summary per file
| File | Description |
|---|---|
| TripleHttp2LocalFlowController.java | New custom flow controller extending Netty's DefaultHttp2LocalFlowController with configurable window update ratio |
| TripleHttp2Protocol.java | Integrates custom flow controller and disables automatic stream flow control on both client and server |
| H2StreamChannel.java | Adds consumeBytes interface for flow control |
| NettyH2StreamChannel.java | Implements consumeBytes to call Http2LocalFlowController.consumeBytes() |
| AbstractTripleClientStream.java | Adds abstract consumeBytes method for client-side flow control |
| Http2TripleClientStream.java | Implements consumeBytes by delegating to Http2LocalFlowController |
| Http3TripleClientStream.java | Provides no-op consumeBytes implementation (QUIC handles flow control) |
| TriDecoder.java | Adds bytesRead notification after processing each message frame |
| LengthFieldStreamingDecoder.java | Adds bytesRead notification for length-field based decoding |
| StreamingDecoder.java | Adds bytesRead method to FragmentListener interface, removes DefaultFragmentListener |
| GenericHttp2ServerTransportListener.java | Implements DefaultFragmentListener with bytesRead support |
| GrpcHttp2ServerTransportListener.java | Adds bytesRead implementation to DetermineMethodDescriptorListener |
| ClientCall.java | Adds streamingResponse() and setAutoRequestWithInitial() to API |
| CallStreamObserver.java | Adds disableAutoInboundFlowControl() for gRPC compatibility |
| ClientStreamObserver.java | Adds disableAutoRequestWithInitial() method |
| TripleClientCall.java | Implements conditional initial request based on streaming response type |
| UnaryClientCallListener.java | Adds streamingResponse() returning false, removes call.request(2) |
| ObserverToClientCallListenerAdapter.java | Adds streamingResponse() returning true, removes auto-request logic |
| ClientCallToObserverAdapter.java | Adds streamingResponse field and disableAutoRequestWithInitial() implementation |
| TripleConfig.java | Adds windowUpdateRatio configuration with default value of 0.5 |
| NettyHttp2ProtocolSelectorHandler.java | Passes Http2Connection to handler constructors for flow control |
| RecordListener.java | Adds empty bytesRead() implementation for tests |
| TriHttp2RemoteFlowController.java | Deleted - replaced by standard Netty flow controller with manual local control |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
...-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/observer/CallStreamObserver.java
Outdated
Show resolved
Hide resolved
| return; | ||
| } | ||
|
|
||
| // todo The current implementation is not optimal, and alternative implementations should be considered. |
Copilot
AI
Dec 30, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The comment on line 100 contains a typo: "todo" should be "TODO" (all caps) to follow standard code annotation conventions. This makes it easier for IDEs and tools to recognize it as a task marker.
| // todo The current implementation is not optimal, and alternative implementations should be considered. | |
| // TODO The current implementation is not optimal, and alternative implementations should be considered. |
...rc/main/java/org/apache/dubbo/rpc/protocol/tri/transport/TripleHttp2LocalFlowController.java
Outdated
Show resolved
Hide resolved
| * Whether the response is streaming response. | ||
| * | ||
| * @return |
Copilot
AI
Dec 30, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Missing Javadoc comment for the ClientCall.Listener.streamingResponse() method. The method should have proper documentation explaining what it returns and when it should return true vs false.
| * Whether the response is streaming response. | |
| * | |
| * @return | |
| * Whether the response is a streaming response. | |
| * <p> | |
| * Implementations should return {@code true} when the server is expected to send | |
| * zero or more response messages over time (server-streaming or bidi-streaming | |
| * calls). Return {@code false} when at most a single response message is expected | |
| * from the server (unary or non-streaming calls). | |
| * | |
| * @return {@code true} if the server may send multiple response messages, | |
| * {@code false} if at most one response message is expected |
dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/ClientCall.java
Show resolved
Hide resolved
| http2StreamChannel.eventLoop().execute(() -> { | ||
| try { | ||
| localFlowController.consumeBytes(stream, numBytes); | ||
| } catch (Exception e) { | ||
| LOGGER.warn(PROTOCOL_FAILED_RESPONSE, "", "", "Failed to consumeBytes for stream " + streamId, e); | ||
| } | ||
| }); |
Copilot
AI
Dec 30, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Inconsistent error handling: When in the event loop (line 131), exceptions from consumeBytes are propagated (declared in the method signature), but when executed asynchronously (line 135), exceptions are caught and logged. This inconsistency could lead to different behavior depending on which thread calls the method. Consider catching the exception in both cases or propagating it in both cases for consistency.
| http2StreamChannel.eventLoop().execute(() -> { | |
| try { | |
| localFlowController.consumeBytes(stream, numBytes); | |
| } catch (Exception e) { | |
| LOGGER.warn(PROTOCOL_FAILED_RESPONSE, "", "", "Failed to consumeBytes for stream " + streamId, e); | |
| } | |
| }); | |
| http2StreamChannel.eventLoop().execute(() -> localFlowController.consumeBytes(stream, numBytes)); |
dubbo-common/src/main/java/org/apache/dubbo/config/nested/TripleConfig.java
Show resolved
Hide resolved
| // Get the stream from connection using stream id | ||
| int streamId = http2StreamChannel.stream().id(); |
Copilot
AI
Dec 30, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Potential NullPointerException: If http2StreamChannel.stream() returns null at line 121, the code will throw a NullPointerException. While there's a check for the stream being null after retrieving it from the connection (line 123), the stream().id() call at line 121 could fail first if stream() itself returns null. Consider checking if stream() is null before calling id().
| // Get the stream from connection using stream id | |
| int streamId = http2StreamChannel.stream().id(); | |
| // Get the stream from Http2StreamChannel and then from connection using stream id | |
| Http2Stream channelStream = http2StreamChannel.stream(); | |
| if (channelStream == null) { | |
| LOGGER.debug("Http2StreamChannel.stream() is null, skip consumeBytes"); | |
| return; | |
| } | |
| int streamId = channelStream.id(); |
dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/ClientCall.java
Outdated
Show resolved
Hide resolved
| default void disableAutoRequest() { | ||
| disableAutoFlowControl(); | ||
| } | ||
|
|
Copilot
AI
Dec 30, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Missing Javadoc comment for the disableAutoRequestWithInitial method. This new API method should have proper documentation explaining what it does, the purpose of the request parameter, and how it differs from disableAutoRequest or disableAutoFlowControl.
| /** | |
| * Swaps to manual flow control and configures an initial number of messages to request | |
| * when the call starts. | |
| * <p> | |
| * This is similar to {@link #disableAutoRequest()} but allows specifying how many messages | |
| * should be requested immediately once the call is established, as if {@link #request(int)} | |
| * were invoked with the given {@code request} value after disabling automatic flow control. | |
| * | |
| * @param request the number of messages to request initially after auto flow control has been | |
| * disabled; must be a non-negative number. | |
| */ |
…protocol/tri/observer/CallStreamObserver.java Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
…protocol/tri/transport/TripleHttp2LocalFlowController.java Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
…protocol/tri/call/ClientCall.java Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
…leConfig.java Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
…protocol/tri/call/ClientCall.java Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
核心概念为什么需要手动流控?HTTP/2 协议使用流控窗口来防止接收端被发送端压垮。默认情况下,Netty 会自动管理流控:
手动流控允许应用层控制何时发送
关键配置
架构图服务端流控机制数据流转图sequenceDiagram
participant Client as 客户端
participant Netty as Netty Http2FrameCodec
participant FC as TripleHttp2LocalFlowController
participant Handler as NettyHttp2FrameHandler
participant Decoder as StreamingDecoder
participant Listener as FragmentListener
participant Channel as H2StreamChannel
participant App as 业务处理
Client->>Netty: DATA 帧
Netty->>FC: receiveFlowControlledFrame()
Note over FC: 减少窗口大小<br/>跟踪 unconsumedBytes
Netty->>Handler: onData(ByteBuf)
Handler->>Decoder: decode(inputStream)
loop 处理每个消息
Decoder->>Decoder: processBody()
Note over Decoder: 读取消息字节
Decoder->>Listener: bytesRead(numBytes)
Listener->>Channel: consumeBytes(numBytes)
Channel->>FC: consumeBytes(stream, numBytes)
Note over FC: 累加返回的字节<br/>检查是否达到阈值
alt 达到 windowUpdateRatio
FC->>Client: WINDOW_UPDATE 帧
end
Decoder->>Listener: onFragmentMessage(rawMessage)
Listener->>App: 业务处理
end
关键类与职责1. TripleHttp2Protocol(初始化)// 服务端创建连接时配置手动流控
private Http2Connection createHttp2ServerConnection(TripleConfig tripleConfig) {
Http2Connection connection = new DefaultHttp2Connection(true);
float windowUpdateRatio = tripleConfig.getWindowUpdateRatioOrDefault();
connection.local().flowController(
new TripleHttp2LocalFlowController(connection, windowUpdateRatio));
return connection;
}
// 禁用 Http2StreamChannel 自动流控
protected void initChannel(Http2StreamChannel ch) {
ch.config().setOption(Http2StreamChannelOption.AUTO_STREAM_FLOW_CONTROL, false);
// ...
}2. TripleHttp2LocalFlowController(流控核心)public class TripleHttp2LocalFlowController extends DefaultHttp2LocalFlowController {
public TripleHttp2LocalFlowController(Http2Connection connection, float windowUpdateRatio) {
// windowUpdateRatio: 当消费字节达到初始窗口的该比例时发送 WINDOW_UPDATE
// true: 启用按帧跟踪
super(connection, windowUpdateRatio, true);
}
}3. GenericHttp2ServerTransportListener(服务端入口)@Override
protected HttpMessageListener buildHttpMessageListener() {
// 设置 FragmentListener 来处理流控
streamingDecoder.setFragmentListener(new DefaultFragmentListener(listeningDecoder));
return new StreamingHttpMessageListener(streamingDecoder);
}
final class DefaultFragmentListener implements FragmentListener {
@Override
public void bytesRead(int numBytes) {
// 数据被读取后,通知流控器返回窗口
getH2StreamChannel().consumeBytes(numBytes);
}
}4. LengthFieldStreamingDecoder / TriDecoder(解码触发)private void processBody() {
int totalBytesRead = lengthFieldOffset + lengthFieldLength + requiredLength;
byte[] rawMessage;
try {
rawMessage = readRawMessage(accumulate, requiredLength);
} finally {
// 在 finally 块中通知 bytesRead,确保流控一定被触发
listener.bytesRead(totalBytesRead);
}
// 处理消息
invokeListener(inputStream);
}5. NettyH2StreamChannel(执行流控)@Override
public void consumeBytes(int numBytes) throws Exception {
Http2LocalFlowController localFlowController = http2Connection.local().flowController();
int streamId = http2StreamChannel.stream().id();
Http2Stream stream = http2Connection.stream(streamId);
// 必须在 EventLoop 线程执行
if (http2StreamChannel.eventLoop().inEventLoop()) {
localFlowController.consumeBytes(stream, numBytes);
} else {
http2StreamChannel.eventLoop().execute(() -> {
localFlowController.consumeBytes(stream, numBytes);
});
}
}服务端数据流转路径客户端流控机制数据流转图sequenceDiagram
participant Server as 服务端
participant Netty as Netty Http2FrameCodec
participant FC as TripleHttp2LocalFlowController
participant Handler as TripleHttp2ClientResponseHandler
participant Stream as AbstractTripleClientStream
participant Decoder as TriDecoder
participant Listener as TriDecoder.Listener
participant App as 业务回调
Server->>Netty: DATA 帧
Netty->>FC: receiveFlowControlledFrame()
Note over FC: 减少窗口大小
Netty->>Handler: onData(ByteBuf)
Handler->>Stream: H2TransportListener.onData()
Stream->>Decoder: deframer.deframe(data)
loop 处理每个消息
Decoder->>Decoder: processBody()
Note over Decoder: 读取消息字节
Decoder->>Listener: bytesRead(numBytes)
Listener->>Stream: consumeBytes(numBytes)
Stream->>FC: Http2LocalFlowController.consumeBytes()
Note over FC: 累加返回的字节
alt 达到 windowUpdateRatio
FC->>Server: WINDOW_UPDATE 帧
end
Decoder->>Listener: onRawMessage(data)
Listener->>App: 业务处理
end
关键类与职责1. TripleHttp2Protocol(客户端初始化)@Override
public void configClientPipeline(URL url, ChannelOperator operator, ContextOperator contextOperator) {
TripleConfig tripleConfig = ConfigManager.getProtocolOrDefault(url).getTripleOrDefault();
// 创建带有手动流控的连接
Http2Connection connection = createHttp2ClientConnection(tripleConfig);
// ...
}
private Http2Connection createHttp2ClientConnection(TripleConfig tripleConfig) {
Http2Connection connection = new DefaultHttp2Connection(false);
float windowUpdateRatio = tripleConfig.getWindowUpdateRatioOrDefault();
connection.local().flowController(
new TripleHttp2LocalFlowController(connection, windowUpdateRatio));
return connection;
}2. Http2TripleClientStream(客户端流)@Override
protected TripleStreamChannelFuture initStreamChannel(Channel parent) {
Http2StreamChannelBootstrap bootstrap = new Http2StreamChannelBootstrap(parent);
// 禁用自动流控
bootstrap.option(Http2StreamChannelOption.AUTO_STREAM_FLOW_CONTROL, false);
// ...
}
@Override
protected void consumeBytes(int numBytes) {
// 从 parent channel 获取 Http2Connection
Http2Connection http2Connection = getHttp2Connection();
Http2LocalFlowController localFlowController = http2Connection.local().flowController();
// 调用流控器消费字节
localFlowController.consumeBytes(stream, numBytes);
}
private Http2Connection getHttp2Connection() {
ChannelHandlerContext ctx = parent.pipeline().context(Http2FrameCodec.class);
Http2FrameCodec codec = (Http2FrameCodec) ctx.handler();
return codec.connection();
}3. AbstractTripleClientStream.ClientTransportListener(设置解码监听器)void onHeaderReceived(Http2Headers headers) {
// 创建解码器监听器
TriDecoder.Listener listener = new TriDecoder.Listener() {
@Override
public void bytesRead(int numBytes) {
// 调用抽象方法,由子类实现
consumeBytes(numBytes);
}
@Override
public void onRawMessage(byte[] data) {
AbstractTripleClientStream.this.listener.onMessage(data, isReturnTriException);
}
};
deframer = new TriDecoder(decompressor, listener);
}4. TriDecoder(gRPC 解码器)private void processBody() {
int totalBytesRead = HEADER_LENGTH + requiredLength;
byte[] stream;
try {
stream = compressedFlag ? getCompressedBody() : getUncompressedBody();
} finally {
// 在 finally 块中通知流控
listener.bytesRead(totalBytesRead);
}
listener.onRawMessage(stream);
}客户端数据流转路径配置说明windowUpdateRatio 工作原理假设
设计要点总结1. 两层禁用自动流控
2. bytesRead 在 finally 块中调用确保即使消息处理失败,流控也能正常工作: try {
rawMessage = readRawMessage();
} finally {
listener.bytesRead(totalBytesRead); // 必须执行
}3. EventLoop 线程安全
if (eventLoop().inEventLoop()) {
localFlowController.consumeBytes(stream, numBytes);
} else {
eventLoop().execute(() -> localFlowController.consumeBytes(stream, numBytes));
}类关系图classDiagram
class TripleConfig {
+Float windowUpdateRatio
+Integer initialWindowSize
+getWindowUpdateRatioOrDefault() float
}
class TripleHttp2LocalFlowController {
+TripleHttp2LocalFlowController(connection, windowUpdateRatio)
+receiveFlowControlledFrame()
+consumeBytes(stream, numBytes)
}
class DefaultHttp2LocalFlowController {
+consumeBytes(stream, numBytes)
}
class H2StreamChannel {
<<interface>>
+consumeBytes(numBytes)
}
class NettyH2StreamChannel {
-Http2Connection http2Connection
+consumeBytes(numBytes)
}
class StreamingDecoder {
<<interface>>
+setFragmentListener(listener)
}
class FragmentListener {
<<interface>>
+bytesRead(numBytes)
+onFragmentMessage(rawMessage)
}
class AbstractTripleClientStream {
<<abstract>>
#consumeBytes(numBytes)*
}
class Http2TripleClientStream {
+consumeBytes(numBytes)
-getHttp2Connection()
}
TripleHttp2LocalFlowController --|> DefaultHttp2LocalFlowController
NettyH2StreamChannel ..|> H2StreamChannel
Http2TripleClientStream --|> AbstractTripleClientStream
NettyH2StreamChannel --> TripleHttp2LocalFlowController : uses
Http2TripleClientStream --> TripleHttp2LocalFlowController : uses
|
Codecov Report❌ Patch coverage is Additional details and impacted files@@ Coverage Diff @@
## 3.3 #15957 +/- ##
============================================
+ Coverage 58.78% 60.72% +1.93%
- Complexity 15 11711 +11696
============================================
Files 1938 1942 +4
Lines 88698 88642 -56
Branches 13387 13365 -22
============================================
+ Hits 52143 53825 +1682
+ Misses 30901 29317 -1584
+ Partials 5654 5500 -154
Flags with carried forward coverage won't be shown. Click here to find out more. ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
wangchengming666
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
|
sample is apache/dubbo-samples#1278 |
lack of PR effectiveness checking assertions. |
应用层背压应用层背压机制允许应用程序在发送数据时感知底层传输层的状态,避免在传输层缓冲区满时继续发送数据导致内存溢出或性能下降。 核心 API
设计参考本实现完全对齐 gRPC Java 的设计模式:
客户端实现核心类关系isReady() 调用链代码实现: // ClientCallToObserverAdapter.java
@Override
public boolean isReady() {
return call.isReady();
}
// TripleClientCall.java
@Override
public boolean isReady() {
if (canceled) return false;
if (done) return false;
return stream.isReady();
}
// AbstractTripleClientStream.java
@Override
public boolean isReady() {
Channel channel = streamChannelFuture.getNow();
if (channel == null) return false;
return channel.isWritable();
}onReadyHandler 触发链当 Netty Channel 从不可写变为可写时,触发以下调用链: 代码实现: // TripleHttp2ClientResponseHandler.java
@Override
public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
transportListener.onWritabilityChanged();
super.channelWritabilityChanged(ctx);
}
// AbstractTripleClientStream.ClientTransportListener
@Override
public void onWritabilityChanged() {
AbstractTripleClientStream.this.onWritabilityChanged();
}
// AbstractTripleClientStream.java
protected void onWritabilityChanged() {
Channel channel = streamChannelFuture.getNow();
if (channel != null && channel.isWritable()) {
// 同步调用,由 TripleClientCall.onReady() 负责异步执行
listener.onReady();
}
}
// TripleClientCall.java
@Override
public void onReady() {
if (listener == null) {
return;
}
// ObserverToClientCallListenerAdapter.onReady() triggers the onReadyHandler
executor.execute(() -> {
try {
listener.onReady();
} catch (Throwable t) {
LOGGER.warn(PROTOCOL_STREAM_LISTENER, "", "",
"Error executing listener.onReady()", t);
}
});
}
// ObserverToClientCallListenerAdapter.java
@Override
public void onReady() {
if (requestAdapter == null) return;
Runnable handler = requestAdapter.getOnReadyHandler();
if (handler == null) return;
handler.run();
}setOnReadyHandler 设置流程代码实现: // ClientCallToObserverAdapter.java
private Runnable onReadyHandler;
@Override
public void setOnReadyHandler(Runnable onReadyHandler) {
this.onReadyHandler = onReadyHandler;
}
public Runnable getOnReadyHandler() {
return onReadyHandler;
}
// TripleInvoker.java
StreamObserver<Object> streamCall(...) {
ObserverToClientCallListenerAdapter listener =
new ObserverToClientCallListenerAdapter(responseObserver);
StreamObserver<Object> streamObserver = call.start(metadata, listener);
// 建立关联,让 listener 能够访问 adapter 的 onReadyHandler
if (streamObserver instanceof ClientCallToObserverAdapter) {
listener.setRequestAdapter((ClientCallToObserverAdapter<Object>) streamObserver);
}
// ...
}服务端实现服务端的背压机制通过 核心类关系服务端 onWritabilityChanged 触发链isReady() 实现对比:应用层字节计数 vs Netty 可写性gRPC 和 Dubbo Triple 在判断 gRPC:应用层字节计数gRPC 在应用层维护一个发送字节队列计数器 // io.grpc.internal.AbstractStream.TransportState
@GuardedBy("onReadyLock")
private int numSentBytesQueued;
@GuardedBy("onReadyLock")
private int onReadyThreshold = 32 * 1024; // 32KB
private boolean isReady() {
synchronized (onReadyLock) {
return allocated && numSentBytesQueued < onReadyThreshold && !deallocated;
}
}
// 发送时增加计数
private void onSendingBytes(int numBytes) {
synchronized (onReadyLock) {
numSentBytesQueued += numBytes;
}
}
// 发送完成后减少计数,必要时触发 onReady
public final void onSentBytes(int numBytes) {
boolean doNotify;
synchronized (onReadyLock) {
boolean belowThresholdBefore = numSentBytesQueued < onReadyThreshold;
numSentBytesQueued -= numBytes;
boolean belowThresholdAfter = numSentBytesQueued < onReadyThreshold;
doNotify = !belowThresholdBefore && belowThresholdAfter;
}
if (doNotify) {
notifyIfReady(); // 触发 listener.onReady()
}
}工作流程: Dubbo Triple:Netty Channel 可写性Dubbo Triple 直接使用 Netty 的 Channel 可写性判断: // AbstractTripleClientStream.java
@Override
public boolean isReady() {
Channel channel = streamChannelFuture.getNow();
if (channel == null) {
return false;
}
return channel.isWritable(); // 直接依赖 Netty
}Netty 可写性机制:
对比总结
使用示例客户端使用背压// 获取请求 StreamObserver
StreamObserver<Request> requestObserver = stub.biStreamCall(responseObserver);
// 转换为 CallStreamObserver 以使用背压 API
CallStreamObserver<Request> callObserver = (CallStreamObserver<Request>) requestObserver;
// 禁用自动流控
callObserver.disableAutoFlowControl();
// 设置 ready 回调
callObserver.setOnReadyHandler(() -> {
// 当流变为可写时,恢复发送数据
while (callObserver.isReady() && hasMoreData()) {
callObserver.onNext(getNextData());
}
});
// 初始发送(如果可写)
while (callObserver.isReady() && hasMoreData()) {
callObserver.onNext(getNextData());
}服务端使用背压@Override
public StreamObserver<Request> biStreamCall(StreamObserver<Response> responseObserver) {
// 转换为 ServerStreamObserver
ServerStreamObserver<Response> serverObserver =
(ServerStreamObserver<Response>) responseObserver;
// 设置 ready 回调
serverObserver.setOnReadyHandler(() -> {
while (serverObserver.isReady() && hasMoreResponses()) {
serverObserver.onNext(getNextResponse());
}
});
return new StreamObserver<Request>() {
@Override
public void onNext(Request request) {
// 处理请求
}
// ...
};
} |
oxsean
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM.
wangchengming666
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
zrlw
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
What is the purpose of the change?
Checklist