Push-Based Parquet Reader #1605

@tustvold

Description

TLDR

This proposal will allow reading data from parquet files in a "streaming" fashion, reducing the IO required, the resources needed to locally buffer parquet files, and potentially the latency to first data. It will help the use case of reading parquet files from remote object stores such as AWS S3.

Problem

SerializedFileReader is currently created with a ChunkReader which looks like

pub trait ChunkReader: Length + Send + Sync {
    type T: Read + Send;
    /// Get a serially readable slice of the current reader
    /// This should fail if the slice exceeds the current bounds
    fn get_read(&self, start: u64, length: usize) -> Result<Self::T>;
}

The process for reading a file is then

  • SerializedFileReader::new will call footer::parse_metadata
    • parse_metadata will
      • Call ChunkReader::get_read with the final 64 KB byte range, and read this into a buffer
      • Determine the footer length
      • Potentially call ChunkReader::get_read again to read the remainder of the footer, and read this into a buffer
  • SerializedFileReader::get_row_iter will return a RowIter which, for each row group, will
    • Call SerializedRowGroupReader::new, which will
      • Call ChunkReader::get_read with the byte range of each column chunk
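
For reference, a minimal sketch of what satisfying this contract looks like for an in-memory buffer (the InMemoryFile type here is hypothetical and only illustrative, not part of the crate):

use std::io::Cursor;
use std::sync::Arc;

use parquet::errors::{ParquetError, Result};
use parquet::file::reader::{ChunkReader, Length};

/// Hypothetical in-memory "file" used to illustrate the ChunkReader contract
struct InMemoryFile(Arc<Vec<u8>>);

impl Length for InMemoryFile {
    fn len(&self) -> u64 {
        self.0.len() as u64
    }
}

impl ChunkReader for InMemoryFile {
    type T = Cursor<Vec<u8>>;

    fn get_read(&self, start: u64, length: usize) -> Result<Self::T> {
        let start = start as usize;
        // Fail if the requested slice exceeds the buffer, per the trait docs
        let end = start
            .checked_add(length)
            .filter(|end| *end <= self.0.len())
            .ok_or_else(|| ParquetError::EOF("slice out of bounds".to_string()))?;
        Ok(Cursor::new(self.0[start..end].to_vec()))
    }
}

The pull-based shape of get_read is the core issue: every call is a synchronous request for bytes, which forces the IO strategy onto whoever implements the trait.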

There are two major options to apply this to files in object storage

  1. Fetch the entire file to local disk or memory and pass it to SerializedFileReader
  2. Convert ChunkReader::get_read to a range request to object storage

The first option is problematic as it cannot use pruning logic to reduce the amount of data fetched from object storage.

The second option runs into two problems:

  1. The interface is not async, and blocking a thread on network IO is not ideal
  2. It issues lots of small range requests per file, adding cost and latency

Proposal

I would like to decouple the parquet reader entirely from IO concerns, giving downstreams complete freedom to decide how they want to handle IO. This will allow the reader to support a wide variety of potential data access patterns:

  • Sync/Async Disk IO
  • Sync/Async Network IO
  • In-memory/mmapped parquet files
  • Interleaved row group decode with fetching the next row group

Footer Decode

Introduce functions to assist parsing the parquet metadata

/// Parses the 8-byte parquet footer and returns the length of the metadata section
pub fn parse_footer(footer: [u8; 8]) -> Result<usize> {}

/// Parse metadata payload
pub fn parse_metadata(metadata: &[u8]) -> Result<ParquetMetaData> {}

This will allow callers to obtain ParquetMetaData regardless of how they choose to fetch the corresponding bytes.
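
For illustration, a caller might wire this up as follows. This is a sketch only: the fetch closure is a stand-in for whatever IO mechanism the caller uses (e.g. an object store range GET), and the parse_footer / parse_metadata signatures are the ones proposed above.

use std::ops::Range;

use parquet::errors::Result;
use parquet::file::metadata::ParquetMetaData;

/// Sketch only: decode metadata given any caller-supplied byte-range fetcher
fn read_metadata<F>(file_len: u64, mut fetch: F) -> Result<ParquetMetaData>
where
    F: FnMut(Range<u64>) -> Result<Vec<u8>>,
{
    // Fetch the fixed 8-byte footer from the end of the file
    let footer_bytes = fetch(file_len - 8..file_len)?;
    let mut footer = [0u8; 8];
    footer.copy_from_slice(&footer_bytes);

    // The footer encodes the length of the metadata section
    let metadata_len = parse_footer(footer)? as u64;

    // Fetch exactly the metadata bytes, which precede the footer, and decode them
    let metadata_bytes = fetch(file_len - 8 - metadata_len..file_len - 8)?;
    parse_metadata(&metadata_bytes)
}

A caller could equally fetch a larger suffix of the file speculatively and avoid the second round trip; the point is that the library no longer dictates how the bytes are obtained.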

ScanBuilder / Scan

Next introduce a ScanBuilder and accompanying Scan.

/// Build a [`Scan`]
///
/// Eventually this will support predicate pushdown (#1191)
pub struct ScanBuilder {}

impl ScanBuilder {
  pub fn new(metadata: Arc<ParquetMetaData>) -> Self {}
  
  pub fn with_projection(self, projection: Vec<usize>) -> Self {}
  
  pub fn with_row_groups(self, groups: Vec<usize>) -> Self {}
  
  pub fn with_range(self, range: Range<usize>) -> Self {}
  
  pub fn build(self) -> Scan {}
}

/// Identifies a portion of a file to read
pub struct Scan {}

impl Scan {
  /// Returns a list of byte ranges needed
  pub fn ranges(&self) -> &[Range<usize>] {}
  
  /// Perform the scan returning a [`ParquetRecordBatchReader`] 
  pub fn execute<R: ChunkReader>(self, reader: R) -> Result<ParquetRecordBatchReader> {}
}

Here ParquetRecordBatchReader is the same type returned by the current ParquetFileArrowReader::get_record_reader; it is just an Iterator<Item=ArrowResult<RecordBatch>> with a Schema.

This design will only support the arrow use case, but I couldn't see an easy way to add this at a lower level without introducing API inconsistencies when not scanning the entire file.
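
A rough sketch of how a caller might drive this, assuming the proposed API above. The fetch_ranges helper and SparseChunkReader type are hypothetical stand-ins for whatever IO and buffering strategy a downstream chooses.

use std::sync::Arc;

use parquet::errors::Result;
use parquet::file::metadata::ParquetMetaData;

/// Sketch only: scan the first row group, projecting columns 0 and 2
fn scan_file(metadata: Arc<ParquetMetaData>) -> Result<()> {
    let scan = ScanBuilder::new(metadata)
        .with_projection(vec![0, 2])
        .with_row_groups(vec![0])
        .build();

    // The caller decides how the listed byte ranges are fetched, e.g. by
    // coalescing them into a small number of object store range requests
    let ranges = scan.ranges().to_vec();
    let data = fetch_ranges(&ranges)?; // hypothetical IO helper
    let reader = SparseChunkReader::new(ranges, data); // hypothetical ChunkReader over the fetched ranges

    // Decoding then proceeds entirely from the locally buffered bytes
    for batch in scan.execute(reader)? {
        let batch = batch.expect("decode error");
        println!("read {} rows", batch.num_rows());
    }
    Ok(())
}

Because Scan::ranges exposes exactly which bytes are needed before any decoding happens, downstreams can prefetch, coalesce, or overlap IO with decode however they see fit.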

Deprecate Async

Once implemented, I would propose deprecating and then removing the async API added by #1154.

Alternatives Considered

#1154 added an async reader that uses the AsyncRead and AsyncSeek traits to read individual column chunks into memory from an async source. This is the approach taken by arrow2, with its range_reader abstraction. This was not found to perform particularly well (#1473).

#1473 proposed an async reader with prefetch functionality, and was also suggested by @alamb in apache/datafusion#2205 (comment). This is similar to the new FSDataInputStream vectored IO API in the Hadoop ecosystem. This was implemented in #1509 and found to perform better, but still represented a non-trivial performance regression on local files.

Additional Context

The motivating discussion for this issue can be found in apache/datafusion#2205

@mateuszkj clearly documented the limitations of the current API in datafusion-contrib/datafusion-objectstore-s3#53
