79 changes: 79 additions & 0 deletions examples/config_utils.rs
@@ -0,0 +1,79 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::fs;
use std::io;

pub struct DbConfig {
pub endpoint: String,
pub database: String,
}

impl Default for DbConfig {
fn default() -> Self {
Self {
endpoint: "localhost:4001".to_string(),
database: "public".to_string(),
}
}
}

impl DbConfig {
pub fn from_env() -> Self {
let config = Self::from_file().unwrap_or_default();
Self {
endpoint: std::env::var("GREPTIMEDB_ENDPOINT").unwrap_or(config.endpoint),
database: std::env::var("GREPTIMEDB_DBNAME").unwrap_or(config.database),
}
}

pub fn from_file() -> io::Result<Self> {
let content = fs::read_to_string("examples/db-connection.toml")?;
let mut endpoint = String::new();
let mut database = String::new();

for line in content.lines() {
let line = line.trim();
if line.starts_with("endpoints") {
endpoint = line
.split('=')
.nth(1)
.and_then(|s| {
s.trim()
.trim_matches(&['[', ']', '"'][..])
.split(',')
.next()
})
.unwrap_or("127.0.0.1:4001")
.to_string();
} else if line.starts_with("dbname") {
database = line
.split('=')
.nth(1)
.map(|s| s.trim().trim_matches('"'))
.unwrap_or("public")
.to_string();
}
}

Ok(Self { endpoint, database })
}
}

#[allow(dead_code)]
fn main() {
let config = DbConfig::from_env();
println!("Using GreptimeDB endpoint: {}", config.endpoint);
println!("Using database: {}", config.database);
}
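The helper above resolves connection settings in a fixed order: environment variables first, then examples/db-connection.toml, then the hard-coded defaults. A minimal usage sketch of that precedence (not part of this PR, shown only for illustration):

// Sketch only. Lookup order implemented by DbConfig::from_env():
//   1. GREPTIMEDB_ENDPOINT / GREPTIMEDB_DBNAME environment variables
//   2. endpoints / dbname keys in examples/db-connection.toml
//   3. built-in defaults: "localhost:4001" / "public"
mod config_utils;
use config_utils::DbConfig;

fn main() {
    let config = DbConfig::from_env();
    println!("endpoint = {}, database = {}", config.endpoint, config.database);
}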
3 changes: 3 additions & 0 deletions examples/db-connection.toml
@@ -0,0 +1,3 @@
[database]
endpoints = ["127.0.0.1:4001"]
dbname = "public"
15 changes: 8 additions & 7 deletions examples/ingest.rs → examples/high_level_ingest.rs
@@ -17,25 +17,26 @@

use derive_new::new;

+mod config_utils;
+use config_utils::DbConfig;
+
use greptimedb_ingester::api::v1::*;
use greptimedb_ingester::helpers::schema::*;
use greptimedb_ingester::helpers::values::*;
use greptimedb_ingester::{
-    ChannelConfig, ChannelManager, ClientBuilder, ClientTlsOption, Database, DEFAULT_SCHEMA_NAME,
+    ChannelConfig, ChannelManager, ClientBuilder, ClientTlsOption, Database,
};

#[tokio::main]
async fn main() {
-    let greptimedb_endpoint =
-        std::env::var("GREPTIMEDB_ENDPOINT").unwrap_or_else(|_| "localhost:4001".to_owned());
-    let greptimedb_dbname =
-        std::env::var("GREPTIMEDB_DBNAME").unwrap_or_else(|_| DEFAULT_SCHEMA_NAME.to_owned());
+    let config = DbConfig::from_env();

let greptimedb_secure = std::env::var("GREPTIMEDB_TLS")
.map(|s| s == "1")
.unwrap_or(false);

let builder = ClientBuilder::default()
-        .peers(vec![&greptimedb_endpoint])
+        .peers(vec![&config.endpoint])
.compression(greptimedb_ingester::Compression::Gzip);
let grpc_client = if greptimedb_secure {
let channel_config = ChannelConfig::default().client_tls_config(ClientTlsOption::default());
@@ -47,7 +48,7 @@ async fn main() {
builder.build()
};

-    let client = Database::new_with_dbname(greptimedb_dbname, grpc_client);
+    let client = Database::new_with_dbname(config.database, grpc_client);

let records = weather_records();
let result = client
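The high-level example above still reads the GREPTIMEDB_TLS flag inline. A hypothetical extension (not part of this PR) could fold that flag into the shared config helper using the same pattern, assuming mod config_utils is in scope:

// Hypothetical extension, not part of this PR: centralizes the GREPTIMEDB_TLS
// check shown above alongside the endpoint and database name.
pub struct DbConfigWithTls {
    pub endpoint: String,
    pub database: String,
    pub secure: bool,
}

impl DbConfigWithTls {
    pub fn from_env() -> Self {
        let base = config_utils::DbConfig::from_env();
        Self {
            endpoint: base.endpoint,
            database: base.database,
            // same convention as the example above: GREPTIMEDB_TLS=1 enables TLS
            secure: std::env::var("GREPTIMEDB_TLS")
                .map(|s| s == "1")
                .unwrap_or(false),
        }
    }
}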
@@ -17,28 +17,27 @@

use derive_new::new;

+mod config_utils;
+use config_utils::DbConfig;
+
use greptimedb_ingester::api::v1::*;
use greptimedb_ingester::helpers::schema::{field, tag, timestamp};
use greptimedb_ingester::helpers::values::{
f32_value, i32_value, string_value, timestamp_millisecond_value,
};
-use greptimedb_ingester::{ClientBuilder, Database, DEFAULT_SCHEMA_NAME};
+use greptimedb_ingester::{ClientBuilder, Database};

#[tokio::main]
async fn main() {
-    let greptimedb_endpoint =
-        std::env::var("GREPTIMEDB_ENDPOINT").unwrap_or_else(|_| "localhost:4001".to_owned());
-
-    let greptimedb_dbname =
-        std::env::var("GREPTIMEDB_DBNAME").unwrap_or_else(|_| DEFAULT_SCHEMA_NAME.to_owned());
+    let config = DbConfig::from_env();

let grpc_client = ClientBuilder::default()
-        .peers(vec![&greptimedb_endpoint])
+        .peers(vec![&config.endpoint])
.build();

-    let client = Database::new_with_dbname(greptimedb_dbname, grpc_client);
+    let client = Database::new_with_dbname(config.database, grpc_client);

let stream_inserter = client.streaming_inserter(1024, Some("ttl=7d")).unwrap();
let stream_inserter = client.streaming_inserter(1024, Some("ttl=1d")).unwrap();

if let Err(e) = stream_inserter
.row_insert(to_insert_requests(weather_records_1()))
181 changes: 181 additions & 0 deletions examples/low_level_ingest.rs
@@ -0,0 +1,181 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#![allow(clippy::print_stderr)]
#![allow(clippy::print_stdout)]

mod config_utils;
use config_utils::DbConfig;

use greptimedb_ingester::api::v1::*;
use greptimedb_ingester::helpers::schema::*;
use greptimedb_ingester::helpers::values::*;
use greptimedb_ingester::{ClientBuilder, Database, Result};

/// Example of sensor data insertion using low-level API
pub async fn low_level_sensor_ingest() -> Result<()> {
let config = DbConfig::from_env();

let grpc_client = ClientBuilder::default()
.peers(vec![&config.endpoint])
.build();

let database = Database::new_with_dbname(&config.database, grpc_client);

let sensor_schema = build_sensor_schema();

let sensor_data = build_sensor_data();

let insert_request = RowInsertRequests {
inserts: vec![RowInsertRequest {
table_name: "sensor_readings".to_owned(),
rows: Some(Rows {
schema: sensor_schema,
rows: sensor_data,
}),
}],
};

let affected_rows = database.row_insert(insert_request).await?;
println!("Successfully inserted {} rows", affected_rows);

Ok(())
}

/// Use schema helpers to build column definitions for sensor table
fn build_sensor_schema() -> Vec<ColumnSchema> {
vec![
// Tag columns - for indexing and grouping
tag("device_id", ColumnDataType::String),
tag("location", ColumnDataType::String),
// Timestamp column - timeline for time series data
timestamp("timestamp", ColumnDataType::TimestampMillisecond),
// Field columns - actual measurement values
field("temperature", ColumnDataType::Float64),
field("humidity", ColumnDataType::Float64),
field("pressure", ColumnDataType::Float64),
field("battery_level", ColumnDataType::Int32),
field("is_online", ColumnDataType::Boolean),
]
}

/// Use value helpers to build sensor data rows
fn build_sensor_data() -> Vec<Row> {
let mut rows = Vec::new();

rows.push(Row {
values: vec![
string_value("sensor_001".to_string()),
string_value("building_a_floor_1".to_string()),
timestamp_millisecond_value(1748500685000),
f64_value(23.5),
f64_value(65.2),
f64_value(1013.25),
i32_value(85),
bool_value(true),
],
});

rows.push(Row {
values: vec![
string_value("sensor_002".to_string()),
string_value("building_a_floor_2".to_string()),
timestamp_millisecond_value(1748500685000),
f64_value(24.1),
f64_value(62.8),
f64_value(1012.80),
i32_value(78),
bool_value(true),
],
});

rows.push(Row {
values: vec![
string_value("sensor_003".to_string()),
string_value("building_b_floor_1".to_string()),
timestamp_millisecond_value(1748500685000),
f64_value(22.8),
f64_value(68.5),
f64_value(1014.10),
i32_value(15),
bool_value(false),
],
});

rows
}

/// Demonstrate batch insertion of different data types
pub async fn low_level_mixed_data_ingest() -> Result<()> {
let config = DbConfig::from_env();

let grpc_client = ClientBuilder::default()
.peers(vec![&config.endpoint])
.build();
let database = Database::new_with_dbname(&config.database, grpc_client);

// Build table with various data types
let mixed_schema = vec![
tag("category", ColumnDataType::String),
timestamp("event_time", ColumnDataType::TimestampNanosecond),
field("int8_val", ColumnDataType::Int8),
field("int16_val", ColumnDataType::Int16),
field("int64_val", ColumnDataType::Int64),
field("uint32_val", ColumnDataType::Uint32),
field("float32_val", ColumnDataType::Float32),
field("binary_data", ColumnDataType::Binary),
field("date_val", ColumnDataType::Date),
];

let mixed_data = vec![Row {
values: vec![
string_value("test_category".to_string()),
timestamp_nanosecond_value(1748500685000000000),
i8_value(127),
i16_value(32767),
i64_value(9223372036854775807),
u32_value(4294967295),
f32_value(std::f32::consts::PI),
binary_value(vec![0x48, 0x65, 0x6c, 0x6c, 0x6f]),
date_value(19358),
],
}];

let insert_request = RowInsertRequests {
inserts: vec![RowInsertRequest {
table_name: "mixed_data_table".to_string(),
rows: Some(Rows {
schema: mixed_schema,
rows: mixed_data,
}),
}],
};

let affected_rows = database.row_insert(insert_request).await?;
println!("Mixed data insert: {} rows affected", affected_rows);

Ok(())
}

#[tokio::main]
async fn main() -> Result<()> {
println!("Starting low-level ingestion examples...");

low_level_sensor_ingest().await?;

low_level_mixed_data_ingest().await?;

println!("All examples completed successfully!");
Ok(())
}
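With the low-level API the values in each Row are positional, so they have to line up with the column order in the schema above. A small sketch (not part of this PR; SensorReading is a hypothetical struct) that builds both from one place so they cannot drift apart:

// Sketch, not part of this PR: derive the schema and the row from one struct
// so the positional values stay in the same order as the columns.
struct SensorReading {
    device_id: String,
    location: String,
    ts_millis: i64,
    temperature: f64,
}

fn reading_schema() -> Vec<ColumnSchema> {
    vec![
        tag("device_id", ColumnDataType::String),
        tag("location", ColumnDataType::String),
        timestamp("timestamp", ColumnDataType::TimestampMillisecond),
        field("temperature", ColumnDataType::Float64),
    ]
}

fn reading_row(r: &SensorReading) -> Row {
    Row {
        values: vec![
            string_value(r.device_id.clone()),
            string_value(r.location.clone()),
            timestamp_millisecond_value(r.ts_millis),
            f64_value(r.temperature),
        ],
    }
}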