Skip to content

Commit b8e1c84

Browse files
committed
Merge remote-tracking branch 'upstream/master' into remove-arrow-array-reader
2 parents 0f0ba7a + d801ac2 commit b8e1c84

File tree

14 files changed

+1615
-258
lines changed

14 files changed

+1615
-258
lines changed

parquet/benches/arrow_reader.rs

Lines changed: 128 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18+
use arrow::array::Array;
19+
use arrow::datatypes::DataType;
1820
use criterion::{criterion_group, criterion_main, Criterion};
1921
use parquet::util::{DataPageBuilder, DataPageBuilderImpl, InMemoryPageIterator};
2022
use parquet::{
@@ -24,6 +26,7 @@ use parquet::{
2426
data_type::{ByteArrayType, Int32Type},
2527
schema::types::{ColumnDescPtr, SchemaDescPtr},
2628
};
29+
use rand::{rngs::StdRng, Rng, SeedableRng};
2730
use std::{collections::VecDeque, sync::Arc};
2831

2932
fn build_test_schema() -> SchemaDescPtr {
@@ -47,9 +50,6 @@ const PAGES_PER_GROUP: usize = 2;
4750
const VALUES_PER_PAGE: usize = 10_000;
4851
const BATCH_SIZE: usize = 8192;
4952

50-
use arrow::array::Array;
51-
use rand::{rngs::StdRng, Rng, SeedableRng};
52-
5353
pub fn seedable_rng() -> StdRng {
5454
StdRng::seed_from_u64(42)
5555
}
@@ -311,6 +311,46 @@ fn create_string_byte_array_reader(
311311
make_byte_array_reader(Box::new(page_iterator), column_desc, None, true).unwrap()
312312
}
313313

314+
fn create_string_byte_array_dictionary_reader(
315+
page_iterator: impl PageIterator + 'static,
316+
column_desc: ColumnDescPtr,
317+
) -> Box<dyn ArrayReader> {
318+
use parquet::arrow::array_reader::make_byte_array_dictionary_reader;
319+
let arrow_type =
320+
DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8));
321+
322+
make_byte_array_dictionary_reader(
323+
Box::new(page_iterator),
324+
column_desc,
325+
Some(arrow_type),
326+
true,
327+
)
328+
.unwrap()
329+
}
330+
331+
fn create_complex_object_byte_array_dictionary_reader(
332+
page_iterator: impl PageIterator + 'static,
333+
column_desc: ColumnDescPtr,
334+
) -> Box<dyn ArrayReader> {
335+
use parquet::arrow::array_reader::{
336+
make_byte_array_dictionary_reader, ComplexObjectArrayReader,
337+
};
338+
use parquet::arrow::converter::{Utf8ArrayConverter, Utf8Converter};
339+
let arrow_type =
340+
DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8));
341+
342+
let converter = Utf8Converter::new(Utf8ArrayConverter {});
343+
Box::new(
344+
ComplexObjectArrayReader::<ByteArrayType, Utf8Converter>::new(
345+
Box::new(page_iterator),
346+
column_desc,
347+
converter,
348+
Some(arrow_type),
349+
)
350+
.unwrap(),
351+
)
352+
}
353+
314354
fn add_benches(c: &mut Criterion) {
315355
const EXPECTED_VALUE_COUNT: usize =
316356
NUM_ROW_GROUPS * PAGES_PER_GROUP * VALUES_PER_PAGE;
@@ -322,10 +362,7 @@ fn add_benches(c: &mut Criterion) {
322362
let mandatory_int32_column_desc = schema.column(0);
323363
let optional_int32_column_desc = schema.column(1);
324364
let mandatory_string_column_desc = schema.column(2);
325-
// println!("mandatory_string_column_desc: {:?}", mandatory_string_column_desc);
326365
let optional_string_column_desc = schema.column(3);
327-
// println!("optional_string_column_desc: {:?}", optional_string_column_desc);
328-
329366
// primitive / int32 benchmarks
330367
// =============================
331368

@@ -541,7 +578,7 @@ fn add_benches(c: &mut Criterion) {
541578

542579
// string, dictionary encoded, half NULLs
543580
let dictionary_string_half_null_data = build_dictionary_encoded_string_page_iterator(
544-
schema,
581+
schema.clone(),
545582
optional_string_column_desc.clone(),
546583
0.5,
547584
);
@@ -559,6 +596,90 @@ fn add_benches(c: &mut Criterion) {
559596
},
560597
);
561598

599+
group.bench_function(
600+
"read StringDictionary, dictionary encoded, mandatory, no NULLs - old",
601+
|b| {
602+
b.iter(|| {
603+
let array_reader = create_complex_object_byte_array_dictionary_reader(
604+
dictionary_string_no_null_data.clone(),
605+
mandatory_string_column_desc.clone(),
606+
);
607+
count = bench_array_reader(array_reader);
608+
});
609+
assert_eq!(count, EXPECTED_VALUE_COUNT);
610+
},
611+
);
612+
613+
group.bench_function(
614+
"read StringDictionary, dictionary encoded, mandatory, no NULLs - new",
615+
|b| {
616+
b.iter(|| {
617+
let array_reader = create_string_byte_array_dictionary_reader(
618+
dictionary_string_no_null_data.clone(),
619+
mandatory_string_column_desc.clone(),
620+
);
621+
count = bench_array_reader(array_reader);
622+
});
623+
assert_eq!(count, EXPECTED_VALUE_COUNT);
624+
},
625+
);
626+
627+
group.bench_function(
628+
"read StringDictionary, dictionary encoded, optional, no NULLs - old",
629+
|b| {
630+
b.iter(|| {
631+
let array_reader = create_complex_object_byte_array_dictionary_reader(
632+
dictionary_string_no_null_data.clone(),
633+
optional_string_column_desc.clone(),
634+
);
635+
count = bench_array_reader(array_reader);
636+
});
637+
assert_eq!(count, EXPECTED_VALUE_COUNT);
638+
},
639+
);
640+
641+
group.bench_function(
642+
"read StringDictionary, dictionary encoded, optional, no NULLs - new",
643+
|b| {
644+
b.iter(|| {
645+
let array_reader = create_string_byte_array_dictionary_reader(
646+
dictionary_string_no_null_data.clone(),
647+
optional_string_column_desc.clone(),
648+
);
649+
count = bench_array_reader(array_reader);
650+
});
651+
assert_eq!(count, EXPECTED_VALUE_COUNT);
652+
},
653+
);
654+
655+
group.bench_function(
656+
"read StringDictionary, dictionary encoded, optional, half NULLs - old",
657+
|b| {
658+
b.iter(|| {
659+
let array_reader = create_complex_object_byte_array_dictionary_reader(
660+
dictionary_string_half_null_data.clone(),
661+
optional_string_column_desc.clone(),
662+
);
663+
count = bench_array_reader(array_reader);
664+
});
665+
assert_eq!(count, EXPECTED_VALUE_COUNT);
666+
},
667+
);
668+
669+
group.bench_function(
670+
"read StringDictionary, dictionary encoded, optional, half NULLs - new",
671+
|b| {
672+
b.iter(|| {
673+
let array_reader = create_string_byte_array_dictionary_reader(
674+
dictionary_string_half_null_data.clone(),
675+
optional_string_column_desc.clone(),
676+
);
677+
count = bench_array_reader(array_reader);
678+
});
679+
assert_eq!(count, EXPECTED_VALUE_COUNT);
680+
},
681+
);
682+
562683
group.finish();
563684
}
564685

parquet/src/arrow/array_reader.rs

Lines changed: 26 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -56,11 +56,10 @@ use arrow::datatypes::{
5656
use arrow::util::bit_util;
5757

5858
use crate::arrow::converter::{
59-
BinaryArrayConverter, BinaryConverter, Converter, DecimalArrayConverter,
60-
DecimalConverter, FixedLenBinaryConverter, FixedSizeArrayConverter,
61-
Int96ArrayConverter, Int96Converter, IntervalDayTimeArrayConverter,
62-
IntervalDayTimeConverter, IntervalYearMonthArrayConverter,
63-
IntervalYearMonthConverter, Utf8ArrayConverter, Utf8Converter,
59+
Converter, DecimalArrayConverter, DecimalConverter, FixedLenBinaryConverter,
60+
FixedSizeArrayConverter, Int96ArrayConverter, Int96Converter,
61+
IntervalDayTimeArrayConverter, IntervalDayTimeConverter,
62+
IntervalYearMonthArrayConverter, IntervalYearMonthConverter,
6463
};
6564
use crate::arrow::record_reader::buffer::{ScalarValue, ValuesBuffer};
6665
use crate::arrow::record_reader::{GenericRecordReader, RecordReader};
@@ -70,8 +69,8 @@ use crate::column::page::PageIterator;
7069
use crate::column::reader::decoder::ColumnValueDecoder;
7170
use crate::column::reader::ColumnReaderImpl;
7271
use crate::data_type::{
73-
BoolType, ByteArrayType, DataType, DoubleType, FixedLenByteArrayType, FloatType,
74-
Int32Type, Int64Type, Int96Type,
72+
BoolType, DataType, DoubleType, FixedLenByteArrayType, FloatType, Int32Type,
73+
Int64Type, Int96Type,
7574
};
7675
use crate::errors::{ParquetError, ParquetError::ArrowError, Result};
7776
use crate::file::reader::{FilePageIterator, FileReader};
@@ -81,9 +80,15 @@ use crate::schema::types::{
8180
use crate::schema::visitor::TypeVisitor;
8281

8382
mod byte_array;
83+
mod byte_array_dictionary;
84+
mod dictionary_buffer;
8485
mod offset_buffer;
8586

87+
#[cfg(test)]
88+
mod test_util;
89+
8690
pub use byte_array::make_byte_array_reader;
91+
pub use byte_array_dictionary::make_byte_array_dictionary_reader;
8792

8893
/// Array reader reads parquet data into arrow array.
8994
pub trait ArrayReader {
@@ -271,7 +276,8 @@ where
271276
.clone(),
272277
};
273278

274-
let record_reader = RecordReader::<T>::new_with_options(column_desc.clone(), null_mask_only);
279+
let record_reader =
280+
RecordReader::<T>::new_with_options(column_desc.clone(), null_mask_only);
275281

276282
Ok(Self {
277283
data_type,
@@ -829,17 +835,18 @@ fn remove_indices(
829835
size
830836
),
831837
ArrowType::Struct(fields) => {
832-
let struct_array = arr.as_any()
838+
let struct_array = arr
839+
.as_any()
833840
.downcast_ref::<StructArray>()
834841
.expect("Array should be a struct");
835842

836843
// Recursively call remove indices on each of the structs fields
837-
let new_columns = fields.into_iter()
844+
let new_columns = fields
845+
.into_iter()
838846
.zip(struct_array.columns())
839847
.map(|(field, column)| {
840848
let dt = field.data_type().clone();
841-
Ok((field,
842-
remove_indices(column.clone(), dt, indices.clone())?))
849+
Ok((field, remove_indices(column.clone(), dt, indices.clone())?))
843850
})
844851
.collect::<Result<Vec<_>>>()?;
845852

@@ -1783,35 +1790,12 @@ impl<'a> ArrayReaderBuilder {
17831790
)?,
17841791
)),
17851792
PhysicalType::BYTE_ARRAY => match arrow_type {
1786-
// TODO: Replace with optimised dictionary reader (#171)
1787-
Some(ArrowType::Dictionary(_, _)) => {
1788-
match cur_type.get_basic_info().converted_type() {
1789-
ConvertedType::UTF8 => {
1790-
let converter = Utf8Converter::new(Utf8ArrayConverter {});
1791-
Ok(Box::new(ComplexObjectArrayReader::<
1792-
ByteArrayType,
1793-
Utf8Converter,
1794-
>::new(
1795-
page_iterator,
1796-
column_desc,
1797-
converter,
1798-
arrow_type,
1799-
)?))
1800-
}
1801-
_ => {
1802-
let converter = BinaryConverter::new(BinaryArrayConverter {});
1803-
Ok(Box::new(ComplexObjectArrayReader::<
1804-
ByteArrayType,
1805-
BinaryConverter,
1806-
>::new(
1807-
page_iterator,
1808-
column_desc,
1809-
converter,
1810-
arrow_type,
1811-
)?))
1812-
}
1813-
}
1814-
}
1793+
Some(ArrowType::Dictionary(_, _)) => make_byte_array_dictionary_reader(
1794+
page_iterator,
1795+
column_desc,
1796+
arrow_type,
1797+
null_mask_only,
1798+
),
18151799
_ => make_byte_array_reader(
18161800
page_iterator,
18171801
column_desc,
@@ -2025,7 +2009,7 @@ mod tests {
20252009
use crate::arrow::schema::parquet_to_arrow_schema;
20262010
use crate::basic::{Encoding, Type as PhysicalType};
20272011
use crate::column::page::{Page, PageReader};
2028-
use crate::data_type::{ByteArray, DataType, Int32Type, Int64Type};
2012+
use crate::data_type::{ByteArray, ByteArrayType, DataType, Int32Type, Int64Type};
20292013
use crate::errors::Result;
20302014
use crate::file::reader::{FileReader, SerializedFileReader};
20312015
use crate::schema::parser::parse_message_type;

0 commit comments

Comments
 (0)