Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions src/neo4j/_async/driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -860,9 +860,10 @@ async def example(driver: neo4j.AsyncDriver) -> neo4j.Record::
else:
raise ValueError("Invalid routing control value: %r"
% routing_)
return await executor(
_work, query_, parameters, result_transformer_
)
with session._pipelined_begin:
return await executor(
_work, query_, parameters, result_transformer_
)

@property
def execute_query_bookmark_manager(self) -> AsyncBookmarkManager:
Expand Down
20 changes: 17 additions & 3 deletions src/neo4j/_async/work/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
deprecated,
PreviewWarning,
)
from ..._util import ContextBool
from ..._work import Query
from ...api import (
Bookmarks,
Expand Down Expand Up @@ -100,6 +101,10 @@ class AsyncSession(AsyncWorkspace):
# The state this session is in.
_state_failed = False

_config: SessionConfig
_bookmark_manager: t.Optional[Bookmarks]
_pipelined_begin: ContextBool

def __init__(self, pool, session_config):
assert isinstance(session_config, SessionConfig)
if session_config.auth is not None:
Expand All @@ -115,6 +120,7 @@ def __init__(self, pool, session_config):
self._config = session_config
self._initialize_bookmarks(session_config.bookmarks)
self._bookmark_manager = session_config.bookmark_manager
self._pipelined_begin = ContextBool()

async def __aenter__(self) -> AsyncSession:
return self
Expand Down Expand Up @@ -421,6 +427,7 @@ async def _open_transaction(
bookmarks, access_mode, metadata, timeout,
self._config.notifications_min_severity,
self._config.notifications_disabled_categories,
pipelined=self._pipelined_begin
)

async def begin_transaction(
Expand Down Expand Up @@ -480,9 +487,15 @@ async def begin_transaction(

return t.cast(AsyncTransaction, self._transaction)


async def _run_transaction(
self, access_mode, transaction_function, *args, **kwargs
):
self,
access_mode: str,
transaction_function: t.Callable[
te.Concatenate[AsyncManagedTransaction, _P], t.Awaitable[_R]
],
*args: _P.args, **kwargs: _P.kwargs
) -> _R:
self._check_state()
if not callable(transaction_function):
raise TypeError("Unit of work is not callable")
Expand All @@ -498,7 +511,7 @@ async def _run_transaction(

errors = []

t0 = -1 # Timer
t0: float = -1 # Timer

while True:
try:
Expand All @@ -507,6 +520,7 @@ async def _run_transaction(
access_mode=access_mode, metadata=metadata,
timeout=timeout
)
assert isinstance(self._transaction, AsyncManagedTransaction)
tx = self._transaction
try:
result = await transaction_function(tx, *args, **kwargs)
Expand Down
4 changes: 3 additions & 1 deletion src/neo4j/_async/work/transaction.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ async def _exit(self, exception_type, exception_value, traceback):
async def _begin(
self, database, imp_user, bookmarks, access_mode, metadata, timeout,
notifications_min_severity, notifications_disabled_categories,
pipelined=False,
):
self._database = database
self._connection.begin(
Expand All @@ -83,7 +84,8 @@ async def _begin(
notifications_disabled_categories=notifications_disabled_categories
)
await self._error_handling_connection.send_all()
await self._error_handling_connection.fetch_all()
if not pipelined:
await self._error_handling_connection.fetch_all()

async def _result_on_closed_handler(self):
pass
Expand Down
7 changes: 4 additions & 3 deletions src/neo4j/_sync/driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -859,9 +859,10 @@ def example(driver: neo4j.Driver) -> neo4j.Record::
else:
raise ValueError("Invalid routing control value: %r"
% routing_)
return executor(
_work, query_, parameters, result_transformer_
)
with session._pipelined_begin:
return executor(
_work, query_, parameters, result_transformer_
)

@property
def execute_query_bookmark_manager(self) -> BookmarkManager:
Expand Down
20 changes: 17 additions & 3 deletions src/neo4j/_sync/work/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
deprecated,
PreviewWarning,
)
from ..._util import ContextBool
from ..._work import Query
from ...api import (
Bookmarks,
Expand Down Expand Up @@ -100,6 +101,10 @@ class Session(Workspace):
# The state this session is in.
_state_failed = False

_config: SessionConfig
_bookmark_manager: t.Optional[Bookmarks]
_pipelined_begin: ContextBool

def __init__(self, pool, session_config):
assert isinstance(session_config, SessionConfig)
if session_config.auth is not None:
Expand All @@ -115,6 +120,7 @@ def __init__(self, pool, session_config):
self._config = session_config
self._initialize_bookmarks(session_config.bookmarks)
self._bookmark_manager = session_config.bookmark_manager
self._pipelined_begin = ContextBool()

def __enter__(self) -> Session:
return self
Expand Down Expand Up @@ -421,6 +427,7 @@ def _open_transaction(
bookmarks, access_mode, metadata, timeout,
self._config.notifications_min_severity,
self._config.notifications_disabled_categories,
pipelined=self._pipelined_begin
)

def begin_transaction(
Expand Down Expand Up @@ -480,9 +487,15 @@ def begin_transaction(

return t.cast(Transaction, self._transaction)


def _run_transaction(
self, access_mode, transaction_function, *args, **kwargs
):
self,
access_mode: str,
transaction_function: t.Callable[
te.Concatenate[ManagedTransaction, _P], t.Union[_R]
],
*args: _P.args, **kwargs: _P.kwargs
) -> _R:
self._check_state()
if not callable(transaction_function):
raise TypeError("Unit of work is not callable")
Expand All @@ -498,7 +511,7 @@ def _run_transaction(

errors = []

t0 = -1 # Timer
t0: float = -1 # Timer

while True:
try:
Expand All @@ -507,6 +520,7 @@ def _run_transaction(
access_mode=access_mode, metadata=metadata,
timeout=timeout
)
assert isinstance(self._transaction, ManagedTransaction)
tx = self._transaction
try:
result = transaction_function(tx, *args, **kwargs)
Expand Down
4 changes: 3 additions & 1 deletion src/neo4j/_sync/work/transaction.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ def _exit(self, exception_type, exception_value, traceback):
def _begin(
self, database, imp_user, bookmarks, access_mode, metadata, timeout,
notifications_min_severity, notifications_disabled_categories,
pipelined=False,
):
self._database = database
self._connection.begin(
Expand All @@ -83,7 +84,8 @@ def _begin(
notifications_disabled_categories=notifications_disabled_categories
)
self._error_handling_connection.send_all()
self._error_handling_connection.fetch_all()
if not pipelined:
self._error_handling_connection.fetch_all()

def _result_on_closed_handler(self):
pass
Expand Down
22 changes: 22 additions & 0 deletions src/neo4j/_util/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
# Copyright (c) "Neo4j"
# Neo4j Sweden AB [https://neo4j.com]
#
# This file is part of Neo4j.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


from ._context_bool import ContextBool


__all__ = ["ContextBool"]
36 changes: 36 additions & 0 deletions src/neo4j/_util/_context_bool.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
# Copyright (c) "Neo4j"
# Neo4j Sweden AB [https://neo4j.com]
#
# This file is part of Neo4j.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


from __future__ import annotations


__all__ = ["ContextBool"]


class ContextBool:
def __init__(self) -> None:
self._value = False

def __bool__(self) -> bool:
return self._value

def __enter__(self) -> None:
self._value = True

def __exit__(self, exc_type, exc_value, traceback) -> None:
self._value = False
1 change: 1 addition & 0 deletions testkitbackend/test_config.json
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@
"Optimization:AuthPipelining": true,
"Optimization:ConnectionReuse": true,
"Optimization:EagerTransactionBegin": true,
"Optimization:ExecuteQueryPipelining": true,
"Optimization:ImplicitDefaultArguments": true,
"Optimization:MinimalBookmarksSet": true,
"Optimization:MinimalResets": true,
Expand Down
Loading