Skip to content

Commit 5c7c279

Browse files
committed
Make syncronous PythonParser restartable on error, same as HiredisParser
Fix sync PythonParser
1 parent 9289e0e commit 5c7c279

File tree

2 files changed

+123
-17
lines changed

2 files changed

+123
-17
lines changed

redis/connection.py

Lines changed: 42 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -232,12 +232,6 @@ def read(self, length):
232232
self._buffer.seek(self.bytes_read)
233233
data = self._buffer.read(length)
234234
self.bytes_read += len(data)
235-
236-
# purge the buffer when we've consumed it all so it doesn't
237-
# grow forever
238-
if self.bytes_read == self.bytes_written:
239-
self.purge()
240-
241235
return data[:-2]
242236

243237
def readline(self):
@@ -251,23 +245,44 @@ def readline(self):
251245
data = buf.readline()
252246

253247
self.bytes_read += len(data)
248+
return data[:-2]
254249

255-
# purge the buffer when we've consumed it all so it doesn't
256-
# grow forever
257-
if self.bytes_read == self.bytes_written:
258-
self.purge()
250+
def get_pos(self):
251+
"""
252+
Get current read position
253+
"""
254+
return self.bytes_read
259255

260-
return data[:-2]
256+
def rewind(self, pos):
257+
"""
258+
Rewind the buffer to a specific position, to re-start reading
259+
"""
260+
self.bytes_read = pos
261261

262262
def purge(self):
263-
self._buffer.seek(0)
264-
self._buffer.truncate()
265-
self.bytes_written = 0
263+
"""
264+
After a successful read, purge the read part of buffer
265+
"""
266+
unread = self.bytes_written - self.bytes_read
267+
268+
# Only if we have read all of the buffer do we truncate, to
269+
# reduce the amount of memory thrashing. This heuristic
270+
# can be changed or removed later.
271+
if unread > 0:
272+
return
273+
274+
if unread > 0:
275+
# move unread data to the front
276+
view = self._buffer.getbuffer()
277+
view[:unread] = view[-unread:]
278+
self._buffer.truncate(unread)
279+
self.bytes_written = unread
266280
self.bytes_read = 0
281+
self._buffer.seek(0)
267282

268283
def close(self):
269284
try:
270-
self.purge()
285+
self.bytes_written = self.bytes_read = 0
271286
self._buffer.close()
272287
except Exception:
273288
# issue #633 suggests the purge/close somehow raised a
@@ -315,6 +330,17 @@ def can_read(self, timeout):
315330
return self._buffer and self._buffer.can_read(timeout)
316331

317332
def read_response(self, disable_decoding=False):
333+
pos = self._buffer.get_pos()
334+
try:
335+
result = self._read_response(disable_decoding=disable_decoding)
336+
except BaseException:
337+
self._buffer.rewind(pos)
338+
raise
339+
else:
340+
self._buffer.purge()
341+
return result
342+
343+
def _read_response(self, disable_decoding=False):
318344
raw = self._buffer.readline()
319345
if not raw:
320346
raise ConnectionError(SERVER_CLOSED_CONNECTION_ERROR)
@@ -355,7 +381,7 @@ def read_response(self, disable_decoding=False):
355381
if length == -1:
356382
return None
357383
response = [
358-
self.read_response(disable_decoding=disable_decoding)
384+
self._read_response(disable_decoding=disable_decoding)
359385
for i in range(length)
360386
]
361387
if isinstance(response, bytes) and disable_decoding is False:

tests/test_connection.py

Lines changed: 81 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,9 @@
55

66
import pytest
77

8+
import redis
89
from redis.backoff import NoBackoff
9-
from redis.connection import Connection
10+
from redis.connection import Connection, PythonParser, HiredisParser
1011
from redis.exceptions import ConnectionError, InvalidResponse, TimeoutError
1112
from redis.retry import Retry
1213
from redis.utils import HIREDIS_AVAILABLE
@@ -122,3 +123,82 @@ def test_connect_timeout_error_without_retry(self):
122123
assert conn._connect.call_count == 1
123124
assert str(e.value) == "Timeout connecting to server"
124125
self.clear(conn)
126+
127+
128+
class FakeSocket:
129+
"""
130+
A class simulating an readable socket, but raising a
131+
special exception every other read.
132+
"""
133+
134+
class TestError(BaseException):
135+
pass
136+
137+
def __init__(self, data, interrupt_every=0):
138+
self.data = data
139+
self.counter = 0
140+
self.pos = 0
141+
self.interrupt_every = interrupt_every
142+
143+
def tick(self):
144+
self.counter += 1
145+
if not self.interrupt_every:
146+
return
147+
if (self.counter % self.interrupt_every) == 0:
148+
raise self.TestError()
149+
150+
def recv(self, bufsize):
151+
self.tick()
152+
bufsize = min(5, bufsize) # truncate the read size
153+
result = self.data[self.pos : self.pos + bufsize]
154+
self.pos += len(result)
155+
return result
156+
157+
def recv_into(self, buffer, nbytes=0, flags=0):
158+
self.tick()
159+
if nbytes == 0:
160+
nbytes = len(buffer)
161+
nbytes = min(5, nbytes) # truncate the read size
162+
result = self.data[self.pos : self.pos + nbytes]
163+
self.pos += len(result)
164+
buffer[: len(result)] = result
165+
return len(result)
166+
167+
168+
@pytest.mark.onlynoncluster
169+
@pytest.mark.parametrize(
170+
"parser_class", [PythonParser, HiredisParser], ids=["PythonParser", "HiredisParser"]
171+
)
172+
def test_connection_parse_response_resume(r: redis.Redis, parser_class):
173+
"""
174+
This test verifies that the Connection parser,
175+
be that PythonParser or HiredisParser,
176+
can be interrupted at IO time and then resume parsing.
177+
"""
178+
if parser_class is HiredisParser and not HIREDIS_AVAILABLE:
179+
pytest.skip("Hiredis not available)")
180+
args = dict(r.connection_pool.connection_kwargs)
181+
args["parser_class"] = parser_class
182+
conn = Connection(**args)
183+
conn.connect()
184+
message = (
185+
b"*3\r\n$7\r\nmessage\r\n$8\r\nchannel1\r\n"
186+
b"$25\r\nhi\r\nthere\r\n+how\r\nare\r\nyou\r\n"
187+
)
188+
fake_socket = FakeSocket(message, interrupt_every=2)
189+
190+
if isinstance(conn._parser, PythonParser):
191+
conn._parser._buffer._sock = fake_socket
192+
else:
193+
conn._parser._sock = fake_socket
194+
for i in range(100):
195+
try:
196+
response = conn.read_response()
197+
break
198+
except FakeSocket.TestError:
199+
pass
200+
201+
else:
202+
pytest.fail("didn't receive a response")
203+
assert response
204+
assert i > 0

0 commit comments

Comments
 (0)