Skip to content

Commit da73b55

Browse files
committed
Async parquet reader (#111)
Add Sync + Send bounds to parquet crate
1 parent aa71aea commit da73b55

File tree

16 files changed

+504
-50
lines changed

16 files changed

+504
-50
lines changed

parquet/Cargo.toml

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ description = "Apache Parquet implementation in Rust"
2323
homepage = "https://github.com/apache/arrow-rs"
2424
repository = "https://github.com/apache/arrow-rs"
2525
authors = ["Apache Arrow <[email protected]>"]
26-
keywords = [ "arrow", "parquet", "hadoop" ]
26+
keywords = ["arrow", "parquet", "hadoop"]
2727
readme = "README.md"
2828
build = "build.rs"
2929
edition = "2021"
@@ -45,6 +45,8 @@ base64 = { version = "0.13", optional = true }
4545
clap = { version = "2.33.3", optional = true }
4646
serde_json = { version = "1.0", features = ["preserve_order"], optional = true }
4747
rand = "0.8"
48+
futures = { version = "0.3", optional = true }
49+
tokio = { version = "1.0", optional = true, default-features = false, features = ["macros", "fs", "rt", "io-util"] }
4850

4951
[dev-dependencies]
5052
criterion = "0.3"
@@ -63,16 +65,18 @@ cli = ["serde_json", "base64", "clap"]
6365
test_common = []
6466
# Experimental, unstable functionality primarily used for testing
6567
experimental = []
68+
# Experimental, unstable, async API
69+
async = ["futures", "tokio"]
6670

67-
[[ bin ]]
71+
[[bin]]
6872
name = "parquet-read"
6973
required-features = ["cli"]
7074

71-
[[ bin ]]
75+
[[bin]]
7276
name = "parquet-schema"
7377
required-features = ["cli"]
7478

75-
[[ bin ]]
79+
[[bin]]
7680
name = "parquet-rowcount"
7781
required-features = ["cli"]
7882

parquet/src/arrow/array_reader.rs

Lines changed: 38 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ use arrow::datatypes::{
4242
Float32Type as ArrowFloat32Type, Float64Type as ArrowFloat64Type,
4343
Int16Type as ArrowInt16Type, Int32Type as ArrowInt32Type,
4444
Int64Type as ArrowInt64Type, Int8Type as ArrowInt8Type, IntervalUnit, Schema,
45-
Time32MillisecondType as ArrowTime32MillisecondType,
45+
SchemaRef, Time32MillisecondType as ArrowTime32MillisecondType,
4646
Time32SecondType as ArrowTime32SecondType,
4747
Time64MicrosecondType as ArrowTime64MicrosecondType,
4848
Time64NanosecondType as ArrowTime64NanosecondType, TimeUnit as ArrowTimeUnit,
@@ -91,7 +91,7 @@ pub use byte_array::make_byte_array_reader;
9191
pub use byte_array_dictionary::make_byte_array_dictionary_reader;
9292

9393
/// Array reader reads parquet data into arrow array.
94-
pub trait ArrayReader {
94+
pub trait ArrayReader: Send {
9595
fn as_any(&self) -> &dyn Any;
9696

9797
/// Returns the arrow type of this array reader.
@@ -117,6 +117,26 @@ pub trait ArrayReader {
117117
fn get_rep_levels(&self) -> Option<&[i16]>;
118118
}
119119

120+
/// A collection of row groups
121+
pub trait RowGroupCollection {
122+
/// Get schema of parquet file.
123+
fn schema(&self) -> Result<SchemaDescPtr>;
124+
125+
/// Returns an iterator over the column chunks for particular column
126+
fn column_chunks(&self, i: usize) -> Result<Box<dyn PageIterator>>;
127+
}
128+
129+
impl RowGroupCollection for Arc<dyn FileReader> {
130+
fn schema(&self) -> Result<SchemaDescPtr> {
131+
Ok(self.metadata().file_metadata().schema_descr_ptr())
132+
}
133+
134+
fn column_chunks(&self, column_index: usize) -> Result<Box<dyn PageIterator>> {
135+
let iterator = FilePageIterator::new(column_index, Arc::clone(self))?;
136+
Ok(Box::new(iterator))
137+
}
138+
}
139+
120140
/// Uses `record_reader` to read up to `batch_size` records from `pages`
121141
///
122142
/// Returns the number of records read, which can be less than batch_size if
@@ -478,7 +498,7 @@ where
478498
impl<T, C> ArrayReader for ComplexObjectArrayReader<T, C>
479499
where
480500
T: DataType,
481-
C: Converter<Vec<Option<T::T>>, ArrayRef> + 'static,
501+
C: Converter<Vec<Option<T::T>>, ArrayRef> + Send + 'static,
482502
{
483503
fn as_any(&self) -> &dyn Any {
484504
self
@@ -1311,9 +1331,9 @@ impl ArrayReader for StructArrayReader {
13111331
/// Create array reader from parquet schema, column indices, and parquet file reader.
13121332
pub fn build_array_reader<T>(
13131333
parquet_schema: SchemaDescPtr,
1314-
arrow_schema: Schema,
1334+
arrow_schema: SchemaRef,
13151335
column_indices: T,
1316-
file_reader: Arc<dyn FileReader>,
1336+
row_groups: Box<dyn RowGroupCollection>,
13171337
) -> Result<Box<dyn ArrayReader>>
13181338
where
13191339
T: IntoIterator<Item = usize>,
@@ -1351,13 +1371,8 @@ where
13511371
fields: filtered_root_fields,
13521372
};
13531373

1354-
ArrayReaderBuilder::new(
1355-
Arc::new(proj),
1356-
Arc::new(arrow_schema),
1357-
Arc::new(leaves),
1358-
file_reader,
1359-
)
1360-
.build_array_reader()
1374+
ArrayReaderBuilder::new(Arc::new(proj), arrow_schema, Arc::new(leaves), row_groups)
1375+
.build_array_reader()
13611376
}
13621377

13631378
/// Used to build array reader.
@@ -1367,7 +1382,7 @@ struct ArrayReaderBuilder {
13671382
// Key: columns that need to be included in final array builder
13681383
// Value: column index in schema
13691384
columns_included: Arc<HashMap<*const Type, usize>>,
1370-
file_reader: Arc<dyn FileReader>,
1385+
row_groups: Box<dyn RowGroupCollection>,
13711386
}
13721387

13731388
/// Used in type visitor.
@@ -1667,13 +1682,13 @@ impl<'a> ArrayReaderBuilder {
16671682
root_schema: TypePtr,
16681683
arrow_schema: Arc<Schema>,
16691684
columns_included: Arc<HashMap<*const Type, usize>>,
1670-
file_reader: Arc<dyn FileReader>,
1685+
file_reader: Box<dyn RowGroupCollection>,
16711686
) -> Self {
16721687
Self {
16731688
root_schema,
16741689
arrow_schema,
16751690
columns_included,
1676-
file_reader,
1691+
row_groups: file_reader,
16771692
}
16781693
}
16791694

@@ -1707,10 +1722,10 @@ impl<'a> ArrayReaderBuilder {
17071722
context.rep_level,
17081723
context.path.clone(),
17091724
));
1710-
let page_iterator = Box::new(FilePageIterator::new(
1711-
self.columns_included[&(cur_type.as_ref() as *const Type)],
1712-
self.file_reader.clone(),
1713-
)?);
1725+
1726+
let page_iterator = self
1727+
.row_groups
1728+
.column_chunks(self.columns_included[&(cur_type.as_ref() as *const Type)])?;
17141729

17151730
let arrow_type: Option<ArrowType> = self
17161731
.get_arrow_field(&cur_type, context)
@@ -2823,7 +2838,8 @@ mod tests {
28232838
#[test]
28242839
fn test_create_array_reader() {
28252840
let file = get_test_file("nulls.snappy.parquet");
2826-
let file_reader = Arc::new(SerializedFileReader::new(file).unwrap());
2841+
let file_reader: Arc<dyn FileReader> =
2842+
Arc::new(SerializedFileReader::new(file).unwrap());
28272843

28282844
let file_metadata = file_reader.metadata().file_metadata();
28292845
let arrow_schema = parquet_to_arrow_schema(
@@ -2834,9 +2850,9 @@ mod tests {
28342850

28352851
let array_reader = build_array_reader(
28362852
file_reader.metadata().file_metadata().schema_descr_ptr(),
2837-
arrow_schema,
2853+
Arc::new(arrow_schema),
28382854
vec![0usize].into_iter(),
2839-
file_reader,
2855+
Box::new(file_reader),
28402856
)
28412857
.unwrap();
28422858

parquet/src/arrow/arrow_reader.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -144,9 +144,9 @@ impl ArrowReader for ParquetFileArrowReader {
144144
.metadata()
145145
.file_metadata()
146146
.schema_descr_ptr(),
147-
self.get_schema()?,
147+
Arc::new(self.get_schema()?),
148148
column_indices,
149-
self.file_reader.clone(),
149+
Box::new(self.file_reader.clone()),
150150
)?;
151151

152152
ParquetRecordBatchReader::try_new(batch_size, array_reader)

0 commit comments

Comments (0)