Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -70,27 +70,15 @@ public static void writeVarInt(StreamingBuffer buf, long value) {
}

public static void writeI32(StreamingBuffer buf, int value) {
buf.putInt( // convert to little-endian
(value & 0xff000000) >>> 24
| (value & 0x00ff0000) >>> 8
| (value & 0x0000ff00) << 8
| (value & 0x000000ff) << 24);
buf.putInt(Integer.reverseBytes(value)); // convert to little-endian
}

public static void writeI32(StreamingBuffer buf, float value) {
writeI32(buf, Float.floatToRawIntBits(value));
}

public static void writeI64(StreamingBuffer buf, long value) {
buf.putLong( // convert to little-endian
(value & 0xff00000000000000L) >>> 56
| (value & 0x00ff000000000000L) >>> 40
| (value & 0x0000ff0000000000L) >>> 24
| (value & 0x000000ff00000000L) >>> 8
| (value & 0x00000000ff000000L) << 8
| (value & 0x0000000000ff0000L) << 24
| (value & 0x000000000000ff00L) << 40
| (value & 0x00000000000000ffL) << 56);
buf.putLong(Long.reverseBytes(value)); // convert to little-endian
}

public static void writeI64(StreamingBuffer buf, double value) {
Expand Down Expand Up @@ -119,12 +107,18 @@ public static void writeTag(StreamingBuffer buf, int fieldNum, int wireType) {
}

public static byte[] recordMessage(GrowableBuffer buf, int fieldNum) {
return recordMessage(buf, fieldNum, 0);
}

public static byte[] recordMessage(GrowableBuffer buf, int fieldNum, int remainingBytes) {
try {
ByteBuffer data = buf.flip();
int dataSize = data.remaining();
ByteBuffer message = ByteBuffer.allocate(sizeTag(fieldNum) + sizeVarInt(dataSize) + dataSize);
int expectedSize = dataSize + remainingBytes;
ByteBuffer message =
ByteBuffer.allocate(sizeTag(fieldNum) + sizeVarInt(expectedSize) + dataSize);
writeTag(message, fieldNum, LEN_WIRE_TYPE);
writeVarInt(message, dataSize);
writeVarInt(message, expectedSize);
message.put(data);
return message.array();
} finally {
Expand All @@ -134,19 +128,19 @@ public static byte[] recordMessage(GrowableBuffer buf, int fieldNum) {

public static void writeInstrumentationScope(
StreamingBuffer buf, OtelInstrumentationScope scope) {
byte[] scopeNameUtf8 = scope.getName().getUtf8Bytes();
int scopeSize = 1 + sizeVarInt(scopeNameUtf8.length) + scopeNameUtf8.length;
byte[] scopeVersionUtf8 = null;
byte[] nameUtf8 = scope.getName().getUtf8Bytes();
int scopeSize = 1 + sizeVarInt(nameUtf8.length) + nameUtf8.length;
byte[] versionUtf8 = null;
if (scope.getVersion() != null) {
scopeVersionUtf8 = scope.getVersion().getUtf8Bytes();
scopeSize += 1 + sizeVarInt(scopeVersionUtf8.length) + scopeVersionUtf8.length;
versionUtf8 = scope.getVersion().getUtf8Bytes();
scopeSize += 1 + sizeVarInt(versionUtf8.length) + versionUtf8.length;
}
writeVarInt(buf, scopeSize);
writeTag(buf, 1, LEN_WIRE_TYPE);
writeString(buf, scopeNameUtf8);
if (scopeVersionUtf8 != null) {
writeString(buf, nameUtf8);
if (versionUtf8 != null) {
writeTag(buf, 2, LEN_WIRE_TYPE);
writeString(buf, scopeVersionUtf8);
writeString(buf, versionUtf8);
}
}

Expand All @@ -167,9 +161,7 @@ public static void writeAttribute(StreamingBuffer buf, int type, String key, Obj
writeDoubleAttribute(buf, keyUtf8, (double) value);
break;
case OtlpAttributeVisitor.STRING_ARRAY:
byte[][] valueUtf8s =
((List<String>) value).stream().map(OtlpCommonProto::valueUtf8).toArray(byte[][]::new);
writeStringArrayAttribute(buf, keyUtf8, valueUtf8s);
writeStringArrayAttribute(buf, keyUtf8, (List<String>) value);
break;
case OtlpAttributeVisitor.BOOLEAN_ARRAY:
writeBooleanArrayAttribute(buf, keyUtf8, (List<Boolean>) value);
Expand Down Expand Up @@ -247,13 +239,14 @@ private static void writeDoubleAttribute(StreamingBuffer buf, byte[] keyUtf8, do
}

private static void writeStringArrayAttribute(
StreamingBuffer buf, byte[] keyUtf8, byte[][] valueUtf8s) {
int[] elementSizes = new int[valueUtf8s.length];
StreamingBuffer buf, byte[] keyUtf8, List<String> strings) {
byte[][] valueUtf8s = new byte[strings.size()][];
for (int i = 0; i < valueUtf8s.length; i++) {
elementSizes[i] = 1 + sizeVarInt(valueUtf8s[i].length) + valueUtf8s[i].length;
valueUtf8s[i] = valueUtf8(strings.get(i));
}
int arraySize = 0;
for (int elementSize : elementSizes) {
for (byte[] valueUtf8 : valueUtf8s) {
int elementSize = 1 + sizeVarInt(valueUtf8.length) + valueUtf8.length;
arraySize += 1 + sizeVarInt(elementSize) + elementSize;
}
int valueSize = 1 + sizeVarInt(arraySize) + arraySize;
Expand All @@ -267,12 +260,13 @@ private static void writeStringArrayAttribute(
writeVarInt(buf, valueSize);
writeTag(buf, 5, LEN_WIRE_TYPE);
writeVarInt(buf, arraySize);
for (int i = 0; i < elementSizes.length; i++) {
for (byte[] valueUtf8 : valueUtf8s) {
int elementSize = 1 + sizeVarInt(valueUtf8.length) + valueUtf8.length;
writeTag(buf, 1, LEN_WIRE_TYPE);
writeVarInt(buf, elementSizes[i]);
writeVarInt(buf, elementSize);
writeTag(buf, 1, LEN_WIRE_TYPE);
writeVarInt(buf, valueUtf8s[i].length);
buf.put(valueUtf8s[i]);
writeVarInt(buf, valueUtf8.length);
buf.put(valueUtf8);
}
}

Expand Down Expand Up @@ -300,12 +294,13 @@ private static void writeBooleanArrayAttribute(

private static void writeLongArrayAttribute(
StreamingBuffer buf, byte[] keyUtf8, List<Long> values) {
int[] elementSizes = new int[values.size()];
for (int i = 0; i < values.size(); i++) {
elementSizes[i] = 1 + sizeVarInt(values.get(i));
long[] longValues = new long[values.size()];
for (int i = 0; i < longValues.length; i++) {
longValues[i] = values.get(i); // avoid repeated unboxing later
}
int arraySize = 0;
for (int elementSize : elementSizes) {
for (long longValue : longValues) {
int elementSize = 1 + sizeVarInt(longValue);
arraySize += 1 + sizeVarInt(elementSize) + elementSize;
}
int valueSize = 1 + sizeVarInt(arraySize) + arraySize;
Expand All @@ -319,11 +314,12 @@ private static void writeLongArrayAttribute(
writeVarInt(buf, valueSize);
writeTag(buf, 5, LEN_WIRE_TYPE);
writeVarInt(buf, arraySize);
for (int i = 0; i < elementSizes.length; i++) {
for (long longValue : longValues) {
int elementSize = 1 + sizeVarInt(longValue);
writeTag(buf, 1, LEN_WIRE_TYPE);
writeVarInt(buf, elementSizes[i]);
writeVarInt(buf, elementSize);
writeTag(buf, 3, VARINT_WIRE_TYPE);
writeVarInt(buf, values.get(i));
writeVarInt(buf, longValue);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
public final class OtlpResourceProto {
private OtlpResourceProto() {}

private static final byte[] RESOURCE_MESSAGE = buildResourceMessage(Config.get());
public static final byte[] RESOURCE_MESSAGE = buildResourceMessage(Config.get());

private static final Set<String> IGNORED_GLOBAL_TAGS =
new HashSet<>(
Expand All @@ -30,11 +30,6 @@ private OtlpResourceProto() {}
"deployment.environment.name",
"service.version"));

/** Writes the resource message in protobuf format to the given buffer. */
public static void writeResourceMessage(StreamingBuffer buf) {
buf.put(RESOURCE_MESSAGE);
}

static byte[] buildResourceMessage(Config config) {
GrowableBuffer buf = new GrowableBuffer(512);

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package datadog.trace.bootstrap.otel.metrics.export;

import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.Consumer;

/** OTLP metrics payload consisting of a sequence of chunked byte-arrays. */
public final class OtlpMetricsPayload {
static final OtlpMetricsPayload EMPTY = new OtlpMetricsPayload(new ArrayDeque<>(), 0);

private final Deque<byte[]> chunks;
private final int length;

OtlpMetricsPayload(Deque<byte[]> chunks, int length) {
this.chunks = chunks;
this.length = length;
}

/** Drains the chunked payload to the given consumer. */
public void drain(Consumer<byte[]> consumer) {
byte[] chunk;
while ((chunk = chunks.pollFirst()) != null) {
consumer.accept(chunk);
}
}

/** Returns the total length of the chunked payload. */
public int getLength() {
return length;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
package datadog.trace.bootstrap.otel.metrics.export;

import static datadog.trace.api.config.OtlpConfig.Temporality.CUMULATIVE;
import static datadog.trace.bootstrap.otel.common.export.OtlpCommonProto.I64_WIRE_TYPE;
import static datadog.trace.bootstrap.otel.common.export.OtlpCommonProto.LEN_WIRE_TYPE;
import static datadog.trace.bootstrap.otel.common.export.OtlpCommonProto.VARINT_WIRE_TYPE;
import static datadog.trace.bootstrap.otel.common.export.OtlpCommonProto.recordMessage;
import static datadog.trace.bootstrap.otel.common.export.OtlpCommonProto.writeI64;
import static datadog.trace.bootstrap.otel.common.export.OtlpCommonProto.writeInstrumentationScope;
import static datadog.trace.bootstrap.otel.common.export.OtlpCommonProto.writeString;
import static datadog.trace.bootstrap.otel.common.export.OtlpCommonProto.writeTag;
import static datadog.trace.bootstrap.otel.common.export.OtlpCommonProto.writeVarInt;

import datadog.communication.serialization.GrowableBuffer;
import datadog.trace.api.Config;
import datadog.trace.bootstrap.otel.common.OtelInstrumentationScope;
import datadog.trace.bootstrap.otel.metrics.OtelInstrumentDescriptor;
import datadog.trace.bootstrap.otel.metrics.data.OtlpDataPoint;
import datadog.trace.bootstrap.otel.metrics.data.OtlpDoublePoint;
import datadog.trace.bootstrap.otel.metrics.data.OtlpHistogramPoint;
import datadog.trace.bootstrap.otel.metrics.data.OtlpLongPoint;

/** Provides optimized writers for OpenTelemetry's "metrics.proto" wire protocol. */
public final class OtlpMetricsProto {
private OtlpMetricsProto() {}

private static final int AGGREGATION_TEMPORALITY_DELTA = 1;
private static final int AGGREGATION_TEMPORALITY_CUMULATIVE = 2;

private static final int AGGREGATION_TEMPORALITY =
CUMULATIVE.equals(Config.get().getOtlpMetricsTemporalityPreference())
? AGGREGATION_TEMPORALITY_CUMULATIVE
: AGGREGATION_TEMPORALITY_DELTA;

/**
* Records the first part of a scoped metrics message where we know its nested metric messages
* will follow in one or more byte-arrays that add up to the given number of remaining bytes.
*/
public static byte[] recordScopedMetricsMessage(
GrowableBuffer buf, OtelInstrumentationScope scope, int remainingBytes) {

writeTag(buf, 1, LEN_WIRE_TYPE);
writeInstrumentationScope(buf, scope);
if (scope.getSchemaUrl() != null) {
writeTag(buf, 3, LEN_WIRE_TYPE);
writeString(buf, scope.getSchemaUrl().getUtf8Bytes());
}

return recordMessage(buf, 2, remainingBytes);
}

/**
* Records the first part of a metric message where we know that its nested data point messages
* will follow in one or more byte-arrays that add up to the given number of remaining bytes.
*/
public static byte[] recordMetricMessage(
GrowableBuffer buf, OtelInstrumentDescriptor descriptor, int remainingBytes) {

writeTag(buf, 1, LEN_WIRE_TYPE);
writeString(buf, descriptor.getName().getUtf8Bytes());
if (descriptor.getDescription() != null) {
writeTag(buf, 2, LEN_WIRE_TYPE);
writeString(buf, descriptor.getDescription().getUtf8Bytes());
}
if (descriptor.getUnit() != null) {
writeTag(buf, 3, LEN_WIRE_TYPE);
writeString(buf, descriptor.getUnit().getUtf8Bytes());
}

switch (descriptor.getType()) {
case GAUGE:
case OBSERVABLE_GAUGE:
writeTag(buf, 5, LEN_WIRE_TYPE);
writeVarInt(buf, remainingBytes);
// gauges have no aggregation temporality
break;
case COUNTER:
case OBSERVABLE_COUNTER:
writeTag(buf, 7, LEN_WIRE_TYPE);
writeVarInt(buf, remainingBytes + 4);
writeTag(buf, 2, VARINT_WIRE_TYPE);
writeVarInt(buf, AGGREGATION_TEMPORALITY);
writeTag(buf, 3, VARINT_WIRE_TYPE);
writeVarInt(buf, 1); // monotonic
break;
case UP_DOWN_COUNTER:
case OBSERVABLE_UP_DOWN_COUNTER:
writeTag(buf, 7, LEN_WIRE_TYPE);
writeVarInt(buf, remainingBytes + 2);
writeTag(buf, 2, VARINT_WIRE_TYPE);
writeVarInt(buf, AGGREGATION_TEMPORALITY);
break;
case HISTOGRAM:
writeTag(buf, 9, LEN_WIRE_TYPE);
writeVarInt(buf, remainingBytes + 2);
writeTag(buf, 2, VARINT_WIRE_TYPE);
writeVarInt(buf, AGGREGATION_TEMPORALITY);
break;
default:
throw new IllegalArgumentException("Unknown instrument type: " + descriptor.getType());
}

return recordMessage(buf, 2, remainingBytes);
}

/** Completes recording of a data point message and packs it into its own byte-array. */
public static byte[] recordDataPointMessage(GrowableBuffer buf, OtlpDataPoint point) {
if (point instanceof OtlpDoublePoint) {
writeTag(buf, 4, I64_WIRE_TYPE);
writeI64(buf, ((OtlpDoublePoint) point).value);
} else if (point instanceof OtlpLongPoint) {
writeTag(buf, 6, I64_WIRE_TYPE);
writeI64(buf, ((OtlpLongPoint) point).value);
} else { // must be a histogram point
OtlpHistogramPoint histogram = (OtlpHistogramPoint) point;
writeTag(buf, 4, I64_WIRE_TYPE);
writeI64(buf, (long) histogram.count);
writeTag(buf, 5, I64_WIRE_TYPE);
writeI64(buf, histogram.sum);
for (double bucketCount : histogram.bucketCounts) {
writeTag(buf, 6, I64_WIRE_TYPE);
writeI64(buf, (long) bucketCount);
}
for (double bucketBoundary : histogram.bucketBoundaries) {
writeTag(buf, 7, I64_WIRE_TYPE);
writeI64(buf, bucketBoundary);
}
}

return recordMessage(buf, 1);
}
}
Loading
Loading