@@ -3,32 +3,43 @@ package org.apache.spark.sql
3
3
import org .apache .spark .sql .test .TestSQLContext
4
4
import org .scalatest .FunSuite
5
5
6
- case class Person (name : String , age : Int )
6
+ case class Person (id : Int , name : String , age : Int )
7
+
8
+ case class Score (personId : Int , score : Double )
7
9
8
10
class MetadataSuite extends FunSuite {
9
11
10
12
test(" metadata" ) {
11
13
val sqlContext = TestSQLContext
12
14
import sqlContext ._
13
- val members = sqlContext.sparkContext.makeRDD(Seq (
14
- Person (" mike" , 10 ),
15
- Person (" jim" , 20 )))
16
- val person : SchemaRDD = sqlContext.createSchemaRDD(members)
17
- val schema : StructType = person.schema
18
- println(" schema: " + schema)
19
- val ageField = schema(" age" ).copy(metadata = Map (" doc" -> " age (must be nonnegative)" ))
20
- val newSchema = schema.copy(Seq (schema(" name" ), ageField))
21
- val newTable = sqlContext.applySchema(person, newSchema)
22
- newTable.registerTempTable(" person" )
23
- val selectByExprAgeField = newTable.select(' age ).schema(" age" )
15
+ val person = sqlContext.sparkContext.makeRDD(Seq (
16
+ Person (0 , " mike" , 10 ),
17
+ Person (1 , " jim" , 20 ))).toSchemaRDD
18
+ val score = sqlContext.sparkContext.makeRDD(Seq (
19
+ Score (0 , 4.0 ),
20
+ Score (1 , 5.0 ))).toSchemaRDD
21
+ val personSchema : StructType = person.schema
22
+ println(" schema: " + personSchema)
23
+ val ageField = personSchema(" age" ).copy(metadata = Map (" doc" -> " age (must be nonnegative)" ))
24
+ val newPersonSchema = personSchema.copy(Seq (personSchema(" id" ), personSchema(" name" ), ageField))
25
+ val newPerson = sqlContext.applySchema(person, newPersonSchema)
26
+ newPerson.registerTempTable(" person" )
27
+ score.registerTempTable(" score" )
28
+ val selectByExprAgeField = newPerson.select(' age ).schema(" age" )
24
29
assert(selectByExprAgeField.metadata.contains(" doc" ))
25
- val selectByNameAttrAgeField = newTable .select(" age" .attr).schema(" age" )
30
+ val selectByNameAttrAgeField = newPerson .select(" age" .attr).schema(" age" )
26
31
assert(selectByNameAttrAgeField.metadata.contains(" doc" ))
27
32
val selectAgeBySQL = sql(" SELECT age FROM person" ).schema(" age" )
28
33
println(selectAgeBySQL)
29
34
assert(selectAgeBySQL.metadata.contains(" doc" ))
30
35
val selectStarBySQL = sql(" SELECT * FROM person" ).schema(" age" )
31
36
println(selectStarBySQL)
32
37
assert(selectStarBySQL.metadata.contains(" doc" ))
38
+ val selectStarJoinBySQL = sql(" SELECT * FROM person JOIN score ON id = personId" ).schema(" age" )
39
+ println(selectStarJoinBySQL)
40
+ assert(selectStarJoinBySQL.metadata.contains(" doc" ))
41
+ val selectAgeJoinBySQL = sql(" SELECT age, score FROM person JOIN score ON id = personId" ).schema(" age" )
42
+ println(selectAgeJoinBySQL)
43
+ assert(selectAgeJoinBySQL.metadata.contains(" doc" ))
33
44
}
34
45
}
0 commit comments