From bec85f3eb01d81ac8d221405754702d8d348f132 Mon Sep 17 00:00:00 2001 From: Jack Kelly Date: Wed, 15 Nov 2023 15:33:16 +0000 Subject: [PATCH 01/42] Make a start on draft API design --- src/draft_API_design.rs | 84 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 84 insertions(+) create mode 100644 src/draft_API_design.rs diff --git a/src/draft_API_design.rs b/src/draft_API_design.rs new file mode 100644 index 0000000..fc14141 --- /dev/null +++ b/src/draft_API_design.rs @@ -0,0 +1,84 @@ +/// This is just me sketching out pseudo-code for the design of the API, +/// and sketching out some of the important internals. + +fn main() -> () { + // Initialise + let reader = LightSpeedIO::builder() + .base_path("/mnt/storage_ssd") // So LSIO can infer which backend to use + .latency_miliseconds(0.001) // So LSIO knows when to merge nearby reads + .bandwidth_gb_per_sec(10) + .build(); + + // Question: Maybe reader doesn't have to be stateful? Maybe we can just do stateless function + // calls like read_rel_to_file_ends. But then we wouldn't be able to pre-allocate memory, etc. + // But maybe that's not necessary? + // Advantages of stateful (like above): + // - Nice API to customise, but customisation could still be done with stateless calls: + // read_builder().foo(x).read("filename"); + // or: + // let config = SSDConfig::auto_calibrate(); + // or: + // let config = SSDConfig::new().latency_ms(0.001).bw_gbps(10); + // let future = LightSpeedIO::IoUringLocal::read(&filename, &config) + // - + + // Or maybe it'd be better to specify the precise struct for the workload, + // and only the Python API will automatically find the right class, given the + // base path? This way, Rust can do more compile-time checks. 
+ let reader = LightSpeedIO::IoUringSSD::builder() + .latency_miliseconds(0.001) // So LSIO knows when to merge nearby reads + .bandwidth_gb_per_sec(10) + .build(); + + // Read the shard_indexes from the end of files + let file_chunks_to_load = vec![ + FileChunks{path, Chunk{offset: 100, len: 1000}}, + ]; + + let future = reader.read_rel_to_file_ends(&file_chunks_to_load); + // Under the hood, this needs to first chain {open, statx} in io_uring to get the filesizes, + // so we can compute the offset, + // and then, as soon as a cqe is available, submit a + // chain of {read, close} to io_uring to get the data. + + let data: Vec> = future.wait(); + + // Read all of some files (e.g. reading many unsharded Zarr chunks) + let future = reader.read_entire_files(vec!["foo/bar", "foo/baz"]) + // Under the hood, this needs to first chain {open, statx} in io_uring to get the filesizes, + // then have a threadpool allocate appropriate-sized buffers, + // and then chain {read, close} in io_uring to get the data. + + // Read many chunks from a small number of files + let future = reader.read_chunks(vec![FileChunks{path, chunks}, FileChunks{path2, chunks2}]); + // This time, we don't need the filesize ahead of time! So don't bother doing `statx`. +} + +// -------------- CONFIG ----------------------- +struct Config { + latency_ms: f64, + bandwidth_gbps: f64, +} + +impl Config { + fn ssd_pcie_gen4_default() -> Self { + Config { + latency_ms: 0.001, + bandwidth_gbps: 8, + } + } +} + +struct FileChunks { + path: &Path, + chunks: Vector, +} + +enum Chunk { + offset(u64), + len(u32), +} + +trait Reader { + fn read_chunks(&self, chunks: &Vec) -> Future>>; +} From fb6427f8156dadc586e3563cc0b94bec1efc44d4 Mon Sep 17 00:00:00 2001 From: Jack Kelly Date: Wed, 15 Nov 2023 15:48:18 +0000 Subject: [PATCH 02/42] Made a start on a stateless API. 
But I've gone off stateless because we can't cache anything (like the lengths of files) --- src/draft_API_design.rs | 39 ++++++++++++--------------------------- 1 file changed, 12 insertions(+), 27 deletions(-) diff --git a/src/draft_API_design.rs b/src/draft_API_design.rs index fc14141..a83d1eb 100644 --- a/src/draft_API_design.rs +++ b/src/draft_API_design.rs @@ -2,33 +2,18 @@ /// and sketching out some of the important internals. fn main() -> () { - // Initialise - let reader = LightSpeedIO::builder() - .base_path("/mnt/storage_ssd") // So LSIO can infer which backend to use - .latency_miliseconds(0.001) // So LSIO knows when to merge nearby reads - .bandwidth_gb_per_sec(10) - .build(); + // Set config options (latency, bandwidth, maybe others) + let config = Config::ssd_pcie_gen4_default(); - // Question: Maybe reader doesn't have to be stateful? Maybe we can just do stateless function - // calls like read_rel_to_file_ends. But then we wouldn't be able to pre-allocate memory, etc. - // But maybe that's not necessary? - // Advantages of stateful (like above): - // - Nice API to customise, but customisation could still be done with stateless calls: - // read_builder().foo(x).read("filename"); - // or: - // let config = SSDConfig::auto_calibrate(); - // or: - // let config = SSDConfig::new().latency_ms(0.001).bw_gbps(10); - // let future = LightSpeedIO::IoUringLocal::read(&filename, &config) - // - + // Or do this :) + let config = Config::auto_calibrate(); + + // Define which chunks to load: + let chunks = vec![FileChunks{path1, chunks1}, FileChunks{path2, chunks2}]; + + // Start async loading of data from disk: + let future = light_speed_io::IoUringLocal::read_chunks(&chunks, &config); - // Or maybe it'd be better to specify the precise struct for the workload, - // and only the Python API will automatically find the right class, given the - // base path? This way, Rust can do more compile-time checks. 
- let reader = LightSpeedIO::IoUringSSD::builder() - .latency_miliseconds(0.001) // So LSIO knows when to merge nearby reads - .bandwidth_gb_per_sec(10) - .build(); // Read the shard_indexes from the end of files let file_chunks_to_load = vec![ @@ -44,13 +29,13 @@ fn main() -> () { let data: Vec> = future.wait(); // Read all of some files (e.g. reading many unsharded Zarr chunks) - let future = reader.read_entire_files(vec!["foo/bar", "foo/baz"]) + let future = reader.read_entire_files(vec!["foo/bar", "foo/baz"]); // Under the hood, this needs to first chain {open, statx} in io_uring to get the filesizes, // then have a threadpool allocate appropriate-sized buffers, // and then chain {read, close} in io_uring to get the data. // Read many chunks from a small number of files - let future = reader.read_chunks(vec![FileChunks{path, chunks}, FileChunks{path2, chunks2}]); + let future = reader.read_chunks(&chunks); // This time, we don't need the filesize ahead of time! So don't bother doing `statx`. 
} From 3d46e76d9d6be4f7fb5d1e2ee20bb676357f6434 Mon Sep 17 00:00:00 2001 From: Jack Kelly Date: Wed, 15 Nov 2023 16:24:26 +0000 Subject: [PATCH 03/42] Trying a stateful API again --- src/draft_API_design.rs | 74 ++++++++++++++++++++++++++++++++--------- 1 file changed, 58 insertions(+), 16 deletions(-) diff --git a/src/draft_API_design.rs b/src/draft_API_design.rs index a83d1eb..972a21f 100644 --- a/src/draft_API_design.rs +++ b/src/draft_API_design.rs @@ -3,18 +3,42 @@ fn main() -> () { // Set config options (latency, bandwidth, maybe others) - let config = Config::ssd_pcie_gen4_default(); - + let config = LocalIoConfig::SSD_PCIe_gen4; + // Or do this :) - let config = Config::auto_calibrate(); + let config = LocalIoConfig::FromFile("filename"); + let config = LocalIoConfig::AutoCalibrate; + + // Init: + let reader = IoUringLocal::from_config(&config); // Define which chunks to load: - let chunks = vec![FileChunks{path1, chunks1}, FileChunks{path2, chunks2}]; + let chunks = vec![ + FileChunks{ + path: "/foo/bar", + range: Range::All, // Read all of file + }, + FileChunks{ + path: "/foo/baz", + range: Range::MultiRange( + vec![ + // TODO: Should these be specified using Rust's builtin ranges? + Chunk{ // Read the first 1,000 bytes: + offset: Offset::FromStart(0), + len: Len::Bytes(1000), + }, + Chunk{ // Read the last 200 bytes: + offset: Offset::FromEnd(200), + len: Len::Bytes(200), + }, + ], + ), + }, + ]; // Start async loading of data from disk: let future = light_speed_io::IoUringLocal::read_chunks(&chunks, &config); - // Read the shard_indexes from the end of files let file_chunks_to_load = vec![ FileChunks{path, Chunk{offset: 100, len: 1000}}, @@ -40,28 +64,46 @@ fn main() -> () { } // -------------- CONFIG ----------------------- -struct Config { - latency_ms: f64, - bandwidth_gbps: f64, +// But, how to express that SSD_PCIe_gen4 isn't a valid config for, say, network IO? 
+// Maybe don't pass in a config Enum variant, +// instead have a ssd_pcie_gen4() method on IoUringLocal? +enum LocalIoConfig { + SSD_PCIe_gen4, + AutoCalibrate, + FromFile(PathBuf), } -impl Config { - fn ssd_pcie_gen4_default() -> Self { - Config { - latency_ms: 0.001, - bandwidth_gbps: 8, +trait LocalIo { + fn from_config(config: &LocalIoConfig) -> Self { + match config { + SSD_PCIe_gen4 => LocalIoConfig { + latency_ms: 0.001, + bandwidth_gbps: 8, + }, + AutoCalibrate => { + // TODO: Automatically calibrate. + }, + FromFile(filename) => { + // TODO: Load filename. + } } } } +struct IoUringLocal { + latency_ms: f64, + bandwidth_gbps: f64, +} + +impl LocalIo for IoUringLocal {} struct FileChunks { path: &Path, chunks: Vector, } -enum Chunk { - offset(u64), - len(u32), +struct Chunk { + offset: u64, + len: u64, } trait Reader { From 4d632ad4d7c720e66f3e25a3b533a504c1058917 Mon Sep 17 00:00:00 2001 From: Jack Kelly Date: Wed, 15 Nov 2023 17:22:05 +0000 Subject: [PATCH 04/42] This uses custom Chunk structs. But about to replace with custom RangeFromEnd --- src/draft_API_design.rs | 19 ++++++++++++++++--- 1 file changed, 16 insertions(+), 3 deletions(-) diff --git a/src/draft_API_design.rs b/src/draft_API_design.rs index 972a21f..1d64d08 100644 --- a/src/draft_API_design.rs +++ b/src/draft_API_design.rs @@ -1,5 +1,15 @@ /// This is just me sketching out pseudo-code for the design of the API, /// and sketching out some of the important internals. +/// +/// Use-cases that this design needs to be tested against: +/// 1. Load ends of files (e.g. Zarr shard_index) +/// 2. Cache the lengths of files. +/// 3. Load huge numbers of files (e.g. non-sharded Zarrs) +/// 4. Load huge numbers of chunks from a small number of files. +/// 5. "Scatter" data to multiple arrays +/// (e.g. loading uncompressed Zarr / EUMETSAT / GRIB files into final numpy array) +/// 6. Per chunk: Decompress, process, and copy to final array. +/// 7. Allow LSIO to merge nearby chunks. 
fn main() -> () { // Set config options (latency, bandwidth, maybe others) @@ -16,20 +26,23 @@ fn main() -> () { let chunks = vec![ FileChunks{ path: "/foo/bar", - range: Range::All, // Read all of file + range: Range::EntireFile, // Read all of file }, FileChunks{ path: "/foo/baz", range: Range::MultiRange( vec![ - // TODO: Should these be specified using Rust's builtin ranges? + // Should these be specified using Rust's builtin ranges? + // Rust ranges can't express "get the last n elements". + // But maybe I should create a little crate which allows + // for Ranges from the end, like -10.. (the last 10 elements) or -10..-5. Chunk{ // Read the first 1,000 bytes: offset: Offset::FromStart(0), len: Len::Bytes(1000), }, Chunk{ // Read the last 200 bytes: offset: Offset::FromEnd(200), - len: Len::Bytes(200), + len: Len::ToEnd, }, ], ), From 2276bcc7559654ebff882f2a6ca5b736025b5ec9 Mon Sep 17 00:00:00 2001 From: Jack Kelly Date: Wed, 15 Nov 2023 18:09:07 +0000 Subject: [PATCH 05/42] Spec is looking OK now! --- src/draft_API_design.rs | 100 ++++++++++++++++++++++------------------ 1 file changed, 54 insertions(+), 46 deletions(-) diff --git a/src/draft_API_design.rs b/src/draft_API_design.rs index 1d64d08..05dd49b 100644 --- a/src/draft_API_design.rs +++ b/src/draft_API_design.rs @@ -7,7 +7,7 @@ /// 3. Load huge numbers of files (e.g. non-sharded Zarrs) /// 4. Load huge numbers of chunks from a small number of files. /// 5. "Scatter" data to multiple arrays -/// (e.g. loading uncompressed Zarr / EUMETSAT / GRIB files into final numpy array) +/// (e.g. loading uncompressed Zarr / EUMETSAT / GRIB files into final numpy array using DMA!) /// 6. Per chunk: Decompress, process, and copy to final array. /// 7. Allow LSIO to merge nearby chunks. 
@@ -26,57 +26,36 @@ fn main() -> () { let chunks = vec![ FileChunks{ path: "/foo/bar", - range: Range::EntireFile, // Read all of file + range: FileRange::EntireFile, // Read all of file }, FileChunks{ path: "/foo/baz", - range: Range::MultiRange( + range: FileRange::MultiRange( vec![ - // Should these be specified using Rust's builtin ranges? // Rust ranges can't express "get the last n elements". - // But maybe I should create a little crate which allows + // I'll assume I can create a little crate which allows // for Ranges from the end, like -10.. (the last 10 elements) or -10..-5. - Chunk{ // Read the first 1,000 bytes: - offset: Offset::FromStart(0), - len: Len::Bytes(1000), - }, - Chunk{ // Read the last 200 bytes: - offset: Offset::FromEnd(200), - len: Len::ToEnd, - }, + ..1000, // Read the first 1,000 bytes + -200.., // Read the last 200 bytes ], ), }, ]; // Start async loading of data from disk: - let future = light_speed_io::IoUringLocal::read_chunks(&chunks, &config); - - // Read the shard_indexes from the end of files - let file_chunks_to_load = vec![ - FileChunks{path, Chunk{offset: 100, len: 1000}}, - ]; - - let future = reader.read_rel_to_file_ends(&file_chunks_to_load); - // Under the hood, this needs to first chain {open, statx} in io_uring to get the filesizes, - // so we can compute the offset, - // and then, as soon as a cqe is available, submit a - // chain of {read, close} to io_uring to get the data. - + let future = reader.read_chunks(&chunks); let data: Vec> = future.wait(); - // Read all of some files (e.g. reading many unsharded Zarr chunks) - let future = reader.read_entire_files(vec!["foo/bar", "foo/baz"]); - // Under the hood, this needs to first chain {open, statx} in io_uring to get the filesizes, - // then have a threadpool allocate appropriate-sized buffers, - // and then chain {read, close} in io_uring to get the data. 
- - // Read many chunks from a small number of files - let future = reader.read_chunks(&chunks); - // This time, we don't need the filesize ahead of time! So don't bother doing `statx`. + // Or, read chunks and apply a function: + let mut final_array = Array(); + let processing_fn = |chunk_idx: u64, chunk: &[u8]| -> &[u8] { + // * decompress + // * process + // * move chunk to final array (the address of the final array would be passed into the closure?) + }; + let future = read.read_chunks_and_apply(&chunks, processing_fn); } -// -------------- CONFIG ----------------------- // But, how to express that SSD_PCIe_gen4 isn't a valid config for, say, network IO? // Maybe don't pass in a config Enum variant, // instead have a ssd_pcie_gen4() method on IoUringLocal? @@ -97,28 +76,57 @@ trait LocalIo { // TODO: Automatically calibrate. }, FromFile(filename) => { - // TODO: Load filename. + // TODO: Load config from filename. } } } + + fn read_chunks(&self, chunks: &Vec) -> Future>> { + // Split `chunks` and send to threads (if there are enough chunks to be worth the hassle) + // If there are >1,000 files to open then we'll have to process them in batches, + // so as not to exceed the max filehandles per process.`` + // Perhaps split using Rayon?!? Although I'm not sure how Rayon would work with io_uring?! + // Within each thread: + // Loop through `chunks` to find the length of the buffers we need to pre-allocate. + // (We might be loading anything from a few bytes to a few GB, so we can't just pre-allocate large + // arrays and hope for the best!) + // For chunks for which we already know the chunksize (maybe we have the filesize in cache, or the chunk + // explicitly tells us its length), immediately allocate the buffer and submit a read SQE to io_uring. + // Some FileChunks will require us to get the length of the file before we can calculate the length of the buffer. + // let nbytes = self.get_size(filename); + // Only get the file sizes when necessary. 
+ // Allocate buffers. + // Submit a read submission queue entry to io_uring. (Each thread will have its own io_uring) + // Thought: Maybe, in a later version of LSIO, we should have *two* threads per io_uring: + // One thread submits requests to io_uring, and then goes away when it has finished. + // The other thread processes requests. This way, we can could start decompressing / copying + // chunks before we've even got all the filesizes back. + // Once we've submitted all read requests, then process the completion queue entries. + // In a later version of LSIO, some of those CQEs will contain the filesize, and we'll have to submit a read request. + } + + fn get_nbytes(&self, path: &PathBuf) -> u64 { + // TODO: Use POSIX standard function name. + // first check our local cache of file sizes. If that's empty, then + // for the MVP, just use Rust's standard way to get the file length. Later, we may use io_uring by + // chaining {open, statx, close}. + // Cache nbytes before returning. + } } struct IoUringLocal { latency_ms: f64, bandwidth_gbps: f64, + cache_of_nbytes_per_filename: map, } -impl LocalIo for IoUringLocal {} - struct FileChunks { path: &Path, - chunks: Vector, + range: FileRange, } -struct Chunk { - offset: u64, - len: u64, +enum FileRange { + EntireFile, + MultiRange(Vec), } -trait Reader { - fn read_chunks(&self, chunks: &Vec) -> Future>>; -} +impl LocalIo for IoUringLocal {} From c4e573820afbfa98c64413f7e1a757fcf5710a99 Mon Sep 17 00:00:00 2001 From: Jack Kelly Date: Tue, 21 Nov 2023 17:18:51 +0000 Subject: [PATCH 06/42] Tidy up MultiRange --- src/draft_API_design.rs | 29 ++++++++++++++++------------- 1 file changed, 16 insertions(+), 13 deletions(-) diff --git a/src/draft_API_design.rs b/src/draft_API_design.rs index 05dd49b..40efb8e 100644 --- a/src/draft_API_design.rs +++ b/src/draft_API_design.rs @@ -2,14 +2,14 @@ /// and sketching out some of the important internals. /// /// Use-cases that this design needs to be tested against: -/// 1. 
Load ends of files (e.g. Zarr shard_index) -/// 2. Cache the lengths of files. -/// 3. Load huge numbers of files (e.g. non-sharded Zarrs) -/// 4. Load huge numbers of chunks from a small number of files. -/// 5. "Scatter" data to multiple arrays -/// (e.g. loading uncompressed Zarr / EUMETSAT / GRIB files into final numpy array using DMA!) -/// 6. Per chunk: Decompress, process, and copy to final array. -/// 7. Allow LSIO to merge nearby chunks. +/// - [x] Load ends of files (e.g. Zarr shard_index) +/// - [x] Cache the lengths of files. +/// - [x] Load huge numbers of files (e.g. non-sharded Zarrs) +/// - [x] Load huge numbers of chunks from a small number of files. +/// - [x] "Scatter" data to multiple arrays +/// (e.g. loading uncompressed Zarr / EUMETSAT / GRIB files into final numpy array using DMA!) +/// - [x] Per chunk: Decompress, process, and copy to final array. +/// - [x] Allow LSIO to merge nearby chunks. fn main() -> () { // Set config options (latency, bandwidth, maybe others) @@ -32,13 +32,14 @@ fn main() -> () { path: "/foo/baz", range: FileRange::MultiRange( vec![ - // Rust ranges can't express "get the last n elements". - // I'll assume I can create a little crate which allows - // for Ranges from the end, like -10.. (the last 10 elements) or -10..-5. - ..1000, // Read the first 1,000 bytes - -200.., // Read the last 200 bytes + ..1000, // Read the first 1,000 bytes + -500.., // Read the last 500 bytes + -500..-100, // Read 400 bytes, until the 100th byte before the end ], ), + // I had considered also including `destinations` (mutable references to) + // the target memory buffers. But - at this point in the code - we + // don't know the sizes of the chunks that are relative to the end of the file. }, ]; @@ -54,6 +55,8 @@ fn main() -> () { // * move chunk to final array (the address of the final array would be passed into the closure?) 
}; let future = read.read_chunks_and_apply(&chunks, processing_fn); + + } // But, how to express that SSD_PCIe_gen4 isn't a valid config for, say, network IO? From 14ff4c04a2a06ff91d67499cf55d26aa1c193f66 Mon Sep 17 00:00:00 2001 From: Jack Kelly Date: Tue, 21 Nov 2023 19:07:46 +0000 Subject: [PATCH 07/42] Small tweak to docs --- src/draft_API_design.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/draft_API_design.rs b/src/draft_API_design.rs index 40efb8e..0fc086d 100644 --- a/src/draft_API_design.rs +++ b/src/draft_API_design.rs @@ -17,6 +17,7 @@ fn main() -> () { // Or do this :) let config = LocalIoConfig::FromFile("filename"); + // ...or... let config = LocalIoConfig::AutoCalibrate; // Init: @@ -37,7 +38,7 @@ fn main() -> () { -500..-100, // Read 400 bytes, until the 100th byte before the end ], ), - // I had considered also including `destinations` (mutable references to) + // I had considered also including `destinations` field, holding Vec of mutable references to // the target memory buffers. But - at this point in the code - we // don't know the sizes of the chunks that are relative to the end of the file. }, From f6aa56f2367824f55c73bf115fda898f413574d5 Mon Sep 17 00:00:00 2001 From: Jack Kelly Date: Tue, 21 Nov 2023 21:03:48 +0000 Subject: [PATCH 08/42] Neater way to configure things. --- src/draft_API_design.rs | 87 ++++++++++++++++++++--------------------- 1 file changed, 43 insertions(+), 44 deletions(-) diff --git a/src/draft_API_design.rs b/src/draft_API_design.rs index 0fc086d..f921199 100644 --- a/src/draft_API_design.rs +++ b/src/draft_API_design.rs @@ -13,25 +13,22 @@ fn main() -> () { // Set config options (latency, bandwidth, maybe others) - let config = LocalIoConfig::SSD_PCIe_gen4; - + let config = SSD_PCIE_GEN4; // Or do this :) - let config = LocalIoConfig::FromFile("filename"); - // ...or... 
- let config = LocalIoConfig::AutoCalibrate; + let config = IoConfig::auto_calibrate(); // Init: - let reader = IoUringLocal::from_config(&config); + let reader = IoUringLocal::new(&config); // Define which chunks to load: let chunks = vec![ FileChunks{ path: "/foo/bar", - range: FileRange::EntireFile, // Read all of file + byte_range: ByteRange::EntireFile, // Read all of file }, FileChunks{ path: "/foo/baz", - range: FileRange::MultiRange( + byte_range: ByteRange::MultiRange( vec![ ..1000, // Read the first 1,000 bytes -500.., // Read the last 500 bytes @@ -50,42 +47,44 @@ fn main() -> () { // Or, read chunks and apply a function: let mut final_array = Array(); + let chunk_idx_to_array_loc = Vec::new(); let processing_fn = |chunk_idx: u64, chunk: &[u8]| -> &[u8] { - // * decompress - // * process - // * move chunk to final array (the address of the final array would be passed into the closure?) + // ******** DECOMRESS ************ + // If we don't know the size of the uncompressed chunk, then + // deliberately over-allocate, and shrink later... + const OVER_ALLOCATION_RATIO: usize = 4; + let mut decompressed_chunk = Vec::with_capacity(OVER_ALLOCATION_RATIO * chunk.size()); + decompress(&chunk, &mut decompressed_chunk); + + // ******** PROCESS *********** + decompressed_chunk = decompressed_chunk / 2; // to give a very simple example! + + // ******** COPY TO FINAL ARRAY ************** + final_array[chunk_idx_to_array_loc[chunk_idx]] = decompressed_chunk; }; let future = read.read_chunks_and_apply(&chunks, processing_fn); + future.wait(); + pass_to_python(&final_array); +} - +pub struct IoConfig { + pub latency_millisecs: f64, + pub bandwidth_gbytes_per_sec: f64, } -// But, how to express that SSD_PCIe_gen4 isn't a valid config for, say, network IO? -// Maybe don't pass in a config Enum variant, -// instead have a ssd_pcie_gen4() method on IoUringLocal? 
-enum LocalIoConfig { - SSD_PCIe_gen4, - AutoCalibrate, - FromFile(PathBuf), +impl IoConfig { + fn auto_calibrate() -> Self {} + // Use Serde to save / load IoConfig to disk. } -trait LocalIo { - fn from_config(config: &LocalIoConfig) -> Self { - match config { - SSD_PCIe_gen4 => LocalIoConfig { - latency_ms: 0.001, - bandwidth_gbps: 8, - }, - AutoCalibrate => { - // TODO: Automatically calibrate. - }, - FromFile(filename) => { - // TODO: Load config from filename. - } - } - } +const SSD_PCIE_GEN4: IoConfig = IoConfig{latency_millisecs: 0.001, bandwidth_gbytes_per_sec: 8}; + +trait Reader { + fn new(config: &IoConfig) -> Self { Self {config} } fn read_chunks(&self, chunks: &Vec) -> Future>> { + // (Implement all the general-purpose functionality in the Reader trait, + // and implement the io_uring-specific stuff in IoUringLocal.) // Split `chunks` and send to threads (if there are enough chunks to be worth the hassle) // If there are >1,000 files to open then we'll have to process them in batches, // so as not to exceed the max filehandles per process.`` @@ -109,8 +108,7 @@ trait LocalIo { // In a later version of LSIO, some of those CQEs will contain the filesize, and we'll have to submit a read request. } - fn get_nbytes(&self, path: &PathBuf) -> u64 { - // TODO: Use POSIX standard function name. + fn get_file_size_in_bytes(&self, path: &PathBuf) -> u64 { // first check our local cache of file sizes. If that's empty, then // for the MVP, just use Rust's standard way to get the file length. Later, we may use io_uring by // chaining {open, statx, close}. @@ -118,19 +116,20 @@ trait LocalIo { } } struct IoUringLocal { - latency_ms: f64, - bandwidth_gbps: f64, - cache_of_nbytes_per_filename: map, + config: &IoConfig, + cached_file_sizes_in_bytes: map, +} + +impl LocalIo for IoUringLocal { + // Implement io_uring-specific stuff... 
} struct FileChunks { path: &Path, - range: FileRange, + byte_range: ByteRange, } -enum FileRange { +enum ByteRange { EntireFile, MultiRange(Vec), -} - -impl LocalIo for IoUringLocal {} +} \ No newline at end of file From 205c00d748a1391304ebd6db09463453d4ada026 Mon Sep 17 00:00:00 2001 From: Jack Kelly Date: Tue, 21 Nov 2023 21:04:41 +0000 Subject: [PATCH 09/42] shrink to fit --- src/draft_API_design.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/draft_API_design.rs b/src/draft_API_design.rs index f921199..37bae9e 100644 --- a/src/draft_API_design.rs +++ b/src/draft_API_design.rs @@ -55,6 +55,7 @@ fn main() -> () { const OVER_ALLOCATION_RATIO: usize = 4; let mut decompressed_chunk = Vec::with_capacity(OVER_ALLOCATION_RATIO * chunk.size()); decompress(&chunk, &mut decompressed_chunk); + decompressed_chunk.shrink_to_fit(); // ******** PROCESS *********** decompressed_chunk = decompressed_chunk / 2; // to give a very simple example! From 9584a4f8b942838bd1ba9ad79e8f33c6169f2240 Mon Sep 17 00:00:00 2001 From: Jack Kelly Date: Wed, 22 Nov 2023 16:59:57 +0000 Subject: [PATCH 10/42] Fix some 'bugs' in the comments --- src/draft_API_design.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/draft_API_design.rs b/src/draft_API_design.rs index 37bae9e..2ea19ad 100644 --- a/src/draft_API_design.rs +++ b/src/draft_API_design.rs @@ -35,9 +35,9 @@ fn main() -> () { -500..-100, // Read 400 bytes, until the 100th byte before the end ], ), - // I had considered also including `destinations` field, holding Vec of mutable references to + // I had considered also including a `destinations` field, holding Vec of mutable references to // the target memory buffers. But - at this point in the code - we - // don't know the sizes of the chunks that are relative to the end of the file. + // don't know the file sizes, so we can't allocate buffers yet for EntireFiles. 
}, ]; @@ -49,7 +49,7 @@ fn main() -> () { let mut final_array = Array(); let chunk_idx_to_array_loc = Vec::new(); let processing_fn = |chunk_idx: u64, chunk: &[u8]| -> &[u8] { - // ******** DECOMRESS ************ + // ******** DECOMPRESS ************ // If we don't know the size of the uncompressed chunk, then // deliberately over-allocate, and shrink later... const OVER_ALLOCATION_RATIO: usize = 4; From bbec8fd9b1755f81ea86e82996719cd2126f2a56 Mon Sep 17 00:00:00 2001 From: Jack Kelly Date: Wed, 22 Nov 2023 17:49:07 +0000 Subject: [PATCH 11/42] Tiny tweak to wording of comment --- src/draft_API_design.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/draft_API_design.rs b/src/draft_API_design.rs index 2ea19ad..4640474 100644 --- a/src/draft_API_design.rs +++ b/src/draft_API_design.rs @@ -35,7 +35,7 @@ fn main() -> () { -500..-100, // Read 400 bytes, until the 100th byte before the end ], ), - // I had considered also including a `destinations` field, holding Vec of mutable references to + // I considered also including a `destinations` field, holding Vec of mutable references to // the target memory buffers. But - at this point in the code - we // don't know the file sizes, so we can't allocate buffers yet for EntireFiles. }, From c65ab45cf271372cf35632c0d652c11db8d90355 Mon Sep 17 00:00:00 2001 From: Jack Kelly Date: Mon, 27 Nov 2023 17:13:17 +0000 Subject: [PATCH 12/42] Add design.md (not finished yet!) --- design.md | 29 +++++++++++++++++++++++++++++ 1 file changed, 29 insertions(+) create mode 100644 design.md diff --git a/design.md b/design.md new file mode 100644 index 0000000..785a3c9 --- /dev/null +++ b/design.md @@ -0,0 +1,29 @@ +# Draft design for `light-speed-io` + +## Planned features + +`light-speed-io` (or "LSIO", for short) will be a Rust library crate for loading and processing many chunks of files, as fast as the storage system will allow. 
+ + +- [ ] Provide a simple, async API for reading many chunks of files (and/or many files) with single API call. Users will be able to ask LSIO: "_Please get me these million file chunks, and apply this function to each chunk. Tell me when you're done._". +- [ ] The API will be the same, no matter which operating system you're on, and no matter whether the data is on local disk, or a cloud storage bucket, or available over HTTP. (Hat tip to [fsspec](https://filesystem-spec.readthedocs.io/en/latest/) :smiley:!) +- [ ] Laser-focus on _speed_: + - Achieve many [input/output operations per second](https://en.wikipedia.org/wiki/IOPS) (IOPS), high bandwidth, and low latency by exploiting "modern" operating system storage APIs, and designing for inherently parallel storage systems like NVMe SSDs and cloud storage buckets. + - Before submitting any IO operations, tune the sequence of IO operations according to the performance characteristics of each storage system. For example, on a storage system that uses hard drives (with spinning platters), the performance of random reads is dominated by the time taken to move the read head. So merge nearby reads, even if those reads aren't strictly consecutive. Or, when reading large files from a cloud storage bucket, it may be faster to split each file into multiple chunks, and request those chunks in parallel. + - Exploit CPU caches: Minimise the number of round-trips to RAM. Once a chunk is loaded into CPU cache, perform all transformations on that chunk in quick succession, and pin the computation to a single CPU core per chunk. + - Use multiple CPU cores in parallel. + - When scheduling work across multiple CPU cores: Avoid locks, or any synchronization primitives that would block a CPU core, wherever possible. + - Look for opportunities to completely cut the CPU out of the data path. 
For example, if we're loading uncompressed [Zarr](https://zarr.dev/) chunks that are destined to be merged into a final numpy array, then we may be able to use DMA to directly copy chunks into the final numpy array, without the CPU ever touching the data. This may be possible even in cases where the creation of the final array is more complicated than simply concatenating the chunks in RAM. + - Where appropriate, align chunks in RAM (and pad the ends of chunks) so the CPU & compiler can easily use SIMD instructions. (SIMD registers may be useful "just" for memory copies). +- [ ] The user-supplied function that's applied to each chunk could include, for example, decompression, followed by some numerical transformation, followed by copying the transformed data to a large array which is the concatenation of all the chunks. As much of this as possible should happen in the CPU cache (without time-consuming round-trips to RAM) +- [ ] Implement multiple IO backends. Each backend will exploit the performance features of a particular operating system and storage system. The ambition is to support: + - These operating systems: + - [ ] Linux [io_uring](https://en.wikipedia.org/wiki/Io_uring) (for local storage and network storage). + - [ ] Windows [I/O Ring](https://windows-internals.com/i-o-rings-when-one-i-o-operation-is-not-enough/). + - [ ] MacOS X [kqueue](https://en.wikipedia.org/wiki/Kqueue). + - These storage systems: + - [ ] Local disks. + - [ ] Cloud storage buckets. + - [ ] HTTP. +- [ ] Provide an async Rust API +- [ ] Provide an async Python API. 
\ No newline at end of file From a8dfee1bbe5b143c0d0d47e8578992ab415837a2 Mon Sep 17 00:00:00 2001 From: Jack Kelly Date: Mon, 27 Nov 2023 17:29:32 +0000 Subject: [PATCH 13/42] Add use-cases to design.md --- design.md | 28 +++++++++++++++++++--------- 1 file changed, 19 insertions(+), 9 deletions(-) diff --git a/design.md b/design.md index 785a3c9..23ea40b 100644 --- a/design.md +++ b/design.md @@ -1,23 +1,23 @@ -# Draft design for `light-speed-io` - -## Planned features +# Draft design for `light-speed-io` (LSIO) `light-speed-io` (or "LSIO", for short) will be a Rust library crate for loading and processing many chunks of files, as fast as the storage system will allow. +## Planned features - [ ] Provide a simple, async API for reading many chunks of files (and/or many files) with single API call. Users will be able to ask LSIO: "_Please get me these million file chunks, and apply this function to each chunk. Tell me when you're done._". - [ ] The API will be the same, no matter which operating system you're on, and no matter whether the data is on local disk, or a cloud storage bucket, or available over HTTP. (Hat tip to [fsspec](https://filesystem-spec.readthedocs.io/en/latest/) :smiley:!) - [ ] Laser-focus on _speed_: - Achieve many [input/output operations per second](https://en.wikipedia.org/wiki/IOPS) (IOPS), high bandwidth, and low latency by exploiting "modern" operating system storage APIs, and designing for inherently parallel storage systems like NVMe SSDs and cloud storage buckets. - - Before submitting any IO operations, tune the sequence of IO operations according to the performance characteristics of each storage system. For example, on a storage system that uses hard drives (with spinning platters), the performance of random reads is dominated by the time taken to move the read head. So merge nearby reads, even if those reads aren't strictly consecutive. 
Or, when reading large files from a cloud storage bucket, it may be faster to split each file into multiple chunks, and request those chunks in parallel. - - Exploit CPU caches: Minimise the number of round-trips to RAM. Once a chunk is loaded into CPU cache, perform all transformations on that chunk in quick succession, and pin the computation to a single CPU core per chunk. + - Before submitting any IO operations, tune the sequence of IO operations according to the performance characteristics of each storage system. For example, on a hard drive (with spinning platters), the performance of random reads is dominated by the time taken to move the read head. So LSIO will merge nearby reads, even if those reads aren't strictly consecutive: For example, if we want to read every fifth block of a file, it may be faster to read the entire file, even if we immediately throw away four fifths of the data. Or, when reading large files from a cloud storage bucket, it may be faster to split each file into multiple chunks, and request those chunks in parallel. + - "Auto-tune" to each storage system. Or, if users do not want to auto-tune, then provide sane defaults for a range of common storage systems. + - Exploit CPU caches and hence minimize the number of round-trips to RAM. Once a chunk is loaded into CPU cache, perform all transformations on that chunk in quick succession, and pin the computation for a given chunk to a single CPU core. - Use multiple CPU cores in parallel. - When scheduling work across multiple CPU cores: Avoid locks, or any synchronization primitives that would block a CPU core, wherever possible. - Look for opportunities to completely cut the CPU out of the data path. For example, if we're loading uncompressed [Zarr](https://zarr.dev/) chunks that are destined to be merged into a final numpy array, then we may be able to use DMA to directly copy chunks into the final numpy array, without the CPU ever touching the data. 
This may be possible even in cases where the creation of the final array is more complicated than simply concatenating the chunks in RAM. - Where appropriate, align chunks in RAM (and pad the ends of chunks) so the CPU & compiler can easily use SIMD instructions. (SIMD registers may be useful "just" for memory copies). -- [ ] The user-supplied function that's applied to each chunk could include, for example, decompression, followed by some numerical transformation, followed by copying the transformed data to a large array which is the concatenation of all the chunks. As much of this as possible should happen in the CPU cache (without time-consuming round-trips to RAM) -- [ ] Implement multiple IO backends. Each backend will exploit the performance features of a particular operating system and storage system. The ambition is to support: - - These operating systems: +- [ ] The user-supplied function that's applied to each chunk could include, for example, decompression, followed by some numerical transformation, followed by copying the transformed data to a large array which is the concatenation of all the chunks. As much of this as possible should happen in the CPU cache (without time-consuming round-trips to RAM). +- [ ] LSIO will implement multiple IO backends. Each backend will exploit the performance features of a particular operating system and storage system. The ambition is to support: + - These operating system APIs: - [ ] Linux [io_uring](https://en.wikipedia.org/wiki/Io_uring) (for local storage and network storage). - [ ] Windows [I/O Ring](https://windows-internals.com/i-o-rings-when-one-i-o-operation-is-not-enough/). - [ ] MacOS X [kqueue](https://en.wikipedia.org/wiki/Kqueue). @@ -26,4 +26,14 @@ - [ ] Cloud storage buckets. - [ ] HTTP. - [ ] Provide an async Rust API -- [ ] Provide an async Python API. \ No newline at end of file +- [ ] Provide an async Python API. + +## Use cases + +Allow for very fast access to: +* [Zarr](https://zarr.dev/) arrays. 
Jack is mostly focused on [sharded Zarr arrays](https://zarr.dev/zeps/accepted/ZEP0002.html). But LSIO could also be helpful for non-sharded Zarr arrays. +* Other file formats used for multi-dimensional arrays, such as NetCDF, GRIB, and EUMETSAT's native file format. (LSIO could help to speed up [kerchunk](https://fsspec.github.io/kerchunk/)) + +## Design + +TODO! (But, for now, see the file `src/draft_API_design.rs`) \ No newline at end of file From bf6ef3288753dcf841e19f89295d66390af99713 Mon Sep 17 00:00:00 2001 From: Jack Kelly Date: Mon, 27 Nov 2023 17:30:22 +0000 Subject: [PATCH 14/42] Add "in this pull request" --- design.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/design.md b/design.md index 23ea40b..972f076 100644 --- a/design.md +++ b/design.md @@ -36,4 +36,4 @@ Allow for very fast access to: ## Design -TODO! (But, for now, see the file `src/draft_API_design.rs`) \ No newline at end of file +TODO! (But, for now, see the file `src/draft_API_design.rs` in this pull request) \ No newline at end of file From c1064bdd4e66d720a7324750e71db1dc0a21d666 Mon Sep 17 00:00:00 2001 From: Jack Kelly Date: Mon, 27 Nov 2023 17:31:27 +0000 Subject: [PATCH 15/42] add link to draft_API_design.rs --- design.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/design.md b/design.md index 972f076..c1342d7 100644 --- a/design.md +++ b/design.md @@ -36,4 +36,4 @@ Allow for very fast access to: ## Design -TODO! (But, for now, see the file `src/draft_API_design.rs` in this pull request) \ No newline at end of file +TODO! 
(But, for now, see the file [`src/draft_API_design.rs` in this pull request](https://github.com/JackKelly/light-speed-io/blob/draft-API-design/src/draft_API_design.rs)) \ No newline at end of file From ce807d9c368e22ea19bb651dfbcafd527b17ab4d Mon Sep 17 00:00:00 2001 From: Jack Kelly Date: Mon, 27 Nov 2023 17:32:39 +0000 Subject: [PATCH 16/42] Link to design.md from draft_API_design.rs --- src/draft_API_design.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/draft_API_design.rs b/src/draft_API_design.rs index 4640474..de79cba 100644 --- a/src/draft_API_design.rs +++ b/src/draft_API_design.rs @@ -10,6 +10,9 @@ /// (e.g. loading uncompressed Zarr / EUMETSAT / GRIB files into final numpy array using DMA!) /// - [x] Per chunk: Decompress, process, and copy to final array. /// - [x] Allow LSIO to merge nearby chunks. +/// +/// This file will soon be merged into, and replaced by: +/// https://github.com/JackKelly/light-speed-io/blob/draft-API-design/design.md fn main() -> () { // Set config options (latency, bandwidth, maybe others) From 6110c47de043ec0241e118a60d2fa235c8eb3401 Mon Sep 17 00:00:00 2001 From: Jack Kelly Date: Mon, 27 Nov 2023 17:40:57 +0000 Subject: [PATCH 17/42] added a timeline section --- design.md | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/design.md b/design.md index c1342d7..b829b8a 100644 --- a/design.md +++ b/design.md @@ -8,13 +8,13 @@ - [ ] The API will be the same, no matter which operating system you're on, and no matter whether the data is on local disk, or a cloud storage bucket, or available over HTTP. (Hat tip to [fsspec](https://filesystem-spec.readthedocs.io/en/latest/) :smiley:!) 
- [ ] Laser-focus on _speed_: - Achieve many [input/output operations per second](https://en.wikipedia.org/wiki/IOPS) (IOPS), high bandwidth, and low latency by exploiting "modern" operating system storage APIs, and designing for inherently parallel storage systems like NVMe SSDs and cloud storage buckets. - - Before submitting any IO operations, tune the sequence of IO operations according to the performance characteristics of each storage system. For example, on a hard drive (with spinning platters), the performance of random reads is dominated by the time taken to move the read head. So LSIO will merge nearby reads, even if those reads aren't strictly consecutive: For example, if we want to read every fifth block of a file, it may be faster to read the entire file, even if we immediately throw away four fifths of the data. Or, when reading large files from a cloud storage bucket, it may be faster to split each file into multiple chunks, and request those chunks in parallel. - - "Auto-tune" to each storage system. Or, if users do not want to auto-tune, then provide sane defaults for a range of common storage systems. - - Exploit CPU caches and hence minimize the number of round-trips to RAM. Once a chunk is loaded into CPU cache, perform all transformations on that chunk in quick succession, and pin the computation for a given chunk to a single CPU core. - - Use multiple CPU cores in parallel. + - Before submitting any IO operations, tune the sequence of IO operations according to the performance characteristics of each storage system. For example, on a hard drive (with spinning platters), the performance of random reads is dominated by the time taken to move the read head. So LSIO will merge nearby reads, even if those reads aren't strictly consecutive: For example, if we want to read every third block of a file, it may be faster to read the entire file, even if we immediately throw away two thirds of the data. 
Or, when reading large files from a cloud storage bucket, it may be faster to split each file into consecutive chunks, and request those chunks in parallel.
+ - "Auto-tune" to each storage system. Or, if users do not want to auto-tune, then provide sane defaults for a range of common storage systems.
+ - Exploit CPU caches and hence minimize the number of time-consuming reads from RAM. Once a chunk is loaded into CPU cache, perform all transformations on that chunk in quick succession, and pin the computation for a given chunk to a single CPU core.
+ - Use multiple CPU cores in parallel (each working on a different chunk).
- When scheduling work across multiple CPU cores: Avoid locks, or any synchronization primitives that would block a CPU core, wherever possible.
- Look for opportunities to completely cut the CPU out of the data path. For example, if we're loading uncompressed [Zarr](https://zarr.dev/) chunks that are destined to be merged into a final numpy array, then we may be able to use DMA to directly copy chunks into the final numpy array, without the CPU ever touching the data. This may be possible even in cases where the creation of the final array is more complicated than simply concatenating the chunks in RAM.
- - Where appropriate, align chunks in RAM (and pad the ends of chunks) so the CPU & compiler can easily use SIMD instructions. (SIMD registers may be useful "just" for memory copies).
+ - Where appropriate, align chunks in RAM (and pad the ends of chunks) so the CPU & compiler can easily use SIMD instructions, and minimize the number of cache lines that must be read (using SIMD may provide a large speedup "just" for memory copies, even if the transform function doesn't use SIMD).
- [ ] The user-supplied function that's applied to each chunk could include, for example, decompression, followed by some numerical transformation, followed by copying the transformed data to a large array which is the concatenation of all the chunks.
As much of this as possible should happen in the CPU cache (without time-consuming round-trips to RAM). - [ ] LSIO will implement multiple IO backends. Each backend will exploit the performance features of a particular operating system and storage system. The ambition is to support: - These operating system APIs: @@ -34,6 +34,10 @@ Allow for very fast access to: * [Zarr](https://zarr.dev/) arrays. Jack is mostly focused on [sharded Zarr arrays](https://zarr.dev/zeps/accepted/ZEP0002.html). But LSIO could also be helpful for non-sharded Zarr arrays. * Other file formats used for multi-dimensional arrays, such as NetCDF, GRIB, and EUMETSAT's native file format. (LSIO could help to speed up [kerchunk](https://fsspec.github.io/kerchunk/)) +## Timeline + +Ha! :smiley:. This project is in the earliest planning stages! It'll be _months_ before it does anything vaguely useful! And, for now at least, this project is just Jack hacking away his spare time, whilst learning Rust! + ## Design TODO! (But, for now, see the file [`src/draft_API_design.rs` in this pull request](https://github.com/JackKelly/light-speed-io/blob/draft-API-design/src/draft_API_design.rs)) \ No newline at end of file From 33281fd905c4249f4caa359d51ef414a6961aa45 Mon Sep 17 00:00:00 2001 From: Jack Kelly Date: Tue, 28 Nov 2023 08:46:34 +0000 Subject: [PATCH 18/42] Add priorities --- design.md | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/design.md b/design.md index b829b8a..af22c89 100644 --- a/design.md +++ b/design.md @@ -32,12 +32,20 @@ Allow for very fast access to: * [Zarr](https://zarr.dev/) arrays. Jack is mostly focused on [sharded Zarr arrays](https://zarr.dev/zeps/accepted/ZEP0002.html). But LSIO could also be helpful for non-sharded Zarr arrays. + * Jack is particularly focused on speeding up the data pipeline for training machine learning models on multi-dimensional datasets. 
This is described in [Jack's blog post](https://jack-kelly.com/blog/2023-07-28-speeding-up-zarr). * Other file formats used for multi-dimensional arrays, such as NetCDF, GRIB, and EUMETSAT's native file format. (LSIO could help to speed up [kerchunk](https://fsspec.github.io/kerchunk/)) ## Timeline Ha! :smiley:. This project is in the earliest planning stages! It'll be _months_ before it does anything vaguely useful! And, for now at least, this project is just Jack hacking away his spare time, whilst learning Rust! +## Priorities + +Jack's priority is to build an MVP that's sufficient for loading sharded Zarrs from a local SSD using Linux io_uring. Benchmark this against existing Zarr implementations. Test in a machine learning pipeline. + +If this proves to provide a significant speed-up, then Jack will focus on implementing reading from cloud storage buckets, possibly using io_uring for async network IO. + + ## Design TODO! (But, for now, see the file [`src/draft_API_design.rs` in this pull request](https://github.com/JackKelly/light-speed-io/blob/draft-API-design/src/draft_API_design.rs)) \ No newline at end of file From 1dc018e73db16702e6340a9a766c176bd9edd79c Mon Sep 17 00:00:00 2001 From: Jack Kelly Date: Tue, 28 Nov 2023 09:05:38 +0000 Subject: [PATCH 19/42] Add more context to the Priorities section --- design.md | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/design.md b/design.md index af22c89..c5ebee4 100644 --- a/design.md +++ b/design.md @@ -41,10 +41,15 @@ Ha! :smiley:. This project is in the earliest planning stages! It'll be _months_ ## Priorities -Jack's priority is to build an MVP that's sufficient for loading sharded Zarrs from a local SSD using Linux io_uring. Benchmark this against existing Zarr implementations. Test in a machine learning pipeline. +Jack's main hypothesis is that it _should_ be possible to train large machine learning (ML) models _directly_ from multi-dimensional data (e.g. 
many timesteps of satellite imagery) stored on disk as Zarr arrays, instead of having to prepare ML training batches ahead of time. These ML models require random crops to be selected from multi-dimensional datasets, at several gigabytes per second. (See [Jack's blog post](https://jack-kelly.com/blog/2023-07-28-speeding-up-zarr) for more details.) -If this proves to provide a significant speed-up, then Jack will focus on implementing reading from cloud storage buckets, possibly using io_uring for async network IO. +(And, even more ambitiously, LSIO may allow us to train directly from the _original data_ stored in, for example, GRIB files). +The ultimate test is: Can LSIO enable us to train ML models directly from Zarr? (whilst ensuring that the GPU is constantly at near 100% utilization). So, Jack's priority will be to implement just enough of LSIO to enable us to test this hypothesis empirically: and that means implementing just one IO backend (io_uring for local files), to start with. + +If this provides a significant speed-up, then Jack will focus on implementing reading from Google Cloud Storage buckets, maybe using io_uring for async network IO. + +If this does not provide a speed-up, then - to be frank - LSIO will probably be abandoned! ## Design From a0419462ad09aac02edc23a34a9dc5ba74bd7174 Mon Sep 17 00:00:00 2001 From: Jack Kelly Date: Tue, 28 Nov 2023 09:15:29 +0000 Subject: [PATCH 20/42] Link to WikiPedia article on DMA --- design.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/design.md b/design.md index c5ebee4..49b1e01 100644 --- a/design.md +++ b/design.md @@ -13,7 +13,7 @@ - Exploit CPU caches and hence minimize the number of time-consuming reads from RAM. Once a chunk is loaded into CPU cache, perform all transformations on that chunk in quick succession, and pin the computation for a given chunk to a single CPU core. - Use multiple CPU cores in parallel (each working on a different chunk). 
- When scheduling work across multiple CPU cores: Avoid locks, or any synchronization primitives that would block a CPU core, wherever possible. - - Look for opportunities to completely cut the CPU out of the data path. For example, if we're loading uncompressed [Zarr](https://zarr.dev/) chunks that are destined to be merged into a final numpy array, then we may be able to use DMA to directly copy chunks into the final numpy array, without the CPU ever touching the data. This may be possible even in cases where the creation of the final array is more complicated than simply concatenating the chunks in RAM. + - Look for opportunities to completely cut the CPU out of the data path. For example, if we're loading uncompressed [Zarr](https://zarr.dev/) chunks that are destined to be merged into a final numpy array, then we may be able to use [direct memory access](https://en.wikipedia.org/wiki/Direct_memory_access) (DMA) to directly copy chunks into the final numpy array, without the CPU ever touching the data. This may be possible even in cases where the creation of the final array is more complicated than simply concatenating the chunks in RAM. - Where appropriate, align chunks in RAM (and pad the ends of chunks) so the CPU & compiler can easily use SIMD instructions, and minimize the number of cache lines that must be read (using SIMD may provide a large speedup "just" for memory copies, even if the transform function doesn't use SIMD). - [ ] The user-supplied function that's applied to each chunk could include, for example, decompression, followed by some numerical transformation, followed by copying the transformed data to a large array which is the concatenation of all the chunks. As much of this as possible should happen in the CPU cache (without time-consuming round-trips to RAM). - [ ] LSIO will implement multiple IO backends. Each backend will exploit the performance features of a particular operating system and storage system. 
The ambition is to support: From e454f4acc1ceeca753987d4c08687c6d114ea93a Mon Sep 17 00:00:00 2001 From: Jack Kelly Date: Tue, 28 Nov 2023 09:28:42 +0000 Subject: [PATCH 21/42] Tweak Priorities --- design.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/design.md b/design.md index 49b1e01..e583551 100644 --- a/design.md +++ b/design.md @@ -41,7 +41,7 @@ Ha! :smiley:. This project is in the earliest planning stages! It'll be _months_ ## Priorities -Jack's main hypothesis is that it _should_ be possible to train large machine learning (ML) models _directly_ from multi-dimensional data (e.g. many timesteps of satellite imagery) stored on disk as Zarr arrays, instead of having to prepare ML training batches ahead of time. These ML models require random crops to be selected from multi-dimensional datasets, at several gigabytes per second. (See [Jack's blog post](https://jack-kelly.com/blog/2023-07-28-speeding-up-zarr) for more details.) +Jack's main hypothesis is that it _should_ be possible to train large machine learning (ML) models _directly_ from multi-dimensional data stored on disk as Zarr arrays, instead of having to prepare ML training batches ahead of time. These ML models require random crops to be selected from multi-dimensional datasets, at several gigabytes per second. (See [Jack's blog post](https://jack-kelly.com/blog/2023-07-28-speeding-up-zarr) for more details. An example multi-dimensional dataset is satellite imagery over time.) (And, even more ambitiously, LSIO may allow us to train directly from the _original data_ stored in, for example, GRIB files). 
From 3b60fbc929a703962b2a2a33a81253f6d5166e20 Mon Sep 17 00:00:00 2001 From: Jack Kelly Date: Tue, 28 Nov 2023 09:30:57 +0000 Subject: [PATCH 22/42] Link to priorities section from use-cases --- design.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/design.md b/design.md index e583551..98a7a7e 100644 --- a/design.md +++ b/design.md @@ -32,7 +32,7 @@ Allow for very fast access to: * [Zarr](https://zarr.dev/) arrays. Jack is mostly focused on [sharded Zarr arrays](https://zarr.dev/zeps/accepted/ZEP0002.html). But LSIO could also be helpful for non-sharded Zarr arrays. - * Jack is particularly focused on speeding up the data pipeline for training machine learning models on multi-dimensional datasets. This is described in [Jack's blog post](https://jack-kelly.com/blog/2023-07-28-speeding-up-zarr). + * Jack is particularly focused on speeding up the data pipeline for training machine learning models on multi-dimensional datasets. This is described below in the [Priorities](#priorities) section. * Other file formats used for multi-dimensional arrays, such as NetCDF, GRIB, and EUMETSAT's native file format. (LSIO could help to speed up [kerchunk](https://fsspec.github.io/kerchunk/)) ## Timeline From 93402b1fb87d9ae77470818ddfe1ddc4de87546f Mon Sep 17 00:00:00 2001 From: Jack Kelly Date: Tue, 28 Nov 2023 09:32:05 +0000 Subject: [PATCH 23/42] Flesh out use-cases a little --- design.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/design.md b/design.md index 98a7a7e..054facf 100644 --- a/design.md +++ b/design.md @@ -32,7 +32,7 @@ Allow for very fast access to: * [Zarr](https://zarr.dev/) arrays. Jack is mostly focused on [sharded Zarr arrays](https://zarr.dev/zeps/accepted/ZEP0002.html). But LSIO could also be helpful for non-sharded Zarr arrays. - * Jack is particularly focused on speeding up the data pipeline for training machine learning models on multi-dimensional datasets. 
This is described below in the [Priorities](#priorities) section. + * Jack is particularly focused on speeding up the data pipeline for training machine learning models on multi-dimensional datasets, where we want to select random crops of data, as fast as the hardware will allow. This is described below in the [Priorities](#priorities) section. * Other file formats used for multi-dimensional arrays, such as NetCDF, GRIB, and EUMETSAT's native file format. (LSIO could help to speed up [kerchunk](https://fsspec.github.io/kerchunk/)) ## Timeline From ddd141fc941db4a5ba9fc6221f1e4cffac6ac9b9 Mon Sep 17 00:00:00 2001 From: Jack Kelly Date: Tue, 28 Nov 2023 09:35:18 +0000 Subject: [PATCH 24/42] Put Timeline after Priorities --- design.md | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/design.md b/design.md index 054facf..26c1dc2 100644 --- a/design.md +++ b/design.md @@ -22,7 +22,7 @@ - [ ] Windows [I/O Ring](https://windows-internals.com/i-o-rings-when-one-i-o-operation-is-not-enough/). - [ ] MacOS X [kqueue](https://en.wikipedia.org/wiki/Kqueue). - These storage systems: - - [ ] Local disks. + - [ ] Local disks. (With different optimizations for SSDs and HDDs). - [ ] Cloud storage buckets. - [ ] HTTP. - [ ] Provide an async Rust API @@ -35,10 +35,6 @@ Allow for very fast access to: * Jack is particularly focused on speeding up the data pipeline for training machine learning models on multi-dimensional datasets, where we want to select random crops of data, as fast as the hardware will allow. This is described below in the [Priorities](#priorities) section. * Other file formats used for multi-dimensional arrays, such as NetCDF, GRIB, and EUMETSAT's native file format. (LSIO could help to speed up [kerchunk](https://fsspec.github.io/kerchunk/)) -## Timeline - -Ha! :smiley:. This project is in the earliest planning stages! It'll be _months_ before it does anything vaguely useful! 
And, for now at least, this project is just Jack hacking away his spare time, whilst learning Rust!
-
 ## Priorities
 
 Jack's main hypothesis is that it _should_ be possible to train large machine learning (ML) models _directly_ from multi-dimensional data stored on disk as Zarr arrays, instead of having to prepare ML training batches ahead of time. These ML models require random crops to be selected from multi-dimensional datasets, at several gigabytes per second. (See [Jack's blog post](https://jack-kelly.com/blog/2023-07-28-speeding-up-zarr) for more details. An example multi-dimensional dataset is satellite imagery over time.)
@@ -51,6 +47,10 @@ If this provides a significant speed-up, then Jack will focus on implementing re
 
 If this does not provide a speed-up, then - to be frank - LSIO will probably be abandoned!
 
+## Timeline
+
+Ha! :smiley:. This project is in the earliest planning stages! It'll be _months_ before it does anything vaguely useful! And, for now at least, this project is just Jack hacking away in his spare time, whilst learning Rust!
+
 ## Design
 
 TODO! (But, for now, see the file [`src/draft_API_design.rs` in this pull request](https://github.com/JackKelly/light-speed-io/blob/draft-API-design/src/draft_API_design.rs)) \ No newline at end of file
From d0fd3e856eed276c2b9220109c7a35b81da231f7 Mon Sep 17 00:00:00 2001
From: Jack Kelly
Date: Tue, 28 Nov 2023 09:47:48 +0000
Subject: [PATCH 25/42] Tweaks to design.md
---
 design.md | 22 +++++++++++-----------
 1 file changed, 11 insertions(+), 11 deletions(-)
diff --git a/design.md b/design.md
index 26c1dc2..4fbf7b0 100644
--- a/design.md
+++ b/design.md
@@ -5,17 +5,17 @@
 ## Planned features
 - [ ] Provide a simple, async API for reading many chunks of files (and/or many files) with single API call. Users will be able to ask LSIO: "_Please get me these million file chunks, and apply this function to each chunk. Tell me when you're done._".
-- [ ] The API will be the same, no matter which operating system you're on, and no matter whether the data is on local disk, or a cloud storage bucket, or available over HTTP. (Hat tip to [fsspec](https://filesystem-spec.readthedocs.io/en/latest/) :smiley:!) +- [ ] The API will be the same, no matter which operating system you're on, and no matter whether the data is on local disk, or a cloud storage bucket, or available over HTTP. (Inspired by [fsspec](https://filesystem-spec.readthedocs.io/en/latest/) :smiley:!) - [ ] Laser-focus on _speed_: - Achieve many [input/output operations per second](https://en.wikipedia.org/wiki/IOPS) (IOPS), high bandwidth, and low latency by exploiting "modern" operating system storage APIs, and designing for inherently parallel storage systems like NVMe SSDs and cloud storage buckets. - Before submitting any IO operations, tune the sequence of IO operations according to the performance characteristics of each storage system. For example, on a hard drive (with spinning platters), the performance of random reads is dominated by the time taken to move the read head. So LSIO will merge nearby reads, even if those reads aren't strictly consecutive: For example, if we want to read every third block of a file, it may be faster to read the entire file, even if we immediately throw away two thirds of the data. Or, when reading large files from a cloud storage bucket, it may be faster to split each file into consecutive chunks, and request those chunks in parallel. - "Auto-tune" to each storage system. Or, if users does not want to auto-tune, then provide sane defaults for a range of common storage systems. - - Exploit CPU caches and hence minimize the number of time-consuming reads from RAM. Once a chunk is loaded into CPU cache, perform all transformations on that chunk in quick succession, and pin the computation for a given chunk to a single CPU core. + - Exploit CPU caches and hence minimize the number of time-consuming reads from RAM. 
Once a chunk is loaded into CPU cache, perform all transformations on that chunk in quick succession (to maximize the chance that the data stays in cache), and pin the computation for a given chunk to a single CPU core (because level-1 CPU cache is specific to a CPU core). - Use multiple CPU cores in parallel (each working on a different chunk). - When scheduling work across multiple CPU cores: Avoid locks, or any synchronization primitives that would block a CPU core, wherever possible. - - Look for opportunities to completely cut the CPU out of the data path. For example, if we're loading uncompressed [Zarr](https://zarr.dev/) chunks that are destined to be merged into a final numpy array, then we may be able to use [direct memory access](https://en.wikipedia.org/wiki/Direct_memory_access) (DMA) to directly copy chunks into the final numpy array, without the CPU ever touching the data. This may be possible even in cases where the creation of the final array is more complicated than simply concatenating the chunks in RAM. - - Where appropriate, align chunks in RAM (and pad the ends of chunks) so the CPU & compiler can easily use SIMD instructions, and minimize the number of cache lines that must be read (using SIMD may provide a large speedup "just" for memory copies, even if the transform function doesn't use SIMD). -- [ ] The user-supplied function that's applied to each chunk could include, for example, decompression, followed by some numerical transformation, followed by copying the transformed data to a large array which is the concatenation of all the chunks. As much of this as possible should happen in the CPU cache (without time-consuming round-trips to RAM). + - Look for opportunities to completely cut the CPU out of the data path. 
For example, if we're loading uncompressed [Zarr](https://zarr.dev/) chunks that are destined to be merged into a final numpy array, then we may be able to use [direct memory access](https://en.wikipedia.org/wiki/Direct_memory_access) (DMA) to directly copy chunks into the final numpy array from IO, without the CPU ever touching the data. This may be possible even in cases where the creation of the final array is more complicated than simply concatenating the chunks in RAM. + - Where appropriate, align chunks in RAM (and pad the ends of chunks) so the CPU & compiler can easily use SIMD instructions, and minimize the number of cache lines that must be read. (Using SIMD may provide a large speedup "just" for memory copies, even if the transform function doesn't use SIMD). +- [ ] The user-supplied function that's applied to each chunk could include, for example, decompression, followed by some numerical transformation, followed by copying the transformed data to a large array which is the concatenation of all the chunks. As much of this as possible should happen whilst the chunk is in the CPU cache (without time-consuming round-trips to RAM). - [ ] LSIO will implement multiple IO backends. Each backend will exploit the performance features of a particular operating system and storage system. The ambition is to support: - These operating system APIs: - [ ] Linux [io_uring](https://en.wikipedia.org/wiki/Io_uring) (for local storage and network storage). @@ -25,14 +25,14 @@ - [ ] Local disks. (With different optimizations for SSDs and HDDs). - [ ] Cloud storage buckets. - [ ] HTTP. -- [ ] Provide an async Rust API -- [ ] Provide an async Python API. +- [ ] Async Rust API. +- [ ] Async Python API. ## Use cases -Allow for very fast access to: -* [Zarr](https://zarr.dev/) arrays. Jack is mostly focused on [sharded Zarr arrays](https://zarr.dev/zeps/accepted/ZEP0002.html). But LSIO could also be helpful for non-sharded Zarr arrays. 
- * Jack is particularly focused on speeding up the data pipeline for training machine learning models on multi-dimensional datasets, where we want to select random crops of data, as fast as the hardware will allow. This is described below in the [Priorities](#priorities) section. +Allow for very fast access to arbitrary selections of: +* Multi-dimensional [Zarr](https://zarr.dev/) arrays. Jack is mostly focused on [_sharded_ Zarr arrays](https://zarr.dev/zeps/accepted/ZEP0002.html). But LSIO could also be helpful for non-sharded Zarr arrays. + * Jack is particularly focused on speeding up the data pipeline for training machine learning models on multi-dimensional datasets, where we want to select hundreds of random crops of data per second. This is described below in the [Priorities](#priorities) section. The ambition is to enable us to read on the order of 1 million Zarr chunks per second (from a fast, local SSD). * Other file formats used for multi-dimensional arrays, such as NetCDF, GRIB, and EUMETSAT's native file format. (LSIO could help to speed up [kerchunk](https://fsspec.github.io/kerchunk/)) ## Priorities @@ -45,7 +45,7 @@ The ultimate test is: Can LSIO enable us to train ML models directly from Zarr? If this provides a significant speed-up, then Jack will focus on implementing reading from Google Cloud Storage buckets, maybe using io_uring for async network IO. -If this does not provide a speed-up, then - to be frank - LSIO will probably be abandoned! +On the other hand, if LSIO does _not_ provide a speed-up, then - to be frank - LSIO will probably be abandoned! 
## Timeline From aa8384b50d366e3a35eb5eb9247cab26c8641f78 Mon Sep 17 00:00:00 2001 From: Jack Kelly Date: Tue, 28 Nov 2023 10:43:53 +0000 Subject: [PATCH 26/42] Start moving Rust code into design.md --- design.md | 34 +++++++++++++++++++++++++++++++++- src/draft_API_design.rs | 7 +++++-- 2 files changed, 38 insertions(+), 3 deletions(-) diff --git a/design.md b/design.md index 4fbf7b0..3ac859e 100644 --- a/design.md +++ b/design.md @@ -53,4 +53,36 @@ Ha! :smiley:. This project is in the earliest planning stages! It'll be _months_ ## Design -TODO! (But, for now, see the file [`src/draft_API_design.rs` in this pull request](https://github.com/JackKelly/light-speed-io/blob/draft-API-design/src/draft_API_design.rs)) \ No newline at end of file +TODO! (But, for now, see the file [`src/draft_API_design.rs` in this pull request](https://github.com/JackKelly/light-speed-io/blob/draft-API-design/src/draft_API_design.rs)) + +### Public Rust API + +#### Configuration + +First, the user must set the configuration options using pre-defined defaults, or auto calibration, or manually specifying options, +or loading from disk (using [`serde`](https://serde.rs/)). The user's code would look like this: + +```rust +let config = SSD_PCIE_GEN4; +// Or do this :) +let config = IoConfig::auto_calibrate(); +``` + +Under the hood (in LSIO): + +```rust +pub struct IoConfig { + pub latency_millisecs: f64, + pub bandwidth_gbytes_per_sec: f64, +} + +impl IoConfig { + pub fn auto_calibrate() -> Self {} + // Use Serde to save / load IoConfig to disk. 
+} + +pub const SSD_PCIE_GEN4: IoConfig = IoConfig{ + latency_millisecs: 0.001, + bandwidth_gbytes_per_sec: 8, +}; +``` diff --git a/src/draft_API_design.rs b/src/draft_API_design.rs index de79cba..8753bb3 100644 --- a/src/draft_API_design.rs +++ b/src/draft_API_design.rs @@ -77,11 +77,14 @@ pub struct IoConfig { } impl IoConfig { - fn auto_calibrate() -> Self {} + pub fn auto_calibrate() -> Self {} // Use Serde to save / load IoConfig to disk. } -const SSD_PCIE_GEN4: IoConfig = IoConfig{latency_millisecs: 0.001, bandwidth_gbytes_per_sec: 8}; +pub const SSD_PCIE_GEN4: IoConfig = IoConfig{ + latency_millisecs: 0.001, + bandwidth_gbytes_per_sec: 8, +}; trait Reader { fn new(config: &IoConfig) -> Self { Self {config} } From 27e2dfb5710bb4b0d9b4876ac5fd3c1b1ee19f52 Mon Sep 17 00:00:00 2001 From: Jack Kelly Date: Tue, 28 Nov 2023 11:08:09 +0000 Subject: [PATCH 27/42] flesh out the public Rust API --- design.md | 29 ++++++++++++++++++++--------- 1 file changed, 20 insertions(+), 9 deletions(-) diff --git a/design.md b/design.md index 3ac859e..44b3eb9 100644 --- a/design.md +++ b/design.md @@ -57,13 +57,12 @@ TODO! (But, for now, see the file [`src/draft_API_design.rs` in this pull reques ### Public Rust API -#### Configuration +#### Describe the performance characteristics of the storage subsystem -First, the user must set the configuration options using pre-defined defaults, or auto calibration, or manually specifying options, -or loading from disk (using [`serde`](https://serde.rs/)). The user's code would look like this: +First, the user must describe the performance characteristics of their storage subsystem. This can be done using pre-defined defaults, or auto calibration, or manually specifying options, or loading from disk (using [`serde`](https://serde.rs/)). This information will be used by LSIO to optimize the sequence of chunks for the user's storage system, prior to submitting IO operations to the hardware. 
The user's code would look like this: ```rust -let config = SSD_PCIE_GEN4; +let config = SSD_NVME_PCIE_GEN4; // Or do this :) let config = IoConfig::auto_calibrate(); ``` @@ -71,18 +70,30 @@ let config = IoConfig::auto_calibrate(); Under the hood (in LSIO): ```rust +/// Describe the performance characteristics of the storage subsystem pub struct IoConfig { pub latency_millisecs: f64, - pub bandwidth_gbytes_per_sec: f64, + pub bandwidth_megabytes_per_sec: f64, + + /// Files larger than this will be broken into consecutive chunks, + /// and the chunks will be requested concurrently. + /// Breaking up files may speed up reading from cloud storage buckets. + /// Each chunk will be no larger than this size. + /// Set this to `None` if you never want to break files apart. + pub max_megabytes_of_single_read: Option, } impl IoConfig { - pub fn auto_calibrate() -> Self {} - // Use Serde to save / load IoConfig to disk. + pub fn auto_calibrate() -> Self { + // TODO + } + // Use Serde to save and load IoConfig. } -pub const SSD_PCIE_GEN4: IoConfig = IoConfig{ +/// Default config options for NVMe SSDs using PCIe generation 4. 
+pub const SSD_NVME_PCIE_GEN4: IoConfig = IoConfig{ latency_millisecs: 0.001, - bandwidth_gbytes_per_sec: 8, + bandwidth_megabytes_per_sec: 8000, + max_megabytes_of_single_read: None, }; ``` From f4007e8f4107be4d07a4058e56c17e8ea43e2f5e Mon Sep 17 00:00:00 2001 From: Jack Kelly Date: Tue, 28 Nov 2023 11:32:46 +0000 Subject: [PATCH 28/42] Init a Reader struct --- design.md | 30 ++++++++++++++++++++++++++++++ 1 file changed, 30 insertions(+) diff --git a/design.md b/design.md index 44b3eb9..f1bf9a9 100644 --- a/design.md +++ b/design.md @@ -63,6 +63,7 @@ First, the user must describe the performance characteristics of their storage s ```rust let config = SSD_NVME_PCIE_GEN4; + // Or do this :) let config = IoConfig::auto_calibrate(); ``` @@ -97,3 +98,32 @@ pub const SSD_NVME_PCIE_GEN4: IoConfig = IoConfig{ max_megabytes_of_single_read: None, }; ``` + +#### Initialize a `Reader` struct + +Using a persistent object will allow us to cache (in memory) values such as file sizes. And provides an opportunity to pre-allocated memory buffers (where possible). + +User code: + +```rust +let reader = IoUringLocal::new(config); +``` + +Under the hood (in LSIO): + +```rust +pub trait Reader { + pub fn new(config: IoConfig) -> Self { Self {config} } +} + +pub struct IoUringLocal { + config: IoConfig, + + /// Map from the full file name to the file size in bytes + cached_file_sizes_in_bytes: map, +} + +impl LocalIo for IoUringLocal { + // Implement io_uring-specific stuff... +} +``` \ No newline at end of file From 1cd9239e133df7035f7846f23c3f13702955a682 Mon Sep 17 00:00:00 2001 From: Jack Kelly Date: Tue, 28 Nov 2023 11:33:57 +0000 Subject: [PATCH 29/42] Fix bug in design! 
--- design.md | 2 +- src/draft_API_design.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/design.md b/design.md index f1bf9a9..8a6e20a 100644 --- a/design.md +++ b/design.md @@ -123,7 +123,7 @@ pub struct IoUringLocal { cached_file_sizes_in_bytes: map, } -impl LocalIo for IoUringLocal { +impl Reader for IoUringLocal { // Implement io_uring-specific stuff... } ``` \ No newline at end of file diff --git a/src/draft_API_design.rs b/src/draft_API_design.rs index 8753bb3..e00d9b5 100644 --- a/src/draft_API_design.rs +++ b/src/draft_API_design.rs @@ -127,7 +127,7 @@ struct IoUringLocal { cached_file_sizes_in_bytes: map, } -impl LocalIo for IoUringLocal { +impl Reader for IoUringLocal { // Implement io_uring-specific stuff... } From 91981d8b2a6a849060972097128de9aebedbc92b Mon Sep 17 00:00:00 2001 From: Jack Kelly Date: Tue, 28 Nov 2023 13:23:22 +0000 Subject: [PATCH 30/42] Sketch out some thoughts about internal design --- design.md | 141 +++++++++++++++++++++++++++++++++++++-- src/draft_API_design.rs | 142 ---------------------------------------- 2 files changed, 136 insertions(+), 147 deletions(-) delete mode 100644 src/draft_API_design.rs diff --git a/design.md b/design.md index 8a6e20a..96df3ce 100644 --- a/design.md +++ b/design.md @@ -61,6 +61,8 @@ TODO! (But, for now, see the file [`src/draft_API_design.rs` in this pull reques First, the user must describe the performance characteristics of their storage subsystem. This can be done using pre-defined defaults, or auto calibration, or manually specifying options, or loading from disk (using [`serde`](https://serde.rs/)). This information will be used by LSIO to optimize the sequence of chunks for the user's storage system, prior to submitting IO operations to the hardware. 
The user's code would look like this: +##### User code + ```rust let config = SSD_NVME_PCIE_GEN4; @@ -68,7 +70,7 @@ let config = SSD_NVME_PCIE_GEN4; let config = IoConfig::auto_calibrate(); ``` -Under the hood (in LSIO): +##### Under the hood (in LSIO) ```rust /// Describe the performance characteristics of the storage subsystem @@ -103,27 +105,156 @@ pub const SSD_NVME_PCIE_GEN4: IoConfig = IoConfig{ Using a persistent object will allow us to cache (in memory) values such as file sizes. And provides an opportunity to pre-allocated memory buffers (where possible). -User code: +##### User code ```rust let reader = IoUringLocal::new(config); ``` -Under the hood (in LSIO): +##### Under the hood (in LSIO) ```rust pub trait Reader { pub fn new(config: IoConfig) -> Self { Self {config} } } +/// Linux io_uring for locally-attached disks. pub struct IoUringLocal { config: IoConfig, - /// Map from the full file name to the file size in bytes + /// Map from the full file name to the file size in bytes. + /// We need to know the length of each file if we want to read the file + /// in its entirety, or if we want to seek to a position relative to the + /// end of the file. cached_file_sizes_in_bytes: map, } impl Reader for IoUringLocal { // Implement io_uring-specific stuff... } -``` \ No newline at end of file +``` + +#### Specify which chunks to read + +##### User code + +In this example, we read the entirety of `/foo/bar`, and we also read three chunks from `/foo/baz`: + +```rust +let chunks = vec![ + FileChunks{ + path: "/foo/bar", + byte_range: ByteRange::EntireFile, // Read all of file + }, + FileChunks{ + path: "/foo/baz", + byte_range: ByteRange::MultiRange( + vec![ + ..1000, // Read the first 1,000 bytes + -500..-200, // Read 300 bytes, until the 200th byte from the end + -100.., // Read the last 100 bytes. For example, shared Zarrs store + // the shard index at the end of each file. 
+ ], + ), + // I considered also including a `destinations` field, holding Vec of mutable references to + // the target memory buffers. But - at this point in the code - we + // don't know the file sizes, so we can't allocate buffers yet for `EntireFiles`. + }, +]; +``` + +##### Under the hood (in LSIO) + +```rust +pub struct FileChunks { + pub path: Path, + pub byte_range: ByteRange, +} + +pub enum ByteRange { + EntireFile, + MultiRange(Vec), +} +``` + +#### Async reading of chunks + +##### User code + +```rust +// Start async loading of data from disk: +let future = reader.read_chunks(&chunks); + +// Wait for data to all be ready: +// We need one `Result` per chunk, because reading each chunk could fail: +let data: Vec> = future.wait(); +``` + +Or, if we want to apply a function to each chunk, we could do something like this. This example +is based on the Zarr use-case. For each chunk, want to decompress, and apply a simple numerical +transformation, and then move the transformed data into a final array: + +```rust +let mut final_array = Array(); +let chunk_idx_to_array_loc = Vec::new(); +// TODO: Fill out `chunk_idx_to_array_loc` + +// processing_fn could fail, so we return a Result. +// processing_fn may not return any data (because the data has been moved to another location) +// so we return an Option wrapped in a Result. +let processing_fn = |chunk_idx: u64, chunk: &[u8]| -> Result> { + // ******** DECOMPRESS ************ + // If we don't know the size of the uncompressed chunk, then + // deliberately over-allocate, and shrink later... + const OVER_ALLOCATION_RATIO: usize = 4; + let mut decompressed_chunk = Vec::with_capacity(OVER_ALLOCATION_RATIO * chunk.size()); + decompress(&chunk, &mut decompressed_chunk)?; + decompressed_chunk.shrink_to_fit(); + + // ******** PROCESS *********** + decompressed_chunk = decompressed_chunk / 2; // to give a very simple example! 
+ + // ******** COPY TO FINAL ARRAY ************** + final_array[chunk_idx_to_array_loc[chunk_idx]] = decompressed_chunk; + Ok(None) // We're deliberately not passing back the decompressed array. +}; +let future = read.read_chunks_and_apply(&chunks, processing_fn); +let results = future.wait(); +// TODO: check `results` for any failures +pass_to_python(&final_array); +``` + +### Internal design of LSIO + +TODO. Things to consider: + +Within LSIO, the pipeline for the IO ops is something like this: + +- User submits a Vector of `FileChunks`. +- In the main thread: + - We need to get the file size for: + - any `EntireFiles`. If these exist, then we need to get the file size ahead-of-time, so we can pre-allocate a memory buffer. + - any `MultiRange`s which include offsets from the end of the file, iff the backend doesn't natively support offsets from the end of the file (or maybe this should be the backend's problem? Although I'd guess it'd be faster to get all file sizes in one go, ahead of time?) + - For any `MultiRange`s, LSIO optimizes the sequence of ranges. This is dependent on `IoConfig`, but shouldn't be dependent on the IO backend. Maybe this could be implemented as a set of methods on `ByteRange`? + - Merge any overlapping read requests (e.g. if the user requests `[..1000, ..100]` then only actually read `..1000`, but return - as the second chunk - a memory copy of the last 100 bytes). Maybe don't implement this in the MVP. But check that the design can support this. + - Merge nearby reads into smaller reads, depending on `IoConfig`. + - Split large reads into multiple smaller reader, depending on `IoConfig.max_megabytes_of_single_read`. + - Perhaps we need a new type for the _optimized_ byte ranges? We need to express: + - "_this single optimized read started life as multiple, nearby reads. After performing this single read, the memory buffer will need to be split into sub-chunks, and those sub-chunks processed in parallel. 
And we may want to throw away some data. The IO backend should be encouraged to use `readv` if available, to directly read into multiple buffers. (POSIX can use `readv` to read sockets as well as files.)_" - "_this single optimized read started life as n multiple, overlapping reads. The user is expecting n slices (views) of this memory buffer_" - "_these multiple optimized reads started life as a single read request. Create one large memory buffer. And each sub-chunk should be read directly into a different slice of the memory buffer._" - Maybe the answer is that the optimization step should be responsible for allocating the memory buffers, and it just submits a sequence of abstracted `readv` operations to the IO backend? If the backend can't natively perform `readv` then it's trivial for the backend to split one `readv` call into multiple `read`s. But! We should only allocate memory buffers when we're actually ready to read! Say we want to read 1,000,000 chunks. Using io_uring, we won't actually submit all 1,000,000 read requests at once: instead we'll keep the submission ring topped up with, say, 64 tasks. If the user wants all 1,000,000 chunks returned then we have no option but to allocate all 1,000,000 chunks. But if, instead, the user wants each chunk to be processed and then moved into a common final array, then we only have to allocate 64 buffers per thread. - Pass this optimized sequence to the IO backend (e.g. `IoUringLocal`). - For `IoUringLocal`, the main thread spins up _n_ io_uring rings, and _n_ worker threads (where _n_ defaults to the number of logical CPU cores, or the number of requested read ops, whichever is smaller - there's no point spinning up 32 threads if we only have 2 read operations!). Each worker thread gets its own completion ring. The main thread is responsible for submitting operations to all _n_ submission rings. 
The worker threads all write to a single, shared channel, to say when they've completed a task, which tells the main thread to submit another task to that thread's submission queue. This design should be faster than the main thread creating a single queue of tasks, which each worker thread reads from. Queues block. Blocking is bad! - The main thread: - Starts by splitting all the operations into _n_ lists. For example, if we start with 1,000,000 read operations, and have 10 CPU cores, then we end up with 100,000 read ops per CPU core. - But we don't want to simply submit all 100,000 ops to each submission queue, in one go. That doesn't give us the opportunity to balance work across the worker threads. (Some read ops might take longer than others.) And, we can't use that many filehandles per process! - Allocate filehandles for each read op in flight. (So io_uring can chain open(fh), read(fh), close(fh)). - Submit the first, say, 64 read ops to each thread's submission queue. (Each "read op" would actually be a chain of open, read, close). - Block on `channel.recv()`. When a message arrives, submit another task to that thread's submission ring. - Each worker thread: - Blocks waiting for data from its io_uring completion queue. - When data arrives, it checks for errors, and performs the requested processing. - The worker thread sends its ID to the channel, to signal that it has completed a task. - BUT! In cases where the user has not requested any processing, then the worker threads are redundant??? Maybe we simply don't spin up any worker threads, in that case? Although, actually, we still need to check each completion queue entry for errors, I think? Maybe threads would be useful for that??? - AND! What about when we're reading uncompressed Zarr chunks directly into the final merged array? How does the user specify that?! 
\ No newline at end of file diff --git a/src/draft_API_design.rs b/src/draft_API_design.rs deleted file mode 100644 index e00d9b5..0000000 --- a/src/draft_API_design.rs +++ /dev/null @@ -1,142 +0,0 @@ -/// This is just me sketching out pseudo-code for the design of the API, -/// and sketching out some of the important internals. -/// -/// Use-cases that this design needs to be tested against: -/// - [x] Load ends of files (e.g. Zarr shard_index) -/// - [x] Cache the lengths of files. -/// - [x] Load huge numbers of files (e.g. non-sharded Zarrs) -/// - [x] Load huge numbers of chunks from a small number of files. -/// - [x] "Scatter" data to multiple arrays -/// (e.g. loading uncompressed Zarr / EUMETSAT / GRIB files into final numpy array using DMA!) -/// - [x] Per chunk: Decompress, process, and copy to final array. -/// - [x] Allow LSIO to merge nearby chunks. -/// -/// This file will soon be merged into, and replaced by: -/// https://github.com/JackKelly/light-speed-io/blob/draft-API-design/design.md - -fn main() -> () { - // Set config options (latency, bandwidth, maybe others) - let config = SSD_PCIE_GEN4; - // Or do this :) - let config = IoConfig::auto_calibrate(); - - // Init: - let reader = IoUringLocal::new(&config); - - // Define which chunks to load: - let chunks = vec![ - FileChunks{ - path: "/foo/bar", - byte_range: ByteRange::EntireFile, // Read all of file - }, - FileChunks{ - path: "/foo/baz", - byte_range: ByteRange::MultiRange( - vec![ - ..1000, // Read the first 1,000 bytes - -500.., // Read the last 500 bytes - -500..-100, // Read 400 bytes, until the 100th byte before the end - ], - ), - // I considered also including a `destinations` field, holding Vec of mutable references to - // the target memory buffers. But - at this point in the code - we - // don't know the file sizes, so we can't allocate buffers yet for EntireFiles. 
- }, - ]; - - // Start async loading of data from disk: - let future = reader.read_chunks(&chunks); - let data: Vec> = future.wait(); - - // Or, read chunks and apply a function: - let mut final_array = Array(); - let chunk_idx_to_array_loc = Vec::new(); - let processing_fn = |chunk_idx: u64, chunk: &[u8]| -> &[u8] { - // ******** DECOMPRESS ************ - // If we don't know the size of the uncompressed chunk, then - // deliberately over-allocate, and shrink later... - const OVER_ALLOCATION_RATIO: usize = 4; - let mut decompressed_chunk = Vec::with_capacity(OVER_ALLOCATION_RATIO * chunk.size()); - decompress(&chunk, &mut decompressed_chunk); - decompressed_chunk.shrink_to_fit(); - - // ******** PROCESS *********** - decompressed_chunk = decompressed_chunk / 2; // to give a very simple example! - - // ******** COPY TO FINAL ARRAY ************** - final_array[chunk_idx_to_array_loc[chunk_idx]] = decompressed_chunk; - }; - let future = read.read_chunks_and_apply(&chunks, processing_fn); - future.wait(); - pass_to_python(&final_array); -} - -pub struct IoConfig { - pub latency_millisecs: f64, - pub bandwidth_gbytes_per_sec: f64, -} - -impl IoConfig { - pub fn auto_calibrate() -> Self {} - // Use Serde to save / load IoConfig to disk. -} - -pub const SSD_PCIE_GEN4: IoConfig = IoConfig{ - latency_millisecs: 0.001, - bandwidth_gbytes_per_sec: 8, -}; - -trait Reader { - fn new(config: &IoConfig) -> Self { Self {config} } - - fn read_chunks(&self, chunks: &Vec) -> Future>> { - // (Implement all the general-purpose functionality in the Reader trait, - // and implement the io_uring-specific stuff in IoUringLocal.) - // Split `chunks` and send to threads (if there are enough chunks to be worth the hassle) - // If there are >1,000 files to open then we'll have to process them in batches, - // so as not to exceed the max filehandles per process.`` - // Perhaps split using Rayon?!? Although I'm not sure how Rayon would work with io_uring?! 
- // Within each thread: - // Loop through `chunks` to find the length of the buffers we need to pre-allocate. - // (We might be loading anything from a few bytes to a few GB, so we can't just pre-allocate large - // arrays and hope for the best!) - // For chunks for which we already know the chunksize (maybe we have the filesize in cache, or the chunk - // explicitly tells us its length), immediately allocate the buffer and submit a read SQE to io_uring. - // Some FileChunks will require us to get the length of the file before we can calculate the length of the buffer. - // let nbytes = self.get_size(filename); - // Only get the file sizes when necessary. - // Allocate buffers. - // Submit a read submission queue entry to io_uring. (Each thread will have its own io_uring) - // Thought: Maybe, in a later version of LSIO, we should have *two* threads per io_uring: - // One thread submits requests to io_uring, and then goes away when it has finished. - // The other thread processes requests. This way, we can could start decompressing / copying - // chunks before we've even got all the filesizes back. - // Once we've submitted all read requests, then process the completion queue entries. - // In a later version of LSIO, some of those CQEs will contain the filesize, and we'll have to submit a read request. - } - - fn get_file_size_in_bytes(&self, path: &PathBuf) -> u64 { - // first check our local cache of file sizes. If that's empty, then - // for the MVP, just use Rust's standard way to get the file length. Later, we may use io_uring by - // chaining {open, statx, close}. - // Cache nbytes before returning. - } -} -struct IoUringLocal { - config: &IoConfig, - cached_file_sizes_in_bytes: map, -} - -impl Reader for IoUringLocal { - // Implement io_uring-specific stuff... 
-} - -struct FileChunks { - path: &Path, - byte_range: ByteRange, -} - -enum ByteRange { - EntireFile, - MultiRange(Vec), -} \ No newline at end of file From 76c211e6ccb59299d6f36a87244a29ee9a83e6fa Mon Sep 17 00:00:00 2001 From: Jack Kelly Date: Tue, 28 Nov 2023 17:08:03 +0000 Subject: [PATCH 31/42] Allow the user to supply buffers, optionally --- design.md | 37 +++++++++++++++++++++++++++++++------ 1 file changed, 31 insertions(+), 6 deletions(-) diff --git a/design.md b/design.md index 96df3ce..9322a4d 100644 --- a/design.md +++ b/design.md @@ -141,10 +141,15 @@ impl Reader for IoUringLocal { In this example, we read the entirety of `/foo/bar`, and we also read three chunks from `/foo/baz`: ```rust +let mut buf0 = vec![0; 1000]; +let mut buf1 = vec![0; 300]; +let mut buf2 = vec![0; 100]; + let chunks = vec![ FileChunks{ path: "/foo/bar", byte_range: ByteRange::EntireFile, // Read all of file + buffer: None, // LSIO takes responsibility for allocating a memory buffer }, FileChunks{ path: "/foo/baz", @@ -156,9 +161,25 @@ let chunks = vec![ // the shard index at the end of each file. ], ), - // I considered also including a `destinations` field, holding Vec of mutable references to - // the target memory buffers. But - at this point in the code - we - // don't know the file sizes, so we can't allocate buffers yet for `EntireFiles`. + + // If the user wants LSIO to allocate the buffers: + buffer: None, + + // Else, if the user wants to supply buffers, then use Some(Vec<&mut [u8]>) + // For example, this would allow us to bypass the CPU when copying multiple + // uncompressed chunks from a sharded zarr directly into the final array. + // The buffers could point to different slices of the final array. + // This mechanism could even be used when creating the final array is more + // complicated than simply appending chunks: you could, for example, read each + // row of each chunk into a different `&mut [u8]`. 
Under the hood, LSIO would + // notice the consecutive reads, and would use `readv` where available. + buffer: Some( + vec![ + &mut buf0, + &mut buf1, + &mut buf2, + ] + ) }, ]; ``` @@ -169,6 +190,10 @@ let chunks = vec![ pub struct FileChunks { pub path: Path, pub byte_range: ByteRange, + + // If buffer is None, then LSIO will take responsibility for allocating + // the memory buffers. This should be the preferred approach. + pub buffer: Option>, } pub enum ByteRange { @@ -238,7 +263,8 @@ Within LSIO, the pipeline for the IO ops is something like this: - For any `MultiRange`s, LSIO optimizes the sequence of ranges. This is dependent on `IoConfig`, but shouldn't be dependent on the IO backend. Maybe this could be implemented as a set of methods on `ByteRange`? - Merge any overlapping read requests (e.g. if the user requests `[..1000, ..100]` then only actually read `..1000`, but return - as the second chunk - a memory copy of the last 100 bytes). Maybe don't implement this in the MVP. But check that the design can support this. - Merge nearby reads into smaller reads, depending on `IoConfig`. - - Split large reads into multiple smaller reader, depending on `IoConfig.max_megabytes_of_single_read`. + - Split large reads into multiple smaller reads, depending on `IoConfig.max_megabytes_of_single_read`. + - Detect contiguous chunks, and use `readv`. (Although we should benchmark `readv` vs `read`). - Perhaps we need a new type for the _optimized_ byte ranges? We need to express: - "_this single optimized read started life as multiple, nearby reads. After performing this single read, the memory buffer will need to be split into sub-chunks, and those sub-chunks processed in parallel. And we may want to throw away some data. The IO backend should be encouraged to use `readv` if available, to directly read into multiple buffers. (POSIX can use `readv` to read sockets as well as files.)_" - "_this single optimized read started life as n multiple, overlapping reads. 
The user is expecting n slices (views) of this memory buffer_" @@ -256,5 +282,4 @@ Within LSIO, the pipeline for the IO ops is something like this: - Blocks waiting for data from its io_uring completion queue. - When data arrives, it checks for errors, and performs the requested processing. - The worker thread ends its ID to the channel, to signal that it has completed a task. - - BUT! In cases where the user has not requested any processing, then the worker threads are redundant??? Maybe we simply don't spin up any worker threads, in that case? Although, actually, we still need to check each completion queue entry for errors, I think? Maybe threads would be useful for that??? - - AND! What about when we're reading uncompressed Zarr chunks directly into the final merged array? How does the user specify that?! \ No newline at end of file + - BUT! In cases where the user has not requested any processing, then the worker threads are redundant??? Maybe we simply don't spin up any worker threads, in that case? Although, actually, we still need to check each completion queue entry for errors, I think? Maybe threads would be useful for that??? And, for the MVP, maybe we should always spin up threads, so we don't have to worry about a separate code path for the "no processing" case? From 7dc924f13bb63cf726cb399af13d6b70947e6125 Mon Sep 17 00:00:00 2001 From: Jack Kelly Date: Tue, 28 Nov 2023 19:05:03 +0000 Subject: [PATCH 32/42] Slight tweaks --- design.md | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/design.md b/design.md index 9322a4d..7b1b9bd 100644 --- a/design.md +++ b/design.md @@ -165,9 +165,10 @@ let chunks = vec![ // If the user wants LSIO to allocate the buffers: buffer: None, - // Else, if the user wants to supply buffers, then use Some(Vec<&mut [u8]>) + // Else, if the user wants to supply buffers, then use `Some(Vec<&mut [u8]>)` + // with one buffer per element in the `byte_range` vector. 
// For example, this would allow us to bypass the CPU when copying multiple - // uncompressed chunks from a sharded zarr directly into the final array. + // uncompressed chunks from a sharded Zarr directly into the final array. // The buffers could point to different slices of the final array. // This mechanism could even be used when creating the final array is more // complicated than simply appending chunks: you could, for example, read each From cf30270a7b42ef9865d830b41c2187a35e5c72c7 Mon Sep 17 00:00:00 2001 From: Jack Kelly Date: Tue, 28 Nov 2023 19:13:36 +0000 Subject: [PATCH 33/42] minor tweaks --- design.md | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/design.md b/design.md index 7b1b9bd..2099b00 100644 --- a/design.md +++ b/design.md @@ -146,11 +146,15 @@ let mut buf1 = vec![0; 300]; let mut buf2 = vec![0; 100]; let chunks = vec![ + + // Read the entirity of /foo/bar FileChunks{ path: "/foo/bar", byte_range: ByteRange::EntireFile, // Read all of file buffer: None, // LSIO takes responsibility for allocating a memory buffer }, + + // Read 3 chunks from /foo/baz FileChunks{ path: "/foo/baz", byte_range: ByteRange::MultiRange( @@ -162,10 +166,7 @@ let chunks = vec![ ], ), - // If the user wants LSIO to allocate the buffers: - buffer: None, - - // Else, if the user wants to supply buffers, then use `Some(Vec<&mut [u8]>)` + // If the user wants to supply buffers, then use `Some(Vec<&mut [u8]>)` // with one buffer per element in the `byte_range` vector. // For example, this would allow us to bypass the CPU when copying multiple // uncompressed chunks from a sharded Zarr directly into the final array. @@ -211,9 +212,10 @@ pub enum ByteRange { // Start async loading of data from disk: let future = reader.read_chunks(&chunks); -// Wait for data to all be ready: -// We need one `Result` per chunk, because reading each chunk could fail: -let data: Vec> = future.wait(); +// Wait for data to all be ready. 
+// We need one `Result` per chunk, because reading each chunk could fail. +// Note that we take ownership of the returned vectors of bytes. +let data: Vec>> = future.wait(); ``` Or, if we want to apply a function to each chunk, we could do something like this. This example From dd78711bfb1a31ed9928b85fecd818ded82d924b Mon Sep 17 00:00:00 2001 From: Jack Kelly Date: Wed, 29 Nov 2023 09:52:27 +0000 Subject: [PATCH 34/42] Adding a few more thoughts to the Internal design of LSIO --- design.md | 22 +++++++++++++--------- 1 file changed, 13 insertions(+), 9 deletions(-) diff --git a/design.md b/design.md index 2099b00..5eca0b0 100644 --- a/design.md +++ b/design.md @@ -256,24 +256,28 @@ pass_to_python(&final_array); TODO. Things to consider: -Within LSIO, the pipeline for the IO ops is something like this: +Within LSIO, the pipeline for the IO ops will be something like this: - User submits a Vector of `FileChunks`. - In the main thread: - We need to get the file size for: - any `EntireFiles`. If these exist, then we need to get the file size ahead-of-time, so we can pre-allocate a memory buffer. - any `MultiRange`s which include offsets from the end of the file, iff the backend doesn't natively support offsets from the end of the file (or maybe this should be the backend's problem? Although I'd guess it'd be faster to get all file sizes in one go, ahead of time?) + - in the MVP, let's get the file sizes in the main thread, using the easiest (blocking) method. In later versions, we can get the file sizes async. (Getting filesizes async might be useful when, for example, we need to read huge numbers of un-sharded Zarr chunks). - For any `MultiRange`s, LSIO optimizes the sequence of ranges. This is dependent on `IoConfig`, but shouldn't be dependent on the IO backend. Maybe this could be implemented as a set of methods on `ByteRange`? - - Merge any overlapping read requests (e.g. 
if the user requests `[..1000, ..100]` then only actually read `..1000`, but return - as the second chunk - a memory copy of the last 100 bytes). Maybe don't implement this in the MVP. But check that the design can support this. + - Merge any overlapping read requests (e.g. if the user requests `[..1000, ..100]` then only actually read `..1000`, but return - as the second chunk - an immutable slice of the last 100 bytes). Maybe don't implement this in the MVP. But check that the design can support this. + - (We should probably enforce that the data read from disk is always immutable. That will make the design easier and faster: If the data is always immutable, then we can use slices instead of copies when apportioning merged reads). - Merge nearby reads into smaller reads, depending on `IoConfig`. - Split large reads into multiple smaller reads, depending on `IoConfig.max_megabytes_of_single_read`. - - Detect contiguous chunks, and use `readv`. (Although we should benchmark `readv` vs `read`). - - Perhaps we need a new type for the _optimized_ byte ranges? We need to express: - - "_this single optimized read started life as multiple, nearby reads. After performing this single read, the memory buffer will need to be split into sub-chunks, and those sub-chunks processed in parallel. And we may want to throw away some data. The IO backend should be encouraged to use `readv` if available, to directly read into multiple buffers. (POSIX can use `readv` to read sockets as well as files.)_" - - "_this single optimized read started life as n multiple, overlapping reads. The user is expecting n slices (views) of this memory buffer_" - - "_these multiple optimized reads started life as a single read request. Create one large memory buffer. 
And each sub-chunk should be read directly into a different slice of the memory buffer._" - - Maybe the answer is that the optimization step should be responsible for allocating the memory buffers, and it just submits a sequence of abstracted `readv` operations to the IO backend? If the backend can't natively perform `readv` then it's trivial for the backend to split one `readv` call into multiple `read`s. But! We should only allocate memory buffers when we're actually ready to read! Say we want to read 1,000,000 chunks. Using io_uring, we won't actually submit all 1,000,000 read requests at once: instead we'll keep the submission ring topped up with, say, 64 tasks. If the user wants all 1,000,000 chunks returned then we have no option but to allocate all 1,000,000 chunks. But if, instead, the user wants each chunk to be processed and then moved into a common final array, then we only have to allocate 64 buffers per thread. - - Pass this optimized sequence to the IO backend (e.g. `IoUringLocal`). + - Detect contiguous chunks destined for different buffers, and use `readv` to read these. (Although we should benchmark `readv` vs `read`). + - Merging and splitting read operations means that there's no longer a one-to-one mapping between chunks that the _user_ requested, and chunks that LSIO will request from the storage subsystem. This raises some important design questions: + - How do we ensure that each of the user's chunks are processes in their own threads. (The transform function supplied by the user probably expects the chunks that the user requested) + - Perhaps we need a new type for the _optimized_ byte ranges? We need to express: + - "_this single optimized read started life as multiple, nearby reads. After performing this single read, the memory buffer will need to be sliced, and those slices processed in parallel. And we may want to throw away some data. 
The IO backend should be encouraged to use `readv` if available, to directly read into multiple buffers. (POSIX can use `readv` to read sockets as well as files.)_" + - "_this single optimized read started life as n multiple, overlapping reads. The user is expecting n slices (views) of this memory buffer_" + - "_these multiple optimized reads started life as a single read request. Create one large memory buffer. And each sub-chunk should be read directly into a different slice of the memory buffer._" + - Maybe the answer is that the optimization step should be responsible for allocating the memory buffers, and it just submits a sequence of abstracted `readv` operations to the IO backend? If the backend can't natively perform `readv` then it's trivial for the backend to split one `readv` call into multiple `read`s. But! We should only allocate memory buffers when we're actually ready to read! Say we want to read 1,000,000 chunks. Using io_uring, we won't actually submit all 1,000,000 read requests at once: instead we'll keep the submission ring topped up with, say, 64 tasks. If the user wants all 1,000,000 chunks returned then we have no option but to allocate all 1,000,000 chunks. But if, instead, the user wants each chunk to be processed and then moved into a common final array, then we only have to allocate 64 buffers per thread. + - Pass this optimized sequence to the IO backend (e.g. `IoUringLocal`). - For `IoUringLocal`, the main thread spins up _n_ io_uring rings, and _n_ worker threads (where _n_ defaults to the number of logical CPU cores, or the number of requested read ops, which ever is smaller - there's no point spinning up 32 threads if we only have 2 read operations!). Each worker thread gets its own completion ring. The main thread is responsible for submitting operations to all _n_ submission rings. 
The worker threads all write to a single, shared channel, to say when they've completed a task, which tells the main thread to submit another task to that thread's submission queue. This design should be faster than the main thread creating single queue of tasks, which each worker thread reads from. Queues block. Blocking is bad! - The main thread: - Starts by splitting all the operations into _n_ lists. For example, if we start with 1,000,000 read operations, and have 10 CPU cores, then we end up with 100,000 read ops per CPU core. From 83289872a642cc12dd8fcbe1bf6900c22b7f996f Mon Sep 17 00:00:00 2001 From: Jack Kelly Date: Wed, 29 Nov 2023 12:34:11 +0000 Subject: [PATCH 35/42] more tweaks --- design.md | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/design.md b/design.md index 5eca0b0..1ee493c 100644 --- a/design.md +++ b/design.md @@ -272,6 +272,12 @@ Within LSIO, the pipeline for the IO ops will be something like this: - Detect contiguous chunks destined for different buffers, and use `readv` to read these. (Although we should benchmark `readv` vs `read`). - Merging and splitting read operations means that there's no longer a one-to-one mapping between chunks that the _user_ requested, and chunks that LSIO will request from the storage subsystem. This raises some important design questions: - How do we ensure that each of the user's chunks are processes in their own threads. (The transform function supplied by the user probably expects the chunks that the user requested) + - Some potential answers: + - Use tokio! This might be a classic use-case requiring tokio. But! We'll still have tasks which block for much longer than the 100 microseconds recommended by Tokio. So maybe I should use Rayon's join API? + - Use a manually-coded thread pool. If a thread gets a read from its io_uring completion queue that requires splitting, then just loop within that thread to do each task sequentially. But that could result in some CPU cores being busy, and others not. 
+ - Can io_uring communicate tasks to other threads? Or maybe worker threads can use the common channel to tell the main thread to put a new (non-IO) task into the queue that will be shared amongst worker threads. + - Or, using manually-coded thread pools, threads could also share a _second_ queue, for non-IO tasks. And, if there's no data ready on a thread's io_uring, then it checks that queue. + - When we want to merge multiple reads into a single memory location, then that's a bit harder, and requires us to join on all those tasks. - Perhaps we need a new type for the _optimized_ byte ranges? We need to express: - "_this single optimized read started life as multiple, nearby reads. After performing this single read, the memory buffer will need to be sliced, and those slices processed in parallel. And we may want to throw away some data. The IO backend should be encouraged to use `readv` if available, to directly read into multiple buffers. (POSIX can use `readv` to read sockets as well as files.)_" - "_this single optimized read started life as n multiple, overlapping reads. The user is expecting n slices (views) of this memory buffer_" From 143a2be7440e5f36f97e84bd4fe1bf7be98ba8f1 Mon Sep 17 00:00:00 2001 From: Jack Kelly Date: Wed, 29 Nov 2023 14:53:27 +0000 Subject: [PATCH 36/42] More notes on using Rayon --- design.md | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/design.md b/design.md index 1ee493c..2d6bd38 100644 --- a/design.md +++ b/design.md @@ -261,7 +261,7 @@ Within LSIO, the pipeline for the IO ops will be something like this: - User submits a Vector of `FileChunks`. - In the main thread: - We need to get the file size for: - - any `EntireFiles`. If these exist, then we need to get the file size ahead-of-time, so we can pre-allocate a memory buffer. + - any `EntireFiles` (where `buffer` is `None`). If `EntireFiles` exist, then we need to get the file size ahead-of-time, so we can pre-allocate a memory buffer. 
- any `MultiRange`s which include offsets from the end of the file, iff the backend doesn't natively support offsets from the end of the file (or maybe this should be the backend's problem? Although I'd guess it'd be faster to get all file sizes in one go, ahead of time?) - in the MVP, let's get the file sizes in the main thread, using the easiest (blocking) method. In later versions, we can get the file sizes async. (Getting filesizes async might be useful when, for example, we need to read huge numbers of un-sharded Zarr chunks). - For any `MultiRange`s, LSIO optimizes the sequence of ranges. This is dependent on `IoConfig`, but shouldn't be dependent on the IO backend. Maybe this could be implemented as a set of methods on `ByteRange`? @@ -273,7 +273,10 @@ Within LSIO, the pipeline for the IO ops will be something like this: - Merging and splitting read operations means that there's no longer a one-to-one mapping between chunks that the _user_ requested, and chunks that LSIO will request from the storage subsystem. This raises some important design questions: - How do we ensure that each of the user's chunks are processes in their own threads. (The transform function supplied by the user probably expects the chunks that the user requested) - Some potential answers: - - Use tokio! This might be a classic use-case requiring tokio. But! We'll still have tasks which block for much longer than the 100 microseconds recommended by Tokio. So maybe I should use Rayon's join API? + - Use tokio! This might be a classic use-case requiring tokio. But! We'll still have tasks which block for much longer than the 100 microseconds recommended by Tokio. So... + - Use Rayon! <--- **CURRENTLY MY PREFERRED IDEA!!** + - Hmm... Can we just use `ring.completion().par_iter()`??? Which I _think_ wouldn't use a blocking thread synchronization primitive (instead it would use work steeling). I could test this pretty easily (10 lines of Rust?!) 
+ - [`ndarray` uses Rayon](https://docs.rs/ndarray/latest/ndarray/parallel/index.html). - Use a manually-coded thread pool. If a thread gets a read from its io_uring completion queue that requires splitting, then just loop within that thread to do each task sequentially. But that could result in some CPU cores being busy, and others not. - Can io_uring communicate tasks to other threads? Or maybe worker threads can use the common channel to tell the main thread to put a new (non-IO) task into the queue that will be shared amongst worker threads. - Or, using manually-coded thread pools, threads could also share a _second_ queue, for non-IO tasks. And, if there's no data ready on a thread's io_uring, then it checks that queue. From abb0283388381f47b564d371111031b22c463b0d Mon Sep 17 00:00:00 2001 From: Jack Kelly Date: Wed, 29 Nov 2023 17:43:01 +0000 Subject: [PATCH 37/42] some more rambling about rayon --- design.md | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/design.md b/design.md index 2d6bd38..e1067b4 100644 --- a/design.md +++ b/design.md @@ -275,8 +275,13 @@ Within LSIO, the pipeline for the IO ops will be something like this: - Some potential answers: - Use tokio! This might be a classic use-case requiring tokio. But! We'll still have tasks which block for much longer than the 100 microseconds recommended by Tokio. So... - Use Rayon! <--- **CURRENTLY MY PREFERRED IDEA!!** - - Hmm... Can we just use `ring.completion().par_iter()`??? Which I _think_ wouldn't use a blocking thread synchronization primitive (instead it would use work steeling). I could test this pretty easily (10 lines of Rust?!) + - Hmm... Can we just use `ring.completion().par_iter()`??? Which I _think_ wouldn't use a blocking thread synchronization primitive (instead it would use work steeling). I could test this pretty easily (10 lines of Rust?!). I think I'd still need a mechanism to keep the submission queue topped up. 
- [`ndarray` uses Rayon](https://docs.rs/ndarray/latest/ndarray/parallel/index.html). + - can we make life as simple as possible: We start by doing `file_chunks.par_iter().for_each_init(init, op)`. Each worker thread will have its own entire io_uring: each thread will have a submission queue and a completion queue (initialised by the `init` closure). The `op` closure will call `join(a, b)`. Closure `a` submits a read op to the submission queue (or blocks if the submission queue is full). Closure `b` takes a single item from the completion queue, and processes it; or blocks if there's no data yet. No, no, this won't work: this will only submit one request at once, because this thread will block! + - But how to persist io_urings in Rayon? + - It may be as simple as using [`for_each_init`](https://docs.rs/rayon/latest/rayon/iter/trait.ParallelIterator.html#method.for_each_init) to init an io_uring for each thread. But [this github comment](https://github.com/rayon-rs/rayon/issues/718) from 2020 suggests that the init function is called many more times than the number of threads. + - Failing that, use the [`thread_local` crate](https://docs.rs/thread_local/1.1.7/thread_local/), to create a separate io_uring local to each thread? + - Or, perhaps we can create a Rayon Threadpool and somehow init an io_uring per thread. But I'm not sure how! - Use a manually-coded thread pool. If a thread gets a read from its io_uring completion queue that requires splitting, then just loop within that thread to do each task sequentially. But that could result in some CPU cores being busy, and others not. - Can io_uring communicate tasks to other threads? Or maybe worker threads can use the common channel to tell the main thread to put a new (non-IO) task into the queue that will be shared amongst worker threads. - Or, using manually-coded thread pools, threads could also share a _second_ queue, for non-IO tasks. 
And, if there's no data ready on a thread's io_uring, then it checks that queue. From 426d40c24bf0ea9f1fcbe1fd70a453e026ed6a9f Mon Sep 17 00:00:00 2001 From: Jack Kelly Date: Thu, 30 Nov 2023 15:08:43 +0000 Subject: [PATCH 38/42] tiny tweak --- design.md | 33 ++++++++++++++++++++++++++++++--- 1 file changed, 30 insertions(+), 3 deletions(-) diff --git a/design.md b/design.md index e1067b4..e753407 100644 --- a/design.md +++ b/design.md @@ -265,10 +265,10 @@ Within LSIO, the pipeline for the IO ops will be something like this: - any `MultiRange`s which include offsets from the end of the file, iff the backend doesn't natively support offsets from the end of the file (or maybe this should be the backend's problem? Although I'd guess it'd be faster to get all file sizes in one go, ahead of time?) - in the MVP, let's get the file sizes in the main thread, using the easiest (blocking) method. In later versions, we can get the file sizes async. (Getting filesizes async might be useful when, for example, we need to read huge numbers of un-sharded Zarr chunks). - For any `MultiRange`s, LSIO optimizes the sequence of ranges. This is dependent on `IoConfig`, but shouldn't be dependent on the IO backend. Maybe this could be implemented as a set of methods on `ByteRange`? - - Merge any overlapping read requests (e.g. if the user requests `[..1000, ..100]` then only actually read `..1000`, but return - as the second chunk - an immutable slice of the last 100 bytes). Maybe don't implement this in the MVP. But check that the design can support this. + - Merge any overlapping read requests (e.g. if the user requests `[..1000, ..100]` then only actually read `..1000`, but return - as the second chunk - an immutable slice of the last 100 bytes). Maybe don't implement this in the MVP. But check that the design can support this. Although don't worry too much - I'm not even sure if this issue would arise in the real world. 
- (We should probably enforce that the data read from disk is always immutable. That will make the design easier and faster: If the data is always immutable, then we can use slices instead of copies when apportioning merged reads). - - Merge nearby reads into smaller reads, depending on `IoConfig`. - - Split large reads into multiple smaller reads, depending on `IoConfig.max_megabytes_of_single_read`. + - Merge nearby reads, depending on `IoConfig`, possibly using `readv` to scatter the single read into the requested vectors (and can we scatter the unwanted data to /dev/null?!? Prob not?) + - Split large reads into multiple smaller reads, depending on `IoConfig.max_megabytes_of_single_read`. (Maybe don't worry about this for now, given that this isn't relevant for reading local SSDs using io_uring. This may still be possible in a single vectored read operation, which reads into slices of the same underlying array. Or, if that's not possible, maybe spin up a separate io_uring context just for the individual reads that make up the single requested read, so it's clear when all the reads have finished.) - Detect contiguous chunks destined for different buffers, and use `readv` to read these. (Although we should benchmark `readv` vs `read`). - Merging and splitting read operations means that there's no longer a one-to-one mapping between chunks that the _user_ requested, and chunks that LSIO will request from the storage subsystem. This raises some important design questions: - How do we ensure that each of the user's chunks are processes in their own threads. (The transform function supplied by the user probably expects the chunks that the user requested) @@ -304,3 +304,30 @@ Within LSIO, the pipeline for the IO ops will be something like this: - When data arrives, it checks for errors, and performs the requested processing. - The worker thread ends its ID to the channel, to signal that it has completed a task. - BUT! 
In cases where the user has not requested any processing, then the worker threads are redundant??? Maybe we simply don't spin up any worker threads, in that case? Although, actually, we still need to check each completion queue entry for errors, I think? Maybe threads would be useful for that??? And, for the MVP, maybe we should always spin up threads, so we don't have to worry about a separate code path for the "no processing" case? + + +Assuming we do have to keep track of how many entries... +```rust +use std::sync::mpsc::channel; + +let mut ring = IoUring::new(); +let (sender, receiver) = channel(); + +// Start a thread which is responsible for keeping the submission queue +// topped up with, say, 64 entries. It blocks, reading from a channel. +// Threads send a message to the channel when they're done. + +// Assuming the transform function also copies to the final memory location: +// If we want this to return a vector of arrays then use `map_with()`. +ring.completion().par_iter().for_each_with( + sender, + |s, cqe| { + let mut vec = Vector::with_capacity(); + decompress(&cqe, &mut vec); + s.send(1); + } +) +``` + +TODO: Test if we'll overflow the submission queue if we don't manually keep track +of how many entries have popped up on the completion queue. From 9fd7c6f3dbc6f7c5de465107de295620211b10af Mon Sep 17 00:00:00 2001 From: Jack Kelly Date: Thu, 30 Nov 2023 19:29:58 +0000 Subject: [PATCH 39/42] Don't need to keep track of number of ops in flight --- design.md | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/design.md b/design.md index e753407..803db04 100644 --- a/design.md +++ b/design.md @@ -275,9 +275,10 @@ Within LSIO, the pipeline for the IO ops will be something like this: - Some potential answers: - Use tokio! This might be a classic use-case requiring tokio. But! We'll still have tasks which block for much longer than the 100 microseconds recommended by Tokio. So... - Use Rayon! 
<--- **CURRENTLY MY PREFERRED IDEA!!** - - Hmm... Can we just use `ring.completion().par_iter()`??? Which I _think_ wouldn't use a blocking thread synchronization primitive (instead it would use work steeling). I could test this pretty easily (10 lines of Rust?!). I think I'd still need a mechanism to keep the submission queue topped up. + - Hmm... Can we just use `ring.completion().par_iter()`??? Which I _think_ wouldn't use a blocking thread synchronization primitive (instead it would use work steeling). I could test this pretty easily (10 lines of Rust?!). + - How to keep the submission queue topped up? Maybe a separate thread (not part of the worker thread pool, because we don't want to take CPU cores away from decompression). Set `IORING_SETUP_CQ_NODROP`. Then check for `-EBUSY` returned from `io_uring_submit()`, and wait before submitting, and warn the user that the CQ needs to be larger. When using `IORING_SETUP_SQPOLL`, also need to check `io_uring_has_overflow()` before submitting (and warn the user if overflow). See [my SO Q&A](https://stackoverflow.com/questions/77580828/how-to-guarantee-that-the-io-uring-completion-queue-never-overflows/77580829#77580829). - [`ndarray` uses Rayon](https://docs.rs/ndarray/latest/ndarray/parallel/index.html). - - can we make life as simple as possible: We start by doing `file_chunks.par_iter().for_each_init(init, op)`. Each worker thread will have its own entire io_uring: each thread will have a submission queue and a completion queue (initialised by the `init` closure). The `op` closure will call `join(a, b)`. Closure `a` submits a read op to the submission queue (or blocks if the submission queue is full). Closure `b` takes a single item from the completion queue, and processes it; or blocks if there's no data yet. No, no, this won't work: this will only submit one request at once, because this thread will block! 
+ - **I don't think the following idea will work...** can we make life as simple as possible: We start by doing `file_chunks.par_iter().for_each_init(init, op)`. Each worker thread will have its own entire io_uring: each thread will have a submission queue and a completion queue (initialised by the `init` closure). The `op` closure will call `join(a, b)`. Closure `a` submits a read op to the submission queue (or blocks if the submission queue is full). Closure `b` takes a single item from the completion queue, and processes it; or blocks if there's no data yet. No, no, this won't work: this will only submit one request at once, because this thread will block! - But how to persist io_urings in Rayon? - It may be as simple as using [`for_each_init`](https://docs.rs/rayon/latest/rayon/iter/trait.ParallelIterator.html#method.for_each_init) to init an io_uring for each thread. But [this github comment](https://github.com/rayon-rs/rayon/issues/718) from 2020 suggests that the init function is called many more times than the number of threads. - Failing that, use the [`thread_local` crate](https://docs.rs/thread_local/1.1.7/thread_local/), to create a separate io_uring local to each thread? @@ -306,7 +307,7 @@ Within LSIO, the pipeline for the IO ops will be something like this: - BUT! In cases where the user has not requested any processing, then the worker threads are redundant??? Maybe we simply don't spin up any worker threads, in that case? Although, actually, we still need to check each completion queue entry for errors, I think? Maybe threads would be useful for that??? And, for the MVP, maybe we should always spin up threads, so we don't have to worry about a separate code path for the "no processing" case? -Assuming we do have to keep track of how many entries... +Assuming we do have to keep track of how many entries... 
UPDATE, DON'T NEED TO KEEP TRACK OF HOW MANY ENTRIES ARE IN FLIGHT ```rust use std::sync::mpsc::channel; @@ -329,5 +330,4 @@ ring.completion().par_iter().for_each_with( ) ``` -TODO: Test if we'll overflow the submission queue if we don't manually keep track -of how many entries have popped up on the completion queue. +TODO: Finish this design - perhaps without optimising FileChunks - and try using Rayon with `ring.completion().par_iter().for_each()`, and a separate thread for keeping the submission queue topped up. From 6c008fd536b22f837e2c6fc14d9c0a152c68bf2e Mon Sep 17 00:00:00 2001 From: Jack Kelly Date: Fri, 1 Dec 2023 13:14:03 +0000 Subject: [PATCH 40/42] Some big changes to the design! No longer async! Based on iterators. --- design.md | 134 +++++++++++++++++++++++------------------------------- 1 file changed, 56 insertions(+), 78 deletions(-) diff --git a/design.md b/design.md index 803db04..99bd72a 100644 --- a/design.md +++ b/design.md @@ -4,8 +4,9 @@ ## Planned features -- [ ] Provide a simple, async API for reading many chunks of files (and/or many files) with single API call. Users will be able to ask LSIO: "_Please get me these million file chunks, and apply this function to each chunk. Tell me when you're done._". +- [ ] Provide a simple API (using Rust's iterators) for reading many chunks of files (and/or many files) with single API call. Users will be able to ask LSIO: "_Please get me these million file chunks, and apply this function to each chunk, and then move the resulting data to these array locations._". - [ ] The API will be the same, no matter which operating system you're on, and no matter whether the data is on local disk, or a cloud storage bucket, or available over HTTP. (Inspired by [fsspec](https://filesystem-spec.readthedocs.io/en/latest/) :smiley:!) +- [ ] Expose a Rust API and a Python API. 
- [ ] Laser-focus on _speed_: - Achieve many [input/output operations per second](https://en.wikipedia.org/wiki/IOPS) (IOPS), high bandwidth, and low latency by exploiting "modern" operating system storage APIs, and designing for inherently parallel storage systems like NVMe SSDs and cloud storage buckets. - Before submitting any IO operations, tune the sequence of IO operations according to the performance characteristics of each storage system. For example, on a hard drive (with spinning platters), the performance of random reads is dominated by the time taken to move the read head. So LSIO will merge nearby reads, even if those reads aren't strictly consecutive: For example, if we want to read every third block of a file, it may be faster to read the entire file, even if we immediately throw away two thirds of the data. Or, when reading large files from a cloud storage bucket, it may be faster to split each file into consecutive chunks, and request those chunks in parallel. @@ -15,7 +16,7 @@ - When scheduling work across multiple CPU cores: Avoid locks, or any synchronization primitives that would block a CPU core, wherever possible. - Look for opportunities to completely cut the CPU out of the data path. For example, if we're loading uncompressed [Zarr](https://zarr.dev/) chunks that are destined to be merged into a final numpy array, then we may be able to use [direct memory access](https://en.wikipedia.org/wiki/Direct_memory_access) (DMA) to directly copy chunks into the final numpy array from IO, without the CPU ever touching the data. This may be possible even in cases where the creation of the final array is more complicated than simply concatenating the chunks in RAM. - Where appropriate, align chunks in RAM (and pad the ends of chunks) so the CPU & compiler can easily use SIMD instructions, and minimize the number of cache lines that must be read. 
(Using SIMD may provide a large speedup "just" for memory copies, even if the transform function doesn't use SIMD). -- [ ] The user-supplied function that's applied to each chunk could include, for example, decompression, followed by some numerical transformation, followed by copying the transformed data to a large array which is the concatenation of all the chunks. As much of this as possible should happen whilst the chunk is in the CPU cache (without time-consuming round-trips to RAM). +- [ ] For each chunk, the user could request, for example, that the chunk be decompressed, followed by some numerical transformation, followed by moving the transformed data to a large array which is the concatenation of all the chunks. As much of this as possible should happen whilst the chunk is in the CPU cache (without time-consuming round-trips to RAM). - [ ] LSIO will implement multiple IO backends. Each backend will exploit the performance features of a particular operating system and storage system. The ambition is to support: - These operating system APIs: - [ ] Linux [io_uring](https://en.wikipedia.org/wiki/Io_uring) (for local storage and network storage). @@ -25,8 +26,6 @@ - [ ] Local disks. (With different optimizations for SSDs and HDDs). - [ ] Cloud storage buckets. - [ ] HTTP. -- [ ] Async Rust API. -- [ ] Async Python API. ## Use cases @@ -53,8 +52,6 @@ Ha! :smiley:. This project is in the earliest planning stages! It'll be _months_ ## Design -TODO! (But, for now, see the file [`src/draft_API_design.rs` in this pull request](https://github.com/JackKelly/light-speed-io/blob/draft-API-design/src/draft_API_design.rs)) - ### Public Rust API #### Describe the performance characteristics of the storage subsystem @@ -138,7 +135,7 @@ impl Reader for IoUringLocal { ##### User code -In this example, we read the entirety of `/foo/bar`, and we also read three chunks from `/foo/baz`: +In this example, we read the entirety of `/foo/bar`. 
And we read three chunks from `/foo/baz`: ```rust let mut buf0 = vec![0; 1000]; @@ -147,42 +144,36 @@ let mut buf2 = vec![0; 100]; let chunks = vec![ - // Read the entirity of /foo/bar + // Read entirety of /foo/bar, and ask LSIO to allocate the memory buffer: FileChunks{ - path: "/foo/bar", - byte_range: ByteRange::EntireFile, // Read all of file - buffer: None, // LSIO takes responsibility for allocating a memory buffer + path: "/foo/bar", + chunks: vec![ + Chunk{ + byte_range: ..., + final_buffers: None, + }, + ], }, // Read 3 chunks from /foo/baz FileChunks{ path: "/foo/baz", - byte_range: ByteRange::MultiRange( - vec![ - ..1000, // Read the first 1,000 bytes - -500..-200, // Read 300 bytes, until the 200th byte from the end - -100.., // Read the last 100 bytes. For example, shared Zarrs store - // the shard index at the end of each file. - ], - ), - - // If the user wants to supply buffers, then use `Some(Vec<&mut [u8]>)` - // with one buffer per element in the `byte_range` vector. - // For example, this would allow us to bypass the CPU when copying multiple - // uncompressed chunks from a sharded Zarr directly into the final array. - // The buffers could point to different slices of the final array. - // This mechanism could even be used when creating the final array is more - // complicated than simply appending chunks: you could, for example, read each - // row of each chunk into a different `&mut [u8]`. Under the hood, LSIO would - // notice the consecutive reads, and would use `readv` where available. - buffer: Some( - vec![ - &mut buf0, - &mut buf1, - &mut buf2, - ] - ) + chunks: vec![ + Chunk{ + byte_range: ..1000, // Read the first 1,000 bytes + final_buffers: Some(vec![&mut buf0]) + }, + Chunk{ + byte_range: -500..-200, // Read 300 bytes, until the 200th byte from the end + final_buffers: Some(vec![&mut buf1]) + }, + Chunk{ + byte_range: -100.., // Read the last 100 bytes. 
For example, sharded Zarrs store
+                final_buffers: Some(vec![&mut buf2]) // the shard index at the end of each file.
+            },
+        ],
+    },
+
 ];
 ```
 
@@ -191,67 +182,54 @@ let chunks = vec![
 ```rust
 pub struct FileChunks {
     pub path: Path,
-    pub byte_range: ByteRange,
-
-    // If buffer is None, then LSIO will take responsibility for allocating
-    // the memory buffers. This should be the preferred approach.
-    pub buffer: Option>,
+    pub chunks: Vec,
 }
 
-pub enum ByteRange {
-    EntireFile,
-    MultiRange(Vec),
+pub struct Chunk{
+    pub byte_range: Range,
+
+    // If final_buffers is None, then LSIO will take responsibility for allocating
+    // the memory buffers.
+    // If the user wants to supply buffers, then use `Some(Vec<&mut [u8]>)`.
+    // For example, this would allow us to bypass the CPU when copying multiple
+    // uncompressed chunks from a sharded Zarr directly into the final array.
+    // The buffers could point to different slices of the final array.
+    // This mechanism could be used when creating the final array is more
+    // complicated than simply appending chunks: you could, for example, read each
+    // row of a chunk into a different `&mut [u8]`. Under the hood, LSIO would
+    // notice the consecutive reads, and would use `readv` where available.
+    pub final_buffers: Option>,
 }
 ```
 
-#### Async reading of chunks
+#### Reading chunks
 
 ##### User code
 
 ```rust
-// Start async loading of data from disk:
-let future = reader.read_chunks(&chunks);
-
-// Wait for data to all be ready.
+// Load chunks
 // We need one `Result` per chunk, because reading each chunk could fail.
 // Note that we take ownership of the returned vectors of bytes.
-let data: Vec>> = future.wait();
+let results: Vec> = reader
+    .read_chunks(&chunks) // Returns a rayon::iter::ParallelIterator.
+    .collect_into_vec();
 ```
 
 Or, if we want to apply a function to each chunk, we could do something like this. This example
-is based on the Zarr use-case.
For each chunk, want to decompress, and apply a simple numerical +is based on the Zarr use-case. For each chunk, we want to decompress, and apply a simple numerical transformation, and then move the transformed data into a final array: ```rust -let mut final_array = Array(); -let chunk_idx_to_array_loc = Vec::new(); -// TODO: Fill out `chunk_idx_to_array_loc` - -// processing_fn could fail, so we return a Result. -// processing_fn may not return any data (because the data has been moved to another location) -// so we return an Option wrapped in a Result. -let processing_fn = |chunk_idx: u64, chunk: &[u8]| -> Result> { - // ******** DECOMPRESS ************ - // If we don't know the size of the uncompressed chunk, then - // deliberately over-allocate, and shrink later... - const OVER_ALLOCATION_RATIO: usize = 4; - let mut decompressed_chunk = Vec::with_capacity(OVER_ALLOCATION_RATIO * chunk.size()); - decompress(&chunk, &mut decompressed_chunk)?; - decompressed_chunk.shrink_to_fit(); - - // ******** PROCESS *********** - decompressed_chunk = decompressed_chunk / 2; // to give a very simple example! - - // ******** COPY TO FINAL ARRAY ************** - final_array[chunk_idx_to_array_loc[chunk_idx]] = decompressed_chunk; - Ok(None) // We're deliberately not passing back the decompressed array. -}; -let future = read.read_chunks_and_apply(&chunks, processing_fn); -let results = future.wait(); -// TODO: check `results` for any failures -pass_to_python(&final_array); +let results: Vec> = reader + .read_chunks(chunks) + .decompress_zstd() + .map(|chunk| chunk * 2) + .mem_move_to_final_buffers(); ``` +`mem_move_to_final_buffers()` moves the data to its final location, specified in `Chunk.final_buffers`. + + ### Internal design of LSIO TODO. 
Things to consider: From 4fb14ded63c45cf18612009290fc57cb1ddbe836 Mon Sep 17 00:00:00 2001 From: Jack Kelly Date: Fri, 1 Dec 2023 17:40:19 +0000 Subject: [PATCH 41/42] adding more thoughts about iterators --- design.md | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/design.md b/design.md index 99bd72a..11f9d8c 100644 --- a/design.md +++ b/design.md @@ -221,10 +221,11 @@ transformation, and then move the transformed data into a final array: ```rust let results: Vec> = reader - .read_chunks(chunks) + .read_chunks(&chunks) .decompress_zstd() .map(|chunk| chunk * 2) - .mem_move_to_final_buffers(); + .mem_move_to_final_buffers(&chunks); // Pass in chunks Vector, so `mem_move_to_final_buffers` can interpret + // the user_data, which could be two u32s: index into the FileChunk, and index into the Chunk. ``` `mem_move_to_final_buffers()` moves the data to its final location, specified in `Chunk.final_buffers`. @@ -234,6 +235,8 @@ let results: Vec> = reader TODO. Things to consider: + + Within LSIO, the pipeline for the IO ops will be something like this: - User submits a Vector of `FileChunks`. @@ -248,6 +251,9 @@ Within LSIO, the pipeline for the IO ops will be something like this: - Merge nearby reads, depending on `IoConfig`, possibly using `readv` to scatter the single read into the requested vectors (and can we scatter the unwanted data to /dev/null?!? Prob not?) - Split large reads into multiple smaller reads, depending on `IoConfig.max_megabytes_of_single_read`. (Maybe don't worry about this for now, given that this isn't relevant for reading local SSDs using io_uring. This may still be possible in a single vectored read operation, which reads into slices of the same underlying array. Or, if that's not possible, maybe spin up a separate io_uring context just for the individual reads that make up the single requested read, so it's clear when all the reads have finished.) 
- Detect contiguous chunks destined for different buffers, and use `readv` to read these. (Although we should benchmark `readv` vs `read`).
+  - We could make the optimisation modular, using iterators. Maybe we explicitly make it a 2-step process.
+    - First, users create a set list of abstracted read operations: `let operations = chunks.merge_overlaps().merge_nearby_reads(threshold)`. (Which makes the code easy to read. But, under the hood, `merge_overlaps` will have to look through all the elements, and then re-emit them for the next iterator... which isn't exactly in the spirit of an iterator!)
+    - And then we submit those operations and process them: `let data = reader.submit(operations).decompress_zstd().collect()`.
 - Merging and splitting read operations means that there's no longer a one-to-one mapping between chunks that the _user_ requested, and chunks that LSIO will request from the storage subsystem. This raises some important design questions:
   - How do we ensure that each of the user's chunks are processed in their own threads? (The transform function supplied by the user probably expects the chunks that the user requested)
   - Some potential answers:
From 4ad82cd4fd144460159bc130b4ca57b4d7650858 Mon Sep 17 00:00:00 2001
From: Jack Kelly
Date: Fri, 1 Dec 2023 19:59:01 +0000
Subject: [PATCH 42/42] thoughts about lazy evaluation

---
 design.md | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/design.md b/design.md
index 11f9d8c..4076988 100644
--- a/design.md
+++ b/design.md
@@ -252,8 +252,8 @@ Within LSIO, the pipeline for the IO ops will be something like this:
   - Split large reads into multiple smaller reads, depending on `IoConfig.max_megabytes_of_single_read`. (Maybe don't worry about this for now, given that this isn't relevant for reading local SSDs using io_uring. This may still be possible in a single vectored read operation, which reads into slices of the same underlying array.
Or, if that's not possible, maybe spin up a separate io_uring context just for the individual reads that make up the single requested read, so it's clear when all the reads have finished.)
   - Detect contiguous chunks destined for different buffers, and use `readv` to read these. (Although we should benchmark `readv` vs `read`).
   - We could make the optimisation modular, using iterators. Maybe we explicitly make it a 2-step process.
-    - First, users create a set list of abstracted read operations: `let operations = chunks.merge_overlaps().merge_nearby_reads(threshold)`. (Which makes the code easy to read. But, under the hood, `merge_overlaps` will have to look through all the elements, and then re-emit them for the next iterator... which isn't exactly in the spirit of an iterator!)
-    - And then we submit those operations and process them: `let data = reader.submit(operations).decompress_zstd().collect()`.
+    - First, users create a set list of abstracted read operations: `let operations = chunks.merge_overlaps().merge_nearby_reads(threshold)`. Each `Item` will be a single `FileChunks` struct. After this line, no processing will have started yet. You'd have to call `collect()` to collect, if you wanted to... but we probably want to submit the first few operations before we've finished computing the operations. So, usually, you'd leave `operations` as an uncollected iterator.
+    - And then we submit those operations and process them: `let data = reader.submit(operations).decompress_zstd().collect()`. (Maybe, as a separate crate, I could make a crate which wraps compression algorithms as iterator adaptors, for decompressing chunks like this.
See the [streaming-compressor crate](https://github.com/jorgecarleitao/streaming-decompressor/tree/main), although it doesn't actually implement any compressors)
 - Merging and splitting read operations means that there's no longer a one-to-one mapping between chunks that the _user_ requested, and chunks that LSIO will request from the storage subsystem. This raises some important design questions:
   - How do we ensure that each of the user's chunks are processed in their own threads? (The transform function supplied by the user probably expects the chunks that the user requested)
   - Some potential answers: