Skip to content

[FLINK-37921][table] Replace TableKeyedAsyncWaitOperator with new operator under new async framework #26698

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 5 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -406,6 +406,18 @@ private void seizeCapacity(boolean allowOverdraft) {
inFlightRecordNum.incrementAndGet();
}

/**
* Make a completable future that is wired with the current context and mailbox executor, which
* can be used to complete the future later, and trigger the mailbox executor to run the
* callbacks.
*
* @return a completable future that is wired with the current context and mailbox executor.
* @param <T> the type of the result of the future.
*/
public <T> InternalAsyncFuture<T> makeCompletableFuture() {
return asyncFutureFactory.create(currentContext);
}

/**
* A helper to request a sync point and run a callback if it finishes (once the record is not
* blocked).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,12 @@ public abstract class AbstractAsyncRunnableStreamOperator<OUT>
extends AbstractAsyncKeyOrderedStreamOperator<OUT>
implements AsyncKeyOrderedProcessingOperator {

final KeySelector<?, ?> keySelector1;
final KeySelector<?, ?> keySelector2;
final ExecutorService asyncThreadPool;
final int asyncBufferSize;
final long asyncBufferTimeout;
final int inFlightRecordsLimit;
protected final KeySelector<?, ?> keySelector1;
protected final KeySelector<?, ?> keySelector2;
protected final ExecutorService asyncThreadPool;
protected final int asyncBufferSize;
protected final long asyncBufferTimeout;
protected final int inFlightRecordsLimit;

public AbstractAsyncRunnableStreamOperator(
KeySelector<?, ?> keySelector1,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,7 @@ private void finishFuture(CompletableFuture<Void> future) throws Exception {
}
}

private void checkEnvState() {
protected void checkEnvState() {
if (getEnvironment().getActualExternalFailureCause().isPresent()) {
fail(
"There is an error on other threads",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.streaming.util.asyncprocessing;

import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
import org.apache.flink.streaming.runtime.operators.asyncprocessing.AsyncKeyOrderedProcessingOperator;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
* A test harness for testing a {@link AsyncKeyOrderedProcessingOperator} .
*
* <p>This harness has no difference with {@link AsyncOneInputStreamOperatorTestHarness} but check
* environment state.
*/
public class AsyncOneInputStreamOperatorTestHarnessWithoutEnvCheck<IN, OUT>
extends AsyncOneInputStreamOperatorTestHarness<IN, OUT> {

public static <IN, OUT> AsyncOneInputStreamOperatorTestHarnessWithoutEnvCheck<IN, OUT> create(
OneInputStreamOperator<IN, OUT> operator) throws Exception {

ExecutorService executorService = Executors.newSingleThreadExecutor();
CompletableFuture<AsyncOneInputStreamOperatorTestHarnessWithoutEnvCheck<IN, OUT>> future =
new CompletableFuture<>();
executorService.execute(
() -> {
try {
future.complete(
new AsyncOneInputStreamOperatorTestHarnessWithoutEnvCheck<>(
executorService,
SimpleOperatorFactory.of(operator),
1,
1,
0));
} catch (Exception e) {
throw new RuntimeException(e);
}
});
return future.get();
}

protected AsyncOneInputStreamOperatorTestHarnessWithoutEnvCheck(
ExecutorService executor,
StreamOperatorFactory<OUT> operatorFactory,
int maxParallelism,
int parallelism,
int subtaskIndex)
throws Exception {
super(executor, operatorFactory, maxParallelism, parallelism, subtaskIndex);
}

@Override
protected void checkEnvState() {
// do not check
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -214,19 +214,23 @@ public LazyAsyncFunction() {
@Override
public void asyncInvoke(final Integer input, final ResultFuture<Integer> resultFuture)
throws Exception {
executorService.submit(
() -> {
try {
waitLatch();
} catch (InterruptedException e) {
// do nothing
}
executorService.submit(getRunnable(input, resultFuture));
}

resultFuture.complete(Collections.singletonList(input));
});
protected Runnable getRunnable(
final Integer input, final ResultFuture<Integer> resultFuture) {
return () -> {
try {
waitLatch();
} catch (InterruptedException e) {
// do nothing
}

resultFuture.complete(Collections.singletonList(input));
};
}

protected void waitLatch() throws InterruptedException {
public void waitLatch() throws InterruptedException {
latch.await();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@
import org.apache.flink.table.runtime.generated.GeneratedFunction;
import org.apache.flink.table.runtime.generated.GeneratedResultFuture;
import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
import org.apache.flink.table.runtime.operators.TableKeyedAsyncWaitOperatorFactory;
import org.apache.flink.table.runtime.operators.AsyncKeyOrderedLookupOperatorFactory;
import org.apache.flink.table.runtime.operators.join.FlinkJoinType;
import org.apache.flink.table.runtime.operators.join.lookup.AsyncLookupJoinRunner;
import org.apache.flink.table.runtime.operators.join.lookup.AsyncLookupJoinWithCalcRunner;
Expand Down Expand Up @@ -527,7 +527,7 @@ protected StreamOperatorFactory<RowData> createAsyncLookupJoin(
if (asyncLookupOptions.keyOrdered) {
Preconditions.checkState(
AsyncDataStream.OutputMode.ORDERED.equals(asyncLookupOptions.asyncOutputMode));
return new TableKeyedAsyncWaitOperatorFactory<>(
return new AsyncKeyOrderedLookupOperatorFactory<>(
asyncFunc,
keySelector,
asyncLookupOptions.asyncTimeout,
Expand Down
Loading