Skip to content

Commit 8ff6402

Browse files
committed
Add specific row.
1 parent 58d15f1 commit 8ff6402

File tree

2 files changed

+301
-4
lines changed

2 files changed

+301
-4
lines changed
Lines changed: 297 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,297 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.sql.catalyst.expressions
19+
20+
import org.apache.spark.sql.catalyst.types._
21+
22+
/**
23+
*
24+
*
25+
*
26+
{{{
27+
val types = "Int,Float,Boolean,Double,Short,Long,Byte,Any".split(",")
28+
types.map {tpe =>
29+
s"""
30+
final class Mutable$tpe extends MutableValue {
31+
var value: $tpe = 0
32+
def boxed = if (isNull) null else value
33+
def update(v: Any) = value = {
34+
isNull = false
35+
v.asInstanceOf[$tpe]
36+
}
37+
def copy() = {
38+
val newCopy = new Mutable$tpe
39+
newCopy.isNull = isNull
40+
newCopy.value = value
41+
newCopy.asInstanceOf[this.type]
42+
}
43+
}"""
44+
}.foreach(println)
45+
46+
types.map { tpe =>
47+
s"""
48+
override def set$tpe(ordinal: Int, value: $tpe): Unit = {
49+
val currentValue = values(ordinal).asInstanceOf[Mutable$tpe]
50+
currentValue.isNull = false
51+
currentValue.value = value
52+
}
53+
54+
override def get$tpe(i: Int): $tpe = {
55+
values(i).asInstanceOf[Mutable$tpe].value
56+
}"""
57+
}.foreach(println)
58+
}}}
59+
*/
60+
abstract class MutableValue {
61+
var isNull: Boolean = true
62+
def boxed: Any
63+
def update(v: Any)
64+
def copy(): this.type
65+
}
66+
67+
final class MutableInt extends MutableValue {
68+
var value: Int = 0
69+
def boxed = if (isNull) null else value
70+
def update(v: Any) = value = {
71+
isNull = false
72+
v.asInstanceOf[Int]
73+
}
74+
def copy() = {
75+
val newCopy = new MutableInt
76+
newCopy.isNull = isNull
77+
newCopy.value = value
78+
newCopy.asInstanceOf[this.type]
79+
}
80+
}
81+
82+
final class MutableFloat extends MutableValue {
83+
var value: Float = 0
84+
def boxed = if (isNull) null else value
85+
def update(v: Any) = value = {
86+
isNull = false
87+
v.asInstanceOf[Float]
88+
}
89+
def copy() = {
90+
val newCopy = new MutableFloat
91+
newCopy.isNull = isNull
92+
newCopy.value = value
93+
newCopy.asInstanceOf[this.type]
94+
}
95+
}
96+
97+
final class MutableBoolean extends MutableValue {
98+
var value: Boolean = false
99+
def boxed = if (isNull) null else value
100+
def update(v: Any) = value = {
101+
isNull = false
102+
v.asInstanceOf[Boolean]
103+
}
104+
def copy() = {
105+
val newCopy = new MutableBoolean
106+
newCopy.isNull = isNull
107+
newCopy.value = value
108+
newCopy.asInstanceOf[this.type]
109+
}
110+
}
111+
112+
final class MutableDouble extends MutableValue {
113+
var value: Double = 0
114+
def boxed = if (isNull) null else value
115+
def update(v: Any) = value = {
116+
isNull = false
117+
v.asInstanceOf[Double]
118+
}
119+
def copy() = {
120+
val newCopy = new MutableDouble
121+
newCopy.isNull = isNull
122+
newCopy.value = value
123+
newCopy.asInstanceOf[this.type]
124+
}
125+
}
126+
127+
final class MutableShort extends MutableValue {
128+
var value: Short = 0
129+
def boxed = if (isNull) null else value
130+
def update(v: Any) = value = {
131+
isNull = false
132+
v.asInstanceOf[Short]
133+
}
134+
def copy() = {
135+
val newCopy = new MutableShort
136+
newCopy.isNull = isNull
137+
newCopy.value = value
138+
newCopy.asInstanceOf[this.type]
139+
}
140+
}
141+
142+
final class MutableLong extends MutableValue {
143+
var value: Long = 0
144+
def boxed = if (isNull) null else value
145+
def update(v: Any) = value = {
146+
isNull = false
147+
v.asInstanceOf[Long]
148+
}
149+
def copy() = {
150+
val newCopy = new MutableLong
151+
newCopy.isNull = isNull
152+
newCopy.value = value
153+
newCopy.asInstanceOf[this.type]
154+
}
155+
}
156+
157+
final class MutableByte extends MutableValue {
158+
var value: Byte = 0
159+
def boxed = if (isNull) null else value
160+
def update(v: Any) = value = {
161+
isNull = false
162+
v.asInstanceOf[Byte]
163+
}
164+
def copy() = {
165+
val newCopy = new MutableByte
166+
newCopy.isNull = isNull
167+
newCopy.value = value
168+
newCopy.asInstanceOf[this.type]
169+
}
170+
}
171+
172+
final class MutableAny extends MutableValue {
173+
var value: Any = 0
174+
def boxed = if (isNull) null else value
175+
def update(v: Any) = value = {
176+
isNull = false
177+
v.asInstanceOf[Any]
178+
}
179+
def copy() = {
180+
val newCopy = new MutableAny
181+
newCopy.isNull = isNull
182+
newCopy.value = value
183+
newCopy.asInstanceOf[this.type]
184+
}
185+
}
186+
187+
class SpecificMutableRow(val values: Array[MutableValue]) extends MutableRow {
188+
189+
def this(dataTypes: Seq[DataType]) =
190+
this(
191+
dataTypes.map {
192+
case IntegerType => new MutableInt
193+
case ByteType => new MutableByte
194+
case FloatType => new MutableFloat
195+
case ShortType => new MutableShort
196+
case LongType => new MutableLong
197+
case _ => new MutableAny
198+
}.toArray)
199+
200+
override def length: Int = values.length
201+
202+
override def setNullAt(i: Int): Unit = {
203+
values(i).isNull = true
204+
}
205+
206+
override def apply(i: Int): Any = values(i).boxed
207+
208+
override def isNullAt(i: Int): Boolean = values(i).isNull
209+
210+
override def copy(): Row = {
211+
val newValues = new Array[MutableValue](values.length)
212+
var i = 0
213+
while (i < values.length) {
214+
newValues(i) = values(i).copy()
215+
i += 1
216+
}
217+
new SpecificMutableRow(newValues)
218+
}
219+
220+
override def update(ordinal: Int, value: Any): Unit = values(ordinal).update(value)
221+
222+
override def iterator: Iterator[Any] = values.map(_.boxed).iterator
223+
224+
def setString(ordinal: Int, value: String) = update(ordinal, value)
225+
226+
def getString(ordinal: Int) = apply(ordinal).asInstanceOf[String]
227+
228+
override def setInt(ordinal: Int, value: Int): Unit = {
229+
val currentValue = values(ordinal).asInstanceOf[MutableInt]
230+
currentValue.isNull = false
231+
currentValue.value = value
232+
}
233+
234+
override def getInt(i: Int): Int = {
235+
values(i).asInstanceOf[MutableInt].value
236+
}
237+
238+
override def setFloat(ordinal: Int, value: Float): Unit = {
239+
val currentValue = values(ordinal).asInstanceOf[MutableFloat]
240+
currentValue.isNull = false
241+
currentValue.value = value
242+
}
243+
244+
override def getFloat(i: Int): Float = {
245+
values(i).asInstanceOf[MutableFloat].value
246+
}
247+
248+
override def setBoolean(ordinal: Int, value: Boolean): Unit = {
249+
val currentValue = values(ordinal).asInstanceOf[MutableBoolean]
250+
currentValue.isNull = false
251+
currentValue.value = value
252+
}
253+
254+
override def getBoolean(i: Int): Boolean = {
255+
values(i).asInstanceOf[MutableBoolean].value
256+
}
257+
258+
override def setDouble(ordinal: Int, value: Double): Unit = {
259+
val currentValue = values(ordinal).asInstanceOf[MutableDouble]
260+
currentValue.isNull = false
261+
currentValue.value = value
262+
}
263+
264+
override def getDouble(i: Int): Double = {
265+
values(i).asInstanceOf[MutableDouble].value
266+
}
267+
268+
override def setShort(ordinal: Int, value: Short): Unit = {
269+
val currentValue = values(ordinal).asInstanceOf[MutableShort]
270+
currentValue.isNull = false
271+
currentValue.value = value
272+
}
273+
274+
override def getShort(i: Int): Short = {
275+
values(i).asInstanceOf[MutableShort].value
276+
}
277+
278+
override def setLong(ordinal: Int, value: Long): Unit = {
279+
val currentValue = values(ordinal).asInstanceOf[MutableLong]
280+
currentValue.isNull = false
281+
currentValue.value = value
282+
}
283+
284+
override def getLong(i: Int): Long = {
285+
values(i).asInstanceOf[MutableLong].value
286+
}
287+
288+
override def setByte(ordinal: Int, value: Byte): Unit = {
289+
val currentValue = values(ordinal).asInstanceOf[MutableByte]
290+
currentValue.isNull = false
291+
currentValue.value = value
292+
}
293+
294+
override def getByte(i: Int): Byte = {
295+
values(i).asInstanceOf[MutableByte].value
296+
}
297+
}

sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ import parquet.io.api.{PrimitiveConverter, GroupConverter, Binary, Converter}
2323
import parquet.schema.MessageType
2424

2525
import org.apache.spark.sql.catalyst.types._
26-
import org.apache.spark.sql.catalyst.expressions.{GenericRow, Row, Attribute}
26+
import org.apache.spark.sql.catalyst.expressions._
2727
import org.apache.spark.sql.parquet.CatalystConverter.FieldType
2828

2929
/**
@@ -278,14 +278,14 @@ private[parquet] class CatalystGroupConverter(
278278
*/
279279
private[parquet] class CatalystPrimitiveRowConverter(
280280
protected[parquet] val schema: Array[FieldType],
281-
protected[parquet] var current: ParquetRelation.RowType)
281+
protected[parquet] var current: MutableRow)
282282
extends CatalystConverter {
283283

284284
// This constructor is used for the root converter only
285285
def this(attributes: Array[Attribute]) =
286286
this(
287287
attributes.map(a => new FieldType(a.name, a.dataType, a.nullable)),
288-
new ParquetRelation.RowType(attributes.length))
288+
new SpecificMutableRow(attributes.map(_.dataType)))
289289

290290
protected [parquet] val converters: Array[Converter] =
291291
schema.zipWithIndex.map {
@@ -299,7 +299,7 @@ private[parquet] class CatalystPrimitiveRowConverter(
299299
override val parent = null
300300

301301
// Should be only called in root group converter!
302-
override def getCurrentRecord: ParquetRelation.RowType = current
302+
override def getCurrentRecord: Row = current
303303

304304
override def getConverter(fieldIndex: Int): Converter = converters(fieldIndex)
305305

0 commit comments

Comments
 (0)