From 68c21045183bc2b687f08e477c1f8461e3a5e2d0 Mon Sep 17 00:00:00 2001 From: Daniil Dumchenko Date: Sun, 6 Apr 2025 18:05:52 +0700 Subject: [PATCH 1/5] Resolve #682 --- nats/js/api.py | 12 +++++++++- nats/js/client.py | 9 +++++++ tests/test_js.py | 60 ++++++++++++++++++++++++++++++++++++++++++++++- 3 files changed, 79 insertions(+), 2 deletions(-) diff --git a/nats/js/api.py b/nats/js/api.py index e9db83e1..dcfa1570 100644 --- a/nats/js/api.py +++ b/nats/js/api.py @@ -13,7 +13,6 @@ # from __future__ import annotations - from dataclasses import dataclass, fields, replace from enum import Enum from typing import Any, Dict, Iterable, Iterator, List, Optional, TypeVar @@ -34,6 +33,9 @@ class Header(str, Enum): ROLLUP = "Nats-Rollup" STATUS = "Status" + TTL = "Nats-TTL" + MARKER_REASON = "Nats-Marker-Reason" + DEFAULT_PREFIX = "$JS.API" INBOX_PREFIX = b"_INBOX." @@ -308,9 +310,14 @@ class StreamConfig(Base): # Metadata are user defined string key/value pairs. metadata: Optional[Dict[str, str]] = None + # Allow per message ttl + allow_msg_ttl: bool = False + subject_delete_marker_ttl: Optional[int] = None # in seconds + @classmethod def from_response(cls, resp: Dict[str, Any]): cls._convert_nanoseconds(resp, "max_age") + cls._convert_nanoseconds(resp, "subject_delete_marker_ttl") cls._convert_nanoseconds(resp, "duplicate_window") cls._convert(resp, "placement", Placement) cls._convert(resp, "mirror", StreamSource) @@ -325,6 +332,9 @@ def as_dict(self) -> Dict[str, object]: self.duplicate_window ) result["max_age"] = self._to_nanoseconds(self.max_age) + result["subject_delete_marker_ttl"] = self._to_nanoseconds( + self.subject_delete_marker_ttl + ) if self.sources: result["sources"] = [src.as_dict() for src in self.sources] if self.compression and (self.compression != StoreCompression.NONE diff --git a/nats/js/client.py b/nats/js/client.py index d26413c0..d1acef2e 100644 --- a/nats/js/client.py +++ b/nats/js/client.py @@ -186,6 +186,7 @@ async def publish( timeout: Optional[float] = None, stream: Optional[str] = None, headers: Optional[Dict[str, Any]] = None, + ttl: Optional[int] = None ) -> api.PubAck: """ publish emits a new message to JetStream and waits for acknowledgement. @@ -197,6 +198,9 @@ async def publish( hdr = hdr or {} hdr[api.Header.EXPECTED_STREAM] = stream + if ttl is not None: + hdr = hdr or {} + hdr[api.Header.TTL] = str(ttl) try: msg = await self._nc.request( subject, @@ -219,6 +223,7 @@ async def publish_async( wait_stall: Optional[float] = None, stream: Optional[str] = None, headers: Optional[Dict] = None, + ttl: Optional[int] = None ) -> asyncio.Future[api.PubAck]: """ emits a new message to JetStream and returns a future that can be awaited for acknowledgement. @@ -233,6 +238,10 @@ async def publish_async( hdr = hdr or {} hdr[api.Header.EXPECTED_STREAM] = stream + if ttl is not None: + hdr = hdr or {} + hdr[api.Header.TTL] = str(ttl) + try: await asyncio.wait_for( self._publish_async_pending_semaphore.acquire(), diff --git a/tests/test_js.py b/tests/test_js.py index 08a6377b..868342b5 100644 --- a/tests/test_js.py +++ b/tests/test_js.py @@ -13,7 +13,7 @@ from hashlib import sha256 import nats -import nats.js.api +from nats.js.api import Header import pytest from nats.aio.client import Client as NATS from nats.aio.errors import * @@ -132,6 +132,64 @@ async def test_publish_async(self): await nc.close() + @async_test + async def test_publish_per_message_ttl(self): + nc = NATS() + await nc.connect() + js = nc.jetstream() + + await js.add_stream( + name="QUUX", + subjects=["ququ"], + allow_msg_ttl=True, + subject_delete_marker_ttl=2, + ) + ack = await js.publish( + subject="ququ", payload=b"bar:1", stream="QUUX", ttl=2 + ) + assert ack.stream == "QUUX" + assert ack.seq == 1 + + info = await js.stream_info(name="QUUX") + assert info.state.messages == 1 + + message = await js.get_last_msg(stream_name="QUUX", subject="ququ") + ttl = message.headers.get(Header.TTL) + assert ttl == '2' + await nc.close() + + @async_test + async def test_async_publish_per_message_ttl(self): + nc = NATS() + await nc.connect() + js = nc.jetstream() + + await js.add_stream( + name="QUUX", + subjects=["ququ"], + allow_msg_ttl=True, + subject_delete_marker_ttl=2, + ) + + futures = [ + await js.publish_async(subject="ququ", payload=b"bar:1", ttl=2) + ] + + await js.publish_async_completed() + results = await asyncio.gather(*futures) + ack = results[0] + + assert ack.stream == "QUUX" + assert ack.seq == 1 + + info = await js.stream_info(name="QUUX") + assert info.state.messages == 1 + + message = await js.get_last_msg(stream_name="QUUX", subject="ququ") + ttl = message.headers.get(Header.TTL) + assert ttl == '2' + await nc.close() + class PullSubscribeTest(SingleJetStreamServerTestCase): From a3145eae7708beab3a4c7d15033eba10dcdeb301 Mon Sep 17 00:00:00 2001 From: Daniil Dumchenko Date: Fri, 11 Apr 2025 23:06:32 +0700 Subject: [PATCH 2/5] Feat: add pull consumer group with overflow --- nats/js/api.py | 11 ++++++ nats/js/client.py | 41 +++++++++++++++++++- tests/test_js.py | 95 +++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 145 insertions(+), 2 deletions(-) diff --git a/nats/js/api.py b/nats/js/api.py index dcfa1570..2bc1586c 100644 --- a/nats/js/api.py +++ b/nats/js/api.py @@ -463,6 +463,13 @@ class ReplayPolicy(str, Enum): ORIGINAL = "original" +class PriorityPolicy(str, Enum): + """Group priority policy""" + + OVERFLOW = "overflow" + PINNED_CLIENT = "pinned_client" + + @dataclass class ConsumerConfig(Base): """Consumer configuration. @@ -510,6 +517,10 @@ class ConsumerConfig(Base): # Metadata are user defined string key/value pairs. metadata: Optional[Dict[str, str]] = None + # add priprity groups + priority_groups: Optional[list[str]] = None + priority_policy: Optional[PriorityPolicy] = None + @classmethod def from_response(cls, resp: Dict[str, Any]): cls._convert_nanoseconds(resp, "ack_wait") diff --git a/nats/js/client.py b/nats/js/client.py index d1acef2e..f5d7bc42 100644 --- a/nats/js/client.py +++ b/nats/js/client.py @@ -1062,6 +1062,9 @@ async def fetch( batch: int = 1, timeout: Optional[float] = 5, heartbeat: Optional[float] = None, + group: Optional[str] = None, + min_pending: Optional[int] = None, + min_ack_pending: Optional[int] = None ) -> List[Msg]: """ fetch makes a request to JetStream to be delivered a set of messages. @@ -1069,6 +1072,7 @@ async def fetch( :param batch: Number of messages to fetch from server. :param timeout: Max duration of the fetch request before it expires. :param heartbeat: Idle Heartbeat interval in seconds for the fetch request. + :param heartbeat: If consumer has configured PriorityGroups, every Pull Request needs to provide it. :: @@ -1094,6 +1098,10 @@ async def main(): if self._sub is None: raise ValueError("nats: invalid subscription") + # # как добраться до priority_group? + # if self._priority_groups and not group: + # raise ValueError("A group is a required parameter, as priority groups are set.") + # FIXME: Check connection is not closed, etc... if batch < 1: raise ValueError("nats: invalid batch size") @@ -1104,9 +1112,15 @@ async def main(): timeout * 1_000_000_000 ) - 100_000 if timeout else None if batch == 1: - msg = await self._fetch_one(expires, timeout, heartbeat) + msg = await self._fetch_one( + expires, timeout, heartbeat, group, min_pending, + min_ack_pending + ) return [msg] - msgs = await self._fetch_n(batch, expires, timeout, heartbeat) + msgs = await self._fetch_n( + batch, expires, timeout, heartbeat, group, min_pending, + min_ack_pending + ) return msgs async def _fetch_one( @@ -1114,6 +1128,9 @@ async def _fetch_one( expires: Optional[int], timeout: Optional[float], heartbeat: Optional[float] = None, + group: Optional[str] = None, + min_pending: Optional[int] = None, + min_ack_pending: Optional[int] = None ) -> Msg: queue = self._sub._pending_queue @@ -1142,6 +1159,15 @@ async def _fetch_one( heartbeat * 1_000_000_000 ) # to nanoseconds + if group: + next_req["group"] = group + + if min_pending: + next_req["min_pending"] = min_pending + + if min_ack_pending: + next_req["min_ack_pending"] = min_ack_pending + await self._nc.publish( self._nms, json.dumps(next_req).encode(), @@ -1192,6 +1218,9 @@ async def _fetch_n( expires: Optional[int], timeout: Optional[float], heartbeat: Optional[float] = None, + group: Optional[str] = None, + min_pending: Optional[int] = None, + min_ack_pending: Optional[int] = None ) -> List[Msg]: msgs = [] queue = self._sub._pending_queue @@ -1226,6 +1255,14 @@ async def _fetch_n( next_req["idle_heartbeat"] = int( heartbeat * 1_000_000_000 ) # to nanoseconds + if group: + next_req["group"] = str(group) + + if min_pending: + next_req["min_pending"] = min_pending + + if min_ack_pending: + next_req["min_ack_pending"] = min_ack_pending next_req["no_wait"] = True await self._nc.publish( self._nms, diff --git a/tests/test_js.py b/tests/test_js.py index 868342b5..2d504254 100644 --- a/tests/test_js.py +++ b/tests/test_js.py @@ -1113,6 +1113,101 @@ async def test_fetch_heartbeats(self): await nc.close() + @async_test + async def test_pull_overflow(self): + nc = NATS() + await nc.connect() + js = nc.jetstream() + await js.add_stream(name="events", subjects=["events.a"]) + await js.add_consumer( + "events", + durable_name="a", + ack_policy="explicit", + max_waiting=512, + max_ack_pending=1024, + filter_subject="events.a", + priority_policy=api.PriorityPolicy.OVERFLOW.value, + priority_groups=["A"] + ) + sub = await js.pull_subscribe_bind( + "a", + stream="events", + ) + await js.publish("events.a", b"test") + + msgs = await sub.fetch(1, group="A") + for msg in msgs: + await msg.ack() + await nc.close() + + @async_test + async def test_pull_overflow_min_pending(self): + nc = NATS() + await nc.connect() + js = nc.jetstream() + await js.add_stream(name="events", subjects=["events.a"]) + await js.add_consumer( + "events", + durable_name="a", + ack_policy="explicit", + max_waiting=512, + max_ack_pending=1024, + filter_subject="events.a", + priority_policy=api.PriorityPolicy.OVERFLOW.value, + priority_groups=["A"] + ) + sub = await js.pull_subscribe_bind( + "a", + stream="events", + ) + for i in range(0, 5): + await js.publish("events.a", b"i:%d" % i) + + # because min pending > num_pending + with pytest.raises(asyncio.exceptions.CancelledError): + msgs = await sub.fetch(1, group="A", min_pending=10) + + for i in range(0, 20): + await js.publish("events.a", b"i:%d" % i) + + msgs = await sub.fetch(1, group="A", min_pending=10) + for msg in msgs: + await msg.ack() + await nc.close() + + @async_test + async def test_pull_overflow_min_ack_pending(self): + nc = NATS() + await nc.connect() + js = nc.jetstream() + await js.add_stream(name="events", subjects=["events.a"]) + await js.add_consumer( + "events", + durable_name="a", + ack_policy="explicit", + max_waiting=512, + max_ack_pending=1024, + filter_subject="events.a", + priority_policy=api.PriorityPolicy.OVERFLOW.value, + priority_groups=["A"] + ) + sub = await js.pull_subscribe_bind( + "a", + stream="events", + ) + for i in range(0, 5): + await js.publish("events.a", b"i:%d" % i) + + # because min_ack_pending > num_ack_pending + with pytest.raises(asyncio.exceptions.CancelledError): + await sub.fetch(1, group="A", min_ack_pending=10) + + for i in range(0, 20): + await js.publish("events.a", b"i:%d" % i) + await sub.fetch(10, group="A") + await sub.fetch(1, group="A", min_ack_pending=10) + await nc.close() + class JSMTest(SingleJetStreamServerTestCase): From b34d0d84a8132c0f5dd89b5a90037b2a8fdaed21 Mon Sep 17 00:00:00 2001 From: Waldemar Quevedo Date: Fri, 11 Apr 2025 09:12:02 -0700 Subject: [PATCH 3/5] Update nats/js/api.py --- nats/js/api.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/nats/js/api.py b/nats/js/api.py index 2bc1586c..be0cd75c 100644 --- a/nats/js/api.py +++ b/nats/js/api.py @@ -517,7 +517,7 @@ class ConsumerConfig(Base): # Metadata are user defined string key/value pairs. metadata: Optional[Dict[str, str]] = None - # add priprity groups + # add priority groups priority_groups: Optional[list[str]] = None priority_policy: Optional[PriorityPolicy] = None From f2391cc4dd5f52fa690655c30462ae185115d734 Mon Sep 17 00:00:00 2001 From: Daniil Dumchenko Date: Fri, 11 Apr 2025 23:46:21 +0700 Subject: [PATCH 4/5] Fix: typo docs --- nats/js/client.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/nats/js/client.py b/nats/js/client.py index f5d7bc42..e856a2f1 100644 --- a/nats/js/client.py +++ b/nats/js/client.py @@ -1072,7 +1072,7 @@ async def fetch( :param batch: Number of messages to fetch from server. :param timeout: Max duration of the fetch request before it expires. :param heartbeat: Idle Heartbeat interval in seconds for the fetch request. - :param heartbeat: If consumer has configured PriorityGroups, every Pull Request needs to provide it. + :param group: If consumer has configured PriorityGroups, every Pull Request needs to provide it. :: From 0fac2cbf1af073ed095e4306e2c8de8c3b81d470 Mon Sep 17 00:00:00 2001 From: Daniil Dumchenko Date: Fri, 11 Apr 2025 23:47:49 +0700 Subject: [PATCH 5/5] Fix: typo --- nats/js/client.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/nats/js/client.py b/nats/js/client.py index e856a2f1..53a4d4ee 100644 --- a/nats/js/client.py +++ b/nats/js/client.py @@ -1098,10 +1098,6 @@ async def main(): if self._sub is None: raise ValueError("nats: invalid subscription") - # # как добраться до priority_group? - # if self._priority_groups and not group: - # raise ValueError("A group is a required parameter, as priority groups are set.") - # FIXME: Check connection is not closed, etc... if batch < 1: raise ValueError("nats: invalid batch size")