Skip to content

Commit 7eaba68

Browse files
committed
Add defaultSize method to data types.
1 parent fd425e0 commit 7eaba68

File tree

4 files changed

+136
-32
lines changed

4 files changed

+136
-32
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/WrapDynamic.scala

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,13 @@ import org.apache.spark.sql.types.DataType
2424
/**
2525
* The data type representing [[DynamicRow]] values.
2626
*/
27-
case object DynamicType extends DataType
27+
case object DynamicType extends DataType {
28+
29+
/**
30+
* The default size of a value of the DynamicType is 4096 bytes.
31+
*/
32+
override def defaultSize: Int = 4096
33+
}
2834

2935
/**
3036
* Wrap a [[Row]] as a [[DynamicRow]].

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala

Lines changed: 5 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -238,16 +238,11 @@ case class Rollup(
238238
case class Limit(limitExpr: Expression, child: LogicalPlan) extends UnaryNode {
239239
override def output = child.output
240240

241-
override lazy val statistics: Statistics =
242-
if (output.forall(_.dataType.isInstanceOf[NativeType])) {
243-
val limit = limitExpr.eval(null).asInstanceOf[Int]
244-
val sizeInBytes = (limit: Long) * output.map { a =>
245-
NativeType.defaultSizeOf(a.dataType.asInstanceOf[NativeType])
246-
}.sum
247-
Statistics(sizeInBytes = sizeInBytes)
248-
} else {
249-
Statistics(sizeInBytes = children.map(_.statistics).map(_.sizeInBytes).product)
250-
}
241+
override lazy val statistics: Statistics = {
242+
val limit = limitExpr.eval(null).asInstanceOf[Int]
243+
val sizeInBytes = (limit: Long) * output.map(a => a.dataType.defaultSize).sum
244+
Statistics(sizeInBytes = sizeInBytes)
245+
}
251246
}
252247

253248
case class Subquery(alias: String, child: LogicalPlan) extends UnaryNode {

sql/catalyst/src/main/scala/org/apache/spark/sql/types/dataTypes.scala

Lines changed: 89 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -215,6 +215,9 @@ abstract class DataType {
215215
case _ => false
216216
}
217217

218+
/** The default size, in bytes, of a value of this data type. */
219+
def defaultSize: Int
220+
218221
def isPrimitive: Boolean = false
219222

220223
def typeName: String = this.getClass.getSimpleName.stripSuffix("$").dropRight(4).toLowerCase
@@ -235,29 +238,16 @@ abstract class DataType {
235238
* @group dataType
236239
*/
237240
@DeveloperApi
238-
case object NullType extends DataType
241+
case object NullType extends DataType {
242+
override def defaultSize: Int = 1
243+
}
239244

240245

241246
object NativeType {
242247
val all = Seq(
243248
IntegerType, BooleanType, LongType, DoubleType, FloatType, ShortType, ByteType, StringType)
244249

245250
def unapply(dt: DataType): Boolean = all.contains(dt)
246-
247-
def defaultSizeOf(dataType: NativeType) = dataType match {
248-
case IntegerType => 4
249-
case BooleanType => 1
250-
case LongType => 8
251-
case DoubleType => 8
252-
case FloatType => 4
253-
case ShortType => 2
254-
case ByteType => 1
255-
case StringType => 4096
256-
case decimal: DecimalType => 4096
257-
case TimestampType => 8
258-
case DateType => 8
259-
case BinaryType => 8
260-
}
261251
}
262252

263253

@@ -305,6 +295,11 @@ case object StringType extends NativeType with PrimitiveType {
305295
private[sql] type JvmType = String
306296
@transient private[sql] lazy val tag = ScalaReflectionLock.synchronized { typeTag[JvmType] }
307297
private[sql] val ordering = implicitly[Ordering[JvmType]]
298+
299+
/**
300+
* The default size of a value of the StringType is 4096 bytes.
301+
*/
302+
override def defaultSize: Int = 4096
308303
}
309304

310305

@@ -329,6 +324,11 @@ case object BinaryType extends NativeType with PrimitiveType {
329324
x.length - y.length
330325
}
331326
}
327+
328+
/**
329+
* The default size of a value of the BinaryType is 4096 bytes.
330+
*/
331+
override def defaultSize: Int = 4096
332332
}
333333

334334

@@ -344,6 +344,11 @@ case object BooleanType extends NativeType with PrimitiveType {
344344
private[sql] type JvmType = Boolean
345345
@transient private[sql] lazy val tag = ScalaReflectionLock.synchronized { typeTag[JvmType] }
346346
private[sql] val ordering = implicitly[Ordering[JvmType]]
347+
348+
/**
349+
* The default size of a value of the BooleanType is 1 byte.
350+
*/
351+
override def defaultSize: Int = 1
347352
}
348353

349354

@@ -364,6 +369,11 @@ case object TimestampType extends NativeType {
364369
private[sql] val ordering = new Ordering[JvmType] {
365370
def compare(x: Timestamp, y: Timestamp) = x.compareTo(y)
366371
}
372+
373+
/**
374+
* The default size of a value of the TimestampType is 8 bytes.
375+
*/
376+
override def defaultSize: Int = 8
367377
}
368378

369379

@@ -384,6 +394,11 @@ case object DateType extends NativeType {
384394
private[sql] val ordering = new Ordering[JvmType] {
385395
def compare(x: Date, y: Date) = x.compareTo(y)
386396
}
397+
398+
/**
399+
* The default size of a value of the DateType is 8 bytes.
400+
*/
401+
override def defaultSize: Int = 8
387402
}
388403

389404

@@ -430,6 +445,11 @@ case object LongType extends IntegralType {
430445
private[sql] val numeric = implicitly[Numeric[Long]]
431446
private[sql] val integral = implicitly[Integral[Long]]
432447
private[sql] val ordering = implicitly[Ordering[JvmType]]
448+
449+
/**
450+
* The default size of a value of the LongType is 8 bytes.
451+
*/
452+
override def defaultSize: Int = 8
433453
}
434454

435455

@@ -447,6 +467,11 @@ case object IntegerType extends IntegralType {
447467
private[sql] val numeric = implicitly[Numeric[Int]]
448468
private[sql] val integral = implicitly[Integral[Int]]
449469
private[sql] val ordering = implicitly[Ordering[JvmType]]
470+
471+
/**
472+
* The default size of a value of the IntegerType is 4 bytes.
473+
*/
474+
override def defaultSize: Int = 4
450475
}
451476

452477

@@ -464,6 +489,11 @@ case object ShortType extends IntegralType {
464489
private[sql] val numeric = implicitly[Numeric[Short]]
465490
private[sql] val integral = implicitly[Integral[Short]]
466491
private[sql] val ordering = implicitly[Ordering[JvmType]]
492+
493+
/**
494+
* The default size of a value of the ShortType is 2 bytes.
495+
*/
496+
override def defaultSize: Int = 2
467497
}
468498

469499

@@ -481,6 +511,11 @@ case object ByteType extends IntegralType {
481511
private[sql] val numeric = implicitly[Numeric[Byte]]
482512
private[sql] val integral = implicitly[Integral[Byte]]
483513
private[sql] val ordering = implicitly[Ordering[JvmType]]
514+
515+
/**
516+
* The default size of a value of the ByteType is 1 byte.
517+
*/
518+
override def defaultSize: Int = 1
484519
}
485520

486521

@@ -535,6 +570,11 @@ case class DecimalType(precisionInfo: Option[PrecisionInfo]) extends FractionalT
535570
case Some(PrecisionInfo(precision, scale)) => s"DecimalType($precision,$scale)"
536571
case None => "DecimalType()"
537572
}
573+
574+
/**
575+
* The default size of a value of the DecimalType is 4096 bytes.
576+
*/
577+
override def defaultSize: Int = 4096
538578
}
539579

540580

@@ -585,6 +625,11 @@ case object DoubleType extends FractionalType {
585625
private[sql] val fractional = implicitly[Fractional[Double]]
586626
private[sql] val ordering = implicitly[Ordering[JvmType]]
587627
private[sql] val asIntegral = DoubleAsIfIntegral
628+
629+
/**
630+
* The default size of a value of the DoubleType is 8 bytes.
631+
*/
632+
override def defaultSize: Int = 8
588633
}
589634

590635

@@ -603,6 +648,11 @@ case object FloatType extends FractionalType {
603648
private[sql] val fractional = implicitly[Fractional[Float]]
604649
private[sql] val ordering = implicitly[Ordering[JvmType]]
605650
private[sql] val asIntegral = FloatAsIfIntegral
651+
652+
/**
653+
* The default size of a value of the FloatType is 4 bytes.
654+
*/
655+
override def defaultSize: Int = 4
606656
}
607657

608658

@@ -641,6 +691,12 @@ case class ArrayType(elementType: DataType, containsNull: Boolean) extends DataT
641691
("type" -> typeName) ~
642692
("elementType" -> elementType.jsonValue) ~
643693
("containsNull" -> containsNull)
694+
695+
/**
696+
* The default size of a value of the ArrayType is 100 * the default size of the element type.
697+
* (We assume that there are 100 elements).
698+
*/
699+
override def defaultSize: Int = 100 * elementType.defaultSize
644700
}
645701

646702

@@ -810,6 +866,11 @@ case class StructType(fields: Array[StructField]) extends DataType with Seq[Stru
810866
override def length: Int = fields.length
811867

812868
override def iterator: Iterator[StructField] = fields.iterator
869+
870+
/**
871+
* The default size of a value of the StructType is the sum of the default sizes of all field types.
872+
*/
873+
override def defaultSize: Int = fields.map(_.dataType.defaultSize).sum
813874
}
814875

815876

@@ -853,6 +914,13 @@ case class MapType(
853914
("keyType" -> keyType.jsonValue) ~
854915
("valueType" -> valueType.jsonValue) ~
855916
("valueContainsNull" -> valueContainsNull)
917+
918+
/**
919+
* The default size of a value of the MapType is
920+
* 100 * (the default size of the key type + the default size of the value type).
921+
* (We assume that there are 100 elements).
922+
*/
923+
override def defaultSize: Int = 100 * (keyType.defaultSize + valueType.defaultSize)
856924
}
857925

858926

@@ -901,4 +969,9 @@ abstract class UserDefinedType[UserType] extends DataType with Serializable {
901969
* Class object for the UserType
902970
*/
903971
def userClass: java.lang.Class[UserType]
972+
973+
/**
974+
* The default size of a value of the UserDefinedType is 4096 bytes.
975+
*/
976+
override def defaultSize: Int = 4096
904977
}

sql/catalyst/src/test/scala/org/apache/spark/sql/types/DataTypeSuite.scala

Lines changed: 35 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -62,27 +62,57 @@ class DataTypeSuite extends FunSuite {
6262
}
6363
}
6464

65+
checkDataTypeJsonRepr(NullType)
6566
checkDataTypeJsonRepr(BooleanType)
6667
checkDataTypeJsonRepr(ByteType)
6768
checkDataTypeJsonRepr(ShortType)
6869
checkDataTypeJsonRepr(IntegerType)
6970
checkDataTypeJsonRepr(LongType)
7071
checkDataTypeJsonRepr(FloatType)
7172
checkDataTypeJsonRepr(DoubleType)
73+
checkDataTypeJsonRepr(DecimalType(10, 5))
7274
checkDataTypeJsonRepr(DecimalType.Unlimited)
75+
checkDataTypeJsonRepr(DateType)
7376
checkDataTypeJsonRepr(TimestampType)
7477
checkDataTypeJsonRepr(StringType)
7578
checkDataTypeJsonRepr(BinaryType)
7679
checkDataTypeJsonRepr(ArrayType(DoubleType, true))
7780
checkDataTypeJsonRepr(ArrayType(StringType, false))
7881
checkDataTypeJsonRepr(MapType(IntegerType, StringType, true))
7982
checkDataTypeJsonRepr(MapType(IntegerType, ArrayType(DoubleType), false))
83+
8084
val metadata = new MetadataBuilder()
8185
.putString("name", "age")
8286
.build()
83-
checkDataTypeJsonRepr(
84-
StructType(Seq(
85-
StructField("a", IntegerType, nullable = true),
86-
StructField("b", ArrayType(DoubleType), nullable = false),
87-
StructField("c", DoubleType, nullable = false, metadata))))
87+
val structType = StructType(Seq(
88+
StructField("a", IntegerType, nullable = true),
89+
StructField("b", ArrayType(DoubleType), nullable = false),
90+
StructField("c", DoubleType, nullable = false, metadata)))
91+
checkDataTypeJsonRepr(structType)
92+
93+
def checkDefaultSize(dataType: DataType, expectedDefaultSize: Int): Unit = {
94+
test(s"Check the default size of ${dataType}") {
95+
assert(dataType.defaultSize === expectedDefaultSize)
96+
}
97+
}
98+
99+
checkDefaultSize(NullType, 1)
100+
checkDefaultSize(BooleanType, 1)
101+
checkDefaultSize(ByteType, 1)
102+
checkDefaultSize(ShortType, 2)
103+
checkDefaultSize(IntegerType, 4)
104+
checkDefaultSize(LongType, 8)
105+
checkDefaultSize(FloatType, 4)
106+
checkDefaultSize(DoubleType, 8)
107+
checkDefaultSize(DecimalType(10, 5), 4096)
108+
checkDefaultSize(DecimalType.Unlimited, 4096)
109+
checkDefaultSize(DateType, 8)
110+
checkDefaultSize(TimestampType, 8)
111+
checkDefaultSize(StringType, 4096)
112+
checkDefaultSize(BinaryType, 4096)
113+
checkDefaultSize(ArrayType(DoubleType, true), 800)
114+
checkDefaultSize(ArrayType(StringType, false), 409600)
115+
checkDefaultSize(MapType(IntegerType, StringType, true), 410000)
116+
checkDefaultSize(MapType(IntegerType, ArrayType(DoubleType), false), 80400)
117+
checkDefaultSize(structType, 812)
88118
}

0 commit comments

Comments
 (0)