3 changes: 2 additions & 1 deletion .gitignore
@@ -1,6 +1,7 @@
# Generated by Cargo
# will have compiled files and executables
/target/
/.idea/

# Remove Cargo.lock from gitignore if creating an executable, leave it for libraries
# More information here https://doc.rust-lang.org/cargo/guide/cargo-toml-vs-cargo-lock.html
@@ -9,4 +10,4 @@ Cargo.lock
# These are backup files generated by rustfmt
**/*.rs.bk

parquet-testing
parquet-testing
36 changes: 22 additions & 14 deletions Cargo.toml
@@ -16,18 +16,26 @@ keywords = [ "arrow", "query", "sql", "datafusion" ]
name = "datafusion_objectstore_s3"
path = "src/lib.rs"

# https://doc.rust-lang.org/cargo/reference/profiles.html#overrides
[profile.dev.package."*"]
opt-level = 2

[dependencies]
arrow = {version = "9.0.0", features = ["prettyprint"]}
async-trait = "0.1.52"
aws-config = "0.9.0"
aws-sdk-s3 = "0.9.0"
aws-smithy-async = "0.39.0"
aws-smithy-types = "0.39.0"
aws-smithy-types-convert = {version = "0.39.0", features = ["convert-chrono"]}
aws-types = "0.9.0"
bytes = "1.1.0"
datafusion = "7.0.0"
futures = "0.3.19"
http = "0.2.6"
num_cpus = "1.13.1"
tokio = {version = "1.0", features = ["macros", "rt", "rt-multi-thread", "sync", "fs"]}
datafusion-data-access = { git = "https://github.com/apache/arrow-datafusion.git", branch = "master" }
async-trait = "0.1"
futures = "0.3"
tokio = { version = "1", features = ["rt"] }
rust-s3 = { git = "https://github.com/mateuszkj/rust-s3.git", branch = "master" }
chrono = { version = "0.4", default-features = false }
parking_lot = "0.12"
byteorder = "1"
num_cpus = "1"
bytes = "1"
log = "0.4"

[dev-dependencies]
test-log = "0.2.10"
env_logger = "0.9.0"
arrow = { version = "11.1.0", features = ["prettyprint"] }
datafusion = { git = "https://github.com/apache/arrow-datafusion.git", branch = "master" }
tokio = { version = "1.17.0", features = ["macros", "rt", "rt-multi-thread", "sync", "fs"] }
49 changes: 16 additions & 33 deletions README.md
@@ -4,76 +4,59 @@ S3 as an ObjectStore for [Datafusion](https://github.com/apache/arrow-datafusion

## Querying files on S3 with DataFusion

This crate implements the DataFusion `ObjectStore` trait on AWS S3 and implementors of the S3 standard. We leverage the official [AWS Rust SDK](https://github.com/awslabs/aws-sdk-rust) for interacting with S3. While it is our understanding that the AWS APIs we are using are relatively stable, we can make no assurances on API stability either on AWS's part or within this crate. This crate's API is tightly connected with DataFusion, a fast-moving project, and as such we will make changes in line with those upstream changes.
This crate implements the DataFusion `ObjectStore` trait on AWS S3 and implementors of the S3 standard. While it is our understanding that the AWS APIs we are using are relatively stable, we can make no assurances on API stability either on AWS's part or within this crate. This crate's API is tightly connected with DataFusion, a fast-moving project, and as such we will make changes in line with those upstream changes.

## Examples

Examples for querying AWS and other implementors, such as MinIO, are shown below.

Load credentials from default AWS credential provider (such as environment or ~/.aws/credentials)

```rust
let s3_file_system = Arc::new(S3FileSystem::default().await);
```

`S3FileSystem::default()` is a convenience wrapper for `S3FileSystem::new(None, None, None, None, None, None)`.

Connect to implementor of S3 API (MinIO, in this case) using access key and secret.

```rust
// Example credentials provided by MinIO
const ACCESS_KEY_ID: &str = "AKIAIOSFODNN7EXAMPLE";
const SECRET_ACCESS_KEY: &str = "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY";
const PROVIDER_NAME: &str = "Static";
const BUCKET_NAME: &str = "data";
const MINIO_ENDPOINT: &str = "http://localhost:9000";

let s3_file_system = S3FileSystem::new(
Some(SharedCredentialsProvider::new(Credentials::new(
MINIO_ACCESS_KEY_ID,
MINIO_SECRET_ACCESS_KEY,
None,
None,
PROVIDER_NAME,
))), // Credentials provider
None, // Region
Some(Endpoint::immutable(Uri::from_static(MINIO_ENDPOINT))), // Endpoint
None, // RetryConfig
None, // AsyncSleep
None, // TimeoutConfig
)
.await;
let s3_file_system = S3FileSystem::new_custom(
BUCKET_NAME,
MINIO_ENDPOINT,
Some(MINIO_ACCESS_KEY_ID),
Some(MINIO_SECRET_ACCESS_KEY),
S3FileSystemOptions::from_envs()?
)?;
```

Using DataFusion's `ListingTableConfig` we register a table into a DataFusion `ExecutionContext` so that it can be queried.
Using DataFusion's `ListingTableConfig` we register a table into a DataFusion `SessionContext` so that it can be queried.

```rust
let filename = "data/alltypes_plain.snappy.parquet";
let filename = "alltypes_plain.snappy.parquet";

let config = ListingTableConfig::new(s3_file_system, filename).infer().await?;

let table = ListingTable::try_new(config)?;

let mut ctx = ExecutionContext::new();
let mut ctx = SessionContext::new();

ctx.register_table("tbl", Arc::new(table))?;

let df = ctx.sql("SELECT * FROM tbl").await?;
df.show().await?;
```
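For completeness, a short sketch of collecting the query results into `RecordBatch`es rather than printing them (this is an assumption about DataFusion's async `DataFrame` API, not code from this PR):

```rust
// Hypothetical follow-up to the example above: collect the query
// results instead of printing them with show().
let df = ctx.sql("SELECT * FROM tbl").await?;
let batches = df.collect().await?; // Vec<RecordBatch>
for batch in &batches {
    println!("batch with {} rows", batch.num_rows());
}
```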

We can also register the `S3FileSystem` directly as an `ObjectStore` on an `ExecutionContext`. This provides an idiomatic way of creating `TableProviders` that can be queried.
We can also register the `S3FileSystem` directly as an `ObjectStore` on a `SessionContext`. This provides an idiomatic way of creating `TableProviders` that can be queried.

```rust
ctx.register_object_store(
"s3",
Arc::new(S3FileSystem::default().await),
);

let input_uri = "s3://parquet-testing/data/alltypes_plain.snappy.parquet";
let input_uri = "s3://alltypes_plain.snappy.parquet";

let (object_store, _) = ctx.object_store(input_uri)?;
let (object_store, _) = ctx.remote_env().object_store(input_uri)?;

let config = ListingTableConfig::new(s3_file_system, filename).infer().await?;
let config = ListingTableConfig::new(s3_file_system, input_uri).infer().await?;

let mut table_provider: Arc<dyn TableProvider + Send + Sync> = Arc::new(ListingTable::try_new(config)?);
```
16 changes: 16 additions & 0 deletions run_test_mino.sh
@@ -0,0 +1,16 @@
#!/bin/sh

set -e

git submodule update --init --recursive

docker run \
--rm \
--publish 9000:9000 \
--publish 9001:9001 \
--name minio \
--volume "$(pwd)/parquet-testing:/data" \
--env "MINIO_ROOT_USER=AKIAIOSFODNN7EXAMPLE" \
--env "MINIO_ROOT_PASSWORD=wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY" \
quay.io/minio/minio server /data \
--console-address ":9001"
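With the MinIO container from this script running, the integration tests can be pointed at it. A minimal sketch, assuming the tests read the same credentials that the script passes to the container (the exact environment variable names are an assumption, not taken from this PR):

```shell
# Hypothetical: point the test suite at the local MinIO instance.
# Credential values mirror those given to the container above; the
# environment variable names the tests read are an assumption.
export AWS_ACCESS_KEY_ID=AKIAIOSFODNN7EXAMPLE
export AWS_SECRET_ACCESS_KEY=wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY
export AWS_ENDPOINT=http://localhost:9000
cargo test
```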
25 changes: 0 additions & 25 deletions src/error.rs

This file was deleted.