Skip to content

Commit 632ec88

Browse files
Introduced trace post-processing (#6800)
* Introduced trace post-processing * setRequiresPostProcessing fully moved to DDSpanContext * Removed Thread creation and added timeout boolean supplier * Removed option to disable post-processing * TracePostProcessor replaced with SpanPostProcessor * Update dd-trace-core/src/main/java/datadog/trace/core/postprocessor/SpanPostProcessor.java Co-authored-by: Stuart McCulloch <[email protected]> * Minor improvements for pr-review * Spotless * Default trace post-processing timeout value * Added test * Catch any exception, if happens during span post-processing --------- Co-authored-by: Stuart McCulloch <[email protected]>
1 parent 90a6e41 commit 632ec88

File tree

9 files changed

+198
-16
lines changed

9 files changed

+198
-16
lines changed

dd-trace-api/src/main/java/datadog/trace/api/ConfigDefaults.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -215,6 +215,8 @@ public final class ConfigDefaults {
215215

216216
static final float DEFAULT_TRACE_FLUSH_INTERVAL = 1;
217217

218+
static final long DEFAULT_TRACE_POST_PROCESSING_TIMEOUT = 1000; // 1 second
219+
218220
static final boolean DEFAULT_COUCHBASE_INTERNAL_SPANS_ENABLED = true;
219221
static final boolean DEFAULT_ELASTICSEARCH_BODY_ENABLED = false;
220222
static final boolean DEFAULT_ELASTICSEARCH_PARAMS_ENABLED = true;

dd-trace-api/src/main/java/datadog/trace/api/config/TracerConfig.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -133,5 +133,7 @@ public final class TracerConfig {
133133

134134
public static final String TRACE_FLUSH_INTERVAL = "trace.flush.interval";
135135

136+
public static final String TRACE_POST_PROCESSING_TIMEOUT = "trace.post-processing.timeout";
137+
136138
private TracerConfig() {}
137139
}

dd-trace-core/src/main/java/datadog/trace/common/writer/DDAgentWriter.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -154,7 +154,8 @@ public DDAgentWriter build() {
154154
null == prioritization ? FAST_LANE : prioritization,
155155
flushIntervalMilliseconds,
156156
TimeUnit.MILLISECONDS,
157-
singleSpanSampler);
157+
singleSpanSampler,
158+
null);
158159

159160
return new DDAgentWriter(traceProcessingWorker, dispatcher, healthMetrics, alwaysFlush);
160161
}

dd-trace-core/src/main/java/datadog/trace/common/writer/DDIntakeWriter.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -122,7 +122,8 @@ public DDIntakeWriter build() {
122122
prioritization,
123123
flushIntervalMilliseconds,
124124
TimeUnit.MILLISECONDS,
125-
singleSpanSampler);
125+
singleSpanSampler,
126+
null);
126127

127128
return new DDIntakeWriter(
128129
traceProcessingWorker,

dd-trace-core/src/main/java/datadog/trace/common/writer/TraceProcessingWorker.java

Lines changed: 82 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -13,10 +13,14 @@
1313
import datadog.trace.common.writer.ddagent.PrioritizationStrategy;
1414
import datadog.trace.core.CoreSpan;
1515
import datadog.trace.core.DDSpan;
16+
import datadog.trace.core.DDSpanContext;
1617
import datadog.trace.core.monitor.HealthMetrics;
18+
import datadog.trace.core.postprocessor.SpanPostProcessor;
19+
import java.util.ArrayList;
1720
import java.util.List;
1821
import java.util.concurrent.CountDownLatch;
1922
import java.util.concurrent.TimeUnit;
23+
import java.util.function.BooleanSupplier;
2024
import org.jctools.queues.MessagePassingQueue;
2125
import org.jctools.queues.MpscBlockingConsumerArrayQueue;
2226
import org.slf4j.Logger;
@@ -50,7 +54,8 @@ public TraceProcessingWorker(
5054
final Prioritization prioritization,
5155
final long flushInterval,
5256
final TimeUnit timeUnit,
53-
final SingleSpanSampler singleSpanSampler) {
57+
final SingleSpanSampler singleSpanSampler,
58+
final SpanPostProcessor spanPostProcessor) {
5459
this.capacity = capacity;
5560
this.primaryQueue = createQueue(capacity);
5661
this.secondaryQueue = createQueue(capacity);
@@ -73,9 +78,21 @@ public TraceProcessingWorker(
7378
this.serializingHandler =
7479
runAsDaemon
7580
? new DaemonTraceSerializingHandler(
76-
primaryQueue, secondaryQueue, healthMetrics, dispatcher, flushInterval, timeUnit)
81+
primaryQueue,
82+
secondaryQueue,
83+
healthMetrics,
84+
dispatcher,
85+
flushInterval,
86+
timeUnit,
87+
spanPostProcessor)
7788
: new NonDaemonTraceSerializingHandler(
78-
primaryQueue, secondaryQueue, healthMetrics, dispatcher, flushInterval, timeUnit);
89+
primaryQueue,
90+
secondaryQueue,
91+
healthMetrics,
92+
dispatcher,
93+
flushInterval,
94+
timeUnit,
95+
spanPostProcessor);
7996
this.serializerThread = newAgentThread(TRACE_PROCESSOR, serializingHandler, runAsDaemon);
8097
}
8198

@@ -134,9 +151,16 @@ public DaemonTraceSerializingHandler(
134151
HealthMetrics healthMetrics,
135152
PayloadDispatcher payloadDispatcher,
136153
long flushInterval,
137-
TimeUnit timeUnit) {
154+
TimeUnit timeUnit,
155+
SpanPostProcessor spanPostProcessor) {
138156
super(
139-
primaryQueue, secondaryQueue, healthMetrics, payloadDispatcher, flushInterval, timeUnit);
157+
primaryQueue,
158+
secondaryQueue,
159+
healthMetrics,
160+
payloadDispatcher,
161+
flushInterval,
162+
timeUnit,
163+
spanPostProcessor);
140164
}
141165

142166
@Override
@@ -169,9 +193,16 @@ public NonDaemonTraceSerializingHandler(
169193
HealthMetrics healthMetrics,
170194
PayloadDispatcher payloadDispatcher,
171195
long flushInterval,
172-
TimeUnit timeUnit) {
196+
TimeUnit timeUnit,
197+
SpanPostProcessor spanPostProcessor) {
173198
super(
174-
primaryQueue, secondaryQueue, healthMetrics, payloadDispatcher, flushInterval, timeUnit);
199+
primaryQueue,
200+
secondaryQueue,
201+
healthMetrics,
202+
payloadDispatcher,
203+
flushInterval,
204+
timeUnit,
205+
spanPostProcessor);
175206
}
176207

177208
@Override
@@ -206,14 +237,16 @@ public abstract static class TraceSerializingHandler implements Runnable {
206237
private final boolean doTimeFlush;
207238
private final PayloadDispatcher payloadDispatcher;
208239
private long lastTicks;
240+
private final SpanPostProcessor spanPostProcessor;
209241

210242
public TraceSerializingHandler(
211243
final MpscBlockingConsumerArrayQueue<Object> primaryQueue,
212244
final MpscBlockingConsumerArrayQueue<Object> secondaryQueue,
213245
final HealthMetrics healthMetrics,
214246
final PayloadDispatcher payloadDispatcher,
215247
final long flushInterval,
216-
final TimeUnit timeUnit) {
248+
final TimeUnit timeUnit,
249+
final SpanPostProcessor spanPostProcessor) {
217250
this.primaryQueue = primaryQueue;
218251
this.secondaryQueue = secondaryQueue;
219252
this.healthMetrics = healthMetrics;
@@ -225,6 +258,7 @@ public TraceSerializingHandler(
225258
} else {
226259
this.ticksRequiredToFlush = Long.MAX_VALUE;
227260
}
261+
this.spanPostProcessor = spanPostProcessor;
228262
}
229263

230264
@SuppressWarnings("unchecked")
@@ -235,6 +269,7 @@ public void onEvent(Object event) {
235269
try {
236270
if (event instanceof List) {
237271
List<DDSpan> trace = (List<DDSpan>) event;
272+
maybeTracePostProcessing(trace);
238273
// TODO populate `_sample_rate` metric in a way that accounts for lost/dropped traces
239274
payloadDispatcher.addTrace(trace);
240275
} else if (event instanceof FlushEvent) {
@@ -295,5 +330,44 @@ private void consumeBatch(MessagePassingQueue<Object> queue) {
295330
protected boolean queuesAreEmpty() {
296331
return primaryQueue.isEmpty() && secondaryQueue.isEmpty();
297332
}
333+
334+
private void maybeTracePostProcessing(List<DDSpan> trace) {
335+
if (trace == null || trace.isEmpty()) {
336+
return;
337+
}
338+
339+
// Filter spans that need post-processing
340+
List<DDSpan> spansToPostProcess = null;
341+
for (DDSpan span : trace) {
342+
DDSpanContext context = span.context();
343+
if (context != null && context.isRequiresPostProcessing()) {
344+
if (spansToPostProcess == null) {
345+
spansToPostProcess = new ArrayList<>();
346+
}
347+
spansToPostProcess.add(span);
348+
}
349+
}
350+
351+
if (spansToPostProcess == null) {
352+
return;
353+
}
354+
355+
try {
356+
long timeout = Config.get().getTracePostProcessingTimeout();
357+
long deadline = System.currentTimeMillis() + timeout;
358+
BooleanSupplier timeoutCheck = () -> System.currentTimeMillis() > deadline;
359+
360+
for (DDSpan span : spansToPostProcess) {
361+
if (!spanPostProcessor.process(span, timeoutCheck)) {
362+
log.debug("Span post-processing interrupted due to timeout.");
363+
break;
364+
}
365+
}
366+
} catch (Throwable e) {
367+
if (log.isDebugEnabled()) {
368+
log.debug("Error while trace post-processing", e);
369+
}
370+
}
371+
}
298372
}
299373
}

dd-trace-core/src/main/java/datadog/trace/core/DDSpanContext.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,7 @@ public class DDSpanContext
140140
private final boolean injectBaggageAsTags;
141141
private volatile int encodedOperationName;
142142
private volatile int encodedResourceName;
143+
private volatile boolean requiresPostProcessing;
143144

144145
public DDSpanContext(
145146
final DDTraceId traceId,
@@ -944,4 +945,12 @@ private String getTagName(String key) {
944945
// TODO is this decided?
945946
return "_dd." + key + ".json";
946947
}
948+
949+
public void setRequiresPostProcessing(boolean postProcessing) {
950+
this.requiresPostProcessing = postProcessing;
951+
}
952+
953+
public boolean isRequiresPostProcessing() {
954+
return requiresPostProcessing;
955+
}
947956
}
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
package datadog.trace.core.postprocessor;
2+
3+
import datadog.trace.core.DDSpan;
4+
import java.util.function.BooleanSupplier;
5+
6+
/**
7+
* Span Post-processing with a timeout check capability.
8+
*
9+
* <p>Implementations of this interface should carry out post-processing of spans while supporting
10+
* interruption when a specified time limit is exceeded. The method {@code process} must check the
11+
* state of {@code timeoutCheck} while processing span. If {@code timeoutCheck.getAsBoolean()}
12+
* returns {@code true}, processing should be immediately halted, and the method should return
13+
* {@code false}. If post-processing completes successfully before the timeout, the method should
14+
* return {@code true}.
15+
*/
16+
public interface SpanPostProcessor {
17+
/**
18+
* Post-processes a span.
19+
*
20+
* @param span The span to be post-processed.
21+
* @param timeoutCheck A timeout check returning {@code true} if the allotted time has elapsed.
22+
* @return {@code true} if the span was successfully processed; {@code false} in case of a
23+
* timeout.
24+
*/
25+
boolean process(DDSpan span, BooleanSupplier timeoutCheck);
26+
}

dd-trace-core/src/test/groovy/datadog/trace/common/writer/TraceProcessingWorkerTest.groovy

Lines changed: 63 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,10 @@ import datadog.trace.common.sampling.SingleSpanSampler
44
import datadog.trace.common.writer.ddagent.PrioritizationStrategy.PublishResult
55
import datadog.trace.core.CoreSpan
66
import datadog.trace.core.DDSpan
7+
import datadog.trace.core.DDSpanContext
8+
import datadog.trace.core.PendingTrace
79
import datadog.trace.core.monitor.HealthMetrics
10+
import datadog.trace.core.postprocessor.SpanPostProcessor
811
import datadog.trace.test.util.DDSpecification
912
import spock.util.concurrent.PollingConditions
1013

@@ -41,6 +44,7 @@ class TraceProcessingWorkerTest extends DDSpecification {
4144
FAST_LANE,
4245
1,
4346
TimeUnit.NANOSECONDS,
47+
null,
4448
null
4549
) // stop heartbeats from being throttled
4650

@@ -67,6 +71,7 @@ class TraceProcessingWorkerTest extends DDSpecification {
6771
FAST_LANE,
6872
1,
6973
TimeUnit.NANOSECONDS,
74+
null,
7075
null
7176
) // stop heartbeats from being throttled
7277
def timeConditions = new PollingConditions(timeout: 1, initialDelay: 1, factor: 1.25)
@@ -92,7 +97,7 @@ class TraceProcessingWorkerTest extends DDSpecification {
9297
false
9398
},
9499
FAST_LANE,
95-
100, TimeUnit.SECONDS, null) // prevent heartbeats from helping the flush happen
100+
100, TimeUnit.SECONDS, null, null) // prevent heartbeats from helping the flush happen
96101

97102
when: "there is pending work it is completed before a flush"
98103
// processing this span will throw an exception, but it should be caught
@@ -131,7 +136,7 @@ class TraceProcessingWorkerTest extends DDSpecification {
131136
throwingDispatcher, {
132137
false
133138
}, FAST_LANE,
134-
100, TimeUnit.SECONDS, null) // prevent heartbeats from helping the flush happen
139+
100, TimeUnit.SECONDS, null, null) // prevent heartbeats from helping the flush happen
135140
worker.start()
136141

137142
when: "a trace is processed but can't be passed on"
@@ -149,6 +154,58 @@ class TraceProcessingWorkerTest extends DDSpecification {
149154
priority << [SAMPLER_DROP, USER_DROP, SAMPLER_KEEP, USER_KEEP, UNSET]
150155
}
151156

157+
def "trace should be post-processed"() {
158+
setup:
159+
AtomicInteger acceptedCount = new AtomicInteger()
160+
PayloadDispatcherImpl countingDispatcher = Mock(PayloadDispatcherImpl)
161+
countingDispatcher.addTrace(_) >> {
162+
acceptedCount.getAndIncrement()
163+
}
164+
HealthMetrics healthMetrics = Mock(HealthMetrics)
165+
166+
// Span 1 - should be post-processed
167+
def span1 = DDSpan.create("test", 0, Mock(DDSpanContext) {
168+
isRequiresPostProcessing() >> true
169+
getTrace() >> Mock(PendingTrace) {
170+
getCurrentTimeNano() >> 0
171+
}
172+
}, [])
173+
def processedSpan1 = false
174+
175+
// Span 2 - should NOT be post-processed
176+
def span2 = DDSpan.create("test", 0, Mock(DDSpanContext) {
177+
isRequiresPostProcessing() >> false
178+
getTrace() >> Mock(PendingTrace) {
179+
getCurrentTimeNano() >> 0
180+
}
181+
}, [])
182+
def processedSpan2 = false
183+
184+
SpanPostProcessor spanPostProcessor = Mock(SpanPostProcessor) {
185+
process(span1, _) >> { processedSpan1 = true }
186+
process(span2, _) >> { processedSpan2 = true }
187+
}
188+
189+
TraceProcessingWorker worker = new TraceProcessingWorker(10, healthMetrics,
190+
countingDispatcher, {
191+
false
192+
}, FAST_LANE, 100, TimeUnit.SECONDS, null, spanPostProcessor)
193+
worker.start()
194+
195+
when: "traces are submitted"
196+
worker.publish(span1, SAMPLER_KEEP, [span1, span2])
197+
worker.publish(span2, SAMPLER_KEEP, [span1, span2])
198+
199+
then: "traces are passed through unless rejected on submission"
200+
conditions.eventually {
201+
assert processedSpan1 == true
202+
assert processedSpan2 == false
203+
}
204+
205+
cleanup:
206+
worker.close()
207+
}
208+
152209
def "traces should be processed"() {
153210
setup:
154211
AtomicInteger acceptedCount = new AtomicInteger()
@@ -160,7 +217,7 @@ class TraceProcessingWorkerTest extends DDSpecification {
160217
TraceProcessingWorker worker = new TraceProcessingWorker(10, healthMetrics,
161218
countingDispatcher, {
162219
false
163-
}, FAST_LANE, 100, TimeUnit.SECONDS, null)
220+
}, FAST_LANE, 100, TimeUnit.SECONDS, null, null)
164221
// prevent heartbeats from helping the flush happen
165222
worker.start()
166223

@@ -211,7 +268,7 @@ class TraceProcessingWorkerTest extends DDSpecification {
211268
TraceProcessingWorker worker = new TraceProcessingWorker(10, healthMetrics,
212269
countingDispatcher, {
213270
false
214-
}, FAST_LANE, 100, TimeUnit.SECONDS, null)
271+
}, FAST_LANE, 100, TimeUnit.SECONDS, null, null)
215272
worker.start()
216273
worker.close()
217274
int queueSize = 0
@@ -248,7 +305,7 @@ class TraceProcessingWorkerTest extends DDSpecification {
248305
return false
249306
}
250307
}
251-
TraceProcessingWorker worker = new TraceProcessingWorker(10, healthMetrics, countingDispatcher, { true }, FAST_LANE, 100, TimeUnit.SECONDS, singleSpanSampler)
308+
TraceProcessingWorker worker = new TraceProcessingWorker(10, healthMetrics, countingDispatcher, { true }, FAST_LANE, 100, TimeUnit.SECONDS, singleSpanSampler, null)
252309
worker.start()
253310

254311
when: "traces are submitted"
@@ -324,7 +381,7 @@ class TraceProcessingWorkerTest extends DDSpecification {
324381
return false
325382
}
326383
}
327-
TraceProcessingWorker worker = new TraceProcessingWorker(10, healthMetrics, countingDispatcher, { false }, FAST_LANE, 100, TimeUnit.SECONDS, singleSpanSampler)
384+
TraceProcessingWorker worker = new TraceProcessingWorker(10, healthMetrics, countingDispatcher, { false }, FAST_LANE, 100, TimeUnit.SECONDS, singleSpanSampler, null)
328385
worker.start()
329386

330387
when: "traces are submitted"

0 commit comments

Comments
 (0)