|
17 | 17 |
|
18 | 18 | package org.apache.spark.streaming.flume
|
19 | 19 |
|
20 |
| -import scala.collection.JavaConversions._ |
21 |
| -import scala.collection.mutable.{ArrayBuffer, SynchronizedBuffer} |
22 |
| - |
23 |
| -import java.net.InetSocketAddress |
| 20 | +import java.net.{InetSocketAddress, ServerSocket} |
24 | 21 | import java.nio.ByteBuffer
|
25 | 22 | import java.nio.charset.Charset
|
26 | 23 |
|
| 24 | +import scala.collection.JavaConversions._ |
| 25 | +import scala.collection.mutable.{ArrayBuffer, SynchronizedBuffer} |
| 26 | +import scala.concurrent.duration._ |
| 27 | +import scala.language.postfixOps |
| 28 | + |
27 | 29 | import org.apache.avro.ipc.NettyTransceiver
|
28 | 30 | import org.apache.avro.ipc.specific.SpecificRequestor
|
| 31 | +import org.apache.flume.source.avro |
29 | 32 | import org.apache.flume.source.avro.{AvroFlumeEvent, AvroSourceProtocol}
|
| 33 | +import org.jboss.netty.channel.ChannelPipeline |
| 34 | +import org.jboss.netty.channel.socket.SocketChannel |
| 35 | +import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory |
| 36 | +import org.jboss.netty.handler.codec.compression._ |
| 37 | +import org.scalatest.{BeforeAndAfter, FunSuite, Matchers} |
| 38 | +import org.scalatest.concurrent.Eventually._ |
30 | 39 |
|
| 40 | +import org.apache.spark.{Logging, SparkConf} |
31 | 41 | import org.apache.spark.storage.StorageLevel
|
32 |
| -import org.apache.spark.streaming.{TestOutputStream, StreamingContext, TestSuiteBase} |
33 |
| -import org.apache.spark.streaming.util.ManualClock |
| 42 | +import org.apache.spark.streaming.{Milliseconds, StreamingContext, TestOutputStream} |
| 43 | +import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerReceiverStarted} |
34 | 44 | import org.apache.spark.util.Utils
|
35 | 45 |
|
36 |
| -import org.jboss.netty.channel.ChannelPipeline |
37 |
| -import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory |
38 |
| -import org.jboss.netty.channel.socket.SocketChannel |
39 |
| -import org.jboss.netty.handler.codec.compression._ |
| 46 | +class FlumeStreamSuite extends FunSuite with BeforeAndAfter with Matchers with Logging { |
| 47 | + val conf = new SparkConf().setMaster("local[4]").setAppName("FlumeStreamSuite") |
| 48 | + |
| 49 | + var ssc: StreamingContext = null |
| 50 | + var transceiver: NettyTransceiver = null |
40 | 51 |
|
41 |
| -class FlumeStreamSuite extends TestSuiteBase { |
| 52 | + after { |
| 53 | + if (ssc != null) { |
| 54 | + ssc.stop() |
| 55 | + } |
| 56 | + if (transceiver != null) { |
| 57 | + transceiver.close() |
| 58 | + } |
| 59 | + } |
42 | 60 |
|
43 | 61 | test("flume input stream") {
|
44 |
| - runFlumeStreamTest(false) |
| 62 | + testFlumeStream(testCompression = false) |
45 | 63 | }
|
46 | 64 |
|
47 | 65 | test("flume input compressed stream") {
|
48 |
| - runFlumeStreamTest(true) |
| 66 | + testFlumeStream(testCompression = true) |
| 67 | + } |
| 68 | + |
| 69 | + /** Run test on flume stream */ |
| 70 | + private def testFlumeStream(testCompression: Boolean): Unit = { |
| 71 | + val input = (1 to 100).map { _.toString } |
| 72 | + val testPort = findFreePort() |
| 73 | + val outputBuffer = startContext(testPort, testCompression) |
| 74 | + writeAndVerify(input, testPort, outputBuffer, testCompression) |
| 75 | + } |
| 76 | + |
| 77 | + /** Find a free port */ |
| 78 | + private def findFreePort(): Int = { |
| 79 | + Utils.startServiceOnPort(23456, (trialPort: Int) => { |
| 80 | + val socket = new ServerSocket(trialPort) |
| 81 | + socket.close() |
| 82 | + (null, trialPort) |
| 83 | + })._2 |
49 | 84 | }
|
50 |
| - |
51 |
| - def runFlumeStreamTest(enableDecompression: Boolean) { |
52 |
| - // Set up the streaming context and input streams |
53 |
| - val ssc = new StreamingContext(conf, batchDuration) |
54 |
| - val (flumeStream, testPort) = |
55 |
| - Utils.startServiceOnPort(9997, (trialPort: Int) => { |
56 |
| - val dstream = FlumeUtils.createStream( |
57 |
| - ssc, "localhost", trialPort, StorageLevel.MEMORY_AND_DISK, enableDecompression) |
58 |
| - (dstream, trialPort) |
59 |
| - }) |
60 | 85 |
|
| 86 | + /** Setup and start the streaming context */ |
| 87 | + private def startContext( |
| 88 | + testPort: Int, testCompression: Boolean): (ArrayBuffer[Seq[SparkFlumeEvent]]) = { |
| 89 | + ssc = new StreamingContext(conf, Milliseconds(200)) |
| 90 | + val flumeStream = FlumeUtils.createStream( |
| 91 | + ssc, "localhost", testPort, StorageLevel.MEMORY_AND_DISK, testCompression) |
61 | 92 | val outputBuffer = new ArrayBuffer[Seq[SparkFlumeEvent]]
|
62 | 93 | with SynchronizedBuffer[Seq[SparkFlumeEvent]]
|
63 | 94 | val outputStream = new TestOutputStream(flumeStream, outputBuffer)
|
64 | 95 | outputStream.register()
|
65 | 96 | ssc.start()
|
| 97 | + outputBuffer |
| 98 | + } |
66 | 99 |
|
67 |
| - val clock = ssc.scheduler.clock.asInstanceOf[ManualClock] |
68 |
| - val input = Seq(1, 2, 3, 4, 5) |
69 |
| - Thread.sleep(1000) |
70 |
| - val transceiver = new NettyTransceiver(new InetSocketAddress("localhost", testPort)) |
71 |
| - var client: AvroSourceProtocol = null |
72 |
| - |
73 |
| - if (enableDecompression) { |
74 |
| - client = SpecificRequestor.getClient( |
75 |
| - classOf[AvroSourceProtocol], |
76 |
| - new NettyTransceiver(new InetSocketAddress("localhost", testPort), |
77 |
| - new CompressionChannelFactory(6))) |
78 |
| - } else { |
79 |
| - client = SpecificRequestor.getClient( |
80 |
| - classOf[AvroSourceProtocol], transceiver) |
81 |
| - } |
| 100 | + /** Send data to the flume receiver and verify whether the data was received */ |
| 101 | + private def writeAndVerify( |
| 102 | + input: Seq[String], |
| 103 | + testPort: Int, |
| 104 | + outputBuffer: ArrayBuffer[Seq[SparkFlumeEvent]], |
| 105 | + enableCompression: Boolean |
| 106 | + ) { |
| 107 | + val testAddress = new InetSocketAddress("localhost", testPort) |
82 | 108 |
|
83 |
| - for (i <- 0 until input.size) { |
| 109 | + val inputEvents = input.map { item => |
84 | 110 | val event = new AvroFlumeEvent
|
85 |
| - event.setBody(ByteBuffer.wrap(input(i).toString.getBytes("utf-8"))) |
| 111 | + event.setBody(ByteBuffer.wrap(item.getBytes("UTF-8"))) |
86 | 112 | event.setHeaders(Map[CharSequence, CharSequence]("test" -> "header"))
|
87 |
| - client.append(event) |
88 |
| - Thread.sleep(500) |
89 |
| - clock.addToTime(batchDuration.milliseconds) |
| 113 | + event |
90 | 114 | }
|
91 | 115 |
|
92 |
| - Thread.sleep(1000) |
93 |
| - |
94 |
| - val startTime = System.currentTimeMillis() |
95 |
| - while (outputBuffer.size < input.size && System.currentTimeMillis() - startTime < maxWaitTimeMillis) { |
96 |
| - logInfo("output.size = " + outputBuffer.size + ", input.size = " + input.size) |
97 |
| - Thread.sleep(100) |
| 116 | + eventually(timeout(10 seconds), interval(100 milliseconds)) { |
| 117 | + // if last attempted transceiver had succeeded, close it |
| 118 | + if (transceiver != null) { |
| 119 | + transceiver.close() |
| 120 | + transceiver = null |
| 121 | + } |
| 122 | + |
| 123 | + // Create transceiver |
| 124 | + transceiver = { |
| 125 | + if (enableCompression) { |
| 126 | + new NettyTransceiver(testAddress, new CompressionChannelFactory(6)) |
| 127 | + } else { |
| 128 | + new NettyTransceiver(testAddress) |
| 129 | + } |
| 130 | + } |
| 131 | + |
| 132 | + // Create Avro client with the transceiver |
| 133 | + val client = SpecificRequestor.getClient(classOf[AvroSourceProtocol], transceiver) |
| 134 | + client should not be null |
| 135 | + |
| 136 | + // Send data |
| 137 | + val status = client.appendBatch(inputEvents.toList) |
| 138 | + status should be (avro.Status.OK) |
98 | 139 | }
|
99 |
| - Thread.sleep(1000) |
100 |
| - val timeTaken = System.currentTimeMillis() - startTime |
101 |
| - assert(timeTaken < maxWaitTimeMillis, "Operation timed out after " + timeTaken + " ms") |
102 |
| - logInfo("Stopping context") |
103 |
| - ssc.stop() |
104 |
| - |
105 |
| - val decoder = Charset.forName("UTF-8").newDecoder() |
106 |
| - |
107 |
| - assert(outputBuffer.size === input.length) |
108 |
| - for (i <- 0 until outputBuffer.size) { |
109 |
| - assert(outputBuffer(i).size === 1) |
110 |
| - val str = decoder.decode(outputBuffer(i).head.event.getBody) |
111 |
| - assert(str.toString === input(i).toString) |
112 |
| - assert(outputBuffer(i).head.event.getHeaders.get("test") === "header") |
| 140 | + |
| 141 | + val decoder = Charset.forName("UTF-8").newDecoder() |
| 142 | + eventually(timeout(10 seconds), interval(100 milliseconds)) { |
| 143 | + val outputEvents = outputBuffer.flatten.map { _.event } |
| 144 | + outputEvents.foreach { |
| 145 | + event => |
| 146 | + event.getHeaders.get("test") should be("header") |
| 147 | + } |
| 148 | + val output = outputEvents.map(event => decoder.decode(event.getBody()).toString) |
| 149 | + output should be (input) |
113 | 150 | }
|
114 | 151 | }
|
115 | 152 |
|
116 |
| - class CompressionChannelFactory(compressionLevel: Int) extends NioClientSocketChannelFactory { |
| 153 | + /** Class to create socket channel with compression */ |
| 154 | + private class CompressionChannelFactory(compressionLevel: Int) extends NioClientSocketChannelFactory { |
117 | 155 | override def newChannel(pipeline: ChannelPipeline): SocketChannel = {
|
118 | 156 | val encoder = new ZlibEncoder(compressionLevel)
|
119 | 157 | pipeline.addFirst("deflater", encoder)
|
|
0 commit comments