Skip to content

Optimize driver.execute_query by pipelining BEGIN #956

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Aug 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions src/neo4j/_async/driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -860,9 +860,10 @@ async def example(driver: neo4j.AsyncDriver) -> neo4j.Record::
else:
raise ValueError("Invalid routing control value: %r"
% routing_)
return await executor(
_work, query_, parameters, result_transformer_
)
with session._pipelined_begin:
return await executor(
_work, query_, parameters, result_transformer_
)

@property
def execute_query_bookmark_manager(self) -> AsyncBookmarkManager:
Expand Down
20 changes: 17 additions & 3 deletions src/neo4j/_async/work/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
deprecated,
PreviewWarning,
)
from ..._util import ContextBool
from ..._work import Query
from ...api import (
Bookmarks,
Expand Down Expand Up @@ -100,6 +101,10 @@ class AsyncSession(AsyncWorkspace):
# The state this session is in.
_state_failed = False

_config: SessionConfig
_bookmark_manager: t.Optional[Bookmarks]
_pipelined_begin: ContextBool

def __init__(self, pool, session_config):
assert isinstance(session_config, SessionConfig)
if session_config.auth is not None:
Expand All @@ -115,6 +120,7 @@ def __init__(self, pool, session_config):
self._config = session_config
self._initialize_bookmarks(session_config.bookmarks)
self._bookmark_manager = session_config.bookmark_manager
self._pipelined_begin = ContextBool()

async def __aenter__(self) -> AsyncSession:
return self
Expand Down Expand Up @@ -421,6 +427,7 @@ async def _open_transaction(
bookmarks, access_mode, metadata, timeout,
self._config.notifications_min_severity,
self._config.notifications_disabled_categories,
pipelined=self._pipelined_begin
)

async def begin_transaction(
Expand Down Expand Up @@ -480,9 +487,15 @@ async def begin_transaction(

return t.cast(AsyncTransaction, self._transaction)


async def _run_transaction(
self, access_mode, transaction_function, *args, **kwargs
):
self,
access_mode: str,
transaction_function: t.Callable[
te.Concatenate[AsyncManagedTransaction, _P], t.Awaitable[_R]
],
*args: _P.args, **kwargs: _P.kwargs
) -> _R:
self._check_state()
if not callable(transaction_function):
raise TypeError("Unit of work is not callable")
Expand All @@ -498,7 +511,7 @@ async def _run_transaction(

errors = []

t0 = -1 # Timer
t0: float = -1 # Timer

while True:
try:
Expand All @@ -507,6 +520,7 @@ async def _run_transaction(
access_mode=access_mode, metadata=metadata,
timeout=timeout
)
assert isinstance(self._transaction, AsyncManagedTransaction)
tx = self._transaction
try:
result = await transaction_function(tx, *args, **kwargs)
Expand Down
6 changes: 4 additions & 2 deletions src/neo4j/_async/work/transaction.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ async def _exit(self, exception_type, exception_value, traceback):
async def _begin(
self, database, imp_user, bookmarks, access_mode, metadata, timeout,
notifications_min_severity, notifications_disabled_categories,
pipelined=False,
):
self._database = database
self._connection.begin(
Expand All @@ -82,8 +83,9 @@ async def _begin(
notifications_min_severity=notifications_min_severity,
notifications_disabled_categories=notifications_disabled_categories
)
await self._error_handling_connection.send_all()
await self._error_handling_connection.fetch_all()
if not pipelined:
await self._error_handling_connection.send_all()
await self._error_handling_connection.fetch_all()

async def _result_on_closed_handler(self):
pass
Expand Down
7 changes: 4 additions & 3 deletions src/neo4j/_sync/driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -859,9 +859,10 @@ def example(driver: neo4j.Driver) -> neo4j.Record::
else:
raise ValueError("Invalid routing control value: %r"
% routing_)
return executor(
_work, query_, parameters, result_transformer_
)
with session._pipelined_begin:
return executor(
_work, query_, parameters, result_transformer_
)

@property
def execute_query_bookmark_manager(self) -> BookmarkManager:
Expand Down
20 changes: 17 additions & 3 deletions src/neo4j/_sync/work/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
deprecated,
PreviewWarning,
)
from ..._util import ContextBool
from ..._work import Query
from ...api import (
Bookmarks,
Expand Down Expand Up @@ -100,6 +101,10 @@ class Session(Workspace):
# The state this session is in.
_state_failed = False

_config: SessionConfig
_bookmark_manager: t.Optional[Bookmarks]
_pipelined_begin: ContextBool

def __init__(self, pool, session_config):
assert isinstance(session_config, SessionConfig)
if session_config.auth is not None:
Expand All @@ -115,6 +120,7 @@ def __init__(self, pool, session_config):
self._config = session_config
self._initialize_bookmarks(session_config.bookmarks)
self._bookmark_manager = session_config.bookmark_manager
self._pipelined_begin = ContextBool()

def __enter__(self) -> Session:
return self
Expand Down Expand Up @@ -421,6 +427,7 @@ def _open_transaction(
bookmarks, access_mode, metadata, timeout,
self._config.notifications_min_severity,
self._config.notifications_disabled_categories,
pipelined=self._pipelined_begin
)

def begin_transaction(
Expand Down Expand Up @@ -480,9 +487,15 @@ def begin_transaction(

return t.cast(Transaction, self._transaction)


def _run_transaction(
self, access_mode, transaction_function, *args, **kwargs
):
self,
access_mode: str,
transaction_function: t.Callable[
te.Concatenate[ManagedTransaction, _P], t.Union[_R]
],
*args: _P.args, **kwargs: _P.kwargs
) -> _R:
self._check_state()
if not callable(transaction_function):
raise TypeError("Unit of work is not callable")
Expand All @@ -498,7 +511,7 @@ def _run_transaction(

errors = []

t0 = -1 # Timer
t0: float = -1 # Timer

while True:
try:
Expand All @@ -507,6 +520,7 @@ def _run_transaction(
access_mode=access_mode, metadata=metadata,
timeout=timeout
)
assert isinstance(self._transaction, ManagedTransaction)
tx = self._transaction
try:
result = transaction_function(tx, *args, **kwargs)
Expand Down
6 changes: 4 additions & 2 deletions src/neo4j/_sync/work/transaction.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ def _exit(self, exception_type, exception_value, traceback):
def _begin(
self, database, imp_user, bookmarks, access_mode, metadata, timeout,
notifications_min_severity, notifications_disabled_categories,
pipelined=False,
):
self._database = database
self._connection.begin(
Expand All @@ -82,8 +83,9 @@ def _begin(
notifications_min_severity=notifications_min_severity,
notifications_disabled_categories=notifications_disabled_categories
)
self._error_handling_connection.send_all()
self._error_handling_connection.fetch_all()
if not pipelined:
self._error_handling_connection.send_all()
self._error_handling_connection.fetch_all()

def _result_on_closed_handler(self):
pass
Expand Down
22 changes: 22 additions & 0 deletions src/neo4j/_util/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
# Copyright (c) "Neo4j"
# Neo4j Sweden AB [https://neo4j.com]
#
# This file is part of Neo4j.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


from ._context_bool import ContextBool


__all__ = ["ContextBool"]
36 changes: 36 additions & 0 deletions src/neo4j/_util/_context_bool.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
# Copyright (c) "Neo4j"
# Neo4j Sweden AB [https://neo4j.com]
#
# This file is part of Neo4j.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


from __future__ import annotations


__all__ = ["ContextBool"]


class ContextBool:
def __init__(self) -> None:
self._value = False

def __bool__(self) -> bool:
return self._value

def __enter__(self) -> None:
self._value = True

def __exit__(self, exc_type, exc_value, traceback) -> None:
self._value = False
1 change: 1 addition & 0 deletions testkitbackend/test_config.json
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@
"Optimization:AuthPipelining": true,
"Optimization:ConnectionReuse": true,
"Optimization:EagerTransactionBegin": true,
"Optimization:ExecuteQueryPipelining": true,
"Optimization:ImplicitDefaultArguments": true,
"Optimization:MinimalBookmarksSet": true,
"Optimization:MinimalResets": true,
Expand Down
Loading