Commit af31335

[SPARK-8319] [CORE] [SQL] Update logic related to key orderings in shuffle dependencies
This patch updates two pieces of logic related to the handling of key orderings in ShuffleDependencies:

- The Tungsten ShuffleManager falls back to the regular SortShuffleManager whenever the shuffle dependency specifies a key ordering, but technically we only need to fall back when an aggregator is also specified. This patch updates the fallback logic accordingly, so that the Tungsten optimizations can apply to more workloads.

- The SQL Exchange operator performs defensive copying of shuffle inputs when a key ordering is specified, but this is unnecessary. The copying was added to guard against cases where ExternalSorter would buffer non-serialized records in memory. When ExternalSorter is configured without an aggregator, it uses the following logic to decide whether to buffer records in a serialized or deserialized format:

  ```scala
  private val useSerializedPairBuffer =
    ordering.isEmpty &&
    conf.getBoolean("spark.shuffle.sort.serializeMapOutputs", true) &&
    ser.supportsRelocationOfSerializedObjects
  ```

  The `newOrdering.nonEmpty` branch in `Exchange.needToCopyObjectsBeforeShuffle`, removed by this patch, is not necessary:

  - It was checked even when we weren't using sort-based shuffle, which was unnecessary because only SortShuffleManager performs map-side sorting.
  - Map-side sorting during shuffle writing is only performed for shuffles that perform map-side aggregation as part of the shuffle (to see this, look at how SortShuffleWriter constructs ExternalSorter; a sketch follows below). Since SQL never pushes aggregation into Spark's shuffle, we can guarantee that both the aggregator and the ordering will be empty. Spark SQL always uses serializers that support relocation, so sort-based shuffle will use the serialized pair buffer unless the user has explicitly disabled it via the SparkConf feature flag.

Therefore, I think my optimization in Exchange should be safe.

Author: Josh Rosen <[email protected]>

Closes apache#6773 from JoshRosen/SPARK-8319 and squashes the following commits:

7a14129 [Josh Rosen] Revise comments; add handler to guard against future ShuffleManager implementations
07bb2c9 [Josh Rosen] Update comment to clarify circumstances under which shuffle operates on serialized records
269089a [Josh Rosen] Avoid unnecessary copy in SQL Exchange
34e526e [Josh Rosen] Enable Tungsten shuffle for non-agg shuffles w/ key orderings
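For reference on that last point, this is roughly how SortShuffleWriter of that era chose its ExternalSorter configuration (a simplified sketch, not part of this diff): the key ordering only reaches the sorter when map-side combine, and therefore an aggregator, is present.

```scala
// Sketch of SortShuffleWriter.write's sorter construction (Spark 1.4-era, simplified;
// not part of this commit). `dep` is the ShuffleDependency being written.
sorter = if (dep.mapSideCombine) {
  require(dep.aggregator.isDefined, "Map-side combine without Aggregator specified!")
  new ExternalSorter[K, V, C](
    dep.aggregator, Some(dep.partitioner), dep.keyOrdering, dep.serializer)
} else {
  // Neither an aggregator nor an ordering is passed: records only need to be grouped
  // by partition, which keeps the serialized pair buffer eligible.
  new ExternalSorter[K, V, V](
    aggregator = None, Some(dep.partitioner), ordering = None, dep.serializer)
}
```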
1 parent ce1041c · commit af31335

3 files changed: +21 −20


core/src/main/scala/org/apache/spark/shuffle/unsafe/UnsafeShuffleManager.scala

Lines changed: 0 additions & 3 deletions

```diff
@@ -56,9 +56,6 @@ private[spark] object UnsafeShuffleManager extends Logging {
     } else if (dependency.aggregator.isDefined) {
       log.debug(s"Can't use UnsafeShuffle for shuffle $shufId because an aggregator is defined")
       false
-    } else if (dependency.keyOrdering.isDefined) {
-      log.debug(s"Can't use UnsafeShuffle for shuffle $shufId because a key ordering is defined")
-      false
     } else if (dependency.partitioner.numPartitions > MAX_SHUFFLE_OUTPUT_PARTITIONS) {
       log.debug(s"Can't use UnsafeShuffle for shuffle $shufId because it has more than " +
         s"$MAX_SHUFFLE_OUTPUT_PARTITIONS partitions")
```

core/src/test/scala/org/apache/spark/shuffle/unsafe/UnsafeShuffleManagerSuite.scala

Lines changed: 10 additions & 9 deletions

```diff
@@ -76,6 +76,15 @@ class UnsafeShuffleManagerSuite extends SparkFunSuite with Matchers {
       mapSideCombine = false
     )))
 
+    // Shuffles with key orderings are supported as long as no aggregator is specified
+    assert(canUseUnsafeShuffle(shuffleDep(
+      partitioner = new HashPartitioner(2),
+      serializer = kryo,
+      keyOrdering = Some(mock(classOf[Ordering[Any]])),
+      aggregator = None,
+      mapSideCombine = false
+    )))
+
   }
 
   test("unsupported shuffle dependencies") {
@@ -100,22 +109,14 @@ class UnsafeShuffleManagerSuite extends SparkFunSuite with Matchers {
       mapSideCombine = false
     )))
 
-    // We do not support shuffles that perform any kind of aggregation or sorting of keys
-    assert(!canUseUnsafeShuffle(shuffleDep(
-      partitioner = new HashPartitioner(2),
-      serializer = kryo,
-      keyOrdering = Some(mock(classOf[Ordering[Any]])),
-      aggregator = None,
-      mapSideCombine = false
-    )))
+    // We do not support shuffles that perform aggregation
     assert(!canUseUnsafeShuffle(shuffleDep(
       partitioner = new HashPartitioner(2),
       serializer = kryo,
       keyOrdering = None,
       aggregator = Some(mock(classOf[Aggregator[Any, Any, Any]])),
       mapSideCombine = false
     )))
-    // We do not support shuffles that perform any kind of aggregation or sorting of keys
     assert(!canUseUnsafeShuffle(shuffleDep(
       partitioner = new HashPartitioner(2),
       serializer = kryo,
```
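For context, the suite's `shuffleDep` helper presumably assembles a Mockito-mocked ShuffleDependency along these lines (a hypothetical sketch; the actual helper may differ in detail):

```scala
import org.mockito.Mockito.{mock, when}

// Hypothetical sketch of the suite's shuffleDep helper: a mocked ShuffleDependency
// whose relevant accessors return the supplied values.
private def shuffleDep(
    partitioner: Partitioner,
    serializer: Option[Serializer],
    keyOrdering: Option[Ordering[Any]],
    aggregator: Option[Aggregator[Any, Any, Any]],
    mapSideCombine: Boolean): ShuffleDependency[Any, Any, Any] = {
  val dep = mock(classOf[ShuffleDependency[Any, Any, Any]])
  when(dep.partitioner).thenReturn(partitioner)
  when(dep.serializer).thenReturn(serializer)
  when(dep.keyOrdering).thenReturn(keyOrdering)
  when(dep.aggregator).thenReturn(aggregator)
  when(dep.mapSideCombine).thenReturn(mapSideCombine)
  dep
}
```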

sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala

Lines changed: 11 additions & 8 deletions

```diff
@@ -20,6 +20,7 @@ package org.apache.spark.sql.execution
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.rdd.{RDD, ShuffledRDD}
 import org.apache.spark.serializer.Serializer
+import org.apache.spark.shuffle.hash.HashShuffleManager
 import org.apache.spark.shuffle.sort.SortShuffleManager
 import org.apache.spark.shuffle.unsafe.UnsafeShuffleManager
 import org.apache.spark.sql.SQLContext
@@ -81,11 +82,7 @@ case class Exchange(
       shuffleManager.isInstanceOf[UnsafeShuffleManager]
     val bypassMergeThreshold = conf.getInt("spark.shuffle.sort.bypassMergeThreshold", 200)
     val serializeMapOutputs = conf.getBoolean("spark.shuffle.sort.serializeMapOutputs", true)
-    if (newOrdering.nonEmpty) {
-      // If a new ordering is required, then records will be sorted with Spark's `ExternalSorter`,
-      // which requires a defensive copy.
-      true
-    } else if (sortBasedShuffleOn) {
+    if (sortBasedShuffleOn) {
       val bypassIsSupported = SparkEnv.get.shuffleManager.isInstanceOf[SortShuffleManager]
       if (bypassIsSupported && partitioner.numPartitions <= bypassMergeThreshold) {
         // If we're using the original SortShuffleManager and the number of output partitions is
@@ -96,8 +93,11 @@ case class Exchange(
       } else if (serializeMapOutputs && serializer.supportsRelocationOfSerializedObjects) {
         // SPARK-4550 extended sort-based shuffle to serialize individual records prior to sorting
         // them. This optimization is guarded by a feature-flag and is only applied in cases where
-        // shuffle dependency does not specify an ordering and the record serializer has certain
-        // properties. If this optimization is enabled, we can safely avoid the copy.
+        // shuffle dependency does not specify an aggregator or ordering and the record serializer
+        // has certain properties. If this optimization is enabled, we can safely avoid the copy.
+        //
+        // Exchange never configures its ShuffledRDDs with aggregators or key orderings, so we only
+        // need to check whether the optimization is enabled and supported by our serializer.
         //
         // This optimization also applies to UnsafeShuffleManager (added in SPARK-7081).
         false
@@ -108,9 +108,12 @@ case class Exchange(
         // both cases, we must copy.
         true
       }
-    } else {
+    } else if (shuffleManager.isInstanceOf[HashShuffleManager]) {
       // We're using hash-based shuffle, so we don't need to copy.
       false
+    } else {
+      // Catch-all case to safely handle any future ShuffleManager implementations.
+      true
     }
   }
 
```
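Pieced together from the hunks above, Exchange's post-patch copy decision reads roughly as follows (a sketch assembled from the diff; the method signature and the `conf` lookup are assumptions from context):

```scala
// Sketch of Exchange.needToCopyObjectsBeforeShuffle after this patch, assembled
// from the hunks above (signature and conf lookup are assumptions).
private def needToCopyObjectsBeforeShuffle(
    partitioner: Partitioner,
    serializer: Serializer): Boolean = {
  val conf = SparkEnv.get.conf
  val shuffleManager = SparkEnv.get.shuffleManager
  val sortBasedShuffleOn = shuffleManager.isInstanceOf[SortShuffleManager] ||
    shuffleManager.isInstanceOf[UnsafeShuffleManager]
  val bypassMergeThreshold = conf.getInt("spark.shuffle.sort.bypassMergeThreshold", 200)
  val serializeMapOutputs = conf.getBoolean("spark.shuffle.sort.serializeMapOutputs", true)
  if (sortBasedShuffleOn) {
    val bypassIsSupported = shuffleManager.isInstanceOf[SortShuffleManager]
    if (bypassIsSupported && partitioner.numPartitions <= bypassMergeThreshold) {
      // Bypass-merge-sort writes records straight to per-partition files without
      // buffering them in memory, so no defensive copy is needed.
      false
    } else if (serializeMapOutputs && serializer.supportsRelocationOfSerializedObjects) {
      // Records are serialized before sorting (SPARK-4550 / SPARK-7081), so mutable
      // row objects can be reused safely without copying.
      false
    } else {
      // The sorter will buffer deserialized records in memory, so we must copy.
      true
    }
  } else if (shuffleManager.isInstanceOf[HashShuffleManager]) {
    // Hash-based shuffle writes records to the output stream immediately: no copy.
    false
  } else {
    // Unknown ShuffleManager implementation: copy defensively.
    true
  }
}
```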
