64
64
from itertools import chain , izip , product
65
65
import marshal
66
66
import struct
67
+ import sys
67
68
from pyspark import cloudpickle
68
69
69
70
@@ -113,6 +114,11 @@ class FramedSerializer(Serializer):
113
114
where C{length} is a 32-bit integer and data is C{length} bytes.
114
115
"""
115
116
117
def __init__(self):
    """
    Record whether payloads must be converted to C{str} before being written.

    On Python 2.6, we can't write bytearrays to streams, so they need to be
    converted to strings first. The interpreter version is checked once here
    and the result is stored in C{self._only_write_strings}.
    """
    # Compare only (major, minor): True for Python 2.6 and anything older.
    self._only_write_strings = sys.version_info[:2] <= (2, 6)
121
+
116
122
def dump_stream (self , iterator , stream ):
117
123
for obj in iterator :
118
124
self ._write_with_length (obj , stream )
@@ -127,7 +133,10 @@ def load_stream(self, stream):
127
133
def _write_with_length(self, obj, stream):
    """
    Serialize C{obj} and write it to C{stream} as a length-prefixed frame.

    The length of the serialized payload is written first via C{write_int},
    followed by the payload itself.

    @param obj: the object to serialize with C{self.dumps}.
    @param stream: the output stream; must provide a C{write} method.
    """
    serialized = self.dumps(obj)
    write_int(len(serialized), stream)
    if self._only_write_strings:
        # Python 2.6 streams cannot take bytearrays, so coerce to str first.
        stream.write(str(serialized))
    else:
        stream.write(serialized)
131
140
132
141
def _read_with_length (self , stream ):
133
142
length = read_int (stream )
@@ -290,7 +299,7 @@ class MarshalSerializer(FramedSerializer):
290
299
291
300
class UTF8Deserializer (Serializer ):
292
301
"""
293
- Deserializes streams written by getBytes.
302
+ Deserializes streams written by String.getBytes.
294
303
"""
295
304
296
305
def loads (self , stream ):
0 commit comments