Skip to content

Commit 3fabdcd

Browse files
Extract Vert.x json body response schemas
1 parent aac9883 commit 3fabdcd

File tree

15 files changed

+305
-3
lines changed

15 files changed

+305
-3
lines changed

dd-java-agent/appsec/src/main/java/com/datadog/appsec/gateway/AppSecRequestContext.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,7 @@ public class AppSecRequestContext implements DataBundle, Closeable {
105105
private boolean reqDataPublished;
106106
private boolean rawReqBodyPublished;
107107
private boolean convertedReqBodyPublished;
108+
private boolean responseBodyPublished;
108109
private boolean respDataPublished;
109110
private boolean pathParamsPublished;
110111
private volatile Map<String, String> derivatives;
@@ -502,6 +503,14 @@ public void setConvertedReqBodyPublished(boolean convertedReqBodyPublished) {
502503
this.convertedReqBodyPublished = convertedReqBodyPublished;
503504
}
504505

506+
public boolean isResponseBodyPublished() {
507+
return responseBodyPublished;
508+
}
509+
510+
public void setResponseBodyPublished(final boolean responseBodyPublished) {
511+
this.responseBodyPublished = responseBodyPublished;
512+
}
513+
505514
public boolean isRespDataPublished() {
506515
return respDataPublished;
507516
}

dd-java-agent/appsec/src/main/java/com/datadog/appsec/gateway/GatewayBridge.java

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,7 @@ public class GatewayBridge {
9898
private volatile DataSubscriberInfo initialReqDataSubInfo;
9999
private volatile DataSubscriberInfo rawRequestBodySubInfo;
100100
private volatile DataSubscriberInfo requestBodySubInfo;
101+
private volatile DataSubscriberInfo responseBodySubInfo;
101102
private volatile DataSubscriberInfo pathParamsSubInfo;
102103
private volatile DataSubscriberInfo respDataSubInfo;
103104
private volatile DataSubscriberInfo grpcServerMethodSubInfo;
@@ -137,6 +138,7 @@ public void init() {
137138
subscriptionService.registerCallback(EVENTS.requestMethodUriRaw(), this::onRequestMethodUriRaw);
138139
subscriptionService.registerCallback(EVENTS.requestBodyStart(), this::onRequestBodyStart);
139140
subscriptionService.registerCallback(EVENTS.requestBodyDone(), this::onRequestBodyDone);
141+
subscriptionService.registerCallback(EVENTS.responseBody(), this::onResponseBody);
140142
subscriptionService.registerCallback(
141143
EVENTS.requestClientSocketAddress(), this::onRequestClientSocketAddress);
142144
subscriptionService.registerCallback(
@@ -177,6 +179,7 @@ public void reset() {
177179
initialReqDataSubInfo = null;
178180
rawRequestBodySubInfo = null;
179181
requestBodySubInfo = null;
182+
responseBodySubInfo = null;
180183
pathParamsSubInfo = null;
181184
respDataSubInfo = null;
182185
grpcServerMethodSubInfo = null;
@@ -638,6 +641,40 @@ private Flow<Void> onRequestBodyDone(RequestContext ctx_, StoredBodySupplier sup
638641
}
639642
}
640643

644+
private Flow<Void> onResponseBody(RequestContext ctx_, Object obj) {
645+
AppSecRequestContext ctx = ctx_.getData(RequestContextSlot.APPSEC);
646+
if (ctx == null) {
647+
return NoopFlow.INSTANCE;
648+
}
649+
650+
if (ctx.isResponseBodyPublished()) {
651+
log.debug(
652+
"Response body already published; will ignore new value of type {}", obj.getClass());
653+
return NoopFlow.INSTANCE;
654+
}
655+
ctx.setResponseBodyPublished(true);
656+
657+
while (true) {
658+
DataSubscriberInfo subInfo = responseBodySubInfo;
659+
if (subInfo == null) {
660+
subInfo = producerService.getDataSubscribers(KnownAddresses.RESPONSE_BODY_OBJECT);
661+
responseBodySubInfo = subInfo;
662+
}
663+
if (subInfo == null || subInfo.isEmpty()) {
664+
return NoopFlow.INSTANCE;
665+
}
666+
// TODO: review schema extraction limits
667+
Object converted = ObjectIntrospection.convert(obj, ctx);
668+
DataBundle bundle = new SingletonDataBundle<>(KnownAddresses.RESPONSE_BODY_OBJECT, converted);
669+
try {
670+
GatewayContext gwCtx = new GatewayContext(false);
671+
return producerService.publishDataEvent(subInfo, ctx, bundle, gwCtx);
672+
} catch (ExpiredSubscriberInfoException e) {
673+
responseBodySubInfo = null;
674+
}
675+
}
676+
}
677+
641678
private Flow<Void> onRequestPathParams(RequestContext ctx_, Map<String, ?> data) {
642679
AppSecRequestContext ctx = ctx_.getData(RequestContextSlot.APPSEC);
643680
if (ctx == null || ctx.isPathParamsPublished()) {

dd-java-agent/appsec/src/test/groovy/com/datadog/appsec/gateway/GatewayBridgeSpecification.groovy

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,7 @@ class GatewayBridgeSpecification extends DDSpecification {
9999
BiFunction<RequestContext, StoredBodySupplier, Void> requestBodyStartCB
100100
BiFunction<RequestContext, StoredBodySupplier, Flow<Void>> requestBodyDoneCB
101101
BiFunction<RequestContext, Object, Flow<Void>> requestBodyProcessedCB
102+
BiFunction<RequestContext, Object, Flow<Void>> responseBodyCB
102103
BiFunction<RequestContext, Integer, Flow<Void>> responseStartedCB
103104
TriConsumer<RequestContext, String, String> respHeaderCB
104105
Function<RequestContext, Flow<Void>> respHeadersDoneCB
@@ -463,6 +464,7 @@ class GatewayBridgeSpecification extends DDSpecification {
463464
1 * ig.registerCallback(EVENTS.requestBodyStart(), _) >> { requestBodyStartCB = it[1]; null }
464465
1 * ig.registerCallback(EVENTS.requestBodyDone(), _) >> { requestBodyDoneCB = it[1]; null }
465466
1 * ig.registerCallback(EVENTS.requestBodyProcessed(), _) >> { requestBodyProcessedCB = it[1]; null }
467+
1 * ig.registerCallback(EVENTS.responseBody(), _) >> { responseBodyCB = it[1]; null }
466468
1 * ig.registerCallback(EVENTS.responseStarted(), _) >> { responseStartedCB = it[1]; null }
467469
1 * ig.registerCallback(EVENTS.responseHeader(), _) >> { respHeaderCB = it[1]; null }
468470
1 * ig.registerCallback(EVENTS.responseHeaderDone(), _) >> { respHeadersDoneCB = it[1]; null }
@@ -1340,4 +1342,17 @@ class GatewayBridgeSpecification extends DDSpecification {
13401342
arCtx.getRoute() == route
13411343
}
13421344
1345+
void 'test on response body callback'() {
1346+
when:
1347+
responseBodyCB.apply(ctx, [test: 'this is a test'])
1348+
1349+
then:
1350+
1 * eventDispatcher.getDataSubscribers(KnownAddresses.RESPONSE_BODY_OBJECT) >> nonEmptyDsInfo
1351+
1 * eventDispatcher.publishDataEvent(_, _, _, _) >> {
1352+
final bundle = it[2] as DataBundle
1353+
final body = bundle.get(KnownAddresses.RESPONSE_BODY_OBJECT)
1354+
assert body['test'] == 'this is a test'
1355+
}
1356+
}
1357+
13431358
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
package datadog.trace.instrumentation.vertx_4_0.server;
2+
3+
import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named;
4+
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
5+
import static net.bytebuddy.matcher.ElementMatchers.takesArguments;
6+
7+
import com.google.auto.service.AutoService;
8+
import datadog.trace.agent.tooling.Instrumenter;
9+
import datadog.trace.agent.tooling.InstrumenterModule;
10+
import datadog.trace.agent.tooling.muzzle.Reference;
11+
import io.vertx.ext.web.impl.RoutingContextImpl;
12+
13+
/**
14+
* @see RoutingContextImpl#getBodyAsJson(int)
15+
* @see RoutingContextImpl#getBodyAsJsonArray(int)
16+
*/
17+
@AutoService(InstrumenterModule.class)
18+
public class RoutingContextInstrumentation extends InstrumenterModule.AppSec
19+
implements Instrumenter.ForSingleType, Instrumenter.HasMethodAdvice {
20+
21+
public RoutingContextInstrumentation() {
22+
super("vertx", "vertx-4.0");
23+
}
24+
25+
@Override
26+
public Reference[] additionalMuzzleReferences() {
27+
return new Reference[] {VertxVersionMatcher.HTTP_1X_SERVER_RESPONSE};
28+
}
29+
30+
@Override
31+
public String instrumentedType() {
32+
return "io.vertx.ext.web.RoutingContext";
33+
}
34+
35+
@Override
36+
public void methodAdvice(MethodTransformer transformer) {
37+
transformer.applyAdvice(
38+
named("json").and(takesArguments(1)).and(takesArgument(0, Object.class)),
39+
packageName + ".RoutingContextJsonResponseAdvice");
40+
}
41+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
package datadog.trace.instrumentation.vertx_4_0.server;
2+
3+
import static datadog.trace.api.gateway.Events.EVENTS;
4+
5+
import datadog.appsec.api.blocking.BlockingException;
6+
import datadog.trace.advice.ActiveRequestContext;
7+
import datadog.trace.advice.RequiresRequestContext;
8+
import datadog.trace.api.gateway.BlockResponseFunction;
9+
import datadog.trace.api.gateway.CallbackProvider;
10+
import datadog.trace.api.gateway.Flow;
11+
import datadog.trace.api.gateway.RequestContext;
12+
import datadog.trace.api.gateway.RequestContextSlot;
13+
import datadog.trace.bootstrap.instrumentation.api.AgentTracer;
14+
import java.util.function.BiFunction;
15+
import net.bytebuddy.asm.Advice;
16+
17+
@RequiresRequestContext(RequestContextSlot.APPSEC)
18+
class RoutingContextJsonResponseAdvice {
19+
20+
@Advice.OnMethodEnter(suppress = Throwable.class)
21+
static void before(
22+
@Advice.Argument(0) final Object object, @ActiveRequestContext final RequestContext reqCtx) {
23+
24+
if (object == null) {
25+
return;
26+
}
27+
28+
CallbackProvider cbp = AgentTracer.get().getCallbackProvider(RequestContextSlot.APPSEC);
29+
if (cbp == null) {
30+
return;
31+
}
32+
BiFunction<RequestContext, Object, Flow<Void>> callback =
33+
cbp.getCallback(EVENTS.responseBody());
34+
if (callback == null) {
35+
return;
36+
}
37+
38+
Flow<Void> flow = callback.apply(reqCtx, object);
39+
Flow.Action action = flow.getAction();
40+
if (action instanceof Flow.Action.RequestBlockingAction) {
41+
BlockResponseFunction blockResponseFunction = reqCtx.getBlockResponseFunction();
42+
if (blockResponseFunction == null) {
43+
return;
44+
}
45+
Flow.Action.RequestBlockingAction rba = (Flow.Action.RequestBlockingAction) action;
46+
blockResponseFunction.tryCommitBlockingResponse(
47+
reqCtx.getTraceSegment(),
48+
rba.getStatusCode(),
49+
rba.getBlockingContentType(),
50+
rba.getExtraHeaders());
51+
52+
throw new BlockingException("Blocked request (for RoutingContext/json)");
53+
}
54+
}
55+
}

dd-java-agent/instrumentation/vertx-web-4.0/src/test/groovy/server/VertxHttpServerForkedTest.groovy

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,11 @@ class VertxHttpServerForkedTest extends HttpServerTest<Vertx> {
8383
true
8484
}
8585

86+
@Override
87+
boolean testResponseBodyJson() {
88+
true
89+
}
90+
8691
@Override
8792
boolean testBlocking() {
8893
true

dd-java-agent/instrumentation/vertx-web-4.0/src/test/java/server/VertxTestServer.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,8 @@ public void start(final Promise<Void> startPromise) {
127127
BODY_JSON,
128128
() -> {
129129
JsonObject json = ctx.getBodyAsJson();
130-
ctx.response().setStatusCode(BODY_JSON.getStatus()).end(json.toString());
130+
ctx.response().setStatusCode(BODY_JSON.getStatus());
131+
ctx.json(json);
131132
}));
132133
router
133134
.route(QUERY_ENCODED_BOTH.getRawPath())

dd-java-agent/instrumentation/vertx-web-5.0/src/test/groovy/server/VertxHttpServerForkedTest.groovy

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,11 @@ class VertxHttpServerForkedTest extends HttpServerTest<Vertx> {
6767
true
6868
}
6969

70+
@Override
71+
boolean testResponseBodyJson() {
72+
true
73+
}
74+
7075
@Override
7176
boolean testBodyUrlencoded() {
7277
true

dd-java-agent/instrumentation/vertx-web-5.0/src/test/java/server/VertxTestServer.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,8 @@ public void start(final Promise<Void> startPromise) {
118118
BODY_JSON,
119119
() -> {
120120
JsonObject json = ctx.body().asJsonObject();
121-
ctx.response().setStatusCode(BODY_JSON.getStatus()).end(json.toString());
121+
ctx.response().setStatusCode(BODY_JSON.getStatus());
122+
ctx.json(json);
122123
}));
123124
router
124125
.route(QUERY_ENCODED_BOTH.getRawPath())

dd-java-agent/testing/src/main/groovy/datadog/trace/agent/test/base/HttpServerTest.groovy

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,8 @@ import datadog.trace.bootstrap.instrumentation.api.URIUtils
3939
import datadog.trace.core.DDSpan
4040
import datadog.trace.core.datastreams.StatsGroup
4141
import datadog.trace.test.util.Flaky
42+
import groovy.json.JsonOutput
43+
import groovy.json.JsonSlurper
4244
import groovy.transform.Canonical
4345
import groovy.transform.CompileStatic
4446
import net.bytebuddy.utility.RandomString
@@ -135,6 +137,7 @@ abstract class HttpServerTest<SERVER> extends WithHttpServer<SERVER> {
135137
ss.registerCallback(events.requestBodyStart(), callbacks.requestBodyStartCb)
136138
ss.registerCallback(events.requestBodyDone(), callbacks.requestBodyEndCb)
137139
ss.registerCallback(events.requestBodyProcessed(), callbacks.requestBodyObjectCb)
140+
ss.registerCallback(events.responseBody(), callbacks.responseBodyObjectCb)
138141
ss.registerCallback(events.responseStarted(), callbacks.responseStartedCb)
139142
ss.registerCallback(events.responseHeader(), callbacks.responseHeaderCb)
140143
ss.registerCallback(events.responseHeaderDone(), callbacks.responseHeaderDoneCb)
@@ -335,6 +338,7 @@ abstract class HttpServerTest<SERVER> extends WithHttpServer<SERVER> {
335338
false
336339
}
337340

341+
338342
boolean isRequestBodyNoStreaming() {
339343
// if true, plain text request body tests expect the requestBodyProcessed
340344
// callback to tbe called, not requestBodyStart/requestBodyDone
@@ -353,6 +357,10 @@ abstract class HttpServerTest<SERVER> extends WithHttpServer<SERVER> {
353357
false
354358
}
355359

360+
boolean testResponseBodyJson() {
361+
false
362+
}
363+
356364
boolean testBlocking() {
357365
false
358366
}
@@ -1581,6 +1589,44 @@ abstract class HttpServerTest<SERVER> extends WithHttpServer<SERVER> {
15811589
true | 'text/html;q=0.8, application/json;q=0.9'
15821590
}
15831591

1592+
void 'test instrumentation gateway json response body'() {
1593+
setup:
1594+
assumeTrue(testResponseBodyJson())
1595+
final body = [a: 'x']
1596+
def request = request(
1597+
BODY_JSON, 'POST',
1598+
RequestBody.create(MediaType.get('application/json'), JsonOutput.toJson(body)))
1599+
.header(IG_RESPONSE_BODY_TAG, 'true')
1600+
.build()
1601+
def response = client.newCall(request).execute()
1602+
if (isDataStreamsEnabled()) {
1603+
TEST_DATA_STREAMS_WRITER.waitForGroups(1)
1604+
}
1605+
1606+
expect:
1607+
response.body().charStream().text == BODY_JSON.body
1608+
1609+
when:
1610+
TEST_WRITER.waitForTraces(1)
1611+
def trace = TEST_WRITER.get(0)
1612+
1613+
then:
1614+
!trace.isEmpty()
1615+
def rootSpan = trace.find { it.parentId == 0 }
1616+
assert rootSpan != null
1617+
final responseBody = rootSpan.getTag('response.body') as String
1618+
new JsonSlurper().parseText(responseBody) == body
1619+
1620+
and:
1621+
if (isDataStreamsEnabled()) {
1622+
StatsGroup first = TEST_DATA_STREAMS_WRITER.groups.find { it.parentHash == 0 }
1623+
verifyAll(first) {
1624+
edgeTags.containsAll(DSM_EDGE_TAGS)
1625+
edgeTags.size() == DSM_EDGE_TAGS.size()
1626+
}
1627+
}
1628+
}
1629+
15841630
@Flaky(value = "https://github.com/DataDog/dd-trace-java/issues/4681", suites = ["GrizzlyAsyncTest", "GrizzlyTest"])
15851631
def 'test blocking of request with json response'() {
15861632
setup:
@@ -2280,6 +2326,7 @@ abstract class HttpServerTest<SERVER> extends WithHttpServer<SERVER> {
22802326
static final String IG_BODY_END_BLOCK_HEADER = "x-block-body-end"
22812327
static final String IG_BODY_CONVERTED_HEADER = "x-block-body-converted"
22822328
static final String IG_ASK_FOR_RESPONSE_HEADER_TAGS_HEADER = "x-include-response-headers-in-tags"
2329+
static final String IG_RESPONSE_BODY_TAG = "x-include-response-body-in-tags"
22832330
static final String IG_PEER_ADDRESS = "ig-peer-address"
22842331
static final String IG_PEER_PORT = "ig-peer-port"
22852332
static final String IG_RESPONSE_STATUS = "ig-response-status"
@@ -2303,6 +2350,7 @@ abstract class HttpServerTest<SERVER> extends WithHttpServer<SERVER> {
23032350
boolean bodyEndBlock
23042351
boolean bodyConvertedBlock
23052352
boolean responseHeadersInTags
2353+
boolean responseBodyTag
23062354
}
23072355

23082356
static final String stringOrEmpty(String string) {
@@ -2356,6 +2404,9 @@ abstract class HttpServerTest<SERVER> extends WithHttpServer<SERVER> {
23562404
if (IG_ASK_FOR_RESPONSE_HEADER_TAGS_HEADER.equalsIgnoreCase(key)) {
23572405
context.responseHeadersInTags = true
23582406
}
2407+
if (IG_RESPONSE_BODY_TAG.equalsIgnoreCase(key)) {
2408+
context.responseBodyTag = true
2409+
}
23592410
} as TriConsumer<RequestContext, String, String>
23602411

23612412
final Function<RequestContext, Flow<Void>> requestHeaderDoneCb =
@@ -2450,6 +2501,22 @@ abstract class HttpServerTest<SERVER> extends WithHttpServer<SERVER> {
24502501
}
24512502
} as BiFunction<RequestContext, Object, Flow<Void>>)
24522503

2504+
final BiFunction<RequestContext, Object, Flow<Void>> responseBodyObjectCb =
2505+
({ RequestContext rqCtxt, Object obj ->
2506+
String body = obj.toString()
2507+
Context context = rqCtxt.getData(RequestContextSlot.APPSEC)
2508+
if (context.responseBodyTag) {
2509+
rqCtxt.traceSegment.setTagTop('response.body', body)
2510+
}
2511+
if (context.responseBlock) {
2512+
new RbaFlow(
2513+
new Flow.Action.RequestBlockingAction(413, BlockingContentType.JSON)
2514+
)
2515+
} else {
2516+
Flow.ResultFlow.empty()
2517+
}
2518+
} as BiFunction<RequestContext, Object, Flow<Void>>)
2519+
24532520
final BiFunction<RequestContext, Integer, Flow<Void>> responseStartedCb =
24542521
({ RequestContext rqCtxt, Integer resultCode ->
24552522
Context context = rqCtxt.getData(RequestContextSlot.APPSEC)

0 commit comments

Comments
 (0)