
Commit 15e0d2b

[SPARK-6765] Fix test code style for streaming.
So we can turn style checker on for test code.

Author: Reynold Xin <[email protected]>

Closes apache#5409 from rxin/test-style-streaming and squashes the following commits:

7aea69b [Reynold Xin] [SPARK-6765] Fix test code style for streaming.
1 parent 8d2a36c · commit 15e0d2b

19 files changed: +115 −75 lines changed

external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala

Lines changed: 14 additions & 15 deletions
@@ -1,21 +1,20 @@
 /*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
  *
- *    http://www.apache.org/licenses/LICENSE-2.0
+ *    http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
+
 package org.apache.spark.streaming.flume
 
 import java.net.InetSocketAddress

@@ -213,7 +212,7 @@ class FlumePollingStreamSuite extends FunSuite with BeforeAndAfter with Logging
     assert(counter === totalEventsPerChannel * channels.size)
   }
 
-  def assertChannelIsEmpty(channel: MemoryChannel) = {
+  def assertChannelIsEmpty(channel: MemoryChannel): Unit = {
    val queueRemaining = channel.getClass.getDeclaredField("queueRemaining")
    queueRemaining.setAccessible(true)
    val m = queueRemaining.get(channel).getClass.getDeclaredMethod("availablePermits")
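A recurring fix in this commit is the explicit ": Unit" return type on side-effecting test helpers, whose result type Scala would otherwise infer. A minimal sketch of the before/after shape (the helper name is hypothetical, not from the diff):

object UnitReturnTypeExample {
  // Before: "def drainChannel(count: Int) = { ... }" leaves the result
  // type inferred, which the style checker flags.

  // After: the explicit annotation documents that the method is run
  // only for its side effects.
  def drainChannel(count: Int): Unit = {
    println(s"draining $count events")
  }
}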

external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala

Lines changed: 3 additions & 1 deletion
@@ -151,7 +151,9 @@ class FlumeStreamSuite extends FunSuite with BeforeAndAfter with Matchers with L
   }
 
   /** Class to create socket channel with compression */
-  private class CompressionChannelFactory(compressionLevel: Int) extends NioClientSocketChannelFactory {
+  private class CompressionChannelFactory(compressionLevel: Int)
+    extends NioClientSocketChannelFactory {
+
     override def newChannel(pipeline: ChannelPipeline): SocketChannel = {
       val encoder = new ZlibEncoder(compressionLevel)
       pipeline.addFirst("deflater", encoder)
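Here a class declaration that overflowed the line-length limit (100 characters in Spark's scalastyle configuration) is split before "extends". A self-contained sketch of the same shape, with placeholder types:

class Widget(val level: Int)
class BaseFactory {
  def create(): Widget = new Widget(0)
}

// When the one-line declaration would overflow, break before "extends"
// and indent the continuation.
class CompressingFactory(level: Int)
  extends BaseFactory {

  override def create(): Widget = new Widget(level)
}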

external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala

Lines changed: 2 additions & 1 deletion
@@ -139,7 +139,8 @@ class MQTTStreamSuite extends FunSuite with Eventually with BeforeAndAfter {
       msgTopic.publish(message)
     } catch {
       case e: MqttException if e.getReasonCode == MqttException.REASON_CODE_MAX_INFLIGHT =>
-        Thread.sleep(50) // wait for Spark streaming to consume something from the message queue
+        // wait for Spark streaming to consume something from the message queue
+        Thread.sleep(50)
     }
   }
 }
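Hoisting a trailing comment onto its own line is the cheapest way to get a statement back under the length limit. A sketch under assumed names (a generic exception stands in for MqttException):

def publishWithRetry(send: () => Unit): Unit = {
  try {
    send()
  } catch {
    case _: IllegalStateException =>
      // wait for the consumer to drain the queue; the comment on its
      // own line keeps the statement short
      Thread.sleep(50)
  }
}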

streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala

Lines changed: 4 additions & 2 deletions
@@ -171,7 +171,9 @@ class BasicOperationsSuite extends TestSuiteBase {
   test("flatMapValues") {
     testOperation(
       Seq( Seq("a", "a", "b"), Seq("", ""), Seq() ),
-      (s: DStream[String]) => s.map(x => (x, 1)).reduceByKey(_ + _).flatMapValues(x => Seq(x, x + 10)),
+      (s: DStream[String]) => {
+        s.map(x => (x, 1)).reduceByKey(_ + _).flatMapValues(x => Seq(x, x + 10))
+      },
       Seq( Seq(("a", 2), ("a", 12), ("b", 1), ("b", 11)), Seq(("", 2), ("", 12)), Seq() ),
       true
     )

@@ -474,7 +476,7 @@ class BasicOperationsSuite extends TestSuiteBase {
     stream.foreachRDD(_ => {}) // Dummy output stream
     ssc.start()
     Thread.sleep(2000)
-    def getInputFromSlice(fromMillis: Long, toMillis: Long) = {
+    def getInputFromSlice(fromMillis: Long, toMillis: Long): Set[Int] = {
       stream.slice(new Time(fromMillis), new Time(toMillis)).flatMap(_.collect()).toSet
     }
 
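An overlong single-expression lambda becomes a braced multi-line body. A minimal sketch of the same move on plain collections (names are illustrative):

// One-liner that would overflow the limit becomes a braced body:
val counts: Seq[String] => Map[String, Int] = { ws =>
  ws.map(w => (w, 1)).groupBy(_._1).map { case (w, ps) => (w, ps.size) }
}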

streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala

Lines changed: 35 additions & 10 deletions
@@ -43,7 +43,7 @@ class CheckpointSuite extends TestSuiteBase {
 
   var ssc: StreamingContext = null
 
-  override def batchDuration = Milliseconds(500)
+  override def batchDuration: Duration = Milliseconds(500)
 
   override def beforeFunction() {
     super.beforeFunction()

@@ -72,7 +72,7 @@ class CheckpointSuite extends TestSuiteBase {
     val input = (1 to 10).map(_ => Seq("a")).toSeq
     val operation = (st: DStream[String]) => {
       val updateFunc = (values: Seq[Int], state: Option[Int]) => {
-        Some((values.sum + state.getOrElse(0)))
+        Some(values.sum + state.getOrElse(0))
       }
       st.map(x => (x, 1))
         .updateStateByKey(updateFunc)

@@ -199,7 +199,12 @@ class CheckpointSuite extends TestSuiteBase {
     testCheckpointedOperation(
       Seq( Seq("a", "a", "b"), Seq("", ""), Seq(), Seq("a", "a", "b"), Seq("", ""), Seq() ),
       (s: DStream[String]) => s.map(x => (x, 1)).reduceByKey(_ + _),
-      Seq( Seq(("a", 2), ("b", 1)), Seq(("", 2)), Seq(), Seq(("a", 2), ("b", 1)), Seq(("", 2)), Seq() ),
+      Seq(
+        Seq(("a", 2), ("b", 1)),
+        Seq(("", 2)),
+        Seq(),
+        Seq(("a", 2), ("b", 1)),
+        Seq(("", 2)), Seq() ),
       3
     )
   }

@@ -212,7 +217,8 @@ class CheckpointSuite extends TestSuiteBase {
     val n = 10
     val w = 4
     val input = (1 to n).map(_ => Seq("a")).toSeq
-    val output = Seq(Seq(("a", 1)), Seq(("a", 2)), Seq(("a", 3))) ++ (1 to (n - w + 1)).map(x => Seq(("a", 4)))
+    val output = Seq(
+      Seq(("a", 1)), Seq(("a", 2)), Seq(("a", 3))) ++ (1 to (n - w + 1)).map(x => Seq(("a", 4)))
     val operation = (st: DStream[String]) => {
       st.map(x => (x, 1))
         .reduceByKeyAndWindow(_ + _, _ - _, batchDuration * w, batchDuration)

@@ -236,7 +242,13 @@ class CheckpointSuite extends TestSuiteBase {
         classOf[TextOutputFormat[Text, IntWritable]])
       output
     },
-      Seq(Seq(("a", 2), ("b", 1)), Seq(("", 2)), Seq(), Seq(("a", 2), ("b", 1)), Seq(("", 2)), Seq()),
+      Seq(
+        Seq(("a", 2), ("b", 1)),
+        Seq(("", 2)),
+        Seq(),
+        Seq(("a", 2), ("b", 1)),
+        Seq(("", 2)),
+        Seq()),
       3
     )
   } finally {

@@ -259,7 +271,13 @@ class CheckpointSuite extends TestSuiteBase {
         classOf[NewTextOutputFormat[Text, IntWritable]])
       output
     },
-      Seq(Seq(("a", 2), ("b", 1)), Seq(("", 2)), Seq(), Seq(("a", 2), ("b", 1)), Seq(("", 2)), Seq()),
+      Seq(
+        Seq(("a", 2), ("b", 1)),
+        Seq(("", 2)),
+        Seq(),
+        Seq(("a", 2), ("b", 1)),
+        Seq(("", 2)),
+        Seq()),
       3
     )
   } finally {

@@ -298,7 +316,13 @@ class CheckpointSuite extends TestSuiteBase {
         output
       }
     },
-      Seq(Seq(("a", 2), ("b", 1)), Seq(("", 2)), Seq(), Seq(("a", 2), ("b", 1)), Seq(("", 2)), Seq()),
+      Seq(
+        Seq(("a", 2), ("b", 1)),
+        Seq(("", 2)),
+        Seq(),
+        Seq(("a", 2), ("b", 1)),
+        Seq(("", 2)),
+        Seq()),
       3
     )
   } finally {

@@ -533,7 +557,8 @@ class CheckpointSuite extends TestSuiteBase {
   * Advances the manual clock on the streaming scheduler by given number of batches.
   * It also waits for the expected amount of time for each batch.
   */
-  def advanceTimeWithRealDelay[V: ClassTag](ssc: StreamingContext, numBatches: Long): Seq[Seq[V]] = {
+  def advanceTimeWithRealDelay[V: ClassTag](ssc: StreamingContext, numBatches: Long): Seq[Seq[V]] =
+  {
     val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
     logInfo("Manual clock before advancing = " + clock.getTimeMillis())
     for (i <- 1 to numBatches.toInt) {

@@ -543,7 +568,7 @@ class CheckpointSuite extends TestSuiteBase {
     logInfo("Manual clock after advancing = " + clock.getTimeMillis())
     Thread.sleep(batchDuration.milliseconds)
 
-    val outputStream = ssc.graph.getOutputStreams.filter { dstream =>
+    val outputStream = ssc.graph.getOutputStreams().filter { dstream =>
       dstream.isInstanceOf[TestOutputStreamWithPartitions[V]]
     }.head.asInstanceOf[TestOutputStreamWithPartitions[V]]
     outputStream.output.map(_.flatten)

@@ -552,4 +577,4 @@ class CheckpointSuite extends TestSuiteBase {
 
 private object CheckpointSuite extends Serializable {
   var batchThreeShouldBlockIndefinitely: Boolean = true
-}
+}
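Several hunks above reflow a six-batch expected-output literal onto one element per line, trading vertical space for lines that each fit the limit. A standalone sketch of the pattern:

// Expected per-batch outputs, one batch per line, each under the limit.
val expected = Seq(
  Seq(("a", 2), ("b", 1)),
  Seq(("", 2)),
  Seq())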

streaming/src/test/scala/org/apache/spark/streaming/FailureSuite.scala

Lines changed: 2 additions & 2 deletions
@@ -29,9 +29,9 @@ class FailureSuite extends TestSuiteBase with Logging {
   val directory = Utils.createTempDir()
   val numBatches = 30
 
-  override def batchDuration = Milliseconds(1000)
+  override def batchDuration: Duration = Milliseconds(1000)
 
-  override def useManualClock = false
+  override def useManualClock: Boolean = false
 
   override def afterFunction() {
     Utils.deleteRecursively(directory)
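Overrides get the same explicit-result-type treatment. A compilable sketch with a stand-in base class (names are hypothetical):

abstract class ClockedSuite {
  def useManualClock: Boolean = true
}

class RealClockSuite extends ClockedSuite {
  // The annotation repeats the parent's type, so a reader never has to
  // chase the inherited signature.
  override def useManualClock: Boolean = false
}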

streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala

Lines changed: 8 additions & 7 deletions
@@ -52,7 +52,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
       "localhost", testServer.port, StorageLevel.MEMORY_AND_DISK)
     val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String]]
     val outputStream = new TestOutputStream(networkStream, outputBuffer)
-    def output = outputBuffer.flatMap(x => x)
+    def output: ArrayBuffer[String] = outputBuffer.flatMap(x => x)
     outputStream.register()
     ssc.start()

@@ -164,7 +164,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
     val countStream = networkStream.count
     val outputBuffer = new ArrayBuffer[Seq[Long]] with SynchronizedBuffer[Seq[Long]]
     val outputStream = new TestOutputStream(countStream, outputBuffer)
-    def output = outputBuffer.flatMap(x => x)
+    def output: ArrayBuffer[Long] = outputBuffer.flatMap(x => x)
     outputStream.register()
     ssc.start()

@@ -196,15 +196,15 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
     val queueStream = ssc.queueStream(queue, oneAtATime = true)
     val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String]]
     val outputStream = new TestOutputStream(queueStream, outputBuffer)
-    def output = outputBuffer.filter(_.size > 0)
+    def output: ArrayBuffer[Seq[String]] = outputBuffer.filter(_.size > 0)
     outputStream.register()
     ssc.start()
 
     // Setup data queued into the stream
     val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
     val input = Seq("1", "2", "3", "4", "5")
     val expectedOutput = input.map(Seq(_))
-    //Thread.sleep(1000)
+
     val inputIterator = input.toIterator
     for (i <- 0 until input.size) {
       // Enqueue more than 1 item per tick but they should dequeue one at a time

@@ -239,7 +239,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
     val queueStream = ssc.queueStream(queue, oneAtATime = false)
     val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String]]
     val outputStream = new TestOutputStream(queueStream, outputBuffer)
-    def output = outputBuffer.filter(_.size > 0)
+    def output: ArrayBuffer[Seq[String]] = outputBuffer.filter(_.size > 0)
     outputStream.register()
     ssc.start()

@@ -352,7 +352,8 @@ class TestServer(portToBind: Int = 0) extends Logging {
         logInfo("New connection")
         try {
           clientSocket.setTcpNoDelay(true)
-          val outputStream = new BufferedWriter(new OutputStreamWriter(clientSocket.getOutputStream))
+          val outputStream = new BufferedWriter(
+            new OutputStreamWriter(clientSocket.getOutputStream))
 
           while(clientSocket.isConnected) {
             val msg = queue.poll(100, TimeUnit.MILLISECONDS)

@@ -384,7 +385,7 @@ class TestServer(portToBind: Int = 0) extends Logging {
 
   def stop() { servingThread.interrupt() }
 
-  def port = serverSocket.getLocalPort
+  def port: Int = serverSocket.getLocalPort
 }
 
 /** This is a receiver to test multiple threads inserting data using block generator */
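Besides the return-type annotations (and dropping a commented-out sleep), this file wraps a nested constructor call that ran past the limit. A self-contained sketch:

import java.io.{BufferedWriter, OutputStreamWriter}
import java.net.Socket

// Break after the outer constructor's opening paren and indent the
// nested call; the Socket parameter is a placeholder, not the suite's.
def writerFor(socket: Socket): BufferedWriter = {
  new BufferedWriter(
    new OutputStreamWriter(socket.getOutputStream))
}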

streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala

Lines changed: 2 additions & 2 deletions
@@ -96,7 +96,7 @@ class ReceivedBlockHandlerSuite extends FunSuite with BeforeAndAfter with Matche
     testBlockStoring(handler) { case (data, blockIds, storeResults) =>
       // Verify the data in block manager is correct
       val storedData = blockIds.flatMap { blockId =>
-        blockManager.getLocal(blockId).map { _.data.map {_.toString}.toList }.getOrElse(List.empty)
+        blockManager.getLocal(blockId).map(_.data.map(_.toString).toList).getOrElse(List.empty)
       }.toList
       storedData shouldEqual data

@@ -120,7 +120,7 @@ class ReceivedBlockHandlerSuite extends FunSuite with BeforeAndAfter with Matche
     testBlockStoring(handler) { case (data, blockIds, storeResults) =>
       // Verify the data in block manager is correct
       val storedData = blockIds.flatMap { blockId =>
-        blockManager.getLocal(blockId).map { _.data.map {_.toString}.toList }.getOrElse(List.empty)
+        blockManager.getLocal(blockId).map(_.data.map(_.toString).toList).getOrElse(List.empty)
       }.toList
       storedData shouldEqual data
 
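Both hunks swap braces for parentheses around single-expression lambdas, which also shortens the line. The same equivalence on plain collections:

val block: Option[Seq[Int]] = Some(Seq(1, 2, 3))

// Braced form the commit moves away from:
//   block.map { _.map {_.toString}.toList }.getOrElse(List.empty)
// Parenthesized form, same result, fewer characters:
val stored: List[String] = block.map(_.map(_.toString).toList).getOrElse(List.empty)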

streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala

Lines changed: 4 additions & 2 deletions
@@ -228,7 +228,8 @@ class ReceivedBlockTrackerSuite
   * Get all the data written in the given write ahead log files. By default, it will read all
   * files in the test log directory.
   */
-  def getWrittenLogData(logFiles: Seq[String] = getWriteAheadLogFiles): Seq[ReceivedBlockTrackerLogEvent] = {
+  def getWrittenLogData(logFiles: Seq[String] = getWriteAheadLogFiles)
+    : Seq[ReceivedBlockTrackerLogEvent] = {
     logFiles.flatMap {
       file => new WriteAheadLogReader(file, hadoopConf).toSeq
     }.map { byteBuffer =>

@@ -244,7 +245,8 @@ class ReceivedBlockTrackerSuite
   }
 
   /** Create batch allocation object from the given info */
-  def createBatchAllocation(time: Long, blockInfos: Seq[ReceivedBlockInfo]): BatchAllocationEvent = {
+  def createBatchAllocation(time: Long, blockInfos: Seq[ReceivedBlockInfo])
+    : BatchAllocationEvent = {
     BatchAllocationEvent(time, AllocatedBlocks(Map((streamId -> blockInfos))))
   }
 
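When the signature alone overflows the limit, the return type drops to a continuation line that starts with the colon. A compilable sketch with placeholder types:

case class LogEvent(payload: String)

def readEvents(files: Seq[String] = Nil)
  : Seq[LogEvent] = {
  files.map(f => LogEvent(f))
}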

streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala

Lines changed: 4 additions & 7 deletions
@@ -308,7 +308,7 @@ class ReceiverSuite extends TestSuiteBase with Timeouts with Serializable {
     val errors = new ArrayBuffer[Throwable]
 
     /** Check if all data structures are clean */
-    def isAllEmpty = {
+    def isAllEmpty: Boolean = {
       singles.isEmpty && byteBuffers.isEmpty && iterators.isEmpty &&
         arrayBuffers.isEmpty && errors.isEmpty
     }

@@ -320,24 +320,21 @@ class ReceiverSuite extends TestSuiteBase with Timeouts with Serializable {
     def pushBytes(
         bytes: ByteBuffer,
         optionalMetadata: Option[Any],
-        optionalBlockId: Option[StreamBlockId]
-      ) {
+        optionalBlockId: Option[StreamBlockId]) {
       byteBuffers += bytes
     }
 
     def pushIterator(
         iterator: Iterator[_],
         optionalMetadata: Option[Any],
-        optionalBlockId: Option[StreamBlockId]
-      ) {
+        optionalBlockId: Option[StreamBlockId]) {
       iterators += iterator
     }
 
     def pushArrayBuffer(
         arrayBuffer: ArrayBuffer[_],
         optionalMetadata: Option[Any],
-        optionalBlockId: Option[StreamBlockId]
-      ) {
+        optionalBlockId: Option[StreamBlockId]) {
       arrayBuffers += arrayBuffer
     }
 
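These multi-line parameter lists lose their standalone closing paren: it now rides on the last parameter. A minimal compilable sketch (names are illustrative, not from the suite):

import scala.collection.mutable.ArrayBuffer

class Collector {
  private val items = new ArrayBuffer[String]

  // The closing paren sits on the last parameter line rather than on
  // its own line.
  def push(
      value: String,
      metadata: Option[Any]): Unit = {
    items += value
  }
}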
