
Commit 0255f0e

Author: Arthur Rand
[SPARK-611] Add example multi-component application and walkthrough (apache#241)
* working on new spam/ham example
* basic NB model builder
* messy, but have end-to-end model server working
* add jobs README, re-arrange spam classifier
* add documentation to scala jobs
* added secrets docs
* added secure TeraSort walkthrough
* wip, run ci, add keytabs to executors also
* fix executorenv property string
* add flag for sasl secret
* finished ml pipeline walkthrough
* don't test on the training data
* added code explanation
* addressed comments, updated test_rpc_auth test
* add link to walkthroughs
* addressed documentation comments
* addressed comments in SpamHam.scala example
* added unit test for sasl secret
1 parent 0bb1705 commit 0255f0e

18 files changed: +6854, -175 lines

cli/dcos-spark/cli_test.go

Lines changed: 47 additions & 1 deletion
@@ -16,6 +16,7 @@ const mainClass = "org.apache.spark.examples.SparkPi"
 const principal = "client@local"
 const keytab_prefixed = "__dcos_base64__keytab"
 const keytab = "keytab"
+const sparkAuthSecret = "spark-auth-secret"
 
 // test spaces
 func TestCleanUpSubmitArgs(t *testing.T) {
@@ -91,7 +92,6 @@ func TestPayloadSimple(t *testing.T) {
 		t.Errorf("mainClass should be %s got %s", mainClass, m["mainClass"])
 	}
 
-	//v := reflect.ValueOf(m["sparkProperties"])
 	stringProps := map[string]string{
 		"spark.driver.cores": driverCores,
 		"spark.cores.max": maxCores,
@@ -177,3 +177,49 @@ func TestPayloadWithSecret(t *testing.T) {
 	checkSecret(keytab, keytab, t)
 	checkSecret(keytab_prefixed, keytab, t)
 }
+
+func TestSaslSecret(t *testing.T) {
+	inputArgs := fmt.Sprintf(
+		"--executor-auth-secret /%s "+
+			"--class %s "+
+			"%s --input1 value1 --input2 value2", sparkAuthSecret, mainClass, appJar)
+
+	cmd := SparkCommand{
+		"subId",
+		inputArgs,
+		image,
+		space,
+		make(map[string]string),
+		"",
+		false,
+		false,
+		0,
+		"",
+	}
+	payload, err := buildSubmitJson(&cmd)
+
+	m := make(map[string]interface{})
+
+	json.Unmarshal([]byte(payload), &m)
+
+	if err != nil {
+		t.Errorf("%s", err.Error())
+	}
+
+	stringProps := map[string]string{
+		"spark.authenticate":                      "true",
+		"spark.mesos.containerizer":               "mesos",
+		"spark.authenticate.enableSaslEncryption": "true",
+		"spark.authenticate.secret":               "spark_shared_secret",
+		"spark.executorEnv._SPARK_AUTH_SECRET":    "spark_shared_secret",
+		"spark.mesos.driver.secret.filenames":     sparkAuthSecret,
+		"spark.mesos.driver.secret.names":         fmt.Sprintf("/%s", sparkAuthSecret),
+	}
+
+	v, ok := m["sparkProperties"].(map[string]interface{})
+	if !ok {
+		t.Errorf("%s", ok)
+	}
+
+	checkProps(v, stringProps, t)
+}
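Note (not part of this diff): checkProps and checkSecret are pre-existing helpers in cli_test.go. A minimal sketch of the contract the new test relies on, namely that every expected key/value must appear in the unmarshalled "sparkProperties" map, could look like the following hypothetical stand-in (name and details are assumptions, not the repository's actual implementation):

func checkPropsSketch(props map[string]interface{}, expected map[string]string, t *testing.T) {
	// Sketch only: the real checkProps lives elsewhere in cli_test.go and may differ.
	for key, want := range expected {
		got, ok := props[key]
		if !ok {
			t.Errorf("expected property %q to be present", key)
			continue
		}
		if got != want {
			t.Errorf("property %q: expected %q, got %v", key, want, got)
		}
	}
}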

cli/dcos-spark/submit_builder.go

Lines changed: 73 additions & 46 deletions
@@ -40,28 +40,29 @@ func newSparkVal(flagName, propName, desc string) *sparkVal {
 }
 
 type sparkArgs struct {
-	mainClass string
-	kerberosPrincipal string
-	keytabSecretPath string
-	tgtSecretPath string
-	tgtSecretValue string
-	keystoreSecretPath string
-	keystorePassword string
-	privateKeyPassword string
-	truststoreSecretPath string
-	truststorePassword string
-	propertiesFile string
-	properties map[string]string
-
-	boolVals []*sparkVal
-	stringVals []*sparkVal
-
-	app *url.URL
-	appArgs []string
-
-	isScala bool
-	isPython bool
-	isR bool
+	mainClass            string
+	kerberosPrincipal    string
+	keytabSecretPath     string
+	tgtSecretPath        string
+	tgtSecretValue       string
+	keystoreSecretPath   string
+	keystorePassword     string
+	privateKeyPassword   string
+	truststoreSecretPath string
+	truststorePassword   string
+	saslSecret           string
+	propertiesFile       string
+	properties           map[string]string
+
+	boolVals   []*sparkVal
+	stringVals []*sparkVal
+
+	app     *url.URL
+	appArgs []string
+
+	isScala  bool
+	isPython bool
+	isR      bool
 }
 
 func NewSparkArgs() *sparkArgs {
@@ -77,6 +78,7 @@ func NewSparkArgs() *sparkArgs {
 		"",
 		"",
 		"",
+		"",
 		make(map[string]string),
 		make([]*sparkVal, 0),
 		make([]*sparkVal, 0),
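The extra empty string keeps the positional initializer in NewSparkArgs in step with the new saslSecret field added to the struct above.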
@@ -154,23 +156,24 @@ Args:
 		PlaceHolder("user@REALM").Default("").StringVar(&args.kerberosPrincipal)
 	submit.Flag("keytab-secret-path", "Path to Keytab in secret store to be used in the Spark drivers").
 		PlaceHolder("/mykeytab").Default("").StringVar(&args.keytabSecretPath)
-	submit.Flag("tgt-secret-path", "Path to ticket granting ticket (TGT) in secret store to be used " +
+	submit.Flag("tgt-secret-path", "Path to ticket granting ticket (TGT) in secret store to be used "+
 		"in the Spark drivers").PlaceHolder("/mytgt").Default("").StringVar(&args.tgtSecretPath)
 	submit.Flag("tgt-secret-value", "Value of TGT to be used in the drivers, must be base64 encoded").
 		Default("").StringVar(&args.tgtSecretValue)
-	submit.Flag("keystore-secret-path", "Path to keystore in secret store for TLS/SSL. " +
-		"Make sure to set --keystore-password and --private-key-password as well.").
+	submit.Flag("keystore-secret-path", "Path to keystore in secret store for TLS/SSL. "+
+		"Make sure to set --keystore-password and --private-key-password as well.").
 		PlaceHolder("__dcos_base64__keystore").Default("").StringVar(&args.keystoreSecretPath)
 	submit.Flag("keystore-password", "A password to the keystore.").
 		Default("").StringVar(&args.keystorePassword)
 	submit.Flag("private-key-password", "A password to the private key in the keystore.").
 		Default("").StringVar(&args.privateKeyPassword)
-	submit.Flag("truststore-secret-path", "Path to truststore in secret store for TLS/SSL. " +
-		"Make sure to set --truststore-password as well.").
+	submit.Flag("truststore-secret-path", "Path to truststore in secret store for TLS/SSL. "+
+		"Make sure to set --truststore-password as well.").
 		PlaceHolder("__dcos_base64__truststore").Default("").StringVar(&args.truststoreSecretPath)
 	submit.Flag("truststore-password", "A password to the truststore.").
 		Default("").StringVar(&args.truststorePassword)
-
+	submit.Flag("executor-auth-secret", "Path to secret 'cookie' to use for Executor authentication "+
+		"block transfer encryption. Make one with dcos spark secret").Default("").StringVar(&args.saslSecret)
 	submit.Flag("isR", "Force using SparkR").Default("false").BoolVar(&args.isR)
 	submit.Flag("isPython", "Force using Python").Default("false").BoolVar(&args.isPython)
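Note (illustration, not part of this diff): the flag wiring above uses the CLI's kingpin-style builder. A minimal, self-contained sketch of how such a flag lands in a struct field, assuming gopkg.in/alecthomas/kingpin.v2 (which provides this Flag/Default/StringVar chaining):

package main

import (
	"fmt"

	"gopkg.in/alecthomas/kingpin.v2"
)

func main() {
	var saslSecret string
	app := kingpin.New("spark", "sketch of the --executor-auth-secret wiring")
	submit := app.Command("submit", "Submit a job")
	submit.Flag("executor-auth-secret", "Path to secret 'cookie' for executor authentication.").
		Default("").StringVar(&saslSecret)

	// In the real CLI these arguments come from --submit-args after cleanUpSubmitArgs;
	// here they are passed directly.
	if _, err := app.Parse([]string{"submit", "--executor-auth-secret", "/spark-auth-secret"}); err != nil {
		panic(err)
	}
	fmt.Println(saslSecret) // prints "/spark-auth-secret"
}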

@@ -218,8 +221,8 @@ Args:
 	val.flag(submit).StringVar(&val.s)
 	args.stringVals = append(args.stringVals, val)
 
-	val = newSparkVal("py-files", "spark.submit.pyFiles", "Add .py, .zip or .egg files to " +
-		"be distributed with your application. If you depend on multiple Python files we recommend packaging them " +
+	val = newSparkVal("py-files", "spark.submit.pyFiles", "Add .py, .zip or .egg files to "+
+		"be distributed with your application. If you depend on multiple Python files we recommend packaging them "+
 		"into a .zip or .egg.")
 	val.flag(submit).StringVar(&val.s)
 	args.stringVals = append(args.stringVals, val)
@@ -252,24 +255,27 @@ func prepareBase64Secret(secretPath string) string {
 }
 
 func addArgsForFileBasedSecret(args *sparkArgs, secretPath, property string) {
-	secretRefProp := fmt.Sprintf(SECRET_REFERENCE_TEMPLATE, "driver")
-	secretFileProp := fmt.Sprintf(SECRET_FILENAME_TEMPLATE, "driver")
-	appendToProperty(secretRefProp, secretPath, args)
-	appendToProperty(secretFileProp, prepareBase64Secret(secretPath), args)
+	taskTypes := []string{"driver", "executor"}
+	for _, taskType := range taskTypes {
+		secretRefProp := fmt.Sprintf(SECRET_REFERENCE_TEMPLATE, taskType)
+		secretFileProp := fmt.Sprintf(SECRET_FILENAME_TEMPLATE, taskType)
+		appendToProperty(secretRefProp, secretPath, args)
+		appendToProperty(secretFileProp, prepareBase64Secret(secretPath), args)
+	}
 	args.properties[property] = prepareBase64Secret(secretPath)
 }
 
 func setupKerberosAuthArgs(args *sparkArgs) error {
 	args.properties["spark.mesos.containerizer"] = "mesos"
-	if args.keytabSecretPath != "" { // using keytab secret
+	if args.keytabSecretPath != "" { // using keytab secret
 		addArgsForFileBasedSecret(args, args.keytabSecretPath, "spark.yarn.keytab")
 		return nil
 	}
-	if args.tgtSecretPath != "" { // using tgt secret
+	if args.tgtSecretPath != "" { // using tgt secret
 		addArgsForFileBasedSecret(args, args.tgtSecretPath, "spark.mesos.driverEnv.KRB5CCNAME")
 		return nil
 	}
-	if args.tgtSecretValue != "" { // using secret by value
+	if args.tgtSecretValue != "" { // using secret by value
 		appendToProperty("spark.mesos.driver.secret.values", args.tgtSecretValue, args)
 		args.properties["spark.mesos.driverEnv.KRB5CCNAME"] = "tgt"
 		appendToProperty(fmt.Sprintf(SECRET_FILENAME_TEMPLATE, "driver"), "tgt.base64", args)
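Note (illustration, not part of this diff): the loop above replaces the driver-only secret wiring, so file-based secrets (keytab or TGT) are now mounted into both the driver and the executors. Assuming SECRET_REFERENCE_TEMPLATE and SECRET_FILENAME_TEMPLATE expand to the spark.mesos.<task>.secret.names / .filenames properties exercised by TestSaslSecret, a hypothetical keytab secret at /mykeytab would end up roughly as:

// Sketch of the assumed net effect for a hypothetical /mykeytab secret;
// property names are inferred from TestSaslSecret, not quoted from this commit.
var expectedKeytabProps = map[string]string{
	"spark.mesos.driver.secret.names":       "/mykeytab",
	"spark.mesos.driver.secret.filenames":   "mykeytab",
	"spark.mesos.executor.secret.names":     "/mykeytab",
	"spark.mesos.executor.secret.filenames": "mykeytab",
	"spark.yarn.keytab":                     "mykeytab", // the `property` argument in the keytab branch
}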
@@ -283,8 +289,8 @@ func setupTLSArgs(args *sparkArgs) {
 	args.properties["spark.ssl.enabled"] = "true"
 
 	// Keystore and truststore
-	const keyStoreFileName = "server.jks"
-	const trustStoreFileName = "trust.jks"
+	const keyStoreFileName = "server.jks"
+	const trustStoreFileName = "trust.jks"
 	args.properties["spark.ssl.keyStore"] = keyStoreFileName
 	if args.truststoreSecretPath != "" {
 		args.properties["spark.ssl.trustStore"] = trustStoreFileName
@@ -303,7 +309,7 @@ func setupTLSArgs(args *sparkArgs) {
 	joinedFilenames := strings.Join(filenames, ",")
 	joinedEnvkeys := strings.Join(envkeys, ",")
 
-	taskTypes :=[]string{"driver", "executor"}
+	taskTypes := []string{"driver", "executor"}
 	for _, taskType := range taskTypes {
 		appendToProperty(fmt.Sprintf(SECRET_REFERENCE_TEMPLATE, taskType), joinedPaths, args)
 		appendToProperty(fmt.Sprintf(SECRET_FILENAME_TEMPLATE, taskType), joinedFilenames, args)
@@ -324,6 +330,21 @@ func setupTLSArgs(args *sparkArgs) {
 	}
 }
 
+func setupSaslProperties(secretPath string, args *sparkArgs) {
+	args.properties["spark.mesos.containerizer"] = "mesos"
+	args.properties["spark.authenticate"] = "true"
+	args.properties["spark.authenticate.enableSaslEncryption"] = "true"
+	args.properties["spark.authenticate.secret"] = "spark_shared_secret"
+	args.properties["spark.executorEnv._SPARK_AUTH_SECRET"] = "spark_shared_secret"
+	taskTypes := []string{"driver", "executor"}
+	for _, taskType := range taskTypes {
+		secretRefProp := fmt.Sprintf(SECRET_REFERENCE_TEMPLATE, taskType)
+		secretFileProp := fmt.Sprintf(SECRET_FILENAME_TEMPLATE, taskType)
+		appendToProperty(secretRefProp, secretPath, args)
+		appendToProperty(secretFileProp, prepareBase64Secret(secretPath), args)
+	}
+}
+
 func parseApplicationFile(args *sparkArgs) error {
 	appString := args.app.String()
 	fs := strings.Split(appString, "/")
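Note (illustration, not part of this diff): for --executor-auth-secret /spark-auth-secret, setupSaslProperties combines the fixed authentication properties with the same driver/executor secret mounting used above. TestSaslSecret asserts the driver-side subset of the result; the executor-side names below rest on the same template assumption:

// Sketch of the assumed property set for a secret at /spark-auth-secret.
var expectedSaslProps = map[string]string{
	"spark.mesos.containerizer":               "mesos",
	"spark.authenticate":                      "true",
	"spark.authenticate.enableSaslEncryption": "true",
	"spark.authenticate.secret":               "spark_shared_secret",
	"spark.executorEnv._SPARK_AUTH_SECRET":    "spark_shared_secret",
	"spark.mesos.driver.secret.names":         "/spark-auth-secret",
	"spark.mesos.driver.secret.filenames":     "spark-auth-secret",
	"spark.mesos.executor.secret.names":       "/spark-auth-secret",
	"spark.mesos.executor.secret.filenames":   "spark-auth-secret",
}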
@@ -367,7 +388,7 @@ func parseApplicationFile(args *sparkArgs) error {
 
 func cleanUpSubmitArgs(argsStr string, boolVals []*sparkVal) ([]string, []string) {
 
-    // collapse two or more spaces to one.
+	// collapse two or more spaces to one.
 	argsCompacted := collapseSpacesPattern.ReplaceAllString(argsStr, " ")
 	// clean up any instances of shell-style escaped newlines: "arg1\\narg2" => "arg1 arg2"
 	argsCleaned := strings.TrimSpace(backslashNewlinePattern.ReplaceAllLiteralString(argsCompacted, " "))
@@ -390,12 +411,12 @@ ARGLOOP:
 		// if it's not of the format --flag=val which scopt allows
 		if strings.HasPrefix(arg, "-") {
 			appFlags = append(appFlags, arg)
-			if strings.Contains(arg, "=") || (i + 1) >= len(args) {
+			if strings.Contains(arg, "=") || (i+1) >= len(args) {
 				i += 1
 			} else {
 				// if there's a value with this flag, add it
-				if !strings.HasPrefix(args[i + 1], "-") {
-					appFlags = append(appFlags, args[i + 1])
+				if !strings.HasPrefix(args[i+1], "-") {
+					appFlags = append(appFlags, args[i+1])
 					i += 1
 				}
 				i += 1
@@ -514,7 +535,7 @@ func buildSubmitJson(cmd *SparkCommand) (string, error) {
 	// then map applicable envvars
 	// then parse all -Dprop.key=propVal, and all --conf prop.key=propVal
 	// then map flags
-	submit, args := sparkSubmitArgSetup() // setup
+	submit, args := sparkSubmitArgSetup() // setup
 	// convert and get application flags, add them to the args passed to the spark app
 	submitArgs, appFlags := cleanUpSubmitArgs(cmd.submitArgs, args.boolVals)
 	args.appArgs = append(args.appArgs, appFlags...)
@@ -623,7 +644,7 @@ func buildSubmitJson(cmd *SparkCommand) (string, error) {
 
 	_, contains = args.properties["spark.mesos.executor.docker.forcePullImage"]
 	if !contains {
-		log.Printf("Pulling image %s for executors, by default. To bypass set " +
+		log.Printf("Pulling image %s for executors, by default. To bypass set "+
 			"spark.mesos.executor.docker.forcePullImage=false", args.properties["spark.mesos.executor.docker.image"])
 		args.properties["spark.mesos.executor.docker.forcePullImage"] = "true"
 	}
@@ -677,6 +698,7 @@ func buildSubmitJson(cmd *SparkCommand) (string, error) {
 		_, contains := args.properties["spark.mesos.driverEnv.KRB5_CONFIG_BASE64"]
 		if !contains {
 			args.properties["spark.mesos.driverEnv.KRB5_CONFIG_BASE64"] = krb5conf
+			args.properties["spark.executorEnv.KRB5_CONFIG_BASE64"] = krb5conf
 		} else {
 			log.Printf("Using user-specified krb5 config")
 		}
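This exports the base64-encoded krb5 configuration to the executors as well as the driver (part of the "add keytabs to executors also" work noted in the commit message).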
@@ -704,6 +726,11 @@ func buildSubmitJson(cmd *SparkCommand) (string, error) {
 		setupTLSArgs(args)
 	}
 
+	// RPC and SASL
+	if args.saslSecret != "" {
+		setupSaslProperties(args.saslSecret, args)
+	}
+
 	jsonMap := map[string]interface{}{
 		"action": "CreateSubmissionRequest",
 		"appArgs": args.appArgs,
