Skip to content

[SPARK-4233] [SQL] UDAF Interface Refactoring #5542

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 20 commits into from

Conversation

chenghao-intel
Copy link
Contributor

This PR will keep both old / new versions of UDAF, and switch them by

SET spark.sql.aggregate2=true/false;

The new interface is

trait AggregateFunction2 {
  self: Product =>

  // Specify the BoundReference for Aggregate Buffer
  def initialize(buffers: Seq[BoundReference]): Unit

  // Initialize (reinitialize) the aggregation buffer
  def reset(buf: MutableRow): Unit

  // Get the children value from the input row, and then
  // merge it with the given aggregate buffer,
  // `seen` is the set that the value showed up, that's will
  // be useful for distinct aggregate. And it probably be
  // null for non-distinct aggregate
  def update(input: Row, buf: MutableRow, seen: JSet[Any]): Unit

  // Merge 2 aggregation buffers, and write back to the later one
  def merge(value: Row, buf: MutableRow): Unit

  // Semantically we probably don't need this, however, we need it when
  // integrating with Hive UDAF(GenericUDAF)
  @deprecated
  def terminatePartial(buf: MutableRow): Unit = {}

  // Output the final result by feeding the aggregation buffer
  def terminate(buffer: Row): Any
}

@SparkQA
Copy link

SparkQA commented Apr 16, 2015

Test build #30431 has started for PR 5542 at commit e9017ed.

@SparkQA
Copy link

SparkQA commented Apr 16, 2015

Test build #30431 has finished for PR 5542 at commit e9017ed.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • trait AggregateFunction2
    • trait AggregateExpression2 extends Expression with AggregateFunction2
    • abstract class UnaryAggregateExpression extends UnaryExpression with AggregateExpression2
    • case class Min(
    • case class Average(child: Expression, distinct: Boolean = false)
    • case class Max(child: Expression)
    • case class Count(child: Expression)
    • case class CountDistinct(children: Seq[Expression])
    • case class Sum(child: Expression, distinct: Boolean = false)
    • case class First(child: Expression, distinct: Boolean = false)
    • case class Last(child: Expression, distinct: Boolean = false)
    • class AggregateExpressionSubsitution
    • class HashAggregation2(aggrSubsitution: AggregateExpressionSubsitution) extends Strategy
    • sealed class BufferSeens(var buffer: MutableRow, var seens: Array[JSet[Any]] = null)
    • sealed trait Aggregate
    • sealed trait PostShuffle extends Aggregate
    • case class AggregatePreShuffle(
    • case class AggregatePostShuffle(
    • case class DistinctAggregate(
  • This patch does not change any dependencies.

@AmplabJenkins
Copy link

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/30431/
Test FAILed.

@SparkQA
Copy link

SparkQA commented Apr 17, 2015

Test build #30451 has started for PR 5542 at commit e213e5e.

@SparkQA
Copy link

SparkQA commented Apr 17, 2015

Test build #30451 has finished for PR 5542 at commit e213e5e.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • trait AggregateFunction2
    • trait AggregateExpression2 extends Expression with AggregateFunction2
    • abstract class UnaryAggregateExpression extends UnaryExpression with AggregateExpression2
    • case class Min(child: Expression) extends UnaryAggregateExpression
    • case class Average(child: Expression, distinct: Boolean = false)
    • case class Max(child: Expression) extends UnaryAggregateExpression
    • case class Count(child: Expression)
    • case class CountDistinct(children: Seq[Expression])
    • case class Sum(child: Expression, distinct: Boolean = false)
    • case class First(child: Expression, distinct: Boolean = false)
    • case class Last(child: Expression, distinct: Boolean = false)
    • class AggregateExpressionSubsitution
    • class HashAggregation2(aggrSubsitution: AggregateExpressionSubsitution) extends Strategy
    • sealed class BufferSeens(var buffer: MutableRow, var seens: Array[JSet[Any]] = null)
    • sealed class BufferAndKey(leftLen: Int, rightLen: Int)
    • sealed trait Aggregate
    • sealed trait PostShuffle extends Aggregate
    • case class AggregatePreShuffle(
    • case class AggregatePostShuffle(
    • case class DistinctAggregate(
  • This patch does not change any dependencies.

@AmplabJenkins
Copy link

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/30451/
Test PASSed.

@SparkQA
Copy link

SparkQA commented Apr 21, 2015

Test build #30629 has started for PR 5542 at commit 4aa56c2.

@SparkQA
Copy link

SparkQA commented Apr 21, 2015

Test build #30629 has finished for PR 5542 at commit 4aa56c2.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • trait AggregateFunction2
    • trait AggregateExpression2 extends Expression with AggregateFunction2
    • abstract class UnaryAggregateExpression extends UnaryExpression with AggregateExpression2
    • case class Min(child: Expression) extends UnaryAggregateExpression
    • case class Average(child: Expression, distinct: Boolean = false)
    • case class Max(child: Expression) extends UnaryAggregateExpression
    • case class Count(child: Expression)
    • case class CountDistinct(children: Seq[Expression])
    • case class Sum(child: Expression, distinct: Boolean = false)
    • case class First(child: Expression, distinct: Boolean = false)
    • case class Last(child: Expression, distinct: Boolean = false)
    • class AggregateExpressionSubsitution
    • class HashAggregation2(aggrSubsitution: AggregateExpressionSubsitution) extends Strategy
    • sealed class BufferSeens(var buffer: MutableRow, var seens: Array[JSet[Any]] = null)
    • sealed class BufferAndKey(leftLen: Int, rightLen: Int)
    • sealed trait Aggregate
    • sealed trait PostShuffle extends Aggregate
    • case class AggregatePreShuffle(
    • case class AggregatePostShuffle(
    • case class DistinctAggregate(
  • This patch does not change any dependencies.

@AmplabJenkins
Copy link

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/30629/
Test FAILed.

@SparkQA
Copy link

SparkQA commented Apr 21, 2015

Test build #30637 has started for PR 5542 at commit b45f487.

@SparkQA
Copy link

SparkQA commented Apr 21, 2015

Test build #30637 has finished for PR 5542 at commit b45f487.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • trait AggregateFunction2
    • trait AggregateExpression2 extends Expression with AggregateFunction2
    • abstract class UnaryAggregateExpression extends UnaryExpression with AggregateExpression2
    • case class Min(child: Expression) extends UnaryAggregateExpression
    • case class Average(child: Expression, distinct: Boolean = false)
    • case class Max(child: Expression) extends UnaryAggregateExpression
    • case class Count(child: Expression)
    • case class CountDistinct(children: Seq[Expression])
    • case class Sum(child: Expression, distinct: Boolean = false)
    • case class First(child: Expression, distinct: Boolean = false)
    • case class Last(child: Expression, distinct: Boolean = false)
    • class AggregateExpressionSubsitution
    • class HashAggregation2(aggrSubsitution: AggregateExpressionSubsitution) extends Strategy
    • sealed class BufferSeens(var buffer: MutableRow, var seens: Array[JSet[Any]] = null)
    • sealed class BufferAndKey(leftLen: Int, rightLen: Int)
    • sealed trait Aggregate
    • sealed trait PostShuffle extends Aggregate
    • case class AggregatePreShuffle(
    • case class AggregatePostShuffle(
    • case class DistinctAggregate(
  • This patch does not change any dependencies.

@AmplabJenkins
Copy link

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/30637/
Test FAILed.

@SparkQA
Copy link

SparkQA commented Apr 21, 2015

Test build #30644 has started for PR 5542 at commit 9806266.

@SparkQA
Copy link

SparkQA commented Apr 21, 2015

Test build #30644 has finished for PR 5542 at commit 9806266.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • trait AggregateFunction2
    • trait AggregateExpression2 extends Expression with AggregateFunction2
    • abstract class UnaryAggregateExpression extends UnaryExpression with AggregateExpression2
    • case class Min(child: Expression) extends UnaryAggregateExpression
    • case class Average(child: Expression, distinct: Boolean = false)
    • case class Max(child: Expression) extends UnaryAggregateExpression
    • case class Count(child: Expression)
    • case class CountDistinct(children: Seq[Expression])
    • case class Sum(child: Expression, distinct: Boolean = false)
    • case class First(child: Expression, distinct: Boolean = false)
    • case class Last(child: Expression, distinct: Boolean = false)
    • class AggregateExpressionSubsitution
    • class HashAggregation2(aggrSubsitution: AggregateExpressionSubsitution) extends Strategy
    • sealed class BufferSeens(var buffer: MutableRow, var seens: Array[JSet[Any]] = null)
    • sealed class BufferAndKey(leftLen: Int, rightLen: Int)
    • sealed trait Aggregate
    • sealed trait PostShuffle extends Aggregate
    • case class AggregatePreShuffle(
    • case class AggregatePostShuffle(
    • case class DistinctAggregate(
  • This patch does not change any dependencies.

@AmplabJenkins
Copy link

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/30644/
Test FAILed.

@@ -562,3 +563,13 @@ class SQLQuerySuite extends QueryTest {
.queryExecution.analyzed
}
}

class SQLQuerySuite2 extends SQLQuerySuite with BeforeAndAfter {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you should name this something more explicit, maybe "SQLQueryNewUDAFSuite"

@SparkQA
Copy link

SparkQA commented Apr 24, 2015

Test build #30901 has started for PR 5542 at commit 71f1bd5.

@SparkQA
Copy link

SparkQA commented Apr 24, 2015

Test build #30901 has finished for PR 5542 at commit 71f1bd5.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.
  • This patch does not change any dependencies.

@AmplabJenkins
Copy link

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/30901/
Test FAILed.

@SparkQA
Copy link

SparkQA commented Apr 24, 2015

Test build #30921 has started for PR 5542 at commit 6b594f0.

@SparkQA
Copy link

SparkQA commented Apr 24, 2015

Test build #30921 has finished for PR 5542 at commit 6b594f0.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • trait AggregateFunction2
    • trait AggregateExpression2 extends Expression with AggregateFunction2
    • abstract class UnaryAggregateExpression extends UnaryExpression with AggregateExpression2
    • case class Min(child: Expression) extends UnaryAggregateExpression
    • case class Average(child: Expression, distinct: Boolean = false)
    • case class Max(child: Expression) extends UnaryAggregateExpression
    • case class Count(child: Expression)
    • case class CountDistinct(children: Seq[Expression])
    • case class Sum(child: Expression, distinct: Boolean = false)
    • case class First(child: Expression, distinct: Boolean = false)
    • case class Last(child: Expression, distinct: Boolean = false)
    • class AggregateExpressionSubsitution
    • class HashAggregation2(aggrSubsitution: AggregateExpressionSubsitution) extends Strategy
    • sealed class BufferSeens(var buffer: MutableRow, var seens: Array[JSet[Any]] = null)
    • sealed class BufferAndKey(leftLen: Int, rightLen: Int)
    • sealed trait Aggregate
    • sealed trait PostShuffle extends Aggregate
    • case class AggregatePreShuffle(
    • case class AggregatePostShuffle(
    • case class DistinctAggregate(
  • This patch does not change any dependencies.

@AmplabJenkins
Copy link

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/30921/
Test PASSed.

@SparkQA
Copy link

SparkQA commented Apr 27, 2015

Test build #31007 has started for PR 5542 at commit 6b594f0.

@chenghao-intel chenghao-intel changed the title [SPARK-4233] [SQL] [WIP] UDAF Interface Refactoring [SPARK-4233] [SQL] UDAF Interface Refactoring Apr 28, 2015
@tiffanyTown
Copy link

found an issue when running the query with SET spark.sql.aggregate2=true configuration after applying this patch.
ERROR message:
15/05/07 17:11:14 WARN TaskSetManager: Lost task 15.0 in stage 101.0 (TID 2056, qac8-node2): java.lang.ClassCastException: java.lang.Double cannot be cast to java.lang.Long
at scala.runtime.BoxesRunTime.unboxToLong(BoxesRunTime.java:110)
at scala.math.Numeric$LongIsIntegral$.toInt(Numeric.scala:117)
at org.apache.spark.sql.catalyst.expressions.Cast$$anonfun$castToInt$5.apply(Cast.scala:274)
at org.apache.spark.sql.catalyst.expressions.Cast$$anonfun$castToInt$5.apply(Cast.scala:274)
at org.apache.spark.sql.catalyst.expressions.Cast.eval(Cast.scala:435)
at org.apache.spark.sql.catalyst.expressions.Alias.eval(namedExpressions.scala:118)
at org.apache.spark.sql.catalyst.expressions.InterpretedMutableProjection.apply(Projection.scala:68)
at org.apache.spark.sql.catalyst.expressions.InterpretedMutableProjection.apply(Projection.scala:52)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at org.apache.spark.sql.hive.execution.InsertIntoHiveTable.org$apache$spark$sql$hive$execution$InsertIntoHiveTable$$writeToFile$1(InsertIntoHiveTable.scala:101)
at org.apache.spark.sql.hive.execution.InsertIntoHiveTable$$anonfun$saveAsHiveFile$3.apply(InsertIntoHiveTable.scala:83)
at org.apache.spark.sql.hive.execution.InsertIntoHiveTable$$anonfun$saveAsHiveFile$3.apply(InsertIntoHiveTable.scala:83)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
at org.apache.spark.scheduler.Task.run(Task.scala:64)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:209)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
query.sql file:
INSERT INTO TABLE ${hiveconf:TEMP_TABLE}
SELECT
cid,
100.0 * COUNT(distinct (CASE WHEN r_date IS NOT NULL THEN oid ELSE 0L END)) / COUNT(distinct oid) AS r_order_ratio,
SUM(CASE WHEN r_date IS NOT NULL THEN 1 ELSE 0 END) / COUNT(item) * 100 AS r_item_ratio,
CASE WHEN SUM(s_amount)=0.0 THEN 0.0 ELSE (SUM(CASE WHEN r_date IS NOT NULL THEN r_amount ELSE 0.0 END) / SUM(s_amount) * 100) END AS r_amount_ratio,
COUNT(distinct (CASE WHEN r_date IS NOT NULL THEN r_date ELSE 0L END)) AS r_freq
FROM (
SELECT
r.sr_returned_date_sk AS r_date,
s.ss_item_sk AS item,
s.ss_ticket_number AS oid,
s.ss_net_paid AS s_amount,
CASE WHEN r.sr_return_amt IS NULL THEN 0.0 ELSE r.sr_return_amt END AS r_amount,
(CASE WHEN s.ss_customer_sk IS NULL THEN r.sr_customer_sk ELSE s.ss_customer_sk END) AS cid
FROM store_sales s
LEFT OUTER JOIN store_returns r ON (
r.sr_item_sk = s.ss_item_sk
AND r.sr_ticket_number = s.ss_ticket_number
AND s.ss_sold_date_sk IS NOT NULL
)
) q20_sales_returns

WHERE cid IS NOT NULL
GROUP BY cid
;

@AmplabJenkins
Copy link

Merged build triggered.

@AmplabJenkins
Copy link

Merged build started.

@SparkQA
Copy link

SparkQA commented Jun 8, 2015

Test build #34435 has started for PR 5542 at commit 68dd625.

@SparkQA
Copy link

SparkQA commented Jun 8, 2015

Test build #34435 has finished for PR 5542 at commit 68dd625.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@AmplabJenkins
Copy link

Merged build finished. Test FAILed.

return mr;
}

@Override
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Indentation on this line looks off

@rxin
Copy link
Contributor

rxin commented Jul 7, 2015

Can we close this ticket first? I think @yhuai will revisit this with you soon.

@chenghao-intel
Copy link
Contributor Author

yes, thanks for the reminding. closing it.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

6 participants