Skip to content

Conversation

jonathanc-n
Copy link
Contributor

@jonathanc-n jonathanc-n commented May 18, 2025

Which issue does this PR close?

Part of #13138 .

Rationale for this change

Revamp implementation of the previous stale implementation for RightMark

What changes are included in this PR?

Added support for right mark join functionality to NestedLoop + Hash join. A follow up pr will be made for supporting right mark in Sortmergejoin and sym hash join.

Are these changes tested?

Yes unit tests

@github-actions github-actions bot added sql SQL Planner logical-expr Logical plan and expressions physical-expr Changes to the physical-expr crates optimizer Optimizer rules core Core DataFusion crate substrait Changes to the substrait crate common Related to common crate proto Related to proto crate labels May 18, 2025
@jonathanc-n
Copy link
Contributor Author

cc @comphead

@2010YOUY01
Copy link
Contributor

This is cool! I got some questions:

  1. Can we test this feature through the SQL interface (some SQL with subqueries got optimized into RightMarkJoin)? Or maybe this feature is not supported yet.
  2. Is RightMarkJoin equivalent to LeftMarkJoin with left and right child swapped? (just picturing how should mark joins be supported for memory-limited cases, it's fine for now since NLJ and HJ will always buffer all input from the build side)

@jonathanc-n
Copy link
Contributor Author

@2010YOUY01 I think i'lll try to get sql queries optimized into a right mark join after support for symmetric hash join + sort merge join.

Right mark is equivalent to left mark however internally I didn't swap during planning time. I had just marked a column on the probe batch as probe batches were incoming.

@jonathanc-n
Copy link
Contributor Author

@2010YOUY01 @Dandandan Is it possible to take a look? Thanks!

@jonathanc-n
Copy link
Contributor Author

jonathanc-n commented May 30, 2025

@alamb Are you able to add this pull request to here to get some eyes on it?

@ctsk
Copy link
Contributor

ctsk commented May 30, 2025

Really cool work! I'll look over it in more detail over the weekend =)

@ctsk
Copy link
Contributor

ctsk commented May 30, 2025

After updating JoinType::supports_swap to include LeftMark/RightMark join, the join_selection rule should already plan right joins where appropriate. Subsequently running the sqllogictests (cargo test --test sqllogictests) reveals some failures:

External error: 6 errors in file [...]/datafusion/datafusion/sqllogictest/test_files/subquery.slt

1. query failed: DataFusion error: join_selection
caused by
Internal error: Input field name t1_name does not match with the projection expression t1_id.
This was likely caused by a bug in DataFusion's code and we would welcome that you file an bug report in our issue tracker
[SQL] select t1.t1_id,
       t1.t1_name,
       t1.t1_int
from t1
where t1.t1_id > 40 or t1.t1_id in (select t2.t2_id from t2 where t1.t1_int > 0)
at [...]/datafusion/datafusion/sqllogictest/test_files/subquery.slt:1108


2. query failed: DataFusion error: join_selection
caused by
Internal error: Input field name t1_name does not match with the projection expression t1_id.
This was likely caused by a bug in DataFusion's code and we would welcome that you file an bug report in our issue tracker
[SQL] select t1.t1_id,
       t1.t1_name,
       t1.t1_int
from t1
where t1.t1_id = 11 or t1.t1_id + 12 not in (select t2.t2_id + 1 from t2 where t1.t1_int > 0)
at [...]/datafusion/datafusion/sqllogictest/test_files/subquery.slt:1136


3. query failed: DataFusion error: join_selection
caused by
Internal error: Input field name t1_name does not match with the projection expression t1_id.
This was likely caused by a bug in DataFusion's code and we would welcome that you file an bug report in our issue tracker
[SQL] select t1.t1_id,
       t1.t1_name,
       t1.t1_int
from t1
where t1.t1_id > 40 or exists (select * from t2 where t1.t1_id = t2.t2_id)
at [...]/datafusion/datafusion/sqllogictest/test_files/subquery.slt:1162


4. query result mismatch:
[SQL] explain select t1.t1_id,
       t1.t1_name,
       t1.t1_int
from t1
where t1.t1_id > 40 or not exists (select * from t2 where t1.t1_id = t2.t2_id)
[Diff] (-expected|+actual)
    logical_plan
    01)Projection: t1.t1_id, t1.t1_name, t1.t1_int
    02)--Filter: t1.t1_id > Int32(40) OR NOT __correlated_sq_1.mark
    03)----LeftMark Join: t1.t1_id = __correlated_sq_1.t2_id
    04)------TableScan: t1 projection=[t1_id, t1_name, t1_int]
    05)------SubqueryAlias: __correlated_sq_1
-   06)--------TableScan: t2 projection=[t2_id]
-   physical_plan
-   01)CoalesceBatchesExec: target_batch_size=2
-   02)--FilterExec: t1_id@0 > 40 OR NOT mark@3, projection=[t1_id@0, t1_name@1, t1_int@2]
-   03)----CoalesceBatchesExec: target_batch_size=2
-   04)------HashJoinExec: mode=CollectLeft, join_type=LeftMark, on=[(t1_id@0, t2_id@0)]
-   05)--------DataSourceExec: partitions=1, partition_sizes=[1]
-   06)--------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
-   07)----------DataSourceExec: partitions=1, partition_sizes=[1]
+   06)--------TableScan: t2 projection=[t2_id]
at [...]/datafusion/datafusion/sqllogictest/test_files/subquery.slt:1177


5. query failed: DataFusion error: join_selection
caused by
Internal error: Input field name t1_name does not match with the projection expression t1_id.
This was likely caused by a bug in DataFusion's code and we would welcome that you file an bug report in our issue tracker
[SQL] select t1.t1_id,
       t1.t1_name,
       t1.t1_int
from t1
where t1.t1_id > 40 or not exists (select * from t2 where t1.t1_id = t2.t2_id)
at [...]/datafusion/datafusion/sqllogictest/test_files/subquery.slt:1203


6. query failed: DataFusion error: join_selection
caused by
Internal error: Input field name t1_name does not match with the projection expression t1_id.
This was likely caused by a bug in DataFusion's code and we would welcome that you file an bug report in our issue tracker
[SQL] select t1.t1_id,
       t1.t1_name,
       t1.t1_int
from t1
where t1.t1_id > 40 or exists (select * from t2 cross join t3 where t1.t1_id = t2.t2_id)
at [...]/datafusion/datafusion/sqllogictest/test_files/subquery.slt:1244



External error: task 14575 panicked with message "index out of bounds: the len is 6 but the index is 6"
External error: task 17017 panicked with message "index out of bounds: the len is 6 but the index is 6"
Error: Execution("3 failures")
error: test failed, to rerun pass `-p datafusion-sqllogictest --test sqllogictests`

</details>

Copy link
Contributor

@comphead comphead left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @jonathanc-n I quickly went through the PR
It would be nice to have fuzz tests for RightMark join, I'm not sure 3 tests would cover all possible cases, WDYT?

@ctsk
Copy link
Contributor

ctsk commented Jun 1, 2025

Alrighty!

Regarding the lack of support for RightMark joins in some join operators, I believe it would be best to return an error in the constructor of those operators if they do not support the JoinType. I believe the PR currently would just return a wrong result.

I don't (yet) know how much work it would be to adjust physical optimizer rules so that no bad JoinType x OperatorType combination gets constructed.

Algorithmically, what I found hardest to understand is the switcheroo between build and probe side - You've modified adjust_indices_by_join_type for then RightMark join so that left_indices holds the indices for the probe batch and right_indices holds the mark. Then you also switch probe_batch, build_input_buffer and the build_sidein the call tobuild_batch_from_indices` so that it all works out..... It's kind of brilliant, but would be best to at least document that this is going on.

I think a less confusing alternative would be to just add a mark_side: JoinSide parameter to build_batch_from_indices that lets it distinguish between LeftMark and RightMark join for the ColumnSide::None case.

@jonathanc-n
Copy link
Contributor Author

Regarding the lack of support for RightMark joins in some join operators, I believe it would be best to return an error in the constructor of those operators if they do not support the JoinType. I believe the PR currently would just return a wrong result.

I think instead we should just not support swapping for now and support it after all the join operators are supported. Returning an error will cause some tests to fail.

@github-actions github-actions bot added the physical-plan Changes to the physical-plan crate label Jun 2, 2025
Comment on lines -707 to +708
| JoinType::LeftMark => {
| JoinType::LeftMark
| JoinType::RightMark => {
Copy link
Contributor

@ctsk ctsk Jun 11, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I admit that I don't understand why LeftMark/RightMark are different from their Semi/Anti counterparts. It seems like the documentation of this method is missing LeftMark / RightMark too

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added the doc! I'm not quite sure either, I think it has to do with the extra boolean column crossing the boundary into the other side, resulting in the mark column being missing from the requirements

Comment on lines 1139 to 1148
let mut bitmap = BooleanBufferBuilder::new(range.len());
bitmap.append_n(range.len(), false);
input_indices
.iter()
.flatten()
.map(|v| v.as_usize())
.filter(|v| range.contains(v))
.for_each(|v| {
bitmap.set_bit(v - range.start, true);
});
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I noticed that these lines are the same for get_mark_indices, get_semi_indices and get_anti_indices. We could factor them out to a new method?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

map.filter.foreach wondering can it be done in single iteration?

Copy link
Contributor Author

@jonathanc-n jonathanc-n Jun 12, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

.for_each(|v| {
    let idx = v.as_usize();
    if range.contains(&idx) {
        builder.set_bit(idx - range.start, true);
    }
});

Could do something like this?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I havI have refactored to use this

@@ -305,6 +305,31 @@ async fn test_left_mark_join_1k_filtered() {
.await
}

// todo: add JoinTestType::HjSmj after Right mark SortMergeJoin support
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is needed to add RightMark Join support to SortMergeJoin execution as well

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh I will close #16226 as it is a duplicate

.collect::<PrimitiveArray<L>>();
let probe_indices = (0..prune_length)
.map(|idx| {
// For mark join we output a dummy index 0 to indicate the row had a match
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

that would be nice to show as an example, it is challenging to read algorithms for upcoming contributors, but it can be done as follow up PR

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Tracked in #16415

Copy link
Contributor

@comphead comphead left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @jonathanc-n for bearing with this PR, I think it is good to go.

@comphead
Copy link
Contributor

Thanks for this contribution, I'm planning to have this PR open for a little bit of more time to see if there are any other feedbacks

@alamb alamb merged commit 7d16764 into apache:main Jun 16, 2025
28 checks passed
@alamb
Copy link
Contributor

alamb commented Jun 16, 2025

🚀 -- I am feeling physically nervous that there are so many PRs open so starting the merge train!

@jonathanc-n jonathanc-n deleted the right-mark branch June 16, 2025 23:36
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
common Related to common crate core Core DataFusion crate logical-expr Logical plan and expressions optimizer Optimizer rules physical-expr Changes to the physical-expr crates physical-plan Changes to the physical-plan crate proto Related to proto crate sql SQL Planner substrait Changes to the substrait crate
Projects
None yet
Development

Successfully merging this pull request may close these issues.

6 participants