Skip to content

feat: Support Exemplar #3997

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 8 commits into from
Jul 30, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

package com.google.cloud.spanner;

import static com.google.cloud.spanner.XGoogSpannerRequestId.REQUEST_ID;

import com.google.api.core.InternalApi;
import com.google.api.gax.tracing.OpenTelemetryMetricsRecorder;
import com.google.common.collect.ImmutableList;
Expand All @@ -26,7 +28,9 @@
import io.opentelemetry.sdk.metrics.InstrumentSelector;
import io.opentelemetry.sdk.metrics.InstrumentType;
import io.opentelemetry.sdk.metrics.View;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand Down Expand Up @@ -94,6 +98,8 @@ public class BuiltInMetricsConstant {
AttributeKey.stringKey("directpath_enabled");
public static final AttributeKey<String> DIRECT_PATH_USED_KEY =
AttributeKey.stringKey("directpath_used");
public static final AttributeKey<String> REQUEST_ID_KEY = AttributeKey.stringKey(REQUEST_ID);
public static Set<String> ALLOWED_EXEMPLARS_ATTRIBUTES = new HashSet<>(Arrays.asList(REQUEST_ID));

// IP address prefixes allocated for DirectPath backends.
public static final String DP_IPV6_PREFIX = "2001:4860:8040";
Expand Down Expand Up @@ -168,6 +174,7 @@ static Map<InstrumentSelector, View> getAllViews() {
Aggregation.sum(),
InstrumentType.COUNTER,
"1");
defineSpannerView(views);
defineGRPCView(views);
return views.build();
}
Expand Down Expand Up @@ -200,6 +207,19 @@ private static void defineView(
viewMap.put(selector, view);
}

private static void defineSpannerView(ImmutableMap.Builder<InstrumentSelector, View> viewMap) {
InstrumentSelector selector =
InstrumentSelector.builder()
.setMeterName(BuiltInMetricsConstant.SPANNER_METER_NAME)
.build();
Set<String> attributesFilter =
BuiltInMetricsConstant.COMMON_ATTRIBUTES.stream()
.map(AttributeKey::getKey)
.collect(Collectors.toSet());
View view = View.builder().setAttributeFilter(attributesFilter).build();
viewMap.put(selector, view);
}

private static void defineGRPCView(ImmutableMap.Builder<InstrumentSelector, View> viewMap) {
for (String metric : BuiltInMetricsConstant.GRPC_METRICS_TO_ENABLE) {
InstrumentSelector selector =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,14 +39,21 @@ class BuiltInMetricsTracer extends MetricsTracer implements ApiTracer {
private final Map<String, String> attributes = new HashMap<>();
private Float gfeLatency = null;
private Float afeLatency = null;
private TraceWrapper traceWrapper;
private long gfeHeaderMissingCount = 0;
private long afeHeaderMissingCount = 0;
private final ISpan currentSpan;

BuiltInMetricsTracer(
MethodName methodName, BuiltInMetricsRecorder builtInOpenTelemetryMetricsRecorder) {
MethodName methodName,
BuiltInMetricsRecorder builtInOpenTelemetryMetricsRecorder,
TraceWrapper traceWrapper,
ISpan currentSpan) {
super(methodName, builtInOpenTelemetryMetricsRecorder);
this.builtInOpenTelemetryMetricsRecorder = builtInOpenTelemetryMetricsRecorder;
this.attributes.put(METHOD_ATTRIBUTE, methodName.toString());
this.traceWrapper = traceWrapper;
this.currentSpan = currentSpan;
}

/**
Expand All @@ -55,10 +62,12 @@ class BuiltInMetricsTracer extends MetricsTracer implements ApiTracer {
*/
@Override
public void attemptSucceeded() {
super.attemptSucceeded();
attributes.put(STATUS_ATTRIBUTE, StatusCode.Code.OK.toString());
builtInOpenTelemetryMetricsRecorder.recordServerTimingHeaderMetrics(
gfeLatency, afeLatency, gfeHeaderMissingCount, afeHeaderMissingCount, attributes);
try (IScope s = this.traceWrapper.withSpan(this.currentSpan)) {
super.attemptSucceeded();
attributes.put(STATUS_ATTRIBUTE, StatusCode.Code.OK.toString());
builtInOpenTelemetryMetricsRecorder.recordServerTimingHeaderMetrics(
gfeLatency, afeLatency, gfeHeaderMissingCount, afeHeaderMissingCount, attributes);
}
}

/**
Expand All @@ -67,10 +76,12 @@ public void attemptSucceeded() {
*/
@Override
public void attemptCancelled() {
super.attemptCancelled();
attributes.put(STATUS_ATTRIBUTE, StatusCode.Code.CANCELLED.toString());
builtInOpenTelemetryMetricsRecorder.recordServerTimingHeaderMetrics(
gfeLatency, afeLatency, gfeHeaderMissingCount, afeHeaderMissingCount, attributes);
try (IScope s = this.traceWrapper.withSpan(this.currentSpan)) {
super.attemptCancelled();
attributes.put(STATUS_ATTRIBUTE, StatusCode.Code.CANCELLED.toString());
builtInOpenTelemetryMetricsRecorder.recordServerTimingHeaderMetrics(
gfeLatency, afeLatency, gfeHeaderMissingCount, afeHeaderMissingCount, attributes);
}
}

/**
Expand All @@ -83,10 +94,12 @@ public void attemptCancelled() {
*/
@Override
public void attemptFailedDuration(Throwable error, java.time.Duration delay) {
super.attemptFailedDuration(error, delay);
attributes.put(STATUS_ATTRIBUTE, extractStatus(error));
builtInOpenTelemetryMetricsRecorder.recordServerTimingHeaderMetrics(
gfeLatency, afeLatency, gfeHeaderMissingCount, afeHeaderMissingCount, attributes);
try (IScope s = this.traceWrapper.withSpan(this.currentSpan)) {
super.attemptFailedDuration(error, delay);
attributes.put(STATUS_ATTRIBUTE, extractStatus(error));
builtInOpenTelemetryMetricsRecorder.recordServerTimingHeaderMetrics(
gfeLatency, afeLatency, gfeHeaderMissingCount, afeHeaderMissingCount, attributes);
}
}

/**
Expand All @@ -98,10 +111,12 @@ public void attemptFailedDuration(Throwable error, java.time.Duration delay) {
*/
@Override
public void attemptFailedRetriesExhausted(Throwable error) {
super.attemptFailedRetriesExhausted(error);
attributes.put(STATUS_ATTRIBUTE, extractStatus(error));
builtInOpenTelemetryMetricsRecorder.recordServerTimingHeaderMetrics(
gfeLatency, afeLatency, gfeHeaderMissingCount, afeHeaderMissingCount, attributes);
try (IScope s = this.traceWrapper.withSpan(this.currentSpan)) {
super.attemptFailedRetriesExhausted(error);
attributes.put(STATUS_ATTRIBUTE, extractStatus(error));
builtInOpenTelemetryMetricsRecorder.recordServerTimingHeaderMetrics(
gfeLatency, afeLatency, gfeHeaderMissingCount, afeHeaderMissingCount, attributes);
}
}

/**
Expand All @@ -113,10 +128,12 @@ public void attemptFailedRetriesExhausted(Throwable error) {
*/
@Override
public void attemptPermanentFailure(Throwable error) {
super.attemptPermanentFailure(error);
attributes.put(STATUS_ATTRIBUTE, extractStatus(error));
builtInOpenTelemetryMetricsRecorder.recordServerTimingHeaderMetrics(
gfeLatency, afeLatency, gfeHeaderMissingCount, afeHeaderMissingCount, attributes);
try (IScope s = this.traceWrapper.withSpan(this.currentSpan)) {
super.attemptPermanentFailure(error);
attributes.put(STATUS_ATTRIBUTE, extractStatus(error));
builtInOpenTelemetryMetricsRecorder.recordServerTimingHeaderMetrics(
gfeLatency, afeLatency, gfeHeaderMissingCount, afeHeaderMissingCount, attributes);
}
}

void recordGFELatency(Float gfeLatency) {
Expand All @@ -140,7 +157,6 @@ public void addAttributes(Map<String, String> attributes) {
super.addAttributes(attributes);
this.attributes.putAll(attributes);
}
;

@Override
public void addAttributes(String key, String value) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,24 +38,31 @@ class BuiltInMetricsTracerFactory extends MetricsTracerFactory {

protected BuiltInMetricsRecorder builtInMetricsRecorder;
private final Map<String, String> attributes;
private final TraceWrapper traceWrapper;

/**
* Pass in a Map of client level attributes which will be added to every single MetricsTracer
* created from the ApiTracerFactory.
*/
public BuiltInMetricsTracerFactory(
BuiltInMetricsRecorder builtInMetricsRecorder, Map<String, String> attributes) {
BuiltInMetricsRecorder builtInMetricsRecorder,
Map<String, String> attributes,
TraceWrapper traceWrapper) {
super(builtInMetricsRecorder, attributes);
this.builtInMetricsRecorder = builtInMetricsRecorder;
this.attributes = ImmutableMap.copyOf(attributes);
this.traceWrapper = traceWrapper;
}

@Override
public ApiTracer newTracer(ApiTracer parent, SpanName spanName, OperationType operationType) {
ISpan currentSpan = this.traceWrapper.getCurrentSpan();
BuiltInMetricsTracer metricsTracer =
new BuiltInMetricsTracer(
MethodName.of(spanName.getClientName(), spanName.getMethodName()),
builtInMetricsRecorder);
builtInMetricsRecorder,
this.traceWrapper,
currentSpan);
metricsTracer.addAttributes(attributes);
return metricsTracer;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import static com.google.api.MetricDescriptor.ValueType.DISTRIBUTION;
import static com.google.api.MetricDescriptor.ValueType.DOUBLE;
import static com.google.api.MetricDescriptor.ValueType.INT64;
import static com.google.cloud.spanner.BuiltInMetricsConstant.ALLOWED_EXEMPLARS_ATTRIBUTES;
import static com.google.cloud.spanner.BuiltInMetricsConstant.GAX_METER_NAME;
import static com.google.cloud.spanner.BuiltInMetricsConstant.GRPC_METER_NAME;
import static com.google.cloud.spanner.BuiltInMetricsConstant.PROJECT_ID_KEY;
Expand Down Expand Up @@ -293,7 +294,13 @@ private static String makeSpanName(String projectId, String traceId, String span

private static DroppedLabels mapFilteredAttributes(Attributes attributes) {
DroppedLabels.Builder labels = DroppedLabels.newBuilder();
attributes.forEach((k, v) -> labels.putLabel(cleanAttributeKey(k.getKey()), v.toString()));
attributes.forEach(
(k, v) -> {
String key = cleanAttributeKey(k.getKey());
if (ALLOWED_EXEMPLARS_ATTRIBUTES.contains(key)) {
labels.putLabel(key, v.toString());
}
});
return labels.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@
import io.grpc.netty.shaded.io.grpc.netty.GrpcSslContexts;
import io.grpc.netty.shaded.io.grpc.netty.NettyChannelBuilder;
import io.grpc.netty.shaded.io.netty.handler.ssl.SslContext;
import io.opencensus.trace.Tracing;
import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.api.common.Attributes;
Expand Down Expand Up @@ -2073,7 +2074,15 @@ private ApiTracerFactory createMetricsApiTracerFactory() {
return openTelemetry != null
? new BuiltInMetricsTracerFactory(
new BuiltInMetricsRecorder(openTelemetry, BuiltInMetricsConstant.METER_NAME),
new HashMap<>())
new HashMap<>(),
new TraceWrapper(
Tracing.getTracer(),
// Using the OpenTelemetry object set in Spanner Options, will be NoOp if not set
this.getOpenTelemetry()
.getTracer(
MetricRegistryConstants.INSTRUMENTATION_SCOPE,
GaxProperties.getLibraryVersion(getClass())),
true))
: null;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,9 @@ public class XGoogSpannerRequestId {
@VisibleForTesting
static final String RAND_PROCESS_ID = XGoogSpannerRequestId.generateRandProcessId();

static String REQUEST_ID = "x-goog-spanner-request-id";
public static final Metadata.Key<String> REQUEST_HEADER_KEY =
Metadata.Key.of("x-goog-spanner-request-id", Metadata.ASCII_STRING_MARSHALLER);
Metadata.Key.of(REQUEST_ID, Metadata.ASCII_STRING_MARSHALLER);

@VisibleForTesting
static final long VERSION = 1; // The version of the specification being implemented.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,7 @@
import static com.google.cloud.spanner.spi.v1.SpannerRpcViews.SPANNER_GFE_LATENCY;

import com.google.api.gax.tracing.ApiTracer;
import com.google.cloud.spanner.BuiltInMetricsConstant;
import com.google.cloud.spanner.CompositeTracer;
import com.google.cloud.spanner.SpannerExceptionFactory;
import com.google.cloud.spanner.SpannerRpcMetrics;
import com.google.cloud.spanner.*;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.spanner.admin.database.v1.DatabaseName;
Expand Down Expand Up @@ -120,6 +117,8 @@ public void start(Listener<RespT> responseListener, Metadata headers) {
getMetricAttributes(key, method.getFullMethodName(), databaseName);
Map<String, String> builtInMetricsAttributes =
getBuiltInMetricAttributes(key, databaseName);
builtInMetricsAttributes.put(
BuiltInMetricsConstant.REQUEST_ID_KEY.getKey(), extractRequestId(headers));
addBuiltInMetricAttributes(compositeTracer, builtInMetricsAttributes);
super.start(
new SimpleForwardingClientCallListener<RespT>(responseListener) {
Expand All @@ -128,6 +127,7 @@ public void onHeaders(Metadata metadata) {
Boolean isDirectPathUsed =
isDirectPathUsed(getAttributes().get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR));
addDirectPathUsedAttribute(compositeTracer, isDirectPathUsed);

processHeader(
metadata, tagContext, attributes, span, compositeTracer, isDirectPathUsed);
super.onHeaders(metadata);
Expand Down Expand Up @@ -248,6 +248,10 @@ private DatabaseName extractDatabaseName(Metadata headers) throws ExecutionExcep
return UNDEFINED_DATABASE_NAME;
}

private String extractRequestId(Metadata headers) throws ExecutionException {
return headers.get(XGoogSpannerRequestId.REQUEST_HEADER_KEY);
}

private TagContext getTagContext(String key, String method, DatabaseName databaseName)
throws ExecutionException {
return tagsCache.get(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import static org.junit.Assert.fail;
import static org.junit.Assume.assumeFalse;

import com.google.api.gax.core.GaxProperties;
import com.google.api.gax.longrunning.OperationTimedPollAlgorithm;
import com.google.api.gax.retrying.RetrySettings;
import com.google.api.gax.tracing.ApiTracerFactory;
Expand All @@ -40,6 +41,7 @@
import io.grpc.Server;
import io.grpc.Status;
import io.grpc.netty.shaded.io.grpc.netty.NettyServerBuilder;
import io.opencensus.trace.Tracing;
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.sdk.OpenTelemetrySdk;
Expand Down Expand Up @@ -96,7 +98,14 @@ public ApiTracerFactory createMetricsTracerFactory() {
OpenTelemetrySdk.builder().setMeterProvider(meterProvider.build()).build();

return new BuiltInMetricsTracerFactory(
new BuiltInMetricsRecorder(openTelemetry, BuiltInMetricsConstant.METER_NAME), attributes);
new BuiltInMetricsRecorder(openTelemetry, BuiltInMetricsConstant.METER_NAME),
attributes,
new TraceWrapper(
Tracing.getTracer(),
openTelemetry.getTracer(
MetricRegistryConstants.INSTRUMENTATION_SCOPE,
GaxProperties.getLibraryVersion(getClass())),
true));
}

@BeforeClass
Expand All @@ -115,6 +124,12 @@ public void clearRequests() throws IOException {
@Override
public void createSpannerInstance() {
SpannerOptions.Builder builder = SpannerOptions.newBuilder();

ApiTracerFactory metricsTracerFactory =
new BuiltInMetricsTracerFactory(
new BuiltInMetricsRecorder(OpenTelemetry.noop(), BuiltInMetricsConstant.METER_NAME),
attributes,
new TraceWrapper(Tracing.getTracer(), OpenTelemetry.noop().getTracer(""), true));
// Set a quick polling algorithm to prevent this from slowing down the test unnecessarily.
builder
.getDatabaseAdminStubSettingsBuilder()
Expand Down
Loading
Loading