
Commit c9d607a

convert datatype for runtime

java.util.{List, Set} => Seq, java.util.Map => Map. But a Seq cannot be converted back into java.util.Set, so set(), tuple() and array() cannot be handled gracefully (returned with their original type). Items in an ArrayType are accessed by position, but position is not defined for set(). Do we still want to support set()/tuple()/array()?
1 parent 709d40d commit c9d607a
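In practice this means set(), tuple() and array() values survive inferSchema only as plain lists once collected. A minimal sketch of the new behavior, assuming the same sc/sqlCtx doctest globals used in python/pyspark/sql.py below:

    >>> from array import array
    >>> rdd = sc.parallelize([{"f1": set([1, 2]), "f2": (1, 2), "f3": array('i', [1, 2])}])
    >>> # all three containers come back as lists; element order from a set is not guaranteed
    >>> sqlCtx.inferSchema(rdd).collect() == [{"f1": [1, 2], "f2": [1, 2], "f3": [1, 2]}]
    True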

File tree

3 files changed: 31 additions, 28 deletions

python/pyspark/sql.py
sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala

python/pyspark/sql.py

Lines changed: 10 additions & 9 deletions
@@ -49,11 +49,12 @@ def __init__(self, sparkContext, sqlContext=None):
 
         >>> from datetime import datetime
         >>> allTypes = sc.parallelize([{"int": 1, "string": "string", "double": 1.0, "long": 1L,
-        ... "boolean": True, "time": datetime(2010, 1, 1, 1, 1, 1)}])
+        ... "boolean": True, "time": datetime(2010, 1, 1, 1, 1, 1), "dict": {"a": 1},
+        ... "list": [1, 2, 3]}])
         >>> srdd = sqlCtx.inferSchema(allTypes).map(lambda x: (x.int, x.string, x.double, x.long,
-        ... x.boolean, x.time))
+        ... x.boolean, x.time, x.dict["a"], x.list))
         >>> srdd.collect()[0]
-        (1, u'string', 1.0, 1, True, datetime.datetime(2010, 1, 1, 1, 1, 1))
+        (1, u'string', 1.0, 1, True, datetime.datetime(2010, 1, 1, 1, 1, 1), 1, [1, 2, 3])
         """
         self._sc = sparkContext
         self._jsc = self._sc._jsc
@@ -89,13 +90,13 @@ def inferSchema(self, rdd):
 
         >>> from array import array
         >>> srdd = sqlCtx.inferSchema(nestedRdd1)
-        >>> srdd.collect() == [{"f1" : array('i', [1, 2]), "f2" : {"row1" : 1.0}},
-        ...                    {"f1" : array('i', [2, 3]), "f2" : {"row2" : 2.0}}]
+        >>> srdd.collect() == [{"f1" : [1, 2], "f2" : {"row1" : 1.0}},
+        ...                    {"f1" : [2, 3], "f2" : {"row2" : 2.0}}]
         True
 
         >>> srdd = sqlCtx.inferSchema(nestedRdd2)
-        >>> srdd.collect() == [{"f1" : [[1, 2], [2, 3]], "f2" : set([1, 2]), "f3" : (1, 2)},
-        ...                    {"f1" : [[2, 3], [3, 4]], "f2" : set([2, 3]), "f3" : (2, 3)}]
+        >>> srdd.collect() == [{"f1" : [[1, 2], [2, 3]], "f2" : [1, 2], "f3" : [1, 2]},
+        ...                    {"f1" : [[2, 3], [3, 4]], "f2" : [2, 3], "f3" : [2, 3]}]
         True
         """
         if (rdd.__class__ is SchemaRDD):
@@ -510,8 +511,8 @@ def _test():
         {"f1": array('i', [1, 2]), "f2": {"row1": 1.0}},
         {"f1": array('i', [2, 3]), "f2": {"row2": 2.0}}])
     globs['nestedRdd2'] = sc.parallelize([
-        {"f1": [[1, 2], [2, 3]], "f2": set([1, 2]), "f3": (1, 2)},
-        {"f1": [[2, 3], [3, 4]], "f2": set([2, 3]), "f3": (2, 3)}])
+        {"f1": [[1, 2], [2, 3]], "f2": set([1, 2]), "f3": [1, 2]},
+        {"f1": [[2, 3], [3, 4]], "f2": set([2, 3]), "f3": [2, 3]}])
     (failure_count, test_count) = doctest.testmod(globs=globs, optionflags=doctest.ELLIPSIS)
     globs['sc'].stop()
     if failure_count:
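If the original container types are needed on the Python side, one workaround (a usage suggestion, not part of this commit) is to rebuild them from the collected lists, e.g. for nestedRdd2 above:

    >>> rows = sqlCtx.inferSchema(nestedRdd2).collect()
    >>> set(rows[0]["f2"]), tuple(rows[0]["f3"])
    (set([1, 2]), (1, 2))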

sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala

Lines changed: 11 additions & 12 deletions
@@ -369,25 +369,24 @@ class SQLContext(@transient val sparkContext: SparkContext)
     }.toSeq
 
     def needTransform(obj: Any): Boolean = obj match {
-      case c: java.util.List[_] => c.exists(needTransform)
-      case c: java.util.Set[_] => c.exists(needTransform)
-      case c: java.util.Map[_, _] => c.exists {
-        case (key, value) => needTransform(key) || needTransform(value)
-      }
-      case c if c.getClass.isArray =>
-        c.asInstanceOf[Array[_]].exists(needTransform)
+      case c: java.util.List[_] => true
+      case c: java.util.Set[_] => true
+      case c: java.util.Map[_, _] => true
+      case c if c.getClass.isArray => true
       case c: java.util.Calendar => true
       case c => false
     }
 
+    // convert JList, JSet into Seq, convert JMap into Map
+    // convert Calendar into Timestamp
     def transform(obj: Any): Any = obj match {
-      case c: java.util.List[_] => c.map(transform)
-      case c: java.util.Set[_] => c.map(transform)
+      case c: java.util.List[_] => c.map(transform).toSeq
+      case c: java.util.Set[_] => c.map(transform).toSet.toSeq
       case c: java.util.Map[_, _] => c.map {
-        case (key, value) => (transform(key), transform(value))
-      }
+        case (key, value) => (key, transform(value))
+      }.toMap
       case c if c.getClass.isArray =>
-        c.asInstanceOf[Array[_]].map(transform)
+        c.asInstanceOf[Array[_]].map(transform).toSeq
       case c: java.util.Calendar =>
         new java.sql.Timestamp(c.getTime().getTime())
       case c => c
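From the PySpark side, these conversions (JList/JSet => Seq, JMap => Map, Calendar => Timestamp) are what let the dict and datetime fields in the first doctest round-trip. A short sketch, again assuming the doctest's sc/sqlCtx globals:

    >>> from datetime import datetime
    >>> rdd = sc.parallelize([{"d": {"a": 1}, "t": datetime(2010, 1, 1, 1, 1, 1)}])
    >>> sqlCtx.inferSchema(rdd).map(lambda x: (x.d["a"], x.t)).first()
    (1, datetime.datetime(2010, 1, 1, 1, 1, 1))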

sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala

Lines changed: 10 additions & 7 deletions
@@ -32,7 +32,7 @@ import org.apache.spark.sql.catalyst.analysis._
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.plans.{Inner, JoinType}
-import org.apache.spark.sql.catalyst.types.{ArrayType, BooleanType, StructType}
+import org.apache.spark.sql.catalyst.types.{ArrayType, BooleanType, StructType, MapType}
 import org.apache.spark.sql.execution.{ExistingRdd, SparkLogicalPlan}
 import org.apache.spark.api.java.JavaRDD

@@ -388,27 +388,30 @@ class SchemaRDD(
           case seq: Seq[Any] =>
             seq.map(element => rowToMap(element.asInstanceOf[Row], struct)).asJava
           case list: JList[_] =>
-            list.map(element => rowToMap(element.asInstanceOf[Row], struct))
+            list.map(element => rowToMap(element.asInstanceOf[Row], struct)).asJava
           case set: JSet[_] =>
-            set.map(element => rowToMap(element.asInstanceOf[Row], struct))
+            set.map(element => rowToMap(element.asInstanceOf[Row], struct)).asJava
           case arr if arr != null && arr.getClass.isArray =>
             arr.asInstanceOf[Array[Any]].map {
               element => rowToMap(element.asInstanceOf[Row], struct)
             }
-          case t: java.sql.Timestamp =>
-            val c = java.util.Calendar.getInstance()
-            c.setTimeInMillis(t.getTime())
-            c
           case other => other
         }
         map.put(attrName, arrayValues)
+      case m @ MapType(_, struct: StructType) =>
+        val nm = obj.asInstanceOf[Map[_,_]].map {
+          case (k, v) => (k, rowToMap(v.asInstanceOf[Row], struct))
+        }.asJava
+        map.put(attrName, nm)
       case array: ArrayType => {
         val arrayValues = obj match {
           case seq: Seq[Any] => seq.asJava
           case other => other
         }
         map.put(attrName, arrayValues)
       }
+      case m: MapType => map.put(attrName, obj.asInstanceOf[Map[_,_]].asJava)
+      // Pyrolite can handle Timestamp
       case other => map.put(attrName, obj)
     }
   }
