Background
In #1154 I added an async parquet API in the form of ParquetRecordBatchStream. This was maximally async, that is, it made use of tokio's async IO traits to be as generic as possible. However, having experimented with this I'm not sure that this design is quite right.
In particular, apache/datafusion#1617 showed non-trivial performance regressions when operating on local files. These are caused by three major factors:
- Additional copying of buffers in tokio and the ParquetRecordBatchStream necessary to convert an async Read to a sync Read
- Less parallelism than on master, where the parquet decode takes place in a separate blocking thread
- Overheads due to tokio::fs::File calling spawn_blocking for every IO operation
This last point is pretty important and touches on something I was not aware of: tokio does not use an IO reactor for file IO in the way that, say, boost::asio does; instead it just calls tokio::task::spawn_blocking for every IO call. This somewhat undermines the concept of async file IO, as all you're doing is moving where the tokio::task::spawn_blocking is called, and in fact you're moving it lower in the call chain, where its overheads are less well amortized.
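To make this concrete, here is a rough sketch of the two shapes (the file path, read sizes and function names are arbitrary, just for illustration): with tokio::fs::File every IO call dispatches its own spawn_blocking, whereas wrapping the whole blocking section in a single spawn_blocking pays that overhead once for all the reads inside it.

```rust
use std::io::Read;
use tokio::io::AsyncReadExt;

// With tokio::fs::File each await below dispatches a separate
// tokio::task::spawn_blocking call inside tokio's fs implementation.
async fn read_prefix_tokio_fs(path: &str) -> std::io::Result<Vec<u8>> {
    let mut file = tokio::fs::File::open(path).await?; // spawn_blocking
    let mut buf = vec![0u8; 8];
    file.read_exact(&mut buf).await?; // another spawn_blocking
    Ok(buf)
}

// Calling spawn_blocking once, higher up the call chain, amortizes its
// overhead across all of the blocking reads performed inside the closure.
async fn read_prefix_spawn_blocking(path: String) -> std::io::Result<Vec<u8>> {
    tokio::task::spawn_blocking(move || -> std::io::Result<Vec<u8>> {
        let mut file = std::fs::File::open(path)?;
        let mut buf = vec![0u8; 8];
        file.read_exact(&mut buf)?; // plain blocking read, no extra dispatch
        Ok(buf)
    })
    .await
    .expect("spawn_blocking task panicked")
}
```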
As part of further exploring this design space I created #1472, which instead of using the tokio IO traits uses the non-async ChunkReader trait and tokio::task::spawn_blocking. Effectively this just upstreams logic from DataFusion's ParquetExec operator, and so perhaps unsurprisingly it does not represent a performance regression.
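The general shape of that pattern is roughly the sketch below - run the synchronous decode on the blocking pool and bridge the results into a Stream via a channel. This is only an illustration of the pattern, not the actual code in #1472:

```rust
use futures::Stream;
use tokio_stream::wrappers::ReceiverStream;

/// Bridge a blocking, pull-based decoder into an async stream by running it
/// on tokio's blocking pool and sending the results over a bounded channel.
/// `next_item` stands in for the synchronous parquet decode step.
fn blocking_iter_to_stream<T, F>(mut next_item: F) -> impl Stream<Item = T>
where
    T: Send + 'static,
    F: FnMut() -> Option<T> + Send + 'static,
{
    let (tx, rx) = tokio::sync::mpsc::channel(2);
    tokio::task::spawn_blocking(move || {
        while let Some(item) = next_item() {
            // blocking_send is fine here because we are on the blocking pool
            if tx.blocking_send(item).is_err() {
                break; // the receiver was dropped, stop decoding
            }
        }
    });
    ReceiverStream::new(rx)
}
```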
This is still technically an async API; however, I am aware that a number of people have expressed interest in an async version of ChunkReader, which suggests they want lower-level async-ness. It is also unclear that ChunkReader is quite right either - see #1163 and apache/datafusion#1905.
To further complicate matters, differing storage media have different trade-offs. When fetching from local disk or memory it may make sense to perform the most granular reads possible, potentially filtering out individual pages, columns, etc. However, when fetching data from object storage this is less clear-cut: each request has a cost and comes with non-trivial latency, so there is likely a desire to coalesce proximate byte ranges into a single request, even if this results in reading more data than needed. As a result there is likely no general-purpose strategy for fetching data, and we therefore need the flexibility to allow it to be customized downstream.
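For example, an object-store-backed implementation might want to coalesce nearby ranges along these lines (purely illustrative - the function and the gap threshold are not part of this proposal):

```rust
use std::ops::Range;

/// Merge byte ranges whose gaps are smaller than `max_gap`, trading some
/// over-read for fewer object store requests.
fn coalesce_ranges(mut ranges: Vec<Range<usize>>, max_gap: usize) -> Vec<Range<usize>> {
    ranges.sort_by_key(|r| r.start);
    let mut out: Vec<Range<usize>> = Vec::new();
    for range in ranges {
        match out.last_mut() {
            // Close enough to the previous range: extend it instead
            Some(last) if range.start <= last.end + max_gap => {
                last.end = last.end.max(range.end);
            }
            _ => out.push(range),
        }
    }
    out
}
```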
Finally, there is ongoing effort to introduce more parallelism into the parquet scan - apache/datafusion#1990, and whilst async is a concurrency primitive and not a parallelism primitive, the two concepts are closely related in practice.
Requirements
I think the requirements are therefore as follows:
- Provide an async API that yields a stream of Result<RecordBatch>
- Use predicate and projection pushdown to filter the data to scan
- Separate identifying the byte ranges of column data to scan, from actually performing the scan
- Delegate fetching the corresponding byte ranges to an async trait, allowing downstream customisation of the fetch strategy
- Avoid copying the page data between buffers
- Avoid calling spawn_blocking where the read implementation will not block (e.g. already in-memory)
- Be extensible to support parallel column decoding (#TBD)
- Be extensible to support more advanced predicate pushdown (Parquet Scan Filter #1191)
Proposal
An intentionally vague proposal would be to extend apache/datafusion#1617, replacing the use of ChunkReader with a Storage trait that might look something like:

#[async_trait]
pub trait Storage {
    async fn prefetch(&mut self, ranges: Vec<std::ops::Range<usize>>) -> Result<()>;

    async fn read(&mut self, range: std::ops::Range<usize>) -> Result<ByteBufferPtr>;
}
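As a sketch of how the "don't spawn_blocking when the read cannot block" requirement would fall out of this, an in-memory implementation could be as simple as the following (the parquet::util::memory path, the use of parquet's Result, and the copy in read are assumptions made to keep the example short):

```rust
use async_trait::async_trait;
use parquet::errors::Result;
use parquet::util::memory::ByteBufferPtr;

/// Hypothetical in-memory implementation of the proposed Storage trait.
pub struct InMemoryStorage {
    data: Vec<u8>,
}

#[async_trait]
impl Storage for InMemoryStorage {
    async fn prefetch(&mut self, _ranges: Vec<std::ops::Range<usize>>) -> Result<()> {
        // Everything is already resident, so there is nothing to fetch
        Ok(())
    }

    async fn read(&mut self, range: std::ops::Range<usize>) -> Result<ByteBufferPtr> {
        // No spawn_blocking needed: this cannot block. A real implementation
        // would hand out a zero-copy slice rather than copying here.
        Ok(ByteBufferPtr::new(self.data[range].to_vec()))
    }
}
```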
ParquetRecordBatchStreamBuilder would use this trait to first read the footer, and then, as part of build(), invoke prefetch() with the determined byte ranges to scan. Finally, ParquetRecordBatchStream would drive Storage::read with the individual column chunk ranges as needed by the stream.
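In other words, the read pattern the stream would follow is roughly the following (a sketch against the trait above, assuming its Result and ByteBufferPtr are in scope; the function itself is a placeholder, not a proposed API):

```rust
async fn fetch_column_chunks<S: Storage>(
    storage: &mut S,
    ranges: Vec<std::ops::Range<usize>>,
) -> Result<Vec<ByteBufferPtr>> {
    // build(): hand the planned byte ranges to the storage so it can
    // coalesce them and/or start fetching eagerly
    storage.prefetch(ranges.clone()).await?;

    // ParquetRecordBatchStream: read each column chunk on demand
    let mut chunks = Vec::with_capacity(ranges.len());
    for range in ranges {
        chunks.push(storage.read(range).await?);
    }
    Ok(chunks)
}
```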
This will likely require some minor alterations to SerializedPageReader in order to avoid copying the data returned from Storage::read, but I think this is worthwhile and will also benefit reading data that is already in memory.