Skip to content

Commit 1d4f355

Browse files
mengxr and marmbrus
authored and committed
[SPARK-3569][SQL] Add metadata field to StructField
Add `metadata: Metadata` to `StructField` to store extra information of columns. `Metadata` is a simple wrapper over `Map[String, Any]` with value types restricted to Boolean, Long, Double, String, Metadata, and arrays of those types. SerDe is via JSON. Metadata is preserved through simple operations like `SELECT`. marmbrus liancheng Author: Xiangrui Meng <[email protected]> Author: Michael Armbrust <[email protected]> Closes apache#2701 from mengxr/structfield-metadata and squashes the following commits: dedda56 [Xiangrui Meng] merge remote 5ef930a [Xiangrui Meng] Merge remote-tracking branch 'apache/master' into structfield-metadata c35203f [Xiangrui Meng] Merge pull request #1 from marmbrus/pr/2701 886b85c [Michael Armbrust] Expose Metadata and MetadataBuilder through the public scala and java packages. 589f314 [Xiangrui Meng] Merge remote-tracking branch 'apache/master' into structfield-metadata 1e2abcf [Xiangrui Meng] change default value of metadata to None in python 611d3c2 [Xiangrui Meng] move metadata from Expr to NamedExpr ddfcfad [Xiangrui Meng] Merge remote-tracking branch 'apache/master' into structfield-metadata a438440 [Xiangrui Meng] Merge remote-tracking branch 'apache/master' into structfield-metadata 4266f4d [Xiangrui Meng] add StructField.toString back for backward compatibility 3f49aab [Xiangrui Meng] remove StructField.toString 24a9f80 [Xiangrui Meng] Merge remote-tracking branch 'apache/master' into structfield-metadata 473a7c5 [Xiangrui Meng] merge master c9d7301 [Xiangrui Meng] organize imports 1fcbf13 [Xiangrui Meng] change metadata type in StructField for Scala/Java 60cc131 [Xiangrui Meng] add doc and header 60614c7 [Xiangrui Meng] add metadata e42c452 [Xiangrui Meng] merge master 93518fb [Xiangrui Meng] support metadata in python 905bb89 [Xiangrui Meng] java conversions 618e349 [Xiangrui Meng] make tests work in scala 61b8e0f [Xiangrui Meng] merge master 7e5a322 [Xiangrui Meng] do not output metadata in StructField.toString c41a664 
[Xiangrui Meng] merge master d8af0ed [Xiangrui Meng] move tests to SQLQuerySuite 67fdebb [Xiangrui Meng] add test on join d65072e [Xiangrui Meng] remove Map.empty 367d237 [Xiangrui Meng] add test c194d5e [Xiangrui Meng] add metadata field to StructField and Attribute
1 parent 59e626c commit 1d4f355

File tree

20 files changed

+573
-56
lines changed

20 files changed

+573
-56
lines changed

python/pyspark/sql.py

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -313,12 +313,15 @@ class StructField(DataType):
313313
314314
"""
315315

316-
def __init__(self, name, dataType, nullable):
316+
def __init__(self, name, dataType, nullable, metadata=None):
317317
"""Creates a StructField
318318
:param name: the name of this field.
319319
:param dataType: the data type of this field.
320320
:param nullable: indicates whether values of this field
321321
can be null.
322+
:param metadata: metadata of this field, which is a map from string
323+
to simple type that can be serialized to JSON
324+
automatically
322325
323326
>>> (StructField("f1", StringType, True)
324327
... == StructField("f1", StringType, True))
@@ -330,6 +333,7 @@ def __init__(self, name, dataType, nullable):
330333
self.name = name
331334
self.dataType = dataType
332335
self.nullable = nullable
336+
self.metadata = metadata or {}
333337

334338
def __repr__(self):
335339
return "StructField(%s,%s,%s)" % (self.name, self.dataType,
@@ -338,13 +342,15 @@ def __repr__(self):
338342
def jsonValue(self):
339343
return {"name": self.name,
340344
"type": self.dataType.jsonValue(),
341-
"nullable": self.nullable}
345+
"nullable": self.nullable,
346+
"metadata": self.metadata}
342347

343348
@classmethod
344349
def fromJson(cls, json):
345350
return StructField(json["name"],
346351
_parse_datatype_json_value(json["type"]),
347-
json["nullable"])
352+
json["nullable"],
353+
json["metadata"])
348354

349355

350356
class StructType(DataType):
@@ -423,7 +429,8 @@ def _parse_datatype_json_string(json_string):
423429
... StructField("simpleArray", simple_arraytype, True),
424430
... StructField("simpleMap", simple_maptype, True),
425431
... StructField("simpleStruct", simple_structtype, True),
426-
... StructField("boolean", BooleanType(), False)])
432+
... StructField("boolean", BooleanType(), False),
433+
... StructField("withMeta", DoubleType(), False, {"name": "age"})])
427434
>>> check_datatype(complex_structtype)
428435
True
429436
>>> # Complex ArrayType.

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ object ScalaReflection {
4646
/** Returns a Sequence of attributes for the given case class type. */
4747
def attributesFor[T: TypeTag]: Seq[Attribute] = schemaFor[T] match {
4848
case Schema(s: StructType, _) =>
49-
s.fields.map(f => AttributeReference(f.name, f.dataType, f.nullable)())
49+
s.fields.map(f => AttributeReference(f.name, f.dataType, f.nullable, f.metadata)())
5050
}
5151

5252
/** Returns a catalyst DataType and its nullability for the given Scala Type using reflection. */

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import org.apache.spark.sql.catalyst.errors.TreeNodeException
2121
import org.apache.spark.sql.catalyst.trees
2222
import org.apache.spark.sql.catalyst.trees.TreeNode
2323
import org.apache.spark.sql.catalyst.types.{DataType, FractionalType, IntegralType, NumericType, NativeType}
24+
import org.apache.spark.sql.catalyst.util.Metadata
2425

2526
abstract class Expression extends TreeNode[Expression] {
2627
self: Product =>

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ abstract class Generator extends Expression {
4343
override type EvaluatedType = TraversableOnce[Row]
4444

4545
override lazy val dataType =
46-
ArrayType(StructType(output.map(a => StructField(a.name, a.dataType, a.nullable))))
46+
ArrayType(StructType(output.map(a => StructField(a.name, a.dataType, a.nullable, a.metadata))))
4747

4848
override def nullable = false
4949

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala

Lines changed: 24 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import org.apache.spark.sql.catalyst.trees
2121
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
2222
import org.apache.spark.sql.catalyst.errors.TreeNodeException
2323
import org.apache.spark.sql.catalyst.types._
24+
import org.apache.spark.sql.catalyst.util.Metadata
2425

2526
object NamedExpression {
2627
private val curId = new java.util.concurrent.atomic.AtomicLong()
@@ -43,6 +44,9 @@ abstract class NamedExpression extends Expression {
4344

4445
def toAttribute: Attribute
4546

47+
/** Returns the metadata when an expression is a reference to another expression with metadata. */
48+
def metadata: Metadata = Metadata.empty
49+
4650
protected def typeSuffix =
4751
if (resolved) {
4852
dataType match {
@@ -88,10 +92,16 @@ case class Alias(child: Expression, name: String)
8892

8993
override def dataType = child.dataType
9094
override def nullable = child.nullable
95+
override def metadata: Metadata = {
96+
child match {
97+
case named: NamedExpression => named.metadata
98+
case _ => Metadata.empty
99+
}
100+
}
91101

92102
override def toAttribute = {
93103
if (resolved) {
94-
AttributeReference(name, child.dataType, child.nullable)(exprId, qualifiers)
104+
AttributeReference(name, child.dataType, child.nullable, metadata)(exprId, qualifiers)
95105
} else {
96106
UnresolvedAttribute(name)
97107
}
@@ -108,15 +118,20 @@ case class Alias(child: Expression, name: String)
108118
* @param name The name of this attribute, should only be used during analysis or for debugging.
109119
* @param dataType The [[DataType]] of this attribute.
110120
* @param nullable True if null is a valid value for this attribute.
121+
* @param metadata The metadata of this attribute.
111122
* @param exprId A globally unique id used to check if different AttributeReferences refer to the
112123
* same attribute.
113124
* @param qualifiers a list of strings that can be used to referred to this attribute in a fully
114125
* qualified way. Consider the examples tableName.name, subQueryAlias.name.
115126
* tableName and subQueryAlias are possible qualifiers.
116127
*/
117-
case class AttributeReference(name: String, dataType: DataType, nullable: Boolean = true)
118-
(val exprId: ExprId = NamedExpression.newExprId, val qualifiers: Seq[String] = Nil)
119-
extends Attribute with trees.LeafNode[Expression] {
128+
case class AttributeReference(
129+
name: String,
130+
dataType: DataType,
131+
nullable: Boolean = true,
132+
override val metadata: Metadata = Metadata.empty)(
133+
val exprId: ExprId = NamedExpression.newExprId,
134+
val qualifiers: Seq[String] = Nil) extends Attribute with trees.LeafNode[Expression] {
120135

121136
override def equals(other: Any) = other match {
122137
case ar: AttributeReference => exprId == ar.exprId && dataType == ar.dataType
@@ -128,10 +143,12 @@ case class AttributeReference(name: String, dataType: DataType, nullable: Boolea
128143
var h = 17
129144
h = h * 37 + exprId.hashCode()
130145
h = h * 37 + dataType.hashCode()
146+
h = h * 37 + metadata.hashCode()
131147
h
132148
}
133149

134-
override def newInstance() = AttributeReference(name, dataType, nullable)(qualifiers = qualifiers)
150+
override def newInstance() =
151+
AttributeReference(name, dataType, nullable, metadata)(qualifiers = qualifiers)
135152

136153
/**
137154
* Returns a copy of this [[AttributeReference]] with changed nullability.
@@ -140,7 +157,7 @@ case class AttributeReference(name: String, dataType: DataType, nullable: Boolea
140157
if (nullable == newNullability) {
141158
this
142159
} else {
143-
AttributeReference(name, dataType, newNullability)(exprId, qualifiers)
160+
AttributeReference(name, dataType, newNullability, metadata)(exprId, qualifiers)
144161
}
145162
}
146163

@@ -159,7 +176,7 @@ case class AttributeReference(name: String, dataType: DataType, nullable: Boolea
159176
if (newQualifiers.toSet == qualifiers.toSet) {
160177
this
161178
} else {
162-
AttributeReference(name, dataType, nullable)(exprId, newQualifiers)
179+
AttributeReference(name, dataType, nullable, metadata)(exprId, newQualifiers)
163180
}
164181
}
165182

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/dataTypes.scala

Lines changed: 18 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -24,16 +24,16 @@ import scala.reflect.ClassTag
2424
import scala.reflect.runtime.universe.{TypeTag, runtimeMirror, typeTag}
2525
import scala.util.parsing.combinator.RegexParsers
2626

27-
import org.json4s.JsonAST.JValue
2827
import org.json4s._
28+
import org.json4s.JsonAST.JValue
2929
import org.json4s.JsonDSL._
3030
import org.json4s.jackson.JsonMethods._
3131

3232
import org.apache.spark.sql.catalyst.ScalaReflectionLock
3333
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Expression}
34+
import org.apache.spark.sql.catalyst.util.Metadata
3435
import org.apache.spark.util.Utils
3536

36-
3737
object DataType {
3838
def fromJson(json: String): DataType = parseDataType(parse(json))
3939

@@ -70,10 +70,11 @@ object DataType {
7070

7171
private def parseStructField(json: JValue): StructField = json match {
7272
case JSortedObject(
73+
("metadata", metadata: JObject),
7374
("name", JString(name)),
7475
("nullable", JBool(nullable)),
7576
("type", dataType: JValue)) =>
76-
StructField(name, parseDataType(dataType), nullable)
77+
StructField(name, parseDataType(dataType), nullable, Metadata.fromJObject(metadata))
7778
}
7879

7980
@deprecated("Use DataType.fromJson instead", "1.2.0")
@@ -388,24 +389,34 @@ case class ArrayType(elementType: DataType, containsNull: Boolean) extends DataT
388389
* @param name The name of this field.
389390
* @param dataType The data type of this field.
390391
* @param nullable Indicates if values of this field can be `null` values.
392+
* @param metadata The metadata of this field. The metadata should be preserved during
393+
* transformation if the content of the column is not modified, e.g, in selection.
391394
*/
392-
case class StructField(name: String, dataType: DataType, nullable: Boolean) {
395+
case class StructField(
396+
name: String,
397+
dataType: DataType,
398+
nullable: Boolean,
399+
metadata: Metadata = Metadata.empty) {
393400

394401
private[sql] def buildFormattedString(prefix: String, builder: StringBuilder): Unit = {
395402
builder.append(s"$prefix-- $name: ${dataType.typeName} (nullable = $nullable)\n")
396403
DataType.buildFormattedString(dataType, s"$prefix |", builder)
397404
}
398405

406+
// override the default toString to be compatible with legacy parquet files.
407+
override def toString: String = s"StructField($name,$dataType,$nullable)"
408+
399409
private[sql] def jsonValue: JValue = {
400410
("name" -> name) ~
401411
("type" -> dataType.jsonValue) ~
402-
("nullable" -> nullable)
412+
("nullable" -> nullable) ~
413+
("metadata" -> metadata.jsonValue)
403414
}
404415
}
405416

406417
object StructType {
407418
protected[sql] def fromAttributes(attributes: Seq[Attribute]): StructType =
408-
StructType(attributes.map(a => StructField(a.name, a.dataType, a.nullable)))
419+
StructType(attributes.map(a => StructField(a.name, a.dataType, a.nullable, a.metadata)))
409420
}
410421

411422
case class StructType(fields: Seq[StructField]) extends DataType {
@@ -439,7 +450,7 @@ case class StructType(fields: Seq[StructField]) extends DataType {
439450
}
440451

441452
protected[sql] def toAttributes =
442-
fields.map(f => AttributeReference(f.name, f.dataType, f.nullable)())
453+
fields.map(f => AttributeReference(f.name, f.dataType, f.nullable, f.metadata)())
443454

444455
def treeString: String = {
445456
val builder = new StringBuilder

0 commit comments

Comments (0)