Skip to content

Commit 8afff2f

Browse files
authored
feat(examples): add low-level API examples and improve configuration (#21)
* feat(examples): add low-level API examples and improve configuration management - Add low-level ingestion and streaming examples - Implement DbConfig struct for better configuration management - Update high-level examples to use new configuration utils - Add db-connection.toml file for database connection settings * refactor(config): update configuration parsing and naming conventions - Modify config file format and content: - Rename 'dbconfig' to 'database' - Rename 'db.endpoints' to 'endpoints' - Rename 'db.database' to 'dbname' - Update configuration parsing logic in code - Adjust error handling in Server error variant - Improve code formatting and readability in examples * refactor(error): improve readability of server error handling code - Enhance code formatting for better visibility - Use consistent indentation and line breaks * refactor(error): improve Server error handling and reduce memory usage - Remove unnecessary Box usage for String in Server error variant - Simplify Server error variant structure
1 parent 5a483f6 commit 8afff2f

File tree

7 files changed

+670
-18
lines changed

7 files changed

+670
-18
lines changed

examples/config_utils.rs

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
// Copyright 2023 Greptime Team
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use std::fs;
16+
use std::io;
17+
18+
pub struct DbConfig {
19+
pub endpoint: String,
20+
pub database: String,
21+
}
22+
23+
impl Default for DbConfig {
24+
fn default() -> Self {
25+
Self {
26+
endpoint: "localhost:4001".to_string(),
27+
database: "public".to_string(),
28+
}
29+
}
30+
}
31+
32+
impl DbConfig {
33+
pub fn from_env() -> Self {
34+
let config = Self::from_file().unwrap_or_default();
35+
Self {
36+
endpoint: std::env::var("GREPTIMEDB_ENDPOINT").unwrap_or(config.endpoint),
37+
database: std::env::var("GREPTIMEDB_DBNAME").unwrap_or(config.database),
38+
}
39+
}
40+
41+
pub fn from_file() -> io::Result<Self> {
42+
let content = fs::read_to_string("examples/db-connection.toml")?;
43+
let mut endpoint = String::new();
44+
let mut database = String::new();
45+
46+
for line in content.lines() {
47+
let line = line.trim();
48+
if line.starts_with("endpoints") {
49+
endpoint = line
50+
.split('=')
51+
.nth(1)
52+
.and_then(|s| {
53+
s.trim()
54+
.trim_matches(&['[', ']', '"'][..])
55+
.split(',')
56+
.next()
57+
})
58+
.unwrap_or("127.0.0.1:4001")
59+
.to_string();
60+
} else if line.starts_with("dbname") {
61+
database = line
62+
.split('=')
63+
.nth(1)
64+
.map(|s| s.trim().trim_matches('"'))
65+
.unwrap_or("public")
66+
.to_string();
67+
}
68+
}
69+
70+
Ok(Self { endpoint, database })
71+
}
72+
}
73+
74+
#[allow(dead_code)]
75+
fn main() {
76+
let config = DbConfig::from_env();
77+
println!("Using GreptimeDB endpoint: {}", config.endpoint);
78+
println!("Using database: {}", config.database);
79+
}

examples/db-connection.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
[database]
2+
endpoints = ["127.0.0.1:4001"]
3+
dbname = "public"

examples/ingest.rs renamed to examples/high_level_ingest.rs

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -17,25 +17,26 @@
1717

1818
use derive_new::new;
1919

20+
mod config_utils;
21+
use config_utils::DbConfig;
22+
2023
use greptimedb_ingester::api::v1::*;
2124
use greptimedb_ingester::helpers::schema::*;
2225
use greptimedb_ingester::helpers::values::*;
2326
use greptimedb_ingester::{
24-
ChannelConfig, ChannelManager, ClientBuilder, ClientTlsOption, Database, DEFAULT_SCHEMA_NAME,
27+
ChannelConfig, ChannelManager, ClientBuilder, ClientTlsOption, Database,
2528
};
2629

2730
#[tokio::main]
2831
async fn main() {
29-
let greptimedb_endpoint =
30-
std::env::var("GREPTIMEDB_ENDPOINT").unwrap_or_else(|_| "localhost:4001".to_owned());
31-
let greptimedb_dbname =
32-
std::env::var("GREPTIMEDB_DBNAME").unwrap_or_else(|_| DEFAULT_SCHEMA_NAME.to_owned());
32+
let config = DbConfig::from_env();
33+
3334
let greptimedb_secure = std::env::var("GREPTIMEDB_TLS")
3435
.map(|s| s == "1")
3536
.unwrap_or(false);
3637

3738
let builder = ClientBuilder::default()
38-
.peers(vec![&greptimedb_endpoint])
39+
.peers(vec![&config.endpoint])
3940
.compression(greptimedb_ingester::Compression::Gzip);
4041
let grpc_client = if greptimedb_secure {
4142
let channel_config = ChannelConfig::default().client_tls_config(ClientTlsOption::default());
@@ -47,7 +48,7 @@ async fn main() {
4748
builder.build()
4849
};
4950

50-
let client = Database::new_with_dbname(greptimedb_dbname, grpc_client);
51+
let client = Database::new_with_dbname(config.database, grpc_client);
5152

5253
let records = weather_records();
5354
let result = client

examples/stream_ingest.rs renamed to examples/high_level_stream_ingest.rs

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -17,28 +17,27 @@
1717

1818
use derive_new::new;
1919

20+
mod config_utils;
21+
use config_utils::DbConfig;
22+
2023
use greptimedb_ingester::api::v1::*;
2124
use greptimedb_ingester::helpers::schema::{field, tag, timestamp};
2225
use greptimedb_ingester::helpers::values::{
2326
f32_value, i32_value, string_value, timestamp_millisecond_value,
2427
};
25-
use greptimedb_ingester::{ClientBuilder, Database, DEFAULT_SCHEMA_NAME};
28+
use greptimedb_ingester::{ClientBuilder, Database};
2629

2730
#[tokio::main]
2831
async fn main() {
29-
let greptimedb_endpoint =
30-
std::env::var("GREPTIMEDB_ENDPOINT").unwrap_or_else(|_| "localhost:4001".to_owned());
31-
32-
let greptimedb_dbname =
33-
std::env::var("GREPTIMEDB_DBNAME").unwrap_or_else(|_| DEFAULT_SCHEMA_NAME.to_owned());
32+
let config = DbConfig::from_env();
3433

3534
let grpc_client = ClientBuilder::default()
36-
.peers(vec![&greptimedb_endpoint])
35+
.peers(vec![&config.endpoint])
3736
.build();
3837

39-
let client = Database::new_with_dbname(greptimedb_dbname, grpc_client);
38+
let client = Database::new_with_dbname(config.database, grpc_client);
4039

41-
let stream_inserter = client.streaming_inserter(1024, Some("ttl=7d")).unwrap();
40+
let stream_inserter = client.streaming_inserter(1024, Some("ttl=1d")).unwrap();
4241

4342
if let Err(e) = stream_inserter
4443
.row_insert(to_insert_requests(weather_records_1()))

examples/low_level_ingest.rs

Lines changed: 181 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,181 @@
1+
// Copyright 2023 Greptime Team
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
#![allow(clippy::print_stderr)]
16+
#![allow(clippy::print_stdout)]
17+
18+
mod config_utils;
19+
use config_utils::DbConfig;
20+
21+
use greptimedb_ingester::api::v1::*;
22+
use greptimedb_ingester::helpers::schema::*;
23+
use greptimedb_ingester::helpers::values::*;
24+
use greptimedb_ingester::{ClientBuilder, Database, Result};
25+
26+
/// Example of sensor data insertion using low-level API
27+
pub async fn low_level_sensor_ingest() -> Result<()> {
28+
let config = DbConfig::from_env();
29+
30+
let grpc_client = ClientBuilder::default()
31+
.peers(vec![&config.endpoint])
32+
.build();
33+
34+
let database = Database::new_with_dbname(&config.database, grpc_client);
35+
36+
let sensor_schema = build_sensor_schema();
37+
38+
let sensor_data = build_sensor_data();
39+
40+
let insert_request = RowInsertRequests {
41+
inserts: vec![RowInsertRequest {
42+
table_name: "sensor_readings".to_owned(),
43+
rows: Some(Rows {
44+
schema: sensor_schema,
45+
rows: sensor_data,
46+
}),
47+
}],
48+
};
49+
50+
let affected_rows = database.row_insert(insert_request).await?;
51+
println!("Successfully inserted {} rows", affected_rows);
52+
53+
Ok(())
54+
}
55+
56+
/// Use schema helpers to build column definitions for sensor table
57+
fn build_sensor_schema() -> Vec<ColumnSchema> {
58+
vec![
59+
// Tag columns - for indexing and grouping
60+
tag("device_id", ColumnDataType::String),
61+
tag("location", ColumnDataType::String),
62+
// Timestamp column - timeline for time series data
63+
timestamp("timestamp", ColumnDataType::TimestampMillisecond),
64+
// Field columns - actual measurement values
65+
field("temperature", ColumnDataType::Float64),
66+
field("humidity", ColumnDataType::Float64),
67+
field("pressure", ColumnDataType::Float64),
68+
field("battery_level", ColumnDataType::Int32),
69+
field("is_online", ColumnDataType::Boolean),
70+
]
71+
}
72+
73+
/// Use value helpers to build sensor data rows
74+
fn build_sensor_data() -> Vec<Row> {
75+
let mut rows = Vec::new();
76+
77+
rows.push(Row {
78+
values: vec![
79+
string_value("sensor_001".to_string()),
80+
string_value("building_a_floor_1".to_string()),
81+
timestamp_millisecond_value(1748500685000),
82+
f64_value(23.5),
83+
f64_value(65.2),
84+
f64_value(1013.25),
85+
i32_value(85),
86+
bool_value(true),
87+
],
88+
});
89+
90+
rows.push(Row {
91+
values: vec![
92+
string_value("sensor_002".to_string()),
93+
string_value("building_a_floor_2".to_string()),
94+
timestamp_millisecond_value(1748500685000),
95+
f64_value(24.1),
96+
f64_value(62.8),
97+
f64_value(1012.80),
98+
i32_value(78),
99+
bool_value(true),
100+
],
101+
});
102+
103+
rows.push(Row {
104+
values: vec![
105+
string_value("sensor_003".to_string()),
106+
string_value("building_b_floor_1".to_string()),
107+
timestamp_millisecond_value(1748500685000),
108+
f64_value(22.8),
109+
f64_value(68.5),
110+
f64_value(1014.10),
111+
i32_value(15),
112+
bool_value(false),
113+
],
114+
});
115+
116+
rows
117+
}
118+
119+
/// Demonstrate batch insertion of different data types
120+
pub async fn low_level_mixed_data_ingest() -> Result<()> {
121+
let config = DbConfig::from_env();
122+
123+
let grpc_client = ClientBuilder::default()
124+
.peers(vec![&config.endpoint])
125+
.build();
126+
let database = Database::new_with_dbname(&config.database, grpc_client);
127+
128+
// Build table with various data types
129+
let mixed_schema = vec![
130+
tag("category", ColumnDataType::String),
131+
timestamp("event_time", ColumnDataType::TimestampNanosecond),
132+
field("int8_val", ColumnDataType::Int8),
133+
field("int16_val", ColumnDataType::Int16),
134+
field("int64_val", ColumnDataType::Int64),
135+
field("uint32_val", ColumnDataType::Uint32),
136+
field("float32_val", ColumnDataType::Float32),
137+
field("binary_data", ColumnDataType::Binary),
138+
field("date_val", ColumnDataType::Date),
139+
];
140+
141+
let mixed_data = vec![Row {
142+
values: vec![
143+
string_value("test_category".to_string()),
144+
timestamp_nanosecond_value(1748500685000000000),
145+
i8_value(127),
146+
i16_value(32767),
147+
i64_value(9223372036854775807),
148+
u32_value(4294967295),
149+
f32_value(std::f32::consts::PI),
150+
binary_value(vec![0x48, 0x65, 0x6c, 0x6c, 0x6f]),
151+
date_value(19358),
152+
],
153+
}];
154+
155+
let insert_request = RowInsertRequests {
156+
inserts: vec![RowInsertRequest {
157+
table_name: "mixed_data_table".to_string(),
158+
rows: Some(Rows {
159+
schema: mixed_schema,
160+
rows: mixed_data,
161+
}),
162+
}],
163+
};
164+
165+
let affected_rows = database.row_insert(insert_request).await?;
166+
println!("Mixed data insert: {} rows affected", affected_rows);
167+
168+
Ok(())
169+
}
170+
171+
#[tokio::main]
172+
async fn main() -> Result<()> {
173+
println!("Starting low-level ingestion examples...");
174+
175+
low_level_sensor_ingest().await?;
176+
177+
low_level_mixed_data_ingest().await?;
178+
179+
println!("All examples completed successfully!");
180+
Ok(())
181+
}

0 commit comments

Comments
 (0)