diff --git a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/config/H2Config.java b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/config/H2Config.java index a801cc51f..e8ef0dd4b 100644 --- a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/config/H2Config.java +++ b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/config/H2Config.java @@ -51,10 +51,11 @@ public class H2Config { private final int maxHeaderListSize; private final boolean compressionEnabled; private final int maxContinuations; + private final H2PingPolicy pingPolicy; H2Config(final int headerTableSize, final boolean pushEnabled, final int maxConcurrentStreams, final int initialWindowSize, final int maxFrameSize, final int maxHeaderListSize, - final boolean compressionEnabled, final int maxContinuations) { + final boolean compressionEnabled, final int maxContinuations, final H2PingPolicy pingPolicy) { super(); this.headerTableSize = headerTableSize; this.pushEnabled = pushEnabled; @@ -64,6 +65,7 @@ public class H2Config { this.maxHeaderListSize = maxHeaderListSize; this.compressionEnabled = compressionEnabled; this.maxContinuations = maxContinuations; + this.pingPolicy = pingPolicy; } public int getHeaderTableSize() { @@ -98,6 +100,15 @@ public int getMaxContinuations() { return maxContinuations; } + /** + * Optional keep-alive PING policy. + * + * @since 5.5 + */ + public H2PingPolicy getPingPolicy() { + return pingPolicy; + } + @Override public String toString() { final StringBuilder builder = new StringBuilder(); @@ -109,6 +120,7 @@ public String toString() { .append(", maxHeaderListSize=").append(this.maxHeaderListSize) .append(", compressionEnabled=").append(this.compressionEnabled) .append(", maxContinuations=").append(this.maxContinuations) + .append(", pingPolicy=").append(this.pingPolicy) .append("]"); return builder.toString(); } @@ -142,7 +154,9 @@ public static H2Config.Builder copy(final H2Config config) { .setInitialWindowSize(config.getInitialWindowSize()) .setMaxFrameSize(config.getMaxFrameSize()) .setMaxHeaderListSize(config.getMaxHeaderListSize()) - .setCompressionEnabled(config.isCompressionEnabled()); + .setCompressionEnabled(config.isCompressionEnabled()) + .setMaxContinuations(config.getMaxContinuations()) + .setPingPolicy(config.getPingPolicy()); } public static class Builder { @@ -155,6 +169,7 @@ public static class Builder { private int maxHeaderListSize; private boolean compressionEnabled; private int maxContinuations; + private H2PingPolicy pingPolicy; Builder() { this.headerTableSize = INIT_HEADER_TABLE_SIZE * 2; @@ -165,6 +180,7 @@ public static class Builder { this.maxHeaderListSize = FrameConsts.MAX_FRAME_SIZE; this.compressionEnabled = true; this.maxContinuations = 100; + this.pingPolicy = null; } public Builder setHeaderTableSize(final int headerTableSize) { @@ -211,7 +227,7 @@ public Builder setCompressionEnabled(final boolean compressionEnabled) { * Sets max limit on number of continuations. *

value zero represents no limit

* - * @since 5,4 + * @since 5.4 */ public Builder setMaxContinuations(final int maxContinuations) { Args.positive(maxContinuations, "Max continuations"); @@ -219,6 +235,16 @@ public Builder setMaxContinuations(final int maxContinuations) { return this; } + /** + * Sets optional keep-alive PING policy. + * + * @since 5.5 + */ + public Builder setPingPolicy(final H2PingPolicy pingPolicy) { + this.pingPolicy = pingPolicy; + return this; + } + public H2Config build() { return new H2Config( headerTableSize, @@ -228,7 +254,8 @@ public H2Config build() { maxFrameSize, maxHeaderListSize, compressionEnabled, - maxContinuations); + maxContinuations, + pingPolicy); } } diff --git a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/config/H2PingPolicy.java b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/config/H2PingPolicy.java new file mode 100644 index 000000000..fc81908fd --- /dev/null +++ b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/config/H2PingPolicy.java @@ -0,0 +1,103 @@ +/* + * ==================================================================== + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * ==================================================================== + * + * This software consists of voluntary contributions made by many + * individuals on behalf of the Apache Software Foundation. For more + * information on the Apache Software Foundation, please see + * . + * + */ + +package org.apache.hc.core5.http2.config; + +import org.apache.hc.core5.util.Args; +import org.apache.hc.core5.util.TimeValue; +import org.apache.hc.core5.util.Timeout; + +/** + * HTTP/2 keep-alive ping policy. + * + * @since 5.5 + */ +public final class H2PingPolicy { + + private static final H2PingPolicy DISABLED = new H2PingPolicy(Timeout.DISABLED, Timeout.DISABLED); + + private final Timeout idleTime; + private final Timeout ackTimeout; + + private H2PingPolicy(final Timeout idleTime, final Timeout ackTimeout) { + this.idleTime = idleTime; + this.ackTimeout = ackTimeout; + } + + public static H2PingPolicy disabled() { + return DISABLED; + } + + public static Builder custom() { + return new Builder(); + } + + public Timeout getIdleTime() { + return idleTime; + } + + public Timeout getAckTimeout() { + return ackTimeout; + } + + public boolean isEnabled() { + return isActive(idleTime) && isActive(ackTimeout); + } + + private static boolean isActive(final Timeout timeout) { + return timeout != null && timeout.isEnabled() && TimeValue.isPositive(timeout); + } + + public static final class Builder { + + private Timeout idleTime; + private Timeout ackTimeout; + + private Builder() { + this.idleTime = Timeout.DISABLED; + this.ackTimeout = Timeout.DISABLED; + } + + public Builder setIdleTime(final Timeout idleTime) { + this.idleTime = Args.notNull(idleTime, "idleTime"); + return this; + } + + public Builder setAckTimeout(final Timeout ackTimeout) { + this.ackTimeout = Args.notNull(ackTimeout, "ackTimeout"); + return this; + } + + public H2PingPolicy build() { + if (isActive(idleTime)) { + Args.check(isActive(ackTimeout), "ackTimeout must be positive when idleTime is enabled"); + } + return new H2PingPolicy(idleTime, ackTimeout); + } + } + +} diff --git a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/AbstractH2StreamMultiplexer.java b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/AbstractH2StreamMultiplexer.java index 5c4b5b397..5cf88ecf4 100644 --- a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/AbstractH2StreamMultiplexer.java +++ b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/AbstractH2StreamMultiplexer.java @@ -77,6 +77,7 @@ import org.apache.hc.core5.http2.H2StreamTimeoutException; import org.apache.hc.core5.http2.config.H2Config; import org.apache.hc.core5.http2.config.H2Param; +import org.apache.hc.core5.http2.config.H2PingPolicy; import org.apache.hc.core5.http2.config.H2Setting; import org.apache.hc.core5.http2.frame.FrameFactory; import org.apache.hc.core5.http2.frame.FrameFlag; @@ -125,6 +126,7 @@ enum SettingsHandshake { READY, TRANSMITTED, ACKED } private final AtomicInteger connOutputWindow; private final AtomicInteger outputRequests; private final H2StreamListener streamListener; + private final KeepAlivePingSupport keepAlivePingSupport; private ConnectionHandshake connState = ConnectionHandshake.READY; private SettingsHandshake localSettingState = SettingsHandshake.READY; @@ -183,6 +185,11 @@ enum SettingsHandshake { READY, TRANSMITTED, ACKED } this.lowMark = H2Config.INIT.getInitialWindowSize() / 2; this.streamListener = streamListener; + + final H2PingPolicy pingPolicy = this.localConfig.getPingPolicy(); + this.keepAlivePingSupport = pingPolicy != null && pingPolicy.isEnabled() + ? new KeepAlivePingSupport(pingPolicy) + : null; } @Override @@ -444,6 +451,9 @@ public final void onInput(final ByteBuffer src) throws HttpException, IOExceptio for (;;) { final RawFrame frame = inputBuffer.read(src, ioSession); if (frame != null) { + if (keepAlivePingSupport != null) { + keepAlivePingSupport.onFrameInput(frame); + } if (streamListener != null) { streamListener.onFrameInput(this, frame.getStreamId(), frame); } @@ -487,6 +497,10 @@ public final void onOutput() throws HttpException, IOException { ioSession.getLock().unlock(); } + if (keepAlivePingSupport != null) { + keepAlivePingSupport.activateIfReady(); + } + if (connState.compareTo(ConnectionHandshake.SHUTDOWN) < 0) { if (connOutputWindow.get() > 0 && remoteSettingState == SettingsHandshake.ACKED) { @@ -592,6 +606,15 @@ public final void onOutput() throws HttpException, IOException { } public final void onTimeout(final Timeout timeout) throws HttpException, IOException { + if (keepAlivePingSupport != null + && connState.compareTo(ConnectionHandshake.ACTIVE) <= 0 + && localSettingState == SettingsHandshake.ACKED + && remoteSettingState == SettingsHandshake.ACKED) { + if (keepAlivePingSupport.onTimeout(timeout)) { + return; + } + } + connState = ConnectionHandshake.SHUTDOWN; final RawFrame goAway; @@ -888,6 +911,9 @@ private void consumeFrame(final RawFrame frame) throws HttpException, IOExceptio throw new H2ConnectionException(H2Error.FRAME_SIZE_ERROR, "Invalid PING frame payload"); } if (frame.isFlagSet(FrameFlag.ACK)) { + if (keepAlivePingSupport != null && keepAlivePingSupport.consumePingAck(ping)) { + break; + } final AsyncPingHandler pingHandler = pingHandlers.poll(); if (pingHandler != null) { pingHandler.consumeResponse(ping); @@ -910,6 +936,9 @@ private void consumeFrame(final RawFrame frame) throws HttpException, IOExceptio localSettingState = SettingsHandshake.ACKED; ioSession.setEvent(SelectionKey.OP_WRITE); applyLocalSettings(); + if (keepAlivePingSupport != null) { + keepAlivePingSupport.activateIfReady(); + } } } else { final ByteBuffer payload = frame.getPayload(); @@ -923,6 +952,9 @@ private void consumeFrame(final RawFrame frame) throws HttpException, IOExceptio final RawFrame response = frameFactory.createSettingsAck(); commitFrame(response); remoteSettingState = SettingsHandshake.ACKED; + if (keepAlivePingSupport != null) { + keepAlivePingSupport.activateIfReady(); + } } } break; @@ -1328,6 +1360,141 @@ void appendState(final StringBuilder buf) { .append(", streams.lastRemote=").append(streams.getLastRemoteId()); } + private final class KeepAlivePingSupport { + + private static final int PING_DATA_LEN = 8; + + private final Timeout idleTime; + private final Timeout ackTimeout; + + private boolean active; + private boolean awaitingAck; + + private long lastActivityNanos; + private long pingSeq; + private long expectedAckSeq; + + KeepAlivePingSupport(final H2PingPolicy policy) { + Args.notNull(policy, "PING policy"); + this.idleTime = policy.getIdleTime(); + this.ackTimeout = policy.getAckTimeout(); + this.active = false; + this.awaitingAck = false; + this.lastActivityNanos = System.nanoTime(); + this.pingSeq = 0L; + this.expectedAckSeq = 0L; + } + + void activateIfReady() { + if (active) { + return; + } + if (localSettingState == SettingsHandshake.ACKED && remoteSettingState == SettingsHandshake.ACKED) { + active = true; + awaitingAck = false; + lastActivityNanos = System.nanoTime(); + ioSession.setSocketTimeout(idleTime); + } + } + + void onFrameInput(final RawFrame frame) { + if (!active) { + return; + } + lastActivityNanos = System.nanoTime(); + if (awaitingAck) { + if (!(frame.getType() == FrameType.PING.getValue() && frame.isFlagSet(FrameFlag.ACK))) { + awaitingAck = false; + } + } + ioSession.setSocketTimeout(idleTime); + } + + void onActivity() { + if (!active) { + return; + } + lastActivityNanos = System.nanoTime(); + if (awaitingAck) { + awaitingAck = false; + } + ioSession.setSocketTimeout(idleTime); + } + + boolean consumePingAck(final ByteBuffer payload) { + if (!active || !awaitingAck) { + return false; + } + if (payload == null || payload.remaining() != PING_DATA_LEN) { + return false; + } + final long ack = payload.getLong(payload.position()); + if (ack != expectedAckSeq) { + return false; + } + onActivity(); + return true; + } + + boolean onTimeout(final Timeout timeout) throws IOException { + activateIfReady(); + if (!active) { + return false; + } + + if (awaitingAck) { + shutdownKeepAlive(timeout); + return true; + } + + final long idleNanos = idleTime.toMilliseconds() * 1_000_000L; + if (idleNanos > 0L) { + final long elapsed = System.nanoTime() - lastActivityNanos; + if (elapsed < idleNanos) { + final long remainingMs = Math.max(1L, (idleNanos - elapsed) / 1_000_000L); + ioSession.setSocketTimeout(Timeout.ofMilliseconds(remainingMs)); + return true; + } + } + + awaitingAck = true; + sendPing(); + ioSession.setSocketTimeout(ackTimeout); + return true; + } + + private void sendPing() throws IOException { + final long v = ++pingSeq; + expectedAckSeq = v; + + final ByteBuffer payload = ByteBuffer.allocate(PING_DATA_LEN); + payload.putLong(v); + payload.flip(); + + final RawFrame ping = frameFactory.createPing(payload); + commitFrame(ping); + } + + private void shutdownKeepAlive(final Timeout timeout) throws IOException { + connState = ConnectionHandshake.SHUTDOWN; + + final RawFrame goAway = frameFactory.createGoAway( + streams.getLastRemoteId(), + H2Error.NO_ERROR, + "Ping response timeout (" + timeout + ")"); + commitFrame(goAway); + + for (final Iterator it = streams.iterator(); it.hasNext(); ) { + final H2Stream stream = it.next(); + stream.fail(new H2StreamResetException( + H2Error.NO_ERROR, + "Ping response timeout (" + timeout + ")")); + } + streams.shutdownAndReleaseAll(); + } + + } + private static class Continuation { final int streamId; diff --git a/httpcore5-h2/src/test/java/org/apache/hc/core5/http2/examples/H2KeepAlivePingClientExample.java b/httpcore5-h2/src/test/java/org/apache/hc/core5/http2/examples/H2KeepAlivePingClientExample.java new file mode 100644 index 000000000..6fb7505d8 --- /dev/null +++ b/httpcore5-h2/src/test/java/org/apache/hc/core5/http2/examples/H2KeepAlivePingClientExample.java @@ -0,0 +1,257 @@ +/* + * ==================================================================== + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * ==================================================================== + * + * This software consists of voluntary contributions made by many + * individuals on behalf of the Apache Software Foundation. For more + * information on the Apache Software Foundation, please see + * . + * + */ +package org.apache.hc.core5.http2.examples; + +import java.io.IOException; +import java.net.URI; +import java.nio.ByteBuffer; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.hc.core5.http.EntityDetails; +import org.apache.hc.core5.http.Header; +import org.apache.hc.core5.http.HttpConnection; +import org.apache.hc.core5.http.HttpException; +import org.apache.hc.core5.http.HttpResponse; +import org.apache.hc.core5.http.impl.bootstrap.HttpAsyncRequester; +import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler; +import org.apache.hc.core5.http.nio.AsyncRequestProducer; +import org.apache.hc.core5.http.nio.CapacityChannel; +import org.apache.hc.core5.http.nio.DataStreamChannel; +import org.apache.hc.core5.http.nio.RequestChannel; +import org.apache.hc.core5.http.nio.entity.StringAsyncEntityConsumer; +import org.apache.hc.core5.http.nio.support.AsyncRequestBuilder; +import org.apache.hc.core5.http.nio.support.BasicResponseConsumer; +import org.apache.hc.core5.http.protocol.HttpContext; +import org.apache.hc.core5.http.protocol.HttpCoreContext; +import org.apache.hc.core5.http2.HttpVersionPolicy; +import org.apache.hc.core5.http2.config.H2Config; +import org.apache.hc.core5.http2.config.H2PingPolicy; +import org.apache.hc.core5.http2.frame.FrameFlag; +import org.apache.hc.core5.http2.frame.FrameType; +import org.apache.hc.core5.http2.frame.RawFrame; +import org.apache.hc.core5.http2.impl.nio.H2StreamListener; +import org.apache.hc.core5.http2.impl.nio.bootstrap.H2RequesterBootstrap; +import org.apache.hc.core5.io.CloseMode; +import org.apache.hc.core5.reactor.IOReactorConfig; +import org.apache.hc.core5.util.Timeout; + +/** + * Minimal example demonstrating HTTP/2 connection keepalive using {@link H2PingPolicy}. + *

+ * The client configures an idle timeout and an ACK timeout. When the underlying HTTP/2 + * connection becomes idle, the I/O reactor triggers a keepalive {@code PING}. If the + * peer responds with {@code PING[ACK]} within the configured ACK timeout, the connection + * remains usable; otherwise the connection is considered dead and is terminated by the + * transport. + *

+ *

+ * This example performs a single request to establish the connection and then waits + * long enough for one keepalive round-trip. It prints: + *

+ *
    + *
  • the remote endpoint once,
  • + *
  • {@code >> PING} when a keepalive PING is emitted,
  • + *
  • {@code << PING[ACK]} when the ACK is received,
  • + *
  • a final counter line {@code keepalive: pingsOut=..., pingAcksIn=...}.
  • + *
+ *

+ * Notes: + *

+ *
    + *
  • This is intentionally not a unit test; it is a runnable sanity-check and usage example.
  • + *
  • Keepalive requires HTTP/2 settings negotiation to complete; PINGs may not be emitted + * immediately on startup.
  • + *
  • Timing is inherently environment-dependent; adjust {@code idleTime}/{@code ackTimeout} + * if running on a slow or heavily loaded machine.
  • + *
+ * @since 5.5 + */ + +public class H2KeepAlivePingClientExample { + + public static void main(final String[] args) throws Exception { + + final Timeout idleTime = Timeout.ofSeconds(1); + final Timeout ackTimeout = Timeout.ofSeconds(2); + + final IOReactorConfig ioReactorConfig = IOReactorConfig.custom() + .setSoTimeout(5, TimeUnit.SECONDS) + .build(); + + final H2PingPolicy pingPolicy = H2PingPolicy.custom() + .setIdleTime(idleTime) + .setAckTimeout(ackTimeout) + .build(); + + final H2Config h2Config = H2Config.custom() + .setPushEnabled(false) + .setMaxConcurrentStreams(100) + .setPingPolicy(pingPolicy) + .build(); + + final AtomicBoolean remotePrinted = new AtomicBoolean(false); + final AtomicInteger pingsOut = new AtomicInteger(0); + final AtomicInteger pingAcksIn = new AtomicInteger(0); + final CountDownLatch pingAckLatch = new CountDownLatch(1); + + final HttpAsyncRequester requester = H2RequesterBootstrap.bootstrap() + .setIOReactorConfig(ioReactorConfig) + .setH2Config(h2Config) + .setVersionPolicy(HttpVersionPolicy.FORCE_HTTP_2) + .setStreamListener(new H2StreamListener() { + + private void printRemoteOnce(final HttpConnection connection) { + if (remotePrinted.compareAndSet(false, true)) { + System.out.println("remote=" + connection.getRemoteAddress()); + } + } + + @Override + public void onHeaderInput(final HttpConnection connection, final int streamId, final List headers) { + } + + @Override + public void onHeaderOutput(final HttpConnection connection, final int streamId, final List headers) { + } + + @Override + public void onFrameInput(final HttpConnection connection, final int streamId, final RawFrame frame) { + printRemoteOnce(connection); + if (FrameType.valueOf(frame.getType()) == FrameType.PING && frame.isFlagSet(FrameFlag.ACK)) { + System.out.println("<< PING[ACK]"); + pingAcksIn.incrementAndGet(); + pingAckLatch.countDown(); + } + } + + @Override + public void onFrameOutput(final HttpConnection connection, final int streamId, final RawFrame frame) { + printRemoteOnce(connection); + if (FrameType.valueOf(frame.getType()) == FrameType.PING && !frame.isFlagSet(FrameFlag.ACK)) { + System.out.println(">> PING"); + pingsOut.incrementAndGet(); + } + } + + @Override + public void onInputFlowControl(final HttpConnection connection, final int streamId, final int delta, final int actualSize) { + } + + @Override + public void onOutputFlowControl(final HttpConnection connection, final int streamId, final int delta, final int actualSize) { + } + + }) + .create(); + + Runtime.getRuntime().addShutdownHook(new Thread(() -> requester.close(CloseMode.GRACEFUL))); + + requester.start(); + + final URI requestUri = new URI("http://nghttp2.org/httpbin/post"); + final AsyncRequestProducer requestProducer = AsyncRequestBuilder.post(requestUri) + .setEntity("stuff") + .build(); + final BasicResponseConsumer responseConsumer = new BasicResponseConsumer<>(new StringAsyncEntityConsumer()); + + final CountDownLatch exchangeLatch = new CountDownLatch(1); + + requester.execute(new AsyncClientExchangeHandler() { + + @Override + public void releaseResources() { + requestProducer.releaseResources(); + responseConsumer.releaseResources(); + exchangeLatch.countDown(); + } + + @Override + public void cancel() { + exchangeLatch.countDown(); + } + + @Override + public void failed(final Exception cause) { + exchangeLatch.countDown(); + } + + @Override + public void produceRequest(final RequestChannel channel, final HttpContext httpContext) throws HttpException, IOException { + requestProducer.sendRequest(channel, httpContext); + } + + @Override + public int available() { + return requestProducer.available(); + } + + @Override + public void produce(final DataStreamChannel channel) throws IOException { + requestProducer.produce(channel); + } + + @Override + public void consumeInformation(final HttpResponse response, final HttpContext httpContext) { + } + + @Override + public void consumeResponse(final HttpResponse response, final EntityDetails entityDetails, final HttpContext httpContext) throws HttpException, IOException { + responseConsumer.consumeResponse(response, entityDetails, httpContext, null); + } + + @Override + public void updateCapacity(final CapacityChannel capacityChannel) throws IOException { + responseConsumer.updateCapacity(capacityChannel); + } + + @Override + public void consume(final ByteBuffer src) throws IOException { + responseConsumer.consume(src); + } + + @Override + public void streamEnd(final List trailers) throws HttpException, IOException { + responseConsumer.streamEnd(trailers); + } + + }, Timeout.ofSeconds(30), HttpCoreContext.create()); + + exchangeLatch.await(); + + final long waitMs = idleTime.toMilliseconds() + ackTimeout.toMilliseconds() + 500L; + pingAckLatch.await(waitMs, TimeUnit.MILLISECONDS); + + System.out.println("keepalive: pingsOut=" + pingsOut.get() + ", pingAcksIn=" + pingAcksIn.get()); + + requester.close(CloseMode.GRACEFUL); + } + +} diff --git a/httpcore5-h2/src/test/java/org/apache/hc/core5/http2/impl/nio/TestAbstractH2StreamMultiplexer.java b/httpcore5-h2/src/test/java/org/apache/hc/core5/http2/impl/nio/TestAbstractH2StreamMultiplexer.java index c85d10aa9..150382440 100644 --- a/httpcore5-h2/src/test/java/org/apache/hc/core5/http2/impl/nio/TestAbstractH2StreamMultiplexer.java +++ b/httpcore5-h2/src/test/java/org/apache/hc/core5/http2/impl/nio/TestAbstractH2StreamMultiplexer.java @@ -57,10 +57,12 @@ import org.apache.hc.core5.http2.WritableByteChannelMock; import org.apache.hc.core5.http2.config.H2Config; import org.apache.hc.core5.http2.config.H2Param; +import org.apache.hc.core5.http2.config.H2PingPolicy; import org.apache.hc.core5.http2.config.H2Setting; import org.apache.hc.core5.http2.frame.DefaultFrameFactory; import org.apache.hc.core5.http2.frame.FrameConsts; import org.apache.hc.core5.http2.frame.FrameFactory; +import org.apache.hc.core5.http2.frame.FrameFlag; import org.apache.hc.core5.http2.frame.FrameType; import org.apache.hc.core5.http2.frame.RawFrame; import org.apache.hc.core5.http2.frame.StreamIdGenerator; @@ -846,28 +848,6 @@ static final class PriorityHeaderSender implements H2StreamHandler { @Override public void releaseResources() { } } - // Small struct + parser to decode the frames we capture from writes - private static final class FrameStub { - final int type; - final int streamId; - FrameStub(final int type, final int streamId) { this.type = type; this.streamId = streamId; } - } - private static List parseFrames(final byte[] all) { - final List out = new ArrayList<>(); - int p = 0; - while (p + 9 <= all.length) { - final int len = ((all[p] & 0xff) << 16) | ((all[p + 1] & 0xff) << 8) | (all[p + 2] & 0xff); - final int type = all[p + 3] & 0xff; - final int sid = ((all[p + 5] & 0x7f) << 24) | ((all[p + 6] & 0xff) << 16) - | ((all[p + 7] & 0xff) << 8) | (all[p + 8] & 0xff); - p += 9; - if (p + len > all.length) break; - out.add(new FrameStub(type, sid)); - p += len; - } - return out; - } - // 2) Client emits PRIORITY_UPDATE BEFORE HEADERS when Priority header present @Test void testSubmitWithPriorityHeaderEmitsPriorityUpdateBeforeHeaders() throws Exception { @@ -1083,5 +1063,326 @@ void testStreamIdleTimeoutTriggersH2StreamTimeoutException() throws Exception { Assertions.assertEquals(1, timeoutEx.getStreamId()); } + private static byte[] encodeFrame(final RawFrame frame) throws IOException { + final WritableByteChannelMock writableChannel = new WritableByteChannelMock(256); + final FrameOutputBuffer outBuffer = new FrameOutputBuffer(16 * 1024); + outBuffer.write(frame, writableChannel); + return writableChannel.toByteArray(); + } + + private static void feedFrame(final AbstractH2StreamMultiplexer mux, final RawFrame frame) throws Exception { + mux.onInput(ByteBuffer.wrap(encodeFrame(frame))); + } + + private static void completeSettingsHandshake(final AbstractH2StreamMultiplexer mux) throws Exception { + // Remote SETTINGS (non-ACK) -> mux replies with SETTINGS ACK and marks remoteSettingState ACKED + final RawFrame remoteSettings = FRAME_FACTORY.createSettings(new H2Setting[] { + new H2Setting(H2Param.MAX_FRAME_SIZE, FrameConsts.MIN_FRAME_SIZE) + }); + feedFrame(mux, remoteSettings); + + // Remote ACK of our SETTINGS -> localSettingState ACKED + feedFrame(mux, new RawFrame(FrameType.SETTINGS.getValue(), FrameFlag.ACK.getValue(), 0, null)); + } + + private static final class FrameStub { + final int type; + final int flags; + final int streamId; + final byte[] payload; + + FrameStub(final int type, final int flags, final int streamId, final byte[] payload) { + this.type = type; + this.flags = flags; + this.streamId = streamId; + this.payload = payload; + } + + boolean isPing() { + return type == FrameType.PING.getValue(); + } + + boolean isGoAway() { + return type == FrameType.GOAWAY.getValue(); + } + + boolean isAck() { + return (flags & FrameFlag.ACK.getValue()) != 0; + } + } + + private static List parseFrames(final byte[] all) { + final List out = new ArrayList<>(); + int p = 0; + while (p + 9 <= all.length) { + final int len = ((all[p] & 0xff) << 16) | ((all[p + 1] & 0xff) << 8) | (all[p + 2] & 0xff); + final int type = all[p + 3] & 0xff; + final int flags = all[p + 4] & 0xff; + final int sid = ((all[p + 5] & 0x7f) << 24) | ((all[p + 6] & 0xff) << 16) + | ((all[p + 7] & 0xff) << 8) | (all[p + 8] & 0xff); + p += 9; + if (p + len > all.length) { + break; + } + final byte[] payload = new byte[len]; + System.arraycopy(all, p, payload, 0, len); + out.add(new FrameStub(type, flags, sid, payload)); + p += len; + } + return out; + } + + private static byte[] concat(final List writes) { + final int total = writes.stream().mapToInt(a -> a.length).sum(); + final byte[] all = new byte[total]; + int p = 0; + for (final byte[] a : writes) { + System.arraycopy(a, 0, all, p, a.length); + p += a.length; + } + return all; + } + + + @Test + void testKeepAliveNotActiveBeforeSettingsHandshake() throws Exception { + final List writes = new ArrayList<>(); + Mockito.when(protocolIOSession.write(ArgumentMatchers.any(ByteBuffer.class))) + .thenAnswer(inv -> { + final ByteBuffer b = inv.getArgument(0, ByteBuffer.class); + final byte[] copy = new byte[b.remaining()]; + b.get(copy); + writes.add(copy); + return copy.length; + }); + Mockito.doNothing().when(protocolIOSession).setEvent(ArgumentMatchers.anyInt()); + Mockito.doNothing().when(protocolIOSession).clearEvent(ArgumentMatchers.anyInt()); + + final Timeout idleTime = Timeout.ofMilliseconds(5); + final Timeout ackTimeout = Timeout.ofMilliseconds(5); + + final H2Config h2Config = H2Config.custom() + .setPingPolicy(H2PingPolicy.custom() + .setIdleTime(idleTime) + .setAckTimeout(ackTimeout) + .build()) + .build(); + + final AbstractH2StreamMultiplexer mux = new H2StreamMultiplexerImpl( + protocolIOSession, FRAME_FACTORY, StreamIdGenerator.ODD, + httpProcessor, CharCodingConfig.DEFAULT, h2Config, h2StreamListener, () -> streamHandler); + + mux.onConnect(); + writes.clear(); + + // BEFORE SETTINGS handshake is fully ACKed, keepalive must NOT run + mux.onTimeout(idleTime); + + final List frames = parseFrames(concat(writes)); + Assertions.assertTrue(frames.stream().noneMatch(FrameStub::isPing), "Must not emit PING before handshake"); + Assertions.assertTrue(frames.stream().anyMatch(FrameStub::isGoAway), "Default timeout path must emit GOAWAY"); + } + + @Test + void testKeepAliveActivatesAfterSettingsAckedSetsIdleTimeout() throws Exception { + Mockito.doNothing().when(protocolIOSession).setEvent(ArgumentMatchers.anyInt()); + Mockito.doNothing().when(protocolIOSession).clearEvent(ArgumentMatchers.anyInt()); + Mockito.when(protocolIOSession.write(ArgumentMatchers.any(ByteBuffer.class))).thenReturn(0); + + final Timeout idleTime = Timeout.ofMilliseconds(50); + final Timeout ackTimeout = Timeout.ofMilliseconds(20); + + final H2Config h2Config = H2Config.custom() + .setPingPolicy(H2PingPolicy.custom() + .setIdleTime(idleTime) + .setAckTimeout(ackTimeout) + .build()) + .build(); + + final AbstractH2StreamMultiplexer mux = new H2StreamMultiplexerImpl( + protocolIOSession, FRAME_FACTORY, StreamIdGenerator.ODD, + httpProcessor, CharCodingConfig.DEFAULT, h2Config, h2StreamListener, () -> streamHandler); + + mux.onConnect(); + completeSettingsHandshake(mux); + + Mockito.verify(protocolIOSession, Mockito.atLeastOnce()).setSocketTimeout(ArgumentMatchers.eq(idleTime)); + Mockito.verify(protocolIOSession, Mockito.never()).setSocketTimeout(ArgumentMatchers.eq(ackTimeout)); + } + + @Test + void testKeepAliveIdleTimeoutSendsPingAndSetsAckTimeout() throws Exception { + final List writes = new ArrayList<>(); + Mockito.when(protocolIOSession.write(ArgumentMatchers.any(ByteBuffer.class))) + .thenAnswer(inv -> { + final ByteBuffer b = inv.getArgument(0, ByteBuffer.class); + final byte[] copy = new byte[b.remaining()]; + b.get(copy); + writes.add(copy); + return copy.length; + }); + Mockito.doNothing().when(protocolIOSession).setEvent(ArgumentMatchers.anyInt()); + Mockito.doNothing().when(protocolIOSession).clearEvent(ArgumentMatchers.anyInt()); + + final Timeout idleTime = Timeout.ofMilliseconds(5); + final Timeout ackTimeout = Timeout.ofMilliseconds(50); + + final H2Config h2Config = H2Config.custom() + .setPingPolicy(H2PingPolicy.custom() + .setIdleTime(idleTime) + .setAckTimeout(ackTimeout) + .build()) + .build(); + + final AbstractH2StreamMultiplexer mux = new H2StreamMultiplexerImpl( + protocolIOSession, FRAME_FACTORY, StreamIdGenerator.ODD, + httpProcessor, CharCodingConfig.DEFAULT, h2Config, h2StreamListener, () -> streamHandler); + + mux.onConnect(); + completeSettingsHandshake(mux); + + writes.clear(); + Thread.sleep(idleTime.toMilliseconds() + 10); + + mux.onTimeout(idleTime); + + Mockito.verify(protocolIOSession, Mockito.atLeastOnce()).setSocketTimeout(ArgumentMatchers.eq(ackTimeout)); + + final List frames = parseFrames(concat(writes)); + Assertions.assertTrue(frames.stream().anyMatch(f -> f.isPing() && !f.isAck()), "Must emit keepalive PING"); + Assertions.assertTrue(mux.isOpen(), "Connection should still be open after sending PING"); + } + + @Test + void testKeepAlivePingAckReturnsToIdleTimeout() throws Exception { + final List writes = new ArrayList<>(); + Mockito.when(protocolIOSession.write(ArgumentMatchers.any(ByteBuffer.class))) + .thenAnswer(inv -> { + final ByteBuffer b = inv.getArgument(0, ByteBuffer.class); + final byte[] copy = new byte[b.remaining()]; + b.get(copy); + writes.add(copy); + return copy.length; + }); + Mockito.doNothing().when(protocolIOSession).setEvent(ArgumentMatchers.anyInt()); + Mockito.doNothing().when(protocolIOSession).clearEvent(ArgumentMatchers.anyInt()); + + final Timeout idleTime = Timeout.ofMilliseconds(5); + final Timeout ackTimeout = Timeout.ofMilliseconds(50); + + final H2Config h2Config = H2Config.custom() + .setPingPolicy(H2PingPolicy.custom() + .setIdleTime(idleTime) + .setAckTimeout(ackTimeout) + .build()) + .build(); + + final AbstractH2StreamMultiplexer mux = new H2StreamMultiplexerImpl( + protocolIOSession, FRAME_FACTORY, StreamIdGenerator.ODD, + httpProcessor, CharCodingConfig.DEFAULT, h2Config, h2StreamListener, () -> streamHandler); + + mux.onConnect(); + completeSettingsHandshake(mux); + + writes.clear(); + Thread.sleep(idleTime.toMilliseconds() + 10); + mux.onTimeout(idleTime); + + final List frames = parseFrames(concat(writes)); + final FrameStub ping = frames.stream().filter(f -> f.isPing() && !f.isAck()).findFirst().orElse(null); + Assertions.assertNotNull(ping, "Expected a keepalive PING frame"); + Assertions.assertEquals(8, ping.payload.length, "PING payload must be 8 bytes"); + + // Feed an ACK with the same 8 bytes + final RawFrame pingAck = new RawFrame(FrameType.PING.getValue(), FrameFlag.ACK.getValue(), 0, ByteBuffer.wrap(ping.payload)); + feedFrame(mux, pingAck); + + Mockito.verify(protocolIOSession, Mockito.atLeastOnce()).setSocketTimeout(ArgumentMatchers.eq(idleTime)); + } + + @Test + void testKeepAliveAckTimeoutShutsDownAndFailsStreams() throws Exception { + final List writes = new ArrayList<>(); + Mockito.when(protocolIOSession.write(ArgumentMatchers.any(ByteBuffer.class))) + .thenAnswer(inv -> { + final ByteBuffer b = inv.getArgument(0, ByteBuffer.class); + final byte[] copy = new byte[b.remaining()]; + b.get(copy); + writes.add(copy); + return copy.length; + }); + Mockito.doNothing().when(protocolIOSession).setEvent(ArgumentMatchers.anyInt()); + Mockito.doNothing().when(protocolIOSession).clearEvent(ArgumentMatchers.anyInt()); + + final Timeout idleTime = Timeout.ofMilliseconds(5); + final Timeout ackTimeout = Timeout.ofMilliseconds(20); + + final H2Config h2Config = H2Config.custom() + .setPingPolicy(H2PingPolicy.custom() + .setIdleTime(idleTime) + .setAckTimeout(ackTimeout) + .build()) + .build(); + + final AbstractH2StreamMultiplexer mux = new H2StreamMultiplexerImpl( + protocolIOSession, FRAME_FACTORY, StreamIdGenerator.ODD, + httpProcessor, CharCodingConfig.DEFAULT, h2Config, h2StreamListener, () -> streamHandler); + + mux.onConnect(); + completeSettingsHandshake(mux); + + // Ensure at least one live stream to be failed + final H2StreamChannel channel = mux.createChannel(1); + mux.createStream(channel, streamHandler); + + writes.clear(); + Thread.sleep(idleTime.toMilliseconds() + 10); + mux.onTimeout(idleTime); // send PING, awaiting ACK + writes.clear(); + + // No ACK arrives -> next timeout closes via keepalive path (GOAWAY + fail streams) + mux.onTimeout(ackTimeout); + + final List frames = parseFrames(concat(writes)); + Assertions.assertTrue(frames.stream().anyMatch(FrameStub::isGoAway), "Must emit GOAWAY on ping ACK timeout"); + + Mockito.verify(streamHandler, Mockito.atLeastOnce()).failed(exceptionCaptor.capture()); + Assertions.assertInstanceOf(H2StreamResetException.class, exceptionCaptor.getValue()); + + Assertions.assertFalse(mux.isOpen(), "Connection must not be open after keepalive shutdown"); + } + + @Test + void testKeepAliveDisabledNeverEmitsPing() throws Exception { + final List writes = new ArrayList<>(); + Mockito.when(protocolIOSession.write(ArgumentMatchers.any(ByteBuffer.class))) + .thenAnswer(inv -> { + final ByteBuffer b = inv.getArgument(0, ByteBuffer.class); + final byte[] copy = new byte[b.remaining()]; + b.get(copy); + writes.add(copy); + return copy.length; + }); + Mockito.doNothing().when(protocolIOSession).setEvent(ArgumentMatchers.anyInt()); + Mockito.doNothing().when(protocolIOSession).clearEvent(ArgumentMatchers.anyInt()); + + final H2Config h2Config = H2Config.custom() + .setPingPolicy(H2PingPolicy.disabled()) + .build(); + + final AbstractH2StreamMultiplexer mux = new H2StreamMultiplexerImpl( + protocolIOSession, FRAME_FACTORY, StreamIdGenerator.ODD, + httpProcessor, CharCodingConfig.DEFAULT, h2Config, h2StreamListener, () -> streamHandler); + + mux.onConnect(); + writes.clear(); + + mux.onTimeout(Timeout.ofMilliseconds(1)); + + final List frames = parseFrames(concat(writes)); + Assertions.assertTrue(frames.stream().noneMatch(FrameStub::isPing), "Disabled policy must never emit PING"); + } + + } diff --git a/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/TestH2KeepAlivePingPolicyIT.java b/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/TestH2KeepAlivePingPolicyIT.java new file mode 100644 index 000000000..3968c42c4 --- /dev/null +++ b/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/TestH2KeepAlivePingPolicyIT.java @@ -0,0 +1,357 @@ +/* + * ==================================================================== + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * ==================================================================== + * + * This software consists of voluntary contributions made by many + * individuals on behalf of the Apache Software Foundation. For more + * information on the Apache Software Foundation, please see + * . + * + */ + +package org.apache.hc.core5.testing.nio; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.nio.ByteBuffer; +import java.nio.channels.SelectionKey; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.locks.LockSupport; + +import org.apache.hc.core5.http.ContentType; +import org.apache.hc.core5.http.HttpException; +import org.apache.hc.core5.http.HttpHost; +import org.apache.hc.core5.http.HttpRequest; +import org.apache.hc.core5.http.HttpResponse; +import org.apache.hc.core5.http.HttpStatus; +import org.apache.hc.core5.http.Message; +import org.apache.hc.core5.http.URIScheme; +import org.apache.hc.core5.http.nio.AsyncServerRequestHandler; +import org.apache.hc.core5.http.nio.entity.DiscardingEntityConsumer; +import org.apache.hc.core5.http.nio.entity.StringAsyncEntityConsumer; +import org.apache.hc.core5.http.nio.support.AsyncResponseBuilder; +import org.apache.hc.core5.http.nio.support.BasicRequestProducer; +import org.apache.hc.core5.http.nio.support.BasicResponseConsumer; +import org.apache.hc.core5.http.protocol.HttpContext; +import org.apache.hc.core5.http.support.BasicRequestBuilder; +import org.apache.hc.core5.http2.config.H2Config; +import org.apache.hc.core5.http2.config.H2PingPolicy; +import org.apache.hc.core5.http2.nio.AsyncPingHandler; +import org.apache.hc.core5.http2.nio.command.PingCommand; +import org.apache.hc.core5.reactor.Command; +import org.apache.hc.core5.reactor.IOSession; +import org.apache.hc.core5.testing.extension.nio.H2TestResources; +import org.apache.hc.core5.util.Timeout; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +public class TestH2KeepAlivePingPolicyIT { + + private static final Timeout TIMEOUT = Timeout.ofSeconds(30); + + @RegisterExtension + private final H2TestResources resources = new H2TestResources(URIScheme.HTTP, TIMEOUT); + + @Test + void keepAlivePing_keepsConnectionOpenPastIdleTimeout() throws Exception { + final H2TestServer server = resources.server(); + final H2TestClient client = resources.client(); + + server.register("/hello", () -> new MessageExchangeHandler(new DiscardingEntityConsumer()) { + @Override + protected void handle( + final Message request, + final AsyncServerRequestHandler.ResponseTrigger responseTrigger, + final HttpContext context) throws IOException, HttpException { + responseTrigger.submitResponse( + AsyncResponseBuilder.create(HttpStatus.SC_OK) + .setEntity("OK", ContentType.TEXT_PLAIN) + .build(), + context); + } + }); + + final Timeout idleTime = Timeout.ofMilliseconds(200); + final Timeout ackTimeout = Timeout.ofSeconds(2); + + final H2PingPolicy pingPolicy = H2PingPolicy.custom() + .setIdleTime(idleTime) + .setAckTimeout(ackTimeout) + .build(); + + final H2Config h2Config = H2Config.custom() + .setPushEnabled(false) + .setPingPolicy(pingPolicy) + .build(); + + server.configure(h2Config); + final InetSocketAddress serverEndpoint = server.start(); + + client.configure(h2Config); + client.start(); + + final IOSession ioSession = client.requestSession( + new HttpHost("localhost", serverEndpoint.getPort()), + TIMEOUT, + null).get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit()); + + // Make the inactivity timeout aggressive; keep-alive must prevent it from killing the session. + ioSession.setSocketTimeout(idleTime); + + try (final ClientSessionEndpoint streamEndpoint = new ClientSessionEndpoint(ioSession)) { + final HttpHost target = new HttpHost(URIScheme.HTTP.id, "localhost", serverEndpoint.getPort()); + + final Message r1 = executeHello(streamEndpoint, target); + Assertions.assertEquals(200, r1.getHead().getCode()); + Assertions.assertEquals("OK", r1.getBody()); + + parkAtLeast(idleTime.toMilliseconds() * 6L); + + Assertions.assertTrue(ioSession.isOpen(), "Expected session to stay open with keep-alive enabled"); + + final Message r2 = executeHello(streamEndpoint, target); + Assertions.assertEquals(200, r2.getHead().getCode()); + Assertions.assertEquals("OK", r2.getBody()); + } + } + + @Test + void keepAlivePing_disabled_connectionClosesOnIdleTimeout() throws Exception { + final H2TestServer server = resources.server(); + final H2TestClient client = resources.client(); + + server.register("/hello", () -> new MessageExchangeHandler(new DiscardingEntityConsumer()) { + @Override + protected void handle( + final Message request, + final AsyncServerRequestHandler.ResponseTrigger responseTrigger, + final HttpContext context) throws IOException, HttpException { + responseTrigger.submitResponse( + AsyncResponseBuilder.create(HttpStatus.SC_OK) + .setEntity("OK", ContentType.TEXT_PLAIN) + .build(), + context); + } + }); + + final H2Config h2Config = H2Config.custom() + .setPushEnabled(false) + // pingPolicy is intentionally not set (disabled) + .build(); + + server.configure(h2Config); + final InetSocketAddress serverEndpoint = server.start(); + + client.configure(h2Config); + client.start(); + + final IOSession ioSession = client.requestSession( + new HttpHost("localhost", serverEndpoint.getPort()), + TIMEOUT, + null).get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit()); + + final Timeout idleTimeout = Timeout.ofMilliseconds(200); + ioSession.setSocketTimeout(idleTimeout); + + try (final ClientSessionEndpoint streamEndpoint = new ClientSessionEndpoint(ioSession)) { + final HttpHost target = new HttpHost(URIScheme.HTTP.id, "localhost", serverEndpoint.getPort()); + + final Message r1 = executeHello(streamEndpoint, target); + Assertions.assertEquals(200, r1.getHead().getCode()); + Assertions.assertEquals("OK", r1.getBody()); + + awaitTrue(() -> !ioSession.isOpen(), Timeout.ofSeconds(5), "Expected session to close without keep-alive"); + + Assertions.assertFalse(ioSession.isOpen(), "Expected session to close without keep-alive"); + + final Future> f = executeHelloAsync(streamEndpoint, target); + Assertions.assertThrows(ExecutionException.class, () -> f.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit())); + } + } + + @Test + void keepAlivePing_enabled_doesNotStealAckFromExplicitPingCommand() throws Exception { + final H2TestServer server = resources.server(); + final H2TestClient client = resources.client(); + + server.register("/hello", () -> new MessageExchangeHandler(new DiscardingEntityConsumer()) { + @Override + protected void handle( + final Message request, + final AsyncServerRequestHandler.ResponseTrigger responseTrigger, + final HttpContext context) throws IOException, HttpException { + responseTrigger.submitResponse( + AsyncResponseBuilder.create(HttpStatus.SC_OK) + .setEntity("OK", ContentType.TEXT_PLAIN) + .build(), + context); + } + }); + + final Timeout idleTime = Timeout.ofMilliseconds(100); + final Timeout ackTimeout = Timeout.ofSeconds(2); + + final H2PingPolicy pingPolicy = H2PingPolicy.custom() + .setIdleTime(idleTime) + .setAckTimeout(ackTimeout) + .build(); + + final H2Config h2Config = H2Config.custom() + .setPushEnabled(false) + .setPingPolicy(pingPolicy) + .build(); + + server.configure(h2Config); + final InetSocketAddress serverEndpoint = server.start(); + + client.configure(h2Config); + client.start(); + + final IOSession ioSession = client.requestSession( + new HttpHost("localhost", serverEndpoint.getPort()), + TIMEOUT, + null).get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit()); + + ioSession.setSocketTimeout(idleTime); + + try (final ClientSessionEndpoint streamEndpoint = new ClientSessionEndpoint(ioSession)) { + final HttpHost target = new HttpHost(URIScheme.HTTP.id, "localhost", serverEndpoint.getPort()); + + // Warm-up to complete the HTTP/2 session & SETTINGS handshake. + final Message r1 = executeHello(streamEndpoint, target); + Assertions.assertEquals(200, r1.getHead().getCode()); + Assertions.assertEquals("OK", r1.getBody()); + + // Give the keep-alive logic a chance to become active (no hard assumptions about socket timeout changes). + parkAtLeast(idleTime.toMilliseconds() * 3L); + + final byte[] expected = new byte[]{0x55, 0x55, 0x55, 0x55, 0x55, 0x55, 0x55, 0x55}; + final CompletableFuture acked = new CompletableFuture<>(); + + final AsyncPingHandler handler = new AsyncPingHandler() { + + @Override + public ByteBuffer getData() { + return ByteBuffer.wrap(expected).asReadOnlyBuffer(); + } + + @Override + public void consumeResponse(final ByteBuffer feedback) throws IOException, HttpException { + if (feedback == null || feedback.remaining() != expected.length) { + acked.completeExceptionally(new AssertionError("Unexpected ping ACK payload")); + return; + } + final ByteBuffer dup = feedback.slice(); + final byte[] actual = new byte[expected.length]; + dup.get(actual); + for (int i = 0; i < expected.length; i++) { + if (actual[i] != expected[i]) { + acked.completeExceptionally(new AssertionError("Ping ACK payload mismatch")); + return; + } + } + acked.complete(null); + } + + @Override + public void failed(final Exception cause) { + acked.completeExceptionally(cause); + } + + @Override + public void cancel() { + acked.cancel(false); + } + }; + + ioSession.enqueue(new PingCommand(handler), Command.Priority.NORMAL); + ioSession.setEvent(SelectionKey.OP_WRITE); + + try { + acked.get(5, TimeUnit.SECONDS); + } catch (final TimeoutException ex) { + Assertions.fail("Timed out waiting for explicit PING ACK"); + } + + // Still usable. + Assertions.assertTrue(ioSession.isOpen(), "Expected session to stay open"); + final Message r2 = executeHello(streamEndpoint, target); + Assertions.assertEquals(200, r2.getHead().getCode()); + Assertions.assertEquals("OK", r2.getBody()); + } + } + + @Test + void keepAlivePingPolicy_rejectsDisabledAckTimeoutWhenIdleEnabled() { + Assertions.assertThrows(IllegalArgumentException.class, () -> H2PingPolicy.custom() + .setIdleTime(Timeout.ofSeconds(1)) + .setAckTimeout(Timeout.DISABLED) + .build()); + } + + private static Message executeHello( + final ClientSessionEndpoint endpoint, + final HttpHost target) throws Exception { + final Future> f = executeHelloAsync(endpoint, target); + return f.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit()); + } + + private static Future> executeHelloAsync( + final ClientSessionEndpoint endpoint, + final HttpHost target) { + + final org.apache.hc.core5.http.message.BasicHttpRequest request = BasicRequestBuilder.get() + .setHttpHost(target) + .setPath("/hello") + .build(); + + return endpoint.execute( + new BasicRequestProducer(request, null), + new BasicResponseConsumer(new StringAsyncEntityConsumer()), + null); + } + + private static void parkAtLeast(final long millis) { + final long deadlineNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(millis); + while (System.nanoTime() < deadlineNanos) { + LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(10)); + } + } + + private interface Condition { + boolean get(); + } + + private static void awaitTrue(final Condition condition, final Timeout timeout, final String message) { + final long deadlineNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeout.toMilliseconds()); + while (System.nanoTime() < deadlineNanos) { + if (condition.get()) { + return; + } + LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(1)); + } + Assertions.fail(message); + } + +}