@@ -59,14 +59,6 @@ case class Exchange(
 
   override def output: Seq[Attribute] = child.output
 
-  private val sortBasedShuffleOn = SparkEnv.get.shuffleManager.isInstanceOf[SortShuffleManager]
-
-  private val bypassMergeThreshold =
-    child.sqlContext.sparkContext.conf.getInt("spark.shuffle.sort.bypassMergeThreshold", 200)
-
-  private val serializeMapOutputs =
-    child.sqlContext.sparkContext.conf.getBoolean("spark.shuffle.sort.serializeMapOutputs", true)
-
   /**
    * Determines whether records must be defensively copied before being sent to the shuffle.
    * Several of Spark's shuffle components will buffer deserialized Java objects in memory. The
@@ -91,7 +83,11 @@ case class Exchange(
     // Note: even though we only use the partitioner's `numPartitions` field, we require it to be
     // passed instead of directly passing the number of partitions in order to guard against
     // corner-cases where a partitioner constructed with `numPartitions` partitions may output
-    // fewer partitions (like RangeParittioner, for example).
+    // fewer partitions (like RangePartitioner, for example).
+    val conf = child.sqlContext.sparkContext.conf
+    val sortBasedShuffleOn = SparkEnv.get.shuffleManager.isInstanceOf[SortShuffleManager]
+    val bypassMergeThreshold = conf.getInt("spark.shuffle.sort.bypassMergeThreshold", 200)
+    val serializeMapOutputs = conf.getBoolean("spark.shuffle.sort.serializeMapOutputs", true)
     if (newOrdering.nonEmpty) {
       // If a new ordering is required, then records will be sorted with Spark's `ExternalSorter`,
       // which requires a defensive copy.
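
Below is a small, standalone Scala sketch (not the actual Spark sources) of the decision the added lines feed into: the shuffle settings are read locally inside the method rather than cached as class-level fields, and a non-empty required ordering forces a defensive copy because records will be buffered in memory by an external sorter. The object and method names, the ShuffleSettings stand-in, and the fallback branch for the cases this hunk does not show are illustrative assumptions.

// Standalone sketch, not Spark's Exchange implementation.
// It encodes only what the diff above shows: read the shuffle settings where they are
// used, and copy defensively when a new ordering is required; the remaining branches
// are outside the shown hunk and are left as a clearly marked placeholder.
object NeedToCopySketch {

  // Stand-ins for the SparkConf / SparkEnv lookups shown in the diff (names are hypothetical).
  final case class ShuffleSettings(
      sortBasedShuffleOn: Boolean,  // SparkEnv.get.shuffleManager.isInstanceOf[SortShuffleManager]
      bypassMergeThreshold: Int,    // spark.shuffle.sort.bypassMergeThreshold, default 200
      serializeMapOutputs: Boolean) // spark.shuffle.sort.serializeMapOutputs, default true

  def needToCopyObjectsBeforeShuffle(
      newOrderingNonEmpty: Boolean,
      numPartitions: Int,
      settings: ShuffleSettings): Boolean = {
    if (newOrderingNonEmpty) {
      // A required ordering means records are sorted with an external sorter that buffers
      // deserialized objects in memory, so a defensive copy is needed.
      true
    } else {
      // Placeholder: the sort-based vs. other shuffle paths, the bypass-merge threshold,
      // and serialized map outputs are not shown in this hunk, so the sketch
      // conservatively copies here.
      true
    }
  }

  def main(args: Array[String]): Unit = {
    val settings = ShuffleSettings(sortBasedShuffleOn = true, bypassMergeThreshold = 200, serializeMapOutputs = true)
    println(needToCopyObjectsBeforeShuffle(newOrderingNonEmpty = true, numPartitions = 10, settings)) // prints true
  }
}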