Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 23 additions & 2 deletions src/observability/simple_trace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,27 @@ impl SimpleTracer {
}
result
}

pub fn push_tracer(&mut self, mut tracer: SimpleTracer) {
assert_eq!(tracer.open_spans.len(), 1);
let mut children = tracer.open_spans.pop().unwrap().children;
// Set depth of children to the depth of the current span + 1
for child in children.iter_mut() {
set_depth(child, self.open_spans.len());
}
self.open_spans.last_mut().unwrap().children.extend(children);
}

pub fn elapsed(&self) -> Duration {
self.open_spans.last().unwrap().start_time.elapsed()
}
}

fn set_depth(child: &mut SimpleSpan, depth: usize) {
child.depth = depth;
for child in child.children.iter_mut() {
set_depth(child, depth + 1);
}
}

impl SimpleSpan {
Expand Down Expand Up @@ -132,9 +153,9 @@ impl SimpleSpan {
for (name, (total_duration, count)) in agg_spans {
let duration_str = format_duration(total_duration);
if count > 1 {
result.push_str(&format!("{}: {} (× {})\n", name, duration_str, count));
result.push_str(&format!("{} {}: {} (×{})\n", indent, name, duration_str, count));
} else {
result.push_str(&format!("{}: {}\n", name, duration_str));
result.push_str(&format!("{} {}: {}\n", indent, name, duration_str));
}
}
} else {
Expand Down
70 changes: 64 additions & 6 deletions src/scheduler/inner_locustdb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@ use self::meta_store::SubpartitionMetadata;
use self::raw_col::MixedCol;
use self::wal_segment::WalSegment;

// Table name + list of partitions
pub type PartitionList = (String, Vec<(u64, String)>);

pub struct InnerLocustDB {
tables: RwLock<HashMap<String, Arc<Table>>>,
lru: Lru,
Expand Down Expand Up @@ -400,11 +403,32 @@ impl InnerLocustDB {
let tx = tx.clone();
let this = self.clone();
self.walflush_threadpool.execute(move || {
let to_delete = this.compact(table, id, range, &parts);
tx.send(to_delete).unwrap();
let (to_delete, tracer) = this.compact(table, id, range, &parts);
tx.send((to_delete, tracer)).unwrap();
});
}
let partitions_to_delete = rx.iter().take(num_compactions).flatten().collect();
let mut partitions_to_delete = vec![];
let mut longest_span: Option<(SimpleTracer, Duration)> = None;
for (to_delete, tracer) in rx.iter().take(num_compactions) {
if let Some(to_delete) = to_delete {
partitions_to_delete.push(to_delete);
}
let elapsed = tracer.elapsed();
if longest_span.is_none() || elapsed > longest_span.as_ref().unwrap().1 {
longest_span = Some((tracer, elapsed));
}
}
tracer.annotate("table_count", num_compactions);
tracer.annotate(
"partition_count",
partitions_to_delete
.iter()
.map(|(_, p)| p.len())
.sum::<usize>(),
);
if let Some((compaction_tracer, _)) = longest_span {
tracer.push_tracer(compaction_tracer);
}
tracer.end_span(span_compaction);

// Update metastore and clean up orphaned partitions and WAL segments
Expand Down Expand Up @@ -530,22 +554,35 @@ impl InnerLocustDB {
id: PartitionID,
range: Range<usize>,
parts: &[u64],
) -> Option<(String, Vec<(u64, String)>)> {
) -> (Option<PartitionList>, SimpleTracer) {
// get table, create new merged partition/sub-partitions (not registered with table)
// - get names of all columns
// - run query for each column, construct Column
// - create subpartitions
let mut tracer = SimpleTracer::default();

let span_load_column_names = tracer.start_span("load_column_names");
if !table.columns_names_loaded() {
let column_names = self
.query_column_names(table.name())
.expect("Failed to query column names");
table.init_column_names(column_names.into_iter().collect());
}
let colnames = table.column_names();
tracer.end_span(span_load_column_names);

let span_snapshot_partitions = tracer.start_span("snapshot_partitions");
let data = table.snapshot_parts(parts);
tracer.end_span(span_snapshot_partitions);

let span_load_columns = tracer.start_span("load_columns");
let mut columns = Vec::with_capacity(colnames.len());
for column in &colnames {
let span_create_query = tracer.start_span("create_query");
let query = Query::read_column(table.name(), column);
tracer.end_span(span_create_query);

let span_schedule_query = tracer.start_span("schedule_query");
let (sender, receiver) = oneshot::channel();
let query_task = QueryTask::new(
query,
Expand All @@ -560,7 +597,13 @@ impl InnerLocustDB {
)
.unwrap();
self.schedule(query_task);
tracer.end_span(span_schedule_query);

let span_block_on = tracer.start_span("await_query");
let result = block_on(receiver).unwrap().unwrap();
tracer.end_span(span_block_on);

let span_column_builder = tracer.start_span("build_column");
let mut column_builder = MixedCol::default();
let column_data = result.columns.into_iter().next().unwrap().1;
match column_data {
Expand All @@ -572,22 +615,34 @@ impl InnerLocustDB {
raws.into_iter().for_each(|r| column_builder.push(r))
}
}
tracer.end_span(span_column_builder);

assert_eq!(
range.len(),
column_builder.len(),
"range={range:?}, column_builder.len() = {}, table = {}, column = {column}",
column_builder.len(),
table.name(),
);

let span_finalize_column = tracer.start_span("finalize_column");
columns.push(column_builder.finalize(column));
tracer.end_span(span_finalize_column);
}
tracer.end_span(span_load_columns);

let span_subpartition = tracer.start_span("subpartition");
let (metadata, subpartitions) = subpartition(&self.opts, columns.clone());
tracer.end_span(span_subpartition);

// replace old partitions with new partition
let span_compact_partitions = tracer.start_span("compact_partitions");
table.compact(id, range.start, columns, parts);
tracer.end_span(span_compact_partitions);

// write new subpartitions to disk and update in-memory metastore
self.storage.as_ref().map(|s| {
let span_prepare_compact = tracer.start_span("prepare_compact");
let to_delete = self.storage.as_ref().map(|s| {
let to_delete = s.prepare_compact(
table.name(),
id,
Expand All @@ -597,7 +652,10 @@ impl InnerLocustDB {
range.start,
);
(table.name().to_string(), to_delete)
})
});
tracer.end_span(span_prepare_compact);

(to_delete, tracer)
}

pub fn restore(&self, id: PartitionID, column: Column) {
Expand Down
Loading