A test data management tool with automated data generation, validation and clean up.
Generate data for databases, files, messaging systems, or HTTP requests via the UI, the Scala/Java SDK, or YAML input, with execution handled by Spark. Run data validations after generating data to ensure it is consumed correctly. Clean up generated data, or the data consumed from it in downstream data sources, to keep your environments tidy. Define alerts to get notified when failures occur, and dig into issues using the generated report.
Scala/Java examples can be found here.
- Batch and/or event data generation
- Maintain relationships across any dataset
- Create custom data generation/validation scenarios
- Data validation
- Clean up generated and downstream data
- Suggest data validations
- Metadata discovery
- Detailed report of generated data and validation results
- Alerts to be notified of results
- Run as GitHub Action
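
To make "maintain relationships across any dataset" concrete, here is a minimal, library-independent sketch in plain Scala. It is not how Data Caterer implements the feature; the `Account` and `Transaction` types and both helper functions are hypothetical names introduced only for illustration. It generates unique IDs matching `ACC[0-9]{10}` and then creates a variable number of child records per ID, so that the two datasets stay joinable.

```scala
import scala.util.Random

// Hypothetical record types for illustration only (not part of Data Caterer).
final case class Account(accountId: String)
final case class Transaction(accountId: String, amount: Double)

object RelationshipSketch {
  // Generate `n` unique IDs matching the pattern ACC[0-9]{10}.
  def uniqueAccountIds(n: Int, rng: Random): Vector[String] = {
    val ids = scala.collection.mutable.LinkedHashSet.empty[String]
    while (ids.size < n) {
      val digits = (1 to 10).map(_ => rng.nextInt(10)).mkString
      ids += s"ACC$digits"
    }
    ids.toVector
  }

  // For each account, create between `min` and `max` transactions (inclusive)
  // that reuse the account's ID, so every child row has a matching parent.
  def transactionsFor(accounts: Seq[Account], min: Int, max: Int, rng: Random): Seq[Transaction] =
    accounts.flatMap { account =>
      val count = min + rng.nextInt(max - min + 1)
      Seq.fill(count)(Transaction(account.accountId, rng.nextDouble() * 100))
    }

  def main(args: Array[String]): Unit = {
    val rng = new Random(42)
    val accounts = uniqueAccountIds(5, rng).map(Account(_))
    val transactions = transactionsFor(accounts, min = 1, max = 5, rng)
    val knownIds = accounts.map(_.accountId).toSet
    // Every generated transaction points at an existing account.
    println(transactions.forall(t => knownIds.contains(t.accountId))) // prints true
  }
}
```

The per-account count drawn between `min` and `max` plays the same role as a records-per-field generator: the parent key is fixed first, and the children are derived from it rather than generated independently.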
git clone git@github.com:data-catering/data-caterer.git
cd data-caterer/example
./run.sh

It will run the DocumentationPlanRun class.
Press Enter to run the default example. Check results at docker/sample/report/index.html.
git clone git@github.com:data-catering/data-caterer.git
cd data-caterer/example
./run.sh csv.yaml

It will run the csv.yaml plan file and the csv_transaction_file task file.
Check results at docker/data/custom/report/index.html.
docker run -d -p 9898:9898 -e DEPLOY_MODE=standalone --name datacaterer datacatering/data-caterer:0.18.0

Open http://localhost:9898.
Data Caterer supports the below data sources. Check here for the full roadmap.
| Data Source Type | Data Source | Support |
|---|---|---|
| Cloud Storage | AWS S3 | ✅ |
| Cloud Storage | Azure Blob Storage | ✅ |
| Cloud Storage | GCP Cloud Storage | ✅ |
| Database | BigQuery | ✅ |
| Database | Cassandra | ✅ |
| Database | MySQL | ✅ |
| Database | Postgres | ✅ |
| Database | Elasticsearch | ❌ |
| Database | MongoDB | ❌ |
| File | CSV | ✅ |
| File | Delta Lake | ✅ |
| File | JSON | ✅ |
| File | Iceberg | ✅ |
| File | ORC | ✅ |
| File | Parquet | ✅ |
| File | Hudi | ❌ |
| HTTP | REST API | ✅ |
| Messaging | Kafka | ✅ |
| Messaging | RabbitMQ | ✅ |
| Messaging | Solace | ✅ |
| Messaging | ActiveMQ | ❌ |
| Messaging | Pulsar | ❌ |
| Metadata | Data Contract CLI | ✅ |
| Metadata | Great Expectations | ✅ |
| Metadata | JSON Schema | ✅ |
| Metadata | Marquez | ✅ |
| Metadata | OpenAPI/Swagger | ✅ |
| Metadata | OpenMetadata | ✅ |
| Metadata | Open Data Contract Standard (ODCS) | ✅ |
| Metadata | Amundsen | ❌ |
| Metadata | Datahub | ❌ |
| Metadata | Solace Event Portal | ❌ |
Different ways to run Data Caterer based on your use case:
Design motivations and details can be found here.
Check here for the full list of roadmap items.
postgres("customer_postgres", "jdbc:postgresql://localhost:5432/customer") // name and URL

postgres("customer_postgres", "jdbc:postgresql://localhost:5432/customer")
  .fields(field.name("account_id").regex("ACC[0-9]{10}").unique(true))

val postgresTask = postgres("customer_postgres", "jdbc:postgresql://localhost:5432/customer")
  .fields(field.name("account_id").regex("ACC[0-9]{10}").unique(true))
val parquetValidation = parquet("output_parquet", "/data/parquet/customer")
  .validation(validation.count.isEqual(1000))

val postgresTask = postgres("customer_postgres", "jdbc:postgresql://localhost:5432/customer")
  .fields(field.name("account_id").regex("ACC[0-9]{10}").unique(true))
val parquetValidation = parquet("output_parquet", "/data/parquet/customer")
  .validation(
    validation.upstreamData(postgresTask)
      .joinFields("account_id")
      .withValidation(validation.count().isEqual(1000))
  )

val postgresTask = postgres("customer_postgres", "jdbc:postgresql://localhost:5432/customer")
  .fields(field.name("account_id").regex("ACC[0-9]{10}").unique(true))
val parquetValidation = parquet("output_parquet", "/data/parquet/customer")
  .validation(
    validation.upstreamData(postgresTask)
      .joinFields("account_id")
      .withValidation(validation.count().isEqual(1000))
  )
  .validationWait(waitCondition.file("/data/parquet/customer"))

kafka("my_kafka", "localhost:29092")
  .topic("account-topic")
  .fields(...)

val postgresTask = postgres("customer_postgres", "jdbc:postgresql://localhost:5432/customer")
  .fields(field.name("account_id").regex("ACC[0-9]{10}"))
val kafkaTask = kafka("my_kafka", "localhost:29092")
  .topic("account-topic")
  .fields(...)
plan.addForeignKeyRelationship(
  postgresTask, List("account_id"),
  List(kafkaTask -> List("account_id"))
)

postgres("customer_postgres", "jdbc:postgresql://localhost:5432/customer")
  .table("account", "transactions")
  .count(count.recordsPerField(5, "account_id"))

postgres("customer_postgres", "jdbc:postgresql://localhost:5432/customer")
  .table("account", "transactions")
  .count(count.recordsPerFieldGenerator(generator.min(1).max(5), "account_id"))

val postgresTask = postgres("customer_postgres", "jdbc:postgresql://localhost:5432/customer")
  .table("account", "transactions")
  .count(count.recordsPerFieldGenerator(generator.min(0).max(5), "account_id"))
val conf = configuration
  .enableDeleteGeneratedRecords(true)
  .enableGenerateData(false)

I also want to delete the data in Cassandra because my job consumed the data in Postgres and pushed it to Cassandra.

val postgresTask = postgres("customer_postgres", "jdbc:postgresql://localhost:5432/customer")
  .table("account", "transactions")
  .count(count.recordsPerFieldGenerator(generator.min(0).max(5), "account_id"))
val cassandraTxns = cassandra("ingested_data", "localhost:9042")
  .table("account", "transactions")
val deletePlan = plan.addForeignKeyRelationship(
  postgresTask, List("account_id"),
  List(),
  List(cassandraTxns -> List("account_id"))
)
val conf = configuration
  .enableDeleteGeneratedRecords(true)
  .enableGenerateData(false)

val postgresTask = postgres("customer_postgres", "jdbc:postgresql://localhost:5432/customer")
  .count(count.recordsPerFieldGenerator(generator.min(0).max(5), "account_id"))
val cassandraTxns = cassandra("ingested_data", "localhost:9042")
  .table("account", "transactions")
val deletePlan = plan.addForeignKeyRelationship(
  postgresTask, List("account_id"),
  List(),
  List(cassandraTxns -> List("SUBSTR(account_id, 3) AS account_number"))
)
val conf = configuration
  .enableDeleteGeneratedRecords(true)
  .enableGenerateData(false)

parquet("customer_parquet", "/data/parquet/customer")
  .fields(metadataSource.openDataContractStandard("/data/odcs/full-example.odcs.yaml"))

http("my_http")
  .fields(metadataSource.openApi("/data/http/petstore.json"))

parquet("customer_parquet", "/data/parquet/customer")
  .validations(metadataSource.greatExpectations("/data/great-expectations/taxi-expectations.json"))
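
The delete example above that maps `account_id` to `SUBSTR(account_id, 3) AS account_number` can be easier to reason about with a small, library-independent sketch. The code below is plain Scala, not Data Caterer's implementation; the row types and function names are hypothetical. It assumes Spark SQL semantics for `SUBSTR`, where positions are 1-based, so `SUBSTR(account_id, 3)` corresponds to `substring(2)` in Scala. It shows how generated keys are transformed, how a join-style count could be checked, and how only the rows derived from generated data would be removed downstream.

```scala
// Hypothetical row types for illustration only (not part of Data Caterer).
final case class GeneratedAccount(accountId: String)
final case class DownstreamRow(accountNumber: String, payload: String)

object CleanupSketch {
  // Mirrors the SQL expression SUBSTR(account_id, 3): Spark SQL positions are
  // 1-based, so this keeps everything from the third character onwards.
  def toAccountNumber(accountId: String): String = accountId.substring(2)

  // Transformed keys of all generated records.
  private def generatedKeys(generated: Seq[GeneratedAccount]): Set[String] =
    generated.map(g => toAccountNumber(g.accountId)).toSet

  // Validation-style check: count downstream rows that join to a generated record.
  def matchedCount(generated: Seq[GeneratedAccount], downstream: Seq[DownstreamRow]): Int = {
    val keys = generatedKeys(generated)
    downstream.count(row => keys.contains(row.accountNumber))
  }

  // Cleanup: drop only the downstream rows derived from generated records.
  def deleteGenerated(generated: Seq[GeneratedAccount], downstream: Seq[DownstreamRow]): Seq[DownstreamRow] = {
    val keys = generatedKeys(generated)
    downstream.filterNot(row => keys.contains(row.accountNumber))
  }

  def main(args: Array[String]): Unit = {
    val generated = Seq(GeneratedAccount("ACC0000000001"), GeneratedAccount("ACC0000000002"))
    val downstream = Seq(
      DownstreamRow("C0000000001", "from generated data"),
      DownstreamRow("C0000000002", "from generated data"),
      DownstreamRow("C9999999999", "pre-existing")
    )
    println(matchedCount(generated, downstream))         // prints 2
    println(deleteGenerated(generated, downstream).size) // prints 1
  }
}
```

Note that, under this 1-based reading, the transformed key keeps the final `C` of the `ACC` prefix; whether that matches the downstream column depends on how the consuming job derives `account_number`.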