13
13
# See the License for the specific language governing permissions and
14
14
# limitations under the License.
15
15
#
16
+ import json
17
+ import logging as std_logging
18
+ import pickle
16
19
import warnings
17
20
from typing import Callable , Dict , Optional , Sequence , Tuple , Union
18
21
21
24
import google .auth # type: ignore
22
25
from google .auth import credentials as ga_credentials # type: ignore
23
26
from google .auth .transport .grpc import SslCredentials # type: ignore
27
+ from google .protobuf .json_format import MessageToJson
28
+ import google .protobuf .message
24
29
25
30
import grpc # type: ignore
31
+ import proto # type: ignore
26
32
27
33
from google .iam .v1 import iam_policy_pb2 # type: ignore
28
34
from google .iam .v1 import policy_pb2 # type: ignore
29
35
from google .protobuf import empty_pb2 # type: ignore
30
36
from google .pubsub_v1 .types import pubsub
31
37
from .base import PublisherTransport , DEFAULT_CLIENT_INFO
32
38
39
+ try :
40
+ from google .api_core import client_logging # type: ignore
41
+
42
+ CLIENT_LOGGING_SUPPORTED = True # pragma: NO COVER
43
+ except ImportError : # pragma: NO COVER
44
+ CLIENT_LOGGING_SUPPORTED = False
45
+
46
+ _LOGGER = std_logging .getLogger (__name__ )
47
+
48
+
49
+ class _LoggingClientInterceptor (grpc .UnaryUnaryClientInterceptor ): # pragma: NO COVER
50
+ def intercept_unary_unary (self , continuation , client_call_details , request ):
51
+ logging_enabled = CLIENT_LOGGING_SUPPORTED and _LOGGER .isEnabledFor (
52
+ std_logging .DEBUG
53
+ )
54
+ if logging_enabled : # pragma: NO COVER
55
+ request_metadata = client_call_details .metadata
56
+ if isinstance (request , proto .Message ):
57
+ request_payload = type (request ).to_json (request )
58
+ elif isinstance (request , google .protobuf .message .Message ):
59
+ request_payload = MessageToJson (request )
60
+ else :
61
+ request_payload = f"{ type (request ).__name__ } : { pickle .dumps (request )} "
62
+
63
+ request_metadata = {
64
+ key : value .decode ("utf-8" ) if isinstance (value , bytes ) else value
65
+ for key , value in request_metadata
66
+ }
67
+ grpc_request = {
68
+ "payload" : request_payload ,
69
+ "requestMethod" : "grpc" ,
70
+ "metadata" : dict (request_metadata ),
71
+ }
72
+ _LOGGER .debug (
73
+ f"Sending request for { client_call_details .method } " ,
74
+ extra = {
75
+ "serviceName" : "google.pubsub.v1.Publisher" ,
76
+ "rpcName" : client_call_details .method ,
77
+ "request" : grpc_request ,
78
+ "metadata" : grpc_request ["metadata" ],
79
+ },
80
+ )
81
+
82
+ response = continuation (client_call_details , request )
83
+ if logging_enabled : # pragma: NO COVER
84
+ response_metadata = response .trailing_metadata ()
85
+ # Convert gRPC metadata `<class 'grpc.aio._metadata.Metadata'>` to list of tuples
86
+ metadata = (
87
+ dict ([(k , str (v )) for k , v in response_metadata ])
88
+ if response_metadata
89
+ else None
90
+ )
91
+ result = response .result ()
92
+ if isinstance (result , proto .Message ):
93
+ response_payload = type (result ).to_json (result )
94
+ elif isinstance (result , google .protobuf .message .Message ):
95
+ response_payload = MessageToJson (result )
96
+ else :
97
+ response_payload = f"{ type (result ).__name__ } : { pickle .dumps (result )} "
98
+ grpc_response = {
99
+ "payload" : response_payload ,
100
+ "metadata" : metadata ,
101
+ "status" : "OK" ,
102
+ }
103
+ _LOGGER .debug (
104
+ f"Received response for { client_call_details .method } ." ,
105
+ extra = {
106
+ "serviceName" : "google.pubsub.v1.Publisher" ,
107
+ "rpcName" : client_call_details .method ,
108
+ "response" : grpc_response ,
109
+ "metadata" : grpc_response ["metadata" ],
110
+ },
111
+ )
112
+ return response
113
+
33
114
34
115
class PublisherGrpcTransport (PublisherTransport ):
35
116
"""gRPC backend transport for Publisher.
@@ -186,7 +267,12 @@ def __init__(
186
267
],
187
268
)
188
269
189
- # Wrap messages. This must be done after self._grpc_channel exists
270
+ self ._interceptor = _LoggingClientInterceptor ()
271
+ self ._logged_channel = grpc .intercept_channel (
272
+ self ._grpc_channel , self ._interceptor
273
+ )
274
+
275
+ # Wrap messages. This must be done after self._logged_channel exists
190
276
self ._prep_wrapped_messages (client_info )
191
277
192
278
@classmethod
@@ -260,7 +346,7 @@ def create_topic(self) -> Callable[[pubsub.Topic], pubsub.Topic]:
260
346
# gRPC handles serialization and deserialization, so we just need
261
347
# to pass in the functions for each.
262
348
if "create_topic" not in self ._stubs :
263
- self ._stubs ["create_topic" ] = self .grpc_channel .unary_unary (
349
+ self ._stubs ["create_topic" ] = self ._logged_channel .unary_unary (
264
350
"/google.pubsub.v1.Publisher/CreateTopic" ,
265
351
request_serializer = pubsub .Topic .serialize ,
266
352
response_deserializer = pubsub .Topic .deserialize ,
@@ -286,7 +372,7 @@ def update_topic(self) -> Callable[[pubsub.UpdateTopicRequest], pubsub.Topic]:
286
372
# gRPC handles serialization and deserialization, so we just need
287
373
# to pass in the functions for each.
288
374
if "update_topic" not in self ._stubs :
289
- self ._stubs ["update_topic" ] = self .grpc_channel .unary_unary (
375
+ self ._stubs ["update_topic" ] = self ._logged_channel .unary_unary (
290
376
"/google.pubsub.v1.Publisher/UpdateTopic" ,
291
377
request_serializer = pubsub .UpdateTopicRequest .serialize ,
292
378
response_deserializer = pubsub .Topic .deserialize ,
@@ -311,7 +397,7 @@ def publish(self) -> Callable[[pubsub.PublishRequest], pubsub.PublishResponse]:
311
397
# gRPC handles serialization and deserialization, so we just need
312
398
# to pass in the functions for each.
313
399
if "publish" not in self ._stubs :
314
- self ._stubs ["publish" ] = self .grpc_channel .unary_unary (
400
+ self ._stubs ["publish" ] = self ._logged_channel .unary_unary (
315
401
"/google.pubsub.v1.Publisher/Publish" ,
316
402
request_serializer = pubsub .PublishRequest .serialize ,
317
403
response_deserializer = pubsub .PublishResponse .deserialize ,
@@ -335,7 +421,7 @@ def get_topic(self) -> Callable[[pubsub.GetTopicRequest], pubsub.Topic]:
335
421
# gRPC handles serialization and deserialization, so we just need
336
422
# to pass in the functions for each.
337
423
if "get_topic" not in self ._stubs :
338
- self ._stubs ["get_topic" ] = self .grpc_channel .unary_unary (
424
+ self ._stubs ["get_topic" ] = self ._logged_channel .unary_unary (
339
425
"/google.pubsub.v1.Publisher/GetTopic" ,
340
426
request_serializer = pubsub .GetTopicRequest .serialize ,
341
427
response_deserializer = pubsub .Topic .deserialize ,
@@ -361,7 +447,7 @@ def list_topics(
361
447
# gRPC handles serialization and deserialization, so we just need
362
448
# to pass in the functions for each.
363
449
if "list_topics" not in self ._stubs :
364
- self ._stubs ["list_topics" ] = self .grpc_channel .unary_unary (
450
+ self ._stubs ["list_topics" ] = self ._logged_channel .unary_unary (
365
451
"/google.pubsub.v1.Publisher/ListTopics" ,
366
452
request_serializer = pubsub .ListTopicsRequest .serialize ,
367
453
response_deserializer = pubsub .ListTopicsResponse .deserialize ,
@@ -390,7 +476,7 @@ def list_topic_subscriptions(
390
476
# gRPC handles serialization and deserialization, so we just need
391
477
# to pass in the functions for each.
392
478
if "list_topic_subscriptions" not in self ._stubs :
393
- self ._stubs ["list_topic_subscriptions" ] = self .grpc_channel .unary_unary (
479
+ self ._stubs ["list_topic_subscriptions" ] = self ._logged_channel .unary_unary (
394
480
"/google.pubsub.v1.Publisher/ListTopicSubscriptions" ,
395
481
request_serializer = pubsub .ListTopicSubscriptionsRequest .serialize ,
396
482
response_deserializer = pubsub .ListTopicSubscriptionsResponse .deserialize ,
@@ -423,7 +509,7 @@ def list_topic_snapshots(
423
509
# gRPC handles serialization and deserialization, so we just need
424
510
# to pass in the functions for each.
425
511
if "list_topic_snapshots" not in self ._stubs :
426
- self ._stubs ["list_topic_snapshots" ] = self .grpc_channel .unary_unary (
512
+ self ._stubs ["list_topic_snapshots" ] = self ._logged_channel .unary_unary (
427
513
"/google.pubsub.v1.Publisher/ListTopicSnapshots" ,
428
514
request_serializer = pubsub .ListTopicSnapshotsRequest .serialize ,
429
515
response_deserializer = pubsub .ListTopicSnapshotsResponse .deserialize ,
@@ -452,7 +538,7 @@ def delete_topic(self) -> Callable[[pubsub.DeleteTopicRequest], empty_pb2.Empty]
452
538
# gRPC handles serialization and deserialization, so we just need
453
539
# to pass in the functions for each.
454
540
if "delete_topic" not in self ._stubs :
455
- self ._stubs ["delete_topic" ] = self .grpc_channel .unary_unary (
541
+ self ._stubs ["delete_topic" ] = self ._logged_channel .unary_unary (
456
542
"/google.pubsub.v1.Publisher/DeleteTopic" ,
457
543
request_serializer = pubsub .DeleteTopicRequest .serialize ,
458
544
response_deserializer = empty_pb2 .Empty .FromString ,
@@ -484,13 +570,16 @@ def detach_subscription(
484
570
# gRPC handles serialization and deserialization, so we just need
485
571
# to pass in the functions for each.
486
572
if "detach_subscription" not in self ._stubs :
487
- self ._stubs ["detach_subscription" ] = self .grpc_channel .unary_unary (
573
+ self ._stubs ["detach_subscription" ] = self ._logged_channel .unary_unary (
488
574
"/google.pubsub.v1.Publisher/DetachSubscription" ,
489
575
request_serializer = pubsub .DetachSubscriptionRequest .serialize ,
490
576
response_deserializer = pubsub .DetachSubscriptionResponse .deserialize ,
491
577
)
492
578
return self ._stubs ["detach_subscription" ]
493
579
580
+ def close (self ):
581
+ self ._logged_channel .close ()
582
+
494
583
@property
495
584
def set_iam_policy (
496
585
self ,
@@ -509,7 +598,7 @@ def set_iam_policy(
509
598
# gRPC handles serialization and deserialization, so we just need
510
599
# to pass in the functions for each.
511
600
if "set_iam_policy" not in self ._stubs :
512
- self ._stubs ["set_iam_policy" ] = self .grpc_channel .unary_unary (
601
+ self ._stubs ["set_iam_policy" ] = self ._logged_channel .unary_unary (
513
602
"/google.iam.v1.IAMPolicy/SetIamPolicy" ,
514
603
request_serializer = iam_policy_pb2 .SetIamPolicyRequest .SerializeToString ,
515
604
response_deserializer = policy_pb2 .Policy .FromString ,
@@ -535,7 +624,7 @@ def get_iam_policy(
535
624
# gRPC handles serialization and deserialization, so we just need
536
625
# to pass in the functions for each.
537
626
if "get_iam_policy" not in self ._stubs :
538
- self ._stubs ["get_iam_policy" ] = self .grpc_channel .unary_unary (
627
+ self ._stubs ["get_iam_policy" ] = self ._logged_channel .unary_unary (
539
628
"/google.iam.v1.IAMPolicy/GetIamPolicy" ,
540
629
request_serializer = iam_policy_pb2 .GetIamPolicyRequest .SerializeToString ,
541
630
response_deserializer = policy_pb2 .Policy .FromString ,
@@ -564,16 +653,13 @@ def test_iam_permissions(
564
653
# gRPC handles serialization and deserialization, so we just need
565
654
# to pass in the functions for each.
566
655
if "test_iam_permissions" not in self ._stubs :
567
- self ._stubs ["test_iam_permissions" ] = self .grpc_channel .unary_unary (
656
+ self ._stubs ["test_iam_permissions" ] = self ._logged_channel .unary_unary (
568
657
"/google.iam.v1.IAMPolicy/TestIamPermissions" ,
569
658
request_serializer = iam_policy_pb2 .TestIamPermissionsRequest .SerializeToString ,
570
659
response_deserializer = iam_policy_pb2 .TestIamPermissionsResponse .FromString ,
571
660
)
572
661
return self ._stubs ["test_iam_permissions" ]
573
662
574
- def close (self ):
575
- self .grpc_channel .close ()
576
-
577
663
@property
578
664
def kind (self ) -> str :
579
665
return "grpc"
0 commit comments