From 3fa3dba1c21cfed1608d8e0fd461d21a1c74cb78 Mon Sep 17 00:00:00 2001 From: Tiffany Zhao Date: Tue, 20 Feb 2024 21:35:38 +0000 Subject: [PATCH 1/8] clean up logs --- .../domain/gateways/streaming_storage_gateway.py | 2 +- .../gateways/firehose_streaming_storage_gateway.py | 7 ++++--- .../inference/post_inference_hooks.py | 10 +++++++--- 3 files changed, 12 insertions(+), 7 deletions(-) diff --git a/model-engine/model_engine_server/inference/domain/gateways/streaming_storage_gateway.py b/model-engine/model_engine_server/inference/domain/gateways/streaming_storage_gateway.py index ae4216dd..c757bdf0 100644 --- a/model-engine/model_engine_server/inference/domain/gateways/streaming_storage_gateway.py +++ b/model-engine/model_engine_server/inference/domain/gateways/streaming_storage_gateway.py @@ -8,7 +8,7 @@ class StreamingStorageGateway(ABC): """ @abstractmethod - def put_record(self, stream_name: str, record: Dict[str, Any]) -> None: + def put_record(self, stream_name: str, record: Dict[str, Any]) -> Any: """ Put a record into a streaming storage mechanism. diff --git a/model-engine/model_engine_server/inference/infra/gateways/firehose_streaming_storage_gateway.py b/model-engine/model_engine_server/inference/infra/gateways/firehose_streaming_storage_gateway.py index ab718737..f0b87e65 100644 --- a/model-engine/model_engine_server/inference/infra/gateways/firehose_streaming_storage_gateway.py +++ b/model-engine/model_engine_server/inference/infra/gateways/firehose_streaming_storage_gateway.py @@ -43,7 +43,7 @@ def _get_firehose_client(self): firehose_client = session.client("firehose", region_name=infra_config().default_region) return firehose_client - def put_record(self, stream_name: str, record: Dict[str, Any]) -> None: + def put_record(self, stream_name: str, record: Dict[str, Any]) -> Any: """ Put a record into a Firehose stream. @@ -56,8 +56,9 @@ def put_record(self, stream_name: str, record: Dict[str, Any]) -> None: ) if firehose_response["ResponseMetadata"]["HTTPStatusCode"] != 200: raise StreamPutException( - f"Failed to put record into firehose stream {stream_name}. Record content: {record}" + f"Failed to put record into firehose stream {stream_name}. Response metadata {firehose_response['ResponseMetadata']}." ) logger.info( - f"Logged to firehose stream {stream_name}. Record content: {record}, Record ID: {firehose_response['RecordId']}" + f"Logged to firehose stream {stream_name}. Record ID: {firehose_response['RecordId']} Task ID: {record['RESPONSE_BODY']['task_id']}" ) + return firehose_response diff --git a/model-engine/model_engine_server/inference/post_inference_hooks.py b/model-engine/model_engine_server/inference/post_inference_hooks.py index 6f388acb..05b96875 100644 --- a/model-engine/model_engine_server/inference/post_inference_hooks.py +++ b/model-engine/model_engine_server/inference/post_inference_hooks.py @@ -135,9 +135,13 @@ def handle( logger.warning("No firehose stream name specified. Logging hook will not be executed.") return try: - self._streaming_storage_gateway.put_record(stream_name=stream_name, record=data_record) - except StreamPutException as e: - logger.error(f"Error in logging hook {e}") + firehose_response = self._streaming_storage_gateway.put_record( + stream_name=stream_name, record=data_record + ) + except StreamPutException: + logger.error( + f"Failed to put record into firehose stream {stream_name}. Response metadata {firehose_response['ResponseMetadata']}." + ) class PostInferenceHooksHandler: From 27f742bcb062d769c44169d4759d6e4f4e7b3ce5 Mon Sep 17 00:00:00 2001 From: Tiffany Zhao Date: Tue, 20 Feb 2024 21:54:41 +0000 Subject: [PATCH 2/8] fix test --- .../infra/gateways/test_firehose_streaming_storage_gateway.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/model-engine/tests/unit/infra/gateways/test_firehose_streaming_storage_gateway.py b/model-engine/tests/unit/infra/gateways/test_firehose_streaming_storage_gateway.py index 1cedaef6..68962d83 100644 --- a/model-engine/tests/unit/infra/gateways/test_firehose_streaming_storage_gateway.py +++ b/model-engine/tests/unit/infra/gateways/test_firehose_streaming_storage_gateway.py @@ -17,7 +17,7 @@ def streaming_storage_gateway(): @pytest.fixture def fake_record(): - return {"Data": "fake-data"} + return {"RESPONSE_BODY": {"task_id": "fake-task-id"}} def mock_sts_client(*args, **kwargs): From ea1985b60c5f64b89cfde41f28f9764f6b1e967e Mon Sep 17 00:00:00 2001 From: Tiffany Zhao Date: Tue, 20 Feb 2024 21:59:38 +0000 Subject: [PATCH 3/8] fix typing --- .../inference/domain/gateways/streaming_storage_gateway.py | 2 +- .../infra/gateways/firehose_streaming_storage_gateway.py | 2 +- .../model_engine_server/inference/post_inference_hooks.py | 4 ++-- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/model-engine/model_engine_server/inference/domain/gateways/streaming_storage_gateway.py b/model-engine/model_engine_server/inference/domain/gateways/streaming_storage_gateway.py index c757bdf0..add325bc 100644 --- a/model-engine/model_engine_server/inference/domain/gateways/streaming_storage_gateway.py +++ b/model-engine/model_engine_server/inference/domain/gateways/streaming_storage_gateway.py @@ -8,7 +8,7 @@ class StreamingStorageGateway(ABC): """ @abstractmethod - def put_record(self, stream_name: str, record: Dict[str, Any]) -> Any: + def put_record(self, stream_name: str, record: Dict[str, Any]) -> Dict[str, Any]: """ Put a record into a streaming storage mechanism. diff --git a/model-engine/model_engine_server/inference/infra/gateways/firehose_streaming_storage_gateway.py b/model-engine/model_engine_server/inference/infra/gateways/firehose_streaming_storage_gateway.py index f0b87e65..3ecd53cc 100644 --- a/model-engine/model_engine_server/inference/infra/gateways/firehose_streaming_storage_gateway.py +++ b/model-engine/model_engine_server/inference/infra/gateways/firehose_streaming_storage_gateway.py @@ -43,7 +43,7 @@ def _get_firehose_client(self): firehose_client = session.client("firehose", region_name=infra_config().default_region) return firehose_client - def put_record(self, stream_name: str, record: Dict[str, Any]) -> Any: + def put_record(self, stream_name: str, record: Dict[str, Any]) -> Dict[str, Any]: """ Put a record into a Firehose stream. diff --git a/model-engine/model_engine_server/inference/post_inference_hooks.py b/model-engine/model_engine_server/inference/post_inference_hooks.py index 05b96875..77b32448 100644 --- a/model-engine/model_engine_server/inference/post_inference_hooks.py +++ b/model-engine/model_engine_server/inference/post_inference_hooks.py @@ -135,12 +135,12 @@ def handle( logger.warning("No firehose stream name specified. Logging hook will not be executed.") return try: - firehose_response = self._streaming_storage_gateway.put_record( + streaming_storage_response = self._streaming_storage_gateway.put_record( stream_name=stream_name, record=data_record ) except StreamPutException: logger.error( - f"Failed to put record into firehose stream {stream_name}. Response metadata {firehose_response['ResponseMetadata']}." + f"Failed to put record into firehose stream {stream_name}. Response metadata {streaming_storage_response['ResponseMetadata']}." ) From d6b397c49a38c1d805501f8ab22897dcf50afd0a Mon Sep 17 00:00:00 2001 From: Tiffany Zhao Date: Wed, 21 Feb 2024 00:37:02 +0000 Subject: [PATCH 4/8] fix test --- .../test_firehose_streaming_storage_gateway.py | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/model-engine/tests/unit/infra/gateways/test_firehose_streaming_storage_gateway.py b/model-engine/tests/unit/infra/gateways/test_firehose_streaming_storage_gateway.py index 68962d83..3ae72a6e 100644 --- a/model-engine/tests/unit/infra/gateways/test_firehose_streaming_storage_gateway.py +++ b/model-engine/tests/unit/infra/gateways/test_firehose_streaming_storage_gateway.py @@ -8,6 +8,12 @@ stream_name = "fake-stream" +return_value = { + "RecordId": "fake-record-id", + "Encrypted": False, + "ResponseMetadata": {"HTTPStatusCode": 200}, +} + @pytest.fixture def streaming_storage_gateway(): @@ -34,11 +40,7 @@ def mock_sts_client(*args, **kwargs): def mock_firehose_client(*args, **kwargs): mock_client = mock.Mock() - mock_client.put_record.return_value = { - "RecordId": "fake-record-id", - "Encrypted": False, - "ResponseMetadata": {"HTTPStatusCode": 200}, - } + mock_client.put_record.return_value = return_value return mock_client @@ -76,7 +78,7 @@ def test_firehose_streaming_storage_gateway_put_record(streaming_storage_gateway "model_engine_server.inference.infra.gateways.firehose_streaming_storage_gateway.boto3.Session", mock_session, ): - assert streaming_storage_gateway.put_record(stream_name, fake_record) is None + assert streaming_storage_gateway.put_record(stream_name, fake_record) is return_value def test_firehose_streaming_storage_gateway_put_record_with_exception( From f62e1e80e52a4e4322928215599fe00950d939a3 Mon Sep 17 00:00:00 2001 From: Tiffany Zhao Date: Wed, 21 Feb 2024 00:54:30 +0000 Subject: [PATCH 5/8] fixes --- .../infra/gateways/firehose_streaming_storage_gateway.py | 2 +- .../inference/post_inference_hooks.py | 9 ++++++--- 2 files changed, 7 insertions(+), 4 deletions(-) diff --git a/model-engine/model_engine_server/inference/infra/gateways/firehose_streaming_storage_gateway.py b/model-engine/model_engine_server/inference/infra/gateways/firehose_streaming_storage_gateway.py index 3ecd53cc..801178af 100644 --- a/model-engine/model_engine_server/inference/infra/gateways/firehose_streaming_storage_gateway.py +++ b/model-engine/model_engine_server/inference/infra/gateways/firehose_streaming_storage_gateway.py @@ -59,6 +59,6 @@ def put_record(self, stream_name: str, record: Dict[str, Any]) -> Dict[str, Any] f"Failed to put record into firehose stream {stream_name}. Response metadata {firehose_response['ResponseMetadata']}." ) logger.info( - f"Logged to firehose stream {stream_name}. Record ID: {firehose_response['RecordId']} Task ID: {record['RESPONSE_BODY']['task_id']}" + f"Logged to firehose stream {stream_name}. Record ID: {firehose_response['RecordId']}. Task ID: {record['RESPONSE_BODY']['task_id']}" ) return firehose_response diff --git a/model-engine/model_engine_server/inference/post_inference_hooks.py b/model-engine/model_engine_server/inference/post_inference_hooks.py index 77b32448..5c2db138 100644 --- a/model-engine/model_engine_server/inference/post_inference_hooks.py +++ b/model-engine/model_engine_server/inference/post_inference_hooks.py @@ -134,13 +134,16 @@ def handle( if stream_name is None: logger.warning("No firehose stream name specified. Logging hook will not be executed.") return + streaming_storage_response = {} try: - streaming_storage_response = self._streaming_storage_gateway.put_record( - stream_name=stream_name, record=data_record + streaming_storage_response = ( + self._streaming_storage_gateway.put_record( # pragma: no cover + stream_name=stream_name, record=data_record + ) ) except StreamPutException: logger.error( - f"Failed to put record into firehose stream {stream_name}. Response metadata {streaming_storage_response['ResponseMetadata']}." + f"Failed to put record into firehose stream {stream_name}. Response metadata {streaming_storage_response.get('ResponseMetadata')}." ) From 9076db1ff89c5e22f7a76fc6008a815f7092d237 Mon Sep 17 00:00:00 2001 From: Tiffany Zhao Date: Wed, 21 Feb 2024 01:07:34 +0000 Subject: [PATCH 6/8] pragma no cover --- .../model_engine_server/inference/post_inference_hooks.py | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/model-engine/model_engine_server/inference/post_inference_hooks.py b/model-engine/model_engine_server/inference/post_inference_hooks.py index 5c2db138..cf9aa0ac 100644 --- a/model-engine/model_engine_server/inference/post_inference_hooks.py +++ b/model-engine/model_engine_server/inference/post_inference_hooks.py @@ -136,12 +136,10 @@ def handle( return streaming_storage_response = {} try: - streaming_storage_response = ( - self._streaming_storage_gateway.put_record( # pragma: no cover - stream_name=stream_name, record=data_record - ) + streaming_storage_response = self._streaming_storage_gateway.put_record( + stream_name=stream_name, record=data_record ) - except StreamPutException: + except StreamPutException: # pragma: no cover logger.error( f"Failed to put record into firehose stream {stream_name}. Response metadata {streaming_storage_response.get('ResponseMetadata')}." ) From 66142b92caed1f7beb7c64374f70d0ebd191525f Mon Sep 17 00:00:00 2001 From: Tiffany Zhao Date: Wed, 21 Feb 2024 01:48:46 +0000 Subject: [PATCH 7/8] pragma no cover --- .../model_engine_server/inference/post_inference_hooks.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/model-engine/model_engine_server/inference/post_inference_hooks.py b/model-engine/model_engine_server/inference/post_inference_hooks.py index cf9aa0ac..2eb38744 100644 --- a/model-engine/model_engine_server/inference/post_inference_hooks.py +++ b/model-engine/model_engine_server/inference/post_inference_hooks.py @@ -135,11 +135,12 @@ def handle( logger.warning("No firehose stream name specified. Logging hook will not be executed.") return streaming_storage_response = {} + # pragma: no cover try: streaming_storage_response = self._streaming_storage_gateway.put_record( stream_name=stream_name, record=data_record ) - except StreamPutException: # pragma: no cover + except StreamPutException: logger.error( f"Failed to put record into firehose stream {stream_name}. Response metadata {streaming_storage_response.get('ResponseMetadata')}." ) From 0240de57aa3ed5a98636b63b636c226a05ac5131 Mon Sep 17 00:00:00 2001 From: Tiffany Zhao Date: Wed, 21 Feb 2024 02:02:44 +0000 Subject: [PATCH 8/8] pragma no cover --- .../inference/post_inference_hooks.py | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/model-engine/model_engine_server/inference/post_inference_hooks.py b/model-engine/model_engine_server/inference/post_inference_hooks.py index 2eb38744..39f5fcd7 100644 --- a/model-engine/model_engine_server/inference/post_inference_hooks.py +++ b/model-engine/model_engine_server/inference/post_inference_hooks.py @@ -134,14 +134,15 @@ def handle( if stream_name is None: logger.warning("No firehose stream name specified. Logging hook will not be executed.") return - streaming_storage_response = {} - # pragma: no cover + streaming_storage_response = {} # pragma: no cover try: - streaming_storage_response = self._streaming_storage_gateway.put_record( - stream_name=stream_name, record=data_record + streaming_storage_response = ( + self._streaming_storage_gateway.put_record( # pragma: no cover + stream_name=stream_name, record=data_record + ) ) - except StreamPutException: - logger.error( + except StreamPutException: # pragma: no cover + logger.error( # pragma: no cover f"Failed to put record into firehose stream {stream_name}. Response metadata {streaming_storage_response.get('ResponseMetadata')}." )