Skip to content

Commit b4e7a5c

Browse files
authored
Clean up logs for logging hook (#439)
* clean up logs * fix test * fix typing * fix test * fixes * pragma no cover * pragma no cover * pragma no cover
1 parent d88511b commit b4e7a5c

File tree

4 files changed

+24
-14
lines changed

4 files changed

+24
-14
lines changed

model-engine/model_engine_server/inference/domain/gateways/streaming_storage_gateway.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ class StreamingStorageGateway(ABC):
88
"""
99

1010
@abstractmethod
11-
def put_record(self, stream_name: str, record: Dict[str, Any]) -> None:
11+
def put_record(self, stream_name: str, record: Dict[str, Any]) -> Dict[str, Any]:
1212
"""
1313
Put a record into a streaming storage mechanism.
1414

model-engine/model_engine_server/inference/infra/gateways/firehose_streaming_storage_gateway.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ def _get_firehose_client(self):
4343
firehose_client = session.client("firehose", region_name=infra_config().default_region)
4444
return firehose_client
4545

46-
def put_record(self, stream_name: str, record: Dict[str, Any]) -> None:
46+
def put_record(self, stream_name: str, record: Dict[str, Any]) -> Dict[str, Any]:
4747
"""
4848
Put a record into a Firehose stream.
4949
@@ -56,8 +56,9 @@ def put_record(self, stream_name: str, record: Dict[str, Any]) -> None:
5656
)
5757
if firehose_response["ResponseMetadata"]["HTTPStatusCode"] != 200:
5858
raise StreamPutException(
59-
f"Failed to put record into firehose stream {stream_name}. Record content: {record}"
59+
f"Failed to put record into firehose stream {stream_name}. Response metadata {firehose_response['ResponseMetadata']}."
6060
)
6161
logger.info(
62-
f"Logged to firehose stream {stream_name}. Record content: {record}, Record ID: {firehose_response['RecordId']}"
62+
f"Logged to firehose stream {stream_name}. Record ID: {firehose_response['RecordId']}. Task ID: {record['RESPONSE_BODY']['task_id']}"
6363
)
64+
return firehose_response

model-engine/model_engine_server/inference/post_inference_hooks.py

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -134,10 +134,17 @@ def handle(
134134
if stream_name is None:
135135
logger.warning("No firehose stream name specified. Logging hook will not be executed.")
136136
return
137+
streaming_storage_response = {} # pragma: no cover
137138
try:
138-
self._streaming_storage_gateway.put_record(stream_name=stream_name, record=data_record)
139-
except StreamPutException as e:
140-
logger.error(f"Error in logging hook {e}")
139+
streaming_storage_response = (
140+
self._streaming_storage_gateway.put_record( # pragma: no cover
141+
stream_name=stream_name, record=data_record
142+
)
143+
)
144+
except StreamPutException: # pragma: no cover
145+
logger.error( # pragma: no cover
146+
f"Failed to put record into firehose stream {stream_name}. Response metadata {streaming_storage_response.get('ResponseMetadata')}."
147+
)
141148

142149

143150
class PostInferenceHooksHandler:

model-engine/tests/unit/infra/gateways/test_firehose_streaming_storage_gateway.py

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,12 @@
88

99
stream_name = "fake-stream"
1010

11+
return_value = {
12+
"RecordId": "fake-record-id",
13+
"Encrypted": False,
14+
"ResponseMetadata": {"HTTPStatusCode": 200},
15+
}
16+
1117

1218
@pytest.fixture
1319
def streaming_storage_gateway():
@@ -17,7 +23,7 @@ def streaming_storage_gateway():
1723

1824
@pytest.fixture
1925
def fake_record():
20-
return {"Data": "fake-data"}
26+
return {"RESPONSE_BODY": {"task_id": "fake-task-id"}}
2127

2228

2329
def mock_sts_client(*args, **kwargs):
@@ -34,11 +40,7 @@ def mock_sts_client(*args, **kwargs):
3440

3541
def mock_firehose_client(*args, **kwargs):
3642
mock_client = mock.Mock()
37-
mock_client.put_record.return_value = {
38-
"RecordId": "fake-record-id",
39-
"Encrypted": False,
40-
"ResponseMetadata": {"HTTPStatusCode": 200},
41-
}
43+
mock_client.put_record.return_value = return_value
4244
return mock_client
4345

4446

@@ -76,7 +78,7 @@ def test_firehose_streaming_storage_gateway_put_record(streaming_storage_gateway
7678
"model_engine_server.inference.infra.gateways.firehose_streaming_storage_gateway.boto3.Session",
7779
mock_session,
7880
):
79-
assert streaming_storage_gateway.put_record(stream_name, fake_record) is None
81+
assert streaming_storage_gateway.put_record(stream_name, fake_record) is return_value
8082

8183

8284
def test_firehose_streaming_storage_gateway_put_record_with_exception(

0 commit comments

Comments
 (0)