Commit e4abf69

Scala doc improvements and stuff.
1 parent bb65232 commit e4abf69

5 files changed: +111, -71 lines


external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala

Lines changed: 5 additions & 7 deletions
@@ -36,14 +36,12 @@ import kafka.utils.VerifiableProperties
  * Starting and ending offsets are specified in advance,
  * so that you can control exactly-once semantics.
  * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
- * configuration parameters</a>.
- * Requires "metadata.broker.list" or "bootstrap.servers" to be set with Kafka broker(s),
- * NOT zookeeper servers, specified in host1:port1,host2:port2 form.
- * @param batch Each KafkaRDDPartition in the batch corresponds to a
- *   range of offsets for a given Kafka topic/partition
+ * configuration parameters</a>. Requires "metadata.broker.list" or "bootstrap.servers" to be set
+ * with Kafka broker(s) specified in host1:port1,host2:port2 form.
+ * @param offsetRanges offset ranges that define the Kafka data belonging to this RDD
  * @param messageHandler function for translating each message into the desired type
  */
-private[spark]
+private[kafka]
 class KafkaRDD[
   K: ClassTag,
   V: ClassTag,
@@ -183,7 +181,7 @@ class KafkaRDD[
   }
 }
 
-private[spark]
+private[kafka]
 object KafkaRDD {
   import KafkaCluster.LeaderOffset

external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDDPartition.scala

Lines changed: 1 addition & 22 deletions
@@ -26,7 +26,7 @@ import org.apache.spark.Partition
  * @param host preferred kafka host, i.e. the leader at the time the rdd was created
  * @param port preferred kafka host's port
  */
-private[spark]
+private[kafka]
 class KafkaRDDPartition(
   val index: Int,
   val topic: String,
@@ -36,24 +36,3 @@ class KafkaRDDPartition(
   val host: String,
   val port: Int
 ) extends Partition
-
-private[spark]
-object KafkaRDDPartition {
-  def apply(
-    index: Int,
-    topic: String,
-    partition: Int,
-    fromOffset: Long,
-    untilOffset: Long,
-    host: String,
-    port: Int
-  ): KafkaRDDPartition = new KafkaRDDPartition(
-    index,
-    topic,
-    partition,
-    fromOffset,
-    untilOffset,
-    host,
-    port
-  )
-}
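The companion object removed above only mirrored the primary constructor, so code inside the kafka package now builds partitions with `new` directly. A minimal sketch, with purely illustrative index, topic, offsets, and broker values:

  // Hypothetical values: a partition covering offsets 100 to 200 of topic "events", partition 0
  val part = new KafkaRDDPartition(
    index = 0,
    topic = "events",
    partition = 0,
    fromOffset = 100L,
    untilOffset = 200L,
    host = "broker1",   // preferred leader host at the time the RDD was created
    port = 9092)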

external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala

Lines changed: 59 additions & 31 deletions
@@ -155,14 +155,13 @@ object KafkaUtils {
   }
 
   /**
-   * Create a RDD from the
-   * Starting and ending offsets are specified in advance,
-   * so that you can control exactly-once semantics.
+   * Create a RDD from Kafka using offset ranges for each topic and partition.
+   *
    * @param sc SparkContext object
    * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
-   * configuration parameters</a>.
-   * Requires "metadata.broker.list" or "bootstrap.servers" to be set with Kafka broker(s),
-   * NOT zookeeper servers, specified in host1:port1,host2:port2 form.
+   * configuration parameters</a>. Requires "metadata.broker.list" or "bootstrap.servers"
+   * to be set with Kafka broker(s) (NOT zookeeper servers) specified in
+   * host1:port1,host2:port2 form.
    * @param offsetRanges Each OffsetRange in the batch corresponds to a
    *   range of offsets for a given Kafka topic/partition
    */
@@ -186,18 +185,21 @@ object KafkaUtils {
     new KafkaRDD[K, V, KD, VD, (K, V)](sc, kafkaParams, offsetRanges, leaders, messageHandler)
   }
 
-  /** A batch-oriented interface for consuming from Kafka.
-   * Starting and ending offsets are specified in advance,
-   * so that you can control exactly-once semantics.
+  /**
+   * :: Experimental ::
+   * Create a RDD from Kafka using offset ranges for each topic and partition. This allows you
+   * specify the Kafka leader to connect to (to optimize fetching) and access the message as well
+   * as the metadata.
+   *
    * @param sc SparkContext object
    * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
-   * configuration parameters</a>.
-   * Requires "metadata.broker.list" or "bootstrap.servers" to be set with Kafka broker(s),
-   * NOT zookeeper servers, specified in host1:port1,host2:port2 form.
+   * configuration parameters</a>. Requires "metadata.broker.list" or "bootstrap.servers"
+   * to be set with Kafka broker(s) (NOT zookeeper servers) specified in
+   * host1:port1,host2:port2 form.
    * @param offsetRanges Each OffsetRange in the batch corresponds to a
    *   range of offsets for a given Kafka topic/partition
    * @param leaders Kafka leaders for each offset range in batch
-   * @param messageHandler function for translating each message into the desired type
+   * @param messageHandler function for translating each message and metadata into the desired type
    */
   @Experimental
   def createRDD[
@@ -219,47 +221,73 @@ object KafkaUtils {
   }
 
 
+  /**
+   * Create a RDD from Kafka using offset ranges for each topic and partition.
+   *
+   * @param jsc JavaSparkContext object
+   * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
+   * configuration parameters</a>. Requires "metadata.broker.list" or "bootstrap.servers"
+   * to be set with Kafka broker(s) (NOT zookeeper servers) specified in
+   * host1:port1,host2:port2 form.
+   * @param offsetRanges Each OffsetRange in the batch corresponds to a
+   *   range of offsets for a given Kafka topic/partition
+   */
   @Experimental
-  def createRDD[K, V, KD <: Decoder[K], VD <: Decoder[V], R](
+  def createRDD[K, V, KD <: Decoder[K], VD <: Decoder[V]](
     jsc: JavaSparkContext,
     keyClass: Class[K],
     valueClass: Class[V],
     keyDecoderClass: Class[KD],
     valueDecoderClass: Class[VD],
-    recordClass: Class[R],
     kafkaParams: JMap[String, String],
-    offsetRanges: Array[OffsetRange],
-    leaders: Array[Leader],
-    messageHandler: JFunction[MessageAndMetadata[K, V], R]
-  ): JavaRDD[R] = {
+    offsetRanges: Array[OffsetRange]
+  ): JavaPairRDD[K, V] = {
     implicit val keyCmt: ClassTag[K] = ClassTag(keyClass)
     implicit val valueCmt: ClassTag[V] = ClassTag(valueClass)
     implicit val keyDecoderCmt: ClassTag[KD] = ClassTag(keyDecoderClass)
     implicit val valueDecoderCmt: ClassTag[VD] = ClassTag(valueDecoderClass)
-    implicit val recordCmt: ClassTag[R] = ClassTag(recordClass)
-    createRDD[K, V, KD, VD, R](
-      jsc.sc, Map(kafkaParams.toSeq: _*), offsetRanges, leaders, messageHandler.call _)
+    new JavaPairRDD(createRDD[K, V, KD, VD](
+      jsc.sc, Map(kafkaParams.toSeq: _*), offsetRanges))
   }
 
+  /**
+   * :: Experimental ::
+   * Create a RDD from Kafka using offset ranges for each topic and partition. This allows you
+   * specify the Kafka leader to connect to (to optimize fetching) and access the message as well
+   * as the metadata.
+   *
+   * @param jsc JavaSparkContext object
+   * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
+   * configuration parameters</a>. Requires "metadata.broker.list" or "bootstrap.servers"
+   * to be set with Kafka broker(s) (NOT zookeeper servers) specified in
+   * host1:port1,host2:port2 form.
+   * @param offsetRanges Each OffsetRange in the batch corresponds to a
+   *   range of offsets for a given Kafka topic/partition
+   * @param leaders Kafka leaders for each offset range in batch
+   * @param messageHandler function for translating each message and metadata into the desired type
+   */
   @Experimental
-  def createRDD[K, V, KD <: Decoder[K], VD <: Decoder[V]](
+  def createRDD[K, V, KD <: Decoder[K], VD <: Decoder[V], R](
    jsc: JavaSparkContext,
    keyClass: Class[K],
    valueClass: Class[V],
    keyDecoderClass: Class[KD],
    valueDecoderClass: Class[VD],
+    recordClass: Class[R],
    kafkaParams: JMap[String, String],
-    offsetRanges: Array[OffsetRange]
-  ): JavaPairRDD[K, V] = {
+    offsetRanges: Array[OffsetRange],
+    leaders: Array[Leader],
+    messageHandler: JFunction[MessageAndMetadata[K, V], R]
+  ): JavaRDD[R] = {
     implicit val keyCmt: ClassTag[K] = ClassTag(keyClass)
     implicit val valueCmt: ClassTag[V] = ClassTag(valueClass)
     implicit val keyDecoderCmt: ClassTag[KD] = ClassTag(keyDecoderClass)
     implicit val valueDecoderCmt: ClassTag[VD] = ClassTag(valueDecoderClass)
-    new JavaPairRDD(createRDD[K, V, KD, VD](
-      jsc.sc, Map(kafkaParams.toSeq: _*), offsetRanges))
+    implicit val recordCmt: ClassTag[R] = ClassTag(recordClass)
+    createRDD[K, V, KD, VD, R](
+      jsc.sc, Map(kafkaParams.toSeq: _*), offsetRanges, leaders, messageHandler.call _)
   }
 
-
   /**
    * :: Experimental ::
    * Create an input stream that pulls messages from a Kafka Broker. This stream can guarantee
@@ -270,7 +298,7 @@ object KafkaUtils {
    *  - Offsets: This does not use Zookeeper to store offsets. The consumed offsets are tracked
    *    by the stream itself. For interoperability with Kafka monitoring tools that depend on
    *    Zookeeper, you have to update Kafka/Zookeeper yourself from the streaming application.
-   *  - Failure Recovery: To recover from driver failures, you have to enable checkpointing
+   *  - Failure Recovery: To recover from driver failures, you have to enable checkpointing
    *    in the [[StreamingContext]]. The information on consumed offset can be
    *    recovered from the checkpoint. See the programming guide for details (constraints, etc.).
    *  - End-to-end semantics: This stream ensures that every records is effectively received and
@@ -375,7 +403,7 @@ object KafkaUtils {
    *  - Offsets: This does not use Zookeeper to store offsets. The consumed offsets are tracked
    *    by the stream itself. For interoperability with Kafka monitoring tools that depend on
    *    Zookeeper, you have to update Kafka/Zookeeper yourself from the streaming application.
-   *  - Failure Recovery: To recover from driver failures, you have to enable checkpointing
+   *  - Failure Recovery: To recover from driver failures, you have to enable checkpointing
    *    in the [[StreamingContext]]. The information on consumed offset can be
    *    recovered from the checkpoint. See the programming guide for details (constraints, etc.).
    *  - End-to-end semantics: This stream ensures that every records is effectively received and
@@ -433,7 +461,7 @@ object KafkaUtils {
    *  - Offsets: This does not use Zookeeper to store offsets. The consumed offsets are tracked
    *    by the stream itself. For interoperability with Kafka monitoring tools that depend on
    *    Zookeeper, you have to update Kafka/Zookeeper yourself from the streaming application.
-   *  - Failure Recovery: To recover from driver failures, you have to enable checkpointing
+   *  - Failure Recovery: To recover from driver failures, you have to enable checkpointing
    *    in the [[StreamingContext]]. The information on consumed offset can be
    *    recovered from the checkpoint. See the programming guide for details (constraints, etc.).
    *  - End-to-end semantics: This stream ensures that every records is effectively received and
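For reference, a minimal sketch of how the reworked batch API reads from Scala after this change. The broker addresses, topic, and offsets are illustrative, StringDecoder is just an example decoder, and the SparkContext setup is assumed rather than taken from this commit:

  import kafka.serializer.StringDecoder
  import org.apache.spark.{SparkConf, SparkContext}
  import org.apache.spark.streaming.kafka.{KafkaUtils, OffsetRange}

  val sc = new SparkContext(new SparkConf().setAppName("kafka-rdd-example"))

  // Kafka brokers, NOT zookeeper servers, as the updated Scaladoc stresses
  val kafkaParams = Map("metadata.broker.list" -> "broker1:9092,broker2:9092")

  // One OffsetRange per topic/partition describing the data this RDD should contain
  val offsetRanges = Array(
    OffsetRange.create("events", 0, 0L, 100L),
    OffsetRange.create("events", 1, 0L, 100L))

  // The simple variant returns an RDD of (key, value) pairs
  val rdd = KafkaUtils.createRDD[String, String, StringDecoder, StringDecoder](
    sc, kafkaParams, offsetRanges)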

external/kafka/src/main/scala/org/apache/spark/streaming/kafka/Leader.scala

Lines changed: 16 additions & 5 deletions
@@ -19,17 +19,28 @@ package org.apache.spark.streaming.kafka
 
 import kafka.common.TopicAndPartition
 
-/** Host info for the leader of a Kafka TopicAndPartition */
+import org.apache.spark.annotation.Experimental
+
+/**
+ * :: Experimental ::
+ * Represent the host info for the leader of a Kafka partition.
+ */
+@Experimental
 final class Leader private(
-    /** kafka topic name */
+    /** Kafka topic name */
     val topic: String,
-    /** kafka partition id */
+    /** Kafka partition id */
     val partition: Int,
-    /** kafka hostname */
+    /** Leader's hostname */
     val host: String,
-    /** kafka host's port */
+    /** Leader's port */
     val port: Int) extends Serializable
 
+/**
+ * :: Experimental ::
+ * Companion object the provides methods to create instances of [[Leader]].
+ */
+@Experimental
 object Leader {
   def create(topic: String, partition: Int, host: String, port: Int): Leader =
     new Leader(topic, partition, host, port)
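To show where Leader.create fits, a sketch of the leader-aware createRDD variant documented in the KafkaUtils diff above, reusing the sc and kafkaParams from the previous sketch. The broker host and the (offset, message) handler are assumptions for illustration, and the Array[Leader] shape follows the Java signature in this commit:

  import kafka.message.MessageAndMetadata
  import kafka.serializer.StringDecoder
  import org.apache.spark.streaming.kafka.{KafkaUtils, Leader, OffsetRange}

  val ranges = Array(OffsetRange.create("events", 0, 0L, 100L))

  // Point the fetch at the known partition leader to skip an extra metadata lookup
  val leaders = Array(Leader.create("events", 0, "broker1", 9092))

  // The handler sees the full MessageAndMetadata, so offsets and other metadata are available
  val handler = (mmd: MessageAndMetadata[String, String]) => (mmd.offset, mmd.message())

  val rdd = KafkaUtils.createRDD[String, String, StringDecoder, StringDecoder, (Long, String)](
    sc, kafkaParams, ranges, leaders, handler)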

external/kafka/src/main/scala/org/apache/spark/streaming/kafka/OffsetRange.scala

Lines changed: 30 additions & 6 deletions
@@ -19,16 +19,35 @@ package org.apache.spark.streaming.kafka
 
 import kafka.common.TopicAndPartition
 
-/** Something that has a collection of OffsetRanges */
+import org.apache.spark.annotation.Experimental
+
+/**
+ * :: Experimental ::
+ * Represents any object that has a collection of [[OffsetRange]]s. This can be used access the
+ * offset ranges in RDDs generated by the direct Kafka DStream (see
+ * [[KafkaUtils.createDirectStream()]]).
+ * {{{
+ *   KafkaUtils.createDirectStream(...).foreachRDD { rdd =>
+ *      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
+ *      ...
+ *   }
+ * }}}
+ */
+@Experimental
 trait HasOffsetRanges {
   def offsetRanges: Array[OffsetRange]
 }
 
-/** Represents a range of offsets from a single Kafka TopicAndPartition */
+/**
+ * :: Experimental ::
+ * Represents a range of offsets from a single Kafka TopicAndPartition. Instances of this class
+ * can be created with `OffsetRange.create()`.
+ */
+@Experimental
 final class OffsetRange private(
-    /** kafka topic name */
+    /** Kafka topic name */
     val topic: String,
-    /** kafka partition id */
+    /** Kafka partition id */
     val partition: Int,
     /** inclusive starting offset */
     val fromOffset: Long,
@@ -58,6 +77,11 @@ final class OffsetRange private(
   def toTuple: OffsetRangeTuple = (topic, partition, fromOffset, untilOffset)
 }
 
+/**
+ * :: Experimental ::
+ * Companion object the provides methods to create instances of [[OffsetRange]].
+ */
+@Experimental
 object OffsetRange {
   def create(topic: String, partition: Int, fromOffset: Long, untilOffset: Long): OffsetRange =
     new OffsetRange(topic, partition, fromOffset, untilOffset)
@@ -78,10 +102,10 @@ object OffsetRange {
     new OffsetRange(topicAndPartition.topic, topicAndPartition.partition, fromOffset, untilOffset)
 
   /** this is to avoid ClassNotFoundException during checkpoint restore */
-  private[spark]
+  private[kafka]
   type OffsetRangeTuple = (String, Int, Long, Long)
 
-  private[streaming]
+  private[kafka]
   def apply(t: OffsetRangeTuple) =
     new OffsetRange(t._1, t._2, t._3, t._4)
 }
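The HasOffsetRanges snippet added to the Scaladoc above, expanded into a fuller sketch. The StreamingContext, batch interval, topic set, and println are illustrative, and createDirectStream is assumed to take (ssc, kafkaParams, topics) as documented elsewhere in this module:

  import kafka.serializer.StringDecoder
  import org.apache.spark.streaming.{Seconds, StreamingContext}
  import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils}

  val ssc = new StreamingContext(sc, Seconds(10))

  val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
    ssc, kafkaParams, Set("events"))

  stream.foreachRDD { rdd =>
    // Cast to HasOffsetRanges, as the new Scaladoc shows, to see exactly what this batch consumed
    val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
    offsetRanges.foreach { o =>
      println(s"${o.topic} ${o.partition}: ${o.fromOffset} -> ${o.untilOffset}")
    }
  }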
