Skip to content

Commit 193db44

Browse files
committed
Remove 10min timeout from topic stream
1 parent 3706262 commit 193db44

File tree

4 files changed

+34
-7
lines changed

4 files changed

+34
-7
lines changed

ydb/_grpc/grpcwrapper/common_utils.py

Lines changed: 31 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,10 @@
3434
from ..common.protos import ydb_topic_pb2, ydb_issue_message_pb2
3535

3636
from ... import issues, connection
37+
from ...settings import BaseRequestSettings
38+
39+
40+
DEFAULT_LONG_TIMEOUT = 31536000 # year
3741

3842

3943
class IFromProto(abc.ABC):
@@ -131,7 +135,7 @@ async def __anext__(self):
131135

132136
class IGrpcWrapperAsyncIO(abc.ABC):
133137
@abc.abstractmethod
134-
async def receive(self) -> Any:
138+
async def receive(self, timeout: Optional[int] = None) -> Any:
135139
...
136140

137141
@abc.abstractmethod
@@ -161,6 +165,13 @@ def __init__(self, convert_server_grpc_to_wrapper):
161165
self._stream_call = None
162166
self._wait_executor = None
163167

168+
self._stream_settings: BaseRequestSettings = (
169+
BaseRequestSettings()
170+
.with_operation_timeout(DEFAULT_LONG_TIMEOUT)
171+
.with_cancel_after(DEFAULT_LONG_TIMEOUT)
172+
.with_timeout(DEFAULT_LONG_TIMEOUT)
173+
)
174+
164175
def __del__(self):
165176
self._clean_executor(wait=False)
166177

@@ -188,6 +199,7 @@ async def _start_asyncio_driver(self, driver: DriverIO, stub, method):
188199
requests_iterator,
189200
stub,
190201
method,
202+
settings=self._stream_settings,
191203
)
192204
self._stream_call = stream_call
193205
self.from_server_grpc = stream_call.__aiter__()
@@ -196,14 +208,29 @@ async def _start_sync_driver(self, driver: Driver, stub, method):
196208
requests_iterator = AsyncQueueToSyncIteratorAsyncIO(self.from_client_grpc)
197209
self._wait_executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)
198210

199-
stream_call = await to_thread(driver, requests_iterator, stub, method, executor=self._wait_executor)
211+
stream_call = await to_thread(
212+
driver,
213+
requests_iterator,
214+
stub,
215+
method,
216+
executor=self._wait_executor,
217+
settings=self._stream_settings,
218+
)
200219
self._stream_call = stream_call
201220
self.from_server_grpc = SyncToAsyncIterator(stream_call.__iter__(), self._wait_executor)
202221

203-
async def receive(self) -> Any:
222+
async def receive(self, timeout: Optional[int] = None) -> Any:
204223
# todo handle grpc exceptions and convert it to internal exceptions
205224
try:
206-
grpc_message = await self.from_server_grpc.__anext__()
225+
if timeout is None:
226+
grpc_message = await self.from_server_grpc.__anext__()
227+
else:
228+
229+
async def get_response():
230+
return await self.from_server_grpc.__anext__()
231+
232+
grpc_message = await asyncio.wait_for(get_response(), timeout)
233+
207234
except (grpc.RpcError, grpc.aio.AioRpcError) as e:
208235
raise connection._rpc_error_handler(self._connection_state, e)
209236

ydb/_topic_common/test_helpers.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ def __init__(self):
1515
self.from_client = asyncio.Queue()
1616
self._closed = False
1717

18-
async def receive(self) -> typing.Any:
18+
async def receive(self, timeout: typing.Optional[int] = None) -> typing.Any:
1919
if self._closed:
2020
raise Exception("read from closed StreamMock")
2121

ydb/_topic_reader/topic_reader_asyncio.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -490,7 +490,7 @@ async def _start(self, stream: IGrpcWrapperAsyncIO, init_message: StreamReadMess
490490
logger.debug("reader stream %s send init request", self._id)
491491

492492
stream.write(StreamReadMessage.FromClient(client_message=init_message))
493-
init_response = await stream.receive() # type: StreamReadMessage.FromServer
493+
init_response = await stream.receive(timeout=10) # type: StreamReadMessage.FromServer
494494
if isinstance(init_response.server_message, StreamReadMessage.InitResponse):
495495
self._session_id = init_response.server_message.session_id
496496
logger.debug("reader stream %s initialized session=%s", self._id, self._session_id)

ydb/_topic_writer/topic_writer_asyncio.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -799,7 +799,7 @@ async def _start(self, stream: IGrpcWrapperAsyncIO, init_message: StreamWriteMes
799799
logger.debug("writer stream %s send init request", self._id)
800800
stream.write(StreamWriteMessage.FromClient(init_message))
801801

802-
resp = await stream.receive()
802+
resp = await stream.receive(timeout=10)
803803
self._ensure_ok(resp)
804804
if not isinstance(resp, StreamWriteMessage.InitResponse):
805805
raise TopicWriterError("Unexpected answer for init request: %s" % resp)

0 commit comments

Comments
 (0)