Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -870,6 +870,15 @@ public class ConfigOptions {
"The number of threads that the client uses for sending requests to the "
+ "network and receiving responses from network. The default value is 4");

public static final ConfigOption<Boolean> NETTY_CLIENT_ALLOCATOR_HEAP_BUFFER_FIRST =
key("netty.client.allocator.heap-buffer-first")
.booleanType()
.defaultValue(true)
.withDescription(
"Whether to allocate heap buffer first for the netty client. "
+ "If set to false, direct buffer will be used first, "
+ "which requires sufficient off-heap memory to be available.");

// ------------------------------------------------------------------------
// Client Settings
// ------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ protected void initChannel(SocketChannel ch) throws Exception {
super.initChannel(ch);
addIdleStateHandler(ch);
ch.pipeline().addLast(prepender);
addFrameDecoder(ch, MAX_FRAME_LENGTH, 4);
addFrameDecoder(ch, MAX_FRAME_LENGTH, 4, false);
ch.pipeline().addLast("flowController", new FlowControlHandler());
ch.pipeline().addLast(new KafkaCommandDecoder(requestChannels));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,22 @@

package org.apache.fluss.rpc.netty;

import org.apache.fluss.shaded.netty4.io.netty.buffer.ByteBuf;
import org.apache.fluss.shaded.netty4.io.netty.buffer.ByteBufAllocator;
import org.apache.fluss.shaded.netty4.io.netty.channel.ChannelInitializer;
import org.apache.fluss.shaded.netty4.io.netty.channel.socket.SocketChannel;
import org.apache.fluss.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder;
import org.apache.fluss.shaded.netty4.io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import org.apache.fluss.shaded.netty4.io.netty.handler.timeout.IdleStateHandler;

import static java.lang.Integer.MAX_VALUE;
import static org.apache.fluss.utils.Preconditions.checkArgument;

/**
* A basic {@link ChannelInitializer} for initializing {@link SocketChannel} instances to support
* netty logging and add common handlers.
*/
public class NettyChannelInitializer extends ChannelInitializer<SocketChannel> {

private final int maxIdleTimeSeconds;

private static final NettyLogger nettyLogger = new NettyLogger();
Expand All @@ -46,15 +49,113 @@ protected void initChannel(SocketChannel ch) throws Exception {
}
}

public void addFrameDecoder(SocketChannel ch, int maxFrameLength, int initialBytesToStrip) {
ch.pipeline()
.addLast(
"frameDecoder",
new LengthFieldBasedFrameDecoder(
maxFrameLength, 0, 4, 0, initialBytesToStrip));
public void addFrameDecoder(
SocketChannel ch, int maxFrameLength, int initialBytesToStrip, boolean preferHeap) {
LengthFieldBasedFrameDecoder lengthFieldBasedFrameDecoder =
new LengthFieldBasedFrameDecoder(maxFrameLength, 0, 4, 0, initialBytesToStrip);
lengthFieldBasedFrameDecoder.setCumulator(new FlussMergeCumulator(preferHeap));
ch.pipeline().addLast("frameDecoder", lengthFieldBasedFrameDecoder);
}

public void addIdleStateHandler(SocketChannel ch) {
ch.pipeline().addLast("idle", new IdleStateHandler(0, 0, maxIdleTimeSeconds));
}

/**
* A custom {@link ByteToMessageDecoder.Cumulator} that ensures the cumulation buffer stays on
* the JVM heap when heap memory is preferred.
*
* <p>Why this is needed — the two pitfalls in Netty's default {@code MERGE_CUMULATOR}:
*
* <ol>
* <li><b>Fast-path "use in directly" problem:</b> When the cumulation is empty (e.g. the very
* first read, where cumulation starts as {@code EMPTY_BUFFER}), {@code MERGE_CUMULATOR}
* short-circuits and returns the incoming {@code in} buffer as the new cumulation without
* copying. On <b>native transports (epoll / kqueue)</b>, {@code
* EpollRecvByteAllocatorHandle.allocate()} always forces the read buffer to be direct
* regardless of the channel's configured allocator (it wraps it with {@code
* PreferredDirectByteBufAllocator} internally, because JNI requires direct buffers). So
* even with {@code PreferHeapByteBufAllocator} set on the channel, the first {@code in}
* is direct, and the default cumulator would make a direct buffer the cumulation for the
* rest of the connection lifetime. This cumulator blocks that by skipping the fast-path
* when {@code isPreferHeap=true}.
* <li><b>Subsequent reads — write-into-direct cumulation problem:</b> If for any reason a
* direct cumulation was established (e.g. before this fix, or on NIO when the first read
* buffer happened to be direct), subsequent {@code in} buffers with small sizes will pass
* the {@code required <= maxWritableBytes()} check and be written into the existing
* direct cumulation via {@code writeBytes}, bypassing {@code expandCumulation} entirely.
* Setting {@code ch.config().setAllocator(preferHeapAllocator)} alone does NOT fix this
* because it only affects newly allocated buffers, not writes into an existing one.
* </ol>
*
* <p>By blocking the fast-path, the first {@code in} (direct) triggers {@code
* expandCumulation}, which calls {@code alloc.buffer(...)}. Since {@code alloc} is the channel
* allocator ({@code PreferHeapByteBufAllocator}), the new cumulation buffer will be a heap
* buffer. All subsequent writes then accumulate into that heap buffer.
*/
static final class FlussMergeCumulator implements ByteToMessageDecoder.Cumulator {
private final boolean isPreferHeap;

public FlussMergeCumulator(boolean isPreferHeap) {
this.isPreferHeap = isPreferHeap;
}

@Override
public ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in) {
if (cumulation == in) {
// when the in buffer is the same as the cumulation it is doubly retained, release
// it once
in.release();
return cumulation;
}
if (!cumulation.isReadable() && in.isContiguous()) {
// If cumulation is empty and input buffer is contiguous, use it directly
cumulation.release();
return in;
}
try {
final int required = in.readableBytes();
if (required > cumulation.maxWritableBytes()
|| required > cumulation.maxFastWritableBytes() && cumulation.refCnt() > 1
|| cumulation.isReadOnly()
|| (cumulation.isDirect() && isPreferHeap)) {
// Expand cumulation (by replacing it) under the following conditions:
// - cumulation cannot be resized to accommodate the additional data
// - cumulation can be expanded with a reallocation operation to accommodate but
// the buffer is
// assumed to be shared (e.g. refCnt() > 1) and the reallocation may not be
// safe.
return expandCumulation(alloc, cumulation, in);
}
cumulation.writeBytes(in, in.readerIndex(), required);
in.readerIndex(in.writerIndex());
return cumulation;
} finally {
// We must release in all cases as otherwise it may produce a leak if
// writeBytes(...) throw
// for whatever release (for example because of OutOfMemoryError)
in.release();
}
}
}

static ByteBuf expandCumulation(ByteBufAllocator alloc, ByteBuf oldCumulation, ByteBuf in) {
int oldBytes = oldCumulation.readableBytes();
int newBytes = in.readableBytes();
int totalBytes = oldBytes + newBytes;
ByteBuf newCumulation = alloc.buffer(alloc.calculateNewCapacity(totalBytes, MAX_VALUE));
ByteBuf toRelease = newCumulation;
try {
// This avoids redundant checks and stack depth compared to calling writeBytes(...)
newCumulation
.setBytes(0, oldCumulation, oldCumulation.readerIndex(), oldBytes)
.setBytes(oldBytes, in, in.readerIndex(), newBytes)
.writerIndex(totalBytes);
in.readerIndex(in.writerIndex());
toRelease = oldCumulation;
return newCumulation;
} finally {
toRelease.release();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,24 +18,29 @@
package org.apache.fluss.rpc.netty.client;

import org.apache.fluss.rpc.netty.NettyChannelInitializer;
import org.apache.fluss.shaded.netty4.io.netty.buffer.ByteBufAllocator;
import org.apache.fluss.shaded.netty4.io.netty.channel.ChannelInitializer;
import org.apache.fluss.shaded.netty4.io.netty.channel.PreferHeapByteBufAllocator;
import org.apache.fluss.shaded.netty4.io.netty.channel.socket.SocketChannel;

/**
* A specialized {@link ChannelInitializer} for initializing {@link SocketChannel} instances that
* will be used by the client to handle the init request for the server.
*/
final class ClientChannelInitializer extends NettyChannelInitializer {
private final ByteBufAllocator allocator;

public ClientChannelInitializer(long maxIdleTimeSeconds) {
public ClientChannelInitializer(long maxIdleTimeSeconds, ByteBufAllocator allocator) {
super(maxIdleTimeSeconds);
this.allocator = allocator;
}

@Override
protected void initChannel(SocketChannel ch) throws Exception {
// NettyClientHandler will be added dynamically when connection is built
super.initChannel(ch);
addFrameDecoder(ch, Integer.MAX_VALUE, 0);
ch.config().setAllocator(allocator);
addFrameDecoder(ch, Integer.MAX_VALUE, 0, allocator instanceof PreferHeapByteBufAllocator);
addIdleStateHandler(ch);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,11 @@
import org.apache.fluss.security.auth.AuthenticationFactory;
import org.apache.fluss.security.auth.ClientAuthenticator;
import org.apache.fluss.shaded.netty4.io.netty.bootstrap.Bootstrap;
import org.apache.fluss.shaded.netty4.io.netty.buffer.ByteBufAllocator;
import org.apache.fluss.shaded.netty4.io.netty.buffer.PooledByteBufAllocator;
import org.apache.fluss.shaded.netty4.io.netty.channel.ChannelOption;
import org.apache.fluss.shaded.netty4.io.netty.channel.EventLoopGroup;
import org.apache.fluss.shaded.netty4.io.netty.channel.PreferHeapByteBufAllocator;
import org.apache.fluss.utils.MapUtils;
import org.apache.fluss.utils.concurrent.FutureUtils;

Expand All @@ -48,6 +50,7 @@
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

import static org.apache.fluss.config.ConfigOptions.NETTY_CLIENT_ALLOCATOR_HEAP_BUFFER_FIRST;
import static org.apache.fluss.utils.Preconditions.checkArgument;

/**
Expand Down Expand Up @@ -97,15 +100,23 @@ public NettyClient(
int connectionMaxIdle =
(int) conf.get(ConfigOptions.NETTY_CONNECTION_MAX_IDLE_TIME).getSeconds();
PooledByteBufAllocator pooledAllocator = PooledByteBufAllocator.DEFAULT;
// The inner client runs inside a Fluss server process where direct memory budget is
// explicitly managed, so it always uses pooled direct buffers. The external client
// (e.g. embedded in Flink) defaults to heap buffers to avoid unexpected off-heap
// pressure in user JVMs that may not have tuned -XX:MaxDirectMemorySize.
ByteBufAllocator allocator =
isInnerClient || !conf.getBoolean(NETTY_CLIENT_ALLOCATOR_HEAP_BUFFER_FIRST)
? PooledByteBufAllocator.DEFAULT
: new PreferHeapByteBufAllocator(pooledAllocator);
this.bootstrap =
new Bootstrap()
.group(eventGroup)
.channel(NettyUtils.getClientSocketChannelClass(eventGroup))
.option(ChannelOption.ALLOCATOR, pooledAllocator)
.option(ChannelOption.ALLOCATOR, allocator)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, connectTimeoutMs)
.option(ChannelOption.TCP_NODELAY, true)
.option(ChannelOption.SO_KEEPALIVE, true)
.handler(new ClientChannelInitializer(connectionMaxIdle));
.handler(new ClientChannelInitializer(connectionMaxIdle, allocator));
this.isInnerClient = isInnerClient;
this.clientMetricGroup = clientMetricGroup;
this.authenticatorSupplier = AuthenticationFactory.loadClientAuthenticatorSupplier(conf);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,6 @@ final class ServerConnection {
private final ConnectionMetrics connectionMetrics;
private final ClientAuthenticator authenticator;
private final ExponentialBackoff backoff;

private final Object lock = new Object();

@GuardedBy("lock")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ public ServerChannelInitializer(
protected void initChannel(SocketChannel ch) throws Exception {
super.initChannel(ch);
// initialBytesToStrip=0 to include the frame size field after decoding
addFrameDecoder(ch, Integer.MAX_VALUE, 0);
addFrameDecoder(ch, Integer.MAX_VALUE, 0, false);
addIdleStateHandler(ch);
ServerAuthenticator serverAuthenticator = authenticatorSupplier.get();
LOG.debug(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import org.apache.fluss.security.auth.AuthenticationFactory;
import org.apache.fluss.security.auth.ClientAuthenticator;
import org.apache.fluss.shaded.netty4.io.netty.bootstrap.Bootstrap;
import org.apache.fluss.shaded.netty4.io.netty.buffer.PooledByteBufAllocator;
import org.apache.fluss.shaded.netty4.io.netty.channel.ChannelFuture;
import org.apache.fluss.shaded.netty4.io.netty.channel.EventLoopGroup;
import org.apache.fluss.utils.NetUtils;
Expand Down Expand Up @@ -102,7 +103,8 @@ void setUp() throws Exception {
new Bootstrap()
.group(eventLoopGroup)
.channel(getClientSocketChannelClass(eventLoopGroup))
.handler(new ClientChannelInitializer(5000));
.handler(
new ClientChannelInitializer(5000, PooledByteBufAllocator.DEFAULT));
clientAuthenticator =
AuthenticationFactory.loadClientAuthenticatorSupplier(new Configuration()).get();
}
Expand Down
Loading