
Commit c01e3ef

[SPARK-2024] code formatting
1 parent 6591e37 commit c01e3ef

File tree

3 files changed (+19, -19 lines)

core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala

Lines changed: 13 additions & 14 deletions
@@ -93,26 +93,25 @@ private[python] object SerDeUtil extends Logging {
     }
     pyRDD.mapPartitions { iter =>
       val unpickle = new Unpickler
-      val unpickled = if (batchSerialized) {
-        iter.flatMap { batch =>
-          unpickle.loads(batch) match {
-            case objs: java.util.List[_] => collectionAsScalaIterable(objs)
-            case other => throw new SparkException(
-              s"Unexpected type ${other.getClass.getName} for batch serialized Python RDD")
+      val unpickled =
+        if (batchSerialized) {
+          iter.flatMap { batch =>
+            unpickle.loads(batch) match {
+              case objs: java.util.List[_] => collectionAsScalaIterable(objs)
+              case other => throw new SparkException(
+                s"Unexpected type ${other.getClass.getName} for batch serialized Python RDD")
+            }
           }
+        } else {
+          iter.map(unpickle.loads(_))
         }
-      } else {
-        iter.map(unpickle.loads(_))
-      }
       unpickled.map {
-        // we only accept pickled (K, V)
         case obj if isPair(obj) =>
+          // we only accept (K, V)
           val arr = obj.asInstanceOf[Array[_]]
-          // arr has only 2 elements K and V
           (arr.head.asInstanceOf[K], arr.last.asInstanceOf[V])
-        case other =>
-          throw new SparkException(
-            s"RDD element of type ${other.getClass.getName} cannot be used")
+        case other => throw new SparkException(
+          s"RDD element of type ${other.getClass.getName} cannot be used")
       }
     }
   }
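
For context, the contract this Scala code enforces is easiest to see from the PySpark side: only an RDD whose elements are 2-tuples passes the isPair check, and anything else hits the SparkException above. A minimal sketch, assuming a local SparkContext (names and paths are illustrative, not from this commit):

    from pyspark import SparkContext

    sc = SparkContext("local", "serde-demo")

    # Elements are (K, V) pairs, so the JVM-side isPair() check accepts them.
    pairs = sc.parallelize([(1, "a"), (2, "b"), (3, "c")])
    pairs.saveAsSequenceFile("/tmp/demo-pairs")  # illustrative output path

    # Elements are one-element lists, not pairs, so the JVM side throws
    # the "RDD element of type ... cannot be used" SparkException instead.
    malformed = sc.parallelize([[(1, "a")], [(2, "b")]])
    # malformed.saveAsSequenceFile("/tmp/demo-bad")  # would raise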

python/pyspark/rdd.py

Lines changed: 5 additions & 4 deletions
@@ -1079,7 +1079,7 @@ def saveAsNewAPIHadoopFile(self, path, outputFormatClass, keyClass=None, valueCl
         pickledRDD = self._toPickleSerialization()
         batched = isinstance(pickledRDD._jrdd_deserializer, BatchedSerializer)
         self.ctx._jvm.PythonRDD.saveAsNewAPIHadoopFile(pickledRDD._jrdd, batched, path,
-                outputFormatClass, keyClass, valueClass, keyConverter, valueConverter, jconf)
+            outputFormatClass, keyClass, valueClass, keyConverter, valueConverter, jconf)
 
     def saveAsHadoopDataset(self, conf, keyConverter=None, valueConverter=None):
         """
@@ -1099,7 +1099,8 @@ def saveAsHadoopDataset(self, conf, keyConverter=None, valueConverter=None):
             keyConverter, valueConverter, False)
 
     def saveAsHadoopFile(self, path, outputFormatClass, keyClass=None, valueClass=None,
-                         keyConverter=None, valueConverter=None, conf=None, compressionCodecClass=None):
+                         keyConverter=None, valueConverter=None, conf=None,
+                         compressionCodecClass=None):
         """
         Output a Python RDD of key-value pairs (of form C{RDD[(K, V)]}) to any Hadoop file
         system, using the old Hadoop OutputFormat API (mapred package). Key and value types
@@ -1123,8 +1124,8 @@ def saveAsHadoopFile(self, path, outputFormatClass, keyClass=None, valueClass=No
         jconf = self.ctx._dictToJavaMap(conf)
         pickledRDD = self._toPickleSerialization()
         batched = isinstance(pickledRDD._jrdd_deserializer, BatchedSerializer)
-        self.ctx._jvm.PythonRDD.saveAsHadoopFile(pickledRDD._jrdd, batched,
-            path, outputFormatClass, keyClass, valueClass, keyConverter, valueConverter,
+        self.ctx._jvm.PythonRDD.saveAsHadoopFile(pickledRDD._jrdd, batched, path,
+            outputFormatClass, keyClass, valueClass, keyConverter, valueConverter,
             jconf, compressionCodecClass)
 
     def saveAsSequenceFile(self, path, compressionCodecClass=None):
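
The reformatted saveAsHadoopFile signature above is behaviorally unchanged; a minimal usage sketch with the stock Hadoop mapred-package classes (the path and data are illustrative, not from this commit):

    rdd = sc.parallelize([(1, "a"), (2, "aa"), (3, "aaa")])
    rdd.saveAsHadoopFile(
        "/tmp/demo-old-api",  # illustrative output path
        outputFormatClass="org.apache.hadoop.mapred.SequenceFileOutputFormat",
        keyClass="org.apache.hadoop.io.IntWritable",
        valueClass="org.apache.hadoop.io.Text")

Key and value classes are passed as fully qualified Java class names, matching the keyClass/valueClass parameters in the signature above.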

python/pyspark/tests.py

Lines changed: 1 addition & 1 deletion
@@ -704,7 +704,7 @@ def test_unbatched_save_and_read(self):
 
     def test_malformed_RDD(self):
         basepath = self.tempdir.name
-        # non-batch-serialized RDD of type RDD[[(K, V)]] should be rejected
+        # non-batch-serialized RDD[[(K, V)]] should be rejected
         data = [[(1, "a")], [(2, "aa")], [(3, "aaa")]]
         rdd = self.sc.parallelize(data, numSlices=len(data))
         self.assertRaises(Exception, lambda: rdd.saveAsSequenceFile(
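
The malformed case fails because each element of data is a one-element list, not a (K, V) tuple, so the JVM-side isPair check in SerDeUtil.scala rejects it. A short sketch of the distinction (the flatMap fix and path are illustrative, not part of the test):

    data = [[(1, "a")], [(2, "aa")], [(3, "aaa")]]
    malformed = sc.parallelize(data, numSlices=len(data))  # elements are lists: rejected
    flattened = malformed.flatMap(lambda x: x)             # elements are (K, V) pairs
    flattened.saveAsSequenceFile("/tmp/demo-flat")         # illustrative path; this form saves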
