Expose additional csv parsing options #10

Merged · merged 6 commits into from Feb 15, 2019

3 changes: 2 additions & 1 deletion cli/cmd/init.go

@@ -91,7 +91,8 @@ func appInitFiles(appName string) map[string]string {
 # data:
 #   type: csv
 #   path: s3a://my-bucket/data.csv
-#   skip_header: true
+#   csv_config:
+#     header: true
 #   schema:
 #     - column1
 #     - column2

27 changes: 26 additions & 1 deletion docs/applications/resources/environments.md

@@ -20,13 +20,38 @@ Transfer data at scale from data warehouses like S3 into the Cortex cluster.
 data:
   type: csv # file type (required)
   path: s3a://<bucket_name>/<file_name> # S3 is currently supported (required)
-  skip_header: <bool> # skips a single header line (default: false)
   drop_null: <bool> # drop any rows that contain at least 1 null value (default: false)
+  csv_config: <csv_config> # optional configuration that can be provided
   schema:
     - <string> # raw column names listed in the CSV columns' order (required)
   ...
 ```
 
+#### CSV Config
+
+To help ingest different styles of CSV files, Cortex supports the parameters listed below. All of these parameters are optional. A description and default values for each parameter can be found in the [PySpark CSV Documentation](https://spark.apache.org/docs/2.4.0/api/python/pyspark.sql.html#pyspark.sql.DataFrameReader.csv).
+
+```yaml
+csv_config:
+  sep: <string>
+  encoding: <string>
+  quote: <string>
+  escape: <string>
+  comment: <string>
+  header: <bool>
+  ignore_leading_white_space: <bool>
+  ignore_trailing_white_space: <bool>
+  null_value: <string>
+  nan_value: <string>
+  positive_inf: <string>
+  negative_inf: <string>
+  max_columns: <int>
+  max_chars_per_column: <int>
+  multiline: <bool>
+  char_to_escape_quote_escaping: <string>
+  empty_value: <string>
+```
+
 ### Parquet Data Config
 
 ```yaml

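The `csv_config` keys above correspond one-to-one to options on PySpark's `DataFrameReader.csv` (snake_case in Cortex, camelCase in PySpark). As a point of reference, here is a minimal PySpark sketch of what a config like `sep: "|"`, `header: true`, `null_value: "NA"` amounts to at read time; the bucket path and option values are hypothetical, and `mode="FAILFAST"` matches what the ingestion code below passes.

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Roughly what Cortex's reader does for the csv_config described above:
# snake_case keys become the camelCase keyword arguments PySpark expects.
df = spark.read.csv(
    "s3a://my-bucket/data.csv",  # hypothetical path
    sep="|",          # csv_config: sep
    header=True,      # csv_config: header
    nullValue="NA",   # csv_config: null_value
    mode="FAILFAST",
)
```
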
3 changes: 2 additions & 1 deletion examples/fraud/resources/environments.yaml

@@ -3,7 +3,8 @@
 data:
   type: csv
   path: s3a://cortex-examples/fraud.csv
-  skip_header: true
+  csv_config:
+    header: true
   schema:
     - time
     - v1

3 changes: 2 additions & 1 deletion examples/mnist/resources/environments.yaml

@@ -3,7 +3,8 @@
 data:
   type: csv
   path: s3a://cortex-examples/mnist.csv
-  skip_header: true
+  csv_config:
+    header: true
   schema:
     - image
     - label

12 changes: 6 additions & 6 deletions pkg/api/context/serialize.go

@@ -33,7 +33,7 @@ type RawColumnsTypeSplit struct {
 }
 
 type DataSplit struct {
-	CsvData     *userconfig.CsvData     `json:"csv_data"`
+	CSVData     *userconfig.CSVData     `json:"csv_data"`
 	ParquetData *userconfig.ParquetData `json:"parquet_data"`
 }
 
@@ -84,8 +84,8 @@ func (ctx ContextSerial) collectRawColumns() RawColumns {
 func (ctx Context) splitEnvironment() *DataSplit {
 	var split DataSplit
 	switch typedData := ctx.Environment.Data.(type) {
-	case *userconfig.CsvData:
-		split.CsvData = typedData
+	case *userconfig.CSVData:
+		split.CSVData = typedData
 	case *userconfig.ParquetData:
 		split.ParquetData = typedData
 	}
@@ -94,10 +94,10 @@ func (ctx Context) splitEnvironment() *DataSplit {
 }
 
 func (ctxSerial *ContextSerial) collectEnvironment() (*Environment, error) {
-	if ctxSerial.DataSplit.ParquetData != nil && ctxSerial.DataSplit.CsvData == nil {
+	if ctxSerial.DataSplit.ParquetData != nil && ctxSerial.DataSplit.CSVData == nil {
 		ctxSerial.Environment.Data = ctxSerial.DataSplit.ParquetData
-	} else if ctxSerial.DataSplit.CsvData != nil && ctxSerial.DataSplit.ParquetData == nil {
-		ctxSerial.Environment.Data = ctxSerial.DataSplit.CsvData
+	} else if ctxSerial.DataSplit.CSVData != nil && ctxSerial.DataSplit.ParquetData == nil {
+		ctxSerial.Environment.Data = ctxSerial.DataSplit.CSVData
 	} else {
 		return nil, errors.Wrap(userconfig.ErrorSpecifyOnlyOne("CSV", "PARQUET"), ctxSerial.App.Name, resource.EnvironmentType.String(), userconfig.DataKey)
 	}

120 changes: 107 additions & 13 deletions pkg/api/userconfig/environments.go

@@ -89,7 +89,7 @@ var dataValidation = &cr.InterfaceStructValidation{
 	TypeStructField: "Type",
 	InterfaceStructTypes: map[string]*cr.InterfaceStructType{
 		"csv": &cr.InterfaceStructType{
-			Type:                   (*CsvData)(nil),
+			Type:                   (*CSVData)(nil),
 			StructFieldValidations: csvDataFieldValidations,
 		},
 		"parquet": &cr.InterfaceStructType{
@@ -99,12 +99,33 @@
 	},
 }
 
-type CsvData struct {
-	Type       string   `json:"type" yaml:"type"`
-	Path       string   `json:"path" yaml:"path"`
-	Schema     []string `json:"schema" yaml:"schema"`
-	DropNull   bool     `json:"drop_null" yaml:"drop_null"`
-	SkipHeader bool     `json:"skip_header" yaml:"skip_header"`
+type CSVData struct {
+	Type      string     `json:"type" yaml:"type"`
+	Path      string     `json:"path" yaml:"path"`
+	Schema    []string   `json:"schema" yaml:"schema"`
+	DropNull  bool       `json:"drop_null" yaml:"drop_null"`
+	CSVConfig *CSVConfig `json:"csv_config" yaml:"csv_config"`
+}
+
+// SPARK_VERSION dependent
+type CSVConfig struct {
+	Sep                       *string `json:"sep" yaml:"sep"`
+	Encoding                  *string `json:"encoding" yaml:"encoding"`
+	Quote                     *string `json:"quote" yaml:"quote"`
+	Escape                    *string `json:"escape" yaml:"escape"`
+	Comment                   *string `json:"comment" yaml:"comment"`
+	Header                    *bool   `json:"header" yaml:"header"`
+	IgnoreLeadingWhiteSpace   *bool   `json:"ignore_leading_white_space" yaml:"ignore_leading_white_space"`
+	IgnoreTrailingWhiteSpace  *bool   `json:"ignore_trailing_white_space" yaml:"ignore_trailing_white_space"`
+	NullValue                 *string `json:"null_value" yaml:"null_value"`
+	NanValue                  *string `json:"nan_value" yaml:"nan_value"`
+	PositiveInf               *string `json:"positive_inf" yaml:"positive_inf"`
+	NegativeInf               *string `json:"negative_inf" yaml:"negative_inf"`
+	MaxColumns                *int32  `json:"max_columns" yaml:"max_columns"`
+	MaxCharsPerColumn         *int32  `json:"max_chars_per_column" yaml:"max_chars_per_column"`
+	Multiline                 *bool   `json:"multiline" yaml:"multiline"`
+	CharToEscapeQuoteEscaping *string `json:"char_to_escape_quote_escaping" yaml:"char_to_escape_quote_escaping"`
+	EmptyValue                *string `json:"empty_value" yaml:"empty_value"`
 }
 
 var csvDataFieldValidations = []*cr.StructFieldValidation{
@@ -127,9 +148,82 @@ var csvDataFieldValidations = []*cr.StructFieldValidation{
 		},
 	},
 	&cr.StructFieldValidation{
-		StructField: "SkipHeader",
-		BoolValidation: &cr.BoolValidation{
-			Default: false,
+		StructField: "CSVConfig",
+		StructValidation: &cr.StructValidation{
+			StructFieldValidations: []*cr.StructFieldValidation{
+				&cr.StructFieldValidation{
+					StructField:         "Sep",
+					StringPtrValidation: &cr.StringPtrValidation{},
+				},
+				&cr.StructFieldValidation{
+					StructField:         "Encoding",
+					StringPtrValidation: &cr.StringPtrValidation{},
+				},
+				&cr.StructFieldValidation{
+					StructField:         "Quote",
+					StringPtrValidation: &cr.StringPtrValidation{},
+				},
+				&cr.StructFieldValidation{
+					StructField:         "Escape",
+					StringPtrValidation: &cr.StringPtrValidation{},
+				},
+				&cr.StructFieldValidation{
+					StructField:         "Comment",
+					StringPtrValidation: &cr.StringPtrValidation{},
+				},
+				&cr.StructFieldValidation{
+					StructField:       "Header",
+					BoolPtrValidation: &cr.BoolPtrValidation{},
+				},
+				&cr.StructFieldValidation{
+					StructField:       "IgnoreLeadingWhiteSpace",
+					BoolPtrValidation: &cr.BoolPtrValidation{},
+				},
+				&cr.StructFieldValidation{
+					StructField:       "IgnoreTrailingWhiteSpace",
+					BoolPtrValidation: &cr.BoolPtrValidation{},
+				},
+				&cr.StructFieldValidation{
+					StructField:         "NullValue",
+					StringPtrValidation: &cr.StringPtrValidation{},
+				},
+				&cr.StructFieldValidation{
+					StructField:         "NanValue",
+					StringPtrValidation: &cr.StringPtrValidation{},
+				},
+				&cr.StructFieldValidation{
+					StructField:         "PositiveInf",
+					StringPtrValidation: &cr.StringPtrValidation{},
+				},
+				&cr.StructFieldValidation{
+					StructField:         "NegativeInf",
+					StringPtrValidation: &cr.StringPtrValidation{},
+				},
+				&cr.StructFieldValidation{
+					StructField: "MaxColumns",
+					Int32PtrValidation: &cr.Int32PtrValidation{
+						GreaterThan: util.Int32Ptr(0),
+					},
+				},
+				&cr.StructFieldValidation{
+					StructField: "MaxCharsPerColumn",
+					Int32PtrValidation: &cr.Int32PtrValidation{
+						GreaterThanOrEqualTo: util.Int32Ptr(-1),
+					},
+				},
+				&cr.StructFieldValidation{
+					StructField:       "Multiline",
+					BoolPtrValidation: &cr.BoolPtrValidation{},
+				},
+				&cr.StructFieldValidation{
+					StructField:         "CharToEscapeQuoteEscaping",
+					StringPtrValidation: &cr.StringPtrValidation{},
+				},
+				&cr.StructFieldValidation{
+					StructField:         "EmptyValue",
+					StringPtrValidation: &cr.StringPtrValidation{},
+				},
+			},
 		},
 	},
 }
@@ -212,23 +306,23 @@ func (env *Environment) Validate() error {
 	return nil
 }
 
-func (csvData *CsvData) Validate() error {
+func (csvData *CSVData) Validate() error {
 	return nil
 }
 
 func (parqData *ParquetData) Validate() error {
 	return nil
 }
 
-func (csvData *CsvData) GetExternalPath() string {
+func (csvData *CSVData) GetExternalPath() string {
 	return csvData.Path
 }
 
 func (parqData *ParquetData) GetExternalPath() string {
 	return parqData.Path
 }
 
-func (csvData *CsvData) GetIngestedColumns() []string {
+func (csvData *CSVData) GetIngestedColumns() []string {
 	return csvData.Schema
 }

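Every CSVConfig field is a pointer, so a key the user omits stays nil and is distinguishable from an explicit zero value; the only bounded fields are MaxColumns (must be greater than 0) and MaxCharsPerColumn (must be at least -1, where Spark treats -1 as unlimited). A minimal Python sketch of those same semantics, not Cortex's actual validation (which is the Go code above):

```python
def validate_csv_config(cfg):
    # None means "not provided", mirroring the nil pointers in CSVConfig.
    max_columns = cfg.get("max_columns")
    if max_columns is not None and max_columns <= 0:
        raise ValueError("max_columns must be greater than 0")
    max_chars = cfg.get("max_chars_per_column")
    if max_chars is not None and max_chars < -1:
        raise ValueError("max_chars_per_column must be >= -1 (-1 means unlimited)")
    # Drop unset keys so only explicit options reach the CSV reader.
    return {k: v for k, v in cfg.items() if v is not None}

# Accepted: returns {"header": True, "max_columns": 1024}
validate_csv_config({"header": True, "max_columns": 1024, "sep": None})
```
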
2 changes: 1 addition & 1 deletion pkg/operator/context/environment.go

@@ -44,7 +44,7 @@ func dataID(config *userconfig.Config, datasetVersion string) string {
 
 	data := config.Environment.Data
 	switch typedData := data.(type) {
-	case *userconfig.CsvData:
+	case *userconfig.CSVData:
 		buf.WriteString(s.Obj(typedData))
 	case *userconfig.ParquetData:
 		buf.WriteString(typedData.Type)

9 changes: 9 additions & 0 deletions pkg/workloads/lib/test/util_test.py

@@ -21,6 +21,15 @@
 import logging
 
 
+def test_snake_to_camel():
+    assert util.snake_to_camel("ONE_TWO_THREE") == "oneTwoThree"
+    assert util.snake_to_camel("ONE_TWO_THREE", lower=False) == "OneTwoThree"
+    assert util.snake_to_camel("ONE_TWO_THREE", sep="-") == "one_two_three"
+    assert util.snake_to_camel("ONE-TWO-THREE", sep="-") == "oneTwoThree"
+    assert util.snake_to_camel("ONE") == "one"
+    assert util.snake_to_camel("ONE", lower=False) == "One"
+
+
 def test_flatten_all_values():
     obj = "v"
     expected = ["v"]

10 changes: 10 additions & 0 deletions pkg/workloads/lib/util.py

@@ -88,6 +88,16 @@ def pluralize(num, singular, plural):
         return str(num) + " " + plural
 
 
+def snake_to_camel(input, sep="_", lower=True):
+    output = ""
+    for idx, word in enumerate(input.lower().split(sep)):
+        if idx == 0 and lower:
+            output += word
+        else:
+            output += word[0].upper() + word[1:]
+    return output
+
+
 def mkdir_p(dir_path):
     try:
         os.makedirs(dir_path)

12 changes: 9 additions & 3 deletions pkg/workloads/spark_job/spark_util.py

@@ -239,10 +239,16 @@ def ingest(ctx, spark):
 
 
 def read_csv(ctx, spark):
-    csv_config = ctx.environment["data"]
+    data_config = ctx.environment["data"]
     schema = expected_schema_from_context(ctx)
-    header = csv_config.get("skip_header", False)
-    return spark.read.csv(csv_config["path"], header=header, schema=schema, mode="FAILFAST")
+
+    csv_config = {
+        util.snake_to_camel(param_name): val
+        for param_name, val in data_config.get("csv_config", {}).items()
+        if val is not None
+    }
+
+    return spark.read.csv(data_config["path"], schema=schema, mode="FAILFAST", **csv_config)
 
 
 def read_parquet(ctx, spark):

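To make the new read_csv wiring concrete, here is a hedged walk-through of what its dict comprehension yields for a hypothetical parsed data section (the exact shape of ctx.environment is an assumption, and unset csv_config keys are taken to arrive as None):

```python
def snake_to_camel(input, sep="_", lower=True):
    # Copy of util.snake_to_camel introduced in this PR.
    output = ""
    for idx, word in enumerate(input.lower().split(sep)):
        if idx == 0 and lower:
            output += word
        else:
            output += word[0].upper() + word[1:]
    return output

# Hypothetical parsed "data" section.
data_config = {
    "type": "csv",
    "path": "s3a://my-bucket/data.csv",
    "csv_config": {"sep": "|", "header": True, "null_value": None},
}

csv_config = {
    snake_to_camel(param_name): val
    for param_name, val in data_config.get("csv_config", {}).items()
    if val is not None  # unset options are dropped rather than forwarded
}

assert csv_config == {"sep": "|", "header": True}
# spark.read.csv(data_config["path"], schema=schema, mode="FAILFAST", **csv_config)
# then receives sep="|" and header=True as keyword arguments.
```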