@@ -35,25 +35,31 @@ def __init__(self, jdstream, ssc, jrdd_deserializer):
35
35
self .ctx = ssc ._sc
36
36
self ._jrdd_deserializer = jrdd_deserializer
37
37
38
    def context(self):
        """
        Return the StreamingContext associated with this DStream.
        """
        # NOTE(review): this returns self._ssc, but the visible part of
        # __init__ only assigns self.ctx and self._jrdd_deserializer —
        # confirm _ssc is actually set elsewhere in the constructor.
        return self._ssc
38
44
def count (self ):
39
45
"""
40
46
Return a new DStream which contains the number of elements in this DStream.
41
47
"""
42
- return self ._mapPartitions (lambda i : [sum (1 for _ in i )])._sum ()
48
+ return self .mapPartitions (lambda i : [sum (1 for _ in i )])._sum ()
43
49
44
50
def _sum (self ):
45
51
"""
46
52
Add up the elements in this DStream.
47
53
"""
48
- return self ._mapPartitions (lambda x : [sum (x )]).reduce (operator .add )
54
+ return self .mapPartitions (lambda x : [sum (x )]).reduce (operator .add )
49
55
50
56
def print_ (self , label = None ):
51
57
"""
52
58
Since print is reserved name for python, we cannot define a "print" method function.
53
59
This function prints serialized data in RDD in DStream because Scala and Java cannot
54
- deserialized pickled python object. Please use DStream.pyprint() instead to print results.
60
+ deserialized pickled python object. Please use DStream.pyprint() to print results.
55
61
56
- Call DStream.print().
62
+ Call DStream.print() and this function will print byte array in the DStream
57
63
"""
58
64
# a hack to call print function in DStream
59
65
getattr (self ._jdstream , "print" )(label )
@@ -63,29 +69,32 @@ def filter(self, f):
63
69
Return a new DStream containing only the elements that satisfy predicate.
64
70
"""
65
71
def func (iterator ): return ifilter (f , iterator )
66
- return self ._mapPartitions (func )
72
+ return self .mapPartitions (func )
67
73
68
74
    def flatMap(self, f, preservesPartitioning=False):
        """
        Return a new DStream by applying a function to each element of this
        DStream and then flattening the results.

        With preservesPartitioning=True the original RDD's partitioning is
        retained.
        """
        # The partition index argument `s` supplied by
        # _mapPartitionsWithIndex is intentionally unused here.
        def func(s, iterator):
            return chain.from_iterable(imap(f, iterator))
        return self._mapPartitionsWithIndex(func, preservesPartitioning)
75
82
76
- def map (self , f ):
83
+ def map (self , f , preservesPartitioning = False ):
77
84
"""
78
85
Return a new DStream by applying a function to each element of DStream.
79
86
"""
80
- def func (iterator ): return imap (f , iterator )
81
- return self ._mapPartitions (func )
87
+ def func (iterator ):
88
+ return imap (f , iterator )
89
+ return self .mapPartitions (func , preservesPartitioning )
82
90
83
- def _mapPartitions (self , f ):
91
+ def mapPartitions (self , f , preservesPartitioning = False ):
84
92
"""
85
93
Return a new DStream by applying a function to each partition of this DStream.
86
94
"""
87
- def func (s , iterator ): return f (iterator )
88
- return self ._mapPartitionsWithIndex (func )
95
+ def func (s , iterator ):
96
+ return f (iterator )
97
+ return self ._mapPartitionsWithIndex (func , preservesPartitioning )
89
98
90
99
def _mapPartitionsWithIndex (self , f , preservesPartitioning = False ):
91
100
"""
@@ -131,7 +140,7 @@ def combineLocally(iterator):
131
140
else :
132
141
combiners [k ] = mergeValue (combiners [k ], v )
133
142
return combiners .iteritems ()
134
- locally_combined = self ._mapPartitions (combineLocally )
143
+ locally_combined = self .mapPartitions (combineLocally )
135
144
shuffled = locally_combined .partitionBy (numPartitions )
136
145
137
146
def _mergeCombiners (iterator ):
@@ -143,7 +152,7 @@ def _mergeCombiners(iterator):
143
152
combiners [k ] = mergeCombiners (combiners [k ], v )
144
153
return combiners .iteritems ()
145
154
146
- return shuffled ._mapPartitions (_mergeCombiners )
155
+ return shuffled .mapPartitions (_mergeCombiners )
147
156
148
157
def partitionBy (self , numPartitions , partitionFunc = None ):
149
158
"""
@@ -233,6 +242,34 @@ def takeAndPrint(rdd, time):
233
242
234
243
self .foreachRDD (takeAndPrint )
235
244
245
+ def mapValues (self , f ):
246
+ """
247
+ Pass each value in the key-value pair RDD through a map function
248
+ without changing the keys; this also retains the original RDD's
249
+ partitioning.
250
+ """
251
+ map_values_fn = lambda (k , v ): (k , f (v ))
252
+ return self .map (map_values_fn , preservesPartitioning = True )
253
+
254
+ def flatMapValues (self , f ):
255
+ """
256
+ Pass each value in the key-value pair RDD through a flatMap function
257
+ without changing the keys; this also retains the original RDD's
258
+ partitioning.
259
+ """
260
+ flat_map_fn = lambda (k , v ): ((k , x ) for x in f (v ))
261
+ return self .flatMap (flat_map_fn , preservesPartitioning = True )
262
+
263
    def glom(self):
        """
        Return a new DStream in which each RDD is generated by applying
        glom() to the corresponding RDD of this DStream: all elements
        within each partition are coalesced into a single list.
        """
        # Yielding (rather than returning) keeps func a generator, as
        # mapPartitions expects an iterable of results per partition.
        def func(iterator):
            yield list(iterator)
        return self.mapPartitions(func)
272
+
236
273
#def transform(self, func): - TD
237
274
# from utils import RDDFunction
238
275
# wrapped_func = RDDFunction(self.ctx, self._jrdd_deserializer, func)
@@ -242,7 +279,7 @@ def takeAndPrint(rdd, time):
242
279
def _test_output (self , result ):
243
280
"""
244
281
This function is only for test case.
245
- Store data in a DStream to result to verify the result in tese case
282
+ Store data in a DStream to result to verify the result in test case
246
283
"""
247
284
def get_output (rdd , time ):
248
285
taken = rdd .collect ()
@@ -305,4 +342,4 @@ def _jdstream(self):
305
342
return self ._jdstream_val
306
343
307
344
    def _is_pipelinable(self):
        # A DStream can have further operations pipelined onto it only
        # while it has not been cached.
        return not self.is_cached
0 commit comments