@@ -36,25 +36,23 @@ case class Exchange(newPartitioning: Partitioning, child: SparkPlan) extends Una
36
36
37
37
override def outputPartitioning = newPartitioning
38
38
39
- def output = child.output
39
+ override def output = child.output
40
40
41
41
/** We must copy rows when sort based shuffle is on */
42
42
protected def sortBasedShuffleOn = SparkEnv .get.shuffleManager.isInstanceOf [SortShuffleManager ]
43
43
44
- def execute () = attachTree(this , " execute" ) {
44
+ override def execute () = attachTree(this , " execute" ) {
45
45
newPartitioning match {
46
46
case HashPartitioning (expressions, numPartitions) =>
47
47
// TODO: Eliminate redundant expressions in grouping key and value.
48
- val rdd = child.execute().mapPartitions { iter =>
49
- if (sortBasedShuffleOn) {
50
- @ transient val hashExpressions =
51
- newProjection(expressions, child.output)
52
-
48
+ val rdd = if (sortBasedShuffleOn) {
49
+ child.execute().mapPartitions { iter =>
50
+ val hashExpressions = newProjection(expressions, child.output)
53
51
iter.map(r => (hashExpressions(r), r.copy()))
54
- } else {
55
- @ transient val hashExpressions =
56
- newMutableProjection(expressions, child.output)()
57
-
52
+ }
53
+ } else {
54
+ child.execute().mapPartitions { iter =>
55
+ val hashExpressions = newMutableProjection(expressions, child.output)()
58
56
val mutablePair = new MutablePair [Row , Row ]()
59
57
iter.map(r => mutablePair.update(hashExpressions(r), r))
60
58
}
@@ -65,28 +63,29 @@ case class Exchange(newPartitioning: Partitioning, child: SparkPlan) extends Una
65
63
shuffled.map(_._2)
66
64
67
65
case RangePartitioning (sortingExpressions, numPartitions) =>
68
- // TODO: RangePartitioner should take an Ordering.
69
- implicit val ordering = new RowOrdering (sortingExpressions, child.output)
70
-
71
- val rdd = child.execute().mapPartitions { iter =>
72
- if (sortBasedShuffleOn) {
73
- iter.map(row => (row.copy(), null ))
74
- } else {
66
+ val rdd = if (sortBasedShuffleOn) {
67
+ child.execute().mapPartitions { iter => iter.map(row => (row.copy(), null ))}
68
+ } else {
69
+ child.execute().mapPartitions { iter =>
75
70
val mutablePair = new MutablePair [Row , Null ](null , null )
76
71
iter.map(row => mutablePair.update(row, null ))
77
72
}
78
73
}
74
+
75
+ // TODO: RangePartitioner should take an Ordering.
76
+ implicit val ordering = new RowOrdering (sortingExpressions, child.output)
77
+
79
78
val part = new RangePartitioner (numPartitions, rdd, ascending = true )
80
79
val shuffled = new ShuffledRDD [Row , Null , Null ](rdd, part)
81
80
shuffled.setSerializer(new SparkSqlSerializer (new SparkConf (false )))
82
81
83
82
shuffled.map(_._1)
84
83
85
84
case SinglePartition =>
86
- val rdd = child.execute().mapPartitions { iter =>
87
- if (sortBasedShuffleOn) {
88
- iter.map(r => ( null , r.copy()))
89
- } else {
85
+ val rdd = if (sortBasedShuffleOn) {
86
+ child.execute().mapPartitions { iter => iter.map(r => ( null , r.copy())) }
87
+ } else {
88
+ child.execute().mapPartitions { iter =>
90
89
val mutablePair = new MutablePair [Null , Row ]()
91
90
iter.map(r => mutablePair.update(null , r))
92
91
}
0 commit comments