WIP - [SPARK-10816][SS] Support session window natively #22482
Test build #96321 has finished for PR 22482 at commit
|
The patch is fairly large, so I'm not sure whether it would be better to squash the commits into one before review. Two TODOs remain, which is why the patch is marked WIP, but it is close to complete:
It borrows the state implementation from streaming join, since that fits the concept of state a session window needs, but it may not be the optimal one, so I'm going to see whether we can have a better implementation.
I haven't added javadoc yet, to speed up the POC and actual development, but to complete the patch I need to write javadoc for the new classes and (maybe) methods. |
Test build #96323 has finished for PR 22482 at commit
|
Test build #96324 has finished for PR 22482 at commit
|
7d8371c to ad0b746
Test build #96328 has finished for PR 22482 at commit
|
retest this, please |
Test build #96336 has finished for PR 22482 at commit
|
retest this, please |
Please review the general approach and direction first. I'm planning to spend time rewriting the streaming part to integrate the logic tightly with state, so that state updates are minimized. |
Test build #96348 has finished for PR 22482 at commit
|
+1 for the idea of providing native session window support. On the approach, it would be ideal if all windowing aggregations could be handled by a single plan and state store (vs. the separate plan and state store this patch proposes for session windows). The underlying steps are more or less the same for fixed, session, and sliding windows, and the sort/merge operations would have to be part of a window merge function rather than the plan itself: K,Values -> AssignWindows (produces [k, v, timestamp, window]) -> GroupByKey (shuffle) -> MergeWindows (optional step) -> GroupWindows -> aggregate values. Depending on how we want to approach it, this could be handled now or as a follow-up item (with major refactoring). |
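The pipeline described in this comment can be sketched in plain Python. This is only an illustration of the proposed step order, not Spark code: the function names (`assign_fixed`, `assign_session`, `merge_sessions`, `windowed_aggregate`) are hypothetical, windows are assumed to be half-open `[start, end)` intervals, and a dictionary stands in for the shuffle.

```python
from collections import defaultdict


def assign_fixed(ts, size):
    """AssignWindows for a fixed window: one [start, end) bucket per timestamp."""
    start = ts - ts % size
    return [(start, start + size)]


def assign_session(ts, gap):
    """AssignWindows for a session window: each event opens [ts, ts + gap)."""
    return [(ts, ts + gap)]


def merge_sessions(windowed):
    """MergeWindows: rewrite each window to the merged span that contains it.

    `windowed` is a list of ((start, end), value) pairs for a single key.
    Two windows are fused when the later one starts before the earlier ends
    (an assumption of this sketch).
    """
    spans = []
    for start, end in sorted(w for w, _ in windowed):
        if spans and start < spans[-1][1]:
            spans[-1] = (spans[-1][0], max(spans[-1][1], end))
        else:
            spans.append((start, end))
    return [
        (next(s for s in spans if s[0] <= w[0] and w[1] <= s[1]), v)
        for w, v in windowed
    ]


def windowed_aggregate(events, assign, merge=None, agg=sum):
    """events: iterable of (key, value, timestamp) tuples."""
    # AssignWindows + GroupByKey (the dict stands in for the shuffle)
    by_key = defaultdict(list)
    for k, v, ts in events:
        for w in assign(ts):
            by_key[k].append((w, v))
    result = {}
    for k, windowed in by_key.items():
        # MergeWindows is optional: fixed windows skip it
        if merge is not None:
            windowed = merge(windowed)
        # GroupWindows, then aggregate the values of each window
        groups = defaultdict(list)
        for w, v in windowed:
            groups[w].append(v)
        for w, values in groups.items():
            result[(k, w)] = agg(values)
    return result


events = [("a", 1, 0), ("a", 2, 3), ("a", 4, 20), ("b", 8, 1)]
fixed = windowed_aggregate(events, lambda ts: assign_fixed(ts, 10))
sessions = windowed_aggregate(
    events, lambda ts: assign_session(ts, 5), merge=merge_sessions
)
```

Only the assigner and the optional merge function differ between the two calls, which is the point the comment makes about sharing one plan across window types.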
@arunmahadevan Here in Spark we expect smaller batch intervals, and Spark meets that requirement by storing the "delta" of each state change. This raises a concern about how we store and how we remove state. Say a group has 3 rows in the batch result and 3 rows in state, and we want to replace the state with the new batch result. With a full snapshot, removing 3 rows first and then putting 3 rows may not matter much, but with the delta approach we should compare them side by side and make fewer changes to state. (This is also why we avoid having List[V] as the state value and use two different states instead. If you change any element of the list, Spark's state store treats it as a change to the whole List[V] and writes all elements to the delta.)
The difference is not trivial for session windows, because arbitrary changes are required: for example, two different sessions in state can be merged later when late events arrive between them, and then we ideally should overwrite one and remove the others. New sessions can be created alongside existing ones, and we want to overwrite a session if the new output session originated from old state, and append it if not. For other windows it is just a "put", because there is no group and it is always safe to put (overwrite if present, and without eviction there is no need to remove). These different requirements for time windows and session windows are hard to combine into one.
That is how I came to realize the difficulty of the state part for session windows, and why I feel I need to change the streaming part. (I'm not sure whether I worry too much about optimizing the state change, but minimizing the delta was the core concept of #21733 and it really affects performance.) For the batch part the current patch is doing OK. Btw, we can assume |
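The "compare side by side and make fewer changes" idea can be illustrated with a small diff over the sessions of one key. This is a hedged sketch, not the patch's implementation: `plan_state_delta` is a hypothetical helper, sessions are modeled as `(start, end, aggregate)` tuples, and matching old and new sessions by start time is an assumption made here for simplicity.

```python
def plan_state_delta(old, new):
    """Return the ("put", session) / ("remove", session) operations that
    turn the stored sessions `old` into `new` for a single key.

    Sessions are (start, end, aggregate) tuples, matched by start time.
    Sessions that did not change produce no operation at all.
    """
    old_by_start = {s[0]: s for s in old}
    new_by_start = {s[0]: s for s in new}
    ops = []
    for start, session in sorted(new_by_start.items()):
        if old_by_start.get(start) != session:
            # overwrite if it originated from old state, append otherwise
            ops.append(("put", session))
    for start in sorted(old_by_start.keys() - new_by_start.keys()):
        # sessions swallowed by a merge are removed
        ops.append(("remove", old_by_start[start]))
    return ops


# Three sessions in state; a late event bridges the first two.
old = [(0, 10, 3), (12, 20, 4), (40, 45, 1)]
new = [(0, 20, 9), (40, 45, 1)]
ops = plan_state_delta(old, new)
```

Here `ops` holds one put and one remove, while replacing the group wholesale would take three removes and two puts.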
If we are fine with ignoring the optimal state delta, or OK with addressing it in a follow-up issue (it should be addressed in the same release version to avoid having state V1, V2, etc.), I think the only remaining TODOs are writing javadoc and deduplicating some code. |
Following the discussion on SPARK-10816, I'm holding off on further improvements and plan to continue the discussion in the JIRA issue. Anyone interested in this patch can still review it or try it out and share feedback. |
ad0b746 to 9a60cf3
Test build #96838 has finished for PR 22482 at commit
|
Test build #97107 has finished for PR 22482 at commit
|
Test build #97135 has finished for PR 22482 at commit
|
* This will also be used by session window state
… stateful operators (WIP...)
We could enable it, but there are many aggregation paths on the batch side:
* AggUtils.planAggregateWithoutDistinct
* AggUtils.planAggregateWithOneDistinct
* RewriteDistinctAggregates
* AggregateInPandasExec
So unless we are sure which ones to support, just block them for now...
… node * we will leverage such node for batch case if we want
… numbers of randomized operations
Test build #98289 has finished for PR 22482 at commit
|
…w for group key Also modify CodeGenerator to print out debug information when code generation takes too long
Test build #98347 has finished for PR 22482 at commit
|
5de4075 to d1536b4
Test build #98348 has finished for PR 22482 at commit
|
d1536b4 to ee67bca
retest this, please |
Test build #98382 has finished for PR 22482 at commit
|
Test build #98380 has finished for PR 22482 at commit
|
Test build #98379 has finished for PR 22482 at commit
|
5a76383 to b6ccecd
Test build #98383 has finished for PR 22482 at commit
|
… review is in progress
Test build #98385 has finished for PR 22482 at commit
|
retest this, please |
Test build #98387 has finished for PR 22482 at commit
|
Can one of the admins verify this patch? |
We're closing this PR because it hasn't been updated in a while. If you'd like to revive this PR, please reopen it! |
What changes were proposed in this pull request?
This patch proposes native support for session windows, similar to the time window support Spark already provides.
Please refer to the doc attached to SPARK-10816 for more details on the rationale, concepts, limitations, etc.
From the end user's point of view, the only change is the addition of a "session" SQL function. End users can define a session window query by replacing the "window" function with the "session" function and the "window" column with the "session" column. Beyond that, the patch provides the same experience as time windows.
Internally, this patch changes the physical plan of aggregation slightly: if the session function is used in a query, the plan sorts the input rows by "grouping keys" + "session" and merges overlapping sessions into one while applying the aggregations. It is like a sort-based aggregation, except that the unit of grouping is grouping keys + session.
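A minimal sketch of that sort-then-merge step, in plain Python rather than Spark's physical operators. The row shape `(key, start, end, value)`, the helper name `merge_sorted_sessions`, the use of a sum as the aggregation, and the half-open overlap test are all assumptions made for illustration.

```python
def merge_sorted_sessions(rows):
    """Single pass over rows sorted by (key, session start).

    Each row is (key, start, end, value). Consecutive rows with the same
    key whose sessions overlap are folded into one session, and their
    values are summed as the aggregation.
    """
    current = None
    for key, start, end, value in rows:
        if current is not None and current[0] == key and start < current[2]:
            # overlapping session: extend the end and update the aggregate
            current = (key, current[1], max(current[2], end), current[3] + value)
        else:
            # new key or a gap: emit the finished session and start a new one
            if current is not None:
                yield current
            current = (key, start, end, value)
    if current is not None:
        yield current


rows = sorted([("a", 3, 8, 2), ("a", 0, 5, 1), ("b", 1, 6, 8), ("a", 20, 25, 4)])
merged = list(merge_sorted_sessions(rows))
```

Because the input is sorted, only one in-progress session per key has to be held at a time, which is what makes this resemble a sort-based aggregation.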
To handle late events, multiple session windows that have not yet been evicted can co-exist per key. This patch handles that case by borrowing the state implementation from streaming join, which can hold multiple values for a given key.
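One way to picture state that holds multiple values per key without storing a whole list as a single value is a pair of maps: one from key to a count, one from (key, index) to a value. The class below is a hypothetical in-memory sketch of that shape, with plain dictionaries standing in for state stores; it is not the code borrowed by the patch.

```python
class MultiValueState:
    """Multiple values per key, kept as separate entries so that changing
    one value touches one entry instead of rewriting a whole list."""

    def __init__(self):
        self.key_to_count = {}        # key -> number of values
        self.key_index_to_value = {}  # (key, index) -> value

    def get(self, key):
        n = self.key_to_count.get(key, 0)
        return [self.key_index_to_value[(key, i)] for i in range(n)]

    def append(self, key, value):
        n = self.key_to_count.get(key, 0)
        self.key_index_to_value[(key, n)] = value
        self.key_to_count[key] = n + 1

    def update(self, key, index, value):
        if not 0 <= index < self.key_to_count.get(key, 0):
            raise IndexError(index)
        self.key_index_to_value[(key, index)] = value

    def remove(self, key, index):
        n = self.key_to_count.get(key, 0)
        if not 0 <= index < n:
            raise IndexError(index)
        last = n - 1
        if index != last:
            # move the last value into the freed slot to keep indexes dense
            self.key_index_to_value[(key, index)] = self.key_index_to_value[(key, last)]
        del self.key_index_to_value[(key, last)]
        if last == 0:
            del self.key_to_count[key]
        else:
            self.key_to_count[key] = last


state = MultiValueState()
for session in [(0, 10, 3), (12, 20, 4), (40, 45, 1)]:
    state.append("a", session)
state.update("a", 0, (0, 20, 9))  # overwrite the merged session
state.remove("a", 1)              # drop the session it absorbed
```

Note that removal reorders values within a key, so callers that need sessions in start order would have to sort after reading.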
How was this patch tested?
Many unit tests were added to verify session window queries for both batch and streaming.
Please review http://spark.apache.org/contributing.html before opening a pull request.