@@ -26,23 +26,18 @@ import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionIn
26
26
import com .amazonaws .services .kinesis .clientlibrary .types .ShutdownReason
27
27
import com .amazonaws .services .kinesis .model .Record
28
28
import org .mockito .Mockito ._
29
- // scalastyle:off
30
- // To avoid introducing a dependency on Spark core tests, simply use scalatest's FunSuite
31
- // here instead of our own SparkFunSuite. Introducing the dependency has caused problems
32
- // in the past (SPARK-8781) that are complicated by bugs in the maven shade plugin (MSHADE-148).
33
- import org .scalatest .{BeforeAndAfter , FunSuite , Matchers }
29
+ import org .scalatest .{BeforeAndAfter , Matchers }
34
30
import org .scalatest .mock .MockitoSugar
35
31
36
32
import org .apache .spark .storage .StorageLevel
37
- import org .apache .spark .streaming .{Milliseconds , Seconds , StreamingContext }
33
+ import org .apache .spark .streaming .{Milliseconds , Seconds , StreamingContext , TestSuiteBase }
38
34
import org .apache .spark .util .{Clock , ManualClock , Utils }
39
35
40
36
/**
41
37
* Suite of Kinesis streaming receiver tests focusing mostly on the KinesisRecordProcessor
42
38
*/
43
- class KinesisReceiverSuite extends FunSuite with Matchers with BeforeAndAfter
44
- with MockitoSugar {
45
- // scalastyle:on
39
+ class KinesisReceiverSuite extends TestSuiteBase with Matchers with BeforeAndAfter
40
+ with MockitoSugar {
46
41
47
42
val app = " TestKinesisReceiver"
48
43
val stream = " mySparkStream"
@@ -62,23 +57,24 @@ class KinesisReceiverSuite extends FunSuite with Matchers with BeforeAndAfter
62
57
var checkpointStateMock : KinesisCheckpointState = _
63
58
var currentClockMock : Clock = _
64
59
65
- before {
60
+ override def beforeFunction () : Unit = {
66
61
receiverMock = mock[KinesisReceiver ]
67
62
checkpointerMock = mock[IRecordProcessorCheckpointer ]
68
63
checkpointClockMock = mock[ManualClock ]
69
64
checkpointStateMock = mock[KinesisCheckpointState ]
70
65
currentClockMock = mock[Clock ]
71
66
}
72
67
73
- after {
68
+ override def afterFunction (): Unit = {
69
+ super .afterFunction()
74
70
// Since this suite was originally written using EasyMock, add this to preserve the old
75
71
// mocking semantics (see SPARK-5735 for more details)
76
72
verifyNoMoreInteractions(receiverMock, checkpointerMock, checkpointClockMock,
77
73
checkpointStateMock, currentClockMock)
78
74
}
79
75
80
76
test(" KinesisUtils API" ) {
81
- val ssc = new StreamingContext (" local[2] " , getClass.getSimpleName, Seconds ( 1 ) )
77
+ val ssc = new StreamingContext (master, framework, batchDuration )
82
78
// Tests the API, does not actually test data receiving
83
79
val kinesisStream1 = KinesisUtils .createStream(ssc, " mySparkStream" ,
84
80
" https://kinesis.us-west-2.amazonaws.com" , Seconds (2 ),
0 commit comments