Skip to content

Commit a8f5c3c

Browse files
authored
Provide optimized writers for OpenTelemetry's "metrics.proto" wire protocol (#10983)
Add metrics.proto for testing Minor refactoring Support recording OTLP messages where we know there will be remaining bytes to marshal Provide optimized writers for OpenTelemetry's "metrics.proto" wire protocol Add parameterized metrics.proto test written with assistance of claude Avoid creating an extra array that alway copies in the canned resource; instead just create a small prefix and re-use the resource chunk as-is Minor refactoring Performance: use intrinsic to reverse byte order Performance: re-use temporary chunk collections across cycles Performance: use simple loop instead of stream Performance: avoid repeated unboxing Performance: combine size-computation loops Performance: avoid temporary array allocations just for elementSizes Co-authored-by: stuart.mcculloch <stuart.mcculloch@datadoghq.com>
1 parent a1239d3 commit a8f5c3c

9 files changed

Lines changed: 2191 additions & 48 deletions

File tree

dd-java-agent/agent-otel/otel-bootstrap/src/main/java/datadog/trace/bootstrap/otel/common/export/OtlpCommonProto.java

Lines changed: 38 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -70,27 +70,15 @@ public static void writeVarInt(StreamingBuffer buf, long value) {
7070
}
7171

7272
public static void writeI32(StreamingBuffer buf, int value) {
73-
buf.putInt( // convert to little-endian
74-
(value & 0xff000000) >>> 24
75-
| (value & 0x00ff0000) >>> 8
76-
| (value & 0x0000ff00) << 8
77-
| (value & 0x000000ff) << 24);
73+
buf.putInt(Integer.reverseBytes(value)); // convert to little-endian
7874
}
7975

8076
public static void writeI32(StreamingBuffer buf, float value) {
8177
writeI32(buf, Float.floatToRawIntBits(value));
8278
}
8379

8480
public static void writeI64(StreamingBuffer buf, long value) {
85-
buf.putLong( // convert to little-endian
86-
(value & 0xff00000000000000L) >>> 56
87-
| (value & 0x00ff000000000000L) >>> 40
88-
| (value & 0x0000ff0000000000L) >>> 24
89-
| (value & 0x000000ff00000000L) >>> 8
90-
| (value & 0x00000000ff000000L) << 8
91-
| (value & 0x0000000000ff0000L) << 24
92-
| (value & 0x000000000000ff00L) << 40
93-
| (value & 0x00000000000000ffL) << 56);
81+
buf.putLong(Long.reverseBytes(value)); // convert to little-endian
9482
}
9583

9684
public static void writeI64(StreamingBuffer buf, double value) {
@@ -119,12 +107,18 @@ public static void writeTag(StreamingBuffer buf, int fieldNum, int wireType) {
119107
}
120108

121109
public static byte[] recordMessage(GrowableBuffer buf, int fieldNum) {
110+
return recordMessage(buf, fieldNum, 0);
111+
}
112+
113+
public static byte[] recordMessage(GrowableBuffer buf, int fieldNum, int remainingBytes) {
122114
try {
123115
ByteBuffer data = buf.flip();
124116
int dataSize = data.remaining();
125-
ByteBuffer message = ByteBuffer.allocate(sizeTag(fieldNum) + sizeVarInt(dataSize) + dataSize);
117+
int expectedSize = dataSize + remainingBytes;
118+
ByteBuffer message =
119+
ByteBuffer.allocate(sizeTag(fieldNum) + sizeVarInt(expectedSize) + dataSize);
126120
writeTag(message, fieldNum, LEN_WIRE_TYPE);
127-
writeVarInt(message, dataSize);
121+
writeVarInt(message, expectedSize);
128122
message.put(data);
129123
return message.array();
130124
} finally {
@@ -134,19 +128,19 @@ public static byte[] recordMessage(GrowableBuffer buf, int fieldNum) {
134128

135129
public static void writeInstrumentationScope(
136130
StreamingBuffer buf, OtelInstrumentationScope scope) {
137-
byte[] scopeNameUtf8 = scope.getName().getUtf8Bytes();
138-
int scopeSize = 1 + sizeVarInt(scopeNameUtf8.length) + scopeNameUtf8.length;
139-
byte[] scopeVersionUtf8 = null;
131+
byte[] nameUtf8 = scope.getName().getUtf8Bytes();
132+
int scopeSize = 1 + sizeVarInt(nameUtf8.length) + nameUtf8.length;
133+
byte[] versionUtf8 = null;
140134
if (scope.getVersion() != null) {
141-
scopeVersionUtf8 = scope.getVersion().getUtf8Bytes();
142-
scopeSize += 1 + sizeVarInt(scopeVersionUtf8.length) + scopeVersionUtf8.length;
135+
versionUtf8 = scope.getVersion().getUtf8Bytes();
136+
scopeSize += 1 + sizeVarInt(versionUtf8.length) + versionUtf8.length;
143137
}
144138
writeVarInt(buf, scopeSize);
145139
writeTag(buf, 1, LEN_WIRE_TYPE);
146-
writeString(buf, scopeNameUtf8);
147-
if (scopeVersionUtf8 != null) {
140+
writeString(buf, nameUtf8);
141+
if (versionUtf8 != null) {
148142
writeTag(buf, 2, LEN_WIRE_TYPE);
149-
writeString(buf, scopeVersionUtf8);
143+
writeString(buf, versionUtf8);
150144
}
151145
}
152146

@@ -167,9 +161,7 @@ public static void writeAttribute(StreamingBuffer buf, int type, String key, Obj
167161
writeDoubleAttribute(buf, keyUtf8, (double) value);
168162
break;
169163
case OtlpAttributeVisitor.STRING_ARRAY:
170-
byte[][] valueUtf8s =
171-
((List<String>) value).stream().map(OtlpCommonProto::valueUtf8).toArray(byte[][]::new);
172-
writeStringArrayAttribute(buf, keyUtf8, valueUtf8s);
164+
writeStringArrayAttribute(buf, keyUtf8, (List<String>) value);
173165
break;
174166
case OtlpAttributeVisitor.BOOLEAN_ARRAY:
175167
writeBooleanArrayAttribute(buf, keyUtf8, (List<Boolean>) value);
@@ -247,13 +239,14 @@ private static void writeDoubleAttribute(StreamingBuffer buf, byte[] keyUtf8, do
247239
}
248240

249241
private static void writeStringArrayAttribute(
250-
StreamingBuffer buf, byte[] keyUtf8, byte[][] valueUtf8s) {
251-
int[] elementSizes = new int[valueUtf8s.length];
242+
StreamingBuffer buf, byte[] keyUtf8, List<String> strings) {
243+
byte[][] valueUtf8s = new byte[strings.size()][];
252244
for (int i = 0; i < valueUtf8s.length; i++) {
253-
elementSizes[i] = 1 + sizeVarInt(valueUtf8s[i].length) + valueUtf8s[i].length;
245+
valueUtf8s[i] = valueUtf8(strings.get(i));
254246
}
255247
int arraySize = 0;
256-
for (int elementSize : elementSizes) {
248+
for (byte[] valueUtf8 : valueUtf8s) {
249+
int elementSize = 1 + sizeVarInt(valueUtf8.length) + valueUtf8.length;
257250
arraySize += 1 + sizeVarInt(elementSize) + elementSize;
258251
}
259252
int valueSize = 1 + sizeVarInt(arraySize) + arraySize;
@@ -267,12 +260,13 @@ private static void writeStringArrayAttribute(
267260
writeVarInt(buf, valueSize);
268261
writeTag(buf, 5, LEN_WIRE_TYPE);
269262
writeVarInt(buf, arraySize);
270-
for (int i = 0; i < elementSizes.length; i++) {
263+
for (byte[] valueUtf8 : valueUtf8s) {
264+
int elementSize = 1 + sizeVarInt(valueUtf8.length) + valueUtf8.length;
271265
writeTag(buf, 1, LEN_WIRE_TYPE);
272-
writeVarInt(buf, elementSizes[i]);
266+
writeVarInt(buf, elementSize);
273267
writeTag(buf, 1, LEN_WIRE_TYPE);
274-
writeVarInt(buf, valueUtf8s[i].length);
275-
buf.put(valueUtf8s[i]);
268+
writeVarInt(buf, valueUtf8.length);
269+
buf.put(valueUtf8);
276270
}
277271
}
278272

@@ -300,12 +294,13 @@ private static void writeBooleanArrayAttribute(
300294

301295
private static void writeLongArrayAttribute(
302296
StreamingBuffer buf, byte[] keyUtf8, List<Long> values) {
303-
int[] elementSizes = new int[values.size()];
304-
for (int i = 0; i < values.size(); i++) {
305-
elementSizes[i] = 1 + sizeVarInt(values.get(i));
297+
long[] longValues = new long[values.size()];
298+
for (int i = 0; i < longValues.length; i++) {
299+
longValues[i] = values.get(i); // avoid repeated unboxing later
306300
}
307301
int arraySize = 0;
308-
for (int elementSize : elementSizes) {
302+
for (long longValue : longValues) {
303+
int elementSize = 1 + sizeVarInt(longValue);
309304
arraySize += 1 + sizeVarInt(elementSize) + elementSize;
310305
}
311306
int valueSize = 1 + sizeVarInt(arraySize) + arraySize;
@@ -319,11 +314,12 @@ private static void writeLongArrayAttribute(
319314
writeVarInt(buf, valueSize);
320315
writeTag(buf, 5, LEN_WIRE_TYPE);
321316
writeVarInt(buf, arraySize);
322-
for (int i = 0; i < elementSizes.length; i++) {
317+
for (long longValue : longValues) {
318+
int elementSize = 1 + sizeVarInt(longValue);
323319
writeTag(buf, 1, LEN_WIRE_TYPE);
324-
writeVarInt(buf, elementSizes[i]);
320+
writeVarInt(buf, elementSize);
325321
writeTag(buf, 3, VARINT_WIRE_TYPE);
326-
writeVarInt(buf, values.get(i));
322+
writeVarInt(buf, longValue);
327323
}
328324
}
329325

dd-java-agent/agent-otel/otel-bootstrap/src/main/java/datadog/trace/bootstrap/otel/common/export/OtlpResourceProto.java

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
public final class OtlpResourceProto {
1919
private OtlpResourceProto() {}
2020

21-
private static final byte[] RESOURCE_MESSAGE = buildResourceMessage(Config.get());
21+
public static final byte[] RESOURCE_MESSAGE = buildResourceMessage(Config.get());
2222

2323
private static final Set<String> IGNORED_GLOBAL_TAGS =
2424
new HashSet<>(
@@ -30,11 +30,6 @@ private OtlpResourceProto() {}
3030
"deployment.environment.name",
3131
"service.version"));
3232

33-
/** Writes the resource message in protobuf format to the given buffer. */
34-
public static void writeResourceMessage(StreamingBuffer buf) {
35-
buf.put(RESOURCE_MESSAGE);
36-
}
37-
3833
static byte[] buildResourceMessage(Config config) {
3934
GrowableBuffer buf = new GrowableBuffer(512);
4035

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
package datadog.trace.bootstrap.otel.metrics.export;
2+
3+
import java.util.ArrayDeque;
4+
import java.util.Deque;
5+
import java.util.function.Consumer;
6+
7+
/** OTLP metrics payload consisting of a sequence of chunked byte-arrays. */
8+
public final class OtlpMetricsPayload {
9+
static final OtlpMetricsPayload EMPTY = new OtlpMetricsPayload(new ArrayDeque<>(), 0);
10+
11+
private final Deque<byte[]> chunks;
12+
private final int length;
13+
14+
OtlpMetricsPayload(Deque<byte[]> chunks, int length) {
15+
this.chunks = chunks;
16+
this.length = length;
17+
}
18+
19+
/** Drains the chunked payload to the given consumer. */
20+
public void drain(Consumer<byte[]> consumer) {
21+
byte[] chunk;
22+
while ((chunk = chunks.pollFirst()) != null) {
23+
consumer.accept(chunk);
24+
}
25+
}
26+
27+
/** Returns the total length of the chunked payload. */
28+
public int getLength() {
29+
return length;
30+
}
31+
}
Lines changed: 132 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,132 @@
1+
package datadog.trace.bootstrap.otel.metrics.export;
2+
3+
import static datadog.trace.api.config.OtlpConfig.Temporality.CUMULATIVE;
4+
import static datadog.trace.bootstrap.otel.common.export.OtlpCommonProto.I64_WIRE_TYPE;
5+
import static datadog.trace.bootstrap.otel.common.export.OtlpCommonProto.LEN_WIRE_TYPE;
6+
import static datadog.trace.bootstrap.otel.common.export.OtlpCommonProto.VARINT_WIRE_TYPE;
7+
import static datadog.trace.bootstrap.otel.common.export.OtlpCommonProto.recordMessage;
8+
import static datadog.trace.bootstrap.otel.common.export.OtlpCommonProto.writeI64;
9+
import static datadog.trace.bootstrap.otel.common.export.OtlpCommonProto.writeInstrumentationScope;
10+
import static datadog.trace.bootstrap.otel.common.export.OtlpCommonProto.writeString;
11+
import static datadog.trace.bootstrap.otel.common.export.OtlpCommonProto.writeTag;
12+
import static datadog.trace.bootstrap.otel.common.export.OtlpCommonProto.writeVarInt;
13+
14+
import datadog.communication.serialization.GrowableBuffer;
15+
import datadog.trace.api.Config;
16+
import datadog.trace.bootstrap.otel.common.OtelInstrumentationScope;
17+
import datadog.trace.bootstrap.otel.metrics.OtelInstrumentDescriptor;
18+
import datadog.trace.bootstrap.otel.metrics.data.OtlpDataPoint;
19+
import datadog.trace.bootstrap.otel.metrics.data.OtlpDoublePoint;
20+
import datadog.trace.bootstrap.otel.metrics.data.OtlpHistogramPoint;
21+
import datadog.trace.bootstrap.otel.metrics.data.OtlpLongPoint;
22+
23+
/** Provides optimized writers for OpenTelemetry's "metrics.proto" wire protocol. */
24+
public final class OtlpMetricsProto {
25+
private OtlpMetricsProto() {}
26+
27+
private static final int AGGREGATION_TEMPORALITY_DELTA = 1;
28+
private static final int AGGREGATION_TEMPORALITY_CUMULATIVE = 2;
29+
30+
private static final int AGGREGATION_TEMPORALITY =
31+
CUMULATIVE.equals(Config.get().getOtlpMetricsTemporalityPreference())
32+
? AGGREGATION_TEMPORALITY_CUMULATIVE
33+
: AGGREGATION_TEMPORALITY_DELTA;
34+
35+
/**
36+
* Records the first part of a scoped metrics message where we know its nested metric messages
37+
* will follow in one or more byte-arrays that add up to the given number of remaining bytes.
38+
*/
39+
public static byte[] recordScopedMetricsMessage(
40+
GrowableBuffer buf, OtelInstrumentationScope scope, int remainingBytes) {
41+
42+
writeTag(buf, 1, LEN_WIRE_TYPE);
43+
writeInstrumentationScope(buf, scope);
44+
if (scope.getSchemaUrl() != null) {
45+
writeTag(buf, 3, LEN_WIRE_TYPE);
46+
writeString(buf, scope.getSchemaUrl().getUtf8Bytes());
47+
}
48+
49+
return recordMessage(buf, 2, remainingBytes);
50+
}
51+
52+
/**
53+
* Records the first part of a metric message where we know that its nested data point messages
54+
* will follow in one or more byte-arrays that add up to the given number of remaining bytes.
55+
*/
56+
public static byte[] recordMetricMessage(
57+
GrowableBuffer buf, OtelInstrumentDescriptor descriptor, int remainingBytes) {
58+
59+
writeTag(buf, 1, LEN_WIRE_TYPE);
60+
writeString(buf, descriptor.getName().getUtf8Bytes());
61+
if (descriptor.getDescription() != null) {
62+
writeTag(buf, 2, LEN_WIRE_TYPE);
63+
writeString(buf, descriptor.getDescription().getUtf8Bytes());
64+
}
65+
if (descriptor.getUnit() != null) {
66+
writeTag(buf, 3, LEN_WIRE_TYPE);
67+
writeString(buf, descriptor.getUnit().getUtf8Bytes());
68+
}
69+
70+
switch (descriptor.getType()) {
71+
case GAUGE:
72+
case OBSERVABLE_GAUGE:
73+
writeTag(buf, 5, LEN_WIRE_TYPE);
74+
writeVarInt(buf, remainingBytes);
75+
// gauges have no aggregation temporality
76+
break;
77+
case COUNTER:
78+
case OBSERVABLE_COUNTER:
79+
writeTag(buf, 7, LEN_WIRE_TYPE);
80+
writeVarInt(buf, remainingBytes + 4);
81+
writeTag(buf, 2, VARINT_WIRE_TYPE);
82+
writeVarInt(buf, AGGREGATION_TEMPORALITY);
83+
writeTag(buf, 3, VARINT_WIRE_TYPE);
84+
writeVarInt(buf, 1); // monotonic
85+
break;
86+
case UP_DOWN_COUNTER:
87+
case OBSERVABLE_UP_DOWN_COUNTER:
88+
writeTag(buf, 7, LEN_WIRE_TYPE);
89+
writeVarInt(buf, remainingBytes + 2);
90+
writeTag(buf, 2, VARINT_WIRE_TYPE);
91+
writeVarInt(buf, AGGREGATION_TEMPORALITY);
92+
break;
93+
case HISTOGRAM:
94+
writeTag(buf, 9, LEN_WIRE_TYPE);
95+
writeVarInt(buf, remainingBytes + 2);
96+
writeTag(buf, 2, VARINT_WIRE_TYPE);
97+
writeVarInt(buf, AGGREGATION_TEMPORALITY);
98+
break;
99+
default:
100+
throw new IllegalArgumentException("Unknown instrument type: " + descriptor.getType());
101+
}
102+
103+
return recordMessage(buf, 2, remainingBytes);
104+
}
105+
106+
/** Completes recording of a data point message and packs it into its own byte-array. */
107+
public static byte[] recordDataPointMessage(GrowableBuffer buf, OtlpDataPoint point) {
108+
if (point instanceof OtlpDoublePoint) {
109+
writeTag(buf, 4, I64_WIRE_TYPE);
110+
writeI64(buf, ((OtlpDoublePoint) point).value);
111+
} else if (point instanceof OtlpLongPoint) {
112+
writeTag(buf, 6, I64_WIRE_TYPE);
113+
writeI64(buf, ((OtlpLongPoint) point).value);
114+
} else { // must be a histogram point
115+
OtlpHistogramPoint histogram = (OtlpHistogramPoint) point;
116+
writeTag(buf, 4, I64_WIRE_TYPE);
117+
writeI64(buf, (long) histogram.count);
118+
writeTag(buf, 5, I64_WIRE_TYPE);
119+
writeI64(buf, histogram.sum);
120+
for (double bucketCount : histogram.bucketCounts) {
121+
writeTag(buf, 6, I64_WIRE_TYPE);
122+
writeI64(buf, (long) bucketCount);
123+
}
124+
for (double bucketBoundary : histogram.bucketBoundaries) {
125+
writeTag(buf, 7, I64_WIRE_TYPE);
126+
writeI64(buf, bucketBoundary);
127+
}
128+
}
129+
130+
return recordMessage(buf, 1);
131+
}
132+
}

0 commit comments

Comments
 (0)