Skip to content

Commit 26790ab

Browse files
committed
Make tag more rich and add a richer API for tag deletion as well
todo: make it compile and add tests
1 parent e199899 commit 26790ab

File tree

3 files changed

+209
-79
lines changed

3 files changed

+209
-79
lines changed

src/rpc/client/tags.rs

Lines changed: 157 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@
1010
//! [`Client::list_hash_seq`] can be used to list all tags with a hash_seq format.
1111
//!
1212
//! [`Client::delete`] can be used to delete a tag.
13+
use std::ops::{Bound, RangeBounds};
14+
1315
use anyhow::Result;
1416
use futures_lite::{Stream, StreamExt};
1517
use quic_rpc::{client::BoxedConnector, Connector, RpcClient};
@@ -30,6 +32,125 @@ pub struct Client<C = BoxedConnector<RpcService>> {
3032
pub(super) rpc: RpcClient<RpcService, C>,
3133
}
3234

35+
/// Options for a list operation.
36+
#[derive(Debug, Clone)]
37+
pub struct ListOptions {
38+
/// List tags to hash seqs
39+
pub hash_seq: bool,
40+
/// List tags to raw blobs
41+
pub raw: bool,
42+
/// Optional from tag (inclusive)
43+
pub from: Option<Tag>,
44+
/// Optional to tag (exclusive)
45+
pub to: Option<Tag>,
46+
}
47+
48+
fn tags_from_range<R, E>(range: R) -> (Option<Tag>, Option<Tag>)
49+
where
50+
R: RangeBounds<E>,
51+
E: AsRef<[u8]>,
52+
{
53+
let from = match range.start_bound() {
54+
Bound::Included(start) => Some(Tag::from(start.as_ref())),
55+
Bound::Excluded(start) => Some(Tag::from(start.as_ref()).successor()),
56+
Bound::Unbounded => None,
57+
};
58+
let to = match range.end_bound() {
59+
Bound::Included(end) => Some(Tag::from(end.as_ref()).successor()),
60+
Bound::Excluded(end) => Some(Tag::from(end.as_ref())),
61+
Bound::Unbounded => None,
62+
};
63+
(from, to)
64+
}
65+
66+
impl ListOptions {
67+
/// List a range of tags
68+
pub fn range<R, E>(range: R) -> Self
69+
where
70+
R: RangeBounds<E>,
71+
E: AsRef<[u8]>,
72+
{
73+
let (from, to) = tags_from_range(range);
74+
Self {
75+
from,
76+
to,
77+
raw: true,
78+
hash_seq: true,
79+
}
80+
}
81+
82+
/// List tags with a prefix
83+
pub fn prefix(prefix: &[u8]) -> Self {
84+
let from = Tag::from(prefix);
85+
let to = from.next_prefix();
86+
Self {
87+
raw: true,
88+
hash_seq: true,
89+
from: Some(from),
90+
to,
91+
}
92+
}
93+
94+
/// List a single tag
95+
pub fn single(name: &[u8]) -> Self {
96+
let from = Tag::from(name);
97+
Self {
98+
to: Some(from.successor()),
99+
from: Some(from),
100+
raw: true,
101+
hash_seq: true,
102+
}
103+
}
104+
105+
/// List all tags
106+
pub fn all() -> Self {
107+
Self {
108+
raw: true,
109+
hash_seq: true,
110+
from: None,
111+
to: None,
112+
}
113+
}
114+
115+
/// List raw tags
116+
pub fn raw() -> Self {
117+
Self {
118+
raw: true,
119+
hash_seq: false,
120+
from: None,
121+
to: None,
122+
}
123+
}
124+
125+
/// List hash seq tags
126+
pub fn hash_seq() -> Self {
127+
Self {
128+
raw: false,
129+
hash_seq: true,
130+
from: None,
131+
to: None,
132+
}
133+
}
134+
}
135+
136+
/// Options for a delete operation.
137+
#[derive(Debug, Clone)]
138+
pub struct DeleteOptions {
139+
/// Optional from tag (inclusive)
140+
pub from: Option<Tag>,
141+
/// Optional to tag (exclusive)
142+
pub to: Option<Tag>,
143+
}
144+
145+
impl DeleteOptions {
146+
pub fn single(name: Tag) -> Self {
147+
Self {
148+
to: Some(name.successor()),
149+
from: Some(name),
150+
}
151+
}
152+
}
153+
33154
/// A client that uses the memory connector.
34155
pub type MemClient = Client<crate::rpc::MemConnector>;
35156

@@ -42,44 +163,67 @@ where
42163
Self { rpc }
43164
}
44165

166+
/// List all tags with options.
167+
///
168+
/// This is the most flexible way to list tags. All the other list methods are just convenience
169+
/// methods that call this one with the appropriate options.
170+
pub async fn list_with_opts(
171+
&self,
172+
options: ListOptions,
173+
) -> Result<impl Stream<Item = Result<TagInfo>>> {
174+
let stream = self
175+
.rpc
176+
.server_streaming(ListRequest::from(options))
177+
.await?;
178+
Ok(stream.map(|res| res.map_err(anyhow::Error::from)))
179+
}
180+
45181
/// Get the value of a single tag
46182
pub async fn get(&self, name: impl AsRef<[u8]>) -> Result<Option<TagInfo>> {
47183
let mut stream = self
48-
.rpc
49-
.server_streaming(ListRequest::single(name.as_ref()))
184+
.list_with_opts(ListOptions::single(name.as_ref()))
50185
.await?;
51186
Ok(stream.next().await.transpose()?)
52187
}
53188

189+
/// List a range of tags
190+
pub async fn list_range<R, E>(&self, range: R) -> Result<impl Stream<Item = Result<TagInfo>>>
191+
where
192+
R: RangeBounds<E>,
193+
E: AsRef<[u8]>,
194+
{
195+
self.list_with_opts(ListOptions::range(range)).await
196+
}
197+
54198
/// Lists all tags.
55199
pub async fn list_prefix(
56200
&self,
57201
prefix: impl AsRef<[u8]>,
58202
) -> Result<impl Stream<Item = Result<TagInfo>>> {
59-
let stream = self
60-
.rpc
61-
.server_streaming(ListRequest::prefix(prefix.as_ref()))
62-
.await?;
63-
Ok(stream.map(|res| res.map_err(anyhow::Error::from)))
203+
self.list_with_opts(ListOptions::prefix(prefix.as_ref()))
204+
.await
64205
}
65206

66207
/// Lists all tags.
67208
pub async fn list(&self) -> Result<impl Stream<Item = Result<TagInfo>>> {
68-
let stream = self.rpc.server_streaming(ListRequest::all()).await?;
69-
Ok(stream.map(|res| res.map_err(anyhow::Error::from)))
209+
self.list_with_opts(ListOptions::all()).await
70210
}
71211

72212
/// Lists all tags with a hash_seq format.
73213
pub async fn list_hash_seq(&self) -> Result<impl Stream<Item = Result<TagInfo>>> {
74-
let stream = self.rpc.server_streaming(ListRequest::hash_seq()).await?;
75-
Ok(stream.map(|res| res.map_err(anyhow::Error::from)))
214+
self.list_with_opts(ListOptions::hash_seq()).await
76215
}
77216

78217
/// Deletes a tag.
79-
pub async fn delete(&self, name: Tag) -> Result<()> {
80-
self.rpc.rpc(DeleteRequest { name }).await??;
218+
pub async fn delete_with_opts(&self, options: DeleteOptions) -> Result<()> {
219+
self.rpc.rpc(DeleteRequest::from(options)).await??;
81220
Ok(())
82221
}
222+
223+
/// Deletes a tag.
224+
pub async fn delete(&self, name: Tag) -> Result<()> {
225+
self.delete_with_opts(DeleteOptions::single(name)).await
226+
}
83227
}
84228

85229
/// Information about a tag.

src/rpc/proto/tags.rs

Lines changed: 21 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,12 @@
11
//! Tags RPC protocol
2-
use bytes::Bytes;
32
use nested_enum_utils::enum_conversions;
43
use quic_rpc_derive::rpc_requests;
54
use serde::{Deserialize, Serialize};
65

76
use super::{RpcResult, RpcService};
87
use crate::{
98
net_protocol::BatchId,
10-
rpc::client::tags::TagInfo,
11-
util::{increment_vec, next_prefix},
9+
rpc::client::tags::{DeleteOptions, ListOptions, TagInfo},
1210
HashAndFormat, Tag,
1311
};
1412

@@ -79,80 +77,37 @@ pub struct ListRequest {
7977
pub raw: bool,
8078
/// List hash seq tags
8179
pub hash_seq: bool,
82-
/// From tag
80+
/// From tag (inclusive)
8381
pub from: Option<Tag>,
8482
/// To tag (exclusive)
8583
pub to: Option<Tag>,
8684
}
8785

88-
impl ListRequest {
89-
/// List tags with a prefix
90-
pub fn prefix(prefix: &[u8]) -> Self {
91-
let from = prefix.to_vec();
92-
let mut to = from.clone();
93-
let from = Bytes::from(from).into();
94-
let to = if next_prefix(&mut to) {
95-
Some(Bytes::from(to).into())
96-
} else {
97-
None
98-
};
86+
impl From<ListOptions> for ListRequest {
87+
fn from(options: ListOptions) -> Self {
9988
Self {
100-
raw: true,
101-
hash_seq: true,
102-
from: Some(from),
103-
to,
104-
}
105-
}
106-
107-
/// List a single tag
108-
pub fn single(name: &[u8]) -> Self {
109-
let from = name.to_vec();
110-
let mut next = from.clone();
111-
increment_vec(&mut next);
112-
let from = Bytes::from(from).into();
113-
let to = Bytes::from(next).into();
114-
Self {
115-
raw: true,
116-
hash_seq: true,
117-
from: Some(from),
118-
to: Some(to),
119-
}
120-
}
121-
122-
/// List all tags
123-
pub fn all() -> Self {
124-
Self {
125-
raw: true,
126-
hash_seq: true,
127-
from: None,
128-
to: None,
129-
}
130-
}
131-
132-
/// List raw tags
133-
pub fn raw() -> Self {
134-
Self {
135-
raw: true,
136-
hash_seq: false,
137-
from: None,
138-
to: None,
139-
}
140-
}
141-
142-
/// List hash seq tags
143-
pub fn hash_seq() -> Self {
144-
Self {
145-
raw: false,
146-
hash_seq: true,
147-
from: None,
148-
to: None,
89+
raw: options.raw,
90+
hash_seq: options.hash_seq,
91+
from: options.from,
92+
to: options.to,
14993
}
15094
}
15195
}
15296

15397
/// Delete a tag
15498
#[derive(Debug, Serialize, Deserialize)]
15599
pub struct DeleteRequest {
156-
/// Name of the tag
157-
pub name: Tag,
100+
/// From tag (inclusive)
101+
pub from: Option<Tag>,
102+
/// To tag (exclusive)
103+
pub to: Option<Tag>,
104+
}
105+
106+
impl From<DeleteOptions> for DeleteRequest {
107+
fn from(options: DeleteOptions) -> Self {
108+
Self {
109+
from: options.from,
110+
to: options.to,
111+
}
112+
}
158113
}

src/util.rs

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,18 @@ mod redb_support {
7474
}
7575
}
7676

77+
impl From<&[u8]> for Tag {
78+
fn from(value: &[u8]) -> Self {
79+
Self(Bytes::copy_from_slice(value))
80+
}
81+
}
82+
83+
impl AsRef<[u8]> for Tag {
84+
fn as_ref(&self) -> &[u8] {
85+
self.0.as_ref()
86+
}
87+
}
88+
7789
impl Borrow<[u8]> for Tag {
7890
fn borrow(&self) -> &[u8] {
7991
self.0.as_ref()
@@ -132,6 +144,25 @@ impl Tag {
132144
i += 1;
133145
}
134146
}
147+
148+
/// The successor of this tag in lexicographic order.
149+
pub fn successor(&self) -> Self {
150+
let mut bytes = self.0.to_vec();
151+
increment_vec(&mut bytes);
152+
Self(bytes.into())
153+
}
154+
155+
/// If this is a prefix, get the next prefix.
156+
///
157+
/// This is like successor, except that it will return None if the prefix is all 0xFF instead of appending a 0 byte.
158+
pub fn next_prefix(&self) -> Option<Self> {
159+
let mut bytes = self.0.to_vec();
160+
if next_prefix(&mut bytes) {
161+
Some(Self(bytes.into()))
162+
} else {
163+
None
164+
}
165+
}
135166
}
136167

137168
/// Option for commands that allow setting a tag

0 commit comments

Comments
 (0)