From 815f52d93f0bafe05793304d4c1d70d17595ec13 Mon Sep 17 00:00:00 2001 From: Anush008 Date: Wed, 18 Mar 2026 14:10:52 +0530 Subject: [PATCH] feat: Support for tracing ID headers Signed-off-by: Anush008 --- .../io/qdrant/client/QdrantGrpcClient.java | 13 ++- .../java/io/qdrant/client/RequestHeaders.java | 85 +++++++++++++++++++ 2 files changed, 94 insertions(+), 4 deletions(-) create mode 100644 src/main/java/io/qdrant/client/RequestHeaders.java diff --git a/src/main/java/io/qdrant/client/QdrantGrpcClient.java b/src/main/java/io/qdrant/client/QdrantGrpcClient.java index 0df6d78..67a23d7 100644 --- a/src/main/java/io/qdrant/client/QdrantGrpcClient.java +++ b/src/main/java/io/qdrant/client/QdrantGrpcClient.java @@ -1,6 +1,8 @@ package io.qdrant.client; import io.grpc.CallCredentials; +import io.grpc.Channel; +import io.grpc.ClientInterceptors; import io.grpc.Deadline; import io.grpc.ManagedChannel; import io.grpc.ManagedChannelBuilder; @@ -21,6 +23,7 @@ public class QdrantGrpcClient implements AutoCloseable { private static final Logger logger = LoggerFactory.getLogger(QdrantGrpcClient.class); @Nullable private final CallCredentials callCredentials; private final ManagedChannel channel; + private final Channel interceptedChannel; private final boolean shutdownChannelOnClose; @Nullable private final Duration timeout; @@ -31,6 +34,8 @@ public class QdrantGrpcClient implements AutoCloseable { @Nullable Duration timeout) { this.callCredentials = callCredentials; this.channel = channel; + this.interceptedChannel = + ClientInterceptors.intercept(channel, RequestHeaders.newInterceptor()); this.shutdownChannelOnClose = shutdownChannelOnClose; this.timeout = timeout; } @@ -136,7 +141,7 @@ public ManagedChannel channel() { * @return a new instance of {@link QdrantFutureStub} */ public QdrantGrpc.QdrantFutureStub qdrant() { - return QdrantGrpc.newFutureStub(channel) + return QdrantGrpc.newFutureStub(interceptedChannel) .withCallCredentials(callCredentials) .withDeadline( timeout != null ? Deadline.after(timeout.toMillis(), TimeUnit.MILLISECONDS) : null); @@ -148,7 +153,7 @@ public QdrantGrpc.QdrantFutureStub qdrant() { * @return a new instance of {@link PointsFutureStub} */ public PointsFutureStub points() { - return PointsGrpc.newFutureStub(channel) + return PointsGrpc.newFutureStub(interceptedChannel) .withCallCredentials(callCredentials) .withDeadline( timeout != null ? Deadline.after(timeout.toMillis(), TimeUnit.MILLISECONDS) : null); @@ -160,7 +165,7 @@ public PointsFutureStub points() { * @return a new instance of {@link CollectionsFutureStub} */ public CollectionsFutureStub collections() { - return CollectionsGrpc.newFutureStub(channel) + return CollectionsGrpc.newFutureStub(interceptedChannel) .withCallCredentials(callCredentials) .withDeadline( timeout != null ? Deadline.after(timeout.toMillis(), TimeUnit.MILLISECONDS) : null); @@ -172,7 +177,7 @@ public CollectionsFutureStub collections() { * @return a new instance of {@link SnapshotsFutureStub} */ public SnapshotsFutureStub snapshots() { - return SnapshotsGrpc.newFutureStub(channel) + return SnapshotsGrpc.newFutureStub(interceptedChannel) .withCallCredentials(callCredentials) .withDeadline( timeout != null ? Deadline.after(timeout.toMillis(), TimeUnit.MILLISECONDS) : null); diff --git a/src/main/java/io/qdrant/client/RequestHeaders.java b/src/main/java/io/qdrant/client/RequestHeaders.java new file mode 100644 index 0000000..0aa32ff --- /dev/null +++ b/src/main/java/io/qdrant/client/RequestHeaders.java @@ -0,0 +1,85 @@ +package io.qdrant.client; + +import io.grpc.CallOptions; +import io.grpc.Channel; +import io.grpc.ClientCall; +import io.grpc.ClientInterceptor; +import io.grpc.Context; +import io.grpc.ForwardingClientCall; +import io.grpc.Metadata; +import io.grpc.MethodDescriptor; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +/** + * Utilities for attaching per-request headers to gRPC calls. + * + *
{@code
+ * Context ctx = RequestHeaders.withHeader(Context.current(), "x-request-id", "abc-123");
+ * ctx.run(() -> client.listCollectionsAsync());
+ * }
+ */ +public final class RequestHeaders { + + static final Context.Key> HEADERS_KEY = Context.key("qdrant-request-headers"); + + private RequestHeaders() {} + + /** + * Returns a new {@link Context} that carries key/value as a gRPC metadata header on every request + * started within that context. + * + * @param ctx the parent context + * @param key the header name + * @param value the header value + * @return a child context with the header attached + */ + public static Context withHeader(Context ctx, String key, String value) { + return withHeaders(ctx, Collections.singletonMap(key, value)); + } + + /** + * Returns a new {@link Context} that carries all entries of headers as gRPC metadata on every + * request started within that context. + * + * @param ctx the parent context + * @param headers the headers to attach + * @return a child context with the headers attached + */ + public static Context withHeaders(Context ctx, Map headers) { + if (headers == null || headers.isEmpty()) { + return ctx; + } + Map merged = new HashMap<>(); + Map current = HEADERS_KEY.get(ctx); + if (current != null) merged.putAll(current); + merged.putAll(headers); + return ctx.withValue(HEADERS_KEY, merged); + } + + /** Returns a {@link ClientInterceptor} that injects per-request headers from the context. */ + static ClientInterceptor newInterceptor() { + return new ClientInterceptor() { + @Override + public ClientCall interceptCall( + MethodDescriptor method, CallOptions callOptions, Channel next) { + return new ForwardingClientCall.SimpleForwardingClientCall( + next.newCall(method, callOptions)) { + @Override + public void start(Listener responseListener, Metadata headers) { + Map extra = HEADERS_KEY.get(); + if (extra != null) { + for (Map.Entry entry : extra.entrySet()) { + headers.put( + Metadata.Key.of(entry.getKey(), Metadata.ASCII_STRING_MARSHALLER), + entry.getValue()); + } + } + super.start(responseListener, headers); + } + }; + } + }; + } +}