Skip to content

Commit 520ec0f

Browse files
JoshRosen authored and rxin committed
[SPARK-8850] [SQL] Enable Unsafe mode by default
This pull request enables Unsafe mode by default in Spark SQL. In order to do this, we had to fix a number of small issues: **List of fixed blockers**: - [x] Make some default buffer sizes configurable so that HiveCompatibilitySuite can run properly (#7741). - [x] Memory leak on grouped aggregation of empty input (fixed by #7560 to fix this) - [x] Update planner to also check whether codegen is enabled before planning unsafe operators. - [x] Investigate failing HiveThriftBinaryServerSuite test. This turns out to be caused by a ClassCastException that occurs when Exchange tries to apply an interpreted RowOrdering to an UnsafeRow when range partitioning an RDD. This could be fixed by #7408, but a shorter-term fix is to just skip the Unsafe exchange path when RangePartitioner is used. - [x] Memory leak exceptions masking exceptions that actually caused tasks to fail (will be fixed by #7603). - [x] ~~https://issues.apache.org/jira/browse/SPARK-9162, to implement code generation for ScalaUDF. This is necessary for `UDFSuite` to pass. For now, I've just ignored this test in order to try to find other problems while we wait for a fix.~~ This is no longer necessary as of #7682. - [x] Memory leaks from Limit after UnsafeExternalSort cause the memory leak detector to fail tests. This is a huge problem in the HiveCompatibilitySuite (fixed by f4ac642a4e5b2a7931c5e04e086bb10e263b1db6). - [x] Tests in `AggregationQuerySuite` are failing due to NaN-handling issues in UnsafeRow, which were fixed in #7736. - [x] `org.apache.spark.sql.ColumnExpressionSuite.rand` needs to be updated so that the planner check also matches `TungstenProject`. - [x] After having lowered the buffer sizes to 4MB so that most of HiveCompatibilitySuite runs: - [x] Wrong answer in `join_1to1` (fixed by #7680) - [x] Wrong answer in `join_nulls` (fixed by #7680) - [x] Managed memory OOM / leak in `lateral_view` - [x] Seems to hang indefinitely in `partcols1`. 
This might be a deadlock in script transformation or a bug in error-handling code? The hang was fixed by #7710. - [x] Error while freeing memory in `partcols1`: will be fixed by #7734. - [x] After fixing the `partcols1` hang, it appears that a number of later tests have issues as well. - [x] Fix thread-safety bug in codegen fallback expression evaluation (#7759). Author: Josh Rosen <[email protected]> Closes #7564 from JoshRosen/unsafe-by-default and squashes the following commits: 83c0c56 [Josh Rosen] Merge remote-tracking branch 'origin/master' into unsafe-by-default f4cc859 [Josh Rosen] Merge remote-tracking branch 'origin/master' into unsafe-by-default 963f567 [Josh Rosen] Reduce buffer size for R tests d6986de [Josh Rosen] Lower page size in PySpark tests 013b9da [Josh Rosen] Also match TungstenProject in checkNumProjects 5d0b2d3 [Josh Rosen] Add task completion callback to avoid leak in limit after sort ea250da [Josh Rosen] Disable unsafe Exchange path when RangePartitioning is used 715517b [Josh Rosen] Enable Unsafe by default
1 parent ab78b1d commit 520ec0f

File tree

7 files changed

+30
-24
lines changed

7 files changed

+30
-24
lines changed

R/run-tests.sh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ FAILED=0
2323
LOGFILE=$FWDIR/unit-tests.out
2424
rm -f $LOGFILE
2525

26-
SPARK_TESTING=1 $FWDIR/../bin/sparkR --driver-java-options "-Dlog4j.configuration=file:$FWDIR/log4j.properties" $FWDIR/pkg/tests/run-all.R 2>&1 | tee -a $LOGFILE
26+
SPARK_TESTING=1 $FWDIR/../bin/sparkR --conf spark.buffer.pageSize=4m --driver-java-options "-Dlog4j.configuration=file:$FWDIR/log4j.properties" $FWDIR/pkg/tests/run-all.R 2>&1 | tee -a $LOGFILE
2727
FAILED=$((PIPESTATUS[0]||$FAILED))
2828

2929
if [[ $FAILED != 0 ]]; then

core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,9 @@
2020
import java.io.IOException;
2121
import java.util.LinkedList;
2222

23+
import scala.runtime.AbstractFunction0;
24+
import scala.runtime.BoxedUnit;
25+
2326
import com.google.common.annotations.VisibleForTesting;
2427
import org.slf4j.Logger;
2528
import org.slf4j.LoggerFactory;
@@ -90,6 +93,17 @@ public UnsafeExternalSorter(
9093
this.fileBufferSizeBytes = (int) conf.getSizeAsKb("spark.shuffle.file.buffer", "32k") * 1024;
9194
this.pageSizeBytes = conf.getSizeAsBytes("spark.buffer.pageSize", "64m");
9295
initializeForWriting();
96+
97+
// Register a cleanup task with TaskContext to ensure that memory is guaranteed to be freed at
98+
// the end of the task. This is necessary to avoid memory leaks in when the downstream operator
99+
// does not fully consume the sorter's output (e.g. sort followed by limit).
100+
taskContext.addOnCompleteCallback(new AbstractFunction0<BoxedUnit>() {
101+
@Override
102+
public BoxedUnit apply() {
103+
freeMemory();
104+
return null;
105+
}
106+
});
93107
}
94108

95109
// TODO: metrics tracking + integration with shuffle write metrics

python/pyspark/java_gateway.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,11 @@ def launch_gateway():
5252
script = "./bin/spark-submit.cmd" if on_windows else "./bin/spark-submit"
5353
submit_args = os.environ.get("PYSPARK_SUBMIT_ARGS", "pyspark-shell")
5454
if os.environ.get("SPARK_TESTING"):
55-
submit_args = "--conf spark.ui.enabled=false " + submit_args
55+
submit_args = ' '.join([
56+
"--conf spark.ui.enabled=false",
57+
"--conf spark.buffer.pageSize=4mb",
58+
submit_args
59+
])
5660
command = [os.path.join(SPARK_HOME, script)] + shlex.split(submit_args)
5761

5862
# Start a socket that will be used by PythonGatewayServer to communicate its port to us

sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -229,7 +229,7 @@ private[spark] object SQLConf {
229229
" a specific query.")
230230

231231
val UNSAFE_ENABLED = booleanConf("spark.sql.unsafe.enabled",
232-
defaultValue = Some(false),
232+
defaultValue = Some(true),
233233
doc = "When true, use the new optimized Tungsten physical execution backend.")
234234

235235
val DIALECT = stringConf(

sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,12 @@ case class Exchange(newPartitioning: Partitioning, child: SparkPlan) extends Una
4747

4848
override def canProcessSafeRows: Boolean = true
4949

50-
override def canProcessUnsafeRows: Boolean = true
50+
override def canProcessUnsafeRows: Boolean = {
51+
// Do not use the Unsafe path if we are using a RangePartitioning, since this may lead to
52+
// an interpreted RowOrdering being applied to an UnsafeRow, which will lead to
53+
// ClassCastExceptions at runtime. This check can be removed after SPARK-9054 is fixed.
54+
!newPartitioning.isInstanceOf[RangePartitioning]
55+
}
5156

5257
/**
5358
* Determines whether records must be defensively copied before being sent to the shuffle.

sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ package org.apache.spark.sql
1919

2020
import org.scalatest.Matchers._
2121

22-
import org.apache.spark.sql.execution.Project
22+
import org.apache.spark.sql.execution.{Project, TungstenProject}
2323
import org.apache.spark.sql.functions._
2424
import org.apache.spark.sql.types._
2525
import org.apache.spark.sql.test.SQLTestUtils
@@ -538,6 +538,7 @@ class ColumnExpressionSuite extends QueryTest with SQLTestUtils {
538538
def checkNumProjects(df: DataFrame, expectedNumProjects: Int): Unit = {
539539
val projects = df.queryExecution.executedPlan.collect {
540540
case project: Project => project
541+
case tungstenProject: TungstenProject => tungstenProject
541542
}
542543
assert(projects.size === expectedNumProjects)
543544
}

sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeExternalSortSuite.scala

Lines changed: 1 addition & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -36,10 +36,7 @@ class UnsafeExternalSortSuite extends SparkPlanTest with BeforeAndAfterAll {
3636
TestSQLContext.conf.setConf(SQLConf.CODEGEN_ENABLED, SQLConf.CODEGEN_ENABLED.defaultValue.get)
3737
}
3838

39-
ignore("sort followed by limit should not leak memory") {
40-
// TODO: this test is going to fail until we implement a proper iterator interface
41-
// with a close() method.
42-
TestSQLContext.sparkContext.conf.set("spark.unsafe.exceptionOnMemoryLeak", "false")
39+
test("sort followed by limit") {
4340
checkThatPlansAgree(
4441
(1 to 100).map(v => Tuple1(v)).toDF("a"),
4542
(child: SparkPlan) => Limit(10, UnsafeExternalSort('a.asc :: Nil, true, child)),
@@ -48,21 +45,6 @@ class UnsafeExternalSortSuite extends SparkPlanTest with BeforeAndAfterAll {
4845
)
4946
}
5047

51-
test("sort followed by limit") {
52-
TestSQLContext.sparkContext.conf.set("spark.unsafe.exceptionOnMemoryLeak", "false")
53-
try {
54-
checkThatPlansAgree(
55-
(1 to 100).map(v => Tuple1(v)).toDF("a"),
56-
(child: SparkPlan) => Limit(10, UnsafeExternalSort('a.asc :: Nil, true, child)),
57-
(child: SparkPlan) => Limit(10, Sort('a.asc :: Nil, global = true, child)),
58-
sortAnswers = false
59-
)
60-
} finally {
61-
TestSQLContext.sparkContext.conf.set("spark.unsafe.exceptionOnMemoryLeak", "false")
62-
63-
}
64-
}
65-
6648
test("sorting does not crash for large inputs") {
6749
val sortOrder = 'a.asc :: Nil
6850
val stringLength = 1024 * 1024 * 2

0 commit comments

Comments
 (0)