Skip to content

Commit abaedef

Browse files
committed
[FLINK-37921][table] Refactor current key ordered async lookup join.
1 parent 8e7273d commit abaedef

File tree

9 files changed

+1183
-19
lines changed

9 files changed

+1183
-19
lines changed

flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncRunnableStreamOperator.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -46,12 +46,12 @@ public abstract class AbstractAsyncRunnableStreamOperator<OUT>
4646
extends AbstractAsyncKeyOrderedStreamOperator<OUT>
4747
implements AsyncKeyOrderedProcessingOperator {
4848

49-
final KeySelector<?, ?> keySelector1;
50-
final KeySelector<?, ?> keySelector2;
51-
final ExecutorService asyncThreadPool;
52-
final int asyncBufferSize;
53-
final long asyncBufferTimeout;
54-
final int inFlightRecordsLimit;
49+
protected final KeySelector<?, ?> keySelector1;
50+
protected final KeySelector<?, ?> keySelector2;
51+
protected final ExecutorService asyncThreadPool;
52+
protected final int asyncBufferSize;
53+
protected final long asyncBufferTimeout;
54+
protected final int inFlightRecordsLimit;
5555

5656
public AbstractAsyncRunnableStreamOperator(
5757
KeySelector<?, ?> keySelector1,

flink-runtime/src/test/java/org/apache/flink/streaming/util/asyncprocessing/AsyncOneInputStreamOperatorTestHarness.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -269,7 +269,7 @@ private void finishFuture(CompletableFuture<Void> future) throws Exception {
269269
}
270270
}
271271

272-
private void checkEnvState() {
272+
protected void checkEnvState() {
273273
if (getEnvironment().getActualExternalFailureCause().isPresent()) {
274274
fail(
275275
"There is an error on other threads",
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.flink.streaming.util.asyncprocessing;
20+
21+
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
22+
import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
23+
import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
24+
import org.apache.flink.streaming.runtime.operators.asyncprocessing.AsyncKeyOrderedProcessingOperator;
25+
26+
import java.util.concurrent.CompletableFuture;
27+
import java.util.concurrent.ExecutorService;
28+
import java.util.concurrent.Executors;
29+
30+
/**
31+
* A test harness for testing a {@link AsyncKeyOrderedProcessingOperator} .
32+
*
33+
* <p>This harness has no difference with {@link AsyncOneInputStreamOperatorTestHarness} but check
34+
* environment state.
35+
*/
36+
public class AsyncOneInputStreamOperatorTestHarnessWithoutEnvCheck<IN, OUT>
37+
extends AsyncOneInputStreamOperatorTestHarness<IN, OUT> {
38+
39+
public static <IN, OUT> AsyncOneInputStreamOperatorTestHarnessWithoutEnvCheck<IN, OUT> create(
40+
OneInputStreamOperator<IN, OUT> operator) throws Exception {
41+
42+
ExecutorService executorService = Executors.newSingleThreadExecutor();
43+
CompletableFuture<AsyncOneInputStreamOperatorTestHarnessWithoutEnvCheck<IN, OUT>> future =
44+
new CompletableFuture<>();
45+
executorService.execute(
46+
() -> {
47+
try {
48+
future.complete(
49+
new AsyncOneInputStreamOperatorTestHarnessWithoutEnvCheck<>(
50+
executorService,
51+
SimpleOperatorFactory.of(operator),
52+
1,
53+
1,
54+
0));
55+
} catch (Exception e) {
56+
throw new RuntimeException(e);
57+
}
58+
});
59+
return future.get();
60+
}
61+
62+
protected AsyncOneInputStreamOperatorTestHarnessWithoutEnvCheck(
63+
ExecutorService executor,
64+
StreamOperatorFactory<OUT> operatorFactory,
65+
int maxParallelism,
66+
int parallelism,
67+
int subtaskIndex)
68+
throws Exception {
69+
super(executor, operatorFactory, maxParallelism, parallelism, subtaskIndex);
70+
}
71+
72+
@Override
73+
protected void checkEnvState() {
74+
// do not check
75+
}
76+
}

flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java

Lines changed: 14 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -214,19 +214,23 @@ public LazyAsyncFunction() {
214214
@Override
215215
public void asyncInvoke(final Integer input, final ResultFuture<Integer> resultFuture)
216216
throws Exception {
217-
executorService.submit(
218-
() -> {
219-
try {
220-
waitLatch();
221-
} catch (InterruptedException e) {
222-
// do nothing
223-
}
217+
executorService.submit(getRunnable(input, resultFuture));
218+
}
224219

225-
resultFuture.complete(Collections.singletonList(input));
226-
});
220+
protected Runnable getRunnable(
221+
final Integer input, final ResultFuture<Integer> resultFuture) {
222+
return () -> {
223+
try {
224+
waitLatch();
225+
} catch (InterruptedException e) {
226+
// do nothing
227+
}
228+
229+
resultFuture.complete(Collections.singletonList(input));
230+
};
227231
}
228232

229-
protected void waitLatch() throws InterruptedException {
233+
public void waitLatch() throws InterruptedException {
230234
latch.await();
231235
}
232236

flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecLookupJoin.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@
6969
import org.apache.flink.table.runtime.generated.GeneratedFunction;
7070
import org.apache.flink.table.runtime.generated.GeneratedResultFuture;
7171
import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
72-
import org.apache.flink.table.runtime.operators.TableKeyedAsyncWaitOperatorFactory;
72+
import org.apache.flink.table.runtime.operators.AsyncKeyOrderedLookupOperatorFactory;
7373
import org.apache.flink.table.runtime.operators.join.FlinkJoinType;
7474
import org.apache.flink.table.runtime.operators.join.lookup.AsyncLookupJoinRunner;
7575
import org.apache.flink.table.runtime.operators.join.lookup.AsyncLookupJoinWithCalcRunner;
@@ -527,7 +527,7 @@ protected StreamOperatorFactory<RowData> createAsyncLookupJoin(
527527
if (asyncLookupOptions.keyOrdered) {
528528
Preconditions.checkState(
529529
AsyncDataStream.OutputMode.ORDERED.equals(asyncLookupOptions.asyncOutputMode));
530-
return new TableKeyedAsyncWaitOperatorFactory<>(
530+
return new AsyncKeyOrderedLookupOperatorFactory<>(
531531
asyncFunc,
532532
keySelector,
533533
asyncLookupOptions.asyncTimeout,

0 commit comments

Comments
 (0)