Skip to content

Commit 8eb6171

Browse files
committed
feat: use only one, configurable ExecutorService
Fixes #540
1 parent b558a6b commit 8eb6171

File tree

6 files changed

+67
-69
lines changed

6 files changed

+67
-69
lines changed

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/Operator.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import java.util.Collections;
77
import java.util.LinkedList;
88
import java.util.List;
9+
import java.util.concurrent.TimeUnit;
910

1011
import org.slf4j.Logger;
1112
import org.slf4j.LoggerFactory;
@@ -102,6 +103,18 @@ public void close() {
102103
log.info(
103104
"Operator SDK {} is shutting down...", configurationService.getVersion().getSdkVersion());
104105

106+
try {
107+
log.debug("Closing executor");
108+
final var executor = configurationService.getExecutorService();
109+
executor.shutdown();
110+
if (!executor.awaitTermination(configurationService.getTerminationTimeoutSeconds(),
111+
TimeUnit.SECONDS)) {
112+
executor.shutdownNow(); // if we timed out, waiting, cancel everything
113+
}
114+
} catch (InterruptedException e) {
115+
log.debug("Exception closing executor: {}", e.getLocalizedMessage());
116+
}
117+
105118
controllers.close();
106119

107120
k8sClient.close();

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationService.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
import io.javaoperatorsdk.operator.api.ResourceController;
99

1010
import com.fasterxml.jackson.databind.ObjectMapper;
11+
import java.util.concurrent.ExecutorService;
1112

1213
/** An interface from which to retrieve configuration information. */
1314
public interface ConfigurationService {
@@ -102,4 +103,8 @@ default int getTerminationTimeoutSeconds() {
102103
default Metrics getMetrics() {
103104
return Metrics.NOOP;
104105
}
106+
107+
default ExecutorService getExecutorService() {
108+
return ExecutorServiceProducer.getExecutor(concurrentReconciliationThreads());
109+
}
105110
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
package io.javaoperatorsdk.operator.api.config;
2+
3+
import java.util.concurrent.ExecutorService;
4+
import java.util.concurrent.ScheduledThreadPoolExecutor;
5+
import java.util.concurrent.atomic.AtomicReference;
6+
7+
class ExecutorServiceProducer {
8+
9+
private final static AtomicReference<ScheduledThreadPoolExecutor> executor =
10+
new AtomicReference<>();
11+
12+
static ExecutorService getExecutor(int threadPoolSize) {
13+
final var gotSet =
14+
executor.compareAndSet(null, new ScheduledThreadPoolExecutor(threadPoolSize));
15+
final var result = executor.get();
16+
if (!gotSet) {
17+
// check that we didn't try to change the pool size
18+
if (result.getCorePoolSize() != threadPoolSize) {
19+
throw new IllegalArgumentException(
20+
"Cannot change the ExecutorService's thread pool size once set! Was "
21+
+ result.getCorePoolSize() + ", attempted to retrieve " + threadPoolSize);
22+
}
23+
}
24+
return result;
25+
}
26+
}

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/DefaultEventHandler.java

Lines changed: 23 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import java.util.Map;
66
import java.util.Optional;
77
import java.util.Set;
8+
import java.util.concurrent.ExecutorService;
89
import java.util.concurrent.ScheduledThreadPoolExecutor;
910
import java.util.concurrent.TimeUnit;
1011
import java.util.concurrent.locks.ReentrantLock;
@@ -44,61 +45,37 @@ public void failedEvent(String uid, Event event) {}
4445

4546
private final EventBuffer eventBuffer;
4647
private final Set<String> underProcessing = new HashSet<>();
47-
private final ScheduledThreadPoolExecutor executor;
4848
private final EventDispatcher<R> eventDispatcher;
4949
private final Retry retry;
5050
private final Map<String, RetryExecution> retryState = new HashMap<>();
51+
private final ExecutorService executor;
5152
private final String controllerName;
52-
private final int terminationTimeout;
5353
private final ReentrantLock lock = new ReentrantLock();
5454
private DefaultEventSourceManager<R> eventSourceManager;
5555

5656
public DefaultEventHandler(ConfiguredController<R> controller) {
57-
this(
58-
new EventDispatcher<>(controller),
57+
this(controller.getConfiguration().getConfigurationService().getExecutorService(),
5958
controller.getConfiguration().getName(),
60-
GenericRetry.fromConfiguration(controller.getConfiguration().getRetryConfiguration()),
61-
controller.getConfiguration().getConfigurationService().concurrentReconciliationThreads(),
62-
controller.getConfiguration().getConfigurationService().getTerminationTimeoutSeconds());
59+
new EventDispatcher<>(controller),
60+
GenericRetry.fromConfiguration(controller.getConfiguration().getRetryConfiguration()));
6361
}
6462

65-
DefaultEventHandler(EventDispatcher<R> dispatcher, String relatedControllerName, Retry retry) {
66-
this(
67-
dispatcher,
68-
relatedControllerName,
69-
retry,
70-
ConfigurationService.DEFAULT_RECONCILIATION_THREADS_NUMBER,
71-
ConfigurationService.DEFAULT_TERMINATION_TIMEOUT_SECONDS);
63+
DefaultEventHandler(EventDispatcher<R> eventDispatcher, String relatedControllerName,
64+
Retry retry) {
65+
this(null, relatedControllerName, eventDispatcher, retry);
7266
}
7367

74-
private DefaultEventHandler(
75-
EventDispatcher<R> eventDispatcher,
76-
String relatedControllerName,
77-
Retry retry,
78-
int concurrentReconciliationThreads,
79-
int terminationTimeout) {
68+
private DefaultEventHandler(ExecutorService executor, String relatedControllerName,
69+
EventDispatcher<R> eventDispatcher, Retry retry) {
70+
this.executor =
71+
executor == null
72+
? new ScheduledThreadPoolExecutor(
73+
ConfigurationService.DEFAULT_RECONCILIATION_THREADS_NUMBER)
74+
: executor;
75+
this.controllerName = relatedControllerName;
8076
this.eventDispatcher = eventDispatcher;
8177
this.retry = retry;
82-
this.controllerName = relatedControllerName;
8378
eventBuffer = new EventBuffer();
84-
this.terminationTimeout = terminationTimeout;
85-
executor =
86-
new ScheduledThreadPoolExecutor(
87-
concurrentReconciliationThreads,
88-
runnable -> new Thread(runnable, "EventHandler-" + relatedControllerName));
89-
}
90-
91-
@Override
92-
public void close() {
93-
try {
94-
log.debug("Closing handler for {}", controllerName);
95-
executor.shutdown();
96-
if (!executor.awaitTermination(terminationTimeout, TimeUnit.SECONDS)) {
97-
executor.shutdownNow(); // if we timed out, waiting, cancel everything
98-
}
99-
} catch (InterruptedException e) {
100-
log.debug("Exception closing handler for {}: {}", controllerName, e.getLocalizedMessage());
101-
}
10279
}
10380

10481
public void setEventSourceManager(DefaultEventSourceManager<R> eventSourceManager) {
@@ -146,7 +123,13 @@ private void executeBufferedEvents(String customResourceUid) {
146123
latestCustomResource.get(),
147124
retryInfo(customResourceUid));
148125
log.debug("Executing events for custom resource. Scope: {}", executionScope);
149-
executor.execute(new ExecutionConsumer(executionScope, eventDispatcher, this));
126+
executor.execute(() -> {
127+
// change thread name for easier debugging
128+
Thread.currentThread().setName("EventHandler-" + controllerName);
129+
PostExecutionControl<R> postExecutionControl =
130+
eventDispatcher.handleExecution(executionScope);
131+
eventProcessingFinished(executionScope, postExecutionControl);
132+
});
150133
} else {
151134
log.debug(
152135
"Skipping executing controller for resource id: {}. Events in queue: {}."

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/ExecutionConsumer.java

Lines changed: 0 additions & 25 deletions
This file was deleted.

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/DefaultEventSourceManager.java

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -51,10 +51,6 @@ public DefaultEventSourceManager(ConfiguredController<R> controller) {
5151
new CustomResourceEventSource<>(controller));
5252
}
5353

54-
public DefaultEventHandler<R> getEventHandler() {
55-
return defaultEventHandler;
56-
}
57-
5854
@Override
5955
public void close() {
6056
try {

0 commit comments

Comments
 (0)