# GreptimeDB Rust Ingester

A high-performance Rust client for ingesting data into GreptimeDB, supporting both low-latency individual inserts and high-throughput bulk streaming operations.

## Features

- **Two Ingestion Approaches**: Choose between low-latency inserts and high-throughput bulk streaming
- **Parallel Processing**: Async request submission with configurable parallelism
- **Type Safety**: Comprehensive support for all GreptimeDB data types
- **Performance Optimized**: Memory-efficient operations with zero-copy access patterns
- **Production Ready**: Robust error handling, timeouts, and connection management

## Architecture Overview

The ingester provides two main APIs tailored for different use cases:

### 1. Low-Latency Insert API 🚀
**Best for**: Real-time applications, IoT sensors, interactive systems

```rust
use greptimedb_ingester::api::v1::*;
use greptimedb_ingester::client::Client;
use greptimedb_ingester::{database::Database, Result};

// Connect to GreptimeDB
let client = Client::with_urls(&["localhost:4001"]);
let database = Database::new_with_dbname("public", client);

// Insert data with minimal latency
let insert_request = RowInsertRequests {
    inserts: vec![RowInsertRequest {
        table_name: "sensor_data".to_string(),
        rows: Some(Rows {
            schema: vec![/* column definitions */],
            rows: vec![/* data rows */],
        }),
    }],
};

let affected_rows = database.insert(insert_request).await?;
```

### 2. High-Throughput Bulk API ⚡
**Best for**: ETL operations, data migration, batch processing, log ingestion

```rust
use greptimedb_ingester::{BulkInserter, BulkWriteOptions, ColumnDataType, Row, Table, Value};

// Create bulk inserter
let bulk_inserter = BulkInserter::new(client, "public");

// Define table schema
let table_template = Table::builder()
    .name("sensor_readings")
    .build()
    .unwrap()
    .add_timestamp("ts", ColumnDataType::TimestampMillisecond)
    .add_field("device_id", ColumnDataType::String)
    .add_field("temperature", ColumnDataType::Float64);

// Create high-performance stream writer
let mut bulk_writer = bulk_inserter
    .create_bulk_stream_writer(
        &table_template,
        Some(BulkWriteOptions::default()
            .with_parallelism(8)      // 8 concurrent requests
            .with_compression(true)   // Enable compression
            .with_timeout_ms(60000)   // 60s timeout
        ),
    )
    .await?;

// High-throughput parallel writing
for batch in data_batches {
    let request_id = bulk_writer.write_rows_async(batch).await?;
    // Requests are processed in parallel
}

// Wait for all operations to complete
let responses = bulk_writer.wait_for_all_pending().await?;
bulk_writer.finish().await?;
```

> **Important**: For bulk operations, currently use `add_field()` instead of `add_tag()`. Tag columns are part of the primary key in GreptimeDB, but bulk operations don't yet support tables with tag columns. This limitation will be addressed in future versions.

## When to Choose Which API

| Scenario | API Choice | Why |
|----------|------------|-----|
| **IoT sensor data** | Low-Latency Insert | Real-time requirements, small batches |
| **Interactive dashboards** | Low-Latency Insert | User expects immediate feedback |
| **ETL pipelines** | Bulk Streaming | Process millions of records efficiently |
| **Log ingestion** | Bulk Streaming | High volume, can batch data |
| **Data migration** | Bulk Streaming | Transfer large datasets quickly |

## Examples

The repository includes comprehensive examples demonstrating both approaches:

### Low-Latency Examples

Run with: `cargo run --example insert_example`

- **Real-time sensor ingestion**: Simulates IoT devices sending data with latency measurements
- **Data type demonstration**: Shows support for all GreptimeDB column types
- **Interactive patterns**: Best practices for real-time applications

### High-Throughput Examples

Run with: `cargo run --example bulk_stream_writer_example`

- **Performance comparison**: Sequential vs parallel processing benchmarks
- **Async submission patterns**: Demonstrates `write_rows_async()` for maximum throughput
- **Best practices**: Optimal configuration for high-volume scenarios

## Performance Characteristics

### Low-Latency Insert API
- **Latency**: 1-10ms per operation
- **Throughput**: 100-1,000 ops/sec
- **Memory**: Low, constant
- **Use case**: Real-time applications

### Bulk Streaming API
- **Latency**: 100-1000ms per batch
- **Throughput**: 10,000-100,000+ rows/sec
- **Memory**: Higher during batching
- **Use case**: High-volume processing

## Advanced Usage

### Parallel Bulk Operations

The bulk API supports true parallelism through async request submission:

```rust
// Submit multiple batches without waiting
let mut request_ids = Vec::new();
for batch in batches {
    let id = bulk_writer.write_rows_async(batch).await?;
    request_ids.push(id);
}

// Option 1: Wait for all pending requests
let responses = bulk_writer.wait_for_all_pending().await?;

// Option 2: Wait for specific requests
for request_id in request_ids {
    let response = bulk_writer.wait_for_response(request_id).await?;
    println!("Request {} completed with {} rows",
        request_id, response.affected_rows());
}
```

### Data Type Support

Full support for GreptimeDB data types:

```rust
use greptimedb_ingester::{Row, Value};

// Values are added positionally; their order should match the table's column schema.
let row = Row::new()
    .add_value(Value::TimestampMillisecond(1234567890123))
    .add_value(Value::String("device_001".to_string()))
    .add_value(Value::Float64(23.5))
    .add_value(Value::Int64(1))
    .add_value(Value::Boolean(true))
    .add_value(Value::Binary(vec![0xDE, 0xAD, 0xBE, 0xEF]))
    .add_value(Value::Json(r#"{"key": "value"}"#.to_string()));
```

### Type-Safe Data Access

Efficient data access patterns:

```rust
// Type-safe value access by column index
// (index 1 is the string column of the row built above)
if let Some(device_name) = row.get_string(1) {
    println!("Device: {}", device_name);
}

// Binary data access (index 5 is the binary column above);
// `process_binary` stands in for your own handler
if let Some(binary_data) = row.get_binary(5) {
    process_binary(&binary_data);
}
```

## Best Practices

### For Low-Latency Applications
- Use small batch sizes (200-1000 rows); see the sketch below
- Monitor and optimize network round-trip times

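As a minimal sketch of the small-batch guidance above: chunk the rows you have on hand and issue one small insert per chunk. `collect_recent_rows()` and `build_insert_request()` are hypothetical helpers standing in for your own data collection and request assembly; only `database.insert()` is part of the API shown earlier.

```rust
// Minimal sketch: keep each insert small so per-request latency stays low.
// `collect_recent_rows()` and `build_insert_request()` are hypothetical helpers.
let pending_rows = collect_recent_rows();
for chunk in pending_rows.chunks(500) {
    // One small RowInsertRequests per chunk keeps round trips short.
    let request = build_insert_request("sensor_data", chunk);
    let affected_rows = database.insert(request).await?;
    println!("inserted {affected_rows} rows");
}
```
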
### For High-Throughput Applications
- Use parallelism=8-16 for network-bound workloads
- Batch 2,000-100,000 rows per request for optimal performance
- Enable compression to reduce network overhead
- Monitor memory usage when submitting many async requests
- Implement backpressure control for very high-volume scenarios (see the sketch below)

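One simple form of backpressure, sketched below, is to bound the number of in-flight async submissions and drain them before queueing more. The limit of 8 is illustrative (matching the parallelism setting used earlier), and `batches` stands in for row batches you have already prepared for `bulk_writer`.

```rust
// Bound the number of in-flight requests so memory stays predictable.
let max_in_flight = 8; // illustrative; align with your parallelism setting
let mut in_flight = 0;

for batch in batches {
    bulk_writer.write_rows_async(batch).await?;
    in_flight += 1;
    if in_flight >= max_in_flight {
        // Drain everything submitted so far before queueing more work.
        bulk_writer.wait_for_all_pending().await?;
        in_flight = 0;
    }
}

// Flush whatever is still pending, then shut the stream down cleanly.
bulk_writer.wait_for_all_pending().await?;
bulk_writer.finish().await?;
```
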
### General Recommendations
- Use appropriate data types to minimize serialization overhead
- Pre-allocate vectors with known capacity
- Reuse connections when possible
- Handle errors gracefully with retry logic (see the sketch below)
- Monitor performance metrics in production

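As one way to apply the retry recommendation, here is a minimal sketch that wraps `database.insert()` in a bounded retry loop with a simple backoff. The attempt budget and delays are illustrative; it assumes a tokio runtime and that the surrounding async function returns the library's `Result`.

```rust
use std::time::Duration;

// Minimal retry sketch: bounded attempts with a simple linear backoff.
let mut attempt = 0;
let affected_rows = loop {
    match database.insert(insert_request.clone()).await {
        Ok(rows) => break rows,
        Err(e) if attempt < 3 => {
            attempt += 1;
            eprintln!("insert failed (attempt {attempt}): {e:?}");
            tokio::time::sleep(Duration::from_millis(100 * attempt)).await;
        }
        Err(e) => return Err(e),
    }
};
```
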
## Configuration

Set up your GreptimeDB connection:

```rust
use greptimedb_ingester::{ChannelConfig, ChannelManager};
use std::time::Duration;

let channel_config = ChannelConfig::default()
    .timeout(Duration::from_secs(30))
    .connect_timeout(Duration::from_secs(5));
let channel_manager = ChannelManager::with_config(channel_config);
let client = Client::with_manager_and_urls(channel_manager,
    &["localhost:4001"]);
```

## Error Handling

The library provides comprehensive error types:

```rust
use greptimedb_ingester::{Result, Error};

match database.insert(request).await {
    Ok(affected_rows) => println!("Inserted {} rows", affected_rows),
    Err(Error::RequestTimeout { .. }) => {
        // Handle timeout
    },
    Err(Error::SerializeMetadata { .. }) => {
        // Handle metadata serialization issues
    },
    Err(e) => {
        eprintln!("Unexpected error: {:?}", e);
    }
}
```

## API Reference

### Core Types
- `Client`: Connection management
- `Database`: Low-level insert operations
- `BulkInserter`: High-level bulk operations
- `BulkStreamWriter`: Streaming bulk writer
- `Table`: Table schema definition
- `Row`: Data row representation
- `Value`: Type-safe value wrapper

### Key Methods

**Low-Latency API:**
- `database.insert(request)` - Insert with immediate response

**Bulk API:**
- `bulk_writer.write_rows(rows)` - Submit and wait for completion
- `bulk_writer.write_rows_async(rows)` - Submit without waiting
- `bulk_writer.wait_for_response(id)` - Wait for specific request
- `bulk_writer.wait_for_all_pending()` - Wait for all pending requests
- `bulk_writer.finish()` - Clean shutdown
- `bulk_writer.finish_with_responses()` - Shutdown with response collection (see the sketch below)
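
A brief sketch of the synchronous bulk path from the list above, pairing `write_rows()` with `finish_with_responses()`. `rows_batch` is a placeholder for rows built as in the data type example, and the exact shape of the collected responses is an assumption rather than something documented here.

```rust
// Sketch: submit one batch synchronously, then shut down while
// collecting the responses of requests issued on this stream.
bulk_writer.write_rows(rows_batch).await?; // waits for this batch to complete
let responses = bulk_writer.finish_with_responses().await?;
// Inspect `responses` as needed (assumed to hold the per-request results).
```
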
## License

This library uses the Apache 2.0 license to strike a balance between open contributions and allowing you to use the software however you want.

## Links

- [GreptimeDB Documentation](https://docs.greptime.com/)
- [Examples Directory](./examples/)
- [API Documentation](https://docs.rs/greptimedb-ingester/)