Skip to content

Migrate context extraction calls to context-first APIs #8368

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Feb 12, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ class CompositePropagator implements Propagator {

@Override
public <C> void inject(Context context, C carrier, CarrierSetter<C> setter) {
for (Propagator propagator : this.propagators) {
propagator.inject(context, carrier, setter);
for (int i = this.propagators.length - 1; i >= 0; i--) {
this.propagators[i].inject(context, carrier, setter);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,8 @@ public static Propagator noop() {
* Creates a composite propagator.
*
* @param propagators the elements that composes the returned propagator.
* @return the composite propagator that will apply the propagators in their given order.
* @return the composite propagator that will apply the propagators in their given order for
* context extraction, and reverse given order for context injection.
*/
public static Propagator composite(Propagator... propagators) {
if (propagators.length == 0) {
Expand Down
4 changes: 4 additions & 0 deletions dd-trace-core/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,10 @@ excludedClassesCoverage += [
'datadog.trace.core.TracingConfigPoller.Updater',
// covered with dd-trace-core/src/test/groovy/datadog/trace/core/datastreams/CheckpointerTest.groovy
'datadog.trace.core.datastreams.DefaultDataStreamsMonitoring',
// TODO CorePropagation will be removed during context refactoring
'datadog.trace.core.propagation.CorePropagation',
// TODO DSM propagator will be tested once fully migrated
'datadog.trace.core.datastreams.DataStreamPropagator'
]

addTestSuite('traceAgentTest')
Expand Down
27 changes: 12 additions & 15 deletions dd-trace-core/src/main/java/datadog/trace/core/CoreTracer.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import static datadog.trace.api.DDTags.DJM_ENABLED;
import static datadog.trace.api.DDTags.DSM_ENABLED;
import static datadog.trace.api.DDTags.PROFILING_CONTEXT_ENGINE;
import static datadog.trace.bootstrap.instrumentation.api.AgentPropagation.DSM_CONCERN;
import static datadog.trace.bootstrap.instrumentation.api.AgentPropagation.STANDALONE_ASM_CONCERN;
import static datadog.trace.bootstrap.instrumentation.api.AgentPropagation.TRACING_CONCERN;
import static datadog.trace.bootstrap.instrumentation.api.AgentPropagation.XRAY_TRACING_CONCERN;
Expand Down Expand Up @@ -32,7 +33,6 @@
import datadog.trace.api.InstrumenterConfig;
import datadog.trace.api.StatsDClient;
import datadog.trace.api.TraceConfig;
import datadog.trace.api.TracePropagationStyle;
import datadog.trace.api.config.GeneralConfig;
import datadog.trace.api.experimental.DataStreamsCheckpointer;
import datadog.trace.api.flare.TracerFlare;
Expand Down Expand Up @@ -79,7 +79,6 @@
import datadog.trace.common.writer.WriterFactory;
import datadog.trace.common.writer.ddintake.DDIntakeTraceInterceptor;
import datadog.trace.context.TraceScope;
import datadog.trace.core.datastreams.DataStreamContextInjector;
import datadog.trace.core.datastreams.DataStreamsMonitoring;
import datadog.trace.core.datastreams.DefaultDataStreamsMonitoring;
import datadog.trace.core.flare.TracerFlarePoller;
Expand Down Expand Up @@ -715,27 +714,25 @@ private CoreTracer(

sharedCommunicationObjects.whenReady(this.dataStreamsMonitoring::start);

// Create default extractor from config if not provided and decorate it with DSM extractor
HttpCodec.Extractor builtExtractor =
extractor == null ? HttpCodec.createExtractor(config, this::captureTraceConfig) : extractor;
builtExtractor = this.dataStreamsMonitoring.extractor(builtExtractor);
// Create all HTTP injectors plus the DSM one
Map<TracePropagationStyle, HttpCodec.Injector> injectors =
HttpCodec.allInjectorsFor(config, invertMap(baggageMapping));
DataStreamContextInjector dataStreamContextInjector = this.dataStreamsMonitoring.injector();
// Store all propagators to propagation
this.propagation =
new CorePropagation(builtExtractor, injector, injectors, dataStreamContextInjector);
// Store all propagators to propagation -- only DSM injection left
this.propagation = new CorePropagation(this.dataStreamsMonitoring.injector());

// Register context propagators
HttpCodec.Extractor tracingExtractor =
extractor == null ? HttpCodec.createExtractor(config, this::captureTraceConfig) : extractor;
TracingPropagator tracingPropagator = new TracingPropagator(injector, tracingExtractor);
// Check if standalone AppSec is enabled:
// If enabled, use the standalone AppSec propagator by default that will limit tracing concern
// injection and delegate to the tracing propagator if needed,
// If disabled, the most common case, use the usual tracing propagator by default.
boolean standaloneAppSec = config.isAppSecStandaloneEnabled();
boolean dsm = config.isDataStreamsEnabled();
Propagators.register(STANDALONE_ASM_CONCERN, new StandaloneAsmPropagator(), standaloneAppSec);
Propagators.register(
TRACING_CONCERN, new TracingPropagator(injector, extractor), !standaloneAppSec);
Propagators.register(TRACING_CONCERN, tracingPropagator, !standaloneAppSec);
Propagators.register(XRAY_TRACING_CONCERN, new XRayPropagator(config), false);
if (dsm) {
Propagators.register(DSM_CONCERN, this.dataStreamsMonitoring.propagator());
}

this.tagInterceptor =
null == tagInterceptor ? new TagInterceptor(new RuleFlags(config)) : tagInterceptor;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
package datadog.trace.core.datastreams;

import datadog.context.Context;
import datadog.context.propagation.CarrierSetter;
import datadog.context.propagation.CarrierVisitor;
import datadog.context.propagation.Propagator;
import datadog.trace.api.TraceConfig;
import datadog.trace.api.time.TimeSource;
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
import datadog.trace.bootstrap.instrumentation.api.AgentSpanContext;
import datadog.trace.bootstrap.instrumentation.api.PathwayContext;
import datadog.trace.bootstrap.instrumentation.api.TagContext;
import java.util.function.Supplier;
import javax.annotation.Nullable;
import javax.annotation.ParametersAreNonnullByDefault;

// TODO Javadoc
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will be documented as part of the next PR, which will be dedicated to DSM migration.

@ParametersAreNonnullByDefault
public class DataStreamPropagator implements Propagator {
private final Supplier<TraceConfig> traceConfigSupplier;
private final TimeSource timeSource;
private final long hashOfKnownTags;
private final String serviceNameOverride;

public DataStreamPropagator(
Supplier<TraceConfig> traceConfigSupplier,
TimeSource timeSource,
long hashOfKnownTags,
String serviceNameOverride) {
this.traceConfigSupplier = traceConfigSupplier;
this.timeSource = timeSource;
this.hashOfKnownTags = hashOfKnownTags;
this.serviceNameOverride = serviceNameOverride;
}

@Override
public <C> void inject(Context context, C carrier, CarrierSetter<C> setter) {
// TODO Still in CorePropagation, not migrated yet
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Similarly, this will be completed in the next PR

}

@Override
public <C> Context extract(Context context, C carrier, CarrierVisitor<C> visitor) {
// TODO Pathway context needs to be stored into its own context element
// Get span context to store pathway context into
TagContext spanContext = getSpanContextOrNull(context);
PathwayContext pathwayContext;
// Ensure if DSM is enabled and look for pathway context
if (isDsmEnabled(spanContext)
&& (pathwayContext = extractDsmPathwayContext(carrier, visitor)) != null) {
// Store pathway context into span context
if (spanContext == null) {
spanContext = new TagContext();
AgentSpan span = AgentSpan.fromSpanContext(spanContext);
context = Context.root().with(span);
}
spanContext.withPathwayContext(pathwayContext);
}
return context;
}

private TagContext getSpanContextOrNull(Context context) {
AgentSpan extractedSpan = AgentSpan.fromContext(context);
AgentSpanContext extractedSpanContext;
if (extractedSpan != null
&& (extractedSpanContext = extractedSpan.context()) instanceof TagContext) {
return (TagContext) extractedSpanContext;
}
return null;
}

private boolean isDsmEnabled(@Nullable TagContext tagContext) {
TraceConfig traceConfig = tagContext == null ? null : tagContext.getTraceConfig();
if (traceConfig == null) {
traceConfig = this.traceConfigSupplier.get();
}
return traceConfig.isDataStreamsEnabled();
}

private <C> PathwayContext extractDsmPathwayContext(C carrier, CarrierVisitor<C> visitor) {
return DefaultPathwayContext.extract(
carrier, visitor, this.timeSource, this.hashOfKnownTags, this.serviceNameOverride);
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package datadog.trace.core.datastreams;

import datadog.context.propagation.Propagator;
import datadog.trace.api.experimental.DataStreamsContextCarrier;
import datadog.trace.bootstrap.instrumentation.api.AgentDataStreamsMonitoring;
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
Expand All @@ -10,6 +11,13 @@
public interface DataStreamsMonitoring extends AgentDataStreamsMonitoring, AutoCloseable {
void start();

/**
* Gets the propagator for DSM concern.
*
* @return The propagator for DSM concern.
*/
Propagator propagator();

/**
* Get a context extractor that support {@link PathwayContext} extraction.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

import datadog.communication.ddagent.DDAgentFeaturesDiscovery;
import datadog.communication.ddagent.SharedCommunicationObjects;
import datadog.context.propagation.Propagator;
import datadog.trace.api.Config;
import datadog.trace.api.TraceConfig;
import datadog.trace.api.WellKnownTags;
Expand Down Expand Up @@ -200,6 +201,12 @@ public PathwayContext newPathwayContext() {
}
}

@Override
public Propagator propagator() {
return new DataStreamPropagator(
this.traceConfigSupplier, this.timeSource, this.hashOfKnownTags, getThreadServiceName());
}

@Override
public HttpCodec.Extractor extractor(HttpCodec.Extractor delegate) {
return new DataStreamContextExtractor(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,10 @@
import com.datadoghq.sketch.ddsketch.encoding.ByteArrayInput;
import com.datadoghq.sketch.ddsketch.encoding.GrowingByteArrayOutput;
import com.datadoghq.sketch.ddsketch.encoding.VarEncodingHelper;
import datadog.context.propagation.CarrierVisitor;
import datadog.trace.api.Config;
import datadog.trace.api.WellKnownTags;
import datadog.trace.api.time.TimeSource;
import datadog.trace.bootstrap.instrumentation.api.AgentPropagation;
import datadog.trace.bootstrap.instrumentation.api.PathwayContext;
import datadog.trace.bootstrap.instrumentation.api.StatsPoint;
import datadog.trace.util.FNV64Hash;
Expand All @@ -26,6 +26,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -267,7 +268,7 @@ public String toString() {
}
}

private static class PathwayContextExtractor implements AgentPropagation.KeyClassifier {
private static class PathwayContextExtractor implements BiConsumer<String, String> {
private final TimeSource timeSource;
private final long hashOfKnownTags;
private final String serviceNameOverride;
Expand All @@ -281,27 +282,25 @@ private static class PathwayContextExtractor implements AgentPropagation.KeyClas
}

@Override
public boolean accept(String key, String value) {
public void accept(String key, String value) {
if (PROPAGATION_KEY_BASE64.equalsIgnoreCase(key)) {
try {
extractedContext = decode(timeSource, hashOfKnownTags, serviceNameOverride, value);
} catch (IOException e) {
return false;
} catch (IOException ignored) {
}
}
return true;
}
}

static <C> DefaultPathwayContext extract(
C carrier,
AgentPropagation.ContextVisitor<C> getter,
CarrierVisitor<C> getter,
TimeSource timeSource,
long hashOfKnownTags,
String serviceNameOverride) {
PathwayContextExtractor pathwayContextExtractor =
new PathwayContextExtractor(timeSource, hashOfKnownTags, serviceNameOverride);
getter.forEachKey(carrier, pathwayContextExtractor);
getter.forEachKeyValue(carrier, pathwayContextExtractor);
if (pathwayContextExtractor.extractedContext == null) {
log.debug("No context extracted");
} else {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,81 +1,23 @@
package datadog.trace.core.propagation;

import datadog.trace.api.Config;
import datadog.trace.api.TracePropagationStyle;
import datadog.trace.bootstrap.instrumentation.api.AgentPropagation;
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
import datadog.trace.bootstrap.instrumentation.api.AgentSpanContext;
import datadog.trace.core.DDSpanContext;
import datadog.trace.core.datastreams.DataStreamContextInjector;
import java.util.LinkedHashMap;
import java.util.Map;

public class CorePropagation implements AgentPropagation {
private final HttpCodec.Injector injector;
private final Map<TracePropagationStyle, HttpCodec.Injector> injectors;
private final DataStreamContextInjector dataStreamContextInjector;
private final HttpCodec.Extractor extractor;

/**
* Constructor
*
* @param extractor The context extractor.
* @param defaultInjector The default injector when no {@link TracePropagationStyle} given.
* @param injectors All the other injectors available for context injection.
* @param dataStreamContextInjector The DSM context injector, as a specific object until generic
* context injection is available.
*/
public CorePropagation(
HttpCodec.Extractor extractor,
HttpCodec.Injector defaultInjector,
Map<TracePropagationStyle, HttpCodec.Injector> injectors,
DataStreamContextInjector dataStreamContextInjector) {
this.extractor = extractor;
this.injector = defaultInjector;
this.injectors = injectors;
public CorePropagation(DataStreamContextInjector dataStreamContextInjector) {
this.dataStreamContextInjector = dataStreamContextInjector;
}

@Override
public <C> void inject(final AgentSpan span, final C carrier, final Setter<C> setter) {
inject(span.context(), carrier, setter, null);
}

@Override
public <C> void inject(AgentSpanContext context, C carrier, Setter<C> setter) {
inject(context, carrier, setter, null);
}

@Override
public <C> void inject(AgentSpan span, C carrier, Setter<C> setter, TracePropagationStyle style) {
inject(span.context(), carrier, setter, style);
}

private <C> void inject(
AgentSpanContext context, C carrier, Setter<C> setter, TracePropagationStyle style) {
if (!(context instanceof DDSpanContext)) {
return;
}

final DDSpanContext ddSpanContext = (DDSpanContext) context;
ddSpanContext.getTraceCollector().setSamplingPriorityIfNecessary();

/**
* If the experimental appsec standalone feature is enabled and appsec propagation is disabled
* (no ASM events), stop propagation
*/
if (Config.get().isAppSecStandaloneEnabled()
&& !ddSpanContext.getPropagationTags().isAppsecPropagationEnabled()) {
return;
}

if (null == style) {
injector.inject(ddSpanContext, carrier, setter);
} else {
injectors.get(style).inject(ddSpanContext, carrier, setter);
}
}

@Override
public <C> void injectPathwayContext(
AgentSpan span, C carrier, Setter<C> setter, LinkedHashMap<String, String> sortedTags) {
Expand All @@ -100,9 +42,4 @@ public <C> void injectPathwayContextWithoutSendingStats(
this.dataStreamContextInjector.injectPathwayContextWithoutSendingStats(
span, carrier, setter, sortedTags);
}

@Override
public <C> AgentSpanContext.Extracted extract(final C carrier, final ContextVisitor<C> getter) {
return extractor.extract(carrier, getter);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@ import datadog.trace.common.sampling.Sampler
import datadog.trace.common.writer.DDAgentWriter
import datadog.trace.common.writer.ListWriter
import datadog.trace.common.writer.LoggingWriter
import datadog.trace.core.datastreams.DataStreamContextExtractor
import datadog.trace.core.propagation.HttpCodec
import datadog.trace.core.tagprocessor.TagsPostProcessorFactory
import datadog.trace.core.test.DDCoreSpecification
import okhttp3.HttpUrl
Expand Down Expand Up @@ -55,9 +53,6 @@ class CoreTracerTest extends DDCoreSpecification {
tracer.writer instanceof DDAgentWriter
tracer.statsDClient != null && tracer.statsDClient != StatsDClient.NO_OP

tracer.propagate().injector instanceof HttpCodec.CompoundInjector
tracer.propagate().extractor instanceof DataStreamContextExtractor

cleanup:
tracer.close()
}
Expand Down
Loading
Loading