Skip to content

Commit f966d1d

Browse files
committed
fix: DefaultEventHandler should not fire events if the handler is being closed #578
1 parent 4d36922 commit f966d1d

File tree

3 files changed

+55
-10
lines changed

3 files changed

+55
-10
lines changed

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/DefaultEventHandler.java

Lines changed: 31 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ public void failedEvent(String uid, Event event) {}
5151
private final ExecutorService executor;
5252
private final String controllerName;
5353
private final ReentrantLock lock = new ReentrantLock();
54+
private volatile boolean running;
5455
private DefaultEventSourceManager<R> eventSourceManager;
5556

5657
public DefaultEventHandler(ConfiguredController<R> controller) {
@@ -67,6 +68,7 @@ public DefaultEventHandler(ConfiguredController<R> controller) {
6768

6869
private DefaultEventHandler(ExecutorService executor, String relatedControllerName,
6970
EventDispatcher<R> eventDispatcher, Retry retry) {
71+
this.running = true;
7072
this.executor =
7173
executor == null
7274
? new ScheduledThreadPoolExecutor(
@@ -75,27 +77,27 @@ private DefaultEventHandler(ExecutorService executor, String relatedControllerNa
7577
this.controllerName = relatedControllerName;
7678
this.eventDispatcher = eventDispatcher;
7779
this.retry = retry;
78-
eventBuffer = new EventBuffer();
79-
}
80-
81-
public void setEventSourceManager(DefaultEventSourceManager<R> eventSourceManager) {
82-
this.eventSourceManager = eventSourceManager;
80+
this.eventBuffer = new EventBuffer();
8381
}
8482

8583
public static void setEventMonitor(EventMonitor monitor) {
8684
DefaultEventHandler.monitor = monitor;
8785
}
8886

89-
public interface EventMonitor {
90-
void processedEvent(String uid, Event event);
91-
92-
void failedEvent(String uid, Event event);
87+
public void setEventSourceManager(DefaultEventSourceManager<R> eventSourceManager) {
88+
this.eventSourceManager = eventSourceManager;
9389
}
9490

9591
@Override
9692
public void handleEvent(Event event) {
93+
9794
try {
9895
lock.lock();
96+
97+
if (!this.running) {
98+
log.debug("Skipping event: {} because the event handler is shutting down", event);
99+
}
100+
99101
log.debug("Received event: {}", event);
100102

101103
final Predicate<CustomResource> selector = event.getCustomResourcesSelector();
@@ -109,6 +111,16 @@ public void handleEvent(Event event) {
109111
}
110112
}
111113

114+
@Override
115+
public void close() {
116+
try {
117+
lock.lock();
118+
this.running = false;
119+
} finally {
120+
lock.unlock();
121+
}
122+
}
123+
112124
private void executeBufferedEvents(String customResourceUid) {
113125
boolean newEventForResourceId = eventBuffer.containsEvents(customResourceUid);
114126
boolean controllerUnderExecution = isControllerUnderExecution(customResourceUid);
@@ -143,6 +155,10 @@ void eventProcessingFinished(
143155
ExecutionScope<R> executionScope, PostExecutionControl<R> postExecutionControl) {
144156
try {
145157
lock.lock();
158+
if (!running) {
159+
return;
160+
}
161+
146162
log.debug(
147163
"Event processing finished. Scope: {}, PostExecutionControl: {}",
148164
executionScope,
@@ -279,6 +295,12 @@ private void unsetUnderExecution(String customResourceUid) {
279295
underProcessing.remove(customResourceUid);
280296
}
281297

298+
public interface EventMonitor {
299+
void processedEvent(String uid, Event event);
300+
301+
void failedEvent(String uid, Event event);
302+
}
303+
282304
private class ControllerExecution implements Runnable {
283305
private final ExecutionScope<R> executionScope;
284306

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/DefaultEventSourceManager.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,13 @@ public DefaultEventSourceManager(ConfiguredController<R> controller) {
5454
public void close() {
5555
try {
5656
lock.lock();
57+
58+
try {
59+
defaultEventHandler.close();
60+
} catch (Exception e) {
61+
log.warn("Error closing event handler", e);
62+
}
63+
5764
for (var entry : eventSources.entrySet()) {
5865
try {
5966
log.debug("Closing {} -> {}", entry.getKey(), entry.getValue());

operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/DefaultEventHandlerTest.java

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,15 @@
2323
import static io.javaoperatorsdk.operator.TestUtils.testCustomResource;
2424
import static org.assertj.core.api.Assertions.assertThat;
2525
import static org.mockito.ArgumentMatchers.eq;
26-
import static org.mockito.Mockito.*;
26+
import static org.mockito.Mockito.any;
27+
import static org.mockito.Mockito.doAnswer;
28+
import static org.mockito.Mockito.doCallRealMethod;
29+
import static org.mockito.Mockito.mock;
30+
import static org.mockito.Mockito.never;
31+
import static org.mockito.Mockito.timeout;
32+
import static org.mockito.Mockito.times;
33+
import static org.mockito.Mockito.verify;
34+
import static org.mockito.Mockito.when;
2735

2836
class DefaultEventHandlerTest {
2937

@@ -224,6 +232,14 @@ public void scheduleTimedEventIfInstructedByPostExecutionControl() {
224232
.scheduleOnce(any(), eq(testDelay));
225233
}
226234

235+
@Test
236+
public void doNotFireEventsIfClosing() {
237+
defaultEventHandler.close();
238+
defaultEventHandler.handleEvent(prepareCREvent());
239+
240+
verify(eventDispatcherMock, timeout(50).times(1)).handleExecution(any());
241+
}
242+
227243
private void waitMinimalTime() {
228244
try {
229245
Thread.sleep(50);

0 commit comments

Comments
 (0)