Skip to content

Commit 6b37c86

Browse files
ALeksander Eskilsoncloud-fan
authored andcommitted
[SPARK-18016][SQL][CATALYST][BRANCH-2.1] Code Generation: Constant Pool Limit - Class Splitting
## What changes were proposed in this pull request? This is a backport patch for Spark 2.1.x of the class splitting feature over excess generated code as was merged in #18075. ## How was this patch tested? The same test provided in #18075 is included in this patch. Author: ALeksander Eskilson <[email protected]> Closes #18354 from bdrillard/class_splitting_2.1.
1 parent 8923bac commit 6b37c86

File tree

20 files changed

+267
-96
lines changed

20 files changed

+267
-96
lines changed

sql/catalyst/pom.xml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,13 @@
126126
</execution>
127127
</executions>
128128
</plugin>
129+
<plugin>
130+
<groupId>org.scalatest</groupId>
131+
<artifactId>scalatest-maven-plugin</artifactId>
132+
<configuration>
133+
<argLine>-Xmx4g -Xss4096k -XX:MaxPermSize=${MaxPermGen} -XX:ReservedCodeCacheSize=512m</argLine>
134+
</configuration>
135+
</plugin>
129136
<plugin>
130137
<groupId>org.antlr</groupId>
131138
<artifactId>antlr4-maven-plugin</artifactId>

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUDF.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -988,7 +988,7 @@ case class ScalaUDF(
988988
val converterTerm = ctx.freshName("converter")
989989
val expressionIdx = ctx.references.size - 1
990990
ctx.addMutableState(converterClassName, converterTerm,
991-
s"this.$converterTerm = ($converterClassName)$typeConvertersClassName" +
991+
s"$converterTerm = ($converterClassName)$typeConvertersClassName" +
992992
s".createToScalaConverter(((${expressionClassName})((($scalaUDFClassName)" +
993993
s"references[$expressionIdx]).getChildren().apply($index))).dataType());")
994994
converterTerm
@@ -1005,7 +1005,7 @@ case class ScalaUDF(
10051005
// Generate codes used to convert the returned value of user-defined functions to Catalyst type
10061006
val catalystConverterTerm = ctx.freshName("catalystConverter")
10071007
ctx.addMutableState(converterClassName, catalystConverterTerm,
1008-
s"this.$catalystConverterTerm = ($converterClassName)$typeConvertersClassName" +
1008+
s"$catalystConverterTerm = ($converterClassName)$typeConvertersClassName" +
10091009
s".createToCatalystConverter($scalaUDF.dataType());")
10101010

10111011
val resultTerm = ctx.freshName("result")
@@ -1019,7 +1019,7 @@ case class ScalaUDF(
10191019

10201020
val funcTerm = ctx.freshName("udf")
10211021
ctx.addMutableState(funcClassName, funcTerm,
1022-
s"this.$funcTerm = ($funcClassName)$scalaUDF.userDefinedFunc();")
1022+
s"$funcTerm = ($funcClassName)$scalaUDF.userDefinedFunc();")
10231023

10241024
// codegen for children expressions
10251025
val evals = children.map(_.genCode(ctx))

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala

Lines changed: 127 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,7 @@ class CodegenContext {
109109
val idx = references.length
110110
references += obj
111111
val clsName = Option(className).getOrElse(obj.getClass.getName)
112-
addMutableState(clsName, term, s"this.$term = ($clsName) references[$idx];")
112+
addMutableState(clsName, term, s"$term = ($clsName) references[$idx];")
113113
term
114114
}
115115

@@ -198,41 +198,139 @@ class CodegenContext {
198198
partitionInitializationStatements.mkString("\n")
199199
}
200200

201+
/**
202+
* Holds expressions that are equivalent. Used to perform subexpression elimination
203+
* during codegen.
204+
*
205+
* For expressions that appear more than once, generate additional code to prevent
206+
* recomputing the value.
207+
*
208+
* For example, consider two expression generated from this SQL statement:
209+
* SELECT (col1 + col2), (col1 + col2) / col3.
210+
*
211+
* equivalentExpressions will match the tree containing `col1 + col2` and it will only
212+
* be evaluated once.
213+
*/
214+
val equivalentExpressions: EquivalentExpressions = new EquivalentExpressions
215+
216+
// Foreach expression that is participating in subexpression elimination, the state to use.
217+
val subExprEliminationExprs = mutable.HashMap.empty[Expression, SubExprEliminationState]
218+
219+
// The collection of sub-expression result resetting methods that need to be called on each row.
220+
val subexprFunctions = mutable.ArrayBuffer.empty[String]
221+
222+
private val outerClassName = "OuterClass"
223+
201224
/**
202-
* Holding all the functions those will be added into generated class.
225+
* Holds the class and instance names to be generated, where `OuterClass` is a placeholder
226+
* standing for whichever class is generated as the outermost class and which will contain any
227+
* nested sub-classes. All other classes and instance names in this list will represent private,
228+
* nested sub-classes.
203229
*/
204-
val addedFunctions: mutable.Map[String, String] =
205-
mutable.Map.empty[String, String]
230+
private val classes: mutable.ListBuffer[(String, String)] =
231+
mutable.ListBuffer[(String, String)](outerClassName -> null)
232+
233+
// A map holding the current size in bytes of each class to be generated.
234+
private val classSize: mutable.Map[String, Int] =
235+
mutable.Map[String, Int](outerClassName -> 0)
236+
237+
// Nested maps holding function names and their code belonging to each class.
238+
private val classFunctions: mutable.Map[String, mutable.Map[String, String]] =
239+
mutable.Map(outerClassName -> mutable.Map.empty[String, String])
206240

207-
def addNewFunction(funcName: String, funcCode: String): Unit = {
208-
addedFunctions += ((funcName, funcCode))
241+
// Returns the size of the most recently added class.
242+
private def currClassSize(): Int = classSize(classes.head._1)
243+
244+
// Returns the class name and instance name for the most recently added class.
245+
private def currClass(): (String, String) = classes.head
246+
247+
// Adds a new class. Requires the class' name, and its instance name.
248+
private def addClass(className: String, classInstance: String): Unit = {
249+
classes.prepend(className -> classInstance)
250+
classSize += className -> 0
251+
classFunctions += className -> mutable.Map.empty[String, String]
209252
}
210253

211254
/**
212-
* Holds expressions that are equivalent. Used to perform subexpression elimination
213-
* during codegen.
214-
*
215-
* For expressions that appear more than once, generate additional code to prevent
216-
* recomputing the value.
255+
* Adds a function to the generated class. If the code for the `OuterClass` grows too large, the
256+
* function will be inlined into a new private, nested class, and a instance-qualified name for
257+
* the function will be returned. Otherwise, the function will be inlined to the `OuterClass` the
258+
* simple `funcName` will be returned.
217259
*
218-
* For example, consider two expression generated from this SQL statement:
219-
* SELECT (col1 + col2), (col1 + col2) / col3.
220-
*
221-
* equivalentExpressions will match the tree containing `col1 + col2` and it will only
222-
* be evaluated once.
260+
* @param funcName the class-unqualified name of the function
261+
* @param funcCode the body of the function
262+
* @param inlineToOuterClass whether the given code must be inlined to the `OuterClass`. This
263+
* can be necessary when a function is declared outside of the context
264+
* it is eventually referenced and a returned qualified function name
265+
* cannot otherwise be accessed.
266+
* @return the name of the function, qualified by class if it will be inlined to a private,
267+
* nested sub-class
223268
*/
224-
val equivalentExpressions: EquivalentExpressions = new EquivalentExpressions
269+
def addNewFunction(
270+
funcName: String,
271+
funcCode: String,
272+
inlineToOuterClass: Boolean = false): String = {
273+
// The number of named constants that can exist in the class is limited by the Constant Pool
274+
// limit, 65,536. We cannot know how many constants will be inserted for a class, so we use a
275+
// threshold of 1600k bytes to determine when a function should be inlined to a private, nested
276+
// sub-class.
277+
val (className, classInstance) = if (inlineToOuterClass) {
278+
outerClassName -> ""
279+
} else if (currClassSize > 1600000) {
280+
val className = freshName("NestedClass")
281+
val classInstance = freshName("nestedClassInstance")
282+
283+
addClass(className, classInstance)
284+
285+
className -> classInstance
286+
} else {
287+
currClass()
288+
}
225289

226-
// Foreach expression that is participating in subexpression elimination, the state to use.
227-
val subExprEliminationExprs = mutable.HashMap.empty[Expression, SubExprEliminationState]
290+
classSize(className) += funcCode.length
291+
classFunctions(className) += funcName -> funcCode
228292

229-
// The collection of sub-expression result resetting methods that need to be called on each row.
230-
val subexprFunctions = mutable.ArrayBuffer.empty[String]
293+
if (className == outerClassName) {
294+
funcName
295+
} else {
231296

232-
def declareAddedFunctions(): String = {
233-
addedFunctions.map { case (funcName, funcCode) => funcCode }.mkString("\n")
297+
s"$classInstance.$funcName"
298+
}
234299
}
235300

301+
/**
302+
* Instantiates all nested, private sub-classes as objects to the `OuterClass`
303+
*/
304+
private[sql] def initNestedClasses(): String = {
305+
// Nested, private sub-classes have no mutable state (though they do reference the outer class'
306+
// mutable state), so we declare and initialize them inline to the OuterClass.
307+
classes.filter(_._1 != outerClassName).map {
308+
case (className, classInstance) =>
309+
s"private $className $classInstance = new $className();"
310+
}.mkString("\n")
311+
}
312+
313+
/**
314+
* Declares all function code that should be inlined to the `OuterClass`.
315+
*/
316+
private[sql] def declareAddedFunctions(): String = {
317+
classFunctions(outerClassName).values.mkString("\n")
318+
}
319+
320+
/**
321+
* Declares all nested, private sub-classes and the function code that should be inlined to them.
322+
*/
323+
private[sql] def declareNestedClasses(): String = {
324+
classFunctions.filterKeys(_ != outerClassName).map {
325+
case (className, functions) =>
326+
s"""
327+
|private class $className {
328+
| ${functions.values.mkString("\n")}
329+
|}
330+
""".stripMargin
331+
}
332+
}.mkString("\n")
333+
236334
final val JAVA_BOOLEAN = "boolean"
237335
final val JAVA_BYTE = "byte"
238336
final val JAVA_SHORT = "short"
@@ -552,8 +650,7 @@ class CodegenContext {
552650
return 0;
553651
}
554652
"""
555-
addNewFunction(compareFunc, funcCode)
556-
s"this.$compareFunc($c1, $c2)"
653+
s"${addNewFunction(compareFunc, funcCode)}($c1, $c2)"
557654
case schema: StructType =>
558655
val comparisons = GenerateOrdering.genComparisons(this, schema)
559656
val compareFunc = freshName("compareStruct")
@@ -569,8 +666,7 @@ class CodegenContext {
569666
return 0;
570667
}
571668
"""
572-
addNewFunction(compareFunc, funcCode)
573-
s"this.$compareFunc($c1, $c2)"
669+
s"${addNewFunction(compareFunc, funcCode)}($c1, $c2)"
574670
case other if other.isInstanceOf[AtomicType] => s"$c1.compare($c2)"
575671
case udt: UserDefinedType[_] => genComp(udt.sqlType, c1, c2)
576672
case _ =>
@@ -640,7 +736,9 @@ class CodegenContext {
640736

641737
/**
642738
* Splits the generated code of expressions into multiple functions, because function has
643-
* 64kb code size limit in JVM
739+
* 64kb code size limit in JVM. If the class to which the function would be inlined would grow
740+
* beyond 1600kb, we declare a private, nested sub-class, and the function is inlined to it
741+
* instead, because classes have a constant pool limit of 65,536 named values.
644742
*
645743
* @param expressions the codes to evaluate expressions.
646744
* @param funcName the split function name base.
@@ -685,7 +783,6 @@ class CodegenContext {
685783
|}
686784
""".stripMargin
687785
addNewFunction(name, code)
688-
name
689786
}
690787

691788
foldFunctions(functions.map(name => s"$name(${arguments.map(_._2).mkString(", ")})"))
@@ -769,8 +866,6 @@ class CodegenContext {
769866
|}
770867
""".stripMargin
771868

772-
addNewFunction(fnName, fn)
773-
774869
// Add a state and a mapping of the common subexpressions that are associate with this
775870
// state. Adding this expression to subExprEliminationExprMap means it will call `fn`
776871
// when it is code generated. This decision should be a cost based one.
@@ -791,7 +886,7 @@ class CodegenContext {
791886
addMutableState(javaType(expr.dataType), value,
792887
s"$value = ${defaultValue(expr.dataType)};")
793888

794-
subexprFunctions += s"$fnName($INPUT_ROW);"
889+
subexprFunctions += s"${addNewFunction(fnName, fn)}($INPUT_ROW);"
795890
val state = SubExprEliminationState(isNull, value)
796891
e.foreach(subExprEliminationExprs.put(_, state))
797892
}

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateMutableProjection.scala

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -63,21 +63,21 @@ object GenerateMutableProjection extends CodeGenerator[Seq[Expression], MutableP
6363
if (e.nullable) {
6464
val isNull = s"isNull_$i"
6565
val value = s"value_$i"
66-
ctx.addMutableState("boolean", isNull, s"this.$isNull = true;")
66+
ctx.addMutableState("boolean", isNull, s"$isNull = true;")
6767
ctx.addMutableState(ctx.javaType(e.dataType), value,
68-
s"this.$value = ${ctx.defaultValue(e.dataType)};")
68+
s"$value = ${ctx.defaultValue(e.dataType)};")
6969
s"""
7070
${ev.code}
71-
this.$isNull = ${ev.isNull};
72-
this.$value = ${ev.value};
71+
$isNull = ${ev.isNull};
72+
$value = ${ev.value};
7373
"""
7474
} else {
7575
val value = s"value_$i"
7676
ctx.addMutableState(ctx.javaType(e.dataType), value,
77-
s"this.$value = ${ctx.defaultValue(e.dataType)};")
77+
s"$value = ${ctx.defaultValue(e.dataType)};")
7878
s"""
7979
${ev.code}
80-
this.$value = ${ev.value};
80+
$value = ${ev.value};
8181
"""
8282
}
8383
}
@@ -87,7 +87,7 @@ object GenerateMutableProjection extends CodeGenerator[Seq[Expression], MutableP
8787

8888
val updates = validExpr.zip(index).map {
8989
case (e, i) =>
90-
val ev = ExprCode("", s"this.isNull_$i", s"this.value_$i")
90+
val ev = ExprCode("", s"isNull_$i", s"value_$i")
9191
ctx.updateColumn("mutableRow", e.dataType, i, ev, e.nullable)
9292
}
9393

@@ -135,6 +135,9 @@ object GenerateMutableProjection extends CodeGenerator[Seq[Expression], MutableP
135135
$allUpdates
136136
return mutableRow;
137137
}
138+
139+
${ctx.initNestedClasses()}
140+
${ctx.declareNestedClasses()}
138141
}
139142
"""
140143

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateOrdering.scala

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -179,6 +179,9 @@ object GenerateOrdering extends CodeGenerator[Seq[SortOrder], Ordering[InternalR
179179
$comparisons
180180
return 0;
181181
}
182+
183+
${ctx.initNestedClasses()}
184+
${ctx.declareNestedClasses()}
182185
}"""
183186

184187
val code = CodeFormatter.stripOverlappingComments(

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GeneratePredicate.scala

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,9 @@ object GeneratePredicate extends CodeGenerator[Expression, Predicate] {
7272
${eval.code}
7373
return !${eval.isNull} && ${eval.value};
7474
}
75+
76+
${ctx.initNestedClasses()}
77+
${ctx.declareNestedClasses()}
7578
}"""
7679

7780
val code = CodeFormatter.stripOverlappingComments(

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateSafeProjection.scala

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ object GenerateSafeProjection extends CodeGenerator[Seq[Expression], Projection]
4949
val output = ctx.freshName("safeRow")
5050
val values = ctx.freshName("values")
5151
// These expressions could be split into multiple functions
52-
ctx.addMutableState("Object[]", values, s"this.$values = null;")
52+
ctx.addMutableState("Object[]", values, s"$values = null;")
5353

5454
val rowClass = classOf[GenericInternalRow].getName
5555

@@ -65,10 +65,10 @@ object GenerateSafeProjection extends CodeGenerator[Seq[Expression], Projection]
6565
val allFields = ctx.splitExpressions(tmp, fieldWriters)
6666
val code = s"""
6767
final InternalRow $tmp = $input;
68-
this.$values = new Object[${schema.length}];
68+
$values = new Object[${schema.length}];
6969
$allFields
7070
final InternalRow $output = new $rowClass($values);
71-
this.$values = null;
71+
$values = null;
7272
"""
7373

7474
ExprCode(code, "false", output)
@@ -184,6 +184,9 @@ object GenerateSafeProjection extends CodeGenerator[Seq[Expression], Projection]
184184
$allExpressions
185185
return mutableRow;
186186
}
187+
188+
${ctx.initNestedClasses()}
189+
${ctx.declareNestedClasses()}
187190
}
188191
"""
189192

0 commit comments

Comments
 (0)