Skip to content

chore: Upgrade to v25 api #269

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 7 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .trunk/trunk.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ lint:
ignore:
- linters: [ALL]
paths:
- pydgraph/proto/api_pb*.py
- pydgraph/proto/*_pb2*.py

enabled:
- [email protected]
Expand Down
346 changes: 346 additions & 0 deletions pydgraph/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,20 @@
from pydgraph import errors, txn, util
from pydgraph.meta import VERSION
from pydgraph.proto import api_pb2 as api
from pydgraph.proto import api_v2_pb2 as api_v2
from pydgraph.proto.api_v2_pb2 import (
AllocateIDsRequest,
)
from pydgraph.proto.api_v2_pb2 import AlterRequest as AlterRequestV2
from pydgraph.proto.api_v2_pb2 import (
CreateNamespaceRequest,
DropNamespaceRequest,
ListNamespacesRequest,
RunDQLRequest,
SignInUserRequest,
UpdateExtSnapshotStreamingStateRequest,
UpdateNamespaceRequest,
)

__author__ = "Mohit Ranka <[email protected]>"
__maintainer__ = "Hypermode Inc. <[email protected]>"
Expand Down Expand Up @@ -130,6 +144,337 @@ def alter(self, operation, timeout=None, metadata=None, credentials=None):
else:
self._common_except_alter(error)

def ping(self, timeout=None, metadata=None, credentials=None):
"""Runs a ping via this client."""
"""Note this is only supported on Dgraph v25.0.0 and above."""
new_metadata = self.add_login_metadata(metadata)
req = api_v2.PingRequest()
try:
return self.any_client().ping(
req,
timeout=timeout,
metadata=new_metadata,
credentials=credentials,
)
except Exception as error:
if util.is_jwt_expired(error):
self.retry_login()
new_metadata = self.add_login_metadata(metadata)
return self.any_client().ping(
req,
timeout=timeout,
metadata=new_metadata,
credentials=credentials,
)
else:
raise error

def allocate_ids(
self, how_many, lease_type, timeout=None, metadata=None, credentials=None
):
"""Runs an AllocateIDs via this client."""
"""Note this is only supported on Dgraph v25.0.0 and above."""
new_metadata = self.add_login_metadata(metadata)
req = AllocateIDsRequest(how_many=how_many, lease_type=lease_type)
try:
return self.any_client().allocate_ids(
req,
timeout=timeout,
metadata=new_metadata,
credentials=credentials,
)
except Exception as error:
if util.is_jwt_expired(error):
self.retry_login()
new_metadata = self.add_login_metadata(metadata)
return self.any_client().allocate_ids(
req,
timeout=timeout,
metadata=new_metadata,
credentials=credentials,
)
else:
raise error

def sign_in_user(
self,
user_id,
password,
refresh_token=None,
timeout=None,
metadata=None,
credentials=None,
):
"""Runs a SignInUser via this client."""
"""Note this is only supported on Dgraph v25.0.0 and above."""
new_metadata = self.add_login_metadata(metadata)
req = SignInUserRequest(
user_id=user_id, password=password, refresh_token=refresh_token
)
try:
return self.any_client().sign_in_user(
req,
timeout=timeout,
metadata=new_metadata,
credentials=credentials,
)
except Exception as error:
if util.is_jwt_expired(error):
self.retry_login()
new_metadata = self.add_login_metadata(metadata)
return self.any_client().sign_in_user(
req,
timeout=timeout,
metadata=new_metadata,
credentials=credentials,
)
else:
raise error

def alter_v2(
self,
op,
ns_name=None,
schema=None,
run_in_background=False,
predicate_to_drop=None,
type_to_drop=None,
timeout=None,
metadata=None,
credentials=None,
):
"""Runs an Alter (v2) via this client."""
"""Note this is only supported on Dgraph v25.0.0 and above."""
new_metadata = self.add_login_metadata(metadata)
req = AlterRequestV2(
op=op,
ns_name=ns_name,
schema=schema,
run_in_background=run_in_background,
predicate_to_drop=predicate_to_drop,
type_to_drop=type_to_drop,
)
try:
return self.any_client().alter_v2(
req,
timeout=timeout,
metadata=new_metadata,
credentials=credentials,
)
except Exception as error:
if util.is_jwt_expired(error):
self.retry_login()
new_metadata = self.add_login_metadata(metadata)
return self.any_client().alter_v2(
req,
timeout=timeout,
metadata=new_metadata,
credentials=credentials,
)
else:
raise error

def run_dql(
self,
dql_query,
ns_name=None,
vars=None,
read_only=False,
best_effort=False,
resp_format=api_v2.RespFormat.JSON,
timeout=None,
metadata=None,
credentials=None,
):
"""Runs a RunDQL via this client."""
"""Note this is only supported on Dgraph v25.0.0 and above."""
new_metadata = self.add_login_metadata(metadata)
req = RunDQLRequest(
dql_query=dql_query,
ns_name=ns_name,
vars=vars,
read_only=read_only,
best_effort=best_effort,
resp_format=resp_format,
)
try:
return self.any_client().run_dql(
req,
timeout=timeout,
metadata=new_metadata,
credentials=credentials,
)
except Exception as error:
if util.is_jwt_expired(error):
self.retry_login()
new_metadata = self.add_login_metadata(metadata)
return self.any_client().run_dql(
req,
timeout=timeout,
metadata=new_metadata,
credentials=credentials,
)
else:
raise error

def create_namespace(self, ns_name, timeout=None, metadata=None, credentials=None):
"""Runs a CreateNamespace via this client."""
"""Note this is only supported on Dgraph v25.0.0 and above."""
new_metadata = self.add_login_metadata(metadata)
req = CreateNamespaceRequest(ns_name=ns_name)
try:
return self.any_client().create_namespace(
req,
timeout=timeout,
metadata=new_metadata,
credentials=credentials,
)
except Exception as error:
if util.is_jwt_expired(error):
self.retry_login()
new_metadata = self.add_login_metadata(metadata)
return self.any_client().create_namespace(
req,
timeout=timeout,
metadata=new_metadata,
credentials=credentials,
)
else:
raise error

def drop_namespace(self, ns_name, timeout=None, metadata=None, credentials=None):
"""Runs a DropNamespace via this client."""
"""Note this is only supported on Dgraph v25.0.0 and above."""
new_metadata = self.add_login_metadata(metadata)
req = DropNamespaceRequest(ns_name=ns_name)
try:
return self.any_client().drop_namespace(
req,
timeout=timeout,
metadata=new_metadata,
credentials=credentials,
)
except Exception as error:
if util.is_jwt_expired(error):
self.retry_login()
new_metadata = self.add_login_metadata(metadata)
return self.any_client().drop_namespace(
req,
timeout=timeout,
metadata=new_metadata,
credentials=credentials,
)
else:
raise error

def update_namespace(
self, ns_name, rename_to_ns, timeout=None, metadata=None, credentials=None
):
"""Runs an UpdateNamespace via this client."""
"""Note this is only supported on Dgraph v25.0.0 and above."""
new_metadata = self.add_login_metadata(metadata)
req = UpdateNamespaceRequest(ns_name=ns_name, rename_to_ns=rename_to_ns)
try:
return self.any_client().update_namespace(
req,
timeout=timeout,
metadata=new_metadata,
credentials=credentials,
)
except Exception as error:
if util.is_jwt_expired(error):
self.retry_login()
new_metadata = self.add_login_metadata(metadata)
return self.any_client().update_namespace(
req,
timeout=timeout,
metadata=new_metadata,
credentials=credentials,
)
else:
raise error

def list_namespaces(self, timeout=None, metadata=None, credentials=None):
"""Runs a ListNamespaces via this client."""
"""Note this is only supported on Dgraph v25.0.0 and above."""
new_metadata = self.add_login_metadata(metadata)
req = ListNamespacesRequest()
try:
return self.any_client().list_namespaces(
req,
timeout=timeout,
metadata=new_metadata,
credentials=credentials,
)
except Exception as error:
if util.is_jwt_expired(error):
self.retry_login()
new_metadata = self.add_login_metadata(metadata)
return self.any_client().list_namespaces(
req,
timeout=timeout,
metadata=new_metadata,
credentials=credentials,
)
else:
raise error

def update_ext_snapshot_streaming_state(
self,
start=False,
finish=False,
drop_data=False,
timeout=None,
metadata=None,
credentials=None,
):
"""Runs an UpdateExtSnapshotStreamingState via this client."""
"""Note this is only supported on Dgraph v25.0.0 and above."""
new_metadata = self.add_login_metadata(metadata)
req = UpdateExtSnapshotStreamingStateRequest(
start=start, finish=finish, drop_data=drop_data
)
try:
return self.any_client().update_ext_snapshot_streaming_state(
req,
timeout=timeout,
metadata=new_metadata,
credentials=credentials,
)
except Exception as error:
if util.is_jwt_expired(error):
self.retry_login()
new_metadata = self.add_login_metadata(metadata)
return self.any_client().update_ext_snapshot_streaming_state(
req,
timeout=timeout,
metadata=new_metadata,
credentials=credentials,
)
else:
raise error

def stream_ext_snapshot(
self, request_iterator, timeout=None, metadata=None, credentials=None
):
"""Runs a StreamExtSnapshot via this client. This is a client-streaming RPC.
The caller is responsible for handling JWT expiry and retries for this streaming call.
"""
"""Note this is only supported on Dgraph v25.0.0 and above."""
new_metadata = self.add_login_metadata(metadata)
try:
# The underlying gRPC stub expects an iterator for client-streaming RPCs.
return self.any_client().stream_ext_snapshot(
request_iterator,
timeout=timeout,
metadata=new_metadata,
credentials=credentials,
)
except Exception as error:
# JWT retry logic is not implemented for streaming calls as it's non-trivial.
# The stream would be broken on error anyway.
self._common_except_alter(error)

@staticmethod
def _common_except_alter(error):
if util.is_retriable_error(error):
Expand All @@ -142,6 +487,7 @@ def _common_except_alter(error):

def async_alter(self, operation, timeout=None, metadata=None, credentials=None):
"""The async version of alter."""
"""Note this is only supported on Dgraph v25.0.0 and above."""
new_metadata = self.add_login_metadata(metadata)
return self.any_client().async_alter(
operation, timeout=timeout, metadata=new_metadata, credentials=credentials
Expand Down
Loading
Loading