
[Epic]: Kernel based table scans #3733

@roeap


Important

This is a living issue to track the work required to fully kernelize our scan planning. New sub-issues might be added as our pathfinding continues.

Description

Delta-rs recently migrated to kernel-based log replay, using delta-kernel-rs to replay the log. After log replay we have access to all relevant file actions. Currently we use this data to manually plan our table scans within the TableProvider implementations. This, however, means that we still need to interpret much of the Delta metadata ourselves to enable specific features.

Kernel exposes a more generic API to plan scans which handles much of the metadata interpretation for us. We should refactor our scans to take advantage of these more powerful APIs.

Once we have an up-to-date snapshot of the table (embedded in our snapshots), we can start planning a scan. This is a two-step process.

  1. Invoke scan_metadata[_from] on the snapshot to get the latest log state
  2. Visit the log data to get read tasks
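
A condensed sketch of these two steps, using only the kernel calls that also appear in the longer example further down (module paths are approximate, and the ReadTasks accumulator plus collect_file callback are made-up placeholders):

use std::sync::Arc;

use delta_kernel::scan::state::{DvInfo, Stats};
use delta_kernel::{DeltaResult, Engine, ExpressionRef, Snapshot};

// Hypothetical accumulator for the read tasks produced in step 2.
struct ReadTasks {
    files: Vec<String>,
}

// Callback invoked once per file that survives log replay.
fn collect_file(
    tasks: &mut ReadTasks,
    path: &str,
    _size: i64,
    _stats: Option<Stats>,
    _dv_info: DvInfo,
    _transform: Option<ExpressionRef>,
    _partition_values: std::collections::HashMap<String, String>,
) {
    tasks.files.push(path.to_string());
}

fn plan(engine: &dyn Engine, snapshot: Arc<Snapshot>) -> DeltaResult<ReadTasks> {
    // Step 1: build a scan and ask kernel for the latest log state.
    let scan = snapshot.scan_builder().build()?;
    let mut tasks = ReadTasks { files: vec![] };
    for meta in scan.scan_metadata(engine)? {
        // Step 2: visit the log data to turn it into read tasks.
        tasks = meta?.visit_scan_files(tasks, collect_file)?;
    }
    Ok(tasks)
}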

There are a few related issues we likely need to tackle for this as well.

  • per-table object store handling
  • ...

We are also faced with an additional issue, which is snapshot freshness. As we execute our operations, we create ad-hoc DataFusion sessions to run the query. However, if our table provider is used within an external session, it might become stale, since we do not ensure the latest state during the scan. There are several ways to mitigate this, some going a bit further, such as managing this via DataFusion catalog abstractions.
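
To illustrate the mitigation (a sketch only; latest_version and rebuild_snapshot are hypothetical helpers standing in for whatever mechanism we pick, e.g. kernel's incremental scan_metadata_from or a catalog-level refresh), the key point is that freshness is verified at scan time rather than at provider construction time:

use std::sync::Arc;

use delta_kernel::{DeltaResult, Engine, Snapshot};

// Hypothetical helpers -- the concrete mechanism is what this epic needs to decide.
fn latest_version(_engine: &dyn Engine) -> DeltaResult<u64> {
    todo!("query the log for the latest commit version")
}
fn rebuild_snapshot(_engine: &dyn Engine, _version: u64) -> DeltaResult<Arc<Snapshot>> {
    todo!("build (or incrementally advance to) a snapshot at that version")
}

/// Ensure the snapshot we plan against reflects the latest commit, so a
/// long-lived TableProvider inside an external DataFusion session does not
/// silently plan against stale state.
fn ensure_fresh(engine: &dyn Engine, current: Arc<Snapshot>) -> DeltaResult<Arc<Snapshot>> {
    let latest = latest_version(engine)?;
    if current.version() < latest {
        rebuild_snapshot(engine, latest)
    } else {
        Ok(current)
    }
}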

A rough sketch of where we need to end up:

  1. Get and process the kernel metadata to obtain the raw scan plan. Below is a version from the deltalake-datafusion crate which does not yet take advantage of our materialised/cached state.
async fn scan_metadata(
    engine: Arc<dyn Engine>,
    snapshot: &Arc<Snapshot>,
    projection: Option<&Vec<usize>>,
    predicate: Arc<Predicate>,
    table_schema: &ArrowSchemaRef,
) -> Result<TableScan> {
    // set up the scan with projection and predicate
    let projected_delta_schema = project_delta_schema(table_schema, snapshot.schema(), projection);
    let scan = snapshot
        .clone()
        .scan_builder()
        .with_schema(projected_delta_schema)
        .with_predicate(predicate)
        .build()
        .map_err(to_df_err)?;

    let table_root = scan.table_root().clone();
    let physical_schema: ArrowSchemaRef =
        Arc::new(scan.physical_schema().as_ref().try_into_arrow()?);
    let physical_predicate = scan.physical_predicate();
    let logical_schema = Arc::new(scan.logical_schema().as_ref().try_into_arrow()?);

    // visit the log data to get expressions and selection vectors
    let scan_inner = move || {
        let mut context = ScanContext::new(engine.clone(), table_root);
        let meta = scan.scan_metadata(engine.as_ref()).map_err(to_df_err)?;
        for scan_meta in meta {
            let scan_meta = scan_meta.map_err(to_df_err)?;
            context = scan_meta
                .visit_scan_files(context, visit_scan_file)
                .map_err(to_df_err)?;
        }
        context.errs.error_or(context.files)
    };
    let files = tokio::task::spawn_blocking(scan_inner)
        .await
        .map_err(|e| DataFusionError::External(Box::new(e)))??;

    Ok(TableScan {
        files,
        physical_schema,
        physical_predicate,
        logical_schema,
    })
}

with the actual visiting function being something like

fn visit_scan_file(
    ctx: &mut ScanContext,
    path: &str,
    size: i64,
    stats: Option<Stats>,
    dv_info: DvInfo,
    transform: Option<ExpressionRef>,
    // NB: partition values are passed for backwards compatibility
    // all required transformations are now part of the transform field
    _: std::collections::HashMap<String, String>,
) {
    let file_url = match ctx.parse_path(path) {
        Ok(v) => v,
        Err(e) => {
            ctx.errs.add_error(e);
            return;
        }
    };

    // Get the selection vector (i.e. inverse deletion vector)
    let Ok(selection_vector) = dv_info.get_selection_vector(ctx.engine.as_ref(), &ctx.table_root)
    else {
        ctx.errs
            .add_error(exec_err!("Failed to get selection vector"));
        return;
    };

    ctx.files.push(ScanFileContext {
        selection_vector,
        transform,
        stats,
        file_url,
        size: size as u64,
    });
}

After this we need to:

  • plan our parquet scan
  • apply the transformations

My assumption right now is that these will be two steps, i.e. we cannot push everything into the parquet read (see the sketch after the list below). Some things to consider:

  • use new PhysicalExpressionFactory API to handle missing columns (potentially nested)
  • maybe also use SchemaMapper for some projection
  • Apply more elaborate expressions on a processing stream
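
For the transformation step, one option (a sketch, not a decided design; apply_transform is a hypothetical stand-in for evaluating the kernel-provided transform, whether via kernel's expression evaluation or a translated DataFusion PhysicalExpr) is to map it over the record batch stream produced by the parquet scan:

use datafusion::arrow::record_batch::RecordBatch;
use datafusion::error::Result;
use delta_kernel::ExpressionRef;
use futures::stream::{Stream, StreamExt};

// Hypothetical: evaluate the per-file transform (partition values, column
// mapping, etc.) over a single decoded batch.
fn apply_transform(batch: RecordBatch, transform: &ExpressionRef) -> Result<RecordBatch> {
    let _ = transform;
    Ok(batch) // placeholder -- the evaluation strategy is one of the open questions above
}

/// Wrap the parquet read stream so the per-file transform is applied after
/// decoding, keeping the parquet scan itself untouched.
fn transform_stream(
    input: impl Stream<Item = Result<RecordBatch>>,
    transform: Option<ExpressionRef>,
) -> impl Stream<Item = Result<RecordBatch>> {
    input.map(move |batch| match (&transform, batch) {
        (Some(t), Ok(b)) => apply_transform(b, t),
        (_, other) => other,
    })
}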

cc: @rtyler

Use Case

Support more table features.
