[SPARK-6734] [SQL] Add UDTF.close support in Generate #5383
Test build #29778 has started for PR 5383 at commit
Test build #29778 has finished for PR 5383 at commit
Test PASSed.
child.execute().mapPartitions(iter => iter.flatMap(row => boundGenerator.eval(row)))
child.execute().mapPartitions(iter =>
  iter.flatMap(row => boundGenerator.eval(row)) ++
  LazyIterator(() => boundGenerator.terminate())
Looks like you are calling terminate on each partition. Is that the same as what Hive does? In Hive, it seems that close is called after all rows are processed.
Yes, I think it's OK for it to be called in each partition.
See
https://github.com/apache/hive/blob/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecReducer.java#L278
https://github.com/apache/hive/blob/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapper.java#L192
Operator.close() is called in MapReduceBase.close, which means it is supposed to run once within each Mapper/Reducer.
And
https://github.com/apache/hive/blob/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/UDTFOperator.java#L144
https://github.com/apache/hive/blob/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java#L616
show that genericUDTF.close() is called within Operator.close.
Sorry, please correct me if my understanding is wrong.
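To make the per-partition argument concrete, here is a minimal plain-Scala sketch with no Spark or Hive dependency. The names PerPartitionClose, RowGenerator and CountGenerator are hypothetical stand-ins invented for illustration; they are not the actual Spark or Hive APIs. The sketch assumes each partition gets its own generator instance, in the same way each Mapper/Reducer closes its own operator.

```scala
object PerPartitionClose {
  // Hypothetical stand-in for a bound generator (not Spark's Generator API).
  trait RowGenerator[A, B] {
    def eval(row: A): Iterator[B]
    def terminate(): Iterator[B]
  }

  // Emits nothing per row and forwards the row count only when terminated,
  // similar in spirit to a UDTF that forwards rows from close().
  final class CountGenerator extends RowGenerator[String, Int] {
    private var count = 0
    def eval(row: String): Iterator[Int] = { count += 1; Iterator.empty }
    def terminate(): Iterator[Int] = Iterator.single(count)
  }

  // One generator per partition; terminate() runs once, after the last row.
  def processPartition(rows: Iterator[String]): Iterator[Int] = {
    val gen = new CountGenerator
    rows.flatMap(row => gen.eval(row)) ++ gen.terminate()
  }

  // Simulates partitions as nested sequences.
  def run(partitions: Seq[Seq[String]]): Seq[Int] =
    partitions.flatMap(p => processPartition(p.iterator))
}
```

With two simulated partitions, one count is produced per partition rather than a single global count, which matches the once-per-Mapper/Reducer behaviour described above.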
How about the code below to simplify that?
child.execute().mapPartitions(iter =>
  iter.flatMap(row => boundGenerator.eval(row)))
  .mapPartitions(_ ++ boundGenerator.terminate())
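One reason a plain ++ may be enough here: to my knowledge, Scala's Iterator.++ takes its argument by name, so the right-hand side is not evaluated until the left-hand iterator is exhausted. The small sketch below checks the evaluation order in plain Scala; LazyConcat and the local terminate() are hypothetical names for illustration only, not code from this PR.

```scala
import scala.collection.mutable.ArrayBuffer

object LazyConcat {
  // Returns the order in which things happened, plus the emitted values.
  def demo(): (List[String], List[Int]) = {
    val log = ArrayBuffer.empty[String]

    // Stand-in for boundGenerator.terminate(); records when it is invoked.
    def terminate(): Iterator[Int] = { log += "terminate"; Iterator(99) }

    // Lazily mapped input rows; each row is logged when it is consumed.
    val rows = Iterator(1, 2).map { r => log += s"row $r"; r }

    // terminate() is passed by name, so it runs only after rows is drained.
    val out = (rows ++ terminate()).toList
    (log.toList, out)
  }
}
```

If the by-name assumption holds, the log shows both rows being consumed before terminate is invoked, so no separate lazy wrapper is needed for this ordering.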
Ok, I'll try and let you know the result.
yeah, it works correctly, thanks.
I found one issue; the current implementation of HiveGenericUdtf always calls
@maropu thanks for the comments, I've updated the code.
Test build #29954 has started for PR 5383 at commit
Test build #29954 has finished for PR 5383 at commit
Test PASSed.
}.mapPartitions { iter =>
  val nullOriginalRow = Row(Seq.fill(generator.output.size)(Literal(null)): _*)
  val joinedRow = new JoinedRow
  iter ++ boundGenerator.terminate().map(joinedRow(nullOriginalRow, _))
Can we merge this .mapPartitions into the above one?
Please also add a test case to test the case where
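The null padding in the diff above can be illustrated without Spark: rows produced by terminate() have no corresponding input row, so in join mode the input side has to be filled with nulls. The sketch below models rows as Seq[Any]; NullPaddedJoin, joinWithTerminate and inputWidth are hypothetical names, and the width used for padding is an assumption of this sketch rather than a statement about what the PR should use.

```scala
object NullPaddedJoin {
  type Row = Seq[Any]

  // Appends terminate() output to already-joined rows, padding the
  // missing input side with `inputWidth` nulls.
  def joinWithTerminate(
      joined: Iterator[Row],
      inputWidth: Int,
      terminated: => Iterator[Row]): Iterator[Row] = {
    val nullInput: Row = Seq.fill(inputWidth)(null)
    // `terminated` is by-name and ++ is lazy, so it is only evaluated
    // once `joined` has been fully consumed.
    joined ++ terminated.map(nullInput ++ _)
  }
}
```

For example, joining one already-joined row with a single terminate() row of one column yields a final row whose first inputWidth columns are null.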
c45faf0 to 63c88cc
Test build #30195 has started for PR 5383 at commit
Test build #30195 has finished for PR 5383 at commit
Test FAILed.
Test build #30202 has started for PR 5383 at commit
Test build #30202 has finished for PR 5383 at commit
Test PASSed.
Thank you @liancheng I've updated the code and it passed the unit test.
createQueryTest("Test UDTF.close in Lateral Views",
  """
    | SELECT key, cc
    | FROM src LATERAL VIEW udtf_count2(value) dd AS cc
Please remove spaces at the beginning of these two lines.
Test build #30620 has finished for PR 5383 at commit
Test PASSed.
@@ -21,6 +21,16 @@ import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.expressions._

// for lazy computing, be sure the generator.terminate() called in the very last
// TODO reusing the CompletionIterator?
Use ScalaDoc style for class comments.
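As an illustration of both points (a lazily evaluated wrapper and a ScalaDoc-style class comment), a deferred iterator might look roughly like the sketch below. DeferredIterator is a hypothetical name and this is not the class from the PR; it only shows the general technique of postponing iterator creation until the first call to hasNext or next().

```scala
/**
 * An iterator that defers building its underlying iterator until it is
 * first consulted, so that side effects of `make` (for example a
 * terminate-style call) happen as late as possible.
 *
 * @param make factory invoked at most once, on first use
 */
final class DeferredIterator[A](make: () => Iterator[A]) extends Iterator[A] {
  // Evaluated once, on the first hasNext or next() call.
  private lazy val underlying: Iterator[A] = make()

  def hasNext: Boolean = underlying.hasNext

  def next(): A = underlying.next()
}
```

Constructing the wrapper does not invoke the factory; only consuming it does.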
e1635b4 to 8953be3
Test build #30845 has started for PR 5383 at commit
Test build #30845 has finished for PR 5383 at commit
Test FAILed.
8953be3 to 1799ba5
Test build #30890 has started for PR 5383 at commit
Test build #30890 has finished for PR 5383 at commit
Test PASSed.
@liancheng @marmbrus Any more comments?
1799ba5 to 98b4e4b
Merged build triggered.
Merged build started.
Test build #32593 has started for PR 5383 at commit
Test build #32593 has finished for PR 5383 at commit
Merged build finished. Test PASSed.
Test PASSed.
Thanks for working on this! Merging to master and branch-1.4.
Some third-party UDTF extensions generate additional rows in the "GenericUDTF.close()" method, which is supported / documented by Hive.
https://cwiki.apache.org/confluence/display/Hive/DeveloperGuide+UDTF
However, Spark SQL ignores the "GenericUDTF.close()", and it causes bug while porting job from Hive to Spark SQL.
Author: Cheng Hao <[email protected]>
Closes #5383 from chenghao-intel/udtf_close and squashes the following commits:
98b4e4b [Cheng Hao] Support UDTF.close
(cherry picked from commit 0da254f)
Signed-off-by: Cheng Lian <[email protected]>
Some third-party UDTF extensions generate additional rows in the "GenericUDTF.close()" method, which is supported and documented by Hive:
https://cwiki.apache.org/confluence/display/Hive/DeveloperGuide+UDTF
However, Spark SQL ignores "GenericUDTF.close()", which causes bugs when porting jobs from Hive to Spark SQL.