Skip to content

Commit 0617169

Browse files
committed
Async ParquetExec
1 parent ad392fd commit 0617169

File tree

6 files changed

+243
-280
lines changed

6 files changed

+243
-280
lines changed

Cargo.toml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,3 +31,9 @@ members = [
3131
[profile.release]
3232
lto = true
3333
codegen-units = 1
34+
35+
[patch.crates-io]
36+
# TODO: TEMPORARY
37+
arrow = { git = "https://github.com/tustvold/arrow-rs", rev = "7825ea86c425ad8f95664295a5ead576824bf832" }
38+
arrow-flight = { git = "https://github.com/tustvold/arrow-rs", rev = "7825ea86c425ad8f95664295a5ead576824bf832" }
39+
parquet = { git = "https://github.com/tustvold/arrow-rs", rev = "7825ea86c425ad8f95664295a5ead576824bf832" }

datafusion/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ avro = ["avro-rs", "num-traits"]
5353
ahash = { version = "0.7", default-features = false }
5454
hashbrown = { version = "0.11", features = ["raw"] }
5555
arrow = { version = "7.0.0", features = ["prettyprint"] }
56-
parquet = { version = "7.0.0", features = ["arrow"] }
56+
parquet = { version = "7.0.0", features = ["arrow", "async"] }
5757
sqlparser = "0.13"
5858
paste = "^1.0"
5959
num_cpus = "1.13.0"

datafusion/src/datasource/object_store/local.rs

Lines changed: 6 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -22,10 +22,10 @@ use std::io::{BufReader, Read, Seek, SeekFrom};
2222
use std::sync::Arc;
2323

2424
use async_trait::async_trait;
25-
use futures::{stream, AsyncRead, StreamExt};
25+
use futures::{stream, StreamExt};
2626

2727
use crate::datasource::object_store::{
28-
FileMeta, FileMetaStream, ListEntryStream, ObjectReader, ObjectStore,
28+
ChunkReader, FileMeta, FileMetaStream, ListEntryStream, ObjectReader, ObjectStore,
2929
};
3030
use crate::datasource::PartitionedFile;
3131
use crate::error::DataFusionError;
@@ -68,14 +68,10 @@ impl LocalFileReader {
6868

6969
#[async_trait]
7070
impl ObjectReader for LocalFileReader {
71-
async fn chunk_reader(
72-
&self,
73-
_start: u64,
74-
_length: usize,
75-
) -> Result<Box<dyn AsyncRead>> {
76-
todo!(
77-
"implement once async file readers are available (arrow-rs#78, arrow-rs#111)"
78-
)
71+
async fn chunk_reader(&self) -> Result<Box<dyn ChunkReader>> {
72+
let file = tokio::fs::File::open(&self.file.path).await?;
73+
let file = tokio::io::BufReader::new(file);
74+
Ok(Box::new(file))
7975
}
8076

8177
fn sync_chunk_reader(

datafusion/src/datasource/object_store/mod.rs

Lines changed: 25 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -27,21 +27,42 @@ use std::sync::{Arc, RwLock};
2727

2828
use async_trait::async_trait;
2929
use chrono::{DateTime, Utc};
30-
use futures::{AsyncRead, Stream, StreamExt};
30+
use futures::{Stream, StreamExt};
31+
use tokio::io::{AsyncBufRead, AsyncSeek};
3132

3233
use local::LocalFileSystem;
3334

3435
use crate::error::{DataFusionError, Result};
3536

37+
/// Provides async access to read a file, combing [`AsyncSeek`]
38+
/// and [`AsyncBufRead`] so they can be used as a trait object
39+
///
40+
/// [`AsyncSeek`] is necessary because readers may need to seek around whilst
41+
/// reading, either because the format itself is structured (e.g. parquet)
42+
/// or because it needs to read metadata or infer schema as an initial step
43+
///
44+
/// [`AsyncBufRead`] is necessary because readers may wish to read data
45+
/// up until some delimiter (e.g. csv or newline-delimited JSON)
46+
///
47+
/// Note: the same block of data may be read multiple times
48+
///
49+
/// Implementations that fetch from object storage may wish to maintain an internal
50+
/// buffer of fetched data blocks, potentially discarding them or spilling them to disk
51+
/// based on memory pressure
52+
///
53+
/// TODO(#1614): Remove Sync
54+
pub trait ChunkReader: AsyncBufRead + AsyncSeek + Send + Sync + Unpin {}
55+
impl<T: AsyncBufRead + AsyncSeek + Send + Sync + Unpin> ChunkReader for T {}
56+
3657
/// Object Reader for one file in an object store.
3758
///
3859
/// Note that the dynamic dispatch on the reader might
3960
/// have some performance impacts.
4061
#[async_trait]
4162
pub trait ObjectReader: Send + Sync {
42-
/// Get reader for a part [start, start + length] in the file asynchronously
43-
async fn chunk_reader(&self, start: u64, length: usize)
44-
-> Result<Box<dyn AsyncRead>>;
63+
/// Get a [`ChunkReader`] for the file, successive calls to this MUST
64+
/// return readers with independent seek positions
65+
async fn chunk_reader(&self) -> Result<Box<dyn ChunkReader>>;
4566

4667
/// Get reader for a part [start, start + length] in the file
4768
fn sync_chunk_reader(

0 commit comments

Comments
 (0)