@@ -20,6 +20,7 @@
 import java.util.List;
 import java.util.regex.Pattern;
 
+import com.amazonaws.regions.RegionUtils;
 import org.apache.log4j.Logger;
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.function.FlatMapFunction;
@@ -40,140 +41,146 @@
 import com.google.common.collect.Lists;
 
 /**
- * Java-friendly Kinesis Spark Streaming WordCount example
+ * Consumes messages from an Amazon Kinesis stream and does wordcount.
  *
- * See http://spark.apache.org/docs/latest/streaming-kinesis.html for more details
- * on the Kinesis Spark Streaming integration.
+ * This example spins up 1 Kinesis Receiver per shard for the given stream.
+ * It then starts pulling from the last checkpointed sequence number of the given stream.
  *
- * This example spins up 1 Kinesis Worker (Spark Streaming Receiver) per shard
- * for the given stream.
- * It then starts pulling from the last checkpointed sequence number of the given
- * <stream-name> and <endpoint-url>.
+ * Usage: JavaKinesisWordCountASL [app-name] [stream-name] [endpoint-url]
+ *   [app-name] is the name of the consumer app, used to track the read data in DynamoDB
+ *   [stream-name] name of the Kinesis stream (e.g. mySparkStream)
+ *   [endpoint-url] endpoint of the Kinesis service
+ *     (e.g. https://kinesis.us-east-1.amazonaws.com)
  *
- * Valid endpoint urls: http://docs.aws.amazon.com/general/latest/gr/rande.html#ak_region
- *
- * This code uses the DefaultAWSCredentialsProviderChain and searches for credentials
- *   in the following order of precedence:
- *     Environment Variables - AWS_ACCESS_KEY_ID and AWS_SECRET_KEY
- *     Java System Properties - aws.accessKeyId and aws.secretKey
- *     Credential profiles file - default location (~/.aws/credentials) shared by all AWS SDKs
- *     Instance profile credentials - delivered through the Amazon EC2 metadata service
- *
- * Usage: JavaKinesisWordCountASL <stream-name> <endpoint-url>
- *   <stream-name> is the name of the Kinesis stream (ie. mySparkStream)
- *   <endpoint-url> is the endpoint of the Kinesis service
- *     (ie. https://kinesis.us-east-1.amazonaws.com)
  *
  * Example:
- *      $ export AWS_ACCESS_KEY_ID=<your-access-key>
+ *      # export AWS keys if necessary
+ *      $ export AWS_ACCESS_KEY_ID=[your-access-key]
 *      $ export AWS_SECRET_KEY=<your-secret-key>
- *      $ $SPARK_HOME/bin/run-example \
- *          org.apache.spark.examples.streaming.JavaKinesisWordCountASL mySparkStream \
- *          https://kinesis.us-east-1.amazonaws.com
  *
- * Note that number of workers/threads should be 1 more than the number of receivers.
- * This leaves one thread available for actually processing the data.
+ *      # run the example
+ *      $ SPARK_HOME/bin/run-example streaming.JavaKinesisWordCountASL myAppName mySparkStream \
+ *              https://kinesis.us-east-1.amazonaws.com
+ *
+ * There is a companion helper class called KinesisWordProducerASL which puts dummy data
+ * onto the Kinesis stream.
  *
- * There is a companion helper class called KinesisWordCountProducerASL which puts dummy data
- * onto the Kinesis stream.
- * Usage instructions for KinesisWordCountProducerASL are provided in the class definition.
+ * This code uses the DefaultAWSCredentialsProviderChain to find credentials
+ * in the following order:
+ *   Environment Variables - AWS_ACCESS_KEY_ID and AWS_SECRET_KEY
+ *   Java System Properties - aws.accessKeyId and aws.secretKey
+ *   Credential profiles file - default location (~/.aws/credentials) shared by all AWS SDKs
+ *   Instance profile credentials - delivered through the Amazon EC2 metadata service
+ * For more information, see
+ * http://docs.aws.amazon.com/AWSSdkDocsJava/latest/DeveloperGuide/credentials.html
+ *
+ * See http://spark.apache.org/docs/latest/streaming-kinesis-integration.html for more details on
+ * the Kinesis Spark Streaming integration.
 */
 public final class JavaKinesisWordCountASL { // needs to be public for access from run-example
-  private static final Pattern WORD_SEPARATOR = Pattern.compile(" ");
-  private static final Logger logger = Logger.getLogger(JavaKinesisWordCountASL.class);
-
-  /* Make the constructor private to enforce singleton */
-  private JavaKinesisWordCountASL() {
+  private static final Pattern WORD_SEPARATOR = Pattern.compile(" ");
+  private static final Logger logger = Logger.getLogger(JavaKinesisWordCountASL.class);
+
+  public static void main(String[] args) {
+    // Check that all required args were passed in.
+    if (args.length != 3) {
+      System.err.println(
+          "Usage: JavaKinesisWordCountASL <app-name> <stream-name> <endpoint-url>\n\n" +
+          "    <app-name> is the name of the app, used to track the read data in DynamoDB\n" +
+          "    <stream-name> is the name of the Kinesis stream\n" +
+          "    <endpoint-url> is the endpoint of the Kinesis service\n" +
+          "                   (e.g. https://kinesis.us-east-1.amazonaws.com)\n" +
+          "Generate data for the Kinesis stream using the example KinesisWordProducerASL.\n" +
+          "See http://spark.apache.org/docs/latest/streaming-kinesis-integration.html for more\n" +
+          "details.\n"
+      );
+      System.exit(1);
    }
 
-  public static void main(String[] args) {
-    /* Check that all required args were passed in. */
-    if (args.length < 2) {
-      System.err.println(
-          "Usage: JavaKinesisWordCountASL <stream-name> <endpoint-url>\n" +
-          "    <stream-name> is the name of the Kinesis stream\n" +
-          "    <endpoint-url> is the endpoint of the Kinesis service\n" +
-          "                   (e.g. https://kinesis.us-east-1.amazonaws.com)\n");
-      System.exit(1);
-    }
-
-    StreamingExamples.setStreamingLogLevels();
-
-    /* Populate the appropriate variables from the given args */
-    String streamName = args[0];
-    String endpointUrl = args[1];
-    /* Set the batch interval to a fixed 2000 millis (2 seconds) */
-    Duration batchInterval = new Duration(2000);
-
-    /* Create a Kinesis client in order to determine the number of shards for the given stream */
-    AmazonKinesisClient kinesisClient = new AmazonKinesisClient(
-        new DefaultAWSCredentialsProviderChain());
-    kinesisClient.setEndpoint(endpointUrl);
-
-    /* Determine the number of shards from the stream */
-    int numShards = kinesisClient.describeStream(streamName)
-        .getStreamDescription().getShards().size();
-
-    /* In this example, we're going to create 1 Kinesis Worker/Receiver/DStream for each shard */
-    int numStreams = numShards;
-
-    /* Setup the Spark config. */
-    SparkConf sparkConfig = new SparkConf().setAppName("KinesisWordCount");
-
-    /* Kinesis checkpoint interval. Same as batchInterval for this example. */
-    Duration checkpointInterval = batchInterval;
+    // Set default log4j logging level to WARN to hide Spark logs
+    StreamingExamples.setStreamingLogLevels();
+
+    // Populate the appropriate variables from the given args
+    String kinesisAppName = args[0];
+    String streamName = args[1];
+    String endpointUrl = args[2];
+
+    // Create a Kinesis client in order to determine the number of shards for the given stream
+    AmazonKinesisClient kinesisClient =
+        new AmazonKinesisClient(new DefaultAWSCredentialsProviderChain());
+    kinesisClient.setEndpoint(endpointUrl);
+    int numShards =
+        kinesisClient.describeStream(streamName).getStreamDescription().getShards().size();
+
+
+    // In this example, we're going to create 1 Kinesis Receiver/input DStream for each shard.
+    // This is not a necessity; if there are fewer receivers/DStreams than the number of shards,
+    // then the shards will be automatically distributed among the receivers and each receiver
+    // will receive data from multiple shards.
+    int numStreams = numShards;
+
+    // Spark Streaming batch interval
+    Duration batchInterval = new Duration(2000);
+
+    // Kinesis checkpoint interval. Same as batchInterval for this example.
+    Duration kinesisCheckpointInterval = batchInterval;
+
+    // Get the region name from the endpoint URL to save Kinesis Client Library metadata in
+    // DynamoDB of the same region as the Kinesis stream
+    String regionName = RegionUtils.getRegionByEndpoint(endpointUrl).getName();
+
+    // Setup the Spark config and StreamingContext
+    SparkConf sparkConfig = new SparkConf().setAppName("JavaKinesisWordCountASL");
+    JavaStreamingContext jssc = new JavaStreamingContext(sparkConfig, batchInterval);
+
+    // Create the Kinesis DStreams
+    List<JavaDStream<byte[]>> streamsList = new ArrayList<JavaDStream<byte[]>>(numStreams);
+    for (int i = 0; i < numStreams; i++) {
+      streamsList.add(
+          KinesisUtils.createStream(jssc, kinesisAppName, streamName, endpointUrl, regionName,
+              InitialPositionInStream.LATEST, kinesisCheckpointInterval, StorageLevel.MEMORY_AND_DISK_2())
+      );
+    }
 
-    /* Setup the StreamingContext */
-    JavaStreamingContext jssc = new JavaStreamingContext(sparkConfig, batchInterval);
+    // Union all the streams if there is more than 1 stream
+    JavaDStream<byte[]> unionStreams;
+    if (streamsList.size() > 1) {
+      unionStreams = jssc.union(streamsList.get(0), streamsList.subList(1, streamsList.size()));
+    } else {
+      // Otherwise, just use the 1 stream
+      unionStreams = streamsList.get(0);
+    }
 
-    /* Create the same number of Kinesis DStreams/Receivers as Kinesis stream's shards */
-    List<JavaDStream<byte[]>> streamsList = new ArrayList<JavaDStream<byte[]>>(numStreams);
-    for (int i = 0; i < numStreams; i++) {
-      streamsList.add(
-          KinesisUtils.createStream(jssc, streamName, endpointUrl, checkpointInterval,
-              InitialPositionInStream.LATEST, StorageLevel.MEMORY_AND_DISK_2())
-      );
+    // Convert each line of Array[Byte] to String, and split into words
+    JavaDStream<String> words = unionStreams.flatMap(new FlatMapFunction<byte[], String>() {
+      @Override
+      public Iterable<String> call(byte[] line) {
+        return Lists.newArrayList(WORD_SEPARATOR.split(new String(line)));
+      }
+    });
+
+    // Map each word to a (word, 1) tuple so we can reduce by key to count the words
+    JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
+        new PairFunction<String, String, Integer>() {
+          @Override
+          public Tuple2<String, Integer> call(String s) {
+            return new Tuple2<String, Integer>(s, 1);
+          }
        }
-
-    /* Union all the streams if there is more than 1 stream */
-    JavaDStream<byte[]> unionStreams;
-    if (streamsList.size() > 1) {
-      unionStreams = jssc.union(streamsList.get(0), streamsList.subList(1, streamsList.size()));
-    } else {
-      /* Otherwise, just use the 1 stream */
-      unionStreams = streamsList.get(0);
+    ).reduceByKey(
+        new Function2<Integer, Integer, Integer>() {
+          @Override
+          public Integer call(Integer i1, Integer i2) {
+            return i1 + i2;
+          }
        }
+    );
 
-    /*
-     * Split each line of the union'd DStreams into multiple words using flatMap to produce the collection.
-     * Convert lines of byte[] to multiple Strings by first converting to String, then splitting on WORD_SEPARATOR.
-     */
-    JavaDStream<String> words = unionStreams.flatMap(new FlatMapFunction<byte[], String>() {
-      @Override
-      public Iterable<String> call(byte[] line) {
-        return Lists.newArrayList(WORD_SEPARATOR.split(new String(line)));
-      }
-    });
-
-    /* Map each word to a (word, 1) tuple, then reduce/aggregate by word. */
-    JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
-        new PairFunction<String, String, Integer>() {
-          @Override
-          public Tuple2<String, Integer> call(String s) {
-            return new Tuple2<String, Integer>(s, 1);
-          }
-        }).reduceByKey(new Function2<Integer, Integer, Integer>() {
-          @Override
-          public Integer call(Integer i1, Integer i2) {
-            return i1 + i2;
-          }
-        });
-
-    /* Print the first 10 wordCounts */
-    wordCounts.print();
-
-    /* Start the streaming context and await termination */
-    jssc.start();
-    jssc.awaitTermination();
-  }
+    // Print the first 10 wordCounts
+    wordCounts.print();
+
+    // Start the streaming context and await termination
+    jssc.start();
+    jssc.awaitTermination();
+  }
 }
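
Note: the updated javadoc references a companion producer, KinesisWordProducerASL, which is not part of this diff. The following is a minimal sketch of such a producer, assuming the AWS SDK for Java v1 is on the classpath; the stream name, endpoint, vocabulary, and record count are illustrative placeholders, not the real KinesisWordProducerASL code.

import java.nio.ByteBuffer;

import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
import com.amazonaws.services.kinesis.AmazonKinesisClient;

// Minimal sketch of a dummy-data producer (not the actual KinesisWordProducerASL).
// Assumes the stream already exists and that credentials resolve through the
// DefaultAWSCredentialsProviderChain, as described in the javadoc above.
public final class DummyWordProducer {
  public static void main(String[] args) {
    String streamName = "mySparkStream";                             // illustrative
    String endpointUrl = "https://kinesis.us-east-1.amazonaws.com";  // illustrative

    AmazonKinesisClient client =
        new AmazonKinesisClient(new DefaultAWSCredentialsProviderChain());
    client.setEndpoint(endpointUrl);

    String[] words = {"spark", "streaming", "kinesis", "wordcount"};  // dummy vocabulary
    for (int i = 0; i < 100; i++) {
      // Each record is a line of space-separated words, matching the consumer's
      // WORD_SEPARATOR split of the raw byte[] payload.
      String data = words[i % words.length] + " " + words[(i + 1) % words.length];
      client.putRecord(streamName, ByteBuffer.wrap(data.getBytes()), "partitionKey-" + i);
    }
  }
}

Run it once the consumer is up; the counts printed each batch should then reflect the dummy vocabulary.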
|
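Similarly, the region derivation introduced in this change (RegionUtils.getRegionByEndpoint) can be checked in isolation. A minimal sketch, assuming the same SDK version; the endpoint is the one used in the javadoc example:

import com.amazonaws.regions.RegionUtils;

// Resolves a Kinesis endpoint URL to a region name. The example above uses this
// to keep the Kinesis Client Library's DynamoDB checkpoint table in the same
// region as the stream.
public final class RegionLookupCheck {
  public static void main(String[] args) {
    String endpointUrl = "https://kinesis.us-east-1.amazonaws.com";
    String regionName = RegionUtils.getRegionByEndpoint(endpointUrl).getName();
    System.out.println(regionName);  // expected: us-east-1
  }
}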