From 334edbac9bf0804efcd3815c2bf8080edcff0ee5 Mon Sep 17 00:00:00 2001 From: Hongshun Wang Date: Mon, 2 Mar 2026 15:06:11 +0800 Subject: [PATCH] [client] Netty prefer heap memory --- .../apache/fluss/config/ConfigOptions.java | 9 ++ .../fluss/kafka/KafkaChannelInitializer.java | 2 +- .../rpc/netty/NettyChannelInitializer.java | 115 ++++++++++++++++-- .../client/ClientChannelInitializer.java | 9 +- .../fluss/rpc/netty/client/NettyClient.java | 15 ++- .../rpc/netty/client/ServerConnection.java | 1 - .../server/ServerChannelInitializer.java | 2 +- .../netty/client/ServerConnectionTest.java | 4 +- website/docs/_configs/_partial_config.mdx | 5 +- 9 files changed, 145 insertions(+), 17 deletions(-) diff --git a/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java b/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java index 08742ebc2e..9dfb0b298d 100644 --- a/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java +++ b/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java @@ -870,6 +870,15 @@ public class ConfigOptions { "The number of threads that the client uses for sending requests to the " + "network and receiving responses from network. The default value is 4"); + public static final ConfigOption NETTY_CLIENT_ALLOCATOR_HEAP_BUFFER_FIRST = + key("netty.client.allocator.heap-buffer-first") + .booleanType() + .defaultValue(true) + .withDescription( + "Whether to allocate heap buffer first for the netty client. 
" + + "If set to false, direct buffer will be used first, " + + "which requires sufficient off-heap memory to be available."); + // ------------------------------------------------------------------------ // Client Settings // ------------------------------------------------------------------------ diff --git a/fluss-kafka/src/main/java/org/apache/fluss/kafka/KafkaChannelInitializer.java b/fluss-kafka/src/main/java/org/apache/fluss/kafka/KafkaChannelInitializer.java index bf57ea0d52..d346d6652b 100644 --- a/fluss-kafka/src/main/java/org/apache/fluss/kafka/KafkaChannelInitializer.java +++ b/fluss-kafka/src/main/java/org/apache/fluss/kafka/KafkaChannelInitializer.java @@ -44,7 +44,7 @@ protected void initChannel(SocketChannel ch) throws Exception { super.initChannel(ch); addIdleStateHandler(ch); ch.pipeline().addLast(prepender); - addFrameDecoder(ch, MAX_FRAME_LENGTH, 4); + addFrameDecoder(ch, MAX_FRAME_LENGTH, 4, false); ch.pipeline().addLast("flowController", new FlowControlHandler()); ch.pipeline().addLast(new KafkaCommandDecoder(requestChannels)); } diff --git a/fluss-rpc/src/main/java/org/apache/fluss/rpc/netty/NettyChannelInitializer.java b/fluss-rpc/src/main/java/org/apache/fluss/rpc/netty/NettyChannelInitializer.java index 2b98686003..fcc19c9517 100644 --- a/fluss-rpc/src/main/java/org/apache/fluss/rpc/netty/NettyChannelInitializer.java +++ b/fluss-rpc/src/main/java/org/apache/fluss/rpc/netty/NettyChannelInitializer.java @@ -17,11 +17,15 @@ package org.apache.fluss.rpc.netty; +import org.apache.fluss.shaded.netty4.io.netty.buffer.ByteBuf; +import org.apache.fluss.shaded.netty4.io.netty.buffer.ByteBufAllocator; import org.apache.fluss.shaded.netty4.io.netty.channel.ChannelInitializer; import org.apache.fluss.shaded.netty4.io.netty.channel.socket.SocketChannel; +import org.apache.fluss.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder; import org.apache.fluss.shaded.netty4.io.netty.handler.codec.LengthFieldBasedFrameDecoder; import 
org.apache.fluss.shaded.netty4.io.netty.handler.timeout.IdleStateHandler; +import static java.lang.Integer.MAX_VALUE; import static org.apache.fluss.utils.Preconditions.checkArgument; /** @@ -29,7 +33,6 @@ * netty logging and add common handlers. */ public class NettyChannelInitializer extends ChannelInitializer { - private final int maxIdleTimeSeconds; private static final NettyLogger nettyLogger = new NettyLogger(); @@ -46,15 +49,113 @@ protected void initChannel(SocketChannel ch) throws Exception { } } - public void addFrameDecoder(SocketChannel ch, int maxFrameLength, int initialBytesToStrip) { - ch.pipeline() - .addLast( - "frameDecoder", - new LengthFieldBasedFrameDecoder( - maxFrameLength, 0, 4, 0, initialBytesToStrip)); + public void addFrameDecoder( + SocketChannel ch, int maxFrameLength, int initialBytesToStrip, boolean preferHeap) { + LengthFieldBasedFrameDecoder lengthFieldBasedFrameDecoder = + new LengthFieldBasedFrameDecoder(maxFrameLength, 0, 4, 0, initialBytesToStrip); + lengthFieldBasedFrameDecoder.setCumulator(new FlussMergeCumulator(preferHeap)); + ch.pipeline().addLast("frameDecoder", lengthFieldBasedFrameDecoder); } public void addIdleStateHandler(SocketChannel ch) { ch.pipeline().addLast("idle", new IdleStateHandler(0, 0, maxIdleTimeSeconds)); } + + /** + * A custom {@link ByteToMessageDecoder.Cumulator} that ensures the cumulation buffer stays on + * the JVM heap when heap memory is preferred. + * + *

Why this is needed — the two pitfalls in Netty's default {@code MERGE_CUMULATOR}: + * + *

    + *
  1. Fast-path "use in directly" problem: When the cumulation is empty (e.g. the very + * first read, where cumulation starts as {@code EMPTY_BUFFER}), {@code MERGE_CUMULATOR} + * short-circuits and returns the incoming {@code in} buffer as the new cumulation without + * copying. On native transports (epoll / kqueue), {@code + * EpollRecvByteAllocatorHandle.allocate()} always forces the read buffer to be direct + * regardless of the channel's configured allocator (it wraps it with {@code + * PreferredDirectByteBufAllocator} internally, because JNI requires direct buffers). So + * even with {@code PreferHeapByteBufAllocator} set on the channel, the first {@code in} + * is direct, and the default cumulator would make a direct buffer the cumulation for the + * rest of the connection lifetime. NOTE(review): this cumulator does not guard that + * fast-path itself — a direct {@code in} can still become the cumulation for one round — + * but the direct cumulation is then migrated to a heap buffer on the next {@code cumulate} + * call by the {@code isDirect() && isPreferHeap} expand condition below; confirm whether + * the fast-path should additionally check {@code isPreferHeap}. + *
  2. Subsequent reads — write-into-direct cumulation problem: If for any reason a + * direct cumulation was established (e.g. before this fix, or on NIO when the first read + * buffer happened to be direct), subsequent {@code in} buffers with small sizes will pass + * the {@code required <= maxWritableBytes()} check and be written into the existing + * direct cumulation via {@code writeBytes}, bypassing {@code expandCumulation} entirely. + * Setting {@code ch.config().setAllocator(preferHeapAllocator)} alone does NOT fix this + * because it only affects newly allocated buffers, not writes into an existing one. + *
+ * + *

By blocking the fast-path, the first {@code in} (direct) triggers {@code + * expandCumulation}, which calls {@code alloc.buffer(...)}. Since {@code alloc} is the channel + * allocator ({@code PreferHeapByteBufAllocator}), the new cumulation buffer will be a heap + * buffer. All subsequent writes then accumulate into that heap buffer. + */ + static final class FlussMergeCumulator implements ByteToMessageDecoder.Cumulator { + private final boolean isPreferHeap; + + public FlussMergeCumulator(boolean isPreferHeap) { + this.isPreferHeap = isPreferHeap; + } + + @Override + public ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in) { + if (cumulation == in) { + // when the in buffer is the same as the cumulation it is doubly retained, release + // it once + in.release(); + return cumulation; + } + if (!cumulation.isReadable() && in.isContiguous()) { + // If cumulation is empty and input buffer is contiguous, use it directly + cumulation.release(); + return in; + } + try { + final int required = in.readableBytes(); + if (required > cumulation.maxWritableBytes() + || required > cumulation.maxFastWritableBytes() && cumulation.refCnt() > 1 + || cumulation.isReadOnly() + || (cumulation.isDirect() && isPreferHeap)) { + // Expand cumulation (by replacing it) under the following conditions: + // - cumulation cannot be resized to accommodate the additional data + // - cumulation can be expanded with a reallocation operation to accommodate but + // the buffer is + // assumed to be shared (e.g. refCnt() > 1) and the reallocation may not be + // safe. + return expandCumulation(alloc, cumulation, in); + } + cumulation.writeBytes(in, in.readerIndex(), required); + in.readerIndex(in.writerIndex()); + return cumulation; + } finally { + // We must release in all cases as otherwise it may produce a leak if + // writeBytes(...) 
throws + // for whatever reason (for example because of OutOfMemoryError) + in.release(); + } + } + } + + static ByteBuf expandCumulation(ByteBufAllocator alloc, ByteBuf oldCumulation, ByteBuf in) { + int oldBytes = oldCumulation.readableBytes(); + int newBytes = in.readableBytes(); + int totalBytes = oldBytes + newBytes; + ByteBuf newCumulation = alloc.buffer(alloc.calculateNewCapacity(totalBytes, MAX_VALUE)); + ByteBuf toRelease = newCumulation; + try { + // This avoids redundant checks and stack depth compared to calling writeBytes(...) + newCumulation + .setBytes(0, oldCumulation, oldCumulation.readerIndex(), oldBytes) + .setBytes(oldBytes, in, in.readerIndex(), newBytes) + .writerIndex(totalBytes); + in.readerIndex(in.writerIndex()); + toRelease = oldCumulation; + return newCumulation; + } finally { + toRelease.release(); + } + } } diff --git a/fluss-rpc/src/main/java/org/apache/fluss/rpc/netty/client/ClientChannelInitializer.java b/fluss-rpc/src/main/java/org/apache/fluss/rpc/netty/client/ClientChannelInitializer.java index 2d217ed0f2..087a31b7f3 100644 --- a/fluss-rpc/src/main/java/org/apache/fluss/rpc/netty/client/ClientChannelInitializer.java +++ b/fluss-rpc/src/main/java/org/apache/fluss/rpc/netty/client/ClientChannelInitializer.java @@ -18,7 +18,9 @@ package org.apache.fluss.rpc.netty.client; import org.apache.fluss.rpc.netty.NettyChannelInitializer; +import org.apache.fluss.shaded.netty4.io.netty.buffer.ByteBufAllocator; import org.apache.fluss.shaded.netty4.io.netty.channel.ChannelInitializer; +import org.apache.fluss.shaded.netty4.io.netty.channel.PreferHeapByteBufAllocator; import org.apache.fluss.shaded.netty4.io.netty.channel.socket.SocketChannel; /** @@ -26,16 +28,19 @@ * will be used by the client to handle the init request for the server. 
*/ final class ClientChannelInitializer extends NettyChannelInitializer { + private final ByteBufAllocator allocator; - public ClientChannelInitializer(long maxIdleTimeSeconds) { + public ClientChannelInitializer(long maxIdleTimeSeconds, ByteBufAllocator allocator) { super(maxIdleTimeSeconds); + this.allocator = allocator; } @Override protected void initChannel(SocketChannel ch) throws Exception { // NettyClientHandler will be added dynamically when connection is built super.initChannel(ch); - addFrameDecoder(ch, Integer.MAX_VALUE, 0); + ch.config().setAllocator(allocator); + addFrameDecoder(ch, Integer.MAX_VALUE, 0, allocator instanceof PreferHeapByteBufAllocator); addIdleStateHandler(ch); } } diff --git a/fluss-rpc/src/main/java/org/apache/fluss/rpc/netty/client/NettyClient.java b/fluss-rpc/src/main/java/org/apache/fluss/rpc/netty/client/NettyClient.java index f567b8584d..2c4183a261 100644 --- a/fluss-rpc/src/main/java/org/apache/fluss/rpc/netty/client/NettyClient.java +++ b/fluss-rpc/src/main/java/org/apache/fluss/rpc/netty/client/NettyClient.java @@ -30,9 +30,11 @@ import org.apache.fluss.security.auth.AuthenticationFactory; import org.apache.fluss.security.auth.ClientAuthenticator; import org.apache.fluss.shaded.netty4.io.netty.bootstrap.Bootstrap; +import org.apache.fluss.shaded.netty4.io.netty.buffer.ByteBufAllocator; import org.apache.fluss.shaded.netty4.io.netty.buffer.PooledByteBufAllocator; import org.apache.fluss.shaded.netty4.io.netty.channel.ChannelOption; import org.apache.fluss.shaded.netty4.io.netty.channel.EventLoopGroup; +import org.apache.fluss.shaded.netty4.io.netty.channel.PreferHeapByteBufAllocator; import org.apache.fluss.utils.MapUtils; import org.apache.fluss.utils.concurrent.FutureUtils; @@ -48,6 +50,7 @@ import java.util.concurrent.TimeUnit; import java.util.function.Supplier; +import static org.apache.fluss.config.ConfigOptions.NETTY_CLIENT_ALLOCATOR_HEAP_BUFFER_FIRST; import static org.apache.fluss.utils.Preconditions.checkArgument; 
/** @@ -97,15 +100,23 @@ public NettyClient( int connectionMaxIdle = (int) conf.get(ConfigOptions.NETTY_CONNECTION_MAX_IDLE_TIME).getSeconds(); PooledByteBufAllocator pooledAllocator = PooledByteBufAllocator.DEFAULT; + // The inner client runs inside a Fluss server process where direct memory budget is + // explicitly managed, so it always uses pooled direct buffers. The external client + // (e.g. embedded in Flink) defaults to heap buffers to avoid unexpected off-heap + // pressure in user JVMs that may not have tuned -XX:MaxDirectMemorySize. + ByteBufAllocator allocator = + isInnerClient || !conf.getBoolean(NETTY_CLIENT_ALLOCATOR_HEAP_BUFFER_FIRST) + ? PooledByteBufAllocator.DEFAULT + : new PreferHeapByteBufAllocator(pooledAllocator); this.bootstrap = new Bootstrap() .group(eventGroup) .channel(NettyUtils.getClientSocketChannelClass(eventGroup)) - .option(ChannelOption.ALLOCATOR, pooledAllocator) + .option(ChannelOption.ALLOCATOR, allocator) .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, connectTimeoutMs) .option(ChannelOption.TCP_NODELAY, true) .option(ChannelOption.SO_KEEPALIVE, true) - .handler(new ClientChannelInitializer(connectionMaxIdle)); + .handler(new ClientChannelInitializer(connectionMaxIdle, allocator)); this.isInnerClient = isInnerClient; this.clientMetricGroup = clientMetricGroup; this.authenticatorSupplier = AuthenticationFactory.loadClientAuthenticatorSupplier(conf); diff --git a/fluss-rpc/src/main/java/org/apache/fluss/rpc/netty/client/ServerConnection.java b/fluss-rpc/src/main/java/org/apache/fluss/rpc/netty/client/ServerConnection.java index a3fd7afbd5..0017a06dd0 100644 --- a/fluss-rpc/src/main/java/org/apache/fluss/rpc/netty/client/ServerConnection.java +++ b/fluss-rpc/src/main/java/org/apache/fluss/rpc/netty/client/ServerConnection.java @@ -77,7 +77,6 @@ final class ServerConnection { private final ConnectionMetrics connectionMetrics; private final ClientAuthenticator authenticator; private final ExponentialBackoff backoff; - private final 
Object lock = new Object(); @GuardedBy("lock") diff --git a/fluss-rpc/src/main/java/org/apache/fluss/rpc/netty/server/ServerChannelInitializer.java b/fluss-rpc/src/main/java/org/apache/fluss/rpc/netty/server/ServerChannelInitializer.java index 83813be377..f62add16fb 100644 --- a/fluss-rpc/src/main/java/org/apache/fluss/rpc/netty/server/ServerChannelInitializer.java +++ b/fluss-rpc/src/main/java/org/apache/fluss/rpc/netty/server/ServerChannelInitializer.java @@ -65,7 +65,7 @@ public ServerChannelInitializer( protected void initChannel(SocketChannel ch) throws Exception { super.initChannel(ch); // initialBytesToStrip=0 to include the frame size field after decoding - addFrameDecoder(ch, Integer.MAX_VALUE, 0); + addFrameDecoder(ch, Integer.MAX_VALUE, 0, false); addIdleStateHandler(ch); ServerAuthenticator serverAuthenticator = authenticatorSupplier.get(); LOG.debug( diff --git a/fluss-rpc/src/test/java/org/apache/fluss/rpc/netty/client/ServerConnectionTest.java b/fluss-rpc/src/test/java/org/apache/fluss/rpc/netty/client/ServerConnectionTest.java index 41e4bef2c2..b8e9674730 100644 --- a/fluss-rpc/src/test/java/org/apache/fluss/rpc/netty/client/ServerConnectionTest.java +++ b/fluss-rpc/src/test/java/org/apache/fluss/rpc/netty/client/ServerConnectionTest.java @@ -47,6 +47,7 @@ import org.apache.fluss.security.auth.AuthenticationFactory; import org.apache.fluss.security.auth.ClientAuthenticator; import org.apache.fluss.shaded.netty4.io.netty.bootstrap.Bootstrap; +import org.apache.fluss.shaded.netty4.io.netty.buffer.PooledByteBufAllocator; import org.apache.fluss.shaded.netty4.io.netty.channel.ChannelFuture; import org.apache.fluss.shaded.netty4.io.netty.channel.EventLoopGroup; import org.apache.fluss.utils.NetUtils; @@ -102,7 +103,8 @@ void setUp() throws Exception { new Bootstrap() .group(eventLoopGroup) .channel(getClientSocketChannelClass(eventLoopGroup)) - .handler(new ClientChannelInitializer(5000)); + .handler( + new ClientChannelInitializer(5000, 
PooledByteBufAllocator.DEFAULT)); clientAuthenticator = AuthenticationFactory.loadClientAuthenticatorSupplier(new Configuration()).get(); } diff --git a/website/docs/_configs/_partial_config.mdx b/website/docs/_configs/_partial_config.mdx index 7bb8e0a663..13d07e8ec7 100644 --- a/website/docs/_configs/_partial_config.mdx +++ b/website/docs/_configs/_partial_config.mdx @@ -78,7 +78,7 @@ | `client.lookup.batch-timeout` | `0 s` | Duration | The maximum time to wait for the lookup batch to full, if this timeout is reached, the lookup batch will be closed to send. | | `client.lookup.max-retries` | `2147483647` | Integer | Setting a value greater than zero will cause the client to resend any lookup request that fails with a potentially transient error. | | `client.scanner.remote-log.prefetch-num` | `4` | Integer | The number of remote log segments to keep in local temp file for LogScanner, which download from remote storage. The default setting is 4. | -| `client.scanner.io.tmpdir` | `/var/folders/bp/v2l48kz51mx86d743qv0zhzh0000gn/T//fluss` | String | Local directory that is used by client for storing the data files (like kv snapshot, log segment files) to read temporarily | +| `client.scanner.io.tmpdir` | `/var/folders/qw/wkqk76752w15lm9xdq4q22s00000gp/T//fluss` | String | Local directory that is used by client for storing the data files (like kv snapshot, log segment files) to read temporarily | | `client.remote-file.download-thread-num` | `3` | Integer | The number of threads the client uses to download remote files. | | `client.filesystem.security.token.renewal.backoff` | `1 hours` | Duration | The time period how long to wait before retrying to obtain new security tokens for filesystem after a failure. | | `client.filesystem.security.token.renewal.time-ratio` | `0.75` | Double | Ratio of the token's expiration time when new credentials for access filesystem should be re-obtained. 
| @@ -226,6 +226,7 @@ | `netty.server.max-queued-requests` | `500` | Integer | The number of queued requests allowed for worker threads, before blocking the I/O threads. | | `netty.connection.max-idle-time` | `10 min` | Duration | Close idle connections after the given time specified by this config. | | `netty.client.num-network-threads` | `4` | Integer | The number of threads that the client uses for sending requests to the network and receiving responses from network. The default value is 4 | +| `netty.client.allocator.heap-buffer-first` | `true` | Boolean | Whether to allocate heap buffer first for the netty client. If set to false, direct buffer will be used first, which requires sufficient off-heap memory to be available. | ## Plugin Configurations @@ -284,7 +285,7 @@ | `table.auto-partition.enabled` | `false` | Boolean | Whether enable auto partition for the table. Disable by default. When auto partition is enabled, the partitions of the table will be created automatically. | | `table.auto-partition.key` | `none` | String | This configuration defines the time-based partition key to be used for auto-partitioning when a table is partitioned with multiple keys. Auto-partitioning utilizes a time-based partition key to handle partitions automatically, including creating new ones and removing outdated ones, by comparing the time value of the partition with the current system time. In the case of a table using multiple partition keys (such as a composite partitioning strategy), this feature determines which key should serve as the primary time dimension for making auto-partitioning decisions.And If the table has only one partition key, this config is not necessary. Otherwise, it must be specified. | | `table.auto-partition.time-unit` | `DAY` | AutoPartitionTimeUnit | The time granularity for auto created partitions. The default value is `DAY`. Valid values are `HOUR`, `DAY`, `MONTH`, `QUARTER`, `YEAR`. 
If the value is `HOUR`, the partition format for auto created is yyyyMMddHH. If the value is `DAY`, the partition format for auto created is yyyyMMdd. If the value is `MONTH`, the partition format for auto created is yyyyMM. If the value is `QUARTER`, the partition format for auto created is yyyyQ. If the value is `YEAR`, the partition format for auto created is yyyy. | -| `table.auto-partition.time-zone` | `Europe/Paris` | String | The time zone for auto partitions, which is by default the same as the system time zone. | +| `table.auto-partition.time-zone` | `Asia/Shanghai` | String | The time zone for auto partitions, which is by default the same as the system time zone. | | `table.auto-partition.num-precreate` | `2` | Integer | The number of partitions to pre-create for auto created partitions in each check for auto partition. For example, if the current check time is 2024-11-11 and the value is configured as 3, then partitions 20241111, 20241112, 20241113 will be pre-created. If any one partition exists, it'll skip creating the partition. The default value is 2, which means 2 partitions will be pre-created. If the `table.auto-partition.time-unit` is `DAY`(default), one precreated partition is for today and another one is for tomorrow.For a partition table with multiple partition keys, pre-create is unsupported and will be set to 0 automatically when creating table if it is not explicitly specified. | | `table.auto-partition.num-retention` | `7` | Integer | The number of history partitions to retain for auto created partitions in each check for auto partition. For example, if the current check time is 2024-11-11, time-unit is DAY, and the value is configured as 3, then the history partitions 20241108, 20241109, 20241110 will be retained. The partitions earlier than 20241108 will be deleted. The default value is 7. | | `table.log.ttl` | `168 hours` | Duration | The time to live for log segments. 
The configuration controls the maximum time we will retain a log before we will delete old segments to free up space. If set to -1, the log will not be deleted. |