Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 69 additions & 0 deletions async-shuffle-upload/async-shuffle-upload-api/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent_2.11</artifactId>
<version>3.0.0-SNAPSHOT</version>
<relativePath>../../pom.xml</relativePath>
</parent>

<artifactId>spark-async-shuffle-upload-api</artifactId>
<properties>
<sbt.project.name>spark-async-shuffle-upload-api</sbt.project.name>
</properties>
<packaging>jar</packaging>
<name>Spark Project Async Shuffle Upload API</name>
<url>http://spark.apache.org/</url>
<dependencies>
<dependency>
<groupId>org.immutables</groupId>
<artifactId>value</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.datatype</groupId>
<artifactId>jackson-datatype-jdk8</artifactId>
</dependency>
<dependency>
<groupId>com.palantir.safe-logging</groupId>
<artifactId>safe-logging</artifactId>
</dependency>
<dependency>
<groupId>com.palantir.safe-logging</groupId>
<artifactId>preconditions</artifactId>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-async-shuffle-upload-immutables</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>

Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.palantir.spark.shuffle.async.api;

public final class SparkShuffleApiConstants {

// Identifiers used by the spark shuffle plugin
// TODO make these consistent in naming - but keep in mind that changing these would result in runtime errors.
public static final String SHUFFLE_PLUGIN_APP_NAME_CONF = "spark.plugin.shuffle.async.appName";
public static final String SHUFFLE_BASE_URI_CONF = "spark.shuffle.hadoop.async.base-uri";
public static final String SHUFFLE_S3A_CREDS_FILE_CONF = "spark.plugin.shuffle.async.s3a.credsFile";
public static final String SHUFFLE_S3A_ENDPOINT_CONF = "spark.shuffle.hadoop.async.s3a.endpoint";
public static final String METRICS_FACTORY_CLASS_CONF = "spark.plugin.shuffle.async.metricsFactoryClass";

private SparkShuffleApiConstants() {}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.palantir.spark.shuffle.async.api;

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import com.palantir.logsafe.UnsafeArg;
import com.palantir.logsafe.exceptions.SafeIllegalArgumentException;
import com.palantir.spark.shuffle.async.immutables.ImmutablesStyle;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import org.immutables.value.Value;

@ImmutablesStyle
@Value.Immutable
@JsonSerialize(as = ImmutableSparkShuffleAwsCredentials.class)
@JsonDeserialize(as = ImmutableSparkShuffleAwsCredentials.class)
@JsonIgnoreProperties(ignoreUnknown = true)
public abstract class SparkShuffleAwsCredentials {

private static final ObjectMapper MAPPER = new ObjectMapper();

public abstract String accessKeyId();

public abstract String secretAccessKey();

public abstract String sessionToken();

public final byte[] toBytes() {
try {
return MAPPER.writeValueAsString(this).getBytes(StandardCharsets.UTF_8);
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
}

public static SparkShuffleAwsCredentials fromBytes(byte[] bytes) {
try {
return MAPPER.readValue(new String(bytes, StandardCharsets.UTF_8), SparkShuffleAwsCredentials.class);
} catch (IOException e) {
throw new SafeIllegalArgumentException(
"Could not deserialize bytes as AWS credentials.",
UnsafeArg.of("cause", e));
}
}

public static Builder builder() {
return new Builder();
}

public static final class Builder extends ImmutableSparkShuffleAwsCredentials.Builder {}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.palantir.spark.shuffle.async.api;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.fail;

import java.nio.charset.StandardCharsets;
import org.junit.Test;

public final class SparkShuffleAwsCredentialsSuite {

@Test
public void testSerialize() {
SparkShuffleAwsCredentials creds = SparkShuffleAwsCredentials.builder()
.accessKeyId("access-key")
.secretAccessKey("secret-key")
.sessionToken("session-token")
.build();
byte[] bytes = creds.toBytes();
assertThat(new String(bytes, StandardCharsets.UTF_8))
.isEqualTo("{\"accessKeyId\":\"access-key\","
+ "\"secretAccessKey\":\"secret-key\","
+ "\"sessionToken\":\"session-token\"}");
}

@Test
public void testDeserialize() {
String serializedString = "{\"accessKeyId\":\"access-key\","
+ "\"secretAccessKey\":\"secret-key\","
+ "\"sessionToken\":\"session-token\"}";

SparkShuffleAwsCredentials creds =
SparkShuffleAwsCredentials.fromBytes(serializedString.getBytes(StandardCharsets.UTF_8));
}
}
41 changes: 41 additions & 0 deletions async-shuffle-upload/immutables/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent_2.11</artifactId>
<version>3.0.0-SNAPSHOT</version>
<relativePath>../../pom.xml</relativePath>
</parent>

<artifactId>spark-async-shuffle-upload-immutables</artifactId>
<properties>
<sbt.project.name>spark-async-shuffle-upload-immutables</sbt.project.name>
</properties>
<packaging>jar</packaging>
<name>Spark Project Async Shuffle Upload Immutables</name>
<url>http://spark.apache.org/</url>
<dependencies>
<dependency>
<groupId>org.immutables</groupId>
<artifactId>value</artifactId>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.palantir.spark.shuffle.async.immutables;

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import org.immutables.value.Value;

@Target({ElementType.PACKAGE, ElementType.TYPE})
@Retention(RetentionPolicy.SOURCE)
@Value.Style(
visibility = Value.Style.ImplementationVisibility.PACKAGE,
overshadowImplementation = true,
jdkOnly = true,
get = {"get*", "is*"})
public @interface ImmutablesStyle {}
9 changes: 4 additions & 5 deletions core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -388,14 +388,13 @@
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.palantir.safe-logging</groupId>
<artifactId>safe-logging</artifactId>
<version>1.5.1</version>
<groupId>org.apache.commons</groupId>
<artifactId>commons-crypto</artifactId>
</dependency>

<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-crypto</artifactId>
<groupId>com.palantir.safe-logging</groupId>
<artifactId>safe-logging</artifactId>
</dependency>

</dependencies>
Expand Down
31 changes: 31 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,10 @@
<module>sql/hive</module>
<module>streaming</module>
<module>tools</module>

<!-- Async shuffle upload modules-->
<module>async-shuffle-upload/immutables</module>
<module>async-shuffle-upload/async-shuffle-upload-api</module>
<!-- See additional modules enabled by profiles below -->
</modules>

Expand Down Expand Up @@ -216,6 +220,9 @@
-->
<arrow.version>0.12.0</arrow.version>

<!-- Async shuffle upload plugin dependency versions -->
<safe-logging.version>1.13.0</safe-logging.version>

<test.java.home>${java.home}</test.java.home>
<test.exclude.tags></test.exclude.tags>
<test.include.tags></test.include.tags>
Expand Down Expand Up @@ -2305,6 +2312,30 @@
<artifactId>kafka-clients</artifactId>
<version>${kafka.version}</version>
</dependency>

<dependency>
<groupId>com.palantir.safe-logging</groupId>
<artifactId>safe-logging</artifactId>
<version>${safe-logging.version}</version>
</dependency>

<!-- Dependencies required for the async shuffle upload plugin. -->
<dependency>
<groupId>org.immutables</groupId>
<artifactId>value</artifactId>
<version>2.8.2</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.palantir.safe-logging</groupId>
<artifactId>preconditions</artifactId>
<version>${safe-logging.version}</version>
</dependency>
<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
<version>3.15.0</version>
</dependency>
</dependencies>
</dependencyManagement>

Expand Down