Skip to content

[SPARK-8357] [SQL] Memory leakage on unsafe aggregation path with empty input #6810

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 9 commits into from

Conversation

navis
Copy link
Contributor

@navis navis commented Jun 14, 2015

Currently, unsafe-based hash is released on 'next' call but if input is empty, it would not be called ever.

@@ -270,7 +270,9 @@ case class GeneratedAggregate(

val joinedRow = new JoinedRow3

if (groupingExpressions.isEmpty) {
if (!iter.hasNext) {
Iterator[InternalRow]()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add a comment here to explain why we're doing this? Just needs to mention the memory leak issue.

@JoshRosen
Copy link
Contributor

I guess there's not a super easy way to add a regression test for this since it's kind of hard to just instantiate the operator and run it with some empty input.

@JoshRosen
Copy link
Contributor

Jenkins, this is ok to test.

@JoshRosen
Copy link
Contributor

I'm actually working on some tooling that will make it very easy to write regressions tests for these kinds of issues (basically, a way to instantiate individual operators and run them with canned inputs, which is much easier than trying to rig a high-level query that triggers the code path under test). This should not block on that, though, but I'll ping this thread later today with an example of how we could write such a test.

@SparkQA
Copy link

SparkQA commented Jun 14, 2015

Test build #34889 has finished for PR 6810 at commit 993c289.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@JoshRosen
Copy link
Contributor

Now that we've merged #6885, it should be possible to write a regression test for this. Take a look and let me know if you're interested in doing that. If not, I can merge this as-is.

@navis
Copy link
Contributor Author

navis commented Jun 22, 2015

@JoshRosen Sure, I'll review your patch first. Thanks.

@navis
Copy link
Contributor Author

navis commented Jun 22, 2015

@JoshRosen Done.

@SparkQA
Copy link

SparkQA commented Jun 22, 2015

Test build #35440 has finished for PR 6810 at commit 36ecd1f.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Jun 22, 2015

Test build #35450 has finished for PR 6810 at commit 0a5bf17.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class Module(object):
    • class PCAModel(JavaVectorTransformer):
    • class PCA(object):

val groupExpr = df.col("b").expr
val aggrExpr = Alias(Count(Cast(groupExpr, LongType)), "Count")()

SparkEnv.get.conf.set("spark.unsafe.exceptionOnMemoryLeak", "true")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This option is automatically enabled when running tests in Maven or SBT, so we don't need to put this here. Additionally, mutating the SparkConf after it's already been used to create a SparkContext is not always safe, so I think that we should remove this line.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I know that. Maybe I'm wrong but when running the test in IDE(intelliJ), the policy seemed not applied. I'll remove this line, anyway.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, I can see how this would be an issue for IntelliJ. If we do want to modify the conf, though, we should do it elsewhere (when creating the SparkContext that's used to create the TestSQLContext).

@SparkQA
Copy link

SparkQA commented Jun 23, 2015

Test build #35504 has finished for PR 6810 at commit eb86c67.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Jun 23, 2015

Test build #35507 has finished for PR 6810 at commit d20f8f9.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@JoshRosen
Copy link
Contributor

I appreciate you trying to add a more general save / restore / backup mechanism for SqlConf state, but I'd rather not introduce that change as part of a bugfix. I think that we might want to be able to introduce / test / backport that test infra change independent of the change here.

Do you mind just mimicking the simple try-finally save/restore that's used by other SQL tests and rolling back the wider test infra changes? Happy to include them, just not here.

@JoshRosen
Copy link
Contributor

Also, if we do introduce a more general way of handling this sort of state cleanup as part of the test runner, then I suspect we'll want to both remove a lot of the old try-finally code and will have to fix any bugs due to tests that implicitly relied on the old behavior. Since that has the potential to grow to cover a lot more changes than this bugfix, I'd like to tackle it separately.


for ((codegen, unsafe) <- Seq((false, false), (true, false), (true, true));
partial <- Seq(false, true)) {
TestSQLContext.conf.setConfString("spark.sql.codegen", String.valueOf(codegen))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you use the new SparkConf API? This line will be TestSQLContext.setConf(SQLConf.CODEGEN_ENABLED, codegen).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done. Thanks.

@SparkQA
Copy link

SparkQA commented Jun 23, 2015

Test build #35532 has finished for PR 6810 at commit f1859be.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class InterpretedProjection(expressions: Seq[Expression], mutableRow: Boolean = false)

class InterpretedProjection(expressions: Seq[Expression], mutableRow: Boolean = false)
extends Projection {
def this(expressions: Seq[Expression],
inputSchema: Seq[Attribute], mutableRow: Boolean = false) =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you explain this mutableRow change? If we do need to keep it, the style here isn't quite right; see https://cwiki.apache.org/confluence/display/SPARK/Spark+Code+Style+Guide for how to properly wrap long parameter lists.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see that your comment mentioned that this is avoiding a potential ClassCastException. Did you actually run into that exception somewhere?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry for late reply. It happened when I changed empty input in AggregateSuite test to something like ("Hello", 4, 2.0). In GeneratedAggreate, whenever the grouping expression is empty or unsafeEnabled is not applied for some reason spark tries to cast GenericRow to MutableRow.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sound like there should be another test.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

NAVER - http://www.naver.com/

[email protected] 님께 보내신 메일 <Re: [spark] [SPARK-8357] [SQL] Memory leakage on unsafe aggregation path with empty input (#6810)> 이 다음과 같은 이유로 전송 실패했습니다.


받는 사람이 회원님의 메일을 수신차단 하였습니다.


@navis navis force-pushed the SPARK-8357 branch 2 times, most recently from c4b1f32 to 19907a5 Compare June 26, 2015 08:59
@SparkQA
Copy link

SparkQA commented Jun 26, 2015

Test build #35849 has finished for PR 6810 at commit c4b1f32.

  • This patch fails MiMa tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class InterpretedProjection(expressions: Seq[Expression], mutableRow: Boolean = false)
    • case class Sha2(left: Expression, right: Expression)

@SparkQA
Copy link

SparkQA commented Jun 26, 2015

Test build #35850 has finished for PR 6810 at commit 19907a5.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class StreamingLinearAlgorithm(object):
    • class StreamingLogisticRegressionWithSGD(StreamingLinearAlgorithm):
    • class LinearDataGenerator(object):
    • class InterpretedProjection(expressions: Seq[Expression], mutableRow: Boolean = false)
    • case class CountFunction(expr: Expression, base: AggregateExpression) extends AggregateFunction
    • case class CountDistinctFunction(
    • case class ApproxCountDistinctPartitionFunction(
    • case class ApproxCountDistinctMergeFunction(
    • case class Sum(child: Expression) extends PartialAggregate with trees.UnaryNode[Expression]
    • case class CombineSum(child: Expression) extends AggregateExpression
    • case class SumDistinct(child: Expression)
    • case class CombineSetsAndSum(inputSet: Expression, base: Expression) extends AggregateExpression
    • case class CombineSetsAndSumFunction(
    • case class First(child: Expression) extends PartialAggregate with trees.UnaryNode[Expression]
    • case class Last(child: Expression) extends PartialAggregate with trees.UnaryNode[Expression]
    • case class Sha2(left: Expression, right: Expression)
    • case class BroadcastHint(child: LogicalPlan) extends UnaryNode
    • case class PrecisionInfo(precision: Int, scale: Int)
    • case class TakeOrderedAndProject(
    • s"Using output committer class $
    • logInfo(s"Using user defined output committer class $
    • s"Using output committer class $

val colC = df.col("c").expr
val aggrExpr = Alias(Count(Cast(colC, LongType)), "Count")()

for ((codegen, unsafe) <- Seq((false, false), (true, false), (true, true));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

AFAIK we only need to test the (true, true) case, since this bug only occurred when using the unsafe aggregation path?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just wanted to see all is well. I'll remove the condition.

@SparkQA
Copy link

SparkQA commented Jun 30, 2015

Test build #36072 has finished for PR 6810 at commit c5419b3.

  • This patch fails Spark unit tests.
  • This patch does not merge cleanly.
  • This patch adds the following public classes (experimental):
    • class InterpretedProjection(expressions: Seq[Expression], mutableRow: Boolean = false)

@JoshRosen
Copy link
Contributor

As I understand it, there are two separate bugs here:

  • A memory leak fix.
  • A bug related to mis-handling of empty inputs.

If this is the case, can we fix these separately? Combining them into the same PR / test code makes them harder to understand.

@navis
Copy link
Contributor Author

navis commented Jul 5, 2015

Thanks, @JoshRosen. Actually, it's two bugs which is

  • memory leak on empty input
  • CCE in some cases (codeGen=false && (groupbyException.isEmpty || unsafe = false))

I've booked second bug to SPARK-8826(#7225).

@JoshRosen
Copy link
Contributor

Now that you've moved the second bug to #7225, do you mind cleaning this PR up to reflect only the first bug?

@JoshRosen
Copy link
Contributor

I'm working on a patch to enable Unsafe mode by default and ran into this problem as well, so I've opened #7560 to bring this patch up to date and merge it. I'll still credit you with the primary patch authorship.

@asfgit asfgit closed this in 9ba7c64 Jul 21, 2015
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants