Skip to content
Closed
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 61 additions & 20 deletions datadog_lambda/dsm.py
Original file line number Diff line number Diff line change
@@ -1,38 +1,79 @@
import json

from ddtrace.internal.logger import get_logger
from datadog_lambda import logger
from datadog_lambda.trigger import EventTypes

log = get_logger(__name__)

def set_dsm_context(event, event_source):

def set_dsm_context(event, event_source):
if event_source.equals(EventTypes.SQS):
_dsm_set_sqs_context(event)


def _dsm_set_sqs_context(event):
def _dsm_set_context_helper(service_type, arn, payload_size, context_json):
"""
Common helper function for setting DSM context.

Args:
service_type: The service type string (example: sqs', 'sns')
arn: ARN from the record
payload_size: payload size of the record
context_json: Datadog context for the record
"""
from datadog_lambda.wrapper import format_err_with_traceback
from ddtrace.internal.datastreams import data_streams_processor
from ddtrace.internal.datastreams.processor import DsmPathwayCodec
from ddtrace.internal.datastreams.botocore import (
get_datastreams_context,
calculate_sqs_payload_size,
)

processor = data_streams_processor()

try:
ctx = DsmPathwayCodec.decode(context_json, processor)
ctx.set_checkpoint(
["direction:in", f"topic:{arn}", f"type:{service_type}"],
payload_size=payload_size,
)
except Exception as e:
logger.error(format_err_with_traceback(e))


def _dsm_set_sqs_context(event):
from ddtrace.internal.datastreams.botocore import calculate_sqs_payload_size

records = event.get("Records")
if records is None:
return
processor = data_streams_processor()

for record in records:
try:
queue_arn = record.get("eventSourceARN", "")

contextjson = get_datastreams_context(record)
payload_size = calculate_sqs_payload_size(record)

ctx = DsmPathwayCodec.decode(contextjson, processor)
ctx.set_checkpoint(
["direction:in", f"topic:{queue_arn}", "type:sqs"],
payload_size=payload_size,
)
except Exception as e:
logger.error(format_err_with_traceback(e))
arn = record.get("eventSourceARN", "")

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we remove the try / except here. Is there a reason for that (maybe there is and I don't see it).
But we want to make sure our instrumentation never prevents the lambda from being executed, even if there is an issue with the instrumentation.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed, we should move this inside the try/except.

context_json = _get_dsm_context_from_lambda(record)
payload_size = calculate_sqs_payload_size(record, context_json)

_dsm_set_context_helper("sqs", arn, payload_size, context_json)


def _get_dsm_context_from_lambda(message):
"""
Lambda-specific message formats:
- message.messageAttributes._datadog.stringValue (SQS -> lambda)
"""
context_json = None
message_attributes = message.get("messageAttributes")
if not message_attributes:
log.debug("DataStreams skipped lambda message: %r", message)
return None

if "_datadog" not in message_attributes:
log.debug("DataStreams skipped lambda message: %r", message)
return None

datadog_attr = message_attributes["_datadog"]

if "stringValue" in datadog_attr:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should do a type check here to ensure this is a dict.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just want to clarify but we are checking that context_json is a dict right? I think context_json is the only one we need to make sure is a dict, the test you asked me to write also signaled that to me

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we should also make sure datadog_attr is a dict.

# SQS -> lambda
Copy link
Contributor

@purple4reina purple4reina Jun 16, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can use the event_type to avoid doing unnecessary work. We should already mostly know the shape of the event. Without doing so, this method is gonna get insanely large.

I would recommend creating a separate _get_dsm_context for each event type.

context_json = json.loads(datadog_attr["stringValue"])
else:
log.debug("DataStreams did not handle lambda message: %r", message)

return context_json
92 changes: 90 additions & 2 deletions tests/test_dsm.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,16 @@
import unittest
import json
from unittest.mock import patch, MagicMock

from datadog_lambda.dsm import set_dsm_context, _dsm_set_sqs_context
from datadog_lambda.dsm import (
set_dsm_context,
_dsm_set_sqs_context,
_get_dsm_context_from_lambda,
)
from datadog_lambda.trigger import EventTypes, _EventSource


class TestDsmSQSContext(unittest.TestCase):
class TestSetDSMContext(unittest.TestCase):
def setUp(self):
patcher = patch("datadog_lambda.dsm._dsm_set_sqs_context")
self.mock_dsm_set_sqs_context = patcher.start()
Expand Down Expand Up @@ -110,3 +115,86 @@ def test_sqs_multiple_records_process_each_record(self):
self.assertIn(f"topic:{expected_arns[i]}", tags)
self.assertIn("type:sqs", tags)
self.assertEqual(kwargs["payload_size"], 100)


class TestGetDSMContext(unittest.TestCase):
def test_sqs_to_lambda_string_value_format(self):
"""Test format: message.messageAttributes._datadog.stringValue (SQS -> lambda)"""
trace_context = {
"x-datadog-trace-id": "789123456",
"x-datadog-parent-id": "321987654",
"dd-pathway-ctx": "test-pathway-ctx",
}

lambda_record = {
"messageId": "059f36b4-87a3-44ab-83d2-661975830a7d",
"receiptHandle": "AQEBwJnKyrHigUMZj6rYigCgxlaS3SLy0a...",
"body": "Test message.",
"attributes": {
"ApproximateReceiveCount": "1",
"SentTimestamp": "1545082649183",
"SenderId": "AIDAIENQZJOLO23YVJ4VO",
"ApproximateFirstReceiveTimestamp": "1545082649185",
},
"messageAttributes": {
"_datadog": {
"stringValue": json.dumps(trace_context),
"stringListValues": [],
"binaryListValues": [],
"dataType": "String",
},
"myAttribute": {
"stringValue": "myValue",
"stringListValues": [],
"binaryListValues": [],
"dataType": "String",
},
},
"md5OfBody": "e4e68fb7bd0e697a0ae8f1bb342846b3",
"eventSource": "aws:sqs",
"eventSourceARN": "arn:aws:sqs:us-east-2:123456789012:my-queue",
"awsRegion": "us-east-2",
}

result = _get_dsm_context_from_lambda(lambda_record)

assert result is not None
assert result == trace_context
assert result["x-datadog-trace-id"] == "789123456"
assert result["x-datadog-parent-id"] == "321987654"
assert result["dd-pathway-ctx"] == "test-pathway-ctx"

def test_no_message_attributes(self):
"""Test message without MessageAttributes returns None."""
message = {
"messageId": "test-message-id",
"body": "Test message without attributes",
}

result = _get_dsm_context_from_lambda(message)

assert result is None

def test_no_datadog_attribute(self):
"""Test message with MessageAttributes but no _datadog attribute returns None."""
message = {
"messageId": "test-message-id",
"body": "Test message",
"messageAttributes": {
"customAttribute": {"stringValue": "custom-value", "dataType": "String"}
},
}

result = _get_dsm_context_from_lambda(message)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[nit] newline here

assert result is None

def test_empty_datadog_attribute(self):
"""Test message with empty _datadog attribute returns None."""
message = {
"messageId": "test-message-id",
"messageAttributes": {"_datadog": {}},
}

result = _get_dsm_context_from_lambda(message)

assert result is None
Loading