6 changes: 2 additions & 4 deletions src/bin/db_inspector.rs
@@ -34,7 +34,7 @@ struct Opt {
fn main() {
env_logger::init();
let opts = Opt::from_args();
let (storage, wal, _) = Storage::new(&opts.db_path, Arc::new(PerfCounter::default()), true);
let (storage, wal, _) = Storage::new(&opts.db_path, Arc::new(PerfCounter::default()), true, 1);

{
let meta = storage.meta_store().read().unwrap();
@@ -66,9 +66,7 @@ fn main() {
for (i, subpartition) in partition.subpartitions.iter().enumerate() {
println!(
" Subpartition {} has last column {} ({} bytes)",
i,
subpartition.last_column,
subpartition.size_bytes,
i, subpartition.last_column, subpartition.size_bytes,
);
if opts.meta > 2 {
println!(
6 changes: 6 additions & 0 deletions src/bin/repl/main.rs
@@ -108,6 +108,10 @@ struct Opt {
#[structopt(long, default_value = "1")]
wal_flush_compaction_threads: usize,

/// Number of parallel threads used for IO operations
#[structopt(long, default_value = "1")]
io_threads: usize,

/// Internal metrics collection interval in seconds
#[structopt(long, default_value = "15")]
metrics_interval: u64,
@@ -144,6 +148,7 @@ fn main() {
wal_flush_compaction_threads,
metrics_interval,
metrics_table_name,
io_threads,
} = Opt::from_args();

let options = locustdb::Options {
@@ -159,6 +164,7 @@
batch_size,
max_partition_length: 1024 * 1024,
wal_flush_compaction_threads,
io_threads,
metrics_interval,
metrics_table_name,
};
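A minimal, standalone sketch (not part of this PR) of how the new field surfaces on the command line: structopt turns the io_threads field into an --io-threads flag with a default of 1. The reduced Opt struct below mirrors the names in the diff but omits every other option so it compiles on its own.

use structopt::StructOpt;

/// Reduced stand-in for the repl's Opt struct; only the new field is shown.
#[derive(StructOpt)]
struct Opt {
    /// Number of parallel threads used for IO operations
    #[structopt(long, default_value = "1")]
    io_threads: usize,
}

fn main() {
    // Passing --io-threads 4 parses to 4; omitting the flag yields the default of 1.
    let Opt { io_threads } = Opt::from_args();
    println!("io_threads = {}", io_threads);
}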
3 changes: 1 addition & 2 deletions src/disk_store/meta_store.rs
@@ -235,8 +235,6 @@ impl MetaStore {
let mut buf = Vec::new();
serialize_packed::write_message(&mut buf, &builder).unwrap();
tracer.end_span(span_message_serialization);
tracer.end_span(span_serialize);

tracer.annotate("table_count", self.partitions.len());
tracer.annotate("partition_count", total_partitions);
tracer.annotate("unique_column_count", 0);
@@ -245,6 +243,7 @@
tracer.annotate("column_names_bytes", 0);
tracer.annotate("compressed_column_names_bytes", 0);
tracer.annotate("column_name_lengths_bytes", 0);
tracer.end_span(span_serialize);

buf
}
148 changes: 126 additions & 22 deletions src/disk_store/storage.rs
@@ -1,7 +1,9 @@
use std::collections::BTreeMap;
use std::ops::Range;
use std::path::{Path, PathBuf};
use std::sync::{Arc, RwLock};
use std::sync::{mpsc, Arc, RwLock};

use threadpool::ThreadPool;

use super::azure_writer::AzureBlobWriter;
use super::file_writer::{BlobWriter, FileBlobWriter, VersionedChecksummedBlobWriter};
@@ -50,13 +52,16 @@ pub struct Storage {
meta_store: Arc<RwLock<MetaStore>>,
writer: Box<dyn BlobWriter + Send + Sync + 'static>,
perf_counter: Arc<PerfCounter>,

io_threadpool: Option<ThreadPool>,
}

impl Storage {
pub fn new(
path: &Path,
perf_counter: Arc<PerfCounter>,
readonly: bool,
io_threads: usize,
) -> (Storage, Vec<WalSegment<'static>>, u64) {
let (writer, path): (Box<dyn BlobWriter + Send + Sync + 'static>, PathBuf) =
if path.starts_with("gs://") {
@@ -122,6 +127,11 @@ impl Storage {
meta_store,
writer,
perf_counter,
io_threadpool: if io_threads > 1 {
Some(ThreadPool::new(io_threads))
} else {
None
},
},
wal_segments,
wal_size,
@@ -238,34 +248,94 @@ impl Storage {
}

pub fn persist_partitions(
&self,
self: &Arc<Storage>,
partitions: Vec<(PartitionMetadata, Vec<Vec<Arc<Column>>>)>,
tracer: &mut SimpleTracer,
) {
let span_persist_partitions = tracer.start_span("persist_partitions");
let mut partition_count = 0;
let mut partition_bytes = 0;
if let Some(io_threadpool) = &self.io_threadpool {
let (tx, rx) = mpsc::channel();
let span_spawn_tasks = tracer.start_span("spawn_tasks");
for (partition, subpartitions) in partitions {
partition_count += 1;
partition_bytes += subpartitions
.iter()
.flat_map(|s| s.iter())
.map(|c| c.heap_size_of_children())
.sum::<usize>();
let tx = tx.clone();
let storage = self.clone();
io_threadpool.execute(move || {
storage.write_subpartitions(&partition, subpartitions, false);
let mut meta_store = storage.meta_store.write().unwrap();
meta_store.insert_partition(partition);
tx.send(()).unwrap();
});
}
tracer.end_span(span_spawn_tasks);

// Write out new partition files
for (partition, subpartition_cols) in partitions {
let span_write_subpartitions = tracer.start_span("write_subpartitions");
self.write_subpartitions(&partition, subpartition_cols, false);
tracer.end_span(span_write_subpartitions);

let span_lock_meta_store = tracer.start_span("lock_meta_store");
let mut meta_store = self.meta_store.write().unwrap();
meta_store.insert_partition(partition);
tracer.end_span(span_lock_meta_store);
let span_wait_for_tasks = tracer.start_span("wait_for_tasks");
for _ in rx.iter().take(partition_count) {
// Wait for all partitions to be persisted
}
tracer.end_span(span_wait_for_tasks);
} else {
// Write out new partition files
for (partition, subpartition_cols) in partitions {
let span_write_subpartitions = tracer.start_span("write_subpartitions");
self.write_subpartitions(&partition, subpartition_cols, false);
tracer.end_span(span_write_subpartitions);

let span_lock_meta_store = tracer.start_span("lock_meta_store");
let mut meta_store = self.meta_store.write().unwrap();
meta_store.insert_partition(partition);
tracer.end_span(span_lock_meta_store);
}
}

tracer.annotate("partition_count", partition_count);
tracer.annotate("partition_bytes", partition_bytes);
tracer.end_span(span_persist_partitions);
}

pub fn persist_partition(
&self,
partition: PartitionMetadata,
subpartition_cols: Vec<Vec<Arc<Column>>>,
) {
self.write_subpartitions(&partition, subpartition_cols, false);
let mut meta_store = self.meta_store.write().unwrap();
meta_store.insert_partition(partition);
}

/// Delete WAL segments with ids in the given range.
pub fn delete_wal_segments(&self, ids: Range<u64>, tracer: &mut SimpleTracer) {
pub fn delete_wal_segments(self: &Arc<Storage>, ids: Range<u64>, tracer: &mut SimpleTracer) {
let span_delete_wal_segments = tracer.start_span("delete_wal_segments");
for id in ids {
let path = self.wal_dir.join(format!("{}.wal", id));
self.writer.delete(&path).unwrap();

if let Some(io_threadpool) = &self.io_threadpool {
let count = ids.end - ids.start;
let (tx, rx) = mpsc::channel();
for id in ids {
let tx = tx.clone();
let storage = self.clone();
io_threadpool.execute(move || {
let path = storage.wal_dir.join(format!("{}.wal", id));
storage.writer.delete(&path).unwrap();
tx.send(()).unwrap();
});
}
for _ in rx.iter().take(count as usize) {
// Wait for all WAL segments to be deleted
}
} else {
for id in ids {
let path = self.wal_dir.join(format!("{}.wal", id));
self.writer.delete(&path).unwrap();
}
}

tracer.end_span(span_delete_wal_segments);
}

Expand Down Expand Up @@ -311,19 +381,53 @@ impl Storage {
}

pub fn delete_orphaned_partitions(
&self,
self: &Arc<Storage>,
to_delete: Vec<(String, Vec<(u64, String)>)>,
tracer: &mut SimpleTracer,
) {
// Delete old partition files
let span_delete_orphaned_partitions = tracer.start_span("delete_orphaned_partitions");
for (table, to_delete) in &to_delete {
for (id, key) in to_delete {
let table_dir = self.tables_path.join(sanitize_table_name(table));
let path = table_dir.join(partition_filename(*id, key));
self.writer.delete(&path).unwrap();
let mut table_count = 0;
let mut partition_count = 0;

if let Some(io_threadpool) = &self.io_threadpool {
let (tx, rx) = mpsc::channel();
let span_spawn_tasks = tracer.start_span("spawn_tasks");
for (table, to_delete) in to_delete {
table_count += 1;
for (id, key) in to_delete {
partition_count += 1;
let tx = tx.clone();
let storage = self.clone();
let table = table.clone();
io_threadpool.execute(move || {
let table_dir = storage.tables_path.join(sanitize_table_name(&table));
let path = table_dir.join(partition_filename(id, &key));
storage.writer.delete(&path).unwrap();
tx.send(()).unwrap();
});
}
}
tracer.end_span(span_spawn_tasks);

let span_wait_for_tasks = tracer.start_span("wait_for_tasks");
for _ in rx.iter().take(partition_count) {
// Wait for all partitions to be deleted
}
tracer.end_span(span_wait_for_tasks);
} else {
for (table, to_delete) in &to_delete {
table_count += 1;
for (id, key) in to_delete {
partition_count += 1;
let table_dir = self.tables_path.join(sanitize_table_name(table));
let path = table_dir.join(partition_filename(*id, key));
self.writer.delete(&path).unwrap();
}
}
}
tracer.annotate("table_count", table_count);
tracer.annotate("partition_count", partition_count);
tracer.end_span(span_delete_orphaned_partitions);
}

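For readers unfamiliar with the pattern the new branches share, here is a minimal, self-contained sketch (illustrative only, not taken from the PR) of the same fan-out/fan-in idiom: each work item becomes a task on a threadpool::ThreadPool, and an mpsc channel is drained once per task so the caller blocks until every task has finished.

use std::sync::mpsc;
use threadpool::ThreadPool;

fn main() {
    // One shared pool, analogous to io_threadpool above.
    let pool = ThreadPool::new(4);
    let (tx, rx) = mpsc::channel();

    let items: Vec<u64> = (0..16).collect();
    let count = items.len();
    for id in items {
        let tx = tx.clone();
        pool.execute(move || {
            // Stand-in for the real work: writing subpartitions, or deleting
            // a WAL segment or orphaned partition file.
            let _ = id;
            tx.send(()).unwrap();
        });
    }

    // Fan-in: receive exactly one message per spawned task, which blocks
    // until all tasks have completed.
    for _ in rx.iter().take(count) {}
}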
3 changes: 3 additions & 0 deletions src/locustdb.rs
@@ -233,6 +233,8 @@ pub struct Options {
pub max_partition_length: usize,
/// Number of parallel threads used during WAL flush table batching and compacting
pub wal_flush_compaction_threads: usize,
/// Number of parallel threads used for IO operations
pub io_threads: usize,
/// Internal metrics collection interval in seconds
pub metrics_interval: u64,
/// Internal metrics table name
@@ -254,6 +256,7 @@ impl Default for Options {
batch_size: 1024,
max_partition_length: 1024 * 1024,
wal_flush_compaction_threads: 1,
io_threads: 1,
metrics_interval: 15,
metrics_table_name: Some("_metrics".to_string()),
}
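A hedged usage sketch (assuming locustdb is consumed as a library and Options keeps the Default impl shown above): enabling parallel IO is a matter of raising io_threads above its default of 1.

fn main() {
    // Start from the defaults in impl Default for Options and override only
    // the new field; 4 is an arbitrary illustrative value.
    let options = locustdb::Options {
        io_threads: 4,
        ..locustdb::Options::default()
    };
    assert_eq!(options.io_threads, 4);
}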