Closed
Changes from 19 commits
73 changes: 52 additions & 21 deletions datadog_lambda/dsm.py
@@ -1,38 +1,69 @@
from datadog_lambda import logger
import logging
import json
from datadog_lambda.trigger import EventTypes

logger = logging.getLogger(__name__)

def set_dsm_context(event, event_source):

def set_dsm_context(event, event_source):
    if event_source.equals(EventTypes.SQS):
        _dsm_set_sqs_context(event)


def _dsm_set_sqs_context(event):
    from datadog_lambda.wrapper import format_err_with_traceback
    from ddtrace.internal.datastreams import data_streams_processor
    from ddtrace.internal.datastreams.processor import DsmPathwayCodec
    from ddtrace.internal.datastreams.botocore import (
        get_datastreams_context,
        calculate_sqs_payload_size,
    )

    records = event.get("Records")
    if records is None:
        return
    processor = data_streams_processor()

    for record in records:
        arn = record.get("eventSourceARN", "")
        try:

We removed the try/except here. Is there a reason for that? (Maybe there is and I don't see it.)
We want to make sure our instrumentation never prevents the Lambda from executing, even if the instrumentation itself has an issue.

Contributor
Agreed, we should move this inside the try/except.
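
A minimal sketch of the pattern discussed in this thread, not the PR's final code. It assumes a hypothetical `checkpoint` callable standing in for the real DSM call, and a hypothetical function name. Everything that touches the record, including the ARN lookup, sits inside the try/except, so a malformed record is logged and never raised into the Lambda handler.

```python
import logging

logger = logging.getLogger(__name__)


def set_context_for_records(event, checkpoint):
    """Call `checkpoint(record, arn)` for each record; never raise to the caller.

    `checkpoint` is a hypothetical stand-in for the real DSM call.
    Returns the number of records processed without error.
    """
    processed = 0
    for record in event.get("Records") or []:
        try:
            # The ARN lookup is inside the try block too: a record that is
            # not a dict would otherwise raise before any handler is reached.
            arn = record.get("eventSourceARN", "")
            checkpoint(record, arn)
            processed += 1
        except Exception as e:
            # Log and move on; instrumentation must not break the invocation.
            logger.error("Unable to set dsm context: %s", e)
    return processed
```

With this shape, one bad record costs a log line and the remaining records are still processed.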

            queue_arn = record.get("eventSourceARN", "")

            contextjson = get_datastreams_context(record)
            payload_size = calculate_sqs_payload_size(record)
            context_json = _get_dsm_context_from_sqs_lambda(record)
            if not context_json:
                return
            _set_dsm_context_for_record(context_json, "sqs", arn)
Contributor
Should this be continue instead of return? Returning here skips every remaining record in the batch.
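
To illustrate the difference, here is a small standalone sketch. The function names and the `"context"` key are hypothetical and exist only for this example.

```python
def contexts_with_return(records):
    """Stops at the first record without a context."""
    seen = []
    for record in records:
        context = record.get("context")
        if not context:
            return seen  # every later record is skipped
        seen.append(context)
    return seen


def contexts_with_continue(records):
    """Skips only the record without a context."""
    seen = []
    for record in records:
        context = record.get("context")
        if not context:
            continue  # move on to the next record
        seen.append(context)
    return seen


records = [{"context": "a"}, {}, {"context": "c"}]
print(contexts_with_return(records))    # ['a']
print(contexts_with_continue(records))  # ['a', 'c']
```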


            ctx = DsmPathwayCodec.decode(contextjson, processor)
            ctx.set_checkpoint(
                ["direction:in", f"topic:{queue_arn}", "type:sqs"],
                payload_size=payload_size,
            )
        except Exception as e:
            logger.error(format_err_with_traceback(e))
            logger.error(f"Unable to set dsm context: {e}")


def _set_dsm_context_for_record(context_json, type, arn):
    from ddtrace.data_streams import set_consume_checkpoint

Contributor
We need to set a minimum version for the ddtrace dependency. To do that, find the first ddtrace release that includes set_consume_checkpoint, then update pyproject.toml with that version.

Contributor Author
Got it! Thanks for letting me know how this is done.

    carrier_get = _create_carrier_get(context_json)
    set_consume_checkpoint(type, arn, carrier_get, manual_checkpoint=False)


def _get_dsm_context_from_sqs_lambda(message):
    """
    Lambda-specific message shape for SQS -> Lambda:
        - message.messageAttributes._datadog.stringValue
    """
    context_json = None
    message_attributes = message.get("messageAttributes")
    if not message_attributes:
        logger.debug("DataStreams skipped lambda message: %r", message)
        return None
Contributor
It looks like we're logging debug messages multiple times for the same record.


    if "_datadog" not in message_attributes:
        logger.debug("DataStreams skipped lambda message: %r", message)
        return None

    datadog_attr = message_attributes["_datadog"]

    if "stringValue" in datadog_attr:
        context_json = json.loads(datadog_attr["stringValue"])
Contributor
We should do a type check here to ensure this is a dict.

Contributor Author
Just to clarify: we are checking that context_json is a dict, right? I think context_json is the only value we need to verify is a dict; the test you asked me to write also suggested that to me.

Contributor
We should also make sure datadog_attr is a dict.

        if not isinstance(context_json, dict):
            logger.debug("DataStreams did not handle lambda message: %r", message)
            return None
    else:
        logger.debug("DataStreams did not handle lambda message: %r", message)

Contributor
I would recommend making each of these log lines slightly different. That way when one is encountered, it is easy to find the exact line of code where it was produced. Otherwise, we don't know what the actual issue was.
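
One possible shape for the extraction helper that combines the suggestions in this review: a dict check on both the `_datadog` attribute and the decoded context, exactly one debug line per skipped record, and a distinct message on each branch. This is a sketch under assumptions, not the code that was merged; the function name and the log wording are hypothetical.

```python
import json
import logging

logger = logging.getLogger(__name__)


def get_dsm_context(message):
    """Return the decoded `_datadog` context dict from an SQS record, or None.

    Hypothetical rework: each early return logs once, with its own message.
    """
    message_attributes = message.get("messageAttributes")
    if not isinstance(message_attributes, dict):
        logger.debug(
            "DataStreams skipped message, messageAttributes missing or not a dict: %r",
            message,
        )
        return None

    datadog_attr = message_attributes.get("_datadog")
    if not isinstance(datadog_attr, dict):
        logger.debug(
            "DataStreams skipped message, _datadog missing or not a dict: %r", message
        )
        return None

    if "stringValue" not in datadog_attr:
        logger.debug(
            "DataStreams skipped message, _datadog has no stringValue: %r", message
        )
        return None

    # json.loads can still raise on invalid input; this sketch assumes the
    # caller wraps the call in a try/except, as discussed earlier in the review.
    context_json = json.loads(datadog_attr["stringValue"])
    if not isinstance(context_json, dict):
        logger.debug(
            "DataStreams skipped message, decoded context is not a dict: %r", message
        )
        return None

    return context_json
```

Because every branch has its own wording, a single debug line is enough to tell which check rejected the record.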

    return context_json


def _create_carrier_get(context_json):
    def carrier_get(key):
        return context_json.get(key)

    return carrier_get