@@ -108,27 +108,27 @@ class UnsafeRowSerializerSuite extends SparkFunSuite {
108
108
.set(" spark.shuffle.sort.bypassMergeThreshold" , " 0" )
109
109
.set(" spark.shuffle.memoryFraction" , " 0.0001" )
110
110
111
- sc = new SparkContext (" local" , " test" , conf)
112
- outputFile = File .createTempFile(" test-unsafe-row-serializer-spill" , " " )
113
- // prepare data
114
- val converter = unsafeRowConverter(Array (IntegerType ))
115
- val data = (1 to 1000 ).iterator.map { i =>
116
- (i, converter(Row (i)))
117
- }
118
- val sorter = new ExternalSorter [Int , UnsafeRow , UnsafeRow ](
119
- partitioner = Some (new HashPartitioner (10 )),
120
- serializer = Some (new UnsafeRowSerializer (numFields = 1 )))
121
-
122
- // Ensure we spilled something and have to merge them later
123
- assert(sorter.numSpills === 0 )
124
- sorter.insertAll(data)
125
- assert(sorter.numSpills > 0 )
126
-
127
- // Merging spilled files should not throw assertion error
128
- val taskContext =
129
- new TaskContextImpl (0 , 0 , 0 , 0 , null , null , InternalAccumulator .create(sc))
130
- taskContext.taskMetrics.shuffleWriteMetrics = Some (new ShuffleWriteMetrics )
131
- sorter.writePartitionedFile(ShuffleBlockId (0 , 0 , 0 ), taskContext, outputFile)
111
+ sc = new SparkContext (" local" , " test" , conf)
112
+ outputFile = File .createTempFile(" test-unsafe-row-serializer-spill" , " " )
113
+ // prepare data
114
+ val converter = unsafeRowConverter(Array (IntegerType ))
115
+ val data = (1 to 1000 ).iterator.map { i =>
116
+ (i, converter(Row (i)))
117
+ }
118
+ val sorter = new ExternalSorter [Int , UnsafeRow , UnsafeRow ](
119
+ partitioner = Some (new HashPartitioner (10 )),
120
+ serializer = Some (new UnsafeRowSerializer (numFields = 1 )))
121
+
122
+ // Ensure we spilled something and have to merge them later
123
+ assert(sorter.numSpills === 0 )
124
+ sorter.insertAll(data)
125
+ assert(sorter.numSpills > 0 )
126
+
127
+ // Merging spilled files should not throw assertion error
128
+ val taskContext =
129
+ new TaskContextImpl (0 , 0 , 0 , 0 , null , null , InternalAccumulator .create(sc))
130
+ taskContext.taskMetrics.shuffleWriteMetrics = Some (new ShuffleWriteMetrics )
131
+ sorter.writePartitionedFile(ShuffleBlockId (0 , 0 , 0 ), taskContext, outputFile)
132
132
} {
133
133
// Clean up
134
134
if (sc != null ) {
0 commit comments