Skip to content

Commit 08626e3

Browse files
Added test
1 parent e78c7cb commit 08626e3

File tree

5 files changed

+106
-33
lines changed

5 files changed

+106
-33
lines changed

dd-trace-core/src/main/java/datadog/trace/common/writer/DDAgentWriter.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -154,7 +154,8 @@ public DDAgentWriter build() {
154154
null == prioritization ? FAST_LANE : prioritization,
155155
flushIntervalMilliseconds,
156156
TimeUnit.MILLISECONDS,
157-
singleSpanSampler);
157+
singleSpanSampler,
158+
null);
158159

159160
return new DDAgentWriter(traceProcessingWorker, dispatcher, healthMetrics, alwaysFlush);
160161
}

dd-trace-core/src/main/java/datadog/trace/common/writer/DDIntakeWriter.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -122,7 +122,8 @@ public DDIntakeWriter build() {
122122
prioritization,
123123
flushIntervalMilliseconds,
124124
TimeUnit.MILLISECONDS,
125-
singleSpanSampler);
125+
singleSpanSampler,
126+
null);
126127

127128
return new DDIntakeWriter(
128129
traceProcessingWorker,

dd-trace-core/src/main/java/datadog/trace/common/writer/TraceProcessingWorker.java

Lines changed: 39 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@
1515
import datadog.trace.core.DDSpan;
1616
import datadog.trace.core.DDSpanContext;
1717
import datadog.trace.core.monitor.HealthMetrics;
18-
import datadog.trace.core.postprocessor.AppSecPostProcessor;
1918
import datadog.trace.core.postprocessor.SpanPostProcessor;
2019
import java.util.ArrayList;
2120
import java.util.List;
@@ -55,7 +54,8 @@ public TraceProcessingWorker(
5554
final Prioritization prioritization,
5655
final long flushInterval,
5756
final TimeUnit timeUnit,
58-
final SingleSpanSampler singleSpanSampler) {
57+
final SingleSpanSampler singleSpanSampler,
58+
final SpanPostProcessor spanPostProcessor) {
5959
this.capacity = capacity;
6060
this.primaryQueue = createQueue(capacity);
6161
this.secondaryQueue = createQueue(capacity);
@@ -78,9 +78,21 @@ public TraceProcessingWorker(
7878
this.serializingHandler =
7979
runAsDaemon
8080
? new DaemonTraceSerializingHandler(
81-
primaryQueue, secondaryQueue, healthMetrics, dispatcher, flushInterval, timeUnit)
81+
primaryQueue,
82+
secondaryQueue,
83+
healthMetrics,
84+
dispatcher,
85+
flushInterval,
86+
timeUnit,
87+
spanPostProcessor)
8288
: new NonDaemonTraceSerializingHandler(
83-
primaryQueue, secondaryQueue, healthMetrics, dispatcher, flushInterval, timeUnit);
89+
primaryQueue,
90+
secondaryQueue,
91+
healthMetrics,
92+
dispatcher,
93+
flushInterval,
94+
timeUnit,
95+
spanPostProcessor);
8496
this.serializerThread = newAgentThread(TRACE_PROCESSOR, serializingHandler, runAsDaemon);
8597
}
8698

@@ -139,9 +151,16 @@ public DaemonTraceSerializingHandler(
139151
HealthMetrics healthMetrics,
140152
PayloadDispatcher payloadDispatcher,
141153
long flushInterval,
142-
TimeUnit timeUnit) {
154+
TimeUnit timeUnit,
155+
SpanPostProcessor spanPostProcessor) {
143156
super(
144-
primaryQueue, secondaryQueue, healthMetrics, payloadDispatcher, flushInterval, timeUnit);
157+
primaryQueue,
158+
secondaryQueue,
159+
healthMetrics,
160+
payloadDispatcher,
161+
flushInterval,
162+
timeUnit,
163+
spanPostProcessor);
145164
}
146165

147166
@Override
@@ -174,9 +193,16 @@ public NonDaemonTraceSerializingHandler(
174193
HealthMetrics healthMetrics,
175194
PayloadDispatcher payloadDispatcher,
176195
long flushInterval,
177-
TimeUnit timeUnit) {
196+
TimeUnit timeUnit,
197+
SpanPostProcessor spanPostProcessor) {
178198
super(
179-
primaryQueue, secondaryQueue, healthMetrics, payloadDispatcher, flushInterval, timeUnit);
199+
primaryQueue,
200+
secondaryQueue,
201+
healthMetrics,
202+
payloadDispatcher,
203+
flushInterval,
204+
timeUnit,
205+
spanPostProcessor);
180206
}
181207

182208
@Override
@@ -219,7 +245,8 @@ public TraceSerializingHandler(
219245
final HealthMetrics healthMetrics,
220246
final PayloadDispatcher payloadDispatcher,
221247
final long flushInterval,
222-
final TimeUnit timeUnit) {
248+
final TimeUnit timeUnit,
249+
final SpanPostProcessor spanPostProcessor) {
223250
this.primaryQueue = primaryQueue;
224251
this.secondaryQueue = secondaryQueue;
225252
this.healthMetrics = healthMetrics;
@@ -231,7 +258,7 @@ public TraceSerializingHandler(
231258
} else {
232259
this.ticksRequiredToFlush = Long.MAX_VALUE;
233260
}
234-
this.spanPostProcessor = new AppSecPostProcessor();
261+
this.spanPostProcessor = spanPostProcessor;
235262
}
236263

237264
@SuppressWarnings("unchecked")
@@ -330,8 +357,8 @@ private void maybeTracePostProcessing(List<DDSpan> trace) {
330357
BooleanSupplier timeoutCheck = () -> System.currentTimeMillis() > deadline;
331358

332359
for (DDSpan span : spansToPostProcess) {
333-
if (timeoutCheck.getAsBoolean() || !spanPostProcessor.process(span, timeoutCheck)) {
334-
log.debug("Span post-processing is interrupted due to timeout.");
360+
if (!spanPostProcessor.process(span, timeoutCheck)) {
361+
log.debug("Span post-processing interrupted due to timeout.");
335362
break;
336363
}
337364
}

dd-trace-core/src/main/java/datadog/trace/core/postprocessor/AppSecPostProcessor.java

Lines changed: 0 additions & 13 deletions
This file was deleted.

dd-trace-core/src/test/groovy/datadog/trace/common/writer/TraceProcessingWorkerTest.groovy

Lines changed: 63 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,10 @@ import datadog.trace.common.sampling.SingleSpanSampler
44
import datadog.trace.common.writer.ddagent.PrioritizationStrategy.PublishResult
55
import datadog.trace.core.CoreSpan
66
import datadog.trace.core.DDSpan
7+
import datadog.trace.core.DDSpanContext
8+
import datadog.trace.core.PendingTrace
79
import datadog.trace.core.monitor.HealthMetrics
10+
import datadog.trace.core.postprocessor.SpanPostProcessor
811
import datadog.trace.test.util.DDSpecification
912
import spock.util.concurrent.PollingConditions
1013

@@ -41,6 +44,7 @@ class TraceProcessingWorkerTest extends DDSpecification {
4144
FAST_LANE,
4245
1,
4346
TimeUnit.NANOSECONDS,
47+
null,
4448
null
4549
) // stop heartbeats from being throttled
4650

@@ -67,6 +71,7 @@ class TraceProcessingWorkerTest extends DDSpecification {
6771
FAST_LANE,
6872
1,
6973
TimeUnit.NANOSECONDS,
74+
null,
7075
null
7176
) // stop heartbeats from being throttled
7277
def timeConditions = new PollingConditions(timeout: 1, initialDelay: 1, factor: 1.25)
@@ -92,7 +97,7 @@ class TraceProcessingWorkerTest extends DDSpecification {
9297
false
9398
},
9499
FAST_LANE,
95-
100, TimeUnit.SECONDS, null) // prevent heartbeats from helping the flush happen
100+
100, TimeUnit.SECONDS, null, null) // prevent heartbeats from helping the flush happen
96101

97102
when: "there is pending work it is completed before a flush"
98103
// processing this span will throw an exception, but it should be caught
@@ -131,7 +136,7 @@ class TraceProcessingWorkerTest extends DDSpecification {
131136
throwingDispatcher, {
132137
false
133138
}, FAST_LANE,
134-
100, TimeUnit.SECONDS, null) // prevent heartbeats from helping the flush happen
139+
100, TimeUnit.SECONDS, null, null) // prevent heartbeats from helping the flush happen
135140
worker.start()
136141

137142
when: "a trace is processed but can't be passed on"
@@ -149,6 +154,58 @@ class TraceProcessingWorkerTest extends DDSpecification {
149154
priority << [SAMPLER_DROP, USER_DROP, SAMPLER_KEEP, USER_KEEP, UNSET]
150155
}
151156

157+
def "trace should be post-processed"() {
158+
setup:
159+
AtomicInteger acceptedCount = new AtomicInteger()
160+
PayloadDispatcherImpl countingDispatcher = Mock(PayloadDispatcherImpl)
161+
countingDispatcher.addTrace(_) >> {
162+
acceptedCount.getAndIncrement()
163+
}
164+
HealthMetrics healthMetrics = Mock(HealthMetrics)
165+
166+
// Span 1 - should be post-processed
167+
def span1 = DDSpan.create("test", 0, Mock(DDSpanContext) {
168+
isRequiresPostProcessing() >> true
169+
getTrace() >> Mock(PendingTrace) {
170+
getCurrentTimeNano() >> 0
171+
}
172+
}, [])
173+
def processedSpan1 = false
174+
175+
// Span 2 - should NOT be post-processed
176+
def span2 = DDSpan.create("test", 0, Mock(DDSpanContext) {
177+
isRequiresPostProcessing() >> false
178+
getTrace() >> Mock(PendingTrace) {
179+
getCurrentTimeNano() >> 0
180+
}
181+
}, [])
182+
def processedSpan2 = false
183+
184+
SpanPostProcessor spanPostProcessor = Mock(SpanPostProcessor) {
185+
process(span1, _) >> { processedSpan1 = true }
186+
process(span2, _) >> { processedSpan2 = true }
187+
}
188+
189+
TraceProcessingWorker worker = new TraceProcessingWorker(10, healthMetrics,
190+
countingDispatcher, {
191+
false
192+
}, FAST_LANE, 100, TimeUnit.SECONDS, null, spanPostProcessor)
193+
worker.start()
194+
195+
when: "traces are submitted"
196+
worker.publish(span1, SAMPLER_KEEP, [span1, span2])
197+
worker.publish(span2, SAMPLER_KEEP, [span1, span2])
198+
199+
then: "traces are passed through unless rejected on submission"
200+
conditions.eventually {
201+
assert processedSpan1 == true
202+
assert processedSpan2 == false
203+
}
204+
205+
cleanup:
206+
worker.close()
207+
}
208+
152209
def "traces should be processed"() {
153210
setup:
154211
AtomicInteger acceptedCount = new AtomicInteger()
@@ -160,7 +217,7 @@ class TraceProcessingWorkerTest extends DDSpecification {
160217
TraceProcessingWorker worker = new TraceProcessingWorker(10, healthMetrics,
161218
countingDispatcher, {
162219
false
163-
}, FAST_LANE, 100, TimeUnit.SECONDS, null)
220+
}, FAST_LANE, 100, TimeUnit.SECONDS, null, null)
164221
// prevent heartbeats from helping the flush happen
165222
worker.start()
166223

@@ -211,7 +268,7 @@ class TraceProcessingWorkerTest extends DDSpecification {
211268
TraceProcessingWorker worker = new TraceProcessingWorker(10, healthMetrics,
212269
countingDispatcher, {
213270
false
214-
}, FAST_LANE, 100, TimeUnit.SECONDS, null)
271+
}, FAST_LANE, 100, TimeUnit.SECONDS, null, null)
215272
worker.start()
216273
worker.close()
217274
int queueSize = 0
@@ -248,7 +305,7 @@ class TraceProcessingWorkerTest extends DDSpecification {
248305
return false
249306
}
250307
}
251-
TraceProcessingWorker worker = new TraceProcessingWorker(10, healthMetrics, countingDispatcher, { true }, FAST_LANE, 100, TimeUnit.SECONDS, singleSpanSampler)
308+
TraceProcessingWorker worker = new TraceProcessingWorker(10, healthMetrics, countingDispatcher, { true }, FAST_LANE, 100, TimeUnit.SECONDS, singleSpanSampler, null)
252309
worker.start()
253310

254311
when: "traces are submitted"
@@ -324,7 +381,7 @@ class TraceProcessingWorkerTest extends DDSpecification {
324381
return false
325382
}
326383
}
327-
TraceProcessingWorker worker = new TraceProcessingWorker(10, healthMetrics, countingDispatcher, { false }, FAST_LANE, 100, TimeUnit.SECONDS, singleSpanSampler)
384+
TraceProcessingWorker worker = new TraceProcessingWorker(10, healthMetrics, countingDispatcher, { false }, FAST_LANE, 100, TimeUnit.SECONDS, singleSpanSampler, null)
328385
worker.start()
329386

330387
when: "traces are submitted"

0 commit comments

Comments
 (0)