@@ -65,13 +65,21 @@ private[spark] class HashShuffleReader[K, C](
65
65
readMetrics.incRecordsRead(1 )
66
66
delegate.next()
67
67
}
68
- }. asInstanceOf [ Iterator [ Nothing ]]
68
+ }
69
69
70
70
val aggregatedIter : Iterator [Product2 [K , C ]] = if (dep.aggregator.isDefined) {
71
71
if (dep.mapSideCombine) {
72
- new InterruptibleIterator (context, dep.aggregator.get.combineCombinersByKey(iter, context))
72
+ // We are reading values that are already combined
73
+ val combinedKeyValuesIterator = iter.asInstanceOf [Iterator [(K ,C )]]
74
+ new InterruptibleIterator (context,
75
+ dep.aggregator.get.combineCombinersByKey(combinedKeyValuesIterator, context))
73
76
} else {
74
- new InterruptibleIterator (context, dep.aggregator.get.combineValuesByKey(iter, context))
77
+ // We don't know the value type, but also don't care -- the dependency *should*
78
+ // have made sure its compatible w/ this aggregator, which will convert the value
79
+ // type to the combined type C
80
+ val keyValuesIterator = iter.asInstanceOf [Iterator [(K ,Nothing )]]
81
+ new InterruptibleIterator (context,
82
+ dep.aggregator.get.combineValuesByKey(keyValuesIterator, context))
75
83
}
76
84
} else {
77
85
require(! dep.mapSideCombine, " Map-side combine without Aggregator specified!" )
0 commit comments