25
25
from nats .aio .msg import Msg
26
26
from nats .aio .subscription import Subscription
27
27
from nats .js import api
28
- from nats .js .errors import BadBucketError , BucketNotFoundError , InvalidBucketNameError , NotFoundError
28
+ from nats .js .errors import BadBucketError , BucketNotFoundError , InvalidBucketNameError , NotFoundError , FetchTimeoutError
29
29
from nats .js .kv import KeyValue
30
30
from nats .js .manager import JetStreamManager
31
31
from nats .js .object_store import (
@@ -547,6 +547,13 @@ def _is_temporary_error(cls, status: Optional[str]) -> bool:
547
547
else :
548
548
return False
549
549
550
+ @classmethod
551
+ def _is_heartbeat (cls , status : Optional [str ]) -> bool :
552
+ if status == api .StatusCode .CONTROL_MESSAGE :
553
+ return True
554
+ else :
555
+ return False
556
+
550
557
@classmethod
551
558
def _time_until (cls , timeout : Optional [float ],
552
559
start_time : float ) -> Optional [float ]:
@@ -620,9 +627,7 @@ async def activity_check(self):
620
627
self ._active = False
621
628
if not active :
622
629
if self ._ordered :
623
- await self .reset_ordered_consumer (
624
- self ._sseq + 1
625
- )
630
+ await self .reset_ordered_consumer (self ._sseq + 1 )
626
631
except asyncio .CancelledError :
627
632
break
628
633
@@ -882,14 +887,18 @@ async def consumer_info(self) -> api.ConsumerInfo:
882
887
)
883
888
return info
884
889
885
- async def fetch (self ,
886
- batch : int = 1 ,
887
- timeout : Optional [float ] = 5 ) -> List [Msg ]:
890
+ async def fetch (
891
+ self ,
892
+ batch : int = 1 ,
893
+ timeout : Optional [float ] = 5 ,
894
+ heartbeat : Optional [float ] = None
895
+ ) -> List [Msg ]:
888
896
"""
889
897
fetch makes a request to JetStream to be delivered a set of messages.
890
898
891
899
:param batch: Number of messages to fetch from server.
892
900
:param timeout: Max duration of the fetch request before it expires.
901
+ :param heartbeat: Idle Heartbeat interval in seconds for the fetch request.
893
902
894
903
::
895
904
@@ -925,15 +934,16 @@ async def main():
925
934
timeout * 1_000_000_000
926
935
) - 100_000 if timeout else None
927
936
if batch == 1 :
928
- msg = await self ._fetch_one (expires , timeout )
937
+ msg = await self ._fetch_one (expires , timeout , heartbeat )
929
938
return [msg ]
930
- msgs = await self ._fetch_n (batch , expires , timeout )
939
+ msgs = await self ._fetch_n (batch , expires , timeout , heartbeat )
931
940
return msgs
932
941
933
942
async def _fetch_one (
934
943
self ,
935
944
expires : Optional [int ],
936
945
timeout : Optional [float ],
946
+ heartbeat : Optional [float ] = None
937
947
) -> Msg :
938
948
queue = self ._sub ._pending_queue
939
949
@@ -957,37 +967,66 @@ async def _fetch_one(
957
967
next_req ['batch' ] = 1
958
968
if expires :
959
969
next_req ['expires' ] = int (expires )
970
+ if heartbeat :
971
+ next_req ['idle_heartbeat' ] = int (
972
+ heartbeat * 1_000_000_000
973
+ ) # to nanoseconds
960
974
961
975
await self ._nc .publish (
962
976
self ._nms ,
963
977
json .dumps (next_req ).encode (),
964
978
self ._deliver ,
965
979
)
966
980
967
- # Wait for the response or raise timeout.
968
- msg = await self ._sub .next_msg (timeout )
969
-
970
- # Should have received at least a processable message at this point,
971
- status = JetStreamContext .is_status_msg (msg )
981
+ start_time = time .monotonic ()
982
+ got_any_response = False
983
+ while True :
984
+ try :
985
+ deadline = JetStreamContext ._time_until (
986
+ timeout , start_time
987
+ )
988
+ # Wait for the response or raise timeout.
989
+ msg = await self ._sub .next_msg (timeout = deadline )
972
990
973
- if status :
974
- # In case of a temporary error, treat it as a timeout to retry.
975
- if JetStreamContext ._is_temporary_error (status ):
976
- raise nats .errors .TimeoutError
977
- else :
978
- # Any other type of status message is an error.
979
- raise nats .js .errors .APIError .from_msg (msg )
980
- return msg
991
+ # Should have received at least a processable message at this point,
992
+ status = JetStreamContext .is_status_msg (msg )
993
+ if status :
994
+ if JetStreamContext ._is_heartbeat (status ):
995
+ got_any_response = True
996
+ continue
997
+
998
+ # In case of a temporary error, treat it as a timeout to retry.
999
+ if JetStreamContext ._is_temporary_error (status ):
1000
+ raise nats .errors .TimeoutError
1001
+ else :
1002
+ # Any other type of status message is an error.
1003
+ raise nats .js .errors .APIError .from_msg (msg )
1004
+ else :
1005
+ return msg
1006
+ except asyncio .TimeoutError :
1007
+ deadline = JetStreamContext ._time_until (
1008
+ timeout , start_time
1009
+ )
1010
+ if deadline is not None and deadline < 0 :
1011
+ # No response from the consumer could have been
1012
+ # due to a reconnect while the fetch request,
1013
+ # the JS API not responding on time, or maybe
1014
+ # there were no messages yet.
1015
+ if got_any_response :
1016
+ raise FetchTimeoutError
1017
+ raise
981
1018
982
1019
async def _fetch_n (
983
1020
self ,
984
1021
batch : int ,
985
1022
expires : Optional [int ],
986
1023
timeout : Optional [float ],
1024
+ heartbeat : Optional [float ] = None
987
1025
) -> List [Msg ]:
988
1026
msgs = []
989
1027
queue = self ._sub ._pending_queue
990
1028
start_time = time .monotonic ()
1029
+ got_any_response = False
991
1030
needed = batch
992
1031
993
1032
# Fetch as many as needed from the internal pending queue.
@@ -1013,6 +1052,10 @@ async def _fetch_n(
1013
1052
next_req ['batch' ] = needed
1014
1053
if expires :
1015
1054
next_req ['expires' ] = expires
1055
+ if heartbeat :
1056
+ next_req ['idle_heartbeat' ] = int (
1057
+ heartbeat * 1_000_000_000
1058
+ ) # to nanoseconds
1016
1059
next_req ['no_wait' ] = True
1017
1060
await self ._nc .publish (
1018
1061
self ._nms ,
@@ -1024,12 +1067,20 @@ async def _fetch_n(
1024
1067
try :
1025
1068
msg = await self ._sub .next_msg (timeout )
1026
1069
except asyncio .TimeoutError :
1070
+ # Return any message that was already available in the internal queue.
1027
1071
if msgs :
1028
1072
return msgs
1029
1073
raise
1030
1074
1075
+ got_any_response = False
1076
+
1031
1077
status = JetStreamContext .is_status_msg (msg )
1032
- if JetStreamContext ._is_processable_msg (status , msg ):
1078
+ if JetStreamContext ._is_heartbeat (status ):
1079
+ # Mark that we got any response from the server so this is not
1080
+ # a possible i/o timeout error or due to a disconnection.
1081
+ got_any_response = True
1082
+ pass
1083
+ elif JetStreamContext ._is_processable_msg (status , msg ):
1033
1084
# First processable message received, do not raise error from now.
1034
1085
msgs .append (msg )
1035
1086
needed -= 1
@@ -1045,6 +1096,10 @@ async def _fetch_n(
1045
1096
# No more messages after this so fallthrough
1046
1097
# after receiving the rest.
1047
1098
break
1099
+ elif JetStreamContext ._is_heartbeat (status ):
1100
+ # Skip heartbeats.
1101
+ got_any_response = True
1102
+ continue
1048
1103
elif JetStreamContext ._is_processable_msg (status , msg ):
1049
1104
needed -= 1
1050
1105
msgs .append (msg )
@@ -1063,6 +1118,11 @@ async def _fetch_n(
1063
1118
next_req ['batch' ] = needed
1064
1119
if expires :
1065
1120
next_req ['expires' ] = expires
1121
+ if heartbeat :
1122
+ next_req ['idle_heartbeat' ] = int (
1123
+ heartbeat * 1_000_000_000
1124
+ ) # to nanoseconds
1125
+
1066
1126
await self ._nc .publish (
1067
1127
self ._nms ,
1068
1128
json .dumps (next_req ).encode (),
@@ -1083,7 +1143,12 @@ async def _fetch_n(
1083
1143
if len (msgs ) == 0 :
1084
1144
# Not a single processable message has been received so far,
1085
1145
# if this timed out then let the error be raised.
1086
- msg = await self ._sub .next_msg (timeout = deadline )
1146
+ try :
1147
+ msg = await self ._sub .next_msg (timeout = deadline )
1148
+ except asyncio .TimeoutError :
1149
+ if got_any_response :
1150
+ raise FetchTimeoutError
1151
+ raise
1087
1152
else :
1088
1153
try :
1089
1154
msg = await self ._sub .next_msg (timeout = deadline )
@@ -1093,6 +1158,10 @@ async def _fetch_n(
1093
1158
1094
1159
if msg :
1095
1160
status = JetStreamContext .is_status_msg (msg )
1161
+ if JetStreamContext ._is_heartbeat (status ):
1162
+ got_any_response = True
1163
+ continue
1164
+
1096
1165
if not status :
1097
1166
needed -= 1
1098
1167
msgs .append (msg )
@@ -1116,6 +1185,9 @@ async def _fetch_n(
1116
1185
1117
1186
msg = await self ._sub .next_msg (timeout = deadline )
1118
1187
status = JetStreamContext .is_status_msg (msg )
1188
+ if JetStreamContext ._is_heartbeat (status ):
1189
+ got_any_response = True
1190
+ continue
1119
1191
if JetStreamContext ._is_processable_msg (status , msg ):
1120
1192
needed -= 1
1121
1193
msgs .append (msg )
@@ -1124,6 +1196,9 @@ async def _fetch_n(
1124
1196
# at least one message has already arrived.
1125
1197
pass
1126
1198
1199
+ if len (msgs ) == 0 and got_any_response :
1200
+ raise FetchTimeoutError
1201
+
1127
1202
return msgs
1128
1203
1129
1204
######################
0 commit comments