diff --git a/dd-java-agent/agent-otel/otel-bootstrap/src/main/java/datadog/trace/bootstrap/otel/common/export/OtlpCommonProto.java b/dd-java-agent/agent-otel/otel-bootstrap/src/main/java/datadog/trace/bootstrap/otel/common/export/OtlpCommonProto.java index f8d03a5c8f3..90adb865413 100644 --- a/dd-java-agent/agent-otel/otel-bootstrap/src/main/java/datadog/trace/bootstrap/otel/common/export/OtlpCommonProto.java +++ b/dd-java-agent/agent-otel/otel-bootstrap/src/main/java/datadog/trace/bootstrap/otel/common/export/OtlpCommonProto.java @@ -70,11 +70,7 @@ public static void writeVarInt(StreamingBuffer buf, long value) { } public static void writeI32(StreamingBuffer buf, int value) { - buf.putInt( // convert to little-endian - (value & 0xff000000) >>> 24 - | (value & 0x00ff0000) >>> 8 - | (value & 0x0000ff00) << 8 - | (value & 0x000000ff) << 24); + buf.putInt(Integer.reverseBytes(value)); // convert to little-endian } public static void writeI32(StreamingBuffer buf, float value) { @@ -82,15 +78,7 @@ public static void writeI32(StreamingBuffer buf, float value) { } public static void writeI64(StreamingBuffer buf, long value) { - buf.putLong( // convert to little-endian - (value & 0xff00000000000000L) >>> 56 - | (value & 0x00ff000000000000L) >>> 40 - | (value & 0x0000ff0000000000L) >>> 24 - | (value & 0x000000ff00000000L) >>> 8 - | (value & 0x00000000ff000000L) << 8 - | (value & 0x0000000000ff0000L) << 24 - | (value & 0x000000000000ff00L) << 40 - | (value & 0x00000000000000ffL) << 56); + buf.putLong(Long.reverseBytes(value)); // convert to little-endian } public static void writeI64(StreamingBuffer buf, double value) { @@ -119,12 +107,18 @@ public static void writeTag(StreamingBuffer buf, int fieldNum, int wireType) { } public static byte[] recordMessage(GrowableBuffer buf, int fieldNum) { + return recordMessage(buf, fieldNum, 0); + } + + public static byte[] recordMessage(GrowableBuffer buf, int fieldNum, int remainingBytes) { try { ByteBuffer data = buf.flip(); int 
dataSize = data.remaining(); - ByteBuffer message = ByteBuffer.allocate(sizeTag(fieldNum) + sizeVarInt(dataSize) + dataSize); + int expectedSize = dataSize + remainingBytes; + ByteBuffer message = + ByteBuffer.allocate(sizeTag(fieldNum) + sizeVarInt(expectedSize) + dataSize); writeTag(message, fieldNum, LEN_WIRE_TYPE); - writeVarInt(message, dataSize); + writeVarInt(message, expectedSize); message.put(data); return message.array(); } finally { @@ -134,19 +128,19 @@ public static byte[] recordMessage(GrowableBuffer buf, int fieldNum) { public static void writeInstrumentationScope( StreamingBuffer buf, OtelInstrumentationScope scope) { - byte[] scopeNameUtf8 = scope.getName().getUtf8Bytes(); - int scopeSize = 1 + sizeVarInt(scopeNameUtf8.length) + scopeNameUtf8.length; - byte[] scopeVersionUtf8 = null; + byte[] nameUtf8 = scope.getName().getUtf8Bytes(); + int scopeSize = 1 + sizeVarInt(nameUtf8.length) + nameUtf8.length; + byte[] versionUtf8 = null; if (scope.getVersion() != null) { - scopeVersionUtf8 = scope.getVersion().getUtf8Bytes(); - scopeSize += 1 + sizeVarInt(scopeVersionUtf8.length) + scopeVersionUtf8.length; + versionUtf8 = scope.getVersion().getUtf8Bytes(); + scopeSize += 1 + sizeVarInt(versionUtf8.length) + versionUtf8.length; } writeVarInt(buf, scopeSize); writeTag(buf, 1, LEN_WIRE_TYPE); - writeString(buf, scopeNameUtf8); - if (scopeVersionUtf8 != null) { + writeString(buf, nameUtf8); + if (versionUtf8 != null) { writeTag(buf, 2, LEN_WIRE_TYPE); - writeString(buf, scopeVersionUtf8); + writeString(buf, versionUtf8); } } @@ -167,9 +161,7 @@ public static void writeAttribute(StreamingBuffer buf, int type, String key, Obj writeDoubleAttribute(buf, keyUtf8, (double) value); break; case OtlpAttributeVisitor.STRING_ARRAY: - byte[][] valueUtf8s = - ((List) value).stream().map(OtlpCommonProto::valueUtf8).toArray(byte[][]::new); - writeStringArrayAttribute(buf, keyUtf8, valueUtf8s); + writeStringArrayAttribute(buf, keyUtf8, (List) value); break; case 
OtlpAttributeVisitor.BOOLEAN_ARRAY: writeBooleanArrayAttribute(buf, keyUtf8, (List) value); @@ -247,13 +239,14 @@ private static void writeDoubleAttribute(StreamingBuffer buf, byte[] keyUtf8, do } private static void writeStringArrayAttribute( - StreamingBuffer buf, byte[] keyUtf8, byte[][] valueUtf8s) { - int[] elementSizes = new int[valueUtf8s.length]; + StreamingBuffer buf, byte[] keyUtf8, List strings) { + byte[][] valueUtf8s = new byte[strings.size()][]; for (int i = 0; i < valueUtf8s.length; i++) { - elementSizes[i] = 1 + sizeVarInt(valueUtf8s[i].length) + valueUtf8s[i].length; + valueUtf8s[i] = valueUtf8(strings.get(i)); } int arraySize = 0; - for (int elementSize : elementSizes) { + for (byte[] valueUtf8 : valueUtf8s) { + int elementSize = 1 + sizeVarInt(valueUtf8.length) + valueUtf8.length; arraySize += 1 + sizeVarInt(elementSize) + elementSize; } int valueSize = 1 + sizeVarInt(arraySize) + arraySize; @@ -267,12 +260,13 @@ private static void writeStringArrayAttribute( writeVarInt(buf, valueSize); writeTag(buf, 5, LEN_WIRE_TYPE); writeVarInt(buf, arraySize); - for (int i = 0; i < elementSizes.length; i++) { + for (byte[] valueUtf8 : valueUtf8s) { + int elementSize = 1 + sizeVarInt(valueUtf8.length) + valueUtf8.length; writeTag(buf, 1, LEN_WIRE_TYPE); - writeVarInt(buf, elementSizes[i]); + writeVarInt(buf, elementSize); writeTag(buf, 1, LEN_WIRE_TYPE); - writeVarInt(buf, valueUtf8s[i].length); - buf.put(valueUtf8s[i]); + writeVarInt(buf, valueUtf8.length); + buf.put(valueUtf8); } } @@ -300,12 +294,13 @@ private static void writeBooleanArrayAttribute( private static void writeLongArrayAttribute( StreamingBuffer buf, byte[] keyUtf8, List values) { - int[] elementSizes = new int[values.size()]; - for (int i = 0; i < values.size(); i++) { - elementSizes[i] = 1 + sizeVarInt(values.get(i)); + long[] longValues = new long[values.size()]; + for (int i = 0; i < longValues.length; i++) { + longValues[i] = values.get(i); // avoid repeated unboxing later } int 
arraySize = 0; - for (int elementSize : elementSizes) { + for (long longValue : longValues) { + int elementSize = 1 + sizeVarInt(longValue); arraySize += 1 + sizeVarInt(elementSize) + elementSize; } int valueSize = 1 + sizeVarInt(arraySize) + arraySize; @@ -319,11 +314,12 @@ private static void writeLongArrayAttribute( writeVarInt(buf, valueSize); writeTag(buf, 5, LEN_WIRE_TYPE); writeVarInt(buf, arraySize); - for (int i = 0; i < elementSizes.length; i++) { + for (long longValue : longValues) { + int elementSize = 1 + sizeVarInt(longValue); writeTag(buf, 1, LEN_WIRE_TYPE); - writeVarInt(buf, elementSizes[i]); + writeVarInt(buf, elementSize); writeTag(buf, 3, VARINT_WIRE_TYPE); - writeVarInt(buf, values.get(i)); + writeVarInt(buf, longValue); } } diff --git a/dd-java-agent/agent-otel/otel-bootstrap/src/main/java/datadog/trace/bootstrap/otel/common/export/OtlpResourceProto.java b/dd-java-agent/agent-otel/otel-bootstrap/src/main/java/datadog/trace/bootstrap/otel/common/export/OtlpResourceProto.java index b1d68c8b1f1..18e5b3e9feb 100644 --- a/dd-java-agent/agent-otel/otel-bootstrap/src/main/java/datadog/trace/bootstrap/otel/common/export/OtlpResourceProto.java +++ b/dd-java-agent/agent-otel/otel-bootstrap/src/main/java/datadog/trace/bootstrap/otel/common/export/OtlpResourceProto.java @@ -18,7 +18,7 @@ public final class OtlpResourceProto { private OtlpResourceProto() {} - private static final byte[] RESOURCE_MESSAGE = buildResourceMessage(Config.get()); + public static final byte[] RESOURCE_MESSAGE = buildResourceMessage(Config.get()); private static final Set IGNORED_GLOBAL_TAGS = new HashSet<>( @@ -30,11 +30,6 @@ private OtlpResourceProto() {} "deployment.environment.name", "service.version")); - /** Writes the resource message in protobuf format to the given buffer. 
*/ - public static void writeResourceMessage(StreamingBuffer buf) { - buf.put(RESOURCE_MESSAGE); - } - static byte[] buildResourceMessage(Config config) { GrowableBuffer buf = new GrowableBuffer(512); diff --git a/dd-java-agent/agent-otel/otel-bootstrap/src/main/java/datadog/trace/bootstrap/otel/metrics/export/OtlpMetricsPayload.java b/dd-java-agent/agent-otel/otel-bootstrap/src/main/java/datadog/trace/bootstrap/otel/metrics/export/OtlpMetricsPayload.java new file mode 100644 index 00000000000..8d9a4395fa0 --- /dev/null +++ b/dd-java-agent/agent-otel/otel-bootstrap/src/main/java/datadog/trace/bootstrap/otel/metrics/export/OtlpMetricsPayload.java @@ -0,0 +1,31 @@ +package datadog.trace.bootstrap.otel.metrics.export; + +import java.util.ArrayDeque; +import java.util.Deque; +import java.util.function.Consumer; + +/** OTLP metrics payload consisting of a sequence of chunked byte-arrays. */ +public final class OtlpMetricsPayload { + static final OtlpMetricsPayload EMPTY = new OtlpMetricsPayload(new ArrayDeque<>(), 0); + + private final Deque chunks; + private final int length; + + OtlpMetricsPayload(Deque chunks, int length) { + this.chunks = chunks; + this.length = length; + } + + /** Drains the chunked payload to the given consumer. */ + public void drain(Consumer consumer) { + byte[] chunk; + while ((chunk = chunks.pollFirst()) != null) { + consumer.accept(chunk); + } + } + + /** Returns the total length of the chunked payload. 
*/ + public int getLength() { + return length; + } +} diff --git a/dd-java-agent/agent-otel/otel-bootstrap/src/main/java/datadog/trace/bootstrap/otel/metrics/export/OtlpMetricsProto.java b/dd-java-agent/agent-otel/otel-bootstrap/src/main/java/datadog/trace/bootstrap/otel/metrics/export/OtlpMetricsProto.java new file mode 100644 index 00000000000..154da481a28 --- /dev/null +++ b/dd-java-agent/agent-otel/otel-bootstrap/src/main/java/datadog/trace/bootstrap/otel/metrics/export/OtlpMetricsProto.java @@ -0,0 +1,132 @@ +package datadog.trace.bootstrap.otel.metrics.export; + +import static datadog.trace.api.config.OtlpConfig.Temporality.CUMULATIVE; +import static datadog.trace.bootstrap.otel.common.export.OtlpCommonProto.I64_WIRE_TYPE; +import static datadog.trace.bootstrap.otel.common.export.OtlpCommonProto.LEN_WIRE_TYPE; +import static datadog.trace.bootstrap.otel.common.export.OtlpCommonProto.VARINT_WIRE_TYPE; +import static datadog.trace.bootstrap.otel.common.export.OtlpCommonProto.recordMessage; +import static datadog.trace.bootstrap.otel.common.export.OtlpCommonProto.writeI64; +import static datadog.trace.bootstrap.otel.common.export.OtlpCommonProto.writeInstrumentationScope; +import static datadog.trace.bootstrap.otel.common.export.OtlpCommonProto.writeString; +import static datadog.trace.bootstrap.otel.common.export.OtlpCommonProto.writeTag; +import static datadog.trace.bootstrap.otel.common.export.OtlpCommonProto.writeVarInt; + +import datadog.communication.serialization.GrowableBuffer; +import datadog.trace.api.Config; +import datadog.trace.bootstrap.otel.common.OtelInstrumentationScope; +import datadog.trace.bootstrap.otel.metrics.OtelInstrumentDescriptor; +import datadog.trace.bootstrap.otel.metrics.data.OtlpDataPoint; +import datadog.trace.bootstrap.otel.metrics.data.OtlpDoublePoint; +import datadog.trace.bootstrap.otel.metrics.data.OtlpHistogramPoint; +import datadog.trace.bootstrap.otel.metrics.data.OtlpLongPoint; + +/** Provides optimized writers for 
OpenTelemetry's "metrics.proto" wire protocol. */ +public final class OtlpMetricsProto { + private OtlpMetricsProto() {} + + private static final int AGGREGATION_TEMPORALITY_DELTA = 1; + private static final int AGGREGATION_TEMPORALITY_CUMULATIVE = 2; + + private static final int AGGREGATION_TEMPORALITY = + CUMULATIVE.equals(Config.get().getOtlpMetricsTemporalityPreference()) + ? AGGREGATION_TEMPORALITY_CUMULATIVE + : AGGREGATION_TEMPORALITY_DELTA; + + /** + * Records the first part of a scoped metrics message where we know its nested metric messages + * will follow in one or more byte-arrays that add up to the given number of remaining bytes. + */ + public static byte[] recordScopedMetricsMessage( + GrowableBuffer buf, OtelInstrumentationScope scope, int remainingBytes) { + + writeTag(buf, 1, LEN_WIRE_TYPE); + writeInstrumentationScope(buf, scope); + if (scope.getSchemaUrl() != null) { + writeTag(buf, 3, LEN_WIRE_TYPE); + writeString(buf, scope.getSchemaUrl().getUtf8Bytes()); + } + + return recordMessage(buf, 2, remainingBytes); + } + + /** + * Records the first part of a metric message where we know that its nested data point messages + * will follow in one or more byte-arrays that add up to the given number of remaining bytes. 
+ */ + public static byte[] recordMetricMessage( + GrowableBuffer buf, OtelInstrumentDescriptor descriptor, int remainingBytes) { + + writeTag(buf, 1, LEN_WIRE_TYPE); + writeString(buf, descriptor.getName().getUtf8Bytes()); + if (descriptor.getDescription() != null) { + writeTag(buf, 2, LEN_WIRE_TYPE); + writeString(buf, descriptor.getDescription().getUtf8Bytes()); + } + if (descriptor.getUnit() != null) { + writeTag(buf, 3, LEN_WIRE_TYPE); + writeString(buf, descriptor.getUnit().getUtf8Bytes()); + } + + switch (descriptor.getType()) { + case GAUGE: + case OBSERVABLE_GAUGE: + writeTag(buf, 5, LEN_WIRE_TYPE); + writeVarInt(buf, remainingBytes); + // gauges have no aggregation temporality + break; + case COUNTER: + case OBSERVABLE_COUNTER: + writeTag(buf, 7, LEN_WIRE_TYPE); + writeVarInt(buf, remainingBytes + 4); + writeTag(buf, 2, VARINT_WIRE_TYPE); + writeVarInt(buf, AGGREGATION_TEMPORALITY); + writeTag(buf, 3, VARINT_WIRE_TYPE); + writeVarInt(buf, 1); // monotonic + break; + case UP_DOWN_COUNTER: + case OBSERVABLE_UP_DOWN_COUNTER: + writeTag(buf, 7, LEN_WIRE_TYPE); + writeVarInt(buf, remainingBytes + 2); + writeTag(buf, 2, VARINT_WIRE_TYPE); + writeVarInt(buf, AGGREGATION_TEMPORALITY); + break; + case HISTOGRAM: + writeTag(buf, 9, LEN_WIRE_TYPE); + writeVarInt(buf, remainingBytes + 2); + writeTag(buf, 2, VARINT_WIRE_TYPE); + writeVarInt(buf, AGGREGATION_TEMPORALITY); + break; + default: + throw new IllegalArgumentException("Unknown instrument type: " + descriptor.getType()); + } + + return recordMessage(buf, 2, remainingBytes); + } + + /** Completes recording of a data point message and packs it into its own byte-array. 
*/ + public static byte[] recordDataPointMessage(GrowableBuffer buf, OtlpDataPoint point) { + if (point instanceof OtlpDoublePoint) { + writeTag(buf, 4, I64_WIRE_TYPE); + writeI64(buf, ((OtlpDoublePoint) point).value); + } else if (point instanceof OtlpLongPoint) { + writeTag(buf, 6, I64_WIRE_TYPE); + writeI64(buf, ((OtlpLongPoint) point).value); + } else { // must be a histogram point + OtlpHistogramPoint histogram = (OtlpHistogramPoint) point; + writeTag(buf, 4, I64_WIRE_TYPE); + writeI64(buf, (long) histogram.count); + writeTag(buf, 5, I64_WIRE_TYPE); + writeI64(buf, histogram.sum); + for (double bucketCount : histogram.bucketCounts) { + writeTag(buf, 6, I64_WIRE_TYPE); + writeI64(buf, (long) bucketCount); + } + for (double bucketBoundary : histogram.bucketBoundaries) { + writeTag(buf, 7, I64_WIRE_TYPE); + writeI64(buf, bucketBoundary); + } + } + + return recordMessage(buf, 1); + } +} diff --git a/dd-java-agent/agent-otel/otel-bootstrap/src/main/java/datadog/trace/bootstrap/otel/metrics/export/OtlpMetricsProtoCollector.java b/dd-java-agent/agent-otel/otel-bootstrap/src/main/java/datadog/trace/bootstrap/otel/metrics/export/OtlpMetricsProtoCollector.java new file mode 100644 index 00000000000..0ec6eedd548 --- /dev/null +++ b/dd-java-agent/agent-otel/otel-bootstrap/src/main/java/datadog/trace/bootstrap/otel/metrics/export/OtlpMetricsProtoCollector.java @@ -0,0 +1,222 @@ +package datadog.trace.bootstrap.otel.metrics.export; + +import static datadog.trace.bootstrap.otel.common.export.OtlpCommonProto.I64_WIRE_TYPE; +import static datadog.trace.bootstrap.otel.common.export.OtlpCommonProto.LEN_WIRE_TYPE; +import static datadog.trace.bootstrap.otel.common.export.OtlpCommonProto.recordMessage; +import static datadog.trace.bootstrap.otel.common.export.OtlpCommonProto.writeAttribute; +import static datadog.trace.bootstrap.otel.common.export.OtlpCommonProto.writeI64; +import static datadog.trace.bootstrap.otel.common.export.OtlpCommonProto.writeTag; +import static 
datadog.trace.bootstrap.otel.common.export.OtlpResourceProto.RESOURCE_MESSAGE; +import static datadog.trace.bootstrap.otel.metrics.OtelInstrumentType.GAUGE; +import static datadog.trace.bootstrap.otel.metrics.OtelInstrumentType.HISTOGRAM; +import static datadog.trace.bootstrap.otel.metrics.OtelInstrumentType.OBSERVABLE_GAUGE; +import static datadog.trace.bootstrap.otel.metrics.export.OtlpMetricsProto.recordDataPointMessage; +import static datadog.trace.bootstrap.otel.metrics.export.OtlpMetricsProto.recordMetricMessage; +import static datadog.trace.bootstrap.otel.metrics.export.OtlpMetricsProto.recordScopedMetricsMessage; + +import datadog.communication.serialization.GrowableBuffer; +import datadog.trace.api.time.SystemTimeSource; +import datadog.trace.api.time.TimeSource; +import datadog.trace.bootstrap.otel.common.OtelInstrumentationScope; +import datadog.trace.bootstrap.otel.metrics.OtelInstrumentDescriptor; +import datadog.trace.bootstrap.otel.metrics.OtelInstrumentType; +import datadog.trace.bootstrap.otel.metrics.data.OtelMetricRegistry; +import datadog.trace.bootstrap.otel.metrics.data.OtlpDataPoint; +import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.Deque; +import java.util.List; +import java.util.function.Consumer; + +/** + * Collects OpenTelemetry metrics and marshalls them into a chunked 'metrics.proto' payload. + * + *

This collector is designed to be called by a single thread. To minimize allocations each + * collection returns a payload only to be used by the calling thread until the next collection. + * (The payload should be copied before passing it onto another thread.) + * + *

We use a single temporary buffer to prepare message chunks at different nesting levels. First + * we chunk all data points for a given metric. Once the metric is complete we add the first part of + * the metric message and its chunked data points to the scoped chunks. Once the scope is complete + * we add the first part of the scoped metrics message and all its chunks (metric messages and data + * points) to the payload. Once all the metrics data has been chunked we add the enclosing resource + * metrics message to the start of the payload. + */ +public final class OtlpMetricsProtoCollector + implements OtlpMetricsVisitor, OtlpScopedMetricsVisitor, OtlpMetricVisitor { + + public static final OtlpMetricsProtoCollector INSTANCE = + new OtlpMetricsProtoCollector(SystemTimeSource.INSTANCE); + + private final GrowableBuffer buf = new GrowableBuffer(512); + + private final TimeSource timeSource; + + private long startNanos; + private long endNanos; + + // temporary collections of chunks at different nesting levels + private final Deque payloadChunks = new ArrayDeque<>(); + private final List scopedChunks = new ArrayList<>(); + private final List metricChunks = new ArrayList<>(); + + // total number of chunked bytes at different nesting levels + private int payloadBytes; + private int scopedBytes; + private int metricBytes; + + private OtelInstrumentationScope currentScope; + private OtelInstrumentDescriptor currentMetric; + + public OtlpMetricsProtoCollector(TimeSource timeSource) { + this.timeSource = timeSource; + this.endNanos = timeSource.getCurrentTimeNanos(); + } + + /** + * Collects OpenTelemetry metrics and marshalls them into a chunked payload. + * + *

This payload is only valid for the calling thread until the next collection. + */ + public OtlpMetricsPayload collectMetrics() { + return collectMetrics(OtelMetricRegistry.INSTANCE::collectMetrics); + } + + OtlpMetricsPayload collectMetrics(Consumer registry) { + start(); + try { + registry.accept(this); + return completePayload(); + } finally { + stop(); + } + } + + /** Prepare temporary elements to collect metrics data. */ + private void start() { + // shift interval to cover last collection to now + startNanos = endNanos; + endNanos = timeSource.getCurrentTimeNanos(); + + // clear payloadChunks in case it wasn't fully consumed via OtlpMetricsPayload + payloadChunks.clear(); + } + + /** Cleanup elements used to collect metrics data. */ + private void stop() { + buf.reset(); + + // leave payloadChunks in place so it can be consumed via OtlpMetricsPayload + scopedChunks.clear(); + metricChunks.clear(); + + payloadBytes = 0; + scopedBytes = 0; + metricBytes = 0; + + currentScope = null; + currentMetric = null; + } + + @Override + public OtlpScopedMetricsVisitor visitScopedMetrics(OtelInstrumentationScope scope) { + if (currentScope != null) { + completeScope(); + } + currentScope = scope; + return this; + } + + @Override + public OtlpMetricVisitor visitMetric(OtelInstrumentDescriptor metric) { + if (currentMetric != null) { + completeMetric(); + } + currentMetric = metric; + return this; + } + + // called once we've processed all scopes and metric messages + private OtlpMetricsPayload completePayload() { + if (currentScope != null) { + completeScope(); + } + + if (payloadBytes == 0) { + return OtlpMetricsPayload.EMPTY; + } + + // prepend the canned resource chunk + payloadChunks.addFirst(RESOURCE_MESSAGE); + payloadBytes += RESOURCE_MESSAGE.length; + + // finally prepend the total length of all collected chunks + byte[] prefix = recordMessage(buf, 1, payloadBytes); + payloadChunks.addFirst(prefix); + payloadBytes += prefix.length; + + return new 
OtlpMetricsPayload(payloadChunks, payloadBytes); + } + + // called once we've processed all metrics in a specific scope + private void completeScope() { + if (currentMetric != null) { + completeMetric(); + } + + // add scoped metrics message prefix to its nested chunks and promote to payload + if (scopedBytes > 0) { + byte[] scopedPrefix = recordScopedMetricsMessage(buf, currentScope, scopedBytes); + payloadChunks.add(scopedPrefix); + payloadChunks.addAll(scopedChunks); + payloadBytes += scopedPrefix.length + scopedBytes; + } + + // reset temporary elements for next scope + currentScope = null; + scopedChunks.clear(); + scopedBytes = 0; + } + + // called once we've processed all data points in a specific metric + private void completeMetric() { + + // add metric message prefix to its nested chunks and promote to scoped + if (metricBytes > 0) { + byte[] metricPrefix = recordMetricMessage(buf, currentMetric, metricBytes); + scopedChunks.add(metricPrefix); + scopedChunks.addAll(metricChunks); + scopedBytes += metricPrefix.length + metricBytes; + } + + // reset temporary elements for next metric + currentMetric = null; + metricChunks.clear(); + metricBytes = 0; + } + + @Override + public void visitAttribute(int type, String key, Object value) { + // add attribute to the data point currently being collected + writeTag(buf, currentMetric.getType() == HISTOGRAM ? 
9 : 7, LEN_WIRE_TYPE); + writeAttribute(buf, type, key, value); + } + + @Override + public void visitDataPoint(OtlpDataPoint point) { + OtelInstrumentType metricType = currentMetric.getType(); + + // gauges don't have a start time (no aggregation temporality) + if (metricType != GAUGE && metricType != OBSERVABLE_GAUGE) { + writeTag(buf, 2, I64_WIRE_TYPE); + writeI64(buf, startNanos); + } + writeTag(buf, 3, I64_WIRE_TYPE); + writeI64(buf, endNanos); + + // add complete data point message to the metric chunks + byte[] pointMessage = recordDataPointMessage(buf, point); + metricChunks.add(pointMessage); + metricBytes += pointMessage.length; + } +} diff --git a/dd-java-agent/agent-otel/otel-bootstrap/src/test/java/datadog/trace/bootstrap/otel/metrics/OtlpTestDescriptors.java b/dd-java-agent/agent-otel/otel-bootstrap/src/test/java/datadog/trace/bootstrap/otel/metrics/OtlpTestDescriptors.java new file mode 100644 index 00000000000..f9eee3b4260 --- /dev/null +++ b/dd-java-agent/agent-otel/otel-bootstrap/src/test/java/datadog/trace/bootstrap/otel/metrics/OtlpTestDescriptors.java @@ -0,0 +1,11 @@ +package datadog.trace.bootstrap.otel.metrics; + +/** Test-only factory giving tests access to the package-private descriptor constructor. 
*/ +public final class OtlpTestDescriptors { + private OtlpTestDescriptors() {} + + public static OtelInstrumentDescriptor descriptor( + String name, OtelInstrumentType type, boolean longValues, String description, String unit) { + return new OtelInstrumentDescriptor(name, type, longValues, description, unit); + } +} diff --git a/dd-java-agent/agent-otel/otel-bootstrap/src/test/java/datadog/trace/bootstrap/otel/metrics/data/OtlpTestPoints.java b/dd-java-agent/agent-otel/otel-bootstrap/src/test/java/datadog/trace/bootstrap/otel/metrics/data/OtlpTestPoints.java new file mode 100644 index 00000000000..26c0d947a27 --- /dev/null +++ b/dd-java-agent/agent-otel/otel-bootstrap/src/test/java/datadog/trace/bootstrap/otel/metrics/data/OtlpTestPoints.java @@ -0,0 +1,21 @@ +package datadog.trace.bootstrap.otel.metrics.data; + +import java.util.List; + +/** Test-only factories giving tests access to the package-private data-point constructors. */ +public final class OtlpTestPoints { + private OtlpTestPoints() {} + + public static OtlpLongPoint longPoint(long value) { + return new OtlpLongPoint(value); + } + + public static OtlpDoublePoint doublePoint(double value) { + return new OtlpDoublePoint(value); + } + + public static OtlpHistogramPoint histogramPoint( + double count, List bucketBoundaries, List bucketCounts, double sum) { + return new OtlpHistogramPoint(count, bucketBoundaries, bucketCounts, sum); + } +} diff --git a/dd-java-agent/agent-otel/otel-bootstrap/src/test/java/datadog/trace/bootstrap/otel/metrics/export/OtlpMetricsProtoTest.java b/dd-java-agent/agent-otel/otel-bootstrap/src/test/java/datadog/trace/bootstrap/otel/metrics/export/OtlpMetricsProtoTest.java new file mode 100644 index 00000000000..98cafc54871 --- /dev/null +++ b/dd-java-agent/agent-otel/otel-bootstrap/src/test/java/datadog/trace/bootstrap/otel/metrics/export/OtlpMetricsProtoTest.java @@ -0,0 +1,1000 @@ +package datadog.trace.bootstrap.otel.metrics.export; + +import static 
datadog.trace.bootstrap.otel.common.export.OtlpAttributeVisitor.BOOLEAN; +import static datadog.trace.bootstrap.otel.common.export.OtlpAttributeVisitor.DOUBLE; +import static datadog.trace.bootstrap.otel.common.export.OtlpAttributeVisitor.LONG; +import static datadog.trace.bootstrap.otel.common.export.OtlpAttributeVisitor.STRING; +import static datadog.trace.bootstrap.otel.metrics.OtlpTestDescriptors.descriptor; +import static datadog.trace.bootstrap.otel.metrics.data.OtlpTestPoints.doublePoint; +import static datadog.trace.bootstrap.otel.metrics.data.OtlpTestPoints.histogramPoint; +import static datadog.trace.bootstrap.otel.metrics.data.OtlpTestPoints.longPoint; +import static java.util.Arrays.asList; +import static java.util.Collections.emptyList; +import static java.util.stream.Collectors.toList; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import com.google.protobuf.CodedInputStream; +import com.google.protobuf.WireFormat; +import datadog.trace.api.time.ControllableTimeSource; +import datadog.trace.bootstrap.otel.common.OtelInstrumentationScope; +import datadog.trace.bootstrap.otel.metrics.OtelInstrumentType; +import datadog.trace.bootstrap.otel.metrics.data.OtlpDataPoint; +import datadog.trace.bootstrap.otel.metrics.data.OtlpDoublePoint; +import datadog.trace.bootstrap.otel.metrics.data.OtlpHistogramPoint; +import datadog.trace.bootstrap.otel.metrics.data.OtlpLongPoint; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.stream.Stream; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; + +/** + * Tests for {@link OtlpMetricsProto} via {@link OtlpMetricsProtoCollector#collectMetrics}. + * + *

Each test case drives the collector through its visitor API, drains the resulting chunked + * payload into a contiguous byte array, and then parses it back using protobuf's {@link + * CodedInputStream} to verify the wire encoding against the OpenTelemetry metrics proto schema. + * + *

Relevant proto field numbers (from {@code opentelemetry/proto/metrics/v1/metrics.proto}): + * + *

+ *   MetricsData        { ResourceMetrics resource_metrics = 1; }
+ *   ResourceMetrics    { Resource resource = 1; ScopeMetrics scope_metrics = 2; }
+ *   ScopeMetrics       { InstrumentationScope scope = 1; Metric metrics = 2; string schema_url = 3; }
+ *   InstrumentationScope { string name = 1; string version = 2; }
+ *   Metric             { string name = 1; string description = 2; string unit = 3;
+ *                        Gauge gauge = 5; Sum sum = 7; Histogram histogram = 9; }
+ *   Gauge              { NumberDataPoint data_points = 1; }
+ *   Sum                { NumberDataPoint data_points = 1; AggregationTemporality ... = 2; bool is_monotonic = 3; }
+ *   Histogram          { HistogramDataPoint data_points = 1; AggregationTemporality ... = 2; }
+ *   NumberDataPoint    { fixed64 start_time_unix_nano = 2; fixed64 time_unix_nano = 3;
+ *                        double as_double = 4; sfixed64 as_int = 6; KeyValue attributes = 7; }
+ *   HistogramDataPoint { fixed64 start_time_unix_nano = 2; fixed64 time_unix_nano = 3;
+ *                        fixed64 count = 4; double sum = 5; fixed64 bucket_counts = 6;
+ *                        double explicit_bounds = 7; KeyValue attributes = 9; }
+ * 
+ */ +class OtlpMetricsProtoTest { + + // ── spec classes (test-data descriptors) ────────────────────────────────── + + static final class ScopeSpec { + final String name; + final String version; // null → absent from wire + final String schemaUrl; // null → absent from wire + final List metrics; + + ScopeSpec(String name, String version, String schemaUrl, List metrics) { + this.name = name; + this.version = version; + this.schemaUrl = schemaUrl; + this.metrics = metrics; + } + } + + static final class MetricSpec { + final String name; + final String description; // null → absent from wire + final String unit; // null → absent from wire + final OtelInstrumentType type; + final boolean longValues; + final OtlpDataPoint point; + final List attrs; + + MetricSpec( + String name, + String description, + String unit, + OtelInstrumentType type, + boolean longValues, + OtlpDataPoint point, + List attrs) { + this.name = name; + this.description = description; + this.unit = unit; + this.type = type; + this.longValues = longValues; + this.point = point; + this.attrs = attrs; + } + } + + static final class AttrSpec { + final int type; + final String key; + final Object value; + + AttrSpec(int type, String key, Object value) { + this.type = type; + this.key = key; + this.value = value; + } + } + + // ── shorthand builders ──────────────────────────────────────────────────── + + private static ScopeSpec scope(String name, MetricSpec... metrics) { + return new ScopeSpec(name, null, null, asList(metrics)); + } + + private static ScopeSpec scopeFull( + String name, String version, String schemaUrl, MetricSpec... metrics) { + return new ScopeSpec(name, version, schemaUrl, asList(metrics)); + } + + private static MetricSpec counterLong(String name, long value, AttrSpec... 
attrs) { + return new MetricSpec( + name, null, null, OtelInstrumentType.COUNTER, true, longPoint(value), asList(attrs)); + } + + private static MetricSpec counterLongFull( + String name, String desc, String unit, long value, AttrSpec... attrs) { + return new MetricSpec( + name, desc, unit, OtelInstrumentType.COUNTER, true, longPoint(value), asList(attrs)); + } + + private static MetricSpec counterDouble(String name, double value, AttrSpec... attrs) { + return new MetricSpec( + name, null, null, OtelInstrumentType.COUNTER, false, doublePoint(value), asList(attrs)); + } + + private static MetricSpec gaugeLong(String name, long value) { + return new MetricSpec( + name, null, null, OtelInstrumentType.GAUGE, true, longPoint(value), emptyList()); + } + + private static MetricSpec gaugeDouble(String name, double value) { + return new MetricSpec( + name, null, null, OtelInstrumentType.GAUGE, false, doublePoint(value), emptyList()); + } + + private static MetricSpec upDownLong(String name, long value, AttrSpec... attrs) { + return new MetricSpec( + name, + null, + null, + OtelInstrumentType.UP_DOWN_COUNTER, + true, + longPoint(value), + asList(attrs)); + } + + private static MetricSpec upDownDouble(String name, double value, AttrSpec... 
attrs) { + return new MetricSpec( + name, + null, + null, + OtelInstrumentType.UP_DOWN_COUNTER, + false, + doublePoint(value), + asList(attrs)); + } + + private static MetricSpec observableGaugeLong(String name, long value) { + return new MetricSpec( + name, null, null, OtelInstrumentType.OBSERVABLE_GAUGE, true, longPoint(value), emptyList()); + } + + private static MetricSpec observableGaugeDouble(String name, double value) { + return new MetricSpec( + name, + null, + null, + OtelInstrumentType.OBSERVABLE_GAUGE, + false, + doublePoint(value), + emptyList()); + } + + private static MetricSpec observableCounterLong(String name, long value) { + return new MetricSpec( + name, + null, + null, + OtelInstrumentType.OBSERVABLE_COUNTER, + true, + longPoint(value), + emptyList()); + } + + private static MetricSpec observableCounterDouble(String name, double value) { + return new MetricSpec( + name, + null, + null, + OtelInstrumentType.OBSERVABLE_COUNTER, + false, + doublePoint(value), + emptyList()); + } + + private static MetricSpec observableUpDownCounterLong(String name, long value) { + return new MetricSpec( + name, + null, + null, + OtelInstrumentType.OBSERVABLE_UP_DOWN_COUNTER, + true, + longPoint(value), + emptyList()); + } + + private static MetricSpec observableUpDownCounterDouble(String name, double value) { + return new MetricSpec( + name, + null, + null, + OtelInstrumentType.OBSERVABLE_UP_DOWN_COUNTER, + false, + doublePoint(value), + emptyList()); + } + + private static MetricSpec histogram( + String name, + double count, + List bounds, + List counts, + double sum, + AttrSpec... 
attrs) { + return new MetricSpec( + name, + null, + null, + OtelInstrumentType.HISTOGRAM, + false, + histogramPoint(count, bounds, counts, sum), + asList(attrs)); + } + + private static AttrSpec strAttr(String key, String value) { + return new AttrSpec(STRING, key, value); + } + + private static AttrSpec longAttr(String key, long value) { + return new AttrSpec(LONG, key, value); + } + + private static AttrSpec boolAttr(String key, boolean value) { + return new AttrSpec(BOOLEAN, key, value); + } + + private static AttrSpec dblAttr(String key, double value) { + return new AttrSpec(DOUBLE, key, value); + } + + // ── test cases ───────────────────────────────────────────────────────────── + + static Stream cases() { + return Stream.of( + // ── empty ───────────────────────────────────────────────────────────── + Arguments.of("empty — no scopes produces empty payload", emptyList()), + + // ── scope with no metrics ───────────────────────────────────────────── + Arguments.of("scope with no metrics", asList(scope("io.empty"))), + + // ── counter long boundary values ────────────────────────────────────── + Arguments.of( + "counter long zero — boundary", asList(scope("io.test", counterLong("requests", 0L)))), + Arguments.of( + "counter long Long.MAX_VALUE — boundary", + asList(scope("io.test", counterLong("requests", Long.MAX_VALUE)))), + Arguments.of( + "counter long Long.MIN_VALUE — negative boundary", + asList(scope("io.test", counterLong("requests", Long.MIN_VALUE)))), + + // ── gauge long boundary values (no start time) ─────────────────────── + Arguments.of("gauge long zero", asList(scope("io.gauge", gaugeLong("connections", 0L)))), + Arguments.of( + "gauge long Long.MAX_VALUE", + asList(scope("io.gauge", gaugeLong("connections", Long.MAX_VALUE)))), + Arguments.of( + "gauge long Long.MIN_VALUE — negative", + asList(scope("io.gauge", gaugeLong("balance", Long.MIN_VALUE)))), + + // ── gauge double special values (no start time) ─────────────────────── + 
Arguments.of("gauge double zero", asList(scope("io.gauge", gaugeDouble("rate", 0.0)))), + Arguments.of( + "gauge double -0.0 — negative zero", + asList(scope("io.gauge", gaugeDouble("temperature", -0.0)))), + Arguments.of( + "gauge double Double.MAX_VALUE", + asList(scope("io.gauge", gaugeDouble("temperature", Double.MAX_VALUE)))), + Arguments.of( + "gauge double +Infinity", + asList(scope("io.gauge", gaugeDouble("temperature", Double.POSITIVE_INFINITY)))), + Arguments.of( + "gauge double -Infinity", + asList(scope("io.gauge", gaugeDouble("temperature", Double.NEGATIVE_INFINITY)))), + Arguments.of( + "gauge double NaN", asList(scope("io.gauge", gaugeDouble("invalid", Double.NaN)))), + + // ── up-down counter long (non-monotonic sum) ────────────────────────── + Arguments.of( + "up-down-counter long zero", asList(scope("io.test", upDownLong("queue.size", 0L)))), + Arguments.of( + "up-down-counter long Long.MAX_VALUE", + asList(scope("io.test", upDownLong("queue.size", Long.MAX_VALUE)))), + Arguments.of( + "up-down-counter long negative with string attr", + asList(scope("io.test", upDownLong("queue.size", -42L, strAttr("host", "my-host"))))), + + // ── up-down counter double ──────────────────────────────────────────── + Arguments.of( + "up-down-counter double positive", + asList(scope("io.test", upDownDouble("balance", 3.14)))), + Arguments.of( + "up-down-counter double negative", + asList(scope("io.test", upDownDouble("delta", -2.71)))), + Arguments.of( + "up-down-counter double zero", asList(scope("io.test", upDownDouble("offset", 0.0)))), + + // ── counter double ──────────────────────────────────────────────────── + Arguments.of( + "counter double zero — no attrs", + asList(scope("io.test", counterDouble("errors", 0.0)))), + + // ── counter double with multiple attribute types ─────────────────────── + Arguments.of( + "counter double with string, long, bool, double attrs", + asList( + scope( + "io.test", + counterDouble( + "latency", + 3.14, + strAttr("service", 
"web"), + longAttr("status", 200L), + boolAttr("success", true), + dblAttr("rate", 0.5))))), + + // ── histogram — no buckets ──────────────────────────────────────────── + Arguments.of( + "histogram no buckets", + asList( + scope("io.hist", histogram("response.time", 1.0, emptyList(), asList(1.0), 0.5)))), + + // ── histogram — zero count and sum ──────────────────────────────────── + Arguments.of( + "histogram zero count and sum", + asList(scope("io.hist", histogram("idle.time", 0.0, emptyList(), asList(0.0), 0.0)))), + + // ── histogram — single explicit bound ───────────────────────────────── + Arguments.of( + "histogram single bound", + asList( + scope( + "io.hist", + histogram("request.size", 5.0, asList(100.0), asList(4.0, 1.0), 280.0)))), + + // ── histogram — with explicit bounds and attrs ──────────────────────── + Arguments.of( + "histogram with bounds and string attr", + asList( + scope( + "io.hist", + histogram( + "response.time", + 10.0, + asList(1.0, 5.0, 10.0), + asList(2.0, 3.0, 4.0, 1.0), + 45.5, + strAttr("region", "us-east"))))), + + // ── histogram — many buckets with multiple attrs ─────────────────────── + Arguments.of( + "histogram many buckets with long and bool attrs", + asList( + scope( + "io.hist", + histogram( + "latency.ms", + 100.0, + asList(1.0, 2.0, 5.0, 10.0, 25.0, 50.0, 100.0), + asList(5.0, 10.0, 20.0, 30.0, 15.0, 12.0, 6.0, 2.0), + 4321.0, + longAttr("shard", 3L), + boolAttr("cached", false))))), + + // ── scope metadata — optional version and schema URL ────────────────── + Arguments.of( + "scope with version and schemaUrl", + asList( + scopeFull( + "io.opentelemetry", + "1.2.3", + "https://opentelemetry.io/schemas/1.21", + counterLong("events", 1L)))), + Arguments.of( + "scope with version only — no schemaUrl", + asList(scopeFull("io.versioned", "2.0.0", null, counterLong("events", 1L)))), + Arguments.of( + "scope with schemaUrl only — no version", + asList( + scopeFull( + "io.schemed", + null, + 
"https://opentelemetry.io/schemas/1.21", + counterLong("events", 1L)))), + + // ── metric metadata — optional description and unit ─────────────────── + Arguments.of( + "metric with description and unit", + asList( + scope( + "io.test", + counterLongFull("cpu.usage", "CPU utilisation of the process", "%", 75L)))), + + // ── observable gauge ────────────────────────────────────────────────── + Arguments.of( + "observable gauge long — no start time written", + asList(scope("io.obs", observableGaugeLong("heap.used", 1024L * 1024L)))), + Arguments.of( + "observable gauge long zero", asList(scope("io.obs", observableGaugeLong("idle", 0L)))), + Arguments.of( + "observable gauge long Long.MIN_VALUE — negative", + asList(scope("io.obs", observableGaugeLong("balance", Long.MIN_VALUE)))), + Arguments.of( + "observable gauge double", + asList(scope("io.obs", observableGaugeDouble("cpu.percent", 75.5)))), + Arguments.of( + "observable gauge double NaN", + asList(scope("io.obs", observableGaugeDouble("invalid", Double.NaN)))), + + // ── observable counter (monotonic sum) ──────────────────────────────── + Arguments.of( + "observable counter long Long.MAX_VALUE — has temporality and is_monotonic", + asList(scope("io.obs", observableCounterLong("file.reads", Long.MAX_VALUE)))), + Arguments.of( + "observable counter long zero", + asList(scope("io.obs", observableCounterLong("events", 0L)))), + Arguments.of( + "observable counter double", + asList(scope("io.obs", observableCounterDouble("bytes.sent", 1024.5)))), + Arguments.of( + "observable counter double zero", + asList(scope("io.obs", observableCounterDouble("noop", 0.0)))), + + // ── observable up-down-counter (non-monotonic sum) ──────────────────── + Arguments.of( + "observable up-down-counter long positive", + asList(scope("io.obs", observableUpDownCounterLong("queue.depth", 42L)))), + Arguments.of( + "observable up-down-counter long negative", + asList(scope("io.obs", observableUpDownCounterLong("balance", -10L)))), + 
Arguments.of( + "observable up-down-counter long zero", + asList(scope("io.obs", observableUpDownCounterLong("empty", 0L)))), + Arguments.of( + "observable up-down-counter double", + asList(scope("io.obs", observableUpDownCounterDouble("ratio", 0.75)))), + Arguments.of( + "observable up-down-counter double negative", + asList(scope("io.obs", observableUpDownCounterDouble("delta", -1.5)))), + + // ── empty scope between two scopes with metrics ─────────────────────── + Arguments.of( + "middle scope with no metrics — flanked by scopes with metrics", + asList( + scope("io.first", counterLong("a", 1L)), + scope("io.empty"), + scope("io.last", counterLong("b", 2L)))), + + // ── multiple scopes and multiple metrics ────────────────────────────── + Arguments.of( + "two scopes each with two metrics", + asList( + scope( + "io.http", + counterLong("requests", 100L, strAttr("method", "GET")), + gaugeDouble("active.connections", 5.0)), + scope( + "io.db", + counterLong("queries", 50L), + upDownLong("pool.size", 10L, longAttr("pool.id", 1L)))))); + } + + // ── parameterized test ──────────────────────────────────────────────────── + + private static final long START_EPOCH_NS = TimeUnit.SECONDS.toNanos(1330837567); + private static final long END_EPOCH_NS = START_EPOCH_NS + TimeUnit.MINUTES.toNanos(30); + + @ParameterizedTest(name = "{0}") + @MethodSource("cases") + void testCollectMetrics(String caseName, List expectedScopes) throws IOException { + ControllableTimeSource timeSource = new ControllableTimeSource(); + timeSource.set(START_EPOCH_NS); // captured in constructor + OtlpMetricsProtoCollector collector = new OtlpMetricsProtoCollector(timeSource); + timeSource.set(END_EPOCH_NS); // captured during collection + OtlpMetricsPayload payload = + collector.collectMetrics( + visitor -> { + for (ScopeSpec s : expectedScopes) { + OtlpScopedMetricsVisitor sv = + visitor.visitScopedMetrics( + new OtelInstrumentationScope(s.name, s.version, s.schemaUrl)); + for (MetricSpec m : 
s.metrics) { + OtlpMetricVisitor mv = + sv.visitMetric( + descriptor(m.name, m.type, m.longValues, m.description, m.unit)); + for (AttrSpec a : m.attrs) { + mv.visitAttribute(a.type, a.key, a.value); + } + mv.visitDataPoint(m.point); + } + } + }); + + // Scopes with no metrics produce no wire output — filter them for verification + List nonEmptyScopes = + expectedScopes.stream().filter(s -> !s.metrics.isEmpty()).collect(toList()); + + if (nonEmptyScopes.isEmpty()) { + assertEquals(0, payload.getLength(), "empty registry must produce empty payload"); + return; + } + + // drain all chunks into a single contiguous byte array + ByteArrayOutputStream baos = new ByteArrayOutputStream(payload.getLength()); + payload.drain(chunk -> baos.write(chunk, 0, chunk.length)); + byte[] bytes = baos.toByteArray(); + assertTrue(bytes.length > 0, "non-empty registry must produce bytes"); + + // ── parse MetricsData ────────────────────────────────────────────────── + // The full payload encodes a single MetricsData.resource_metrics (field 1, LEN). 
+ CodedInputStream md = CodedInputStream.newInstance(bytes); + int mdTag = md.readTag(); + assertEquals(1, WireFormat.getTagFieldNumber(mdTag), "MetricsData.resource_metrics is field 1"); + assertEquals(WireFormat.WIRETYPE_LENGTH_DELIMITED, WireFormat.getTagWireType(mdTag)); + CodedInputStream rm = md.readBytes().newCodedInput(); + assertTrue(md.isAtEnd(), "expected exactly one ResourceMetrics"); + + // ── parse ResourceMetrics ────────────────────────────────────────────── + // Fields: resource=1, scope_metrics=2 (repeated) + boolean resourceFound = false; + int scopeIdx = 0; + while (!rm.isAtEnd()) { + int rmTag = rm.readTag(); + int rmField = WireFormat.getTagFieldNumber(rmTag); + if (rmField == 1) { + verifyResource(rm.readBytes().newCodedInput()); + resourceFound = true; + continue; + } + assertEquals(2, rmField, "ResourceMetrics.scope_metrics is field 2"); + assertTrue(scopeIdx < nonEmptyScopes.size(), "more ScopeMetrics than expected"); + verifyScopeMetrics(rm.readBytes().newCodedInput(), nonEmptyScopes.get(scopeIdx++)); + } + assertTrue(resourceFound, "Resource message must be present in ResourceMetrics"); + assertEquals(nonEmptyScopes.size(), scopeIdx, "scope count mismatch in case: " + caseName); + } + + // ── verification helpers ────────────────────────────────────────────────── + + /** + * Parses a {@code Resource} message body and asserts it contains a {@code service.name} + * attribute. The value is not verified as it depends on the runtime environment. + * + *
+   *   Resource { repeated KeyValue attributes = 1; }
+   * 
+ */ + private static void verifyResource(CodedInputStream res) throws IOException { + boolean foundServiceName = false; + while (!res.isAtEnd()) { + int tag = res.readTag(); + if (WireFormat.getTagFieldNumber(tag) == 1) { // attributes (repeated KeyValue) + String key = readKeyValueKey(res.readBytes().newCodedInput()); + if ("service.name".equals(key)) { + foundServiceName = true; + } + } else { + res.skipField(tag); + } + } + assertTrue(foundServiceName, "Resource must contain a 'service.name' attribute"); + } + + /** + * Parses a {@code ScopeMetrics} message body and asserts its content matches {@code expected}. + * + *
+   *   ScopeMetrics { scope=1, metrics=2, schema_url=3 }
+   * 
+ */ + private static void verifyScopeMetrics(CodedInputStream sm, ScopeSpec expected) + throws IOException { + String parsedName = null; + String parsedVersion = null; + String parsedSchemaUrl = null; + int metricIdx = 0; + + while (!sm.isAtEnd()) { + int tag = sm.readTag(); + switch (WireFormat.getTagFieldNumber(tag)) { + case 1: // InstrumentationScope + CodedInputStream scope = sm.readBytes().newCodedInput(); + while (!scope.isAtEnd()) { + int st = scope.readTag(); + switch (WireFormat.getTagFieldNumber(st)) { + case 1: + parsedName = scope.readString(); + break; + case 2: + parsedVersion = scope.readString(); + break; + default: + scope.skipField(st); + } + } + break; + case 2: // Metric (repeated) + assertTrue( + metricIdx < expected.metrics.size(), + "more metrics than expected in scope " + expected.name); + verifyMetric(sm.readBytes().newCodedInput(), expected.metrics.get(metricIdx++)); + break; + case 3: // schema_url + parsedSchemaUrl = sm.readString(); + break; + default: + sm.skipField(tag); + } + } + + assertEquals(expected.name, parsedName, "scope name"); + assertEquals(expected.version, parsedVersion, "scope version"); + assertEquals(expected.schemaUrl, parsedSchemaUrl, "scope schemaUrl"); + assertEquals(expected.metrics.size(), metricIdx, "metric count in scope " + expected.name); + } + + /** + * Parses a {@code Metric} message body and asserts its content matches {@code expected}. + * + *
+   *   Metric { name=1, description=2, unit=3, gauge=5, sum=7, histogram=9 }
+   * 
+ */ + private static void verifyMetric(CodedInputStream m, MetricSpec expected) throws IOException { + String parsedName = null; + String parsedDesc = null; + String parsedUnit = null; + boolean dataFound = false; + + while (!m.isAtEnd()) { + int tag = m.readTag(); + switch (WireFormat.getTagFieldNumber(tag)) { + case 1: + parsedName = m.readString(); + break; + case 2: + parsedDesc = m.readString(); + break; + case 3: + parsedUnit = m.readString(); + break; + case 5: // Gauge + assertTrue(isGaugeType(expected.type), "unexpected gauge for " + expected.name); + verifyGauge(m.readBytes().newCodedInput(), expected); + dataFound = true; + break; + case 7: // Sum + assertTrue(isSumType(expected.type), "unexpected sum for " + expected.name); + verifySum(m.readBytes().newCodedInput(), expected); + dataFound = true; + break; + case 9: // Histogram + assertEquals( + OtelInstrumentType.HISTOGRAM, + expected.type, + "unexpected histogram for " + expected.name); + verifyHistogram(m.readBytes().newCodedInput(), expected); + dataFound = true; + break; + default: + m.skipField(tag); + } + } + + assertEquals(expected.name, parsedName, "metric name"); + assertEquals(expected.description, parsedDesc, "metric description"); + assertEquals(expected.unit, parsedUnit, "metric unit"); + assertTrue(dataFound, "no data payload found in metric " + expected.name); + } + + private static boolean isGaugeType(OtelInstrumentType type) { + return type == OtelInstrumentType.GAUGE || type == OtelInstrumentType.OBSERVABLE_GAUGE; + } + + private static boolean isSumType(OtelInstrumentType type) { + return type == OtelInstrumentType.COUNTER + || type == OtelInstrumentType.OBSERVABLE_COUNTER + || type == OtelInstrumentType.UP_DOWN_COUNTER + || type == OtelInstrumentType.OBSERVABLE_UP_DOWN_COUNTER; + } + + /** + * Parses a {@code Gauge} message body. Gauge has no aggregation temporality or start time. + * + *
+   *   Gauge { data_points=1 }
+   * 
+ */ + private static void verifyGauge(CodedInputStream g, MetricSpec expected) throws IOException { + boolean foundDataPoint = false; + while (!g.isAtEnd()) { + int tag = g.readTag(); + if (WireFormat.getTagFieldNumber(tag) == 1) { + assertFalse(foundDataPoint, "expected exactly one data point in gauge " + expected.name); + verifyNumberDataPoint(g.readBytes().newCodedInput(), expected, /* hasStartTime= */ false); + foundDataPoint = true; + } else { + g.skipField(tag); + } + } + assertTrue(foundDataPoint, "no data point found in gauge " + expected.name); + } + + /** + * Parses a {@code Sum} message body and verifies temporality and monotonicity. + * + *
+   *   Sum { data_points=1, aggregation_temporality=2, is_monotonic=3 }
+   * 
+ */ + private static void verifySum(CodedInputStream s, MetricSpec expected) throws IOException { + boolean foundDataPoint = false; + boolean foundTemporality = false; + + while (!s.isAtEnd()) { + int tag = s.readTag(); + switch (WireFormat.getTagFieldNumber(tag)) { + case 1: // NumberDataPoint + assertFalse(foundDataPoint, "expected exactly one data point in sum " + expected.name); + verifyNumberDataPoint(s.readBytes().newCodedInput(), expected, /* hasStartTime= */ true); + foundDataPoint = true; + break; + case 2: // AggregationTemporality (1=DELTA, 2=CUMULATIVE) + int temporality = s.readEnum(); + assertTrue( + temporality == 1 || temporality == 2, + "aggregation_temporality must be DELTA(1) or CUMULATIVE(2)"); + foundTemporality = true; + break; + case 3: // is_monotonic + boolean isMonotonic = s.readBool(); + boolean expectedMonotonic = + expected.type == OtelInstrumentType.COUNTER + || expected.type == OtelInstrumentType.OBSERVABLE_COUNTER; + assertEquals(expectedMonotonic, isMonotonic, "is_monotonic for " + expected.name); + break; + default: + s.skipField(tag); + } + } + + assertTrue(foundDataPoint, "no data point found in sum " + expected.name); + assertTrue(foundTemporality, "aggregation_temporality missing from sum " + expected.name); + } + + /** + * Parses a {@code Histogram} message body and verifies temporality. + * + *
+   *   Histogram { data_points=1, aggregation_temporality=2 }
+   * 
+ */ + private static void verifyHistogram(CodedInputStream h, MetricSpec expected) throws IOException { + boolean foundDataPoint = false; + boolean foundTemporality = false; + + while (!h.isAtEnd()) { + int tag = h.readTag(); + switch (WireFormat.getTagFieldNumber(tag)) { + case 1: // HistogramDataPoint + assertFalse( + foundDataPoint, "expected exactly one data point in histogram " + expected.name); + verifyHistogramDataPoint(h.readBytes().newCodedInput(), expected); + foundDataPoint = true; + break; + case 2: // AggregationTemporality + int temporality = h.readEnum(); + assertTrue( + temporality == 1 || temporality == 2, + "aggregation_temporality must be DELTA(1) or CUMULATIVE(2)"); + foundTemporality = true; + break; + default: + h.skipField(tag); + } + } + + assertTrue(foundDataPoint, "no data point found in histogram " + expected.name); + assertTrue(foundTemporality, "aggregation_temporality missing from histogram " + expected.name); + } + + /** + * Parses a {@code NumberDataPoint} message body and asserts timestamps, value, and attributes. + * + *
+   *   NumberDataPoint { start_time_unix_nano=2, time_unix_nano=3,
+   *                     as_double=4, as_int=6, attributes=7 }
+   * 
+ * + * @param hasStartTime true for non-gauge types; gauges omit {@code start_time_unix_nano} + */ + private static void verifyNumberDataPoint( + CodedInputStream dp, MetricSpec expected, boolean hasStartTime) throws IOException { + boolean foundStartTime = false; + boolean foundEndTime = false; + boolean foundValue = false; + List parsedAttrKeys = new ArrayList<>(); + + while (!dp.isAtEnd()) { + int tag = dp.readTag(); + switch (WireFormat.getTagFieldNumber(tag)) { + case 2: // start_time_unix_nano (fixed64) + assertEquals( + START_EPOCH_NS, dp.readFixed64(), "start_time_unix_nano for " + expected.name); + foundStartTime = true; + break; + case 3: // time_unix_nano (fixed64) + assertEquals(END_EPOCH_NS, dp.readFixed64(), "time_unix_nano for " + expected.name); + foundEndTime = true; + break; + case 4: // as_double (double via fixed64 wire type) + double parsedDouble = dp.readDouble(); + OtlpDoublePoint expectedDouble = (OtlpDoublePoint) expected.point; + assertEquals( + Double.doubleToRawLongBits(expectedDouble.value), + Double.doubleToRawLongBits(parsedDouble), + "as_double for " + expected.name); + foundValue = true; + break; + case 6: // as_int (sfixed64) + long parsedLong = dp.readSFixed64(); + OtlpLongPoint expectedLong = (OtlpLongPoint) expected.point; + assertEquals(expectedLong.value, parsedLong, "as_int for " + expected.name); + foundValue = true; + break; + case 7: // attributes (repeated KeyValue) + parsedAttrKeys.add(readKeyValueKey(dp.readBytes().newCodedInput())); + break; + default: + dp.skipField(tag); + } + } + + assertEquals( + hasStartTime, foundStartTime, "start_time_unix_nano presence for " + expected.name); + assertTrue(foundEndTime, "time_unix_nano required for " + expected.name); + assertTrue(foundValue, "value field required for " + expected.name); + assertEquals( + expected.attrs.size(), parsedAttrKeys.size(), "attribute count for " + expected.name); + for (int i = 0; i < expected.attrs.size(); i++) { + assertEquals( + 
expected.attrs.get(i).key, + parsedAttrKeys.get(i), + "attribute key[" + i + "] for " + expected.name); + } + } + + /** + * Parses a {@code HistogramDataPoint} message body and asserts timestamps, count, sum, bucket + * counts, explicit bounds, and attributes. + * + *
+   *   HistogramDataPoint { start_time_unix_nano=2, time_unix_nano=3, count=4,
+   *                        sum=5, bucket_counts=6, explicit_bounds=7, attributes=9 }
+   * 
+ */ + private static void verifyHistogramDataPoint(CodedInputStream dp, MetricSpec expected) + throws IOException { + OtlpHistogramPoint hp = (OtlpHistogramPoint) expected.point; + boolean foundStartTime = false; + boolean foundEndTime = false; + boolean foundCount = false; + boolean foundSum = false; + List parsedBucketCounts = new ArrayList<>(); + List parsedBounds = new ArrayList<>(); + List parsedAttrKeys = new ArrayList<>(); + + while (!dp.isAtEnd()) { + int tag = dp.readTag(); + switch (WireFormat.getTagFieldNumber(tag)) { + case 2: // start_time_unix_nano + assertEquals( + START_EPOCH_NS, dp.readFixed64(), "start_time_unix_nano for " + expected.name); + foundStartTime = true; + break; + case 3: // time_unix_nano + assertEquals(END_EPOCH_NS, dp.readFixed64(), "time_unix_nano for " + expected.name); + foundEndTime = true; + break; + case 4: // count (fixed64) + assertEquals((long) hp.count, dp.readFixed64(), "histogram count"); + foundCount = true; + break; + case 5: // sum (double via fixed64) + assertEquals( + Double.doubleToRawLongBits(hp.sum), + Double.doubleToRawLongBits(dp.readDouble()), + "histogram sum"); + foundSum = true; + break; + case 6: // bucket_counts (repeated fixed64) + parsedBucketCounts.add(dp.readFixed64()); + break; + case 7: // explicit_bounds (repeated double) + parsedBounds.add(dp.readDouble()); + break; + case 9: // attributes (repeated KeyValue) + parsedAttrKeys.add(readKeyValueKey(dp.readBytes().newCodedInput())); + break; + default: + dp.skipField(tag); + } + } + + assertTrue(foundStartTime, "start_time_unix_nano required for histogram " + expected.name); + assertTrue(foundEndTime, "time_unix_nano required for histogram " + expected.name); + assertTrue(foundCount, "count required for histogram " + expected.name); + assertTrue(foundSum, "sum required for histogram " + expected.name); + + assertEquals( + hp.bucketCounts.size(), + parsedBucketCounts.size(), + "bucket_counts size for " + expected.name); + for (int i = 0; i < 
hp.bucketCounts.size(); i++) { + assertEquals( + (long) hp.bucketCounts.get(i).doubleValue(), + (long) parsedBucketCounts.get(i), + "bucket_counts[" + i + "] for " + expected.name); + } + + assertEquals( + hp.bucketBoundaries.size(), + parsedBounds.size(), + "explicit_bounds size for " + expected.name); + for (int i = 0; i < hp.bucketBoundaries.size(); i++) { + assertEquals( + Double.doubleToRawLongBits(hp.bucketBoundaries.get(i)), + Double.doubleToRawLongBits(parsedBounds.get(i)), + "explicit_bounds[" + i + "] for " + expected.name); + } + + assertEquals( + expected.attrs.size(), + parsedAttrKeys.size(), + "attribute count for histogram " + expected.name); + for (int i = 0; i < expected.attrs.size(); i++) { + assertEquals( + expected.attrs.get(i).key, + parsedAttrKeys.get(i), + "attribute key[" + i + "] for histogram " + expected.name); + } + } + + /** + * Reads a {@code KeyValue} body and returns the key (field 1). The value field is skipped; its + * encoding is covered by {@code OtlpCommonProtoTest}. + */ + private static String readKeyValueKey(CodedInputStream kv) throws IOException { + String key = null; + while (!kv.isAtEnd()) { + int tag = kv.readTag(); + if (WireFormat.getTagFieldNumber(tag) == 1) { + key = kv.readString(); + } else { + kv.skipField(tag); + } + } + return key; + } +} diff --git a/dd-java-agent/agent-otel/otel-bootstrap/src/test/resources/opentelemetry/proto/metrics/v1/metrics.proto b/dd-java-agent/agent-otel/otel-bootstrap/src/test/resources/opentelemetry/proto/metrics/v1/metrics.proto new file mode 100644 index 00000000000..a6fab4ee750 --- /dev/null +++ b/dd-java-agent/agent-otel/otel-bootstrap/src/test/resources/opentelemetry/proto/metrics/v1/metrics.proto @@ -0,0 +1,735 @@ +// Copyright 2019, OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +syntax = "proto3"; + +package opentelemetry.proto.metrics.v1; + +import "opentelemetry/proto/common/v1/common.proto"; +import "opentelemetry/proto/resource/v1/resource.proto"; + +option csharp_namespace = "OpenTelemetry.Proto.Metrics.V1"; +option java_multiple_files = true; +option java_package = "io.opentelemetry.proto.metrics.v1"; +option java_outer_classname = "MetricsProto"; +option go_package = "go.opentelemetry.io/proto/otlp/metrics/v1"; + +// MetricsData represents the metrics data that can be stored in a persistent +// storage, OR can be embedded by other protocols that transfer OTLP metrics +// data but do not implement the OTLP protocol. +// +// MetricsData +// └─── ResourceMetrics +// ├── Resource +// ├── SchemaURL +// └── ScopeMetrics +// ├── Scope +// ├── SchemaURL +// └── Metric +// ├── Name +// ├── Description +// ├── Unit +// └── data +// ├── Gauge +// ├── Sum +// ├── Histogram +// ├── ExponentialHistogram +// └── Summary +// +// The main difference between this message and collector protocol is that +// in this message there will not be any "control" or "metadata" specific to +// OTLP protocol. +// +// When new fields are added into this message, the OTLP request MUST be updated +// as well. +message MetricsData { + // An array of ResourceMetrics. + // For data coming from a single resource this array will typically contain + // one element. Intermediary nodes that receive data from multiple origins + // typically batch the data before forwarding further and in that case this + // array will contain multiple elements. 
+ repeated ResourceMetrics resource_metrics = 1; +} + +// A collection of ScopeMetrics from a Resource. +message ResourceMetrics { + reserved 1000; + + // The resource for the metrics in this message. + // If this field is not set then no resource info is known. + opentelemetry.proto.resource.v1.Resource resource = 1; + + // A list of metrics that originate from a resource. + repeated ScopeMetrics scope_metrics = 2; + + // The Schema URL, if known. This is the identifier of the Schema that the resource data + // is recorded in. Notably, the last part of the URL path is the version number of the + // schema: http[s]://server[:port]/path/. To learn more about Schema URL see + // https://opentelemetry.io/docs/specs/otel/schemas/#schema-url + // This schema_url applies to the data in the "resource" field. It does not apply + // to the data in the "scope_metrics" field which have their own schema_url field. + string schema_url = 3; +} + +// A collection of Metrics produced by an Scope. +message ScopeMetrics { + // The instrumentation scope information for the metrics in this message. + // Semantically when InstrumentationScope isn't set, it is equivalent with + // an empty instrumentation scope name (unknown). + opentelemetry.proto.common.v1.InstrumentationScope scope = 1; + + // A list of metrics that originate from an instrumentation library. + repeated Metric metrics = 2; + + // The Schema URL, if known. This is the identifier of the Schema that the metric data + // is recorded in. Notably, the last part of the URL path is the version number of the + // schema: http[s]://server[:port]/path/. To learn more about Schema URL see + // https://opentelemetry.io/docs/specs/otel/schemas/#schema-url + // This schema_url applies to the data in the "scope" field and all metrics in the + // "metrics" field. + string schema_url = 3; +} + +// Defines a Metric which has one or more timeseries. The following is a +// brief summary of the Metric data model. 
For more details, see: +// +// https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/data-model.md +// +// The data model and relation between entities is shown in the +// diagram below. Here, "DataPoint" is the term used to refer to any +// one of the specific data point value types, and "points" is the term used +// to refer to any one of the lists of points contained in the Metric. +// +// - Metric is composed of a metadata and data. +// - Metadata part contains a name, description, unit. +// - Data is one of the possible types (Sum, Gauge, Histogram, Summary). +// - DataPoint contains timestamps, attributes, and one of the possible value type +// fields. +// +// Metric +// +------------+ +// |name | +// |description | +// |unit | +------------------------------------+ +// |data |---> |Gauge, Sum, Histogram, Summary, ... | +// +------------+ +------------------------------------+ +// +// Data [One of Gauge, Sum, Histogram, Summary, ...] +// +-----------+ +// |... | // Metadata about the Data. +// |points |--+ +// +-----------+ | +// | +---------------------------+ +// | |DataPoint 1 | +// v |+------+------+ +------+ | +// +-----+ ||label |label |...|label | | +// | 1 |-->||value1|value2|...|valueN| | +// +-----+ |+------+------+ +------+ | +// | . | |+-----+ | +// | . | ||value| | +// | . | |+-----+ | +// | . | +---------------------------+ +// | . | . +// | . | . +// | . | . +// | . | +---------------------------+ +// | . | |DataPoint M | +// +-----+ |+------+------+ +------+ | +// | M |-->||label |label |...|label | | +// +-----+ ||value1|value2|...|valueN| | +// |+------+------+ +------+ | +// |+-----+ | +// ||value| | +// |+-----+ | +// +---------------------------+ +// +// Each distinct type of DataPoint represents the output of a specific +// aggregation function, the result of applying the DataPoint's +// associated function of to one or more measurements. 
+// +// All DataPoint types have three common fields: +// - Attributes includes key-value pairs associated with the data point +// - TimeUnixNano is required, set to the end time of the aggregation +// - StartTimeUnixNano is optional, but strongly encouraged for DataPoints +// having an AggregationTemporality field, as discussed below. +// +// Both TimeUnixNano and StartTimeUnixNano values are expressed as +// UNIX Epoch time in nanoseconds since 00:00:00 UTC on 1 January 1970. +// +// # TimeUnixNano +// +// This field is required, having consistent interpretation across +// DataPoint types. TimeUnixNano is the moment corresponding to when +// the data point's aggregate value was captured. +// +// Data points with the 0 value for TimeUnixNano SHOULD be rejected +// by consumers. +// +// # StartTimeUnixNano +// +// StartTimeUnixNano in general allows detecting when a sequence of +// observations is unbroken. This field indicates to consumers the +// start time for points with cumulative and delta +// AggregationTemporality, and it should be included whenever possible +// to support correct rate calculation. Although it may be omitted +// when the start time is truly unknown, setting StartTimeUnixNano is +// strongly encouraged. +message Metric { + reserved 4, 6, 8; + + // The name of the metric. + string name = 1; + + // A description of the metric, which can be used in documentation. + string description = 2; + + // The unit in which the metric value is reported. Follows the format + // described by https://unitsofmeasure.org/ucum.html. + string unit = 3; + + // Data determines the aggregation type (if any) of the metric, what is the + // reported value type for the data points, as well as the relationship to + // the time interval over which they are reported. 
+ oneof data { + Gauge gauge = 5; + Sum sum = 7; + Histogram histogram = 9; + ExponentialHistogram exponential_histogram = 10; + Summary summary = 11; + } + + // Additional metadata attributes that describe the metric. [Optional]. + // Attributes are non-identifying. + // Consumers SHOULD NOT need to be aware of these attributes. + // These attributes MAY be used to encode information allowing + // for lossless roundtrip translation to / from another data model. + // Attribute keys MUST be unique (it is not allowed to have more than one + // attribute with the same key). + // The behavior of software that receives duplicated keys can be unpredictable. + repeated opentelemetry.proto.common.v1.KeyValue metadata = 12; +} + +// Gauge represents the type of a scalar metric that always exports the +// "current value" for every data point. It should be used for an "unknown" +// aggregation. +// +// A Gauge does not support different aggregation temporalities. Given the +// aggregation is unknown, points cannot be combined using the same +// aggregation, regardless of aggregation temporalities. Therefore, +// AggregationTemporality is not included. Consequently, this also means +// "StartTimeUnixNano" is ignored for all data points. +message Gauge { + // The time series data points. + // Note: Multiple time series may be included (same timestamp, different attributes). + repeated NumberDataPoint data_points = 1; +} + +// Sum represents the type of a scalar metric that is calculated as a sum of all +// reported measurements over a time interval. +message Sum { + // The time series data points. + // Note: Multiple time series may be included (same timestamp, different attributes). + repeated NumberDataPoint data_points = 1; + + // aggregation_temporality describes if the aggregator reports delta changes + // since last report time, or cumulative changes since a fixed start time. 
+ AggregationTemporality aggregation_temporality = 2; + + // Represents whether the sum is monotonic. + bool is_monotonic = 3; +} + +// Histogram represents the type of a metric that is calculated by aggregating +// as a Histogram of all reported measurements over a time interval. +message Histogram { + // The time series data points. + // Note: Multiple time series may be included (same timestamp, different attributes). + repeated HistogramDataPoint data_points = 1; + + // aggregation_temporality describes if the aggregator reports delta changes + // since last report time, or cumulative changes since a fixed start time. + AggregationTemporality aggregation_temporality = 2; +} + +// ExponentialHistogram represents the type of a metric that is calculated by aggregating +// as an ExponentialHistogram of all reported double measurements over a time interval. +message ExponentialHistogram { + // The time series data points. + // Note: Multiple time series may be included (same timestamp, different attributes). + repeated ExponentialHistogramDataPoint data_points = 1; + + // aggregation_temporality describes if the aggregator reports delta changes + // since last report time, or cumulative changes since a fixed start time. + AggregationTemporality aggregation_temporality = 2; +} + +// Summary metric data are used to convey quantile summaries, +// a Prometheus (see: https://prometheus.io/docs/concepts/metric_types/#summary) +// and OpenMetrics (see: https://github.com/prometheus/OpenMetrics/blob/4dbf6075567ab43296eed941037c12951faafb92/protos/prometheus.proto#L45) +// data type. These data points cannot always be merged in a meaningful way. +// While they can be useful in some applications, histogram data points are +// recommended for new applications. +// Summary metrics do not have an aggregation temporality field. This is +// because the count and sum fields of a SummaryDataPoint are assumed to be +// cumulative values. 
+message Summary { + // The time series data points. + // Note: Multiple time series may be included (same timestamp, different attributes). + repeated SummaryDataPoint data_points = 1; +} + +// AggregationTemporality defines how a metric aggregator reports aggregated +// values. It describes how those values relate to the time interval over +// which they are aggregated. +enum AggregationTemporality { + // UNSPECIFIED is the default AggregationTemporality, it MUST not be used. + AGGREGATION_TEMPORALITY_UNSPECIFIED = 0; + + // DELTA is an AggregationTemporality for a metric aggregator which reports + // changes since last report time. Successive metrics contain aggregation of + // values from continuous and non-overlapping intervals. + // + // The values for a DELTA metric are based only on the time interval + // associated with one measurement cycle. There is no dependency on + // previous measurements like is the case for CUMULATIVE metrics. + // + // For example, consider a system measuring the number of requests that + // it receives and reports the sum of these requests every second as a + // DELTA metric: + // + // 1. The system starts receiving at time=t_0. + // 2. A request is received, the system measures 1 request. + // 3. A request is received, the system measures 1 request. + // 4. A request is received, the system measures 1 request. + // 5. The 1 second collection cycle ends. A metric is exported for the + // number of requests received over the interval of time t_0 to + // t_0+1 with a value of 3. + // 6. A request is received, the system measures 1 request. + // 7. A request is received, the system measures 1 request. + // 8. The 1 second collection cycle ends. A metric is exported for the + // number of requests received over the interval of time t_0+1 to + // t_0+2 with a value of 2. + AGGREGATION_TEMPORALITY_DELTA = 1; + + // CUMULATIVE is an AggregationTemporality for a metric aggregator which + // reports changes since a fixed start time. 
This means that current values + // of a CUMULATIVE metric depend on all previous measurements since the + // start time. Because of this, the sender is required to retain this state + // in some form. If this state is lost or invalidated, the CUMULATIVE metric + // values MUST be reset and a new fixed start time following the last + // reported measurement time sent MUST be used. + // + // For example, consider a system measuring the number of requests that + // it receives and reports the sum of these requests every second as a + // CUMULATIVE metric: + // + // 1. The system starts receiving at time=t_0. + // 2. A request is received, the system measures 1 request. + // 3. A request is received, the system measures 1 request. + // 4. A request is received, the system measures 1 request. + // 5. The 1 second collection cycle ends. A metric is exported for the + // number of requests received over the interval of time t_0 to + // t_0+1 with a value of 3. + // 6. A request is received, the system measures 1 request. + // 7. A request is received, the system measures 1 request. + // 8. The 1 second collection cycle ends. A metric is exported for the + // number of requests received over the interval of time t_0 to + // t_0+2 with a value of 5. + // 9. The system experiences a fault and loses state. + // 10. The system recovers and resumes receiving at time=t_1. + // 11. A request is received, the system measures 1 request. + // 12. The 1 second collection cycle ends. A metric is exported for the + // number of requests received over the interval of time t_1 to + // t_1+1 with a value of 1. + // + // Note: Even though, when reporting changes since last report time, using + // CUMULATIVE is valid, it is not recommended. This may cause problems for + // systems that do not use start_time to determine when the aggregation + // value was reset (e.g. Prometheus). 
+ AGGREGATION_TEMPORALITY_CUMULATIVE = 2; +} + +// DataPointFlags is defined as a protobuf 'uint32' type and is to be used as a +// bit-field representing 32 distinct boolean flags. Each flag defined in this +// enum is a bit-mask. To test the presence of a single flag in the flags of +// a data point, for example, use an expression like: +// +// (point.flags & DATA_POINT_FLAGS_NO_RECORDED_VALUE_MASK) == DATA_POINT_FLAGS_NO_RECORDED_VALUE_MASK +// +enum DataPointFlags { + // The zero value for the enum. Should not be used for comparisons. + // Instead use bitwise "and" with the appropriate mask as shown above. + DATA_POINT_FLAGS_DO_NOT_USE = 0; + + // This DataPoint is valid but has no recorded value. This value + // SHOULD be used to reflect explicitly missing data in a series, as + // for an equivalent to the Prometheus "staleness marker". + DATA_POINT_FLAGS_NO_RECORDED_VALUE_MASK = 1; + + // Bits 2-31 are reserved for future use. +} + +// NumberDataPoint is a single data point in a timeseries that describes the +// time-varying scalar value of a metric. +message NumberDataPoint { + reserved 1; + + // The set of key/value pairs that uniquely identify the timeseries from + // where this point belongs. The list may be empty (may contain 0 elements). + // Attribute keys MUST be unique (it is not allowed to have more than one + // attribute with the same key). + // The behavior of software that receives duplicated keys can be unpredictable. + repeated opentelemetry.proto.common.v1.KeyValue attributes = 7; + + // StartTimeUnixNano is optional but strongly encouraged, see + // the detailed comments above Metric. + // + // Value is UNIX Epoch time in nanoseconds since 00:00:00 UTC on 1 January + // 1970. + fixed64 start_time_unix_nano = 2; + + // TimeUnixNano is required, see the detailed comments above Metric. + // + // Value is UNIX Epoch time in nanoseconds since 00:00:00 UTC on 1 January + // 1970. + fixed64 time_unix_nano = 3; + + // The value itself. 
A point is considered invalid when one of the recognized + // value fields is not present inside this oneof. + oneof value { + double as_double = 4; + sfixed64 as_int = 6; + } + + // (Optional) List of exemplars collected from + // measurements that were used to form the data point + repeated Exemplar exemplars = 5; + + // Flags that apply to this specific data point. See DataPointFlags + // for the available flags and their meaning. + uint32 flags = 8; +} + +// HistogramDataPoint is a single data point in a timeseries that describes the +// time-varying values of a Histogram. A Histogram contains summary statistics +// for a population of values, it may optionally contain the distribution of +// those values across a set of buckets. +// +// If the histogram contains the distribution of values, then both +// "explicit_bounds" and "bucket_counts" fields must be defined. +// If the histogram does not contain the distribution of values, then both +// "explicit_bounds" and "bucket_counts" must be omitted and only "count" and +// "sum" are known. +message HistogramDataPoint { + reserved 1; + + // The set of key/value pairs that uniquely identify the timeseries from + // where this point belongs. The list may be empty (may contain 0 elements). + // Attribute keys MUST be unique (it is not allowed to have more than one + // attribute with the same key). + // The behavior of software that receives duplicated keys can be unpredictable. + repeated opentelemetry.proto.common.v1.KeyValue attributes = 9; + + // StartTimeUnixNano is optional but strongly encouraged, see + // the detailed comments above Metric. + // + // Value is UNIX Epoch time in nanoseconds since 00:00:00 UTC on 1 January + // 1970. + fixed64 start_time_unix_nano = 2; + + // TimeUnixNano is required, see the detailed comments above Metric. + // + // Value is UNIX Epoch time in nanoseconds since 00:00:00 UTC on 1 January + // 1970. 
+ fixed64 time_unix_nano = 3; + + // count is the number of values in the population. Must be non-negative. This + // value must be equal to the sum of the "count" fields in buckets if a + // histogram is provided. + fixed64 count = 4; + + // sum of the values in the population. If count is zero then this field + // must be zero. + // + // Note: Sum should only be filled out when measuring non-negative discrete + // events, and is assumed to be monotonic over the values of these events. + // Negative events *can* be recorded, but sum should not be filled out when + // doing so. This is specifically to enforce compatibility w/ OpenMetrics, + // see: https://github.com/prometheus/OpenMetrics/blob/v1.0.0/specification/OpenMetrics.md#histogram + optional double sum = 5; + + // bucket_counts is an optional field that contains the count values of histogram + // for each bucket. + // + // The sum of the bucket_counts must equal the value in the count field. + // + // The number of elements in bucket_counts array must be by one greater than + // the number of elements in explicit_bounds array. The exception to this rule + // is when the length of bucket_counts is 0, then the length of explicit_bounds + // must also be 0. + repeated fixed64 bucket_counts = 6; + + // explicit_bounds specifies buckets with explicitly defined bounds for values. + // + // The boundaries for bucket at index i are: + // + // (-infinity, explicit_bounds[i]] for i == 0 + // (explicit_bounds[i-1], explicit_bounds[i]] for 0 < i < size(explicit_bounds) + // (explicit_bounds[i-1], +infinity) for i == size(explicit_bounds) + // + // The values in the explicit_bounds array must be strictly increasing. + // + // Histogram buckets are inclusive of their upper boundary, except the last + // bucket where the boundary is at infinity. This format is intentionally + // compatible with the OpenMetrics histogram definition. 
+ // + // If bucket_counts length is 0 then explicit_bounds length must also be 0, + // otherwise the data point is invalid. + repeated double explicit_bounds = 7; + + // (Optional) List of exemplars collected from + // measurements that were used to form the data point + repeated Exemplar exemplars = 8; + + // Flags that apply to this specific data point. See DataPointFlags + // for the available flags and their meaning. + uint32 flags = 10; + + // min is the minimum value over (start_time, end_time]. + optional double min = 11; + + // max is the maximum value over (start_time, end_time]. + optional double max = 12; +} + +// ExponentialHistogramDataPoint is a single data point in a timeseries that describes the +// time-varying values of an ExponentialHistogram of double values. An ExponentialHistogram contains +// summary statistics for a population of values, it may optionally contain the +// distribution of those values across a set of buckets. +// +message ExponentialHistogramDataPoint { + // The set of key/value pairs that uniquely identify the timeseries from + // where this point belongs. The list may be empty (may contain 0 elements). + // Attribute keys MUST be unique (it is not allowed to have more than one + // attribute with the same key). + // The behavior of software that receives duplicated keys can be unpredictable. + repeated opentelemetry.proto.common.v1.KeyValue attributes = 1; + + // StartTimeUnixNano is optional but strongly encouraged, see + // the detailed comments above Metric. + // + // Value is UNIX Epoch time in nanoseconds since 00:00:00 UTC on 1 January + // 1970. + fixed64 start_time_unix_nano = 2; + + // TimeUnixNano is required, see the detailed comments above Metric. + // + // Value is UNIX Epoch time in nanoseconds since 00:00:00 UTC on 1 January + // 1970. + fixed64 time_unix_nano = 3; + + // The number of values in the population. Must be + // non-negative. 
This value must be equal to the sum of the "bucket_counts" + // values in the positive and negative Buckets plus the "zero_count" field. + fixed64 count = 4; + + // The sum of the values in the population. If count is zero then this field + // must be zero. + // + // Note: Sum should only be filled out when measuring non-negative discrete + // events, and is assumed to be monotonic over the values of these events. + // Negative events *can* be recorded, but sum should not be filled out when + // doing so. This is specifically to enforce compatibility w/ OpenMetrics, + // see: https://github.com/prometheus/OpenMetrics/blob/v1.0.0/specification/OpenMetrics.md#histogram + optional double sum = 5; + + // scale describes the resolution of the histogram. Boundaries are + // located at powers of the base, where: + // + // base = (2^(2^-scale)) + // + // The histogram bucket identified by `index`, a signed integer, + // contains values that are greater than (base^index) and + // less than or equal to (base^(index+1)). + // + // The positive and negative ranges of the histogram are expressed + // separately. Negative values are mapped by their absolute value + // into the negative range using the same scale as the positive range. + // + // scale is not restricted by the protocol, as the permissible + // values depend on the range of the data. + sint32 scale = 6; + + // The count of values that are either exactly zero or + // within the region considered zero by the instrumentation at the + // tolerated degree of precision. This bucket stores values that + // cannot be expressed using the standard exponential formula as + // well as values that have been rounded to zero. + // + // Implementations MAY consider the zero bucket to have probability + // mass equal to (zero_count / count). + fixed64 zero_count = 7; + + // positive carries the positive range of exponential bucket counts. 
+ Buckets positive = 8; + + // negative carries the negative range of exponential bucket counts. + Buckets negative = 9; + + // Buckets are a set of bucket counts, encoded in a contiguous array + // of counts. + message Buckets { + // The bucket index of the first entry in the bucket_counts array. + // + // Note: This uses a varint encoding as a simple form of compression. + sint32 offset = 1; + + // An array of count values, where bucket_counts[i] carries + // the count of the bucket at index (offset+i). bucket_counts[i] is the count + // of values greater than base^(offset+i) and less than or equal to + // base^(offset+i+1). + // + // Note: By contrast, the explicit HistogramDataPoint uses + // fixed64. This field is expected to have many buckets, + // especially zeros, so uint64 has been selected to ensure + // varint encoding. + repeated uint64 bucket_counts = 2; + } + + // Flags that apply to this specific data point. See DataPointFlags + // for the available flags and their meaning. + uint32 flags = 10; + + // (Optional) List of exemplars collected from + // measurements that were used to form the data point + repeated Exemplar exemplars = 11; + + // The minimum value over (start_time, end_time]. + optional double min = 12; + + // The maximum value over (start_time, end_time]. + optional double max = 13; + + // ZeroThreshold may be optionally set to convey the width of the zero + // region. Where the zero region is defined as the closed interval + // [-ZeroThreshold, ZeroThreshold]. + // When ZeroThreshold is 0, zero count bucket stores values that cannot be + // expressed using the standard exponential formula as well as values that + // have been rounded to zero. + double zero_threshold = 14; +} + +// SummaryDataPoint is a single data point in a timeseries that describes the +// time-varying values of a Summary metric. The count and sum fields represent +// cumulative values. 
+message SummaryDataPoint { + reserved 1; + + // The set of key/value pairs that uniquely identify the timeseries from + // where this point belongs. The list may be empty (may contain 0 elements). + // Attribute keys MUST be unique (it is not allowed to have more than one + // attribute with the same key). + // The behavior of software that receives duplicated keys can be unpredictable. + repeated opentelemetry.proto.common.v1.KeyValue attributes = 7; + + // StartTimeUnixNano is optional but strongly encouraged, see + // the detailed comments above Metric. + // + // Value is UNIX Epoch time in nanoseconds since 00:00:00 UTC on 1 January + // 1970. + fixed64 start_time_unix_nano = 2; + + // TimeUnixNano is required, see the detailed comments above Metric. + // + // Value is UNIX Epoch time in nanoseconds since 00:00:00 UTC on 1 January + // 1970. + fixed64 time_unix_nano = 3; + + // count is the number of values in the population. Must be non-negative. + fixed64 count = 4; + + // sum of the values in the population. If count is zero then this field + // must be zero. + // + // Note: Sum should only be filled out when measuring non-negative discrete + // events, and is assumed to be monotonic over the values of these events. + // Negative events *can* be recorded, but sum should not be filled out when + // doing so. This is specifically to enforce compatibility w/ OpenMetrics, + // see: https://github.com/prometheus/OpenMetrics/blob/v1.0.0/specification/OpenMetrics.md#summary + double sum = 5; + + // Represents the value at a given quantile of a distribution. + // + // To record Min and Max values the following conventions are used: + // - The 1.0 quantile is equivalent to the maximum value observed. + // - The 0.0 quantile is equivalent to the minimum value observed. + // + // See the following issue for more context: + // https://github.com/open-telemetry/opentelemetry-proto/issues/125 + message ValueAtQuantile { + // The quantile of a distribution. 
Must be in the interval + // [0.0, 1.0]. + double quantile = 1; + + // The value at the given quantile of a distribution. + // + // Quantile values must NOT be negative. + double value = 2; + } + + // (Optional) list of values at different quantiles of the distribution calculated + // from the current snapshot. The quantiles must be strictly increasing. + repeated ValueAtQuantile quantile_values = 6; + + // Flags that apply to this specific data point. See DataPointFlags + // for the available flags and their meaning. + uint32 flags = 8; +} + +// A representation of an exemplar, which is a sample input measurement. +// Exemplars also hold information about the environment when the measurement +// was recorded, for example the span and trace ID of the active span when the +// exemplar was recorded. +message Exemplar { + reserved 1; + + // The set of key/value pairs that were filtered out by the aggregator, but + // recorded alongside the original measurement. Only key/value pairs that were + // filtered out by the aggregator should be included + repeated opentelemetry.proto.common.v1.KeyValue filtered_attributes = 7; + + // time_unix_nano is the exact time when this exemplar was recorded + // + // Value is UNIX Epoch time in nanoseconds since 00:00:00 UTC on 1 January + // 1970. + fixed64 time_unix_nano = 2; + + // The value of the measurement that was recorded. An exemplar is + // considered invalid when one of the recognized value fields is not present + // inside this oneof. + oneof value { + double as_double = 3; + sfixed64 as_int = 6; + } + + // (Optional) Span ID of the exemplar trace. + // span_id may be missing if the measurement is not recorded inside a trace + // or if the trace is not sampled. + bytes span_id = 4; + + // (Optional) Trace ID of the exemplar trace. + // trace_id may be missing if the measurement is not recorded inside a trace + // or if the trace is not sampled. + bytes trace_id = 5; +}