Skip to content

Commit e9d9ea2

Browse files
authored
Merge pull request #520 from ydb-platform/fix_attach_timeout
Fix attach timeouts
2 parents 2872be4 + 55184e2 commit e9d9ea2

File tree

5 files changed

+106
-11
lines changed

5 files changed

+106
-11
lines changed

tests/query/test_query_session.py

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,9 @@
11
import pytest
2+
import threading
3+
import time
4+
from concurrent.futures import _base as b
5+
from unittest import mock
6+
27

38
from ydb.query.session import QuerySession
49

@@ -100,3 +105,38 @@ def test_two_results(self, session: QuerySession):
100105
res.append(list(result_set.rows[0].values()))
101106

102107
assert res == [[1], [2]]
108+
109+
def test_thread_leaks(self, session: QuerySession):
    """After ``create()`` only the long-lived attach thread should remain."""
    session.create()

    alive = {worker.name for worker in threading.enumerate()}

    # The helper thread that reads the first attach response must be gone.
    assert "first response attach stream thread" not in alive
    # The thread that keeps consuming the attach stream must be running.
    assert "attach stream thread" in alive
114+
115+
def test_first_resp_timeout(self, session: QuerySession):
    """``_attach`` must time out, cancel the stream and leave no threads behind.

    The stub below is used directly (not wrapped in ``mock.Mock``): a plain
    ``Mock(spec=...)`` does not implement ``__next__``, so the reader thread
    would fail immediately instead of simulating a server that stays silent.
    """

    class FakeStream:
        """Stream stub whose first read blocks until the stream is cancelled."""

        def __init__(self):
            self.cancelled = threading.Event()

        def __iter__(self):
            return self

        def __next__(self):
            # Behave like a silent server; the 10 s bound only keeps a broken
            # implementation from hanging the test run forever.
            self.cancelled.wait(10)
            raise StopIteration

        def cancel(self):
            self.cancelled.set()

    fake_stream = FakeStream()

    session._attach_call = mock.MagicMock(return_value=fake_stream)
    assert session._attach_call() == fake_stream

    session._create_call()
    with pytest.raises(b.TimeoutError):
        session._attach(0.1)

    # The failed attach must have cancelled the underlying stream.
    assert fake_stream.cancelled.is_set()

    # cancel() wakes the reader thread; give it a moment to exit before
    # checking for leaked threads.
    deadline = time.monotonic() + 1
    while time.monotonic() < deadline:
        if all(t.name != "first response attach stream thread" for t in threading.enumerate()):
            break
        time.sleep(0.01)

    thread_names = [t.name for t in threading.enumerate()]
    assert "first response attach stream thread" not in thread_names
    assert "attach stream thread" not in thread_names

    _check_session_state_empty(session)

ydb/_utilities.py

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -182,3 +182,21 @@ def inc_and_get(self) -> int:
182182
with self._lock:
183183
self._value += 1
184184
return self._value
185+
186+
187+
def get_first_message_with_timeout(status_stream: SyncResponseIterator, timeout: int):
    """Return the first message of ``status_stream``, waiting at most ``timeout``.

    The blocking ``next()`` call runs in a daemon thread so that the caller
    can stop waiting once the timeout expires. On timeout the reader thread
    is left running until ``next()`` returns or raises; nothing here cancels
    the stream.

    :param status_stream: iterator to read the first message from.
    :param timeout: maximum time to wait, forwarded to ``waiter.result``
        (seconds, assuming ``future()`` returns a ``concurrent.futures.Future``).
    :return: the first message produced by ``status_stream``.
    :raises: the timeout error of ``waiter.result`` when no message arrives
        in time, or the exception raised by ``next(status_stream)`` itself.
    """
    waiter = future()

    def get_first_response(waiter):
        try:
            first_response = next(status_stream)
        except Exception as e:
            # Hand the failure to the waiting caller. Without this the error
            # is lost inside the thread and the caller blocks for the whole
            # timeout only to receive a misleading timeout error.
            waiter.set_exception(e)
        else:
            waiter.set_result(first_response)

    thread = threading.Thread(
        target=get_first_response,
        args=(waiter,),
        name="first response attach stream thread",
        daemon=True,
    )
    thread.start()

    return waiter.result(timeout=timeout)

ydb/aio/_utilities.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,6 @@
1+
import asyncio
2+
3+
14
class AsyncResponseIterator(object):
25
def __init__(self, it, wrapper):
36
self.it = it.__aiter__()
@@ -21,3 +24,10 @@ async def next(self):
2124

2225
async def __anext__(self):
2326
return await self._next()
27+
28+
29+
async def get_first_message_with_timeout(stream: AsyncResponseIterator, timeout: int):
    """Await the first message of ``stream``, giving up after ``timeout`` seconds.

    :param stream: async iterator to read the first message from.
    :param timeout: maximum time to wait, passed to ``asyncio.wait_for``.
    :return: the value produced by ``stream.next()``.
    """
    # stream.next() is a coroutine, so it can be handed to wait_for directly.
    first_message = await asyncio.wait_for(stream.next(), timeout)
    return first_message

ydb/aio/query/session.py

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
from ...query import base
1616
from ...query.session import (
1717
BaseQuerySession,
18+
DEFAULT_ATTACH_FIRST_RESP_TIMEOUT,
1819
QuerySessionStateEnum,
1920
)
2021

@@ -43,9 +44,17 @@ async def _attach(self) -> None:
4344
lambda response: common_utils.ServerStatus.from_proto(response),
4445
)
4546

46-
first_response = await self._status_stream.next()
47-
if first_response.status != issues.StatusCode.SUCCESS:
48-
pass
47+
try:
48+
first_response = await _utilities.get_first_message_with_timeout(
49+
self._status_stream,
50+
DEFAULT_ATTACH_FIRST_RESP_TIMEOUT,
51+
)
52+
if first_response.status != issues.StatusCode.SUCCESS:
53+
raise RuntimeError("Failed to attach session")
54+
except Exception as e:
55+
self._state.reset()
56+
self._status_stream.cancel()
57+
raise e
4958

5059
self._state.set_attached(True)
5160
self._state._change_state(QuerySessionStateEnum.CREATED)

ydb/query/session.py

Lines changed: 26 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,10 @@
2222
logger = logging.getLogger(__name__)
2323

2424

25+
# Default time, in seconds, to wait for the first message of the attach stream.
DEFAULT_ATTACH_FIRST_RESP_TIMEOUT = 600
26+
DEFAULT_ATTACH_LONG_TIMEOUT = 31536000  # one year (365 days) expressed in seconds
27+
28+
2529
class QuerySessionStateEnum(enum.Enum):
2630
NOT_INITIALIZED = "NOT_INITIALIZED"
2731
CREATED = "CREATED"
@@ -136,6 +140,12 @@ def __init__(self, driver: common_utils.SupportedDriverType, settings: Optional[
136140
self._driver = driver
137141
self._settings = self._get_client_settings(driver, settings)
138142
self._state = QuerySessionState(settings)
143+
self._attach_settings: BaseRequestSettings = (
144+
BaseRequestSettings()
145+
.with_operation_timeout(DEFAULT_ATTACH_LONG_TIMEOUT)
146+
.with_cancel_after(DEFAULT_ATTACH_LONG_TIMEOUT)
147+
.with_timeout(DEFAULT_ATTACH_LONG_TIMEOUT)
148+
)
139149

140150
def _get_client_settings(
141151
self,
@@ -168,12 +178,12 @@ def _delete_call(self, settings: Optional[BaseRequestSettings] = None) -> "BaseQ
168178
settings=settings,
169179
)
170180

171-
def _attach_call(self, settings: Optional[BaseRequestSettings] = None) -> Iterable[_apis.ydb_query.SessionState]:
181+
def _attach_call(self) -> Iterable[_apis.ydb_query.SessionState]:
172182
return self._driver(
173183
_apis.ydb_query.AttachSessionRequest(session_id=self._state.session_id),
174184
_apis.QueryService.Stub,
175185
_apis.QueryService.AttachSession,
176-
settings=settings,
186+
settings=self._attach_settings,
177187
)
178188

179189
def _execute_call(
@@ -213,24 +223,32 @@ class QuerySession(BaseQuerySession):
213223

214224
_stream = None
215225

216-
def _attach(self, settings: Optional[BaseRequestSettings] = None) -> None:
217-
self._stream = self._attach_call(settings=settings)
226+
def _attach(self, first_resp_timeout: int = DEFAULT_ATTACH_FIRST_RESP_TIMEOUT) -> None:
227+
self._stream = self._attach_call()
218228
status_stream = _utilities.SyncResponseIterator(
219229
self._stream,
220230
lambda response: common_utils.ServerStatus.from_proto(response),
221231
)
222232

223-
first_response = next(status_stream)
224-
if first_response.status != issues.StatusCode.SUCCESS:
225-
pass
233+
try:
234+
first_response = _utilities.get_first_message_with_timeout(
235+
status_stream,
236+
first_resp_timeout,
237+
)
238+
if first_response.status != issues.StatusCode.SUCCESS:
239+
raise RuntimeError("Failed to attach session")
240+
except Exception as e:
241+
self._state.reset()
242+
status_stream.cancel()
243+
raise e
226244

227245
self._state.set_attached(True)
228246
self._state._change_state(QuerySessionStateEnum.CREATED)
229247

230248
threading.Thread(
231249
target=self._check_session_status_loop,
232250
args=(status_stream,),
233-
name="check session status thread",
251+
name="attach stream thread",
234252
daemon=True,
235253
).start()
236254

0 commit comments

Comments
 (0)