Skip to content

Commit a2780f3

Browse files
committed
chore: add compression encoding for request/response
1 parent d71b1ae commit a2780f3

File tree

5 files changed

+160
-33
lines changed

5 files changed

+160
-33
lines changed

examples/ingest.rs

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ use greptimedb_ingester::api::v1::*;
1818
use greptimedb_ingester::helpers::schema::*;
1919
use greptimedb_ingester::helpers::values::*;
2020
use greptimedb_ingester::{
21-
ChannelConfig, ChannelManager, Client, ClientTlsOption, Database, DEFAULT_SCHEMA_NAME,
21+
ChannelConfig, ChannelManager, ClientBuilder, ClientTlsOption, Database, DEFAULT_SCHEMA_NAME,
2222
};
2323

2424
#[tokio::main]
@@ -36,9 +36,14 @@ async fn main() {
3636

3737
let channel_manager = ChannelManager::with_tls_config(channel_config)
3838
.expect("Failed to create channel manager");
39-
Client::with_manager_and_urls(channel_manager, vec![&greptimedb_endpoint])
39+
ClientBuilder::default()
40+
.channel_manager(channel_manager)
41+
.peers(vec![&greptimedb_endpoint])
42+
.build()
4043
} else {
41-
Client::with_urls(vec![&greptimedb_endpoint])
44+
ClientBuilder::default()
45+
.peers(vec![&greptimedb_endpoint])
46+
.build()
4247
};
4348

4449
let client = Database::new_with_dbname(greptimedb_dbname, grpc_client);

examples/stream_ingest.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
use derive_new::new;
1616

1717
use greptimedb_ingester::api::v1::*;
18-
use greptimedb_ingester::{Client, Database, DEFAULT_SCHEMA_NAME};
18+
use greptimedb_ingester::{ClientBuilder, Database, DEFAULT_SCHEMA_NAME};
1919

2020
#[tokio::main]
2121
async fn main() {
@@ -25,7 +25,9 @@ async fn main() {
2525
let greptimedb_dbname =
2626
std::env::var("GREPTIMEDB_DBNAME").unwrap_or_else(|_| DEFAULT_SCHEMA_NAME.to_owned());
2727

28-
let grpc_client = Client::with_urls(vec![&greptimedb_endpoint]);
28+
let grpc_client = ClientBuilder::default()
29+
.peers(vec![&greptimedb_endpoint])
30+
.build();
2931

3032
let client = Database::new_with_dbname(greptimedb_dbname, grpc_client);
3133

src/client.rs

Lines changed: 145 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ use crate::api::v1::HealthCheckRequest;
2020
use crate::channel_manager::ChannelManager;
2121
use parking_lot::RwLock;
2222
use snafu::OptionExt;
23+
use tonic::codec::CompressionEncoding;
2324
use tonic::transport::Channel;
2425

2526
use crate::load_balance::{LoadBalance, Loadbalancer};
@@ -36,22 +37,73 @@ pub struct Client {
3637
inner: Arc<Inner>,
3738
}
3839

40+
#[derive(Default)]
41+
pub struct ClientBuilder {
42+
channel_manager: ChannelManager,
43+
load_balance: Loadbalancer,
44+
compression: Compression,
45+
peers: Arc<RwLock<Vec<String>>>,
46+
}
47+
48+
impl ClientBuilder {
49+
pub fn channel_manager(mut self, channel_manager: ChannelManager) -> Self {
50+
self.channel_manager = channel_manager;
51+
self
52+
}
53+
54+
pub fn load_balance(mut self, load_balance: Loadbalancer) -> Self {
55+
self.load_balance = load_balance;
56+
self
57+
}
58+
59+
pub fn compression(mut self, compression: Compression) -> Self {
60+
self.compression = compression;
61+
self
62+
}
63+
64+
pub fn peers<U, A>(mut self, peers: A) -> Self
65+
where
66+
U: AsRef<str>,
67+
A: AsRef<[U]>,
68+
{
69+
self.peers = Arc::new(RwLock::new(normailze_urls(peers)));
70+
self
71+
}
72+
73+
pub fn build(self) -> Client {
74+
let inner = InnerBuilder::default()
75+
.channel_manager(self.channel_manager)
76+
.load_balance(self.load_balance)
77+
.compression(self.compression)
78+
.build();
79+
Client {
80+
inner: Arc::new(inner),
81+
}
82+
}
83+
}
84+
85+
#[derive(Debug, Clone)]
86+
pub enum Compression {
87+
Gzip,
88+
Zstd,
89+
Plain,
90+
}
91+
92+
impl Default for Compression {
93+
fn default() -> Self {
94+
Self::Gzip
95+
}
96+
}
97+
3998
#[derive(Debug, Default)]
4099
struct Inner {
41100
channel_manager: ChannelManager,
42101
peers: Arc<RwLock<Vec<String>>>,
43102
load_balance: Loadbalancer,
103+
compression: Compression,
44104
}
45105

46106
impl Inner {
47-
fn with_manager(channel_manager: ChannelManager) -> Self {
48-
Self {
49-
channel_manager,
50-
peers: Default::default(),
51-
load_balance: Default::default(),
52-
}
53-
}
54-
55107
fn set_peers(&self, peers: Vec<String>) {
56108
let mut guard = self.peers.write();
57109
*guard = peers;
@@ -63,51 +115,97 @@ impl Inner {
63115
}
64116
}
65117

118+
#[derive(Default)]
119+
pub struct InnerBuilder {
120+
channel_manager: ChannelManager,
121+
load_balance: Loadbalancer,
122+
compression: Compression,
123+
peers: Arc<RwLock<Vec<String>>>,
124+
}
125+
126+
impl InnerBuilder {
127+
pub(self) fn channel_manager(mut self, channel_manager: ChannelManager) -> Self {
128+
self.channel_manager = channel_manager;
129+
self
130+
}
131+
132+
pub(self) fn load_balance(mut self, load_balance: Loadbalancer) -> Self {
133+
self.load_balance = load_balance;
134+
self
135+
}
136+
137+
pub(self) fn compression(mut self, compression: Compression) -> Self {
138+
self.compression = compression;
139+
self
140+
}
141+
142+
pub(self) fn peers<U, A>(mut self, peers: A) -> Self
143+
where
144+
U: AsRef<str>,
145+
A: AsRef<[U]>,
146+
{
147+
self.peers = Arc::new(RwLock::new(normailze_urls(peers)));
148+
self
149+
}
150+
151+
pub(self) fn build(self) -> Inner {
152+
Inner {
153+
channel_manager: self.channel_manager,
154+
load_balance: self.load_balance,
155+
compression: self.compression,
156+
peers: self.peers,
157+
}
158+
}
159+
}
160+
66161
impl Client {
162+
#[deprecated(since = "0.1.0", note = "use `ClientBuilder` instead of this method")]
67163
pub fn new() -> Self {
68164
Default::default()
69165
}
70166

167+
#[deprecated(since = "0.1.0", note = "use `ClientBuilder` instead of this method")]
71168
pub fn with_manager(channel_manager: ChannelManager) -> Self {
72-
let inner = Arc::new(Inner::with_manager(channel_manager));
73-
Self { inner }
169+
let inner = InnerBuilder::default()
170+
.channel_manager(channel_manager)
171+
.build();
172+
Self {
173+
inner: Arc::new(inner),
174+
}
74175
}
75176

177+
#[deprecated(since = "0.1.0", note = "use `ClientBuilder` instead of this method")]
76178
pub fn with_urls<U, A>(urls: A) -> Self
77179
where
78180
U: AsRef<str>,
79181
A: AsRef<[U]>,
80182
{
81-
Self::with_manager_and_urls(ChannelManager::new(), urls)
183+
ClientBuilder::default().peers(urls).build()
82184
}
83185

186+
#[deprecated(since = "0.1.0", note = "use `ClientBuilder` instead of this method")]
84187
pub fn with_manager_and_urls<U, A>(channel_manager: ChannelManager, urls: A) -> Self
85188
where
86189
U: AsRef<str>,
87190
A: AsRef<[U]>,
88191
{
89-
let inner = Inner::with_manager(channel_manager);
90-
let urls: Vec<String> = urls
91-
.as_ref()
92-
.iter()
93-
.map(|peer| peer.as_ref().to_string())
94-
.collect();
95-
inner.set_peers(urls);
192+
let inner = InnerBuilder::default()
193+
.channel_manager(channel_manager)
194+
.peers(urls)
195+
.build();
196+
96197
Self {
97198
inner: Arc::new(inner),
98199
}
99200
}
100201

202+
#[deprecated(since = "0.1.0", note = "should be removed in the future")]
101203
pub fn start<U, A>(&self, urls: A)
102204
where
103205
U: AsRef<str>,
104206
A: AsRef<[U]>,
105207
{
106-
let urls: Vec<String> = urls
107-
.as_ref()
108-
.iter()
109-
.map(|peer| peer.as_ref().to_string())
110-
.collect();
208+
let urls: Vec<String> = normailze_urls(urls);
111209

112210
self.inner.set_peers(urls);
113211
}
@@ -127,8 +225,19 @@ impl Client {
127225

128226
pub(crate) fn make_database_client(&self) -> Result<DatabaseClient> {
129227
let (_, channel) = self.find_channel()?;
130-
let client =
131-
GreptimeDatabaseClient::new(channel).max_decoding_message_size(MAX_MESSAGE_SIZE);
228+
let mut client = GreptimeDatabaseClient::new(channel)
229+
.max_decoding_message_size(MAX_MESSAGE_SIZE)
230+
.accept_compressed(CompressionEncoding::Gzip)
231+
.accept_compressed(CompressionEncoding::Zstd);
232+
match self.inner.compression {
233+
Compression::Gzip => {
234+
client = client.send_compressed(CompressionEncoding::Gzip);
235+
}
236+
Compression::Zstd => {
237+
client = client.send_compressed(CompressionEncoding::Zstd);
238+
}
239+
Compression::Plain => {}
240+
}
132241
Ok(DatabaseClient { inner: client })
133242
}
134243

@@ -140,6 +249,17 @@ impl Client {
140249
}
141250
}
142251

252+
fn normailze_urls<U, A>(urls: A) -> Vec<String>
253+
where
254+
U: AsRef<str>,
255+
A: AsRef<[U]>,
256+
{
257+
urls.as_ref()
258+
.iter()
259+
.map(|peer| peer.as_ref().to_string())
260+
.collect()
261+
}
262+
143263
#[cfg(test)]
144264
mod tests {
145265
use std::collections::HashSet;

src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ pub mod load_balance;
2222
mod stream_insert;
2323

2424
pub use self::channel_manager::{ChannelConfig, ChannelManager, ClientTlsOption};
25-
pub use self::client::Client;
25+
pub use self::client::{Client, ClientBuilder};
2626
pub use self::database::Database;
2727
pub use self::error::{Error, Result};
2828
pub use self::stream_insert::StreamInserter;

src/load_balance.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ pub trait LoadBalance {
2121
}
2222

2323
#[enum_dispatch(LoadBalance)]
24-
#[derive(Debug)]
24+
#[derive(Debug, Clone)]
2525
pub enum Loadbalancer {
2626
Random,
2727
}
@@ -32,7 +32,7 @@ impl Default for Loadbalancer {
3232
}
3333
}
3434

35-
#[derive(Debug)]
35+
#[derive(Debug, Copy, Clone)]
3636
pub struct Random;
3737

3838
impl LoadBalance for Random {

0 commit comments

Comments
 (0)