Skip to content

Commit 27a28a4

Browse files
committed
feat: Support for tracing ID headers
Signed-off-by: Anush008 <mail@anush.sh>
1 parent af10911 commit 27a28a4

2 files changed

Lines changed: 94 additions & 4 deletions

File tree

src/main/java/io/qdrant/client/QdrantGrpcClient.java

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
package io.qdrant.client;
22

33
import io.grpc.CallCredentials;
4+
import io.grpc.Channel;
5+
import io.grpc.ClientInterceptors;
46
import io.grpc.Deadline;
57
import io.grpc.ManagedChannel;
68
import io.grpc.ManagedChannelBuilder;
@@ -21,6 +23,7 @@ public class QdrantGrpcClient implements AutoCloseable {
2123
private static final Logger logger = LoggerFactory.getLogger(QdrantGrpcClient.class);
2224
@Nullable private final CallCredentials callCredentials;
2325
private final ManagedChannel channel;
26+
private final Channel interceptedChannel;
2427
private final boolean shutdownChannelOnClose;
2528
@Nullable private final Duration timeout;
2629

@@ -31,6 +34,7 @@ public class QdrantGrpcClient implements AutoCloseable {
3134
@Nullable Duration timeout) {
3235
this.callCredentials = callCredentials;
3336
this.channel = channel;
37+
this.interceptedChannel = ClientInterceptors.intercept(channel, RequestHeaders.newInterceptor());
3438
this.shutdownChannelOnClose = shutdownChannelOnClose;
3539
this.timeout = timeout;
3640
}
@@ -136,7 +140,7 @@ public ManagedChannel channel() {
136140
* @return a new instance of {@link QdrantFutureStub}
137141
*/
138142
public QdrantGrpc.QdrantFutureStub qdrant() {
139-
return QdrantGrpc.newFutureStub(channel)
143+
return QdrantGrpc.newFutureStub(interceptedChannel)
140144
.withCallCredentials(callCredentials)
141145
.withDeadline(
142146
timeout != null ? Deadline.after(timeout.toMillis(), TimeUnit.MILLISECONDS) : null);
@@ -148,7 +152,7 @@ public QdrantGrpc.QdrantFutureStub qdrant() {
148152
* @return a new instance of {@link PointsFutureStub}
149153
*/
150154
public PointsFutureStub points() {
151-
return PointsGrpc.newFutureStub(channel)
155+
return PointsGrpc.newFutureStub(interceptedChannel)
152156
.withCallCredentials(callCredentials)
153157
.withDeadline(
154158
timeout != null ? Deadline.after(timeout.toMillis(), TimeUnit.MILLISECONDS) : null);
@@ -160,7 +164,7 @@ public PointsFutureStub points() {
160164
* @return a new instance of {@link CollectionsFutureStub}
161165
*/
162166
public CollectionsFutureStub collections() {
163-
return CollectionsGrpc.newFutureStub(channel)
167+
return CollectionsGrpc.newFutureStub(interceptedChannel)
164168
.withCallCredentials(callCredentials)
165169
.withDeadline(
166170
timeout != null ? Deadline.after(timeout.toMillis(), TimeUnit.MILLISECONDS) : null);
@@ -172,7 +176,7 @@ public CollectionsFutureStub collections() {
172176
* @return a new instance of {@link SnapshotsFutureStub}
173177
*/
174178
public SnapshotsFutureStub snapshots() {
175-
return SnapshotsGrpc.newFutureStub(channel)
179+
return SnapshotsGrpc.newFutureStub(interceptedChannel)
176180
.withCallCredentials(callCredentials)
177181
.withDeadline(
178182
timeout != null ? Deadline.after(timeout.toMillis(), TimeUnit.MILLISECONDS) : null);
Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
package io.qdrant.client;
2+
3+
import io.grpc.CallOptions;
4+
import io.grpc.Channel;
5+
import io.grpc.ClientCall;
6+
import io.grpc.ClientInterceptor;
7+
import io.grpc.Context;
8+
import io.grpc.ForwardingClientCall;
9+
import io.grpc.Metadata;
10+
import io.grpc.MethodDescriptor;
11+
import java.util.Collections;
12+
import java.util.HashMap;
13+
import java.util.Map;
14+
15+
/**
16+
* Utilities for attaching per-request headers to gRPC calls.
17+
*
18+
* <pre>{@code
19+
* Context ctx = RequestHeaders.withHeader(Context.current(), "x-request-id", "abc-123");
20+
* ctx.run(() -> client.listCollectionsAsync());
21+
* }</pre>
22+
*/
23+
public final class RequestHeaders {
24+
25+
static final Context.Key<Map<String, String>> HEADERS_KEY =
26+
Context.key("qdrant-request-headers");
27+
28+
private RequestHeaders() {}
29+
30+
/**
31+
* Returns a new {@link Context} derived from {@code ctx} that carries {@code key}/{@code value}
32+
* as a gRPC metadata header on every request started within that context.
33+
*
34+
* @param ctx the parent context
35+
* @param key the header name
36+
* @param value the header value
37+
* @return a child context with the header attached
38+
*/
39+
public static Context withHeader(Context ctx, String key, String value) {
40+
return withHeaders(ctx, Collections.singletonMap(key, value));
41+
}
42+
43+
/**
44+
* Returns a new {@link Context} derived from {@code ctx} that carries all entries of {@code
45+
* headers} as gRPC metadata on every request started within that context.
46+
*
47+
* @param ctx the parent context
48+
* @param headers the headers to attach
49+
* @return a child context with the headers attached
50+
*/
51+
public static Context withHeaders(Context ctx, Map<String, String> headers) {
52+
if (headers == null || headers.isEmpty()) {
53+
return ctx;
54+
}
55+
Map<String, String> merged = new HashMap<>();
56+
Map<String, String> current = HEADERS_KEY.get(ctx);
57+
if (current != null) merged.putAll(current);
58+
merged.putAll(headers);
59+
return ctx.withValue(HEADERS_KEY, merged);
60+
}
61+
62+
/** Returns a {@link ClientInterceptor} that injects per-request headers from the context. */
63+
static ClientInterceptor newInterceptor() {
64+
return new ClientInterceptor() {
65+
@Override
66+
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
67+
MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
68+
return new ForwardingClientCall.SimpleForwardingClientCall<ReqT, RespT>(
69+
next.newCall(method, callOptions)) {
70+
@Override
71+
public void start(Listener<RespT> responseListener, Metadata headers) {
72+
Map<String, String> extra = HEADERS_KEY.get();
73+
if (extra != null) {
74+
for (Map.Entry<String, String> entry : extra.entrySet()) {
75+
headers.put(
76+
Metadata.Key.of(entry.getKey(), Metadata.ASCII_STRING_MARSHALLER),
77+
entry.getValue());
78+
}
79+
}
80+
super.start(responseListener, headers);
81+
}
82+
};
83+
}
84+
};
85+
}
86+
}

0 commit comments

Comments
 (0)