Skip to content

Commit bc20a52

Browse files
yhuairxin
authored andcommitted
[SPARK-5287][SQL] Add defaultSizeOf to every data type.
JIRA: https://issues.apache.org/jira/browse/SPARK-5287 This PR only add `defaultSizeOf` to data types and make those internal type classes `protected[sql]`. I will use another PR to cleanup the type hierarchy of data types. Author: Yin Huai <[email protected]> Closes #4081 from yhuai/SPARK-5287 and squashes the following commits: 90cec75 [Yin Huai] Update unit test. e1c600c [Yin Huai] Make internal classes protected[sql]. 7eaba68 [Yin Huai] Add `defaultSize` method to data types. fd425e0 [Yin Huai] Add all native types to NativeType.defaultSizeOf.
1 parent 23e2554 commit bc20a52

File tree

5 files changed

+201
-48
lines changed

5 files changed

+201
-48
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/WrapDynamic.scala

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,13 @@ import org.apache.spark.sql.types.DataType
2424
/**
2525
* The data type representing [[DynamicRow]] values.
2626
*/
27-
case object DynamicType extends DataType
27+
case object DynamicType extends DataType {
28+
29+
/**
30+
* The default size of a value of the DynamicType is 4096 bytes.
31+
*/
32+
override def defaultSize: Int = 4096
33+
}
2834

2935
/**
3036
* Wrap a [[Row]] as a [[DynamicRow]].

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala

Lines changed: 5 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -238,16 +238,11 @@ case class Rollup(
238238
case class Limit(limitExpr: Expression, child: LogicalPlan) extends UnaryNode {
239239
override def output = child.output
240240

241-
override lazy val statistics: Statistics =
242-
if (output.forall(_.dataType.isInstanceOf[NativeType])) {
243-
val limit = limitExpr.eval(null).asInstanceOf[Int]
244-
val sizeInBytes = (limit: Long) * output.map { a =>
245-
NativeType.defaultSizeOf(a.dataType.asInstanceOf[NativeType])
246-
}.sum
247-
Statistics(sizeInBytes = sizeInBytes)
248-
} else {
249-
Statistics(sizeInBytes = children.map(_.statistics).map(_.sizeInBytes).product)
250-
}
241+
override lazy val statistics: Statistics = {
242+
val limit = limitExpr.eval(null).asInstanceOf[Int]
243+
val sizeInBytes = (limit: Long) * output.map(a => a.dataType.defaultSize).sum
244+
Statistics(sizeInBytes = sizeInBytes)
245+
}
251246
}
252247

253248
case class Subquery(alias: String, child: LogicalPlan) extends UnaryNode {

sql/catalyst/src/main/scala/org/apache/spark/sql/types/dataTypes.scala

Lines changed: 99 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -215,6 +215,9 @@ abstract class DataType {
215215
case _ => false
216216
}
217217

218+
/** The default size of a value of this data type. */
219+
def defaultSize: Int
220+
218221
def isPrimitive: Boolean = false
219222

220223
def typeName: String = this.getClass.getSimpleName.stripSuffix("$").dropRight(4).toLowerCase
@@ -235,33 +238,25 @@ abstract class DataType {
235238
* @group dataType
236239
*/
237240
@DeveloperApi
238-
case object NullType extends DataType
241+
case object NullType extends DataType {
242+
override def defaultSize: Int = 1
243+
}
239244

240245

241-
object NativeType {
246+
protected[sql] object NativeType {
242247
val all = Seq(
243248
IntegerType, BooleanType, LongType, DoubleType, FloatType, ShortType, ByteType, StringType)
244249

245250
def unapply(dt: DataType): Boolean = all.contains(dt)
246-
247-
val defaultSizeOf: Map[NativeType, Int] = Map(
248-
IntegerType -> 4,
249-
BooleanType -> 1,
250-
LongType -> 8,
251-
DoubleType -> 8,
252-
FloatType -> 4,
253-
ShortType -> 2,
254-
ByteType -> 1,
255-
StringType -> 4096)
256251
}
257252

258253

259-
trait PrimitiveType extends DataType {
254+
protected[sql] trait PrimitiveType extends DataType {
260255
override def isPrimitive = true
261256
}
262257

263258

264-
object PrimitiveType {
259+
protected[sql] object PrimitiveType {
265260
private val nonDecimals = Seq(NullType, DateType, TimestampType, BinaryType) ++ NativeType.all
266261
private val nonDecimalNameToType = nonDecimals.map(t => t.typeName -> t).toMap
267262

@@ -276,7 +271,7 @@ object PrimitiveType {
276271
}
277272
}
278273

279-
abstract class NativeType extends DataType {
274+
protected[sql] abstract class NativeType extends DataType {
280275
private[sql] type JvmType
281276
@transient private[sql] val tag: TypeTag[JvmType]
282277
private[sql] val ordering: Ordering[JvmType]
@@ -300,6 +295,11 @@ case object StringType extends NativeType with PrimitiveType {
300295
private[sql] type JvmType = String
301296
@transient private[sql] lazy val tag = ScalaReflectionLock.synchronized { typeTag[JvmType] }
302297
private[sql] val ordering = implicitly[Ordering[JvmType]]
298+
299+
/**
300+
* The default size of a value of the StringType is 4096 bytes.
301+
*/
302+
override def defaultSize: Int = 4096
303303
}
304304

305305

@@ -324,6 +324,11 @@ case object BinaryType extends NativeType with PrimitiveType {
324324
x.length - y.length
325325
}
326326
}
327+
328+
/**
329+
* The default size of a value of the BinaryType is 4096 bytes.
330+
*/
331+
override def defaultSize: Int = 4096
327332
}
328333

329334

@@ -339,6 +344,11 @@ case object BooleanType extends NativeType with PrimitiveType {
339344
private[sql] type JvmType = Boolean
340345
@transient private[sql] lazy val tag = ScalaReflectionLock.synchronized { typeTag[JvmType] }
341346
private[sql] val ordering = implicitly[Ordering[JvmType]]
347+
348+
/**
349+
* The default size of a value of the BooleanType is 1 byte.
350+
*/
351+
override def defaultSize: Int = 1
342352
}
343353

344354

@@ -359,6 +369,11 @@ case object TimestampType extends NativeType {
359369
private[sql] val ordering = new Ordering[JvmType] {
360370
def compare(x: Timestamp, y: Timestamp) = x.compareTo(y)
361371
}
372+
373+
/**
374+
* The default size of a value of the TimestampType is 8 bytes.
375+
*/
376+
override def defaultSize: Int = 8
362377
}
363378

364379

@@ -379,10 +394,15 @@ case object DateType extends NativeType {
379394
private[sql] val ordering = new Ordering[JvmType] {
380395
def compare(x: Date, y: Date) = x.compareTo(y)
381396
}
397+
398+
/**
399+
* The default size of a value of the DateType is 8 bytes.
400+
*/
401+
override def defaultSize: Int = 8
382402
}
383403

384404

385-
abstract class NumericType extends NativeType with PrimitiveType {
405+
protected[sql] abstract class NumericType extends NativeType with PrimitiveType {
386406
// Unfortunately we can't get this implicitly as that breaks Spark Serialization. In order for
387407
// implicitly[Numeric[JvmType]] to be valid, we have to change JvmType from a type variable to a
388408
// type parameter and and add a numeric annotation (i.e., [JvmType : Numeric]). This gets
@@ -392,21 +412,21 @@ abstract class NumericType extends NativeType with PrimitiveType {
392412
}
393413

394414

395-
object NumericType {
415+
protected[sql] object NumericType {
396416
def unapply(e: Expression): Boolean = e.dataType.isInstanceOf[NumericType]
397417
}
398418

399419

400420
/** Matcher for any expressions that evaluate to [[IntegralType]]s */
401-
object IntegralType {
421+
protected[sql] object IntegralType {
402422
def unapply(a: Expression): Boolean = a match {
403423
case e: Expression if e.dataType.isInstanceOf[IntegralType] => true
404424
case _ => false
405425
}
406426
}
407427

408428

409-
sealed abstract class IntegralType extends NumericType {
429+
protected[sql] sealed abstract class IntegralType extends NumericType {
410430
private[sql] val integral: Integral[JvmType]
411431
}
412432

@@ -425,6 +445,11 @@ case object LongType extends IntegralType {
425445
private[sql] val numeric = implicitly[Numeric[Long]]
426446
private[sql] val integral = implicitly[Integral[Long]]
427447
private[sql] val ordering = implicitly[Ordering[JvmType]]
448+
449+
/**
450+
* The default size of a value of the LongType is 8 bytes.
451+
*/
452+
override def defaultSize: Int = 8
428453
}
429454

430455

@@ -442,6 +467,11 @@ case object IntegerType extends IntegralType {
442467
private[sql] val numeric = implicitly[Numeric[Int]]
443468
private[sql] val integral = implicitly[Integral[Int]]
444469
private[sql] val ordering = implicitly[Ordering[JvmType]]
470+
471+
/**
472+
* The default size of a value of the IntegerType is 4 bytes.
473+
*/
474+
override def defaultSize: Int = 4
445475
}
446476

447477

@@ -459,6 +489,11 @@ case object ShortType extends IntegralType {
459489
private[sql] val numeric = implicitly[Numeric[Short]]
460490
private[sql] val integral = implicitly[Integral[Short]]
461491
private[sql] val ordering = implicitly[Ordering[JvmType]]
492+
493+
/**
494+
* The default size of a value of the ShortType is 2 bytes.
495+
*/
496+
override def defaultSize: Int = 2
462497
}
463498

464499

@@ -476,19 +511,24 @@ case object ByteType extends IntegralType {
476511
private[sql] val numeric = implicitly[Numeric[Byte]]
477512
private[sql] val integral = implicitly[Integral[Byte]]
478513
private[sql] val ordering = implicitly[Ordering[JvmType]]
514+
515+
/**
516+
* The default size of a value of the ByteType is 1 byte.
517+
*/
518+
override def defaultSize: Int = 1
479519
}
480520

481521

482522
/** Matcher for any expressions that evaluate to [[FractionalType]]s */
483-
object FractionalType {
523+
protected[sql] object FractionalType {
484524
def unapply(a: Expression): Boolean = a match {
485525
case e: Expression if e.dataType.isInstanceOf[FractionalType] => true
486526
case _ => false
487527
}
488528
}
489529

490530

491-
sealed abstract class FractionalType extends NumericType {
531+
protected[sql] sealed abstract class FractionalType extends NumericType {
492532
private[sql] val fractional: Fractional[JvmType]
493533
private[sql] val asIntegral: Integral[JvmType]
494534
}
@@ -530,6 +570,11 @@ case class DecimalType(precisionInfo: Option[PrecisionInfo]) extends FractionalT
530570
case Some(PrecisionInfo(precision, scale)) => s"DecimalType($precision,$scale)"
531571
case None => "DecimalType()"
532572
}
573+
574+
/**
575+
* The default size of a value of the DecimalType is 4096 bytes.
576+
*/
577+
override def defaultSize: Int = 4096
533578
}
534579

535580

@@ -580,6 +625,11 @@ case object DoubleType extends FractionalType {
580625
private[sql] val fractional = implicitly[Fractional[Double]]
581626
private[sql] val ordering = implicitly[Ordering[JvmType]]
582627
private[sql] val asIntegral = DoubleAsIfIntegral
628+
629+
/**
630+
* The default size of a value of the DoubleType is 8 bytes.
631+
*/
632+
override def defaultSize: Int = 8
583633
}
584634

585635

@@ -598,6 +648,11 @@ case object FloatType extends FractionalType {
598648
private[sql] val fractional = implicitly[Fractional[Float]]
599649
private[sql] val ordering = implicitly[Ordering[JvmType]]
600650
private[sql] val asIntegral = FloatAsIfIntegral
651+
652+
/**
653+
* The default size of a value of the FloatType is 4 bytes.
654+
*/
655+
override def defaultSize: Int = 4
601656
}
602657

603658

@@ -636,6 +691,12 @@ case class ArrayType(elementType: DataType, containsNull: Boolean) extends DataT
636691
("type" -> typeName) ~
637692
("elementType" -> elementType.jsonValue) ~
638693
("containsNull" -> containsNull)
694+
695+
/**
696+
* The default size of a value of the ArrayType is 100 * the default size of the element type.
697+
* (We assume that there are 100 elements).
698+
*/
699+
override def defaultSize: Int = 100 * elementType.defaultSize
639700
}
640701

641702

@@ -805,6 +866,11 @@ case class StructType(fields: Array[StructField]) extends DataType with Seq[Stru
805866
override def length: Int = fields.length
806867

807868
override def iterator: Iterator[StructField] = fields.iterator
869+
870+
/**
871+
* The default size of a value of the StructType is the total default sizes of all field types.
872+
*/
873+
override def defaultSize: Int = fields.map(_.dataType.defaultSize).sum
808874
}
809875

810876

@@ -848,6 +914,13 @@ case class MapType(
848914
("keyType" -> keyType.jsonValue) ~
849915
("valueType" -> valueType.jsonValue) ~
850916
("valueContainsNull" -> valueContainsNull)
917+
918+
/**
919+
* The default size of a value of the MapType is
920+
* 100 * (the default size of the key type + the default size of the value type).
921+
* (We assume that there are 100 elements).
922+
*/
923+
override def defaultSize: Int = 100 * (keyType.defaultSize + valueType.defaultSize)
851924
}
852925

853926

@@ -896,4 +969,9 @@ abstract class UserDefinedType[UserType] extends DataType with Serializable {
896969
* Class object for the UserType
897970
*/
898971
def userClass: java.lang.Class[UserType]
972+
973+
/**
974+
* The default size of a value of the UserDefinedType is 4096 bytes.
975+
*/
976+
override def defaultSize: Int = 4096
899977
}

sql/catalyst/src/test/scala/org/apache/spark/sql/types/DataTypeSuite.scala

Lines changed: 35 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -62,27 +62,57 @@ class DataTypeSuite extends FunSuite {
6262
}
6363
}
6464

65+
checkDataTypeJsonRepr(NullType)
6566
checkDataTypeJsonRepr(BooleanType)
6667
checkDataTypeJsonRepr(ByteType)
6768
checkDataTypeJsonRepr(ShortType)
6869
checkDataTypeJsonRepr(IntegerType)
6970
checkDataTypeJsonRepr(LongType)
7071
checkDataTypeJsonRepr(FloatType)
7172
checkDataTypeJsonRepr(DoubleType)
73+
checkDataTypeJsonRepr(DecimalType(10, 5))
7274
checkDataTypeJsonRepr(DecimalType.Unlimited)
75+
checkDataTypeJsonRepr(DateType)
7376
checkDataTypeJsonRepr(TimestampType)
7477
checkDataTypeJsonRepr(StringType)
7578
checkDataTypeJsonRepr(BinaryType)
7679
checkDataTypeJsonRepr(ArrayType(DoubleType, true))
7780
checkDataTypeJsonRepr(ArrayType(StringType, false))
7881
checkDataTypeJsonRepr(MapType(IntegerType, StringType, true))
7982
checkDataTypeJsonRepr(MapType(IntegerType, ArrayType(DoubleType), false))
83+
8084
val metadata = new MetadataBuilder()
8185
.putString("name", "age")
8286
.build()
83-
checkDataTypeJsonRepr(
84-
StructType(Seq(
85-
StructField("a", IntegerType, nullable = true),
86-
StructField("b", ArrayType(DoubleType), nullable = false),
87-
StructField("c", DoubleType, nullable = false, metadata))))
87+
val structType = StructType(Seq(
88+
StructField("a", IntegerType, nullable = true),
89+
StructField("b", ArrayType(DoubleType), nullable = false),
90+
StructField("c", DoubleType, nullable = false, metadata)))
91+
checkDataTypeJsonRepr(structType)
92+
93+
def checkDefaultSize(dataType: DataType, expectedDefaultSize: Int): Unit = {
94+
test(s"Check the default size of ${dataType}") {
95+
assert(dataType.defaultSize === expectedDefaultSize)
96+
}
97+
}
98+
99+
checkDefaultSize(NullType, 1)
100+
checkDefaultSize(BooleanType, 1)
101+
checkDefaultSize(ByteType, 1)
102+
checkDefaultSize(ShortType, 2)
103+
checkDefaultSize(IntegerType, 4)
104+
checkDefaultSize(LongType, 8)
105+
checkDefaultSize(FloatType, 4)
106+
checkDefaultSize(DoubleType, 8)
107+
checkDefaultSize(DecimalType(10, 5), 4096)
108+
checkDefaultSize(DecimalType.Unlimited, 4096)
109+
checkDefaultSize(DateType, 8)
110+
checkDefaultSize(TimestampType, 8)
111+
checkDefaultSize(StringType, 4096)
112+
checkDefaultSize(BinaryType, 4096)
113+
checkDefaultSize(ArrayType(DoubleType, true), 800)
114+
checkDefaultSize(ArrayType(StringType, false), 409600)
115+
checkDefaultSize(MapType(IntegerType, StringType, true), 410000)
116+
checkDefaultSize(MapType(IntegerType, ArrayType(DoubleType), false), 80400)
117+
checkDefaultSize(structType, 812)
88118
}

0 commit comments

Comments
 (0)