Skip to content
2 changes: 1 addition & 1 deletion kvdb-rocksdb/examples/memtest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ fn main() {
let mut config = DatabaseConfig::with_columns(COLUMN_COUNT);

for c in 0..=COLUMN_COUNT {
config.memory_budget.insert(c, mb_per_col);
config.columns[c as usize].memory_budget = Some(mb_per_col);
}
let dir = tempfile::Builder::new().prefix("rocksdb-example").tempdir().unwrap();

Expand Down
105 changes: 81 additions & 24 deletions kvdb-rocksdb/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ use rocksdb::{
WriteOptions, DB,
};

pub use rocksdb::DBRawIterator;

use kvdb::{DBKeyValue, DBOp, DBTransaction, DBValue, KeyValueDB};

#[cfg(target_os = "linux")]
Expand Down Expand Up @@ -144,25 +146,37 @@ impl CompactionProfile {
}
}

#[derive(Clone, Default)]
pub struct ColumnConfig {
/// Memory budget (in MiB) used for setting block cache size and
/// write buffer size for each column including the default one.
/// If the memory budget of a column is not specified,
/// `DB_DEFAULT_COLUMN_MEMORY_BUDGET_MB` is used for that column.
pub memory_budget: Option<MiB>,

/// Custom comparison function for row ordering.
/// Pass `None` to use the default lexicographic ordering.
///
/// The client must ensure that the comparator supplied here has the same
/// name and orders keys *exactly* the same as the comparator provided to
/// previous open calls on the same DB.
pub comparator: Option<fn(&[u8], &[u8]) -> cmp::Ordering>,
}

/// Database configuration
#[derive(Clone)]
#[non_exhaustive]
pub struct DatabaseConfig {
/// Max number of open files.
pub max_open_files: i32,
/// Memory budget (in MiB) used for setting block cache size and
/// write buffer size for each column including the default one.
/// If the memory budget of a column is not specified,
/// `DB_DEFAULT_COLUMN_MEMORY_BUDGET_MB` is used for that column.
pub memory_budget: HashMap<u32, MiB>,
/// Compaction profile.
pub compaction: CompactionProfile,
/// Set number of columns.
/// Configuration for column spaces.
///
/// # Safety
///
/// The number of columns must not be zero.
pub columns: u32,
pub columns: Vec<ColumnConfig>,
/// Specify the maximum number of info/debug log files to be kept.
pub keep_log_file_num: i32,
/// Enable native RocksDB statistics.
Expand Down Expand Up @@ -191,7 +205,7 @@ pub struct DatabaseConfig {
}

impl DatabaseConfig {
/// Create new `DatabaseConfig` with default parameters and specified set of columns.
/// Create new `DatabaseConfig` with default parameters and specified number of columns with default configuration.
/// Note that cache sizes must be explicitly set.
///
/// # Safety
Expand All @@ -200,19 +214,37 @@ impl DatabaseConfig {
pub fn with_columns(columns: u32) -> Self {
assert!(columns > 0, "the number of columns must not be zero");

Self { columns: (0..columns).map(|_| ColumnConfig::default()).collect(), ..Default::default() }
}

/// Create new `DatabaseConfig` with default parameters and specified set of columns.
/// Note that cache sizes must be explicitly set.
///
/// # Safety
///
/// The number of `columns` must not be zero.
pub fn with_configured_columns(columns: Vec<ColumnConfig>) -> Self {
assert!(columns.len() > 0, "the number of columns must not be zero");

Self { columns, ..Default::default() }
}

/// Returns the total memory budget in bytes.
pub fn memory_budget(&self) -> MiB {
(0..self.columns)
.map(|i| self.memory_budget.get(&i).unwrap_or(&DB_DEFAULT_COLUMN_MEMORY_BUDGET_MB) * MB)
self.columns
.iter()
.map(|cfg| cfg.memory_budget.unwrap_or(DB_DEFAULT_COLUMN_MEMORY_BUDGET_MB) * MB)
.sum()
}

/// Returns the memory budget of the specified column in bytes.
fn memory_budget_for_col(&self, col: u32) -> MiB {
self.memory_budget.get(&col).unwrap_or(&DB_DEFAULT_COLUMN_MEMORY_BUDGET_MB) * MB
let budget_mb = if let Some(cfg) = self.columns.get(col as usize) {
cfg.memory_budget.unwrap_or(DB_DEFAULT_COLUMN_MEMORY_BUDGET_MB)
} else {
DB_DEFAULT_COLUMN_MEMORY_BUDGET_MB
};
budget_mb * MB
}

// Get column family configuration with the given block based options.
Expand All @@ -225,6 +257,9 @@ impl DatabaseConfig {
opts.optimize_level_style_compaction(column_mem_budget);
opts.set_target_file_size_base(self.compaction.initial_file_size);
opts.set_compression_per_level(&[]);
if let Some(comparator) = self.columns[col as usize].comparator {
opts.set_comparator(&format!("column_{col}_comparator"), Box::new(comparator));
}

opts
}
Expand All @@ -234,9 +269,8 @@ impl Default for DatabaseConfig {
fn default() -> DatabaseConfig {
DatabaseConfig {
max_open_files: 512,
memory_budget: HashMap::new(),
compaction: CompactionProfile::default(),
columns: 1,
columns: vec![ColumnConfig::default()],
keep_log_file_num: 1,
enable_statistics: false,
secondary: None,
Expand Down Expand Up @@ -335,12 +369,12 @@ impl Database {
///
/// The number of `config.columns` must not be zero.
pub fn open<P: AsRef<Path>>(config: &DatabaseConfig, path: P) -> io::Result<Database> {
assert!(config.columns > 0, "the number of columns must not be zero");
assert!(config.columns.len() > 0, "the number of columns must not be zero");

let opts = generate_options(config);
let block_opts = generate_block_based_options(config)?;

let column_names: Vec<_> = (0..config.columns).map(|c| format!("col{}", c)).collect();
let column_names: Vec<_> = (0..config.columns.len()).map(|c| format!("col{}", c)).collect();
let write_opts = WriteOptions::default();
let read_opts = generate_read_options();

Expand Down Expand Up @@ -370,8 +404,8 @@ impl Database {
column_names: &[&str],
block_opts: &BlockBasedOptions,
) -> io::Result<rocksdb::DB> {
let cf_descriptors: Vec<_> = (0..config.columns)
.map(|i| ColumnFamilyDescriptor::new(column_names[i as usize], config.column_config(&block_opts, i)))
let cf_descriptors: Vec<_> = (0..config.columns.len())
.map(|i| ColumnFamilyDescriptor::new(column_names[i as usize], config.column_config(&block_opts, i as u32)))
.collect();

let db = match DB::open_cf_descriptors(&opts, path.as_ref(), cf_descriptors) {
Expand Down Expand Up @@ -537,11 +571,17 @@ impl Database {
Ok(())
}

/// Add a new column family to the DB.
/// Add a new column family with default config to the DB.
pub fn add_column(&mut self) -> io::Result<()> {
self.add_column_with_config(Default::default())
}

/// Add a new column family to the DB.
pub fn add_column_with_config(&mut self, cfg: ColumnConfig) -> io::Result<()> {
let DBAndColumns { ref mut db, ref mut column_names } = self.inner;
let col = column_names.len() as u32;
let name = format!("col{}", col);
self.config.columns.push(cfg);
let col_config = self.config.column_config(&self.block_opts, col as u32);
let _ = db.create_cf(&name, &col_config).map_err(other_io_err)?;
column_names.push(name);
Expand Down Expand Up @@ -581,6 +621,10 @@ impl Database {
self.inner.db.try_catch_up_with_primary().map_err(other_io_err)
}

pub fn raw_iter(&self, col: u32) -> io::Result<DBRawIterator<'_>> {
Ok(self.inner.db.raw_iterator_cf(self.inner.cf(col as usize)?))
}

/// Force compact a single column.
///
/// After compaction of the column, this may lead to better read performance.
Expand Down Expand Up @@ -778,7 +822,7 @@ mod tests {
#[test]
#[should_panic]
fn open_db_with_zero_columns() {
let cfg = DatabaseConfig { columns: 0, ..Default::default() };
let cfg = DatabaseConfig { columns: vec![], ..Default::default() };
let _db = Database::open(&cfg, "");
}

Expand Down Expand Up @@ -849,7 +893,7 @@ mod tests {
#[test]
fn default_memory_budget() {
let c = DatabaseConfig::default();
assert_eq!(c.columns, 1);
assert_eq!(c.columns.len(), 1);
assert_eq!(c.memory_budget(), DB_DEFAULT_COLUMN_MEMORY_BUDGET_MB * MB, "total memory budget is default");
assert_eq!(
c.memory_budget_for_col(0),
Expand All @@ -865,8 +909,13 @@ mod tests {

#[test]
fn memory_budget() {
let mut c = DatabaseConfig::with_columns(3);
c.memory_budget = [(0, 10), (1, 15), (2, 20)].iter().cloned().collect();
let c = DatabaseConfig::with_configured_columns(
[10, 15, 20]
.iter()
.cloned()
.map(|n| ColumnConfig { memory_budget: Some(n), ..Default::default() })
.collect(),
);
assert_eq!(c.memory_budget(), 45 * MB, "total budget is the sum of the column budget");
}

Expand All @@ -890,11 +939,19 @@ rocksdb.db.get.micros P50 : 2.000000 P95 : 3.000000 P99 : 4.000000 P100 : 5.0000
#[test]
fn rocksdb_settings() {
const NUM_COLS: usize = 2;
let mut cfg = DatabaseConfig { enable_statistics: true, ..DatabaseConfig::with_columns(NUM_COLS as u32) };
let mut cfg = DatabaseConfig {
enable_statistics: true,
..DatabaseConfig::with_configured_columns(
[30, 300]
.iter()
.cloned()
.map(|n| ColumnConfig { memory_budget: Some(n), ..Default::default() })
.collect(),
)
};
cfg.max_open_files = 123; // is capped by the OS fd limit (typically 1024)
cfg.compaction.block_size = 323232;
cfg.compaction.initial_file_size = 102030;
cfg.memory_budget = [(0, 30), (1, 300)].iter().cloned().collect();

let db_path = TempfileBuilder::new()
.prefix("config_test")
Expand Down