Skip to content

Commit ec67996

Browse files
authored
Handle asyncio.CancelledError gracefully (#761)
* Patch `asyncio.wait_for` Async wait_for shim raise cancellation right away Deferring the cancellation makes it much harder to reason about and might make the Python driver behave in a surprising way when cancelled. * Fix unit tests
1 parent e29e042 commit ec67996

33 files changed

+1424
-192
lines changed

CHANGELOG.md

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -87,8 +87,9 @@
8787
they have been removed.
8888
- Deprecated Nodes' and Relationships' `id` property (`int`) in favor of
8989
`element_id` (`str`).
90-
This also affects `Graph` objects as `graph.nodes[...]` and
91-
`graph.relationships[...]` now prefers strings over integers.
90+
This also affects `Graph` objects as indexing `graph.nodes[...]` and
91+
`graph.relationships[...]` with integers has been deprecated in favor of
92+
indexing them with strings.
9293
- `ServerInfo.connection_id` has been deprecated and will be removed in a
9394
future release. There is no replacement as this is considered internal
9495
information.
@@ -118,6 +119,8 @@
118119
be used by client code. `Record` should be imported directly from `neo4j`
119120
instead. `neo4j.data.DataHydrator` and `neo4j.data.DataDeydrator` have been
120121
removed without replacement.
122+
- Introduced `neo4j.exceptions.SessionError` that is raised when trying to
123+
execute work on a closed or otherwise terminated session.
121124

122125

123126
## Version 4.4

docs/source/api.rst

Lines changed: 22 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -511,6 +511,8 @@ To construct a :class:`neo4j.Session` use the :meth:`neo4j.Driver.session` metho
511511
512512
513513
Sessions will often be created and destroyed using a *with block context*.
514+
This is the recommended approach as it takes care of closing the session
515+
properly even when an exception is raised.
514516

515517
.. code-block:: python
516518
@@ -536,6 +538,8 @@ Session
536538

537539
.. automethod:: close
538540

541+
.. automethod:: closed
542+
539543
.. automethod:: run
540544

541545
.. automethod:: last_bookmarks
@@ -643,7 +647,7 @@ context of the impersonated user. For this, the user for which the
643647
.. Note::
644648

645649
The server or all servers of the cluster need to support impersonation when.
646-
Otherwise, the driver will raise :py:exc:`.ConfigurationError`
650+
Otherwise, the driver will raise :exc:`.ConfigurationError`
647651
as soon as it encounters a server that does not.
648652

649653

@@ -708,15 +712,15 @@ Neo4j supports three kinds of transaction:
708712
+ :ref:`explicit-transactions-ref`
709713
+ :ref:`managed-transactions-ref`
710714

711-
Each has pros and cons but if in doubt, use a managed transaction with a `transaction function`.
715+
Each has pros and cons but if in doubt, use a managed transaction with a *transaction function*.
712716

713717

714718
.. _auto-commit-transactions-ref:
715719

716720
Auto-commit Transactions
717721
========================
718722
Auto-commit transactions are the simplest form of transaction, available via
719-
:py:meth:`neo4j.Session.run`. These are easy to use but support only one
723+
:meth:`neo4j.Session.run`. These are easy to use but support only one
720724
statement per transaction and are not automatically retried on failure.
721725

722726
Auto-commit transactions are also the only way to run ``PERIODIC COMMIT``
@@ -756,7 +760,7 @@ Example:
756760

757761
Explicit Transactions
758762
=====================
759-
Explicit transactions support multiple statements and must be created with an explicit :py:meth:`neo4j.Session.begin_transaction` call.
763+
Explicit transactions support multiple statements and must be created with an explicit :meth:`neo4j.Session.begin_transaction` call.
760764

761765
This creates a new :class:`neo4j.Transaction` object that can be used to run Cypher.
762766

@@ -766,16 +770,16 @@ It also gives applications the ability to directly control ``commit`` and ``roll
766770

767771
.. automethod:: run
768772

769-
.. automethod:: close
770-
771-
.. automethod:: closed
772-
773773
.. automethod:: commit
774774

775775
.. automethod:: rollback
776776

777+
.. automethod:: close
778+
779+
.. automethod:: closed
780+
777781
Closing an explicit transaction can either happen automatically at the end of a ``with`` block,
778-
or can be explicitly controlled through the :py:meth:`neo4j.Transaction.commit`, :py:meth:`neo4j.Transaction.rollback` or :py:meth:`neo4j.Transaction.close` methods.
782+
or can be explicitly controlled through the :meth:`neo4j.Transaction.commit`, :meth:`neo4j.Transaction.rollback` or :meth:`neo4j.Transaction.close` methods.
779783

780784
Explicit transactions are most useful for applications that need to distribute Cypher execution across multiple functions for the same transaction.
781785

@@ -811,8 +815,8 @@ Managed Transactions (`transaction functions`)
811815
==============================================
812816
Transaction functions are the most powerful form of transaction, providing access mode override and retry capabilities.
813817

814-
+ :py:meth:`neo4j.Session.write_transaction`
815-
+ :py:meth:`neo4j.Session.read_transaction`
818+
+ :meth:`neo4j.Session.write_transaction`
819+
+ :meth:`neo4j.Session.read_transaction`
816820

817821
These allow a function object representing the transactional unit of work to be passed as a parameter.
818822
This function is called one or more times, within a configurable time limit, until it succeeds.
@@ -912,8 +916,8 @@ Record
912916
.. autoclass:: neo4j.Record()
913917

914918
A :class:`neo4j.Record` is an immutable ordered collection of key-value
915-
pairs. It is generally closer to a :py:class:`namedtuple` than to an
916-
:py:class:`OrderedDict` inasmuch as iteration of the collection will
919+
pairs. It is generally closer to a :class:`namedtuple` than to an
920+
:class:`OrderedDict` inasmuch as iteration of the collection will
917921
yield values rather than keys.
918922

919923
.. describe:: Record(iterable)
@@ -1313,6 +1317,8 @@ Client-side errors
13131317

13141318
* :class:`neo4j.exceptions.DriverError`
13151319

1320+
* :class:`neo4j.exceptions.SessionError`
1321+
13161322
* :class:`neo4j.exceptions.TransactionError`
13171323

13181324
* :class:`neo4j.exceptions.TransactionNestingError`
@@ -1347,6 +1353,9 @@ Client-side errors
13471353
.. autoclass:: neo4j.exceptions.DriverError
13481354
:members: is_retryable
13491355

1356+
.. autoclass:: neo4j.exceptions.SessionError
1357+
:show-inheritance:
1358+
13501359
.. autoclass:: neo4j.exceptions.TransactionError
13511360
:show-inheritance:
13521361

docs/source/async_api.rst

Lines changed: 94 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -287,6 +287,8 @@ To construct a :class:`neo4j.AsyncSession` use the :meth:`neo4j.AsyncDriver.sess
287287
288288
289289
Sessions will often be created and destroyed using a *with block context*.
290+
This is the recommended approach as it takes care of closing the session
291+
properly even when an exception is raised.
290292

291293
.. code-block:: python
292294
@@ -311,8 +313,43 @@ AsyncSession
311313

312314
.. autoclass:: neo4j.AsyncSession()
313315

316+
.. note::
317+
318+
Some asyncio utility functions (e.g., :func:`asyncio.wait_for` and
319+
:func:`asyncio.shield`) will wrap work in a :class:`asyncio.Task`.
320+
This introduces concurrency and can lead to undefined behavior as
321+
:class:`AsyncSession` is not concurrency-safe.
322+
323+
Consider this **wrong** example::
324+
325+
async def dont_do_this(driver):
326+
async with driver.session() as session:
327+
await asyncio.shield(session.run("RETURN 1"))
328+
329+
If ``dont_do_this`` gets cancelled while waiting for ``session.run``,
330+
``session.run`` itself won't get cancelled (it's shielded) so it will
331+
continue to use the session in another Task. Concurrently, will the
332+
async context manager (``async with driver.session()``) on exit clean
333+
up the session. That's two Tasks handling the session concurrently.
334+
Therefore, this yields undefined behavior.
335+
336+
In this particular example, the problem could be solved by shielding
337+
the whole coroutine ``dont_do_this`` instead of only the
338+
``session.run``. Like so::
339+
340+
async def thats_better(driver):
341+
async def inner()
342+
async with driver.session() as session:
343+
await session.run("RETURN 1")
344+
345+
await asyncio.shield(inner())
346+
314347
.. automethod:: close
315348

349+
.. automethod:: cancel
350+
351+
.. automethod:: closed
352+
316353
.. automethod:: run
317354

318355
.. automethod:: last_bookmarks
@@ -346,15 +383,15 @@ Neo4j supports three kinds of async transaction:
346383
+ :ref:`async-explicit-transactions-ref`
347384
+ :ref:`async-managed-transactions-ref`
348385

349-
Each has pros and cons but if in doubt, use a managed transaction with a `transaction function`.
386+
Each has pros and cons but if in doubt, use a managed transaction with a *transaction function*.
350387

351388

352389
.. _async-auto-commit-transactions-ref:
353390

354391
Auto-commit Transactions
355392
========================
356393
Auto-commit transactions are the simplest form of transaction, available via
357-
:py:meth:`neo4j.Session.run`. These are easy to use but support only one
394+
:meth:`neo4j.Session.run`. These are easy to use but support only one
358395
statement per transaction and are not automatically retried on failure.
359396

360397
Auto-commit transactions are also the only way to run ``PERIODIC COMMIT``
@@ -398,7 +435,7 @@ Example:
398435

399436
Explicit Async Transactions
400437
===========================
401-
Explicit transactions support multiple statements and must be created with an explicit :py:meth:`neo4j.AsyncSession.begin_transaction` call.
438+
Explicit transactions support multiple statements and must be created with an explicit :meth:`neo4j.AsyncSession.begin_transaction` call.
402439

403440
This creates a new :class:`neo4j.AsyncTransaction` object that can be used to run Cypher.
404441

@@ -408,16 +445,18 @@ It also gives applications the ability to directly control ``commit`` and ``roll
408445

409446
.. automethod:: run
410447

411-
.. automethod:: close
412-
413-
.. automethod:: closed
414-
415448
.. automethod:: commit
416449

417450
.. automethod:: rollback
418451

452+
.. automethod:: close
453+
454+
.. automethod:: cancel
455+
456+
.. automethod:: closed
457+
419458
Closing an explicit transaction can either happen automatically at the end of a ``async with`` block,
420-
or can be explicitly controlled through the :py:meth:`neo4j.AsyncTransaction.commit`, :py:meth:`neo4j.AsyncTransaction.rollback` or :py:meth:`neo4j.AsyncTransaction.close` methods.
459+
or can be explicitly controlled through the :meth:`neo4j.AsyncTransaction.commit`, :meth:`neo4j.AsyncTransaction.rollback`, :meth:`neo4j.AsyncTransaction.close` or :meth:`neo4j.AsyncTransaction.cancel` methods.
421460

422461
Explicit transactions are most useful for applications that need to distribute Cypher execution across multiple functions for the same transaction.
423462

@@ -456,8 +495,8 @@ Managed Async Transactions (`transaction functions`)
456495
====================================================
457496
Transaction functions are the most powerful form of transaction, providing access mode override and retry capabilities.
458497

459-
+ :py:meth:`neo4j.AsyncSession.write_transaction`
460-
+ :py:meth:`neo4j.AsyncSession.read_transaction`
498+
+ :meth:`neo4j.AsyncSession.write_transaction`
499+
+ :meth:`neo4j.AsyncSession.read_transaction`
461500

462501
These allow a function object representing the transactional unit of work to be passed as a parameter.
463502
This function is called one or more times, within a configurable time limit, until it succeeds.
@@ -531,3 +570,48 @@ A :class:`neo4j.AsyncResult` is attached to an active connection, through a :cla
531570
.. automethod:: closed
532571

533572
See https://neo4j.com/docs/python-manual/current/cypher-workflow/#python-driver-type-mapping for more about type mapping.
573+
574+
575+
576+
******************
577+
Async Cancellation
578+
******************
579+
580+
Async Python provides a mechanism for cancelling futures
581+
(:meth:`asyncio.Future.cancel`). The driver and its components can handle this.
582+
However, generally, it's not advised to rely on cancellation as it forces the
583+
driver to close affected connections to avoid leaving them in an undefined
584+
state. This makes the driver less efficient.
585+
586+
The easiest way to make sure your application code's interaction with the driver
587+
is playing nicely with cancellation is to always use the async context manager
588+
provided by :class:`neo4j.AsyncSession` like so: ::
589+
590+
async with driver.session() as session:
591+
... # do what you need to do with the session
592+
593+
If, for whatever reason, you need handle the session manually, you can it like
594+
so: ::
595+
596+
session = await with driver.session()
597+
try:
598+
... # do what you need to do with the session
599+
except asyncio.CancelledError:
600+
session.cancel()
601+
raise
602+
finally:
603+
# this becomes a no-op if the session has been cancelled before
604+
await session.close()
605+
606+
As mentioned above, any cancellation of I/O work will cause the driver to close
607+
the affected connection. This will kill any :class:`neo4j.AsyncTransaction` and
608+
:class:`neo4j.AsyncResult` objects that are attached to that connection. Hence,
609+
after catching a :class:`asyncio.CancelledError`, you should not try to use
610+
transactions or results created earlier. They are likely to not be valid
611+
anymore.
612+
613+
Furthermore, there is no guarantee as to whether a piece of ongoing work got
614+
successfully executed on the server side or not, when a cancellation happens:
615+
``await transaction.commit()`` and other methods can throw
616+
:exc:`asyncio.CancelledError` but still have managed to complete from the
617+
server's perspective.

neo4j/_async/io/_bolt.py

Lines changed: 27 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
import abc
2020
import asyncio
21+
import socket
2122
from collections import deque
2223
from logging import getLogger
2324
from time import perf_counter
@@ -370,8 +371,9 @@ def time_remaining():
370371
await connection.hello()
371372
finally:
372373
connection.socket.set_deadline(None)
373-
except Exception:
374-
await connection.close_non_blocking()
374+
except Exception as e:
375+
log.debug("[#%04X] C: <OPEN FAILED> %r", s.getsockname()[1], e)
376+
connection.kill()
375377
raise
376378

377379
return connection
@@ -678,22 +680,28 @@ async def _set_defunct_write(self, error=None, silent=False):
678680
async def _set_defunct(self, message, error=None, silent=False):
679681
from ._pool import AsyncBoltPool
680682
direct_driver = isinstance(self.pool, AsyncBoltPool)
683+
user_cancelled = isinstance(error, asyncio.CancelledError)
681684

682685
if error:
683-
log.debug("[#%04X] %r", self.socket.getsockname()[1], error)
684-
log.error(message)
686+
log.debug("[#%04X] %r", self.local_port, error)
687+
if not user_cancelled:
688+
log.error(message)
685689
# We were attempting to receive data but the connection
686690
# has unexpectedly terminated. So, we need to close the
687691
# connection from the client side, and remove the address
688692
# from the connection pool.
689693
self._defunct = True
694+
if user_cancelled:
695+
self.kill()
696+
raise error # cancellation error should not be re-written
690697
if not self._closing:
691698
# If we fail while closing the connection, there is no need to
692699
# remove the connection from the pool, nor to try to close the
693700
# connection again.
694701
await self.close()
695702
if self.pool:
696703
await self.pool.deactivate(address=self.unresolved_address)
704+
697705
# Iterate through the outstanding responses, and if any correspond
698706
# to COMMIT requests then raise an error to signal that we are
699707
# unable to confirm that the COMMIT completed successfully.
@@ -736,8 +744,9 @@ async def close(self):
736744
self.goodbye()
737745
try:
738746
await self._send_all()
739-
except (OSError, BoltError, DriverError):
740-
pass
747+
except (OSError, BoltError, DriverError) as exc:
748+
log.debug("[#%04X] ignoring failed close %r",
749+
self.local_port, exc)
741750
log.debug("[#%04X] C: <CLOSE>", self.local_port)
742751
try:
743752
await self.socket.close()
@@ -746,18 +755,19 @@ async def close(self):
746755
finally:
747756
self._closed = True
748757

749-
async def close_non_blocking(self):
750-
"""Set the socket to non-blocking and close it.
751-
752-
This will try to send the `GOODBYE` message (given the socket is not
753-
marked as defunct). However, should the write operation require
754-
blocking (e.g., a full network buffer), then the socket will be closed
755-
immediately (without `GOODBYE` message).
756-
"""
757-
if self._closed or self._closing:
758+
def kill(self):
759+
"""Close the socket most violently. No flush, no goodbye, no mercy."""
760+
if self._closed:
758761
return
759-
self.socket.settimeout(0)
760-
await self.close()
762+
log.debug("[#%04X] C: <KILL>", self.local_port)
763+
self._closing = True
764+
try:
765+
self.socket.kill()
766+
except OSError as exc:
767+
log.debug("[#%04X] ignoring failed kill %r",
768+
self.local_port, exc)
769+
finally:
770+
self._closed = True
761771

762772
def closed(self):
763773
return self._closed

0 commit comments

Comments
 (0)