From d1d996b764e4587c254e863d7c8ad7827713dfb1 Mon Sep 17 00:00:00 2001 From: Takeshi Yamamuro Date: Tue, 20 Apr 2021 11:15:52 +0900 Subject: [PATCH 1/2] Bump up janino version to 3.1.3 --- dev/deps/spark-deps-hadoop-2.7-hive-2.3 | 4 +- dev/deps/spark-deps-hadoop-3.2-hive-2.3 | 4 +- pom.xml | 2 +- .../expressions/codegen/CodeGenerator.scala | 12 ++-- .../sql/errors/QueryExecutionErrors.scala | 3 +- .../spark/sql/execution/ExpandExec.scala | 60 +++++++++++++------ 6 files changed, 54 insertions(+), 31 deletions(-) diff --git a/dev/deps/spark-deps-hadoop-2.7-hive-2.3 b/dev/deps/spark-deps-hadoop-2.7-hive-2.3 index c8077d5ff3664..c98d2779e5e50 100644 --- a/dev/deps/spark-deps-hadoop-2.7-hive-2.3 +++ b/dev/deps/spark-deps-hadoop-2.7-hive-2.3 @@ -37,7 +37,7 @@ commons-beanutils/1.9.4//commons-beanutils-1.9.4.jar commons-cli/1.2//commons-cli-1.2.jar commons-codec/1.15//commons-codec-1.15.jar commons-collections/3.2.2//commons-collections-3.2.2.jar -commons-compiler/3.0.16//commons-compiler-3.0.16.jar +commons-compiler/3.1.3//commons-compiler-3.1.3.jar commons-compress/1.20//commons-compress-1.20.jar commons-configuration/1.6//commons-configuration-1.6.jar commons-crypto/1.1.0//commons-crypto-1.1.0.jar @@ -122,7 +122,7 @@ jakarta.servlet-api/4.0.3//jakarta.servlet-api-4.0.3.jar jakarta.validation-api/2.0.2//jakarta.validation-api-2.0.2.jar jakarta.ws.rs-api/2.1.6//jakarta.ws.rs-api-2.1.6.jar jakarta.xml.bind-api/2.3.2//jakarta.xml.bind-api-2.3.2.jar -janino/3.0.16//janino-3.0.16.jar +janino/3.1.3//janino-3.1.3.jar javassist/3.25.0-GA//javassist-3.25.0-GA.jar javax.inject/1//javax.inject-1.jar javax.jdo/3.2.0-m3//javax.jdo-3.2.0-m3.jar diff --git a/dev/deps/spark-deps-hadoop-3.2-hive-2.3 b/dev/deps/spark-deps-hadoop-3.2-hive-2.3 index 841dd523e5320..2774866a0a872 100644 --- a/dev/deps/spark-deps-hadoop-3.2-hive-2.3 +++ b/dev/deps/spark-deps-hadoop-3.2-hive-2.3 @@ -31,7 +31,7 @@ chill_2.12/0.9.5//chill_2.12-0.9.5.jar commons-cli/1.2//commons-cli-1.2.jar commons-codec/1.15//commons-codec-1.15.jar commons-collections/3.2.2//commons-collections-3.2.2.jar -commons-compiler/3.0.16//commons-compiler-3.0.16.jar +commons-compiler/3.1.3//commons-compiler-3.1.3.jar commons-compress/1.20//commons-compress-1.20.jar commons-crypto/1.1.0//commons-crypto-1.1.0.jar commons-dbcp/1.4//commons-dbcp-1.4.jar @@ -98,7 +98,7 @@ jakarta.servlet-api/4.0.3//jakarta.servlet-api-4.0.3.jar jakarta.validation-api/2.0.2//jakarta.validation-api-2.0.2.jar jakarta.ws.rs-api/2.1.6//jakarta.ws.rs-api-2.1.6.jar jakarta.xml.bind-api/2.3.2//jakarta.xml.bind-api-2.3.2.jar -janino/3.0.16//janino-3.0.16.jar +janino/3.1.3//janino-3.1.3.jar javassist/3.25.0-GA//javassist-3.25.0-GA.jar javax.jdo/3.2.0-m3//javax.jdo-3.2.0-m3.jar javolution/5.5.1//javolution-5.5.1.jar diff --git a/pom.xml b/pom.xml index f8ab52bb6ef10..55893d3baac9e 100644 --- a/pom.xml +++ b/pom.xml @@ -184,7 +184,7 @@ 2.6.2 4.1.17 14.0.1 - 3.0.16 + 3.1.3 2.34 2.10.5 3.5.2 diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala index 6ed193a4d9d46..f1fc718432c56 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala @@ -27,8 +27,9 @@ import scala.util.control.NonFatal import com.google.common.cache.{CacheBuilder, CacheLoader} import 
com.google.common.util.concurrent.{ExecutionError, UncheckedExecutionException} -import org.codehaus.commons.compiler.CompileException -import org.codehaus.janino.{ByteArrayClassLoader, ClassBodyEvaluator, InternalCompilerException, SimpleCompiler} +import org.codehaus.commons.compiler.{CompileException, InternalCompilerException} +import org.codehaus.commons.compiler.util.reflect.ByteArrayClassLoader +import org.codehaus.janino.{ClassBodyEvaluator, SimpleCompiler} import org.codehaus.janino.util.ClassFile import org.apache.spark.{TaskContext, TaskKilledException} @@ -1434,9 +1435,10 @@ object CodeGenerator extends Logging { private def updateAndGetCompilationStats(evaluator: ClassBodyEvaluator): ByteCodeStats = { // First retrieve the generated classes. val classes = { - val resultField = classOf[SimpleCompiler].getDeclaredField("result") - resultField.setAccessible(true) - val loader = resultField.get(evaluator).asInstanceOf[ByteArrayClassLoader] + val scField = classOf[ClassBodyEvaluator].getDeclaredField("sc") + scField.setAccessible(true) + val compiler = scField.get(evaluator).asInstanceOf[SimpleCompiler] + val loader = compiler.getClassLoader.asInstanceOf[ByteArrayClassLoader] val classesField = loader.getClass.getDeclaredField("classes") classesField.setAccessible(true) classesField.get(loader).asInstanceOf[JavaMap[String, Array[Byte]]].asScala diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala index 3589c875fb723..30da3f5cb3ed6 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala @@ -23,8 +23,7 @@ import java.sql.{SQLException, SQLFeatureNotSupportedException} import java.time.DateTimeException import org.apache.hadoop.fs.{FileStatus, Path} -import org.codehaus.commons.compiler.CompileException -import org.codehaus.janino.InternalCompilerException +import org.codehaus.commons.compiler.{CompileException, InternalCompilerException} import org.apache.spark.{Partition, SparkException, SparkUpgradeException} import org.apache.spark.executor.CommitDeniedException diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExpandExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExpandExec.scala index 3fd653130e57c..24ad90b929cdf 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExpandExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExpandExec.scala @@ -17,11 +17,12 @@ package org.apache.spark.sql.execution +import scala.collection.mutable + import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.codegen._ -import org.apache.spark.sql.catalyst.expressions.codegen.Block._ import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, UnknownPartitioning} import org.apache.spark.sql.execution.metric.SQLMetrics @@ -152,15 +153,16 @@ case class ExpandExec( // This column is the same across all output rows. Just generate code for it here. 
BindReferences.bindReference(firstExpr, attributeSeq).genCode(ctx)
} else {
- val isNull = ctx.freshName("isNull")
- val value = ctx.freshName("value")
- val code = code"""
- |boolean $isNull = true;
- |${CodeGenerator.javaType(firstExpr.dataType)} $value =
- | ${CodeGenerator.defaultValue(firstExpr.dataType)};
- """.stripMargin
+ val isNull = ctx.addMutableState(
+ CodeGenerator.JAVA_BOOLEAN,
+ "resultIsNull",
+ v => s"$v = true;")
+ val value = ctx.addMutableState(
+ CodeGenerator.javaType(firstExpr.dataType),
+ "resultValue",
+ v => s"$v = ${CodeGenerator.defaultValue(firstExpr.dataType)};")
+
ExprCode(
- code,
JavaCode.isNullVariable(isNull), JavaCode.variable(value, firstExpr.dataType))
}
@@ -168,22 +170,42 @@ case class ExpandExec(
// Part 2: switch/case statements
val cases = projections.zipWithIndex.map { case (exprs, row) =>
- var updateCode = ""
- for (col <- exprs.indices) {
+ val updateCode = mutable.ArrayBuffer[String]()
+ exprs.indices.foreach { col =>
if (!sameOutput(col)) {
- val ev = BindReferences.bindReference(exprs(col), attributeSeq).genCode(ctx)
- updateCode +=
- s"""
- |${ev.code}
- |${outputColumns(col).isNull} = ${ev.isNull};
- |${outputColumns(col).value} = ${ev.value};
- """.stripMargin
+ val boundExpr = BindReferences.bindReference(exprs(col), attributeSeq)
+ val ev = boundExpr.genCode(ctx)
+ val inputVars = CodeGenerator.getLocalInputVariableValues(ctx, boundExpr)._1.toSeq
+ val argList = inputVars.map { v =>
+ s"${CodeGenerator.typeName(v.javaType)} ${v.variableName}"
+ }
+ val paramLength = CodeGenerator.calculateParamLengthFromExprValues(inputVars)
+ if (CodeGenerator.isValidParamLength(paramLength)) {
+ val switchCaseFunc = ctx.freshName("switchCaseCode")
+ ctx.addNewFunction(switchCaseFunc,
+ s"""
+ |private void $switchCaseFunc(${argList.mkString(", ")}) {
+ | ${ev.code}
+ | ${outputColumns(col).isNull} = ${ev.isNull};
+ | ${outputColumns(col).value} = ${ev.value};
+ |}
+ """.stripMargin)
+
+ updateCode += s"$switchCaseFunc(${inputVars.map(_.variableName).mkString(", ")});"
+ } else {
+ updateCode +=
+ s"""
+ |${ev.code}
+ |${outputColumns(col).isNull} = ${ev.isNull};
+ |${outputColumns(col).value} = ${ev.value};
+ """.stripMargin
+ }
}
}
s"""
|case $row:
- | ${updateCode.trim}
+ | ${updateCode.mkString("\n")}
| break;
""".stripMargin
}

From 8a13cfbcd57b7e93e0009c6b93d784184a880761 Mon Sep 17 00:00:00 2001
From: Takeshi Yamamuro
Date: Thu, 6 May 2021 22:59:19 +0900
Subject: [PATCH 2/2] Bump up janino version to 3.1.4 and revert the ExpandExec changes

---
dev/deps/spark-deps-hadoop-2.7-hive-2.3 | 4 +-
dev/deps/spark-deps-hadoop-3.2-hive-2.3 | 4 +-
pom.xml | 2 +-
.../spark/sql/execution/ExpandExec.scala | 60 ++++++-------------
4 files changed, 24 insertions(+), 46 deletions(-)

diff --git a/dev/deps/spark-deps-hadoop-2.7-hive-2.3 b/dev/deps/spark-deps-hadoop-2.7-hive-2.3
index c98d2779e5e50..85553e207751c 100644
--- a/dev/deps/spark-deps-hadoop-2.7-hive-2.3
+++ b/dev/deps/spark-deps-hadoop-2.7-hive-2.3
@@ -37,7 +37,7 @@ commons-beanutils/1.9.4//commons-beanutils-1.9.4.jar
commons-cli/1.2//commons-cli-1.2.jar
commons-codec/1.15//commons-codec-1.15.jar
commons-collections/3.2.2//commons-collections-3.2.2.jar
-commons-compiler/3.1.3//commons-compiler-3.1.3.jar
+commons-compiler/3.1.4//commons-compiler-3.1.4.jar
commons-compress/1.20//commons-compress-1.20.jar
commons-configuration/1.6//commons-configuration-1.6.jar
commons-crypto/1.1.0//commons-crypto-1.1.0.jar
@@ -122,7 +122,7 @@ jakarta.servlet-api/4.0.3//jakarta.servlet-api-4.0.3.jar
jakarta.validation-api/2.0.2//jakarta.validation-api-2.0.2.jar
jakarta.ws.rs-api/2.1.6//jakarta.ws.rs-api-2.1.6.jar jakarta.xml.bind-api/2.3.2//jakarta.xml.bind-api-2.3.2.jar -janino/3.1.3//janino-3.1.3.jar +janino/3.1.4//janino-3.1.4.jar javassist/3.25.0-GA//javassist-3.25.0-GA.jar javax.inject/1//javax.inject-1.jar javax.jdo/3.2.0-m3//javax.jdo-3.2.0-m3.jar diff --git a/dev/deps/spark-deps-hadoop-3.2-hive-2.3 b/dev/deps/spark-deps-hadoop-3.2-hive-2.3 index 2774866a0a872..3d1bb46180da6 100644 --- a/dev/deps/spark-deps-hadoop-3.2-hive-2.3 +++ b/dev/deps/spark-deps-hadoop-3.2-hive-2.3 @@ -31,7 +31,7 @@ chill_2.12/0.9.5//chill_2.12-0.9.5.jar commons-cli/1.2//commons-cli-1.2.jar commons-codec/1.15//commons-codec-1.15.jar commons-collections/3.2.2//commons-collections-3.2.2.jar -commons-compiler/3.1.3//commons-compiler-3.1.3.jar +commons-compiler/3.1.4//commons-compiler-3.1.4.jar commons-compress/1.20//commons-compress-1.20.jar commons-crypto/1.1.0//commons-crypto-1.1.0.jar commons-dbcp/1.4//commons-dbcp-1.4.jar @@ -98,7 +98,7 @@ jakarta.servlet-api/4.0.3//jakarta.servlet-api-4.0.3.jar jakarta.validation-api/2.0.2//jakarta.validation-api-2.0.2.jar jakarta.ws.rs-api/2.1.6//jakarta.ws.rs-api-2.1.6.jar jakarta.xml.bind-api/2.3.2//jakarta.xml.bind-api-2.3.2.jar -janino/3.1.3//janino-3.1.3.jar +janino/3.1.4//janino-3.1.4.jar javassist/3.25.0-GA//javassist-3.25.0-GA.jar javax.jdo/3.2.0-m3//javax.jdo-3.2.0-m3.jar javolution/5.5.1//javolution-5.5.1.jar diff --git a/pom.xml b/pom.xml index 55893d3baac9e..a9246366c35aa 100644 --- a/pom.xml +++ b/pom.xml @@ -184,7 +184,7 @@ 2.6.2 4.1.17 14.0.1 - 3.1.3 + 3.1.4 2.34 2.10.5 3.5.2 diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExpandExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExpandExec.scala index 24ad90b929cdf..3fd653130e57c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExpandExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExpandExec.scala @@ -17,12 +17,11 @@ package org.apache.spark.sql.execution -import scala.collection.mutable - import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.codegen._ +import org.apache.spark.sql.catalyst.expressions.codegen.Block._ import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, UnknownPartitioning} import org.apache.spark.sql.execution.metric.SQLMetrics @@ -153,16 +152,15 @@ case class ExpandExec( // This column is the same across all output rows. Just generate code for it here. 
BindReferences.bindReference(firstExpr, attributeSeq).genCode(ctx) } else { - val isNull = ctx.addMutableState( - CodeGenerator.JAVA_BOOLEAN, - "resultIsNull", - v => s"$v = true;") - val value = ctx.addMutableState( - CodeGenerator.javaType(firstExpr.dataType), - "resultValue", - v => s"$v = ${CodeGenerator.defaultValue(firstExpr.dataType)};") - + val isNull = ctx.freshName("isNull") + val value = ctx.freshName("value") + val code = code""" + |boolean $isNull = true; + |${CodeGenerator.javaType(firstExpr.dataType)} $value = + | ${CodeGenerator.defaultValue(firstExpr.dataType)}; + """.stripMargin ExprCode( + code, JavaCode.isNullVariable(isNull), JavaCode.variable(value, firstExpr.dataType)) } @@ -170,42 +168,22 @@ case class ExpandExec( // Part 2: switch/case statements val cases = projections.zipWithIndex.map { case (exprs, row) => - val updateCode = mutable.ArrayBuffer[String]() - exprs.indices.foreach { col => + var updateCode = "" + for (col <- exprs.indices) { if (!sameOutput(col)) { - val boundExpr = BindReferences.bindReference(exprs(col), attributeSeq) - val ev = boundExpr.genCode(ctx) - val inputVars = CodeGenerator.getLocalInputVariableValues(ctx, boundExpr)._1.toSeq - val argList = inputVars.map { v => - s"${CodeGenerator.typeName(v.javaType)} ${v.variableName}" - } - val paramLength = CodeGenerator.calculateParamLengthFromExprValues(inputVars) - if (CodeGenerator.isValidParamLength(paramLength)) { - val switchCaseFunc = ctx.freshName("switchCaseCode") - ctx.addNewFunction(switchCaseFunc, - s""" - |private void $switchCaseFunc(${argList.mkString(", ")}) { - | ${ev.code} - | ${outputColumns(col).isNull} = ${ev.isNull}; - | ${outputColumns(col).value} = ${ev.value}; - |} - """.stripMargin) - - updateCode += s"$switchCaseFunc(${inputVars.map(_.variableName).mkString(", ")});" - } else { - updateCode += - s""" - |${ev.code} - |${outputColumns(col).isNull} = ${ev.isNull}; - |${outputColumns(col).value} = ${ev.value}; - """.stripMargin - } + val ev = BindReferences.bindReference(exprs(col), attributeSeq).genCode(ctx) + updateCode += + s""" + |${ev.code} + |${outputColumns(col).isNull} = ${ev.isNull}; + |${outputColumns(col).value} = ${ev.value}; + """.stripMargin } } s""" |case $row: - | ${updateCode.mkString("\n")} + | ${updateCode.trim} | break; """.stripMargin }
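
For context on the CodeGenerator change in patch 1: janino 3.1.x moves InternalCompilerException into org.codehaus.commons.compiler, relocates ByteArrayClassLoader to org.codehaus.commons.compiler.util.reflect, and no longer exposes the compiled bytecode through SimpleCompiler's private `result` field, which is why updateAndGetCompilationStats now reaches the class loader through ClassBodyEvaluator's private `sc` delegate. Below is a minimal standalone sketch of that retrieval path, assuming janino 3.1.x on the classpath; the harness object and sample class body are illustrative, while the reflective field names (`sc`, `classes`) are the ones the patch itself relies on.

import java.util.{Map => JavaMap}

import scala.collection.JavaConverters._

import org.codehaus.commons.compiler.util.reflect.ByteArrayClassLoader
import org.codehaus.janino.{ClassBodyEvaluator, SimpleCompiler}

object JaninoBytecodeSketch {
  def main(args: Array[String]): Unit = {
    // Compile a trivial class body, as Spark does for generated code.
    val evaluator = new ClassBodyEvaluator()
    evaluator.cook("public int answer() { return 42; }")

    // janino 3.1.x no longer exposes SimpleCompiler.result, so reach the
    // compiled classes through ClassBodyEvaluator's private `sc` delegate.
    val scField = classOf[ClassBodyEvaluator].getDeclaredField("sc")
    scField.setAccessible(true)
    val compiler = scField.get(evaluator).asInstanceOf[SimpleCompiler]
    val loader = compiler.getClassLoader.asInstanceOf[ByteArrayClassLoader]

    // The loader keeps the raw bytecode in its private `classes` map.
    val classesField = loader.getClass.getDeclaredField("classes")
    classesField.setAccessible(true)
    val classes = classesField.get(loader)
      .asInstanceOf[JavaMap[String, Array[Byte]]].asScala
    classes.foreach { case (name, bytes) =>
      println(s"$name: ${bytes.length} bytes")
    }
  }
}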
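
On the ExpandExec change that patch 1 introduces and patch 2 reverts: the per-case update code is split out into a private method only when CodeGenerator.isValidParamLength accepts the argument list, because a JVM method descriptor is limited to 255 parameter slots and long/double arguments occupy two slots each (JVMS 4.3.3). A simplified model of that guard follows; the names are illustrative, not Spark's exact implementation.

object ParamLengthSketch {
  // JVMS 4.3.3: a method may declare at most 255 parameter slots.
  val MaxJvmMethodParamsLength = 255

  // The leading 1 accounts for the implicit `this` slot of an instance
  // method; long and double take two slots each, everything else one.
  def calculateParamLength(javaTypes: Seq[String]): Int =
    1 + javaTypes.map {
      case "long" | "double" => 2
      case _ => 1
    }.sum

  def isValidParamLength(paramLength: Int): Boolean =
    paramLength <= MaxJvmMethodParamsLength

  def main(args: Array[String]): Unit = {
    // 130 long arguments need 1 + 260 slots, so the code must stay inline.
    println(isValidParamLength(calculateParamLength(Seq.fill(130)("long")))) // false
    // 100 int arguments need 1 + 100 slots, so extracting a method is safe.
    println(isValidParamLength(calculateParamLength(Seq.fill(100)("int"))))  // true
  }
}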