diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/Operator.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/Operator.java index 5b23e3817a..c63dd6c972 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/Operator.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/Operator.java @@ -1,7 +1,5 @@ package io.javaoperatorsdk.operator; -import java.io.Closeable; -import java.io.IOException; import java.net.ConnectException; import java.util.ArrayList; import java.util.HashMap; @@ -15,6 +13,7 @@ import io.fabric8.kubernetes.client.DefaultKubernetesClient; import io.fabric8.kubernetes.client.KubernetesClient; import io.fabric8.kubernetes.client.Version; +import io.javaoperatorsdk.operator.api.LifecycleAware; import io.javaoperatorsdk.operator.api.ResourceController; import io.javaoperatorsdk.operator.api.config.ConfigurationService; import io.javaoperatorsdk.operator.api.config.ControllerConfiguration; @@ -22,7 +21,7 @@ import io.javaoperatorsdk.operator.processing.ConfiguredController; @SuppressWarnings("rawtypes") -public class Operator implements AutoCloseable { +public class Operator implements AutoCloseable, LifecycleAware { private static final Logger log = LoggerFactory.getLogger(Operator.class); private final KubernetesClient kubernetesClient; private final ConfigurationService configurationService; @@ -90,18 +89,23 @@ public void start() { controllers.start(); } - /** Stop the operator. */ @Override - public void close() { + public void stop() throws OperatorException { log.info( "Operator SDK {} is shutting down...", configurationService.getVersion().getSdkVersion()); - controllers.close(); + controllers.stop(); - ExecutorServiceManager.close(); + ExecutorServiceManager.stop(); kubernetesClient.close(); } + /** Stop the operator. */ + @Override + public void close() { + stop(); + } + /** * Add a registration requests for the specified controller with this operator. The effective * registration of the controller is delayed till the operator is started. @@ -159,7 +163,7 @@ public void register( } } - private static class ControllerManager implements Closeable { + private static class ControllerManager implements LifecycleAware { private final Map controllers = new HashMap<>(); private boolean started = false; @@ -178,19 +182,14 @@ public synchronized void start() { started = true; } - @Override - public synchronized void close() { + public synchronized void stop() { if (!started) { return; } this.controllers.values().parallelStream().forEach(closeable -> { - try { - log.debug("closing {}", closeable); - closeable.close(); - } catch (IOException e) { - log.warn("Error closing {}", closeable, e); - } + log.debug("closing {}", closeable); + closeable.stop(); }); started = false; diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/LifecycleAware.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/LifecycleAware.java new file mode 100644 index 0000000000..320368bcea --- /dev/null +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/LifecycleAware.java @@ -0,0 +1,9 @@ +package io.javaoperatorsdk.operator.api; + +import io.javaoperatorsdk.operator.OperatorException; + +public interface LifecycleAware { + void start() throws OperatorException; + + void stop() throws OperatorException; +} diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ExecutorServiceManager.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ExecutorServiceManager.java index 682b004c3d..62d44b66a2 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ExecutorServiceManager.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ExecutorServiceManager.java @@ -34,9 +34,9 @@ public static void init(ConfigurationService configuration) { } } - public static void close() { + public static void stop() { if (instance != null) { - instance.stop(); + instance.doStop(); } // make sure that we remove the singleton so that the thread pool is re-created on next call to // start @@ -55,7 +55,7 @@ public ExecutorService executorService() { return executor; } - private void stop() { + private void doStop() { try { log.debug("Closing executor"); executor.shutdown(); diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/ConfiguredController.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/ConfiguredController.java index 0f3bf2d9fd..7249dfe4aa 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/ConfiguredController.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/ConfiguredController.java @@ -1,7 +1,5 @@ package io.javaoperatorsdk.operator.processing; -import java.io.Closeable; -import java.io.IOException; import java.util.Objects; import io.fabric8.kubernetes.api.model.KubernetesResourceList; @@ -13,14 +11,19 @@ import io.javaoperatorsdk.operator.CustomResourceUtils; import io.javaoperatorsdk.operator.MissingCRDException; import io.javaoperatorsdk.operator.OperatorException; -import io.javaoperatorsdk.operator.api.*; +import io.javaoperatorsdk.operator.api.Context; +import io.javaoperatorsdk.operator.api.DeleteControl; +import io.javaoperatorsdk.operator.api.EventSourceInitializer; +import io.javaoperatorsdk.operator.api.LifecycleAware; +import io.javaoperatorsdk.operator.api.ResourceController; +import io.javaoperatorsdk.operator.api.UpdateControl; import io.javaoperatorsdk.operator.api.config.ControllerConfiguration; import io.javaoperatorsdk.operator.api.monitoring.Metrics.ControllerExecution; import io.javaoperatorsdk.operator.processing.event.DefaultEventSourceManager; import io.javaoperatorsdk.operator.processing.event.EventSourceManager; public class ConfiguredController> implements ResourceController, - Closeable, EventSourceInitializer { + LifecycleAware, EventSourceInitializer { private final ResourceController controller; private final ControllerConfiguration configuration; private final KubernetesClient kubernetesClient; @@ -214,10 +217,9 @@ public EventSourceManager getEventSourceManager() { return eventSourceManager; } - @Override - public void close() throws IOException { + public void stop() { if (eventSourceManager != null) { - eventSourceManager.close(); + eventSourceManager.stop(); } } } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/DefaultEventHandler.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/DefaultEventHandler.java index aac79938cd..212cd378d7 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/DefaultEventHandler.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/DefaultEventHandler.java @@ -1,6 +1,5 @@ package io.javaoperatorsdk.operator.processing; -import java.io.Closeable; import java.util.HashMap; import java.util.HashSet; import java.util.Map; @@ -14,11 +13,16 @@ import org.slf4j.LoggerFactory; import io.fabric8.kubernetes.client.CustomResource; +import io.javaoperatorsdk.operator.OperatorException; +import io.javaoperatorsdk.operator.api.LifecycleAware; import io.javaoperatorsdk.operator.api.RetryInfo; import io.javaoperatorsdk.operator.api.config.ConfigurationService; import io.javaoperatorsdk.operator.api.config.ExecutorServiceManager; import io.javaoperatorsdk.operator.api.monitoring.Metrics; -import io.javaoperatorsdk.operator.processing.event.*; +import io.javaoperatorsdk.operator.processing.event.CustomResourceID; +import io.javaoperatorsdk.operator.processing.event.DefaultEventSourceManager; +import io.javaoperatorsdk.operator.processing.event.Event; +import io.javaoperatorsdk.operator.processing.event.EventHandler; import io.javaoperatorsdk.operator.processing.event.internal.CustomResourceEvent; import io.javaoperatorsdk.operator.processing.event.internal.ResourceAction; import io.javaoperatorsdk.operator.processing.retry.GenericRetry; @@ -33,7 +37,7 @@ * UID, while buffering events which are received during an execution. */ public class DefaultEventHandler> - implements EventHandler, Closeable { + implements EventHandler, LifecycleAware { private static final Logger log = LoggerFactory.getLogger(DefaultEventHandler.class); @@ -315,7 +319,7 @@ private boolean isRetryConfigured() { } @Override - public void close() { + public void stop() { lock.lock(); try { this.running = false; @@ -324,6 +328,16 @@ public void close() { } } + @Override + public void start() throws OperatorException { + lock.lock(); + try { + this.running = true; + } finally { + lock.unlock(); + } + } + private class ControllerExecution implements Runnable { private final ExecutionScope executionScope; diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/DefaultEventSourceManager.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/DefaultEventSourceManager.java index 5d900ab5cc..dd8e7f19ce 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/DefaultEventSourceManager.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/DefaultEventSourceManager.java @@ -1,7 +1,9 @@ package io.javaoperatorsdk.operator.processing.event; -import java.io.Closeable; -import java.util.*; +import java.util.Collections; +import java.util.HashSet; +import java.util.Objects; +import java.util.Set; import java.util.concurrent.locks.ReentrantLock; import org.slf4j.Logger; @@ -10,13 +12,14 @@ import io.fabric8.kubernetes.client.CustomResource; import io.javaoperatorsdk.operator.MissingCRDException; import io.javaoperatorsdk.operator.OperatorException; +import io.javaoperatorsdk.operator.api.LifecycleAware; import io.javaoperatorsdk.operator.processing.ConfiguredController; import io.javaoperatorsdk.operator.processing.DefaultEventHandler; import io.javaoperatorsdk.operator.processing.event.internal.CustomResourceEventSource; import io.javaoperatorsdk.operator.processing.event.internal.TimerEventSource; public class DefaultEventSourceManager> - implements EventSourceManager, Closeable { + implements EventSourceManager, LifecycleAware { private static final Logger log = LoggerFactory.getLogger(DefaultEventSourceManager.class); @@ -45,18 +48,23 @@ private void init(DefaultEventHandler defaultEventHandler) { } @Override - public void close() { + public void start() throws OperatorException { + defaultEventHandler.start(); + } + + @Override + public void stop() { lock.lock(); try { try { - defaultEventHandler.close(); + defaultEventHandler.stop(); } catch (Exception e) { log.warn("Error closing event handler", e); } log.debug("Closing event sources."); for (var eventSource : eventSources) { try { - eventSource.close(); + eventSource.stop(); } catch (Exception e) { log.warn("Error closing {} -> {}", eventSource, e); } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSource.java index 22187ddb86..7646dcc353 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSource.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSource.java @@ -1,22 +1,8 @@ package io.javaoperatorsdk.operator.processing.event; -import java.io.Closeable; -import java.io.IOException; +import io.javaoperatorsdk.operator.api.LifecycleAware; -public interface EventSource extends Closeable { - - /** - * This method is invoked when this {@link EventSource} instance is properly registered to a - * {@link EventSourceManager}. - */ - default void start() {} - - /** - * This method is invoked when this {@link EventSource} instance is de-registered from a - * {@link EventSourceManager}. - */ - @Override - default void close() throws IOException {} +public interface EventSource extends LifecycleAware { void setEventHandler(EventHandler eventHandler); diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/internal/CustomResourceEventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/internal/CustomResourceEventSource.java index 84a3ad75dc..a9a80c10f3 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/internal/CustomResourceEventSource.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/internal/CustomResourceEventSource.java @@ -1,6 +1,5 @@ package io.javaoperatorsdk.operator.processing.event.internal; -import java.io.IOException; import java.util.*; import java.util.concurrent.ConcurrentHashMap; @@ -105,13 +104,13 @@ public void start() { } @Override - public void close() throws IOException { + public void stop() { for (SharedIndexInformer informer : sharedIndexInformers.values()) { try { - log.info("Closing informer {} -> {}", controller, informer); - informer.close(); + log.info("Stopping informer {} -> {}", controller, informer); + informer.stop(); } catch (Exception e) { - log.warn("Error closing informer {} -> {}", controller, informer, e); + log.warn("Error stopping informer {} -> {}", controller, informer, e); } } } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/internal/InformerEventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/internal/InformerEventSource.java index 9d32432360..d9252bb4c6 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/internal/InformerEventSource.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/internal/InformerEventSource.java @@ -1,6 +1,5 @@ package io.javaoperatorsdk.operator.processing.event.internal; -import java.io.IOException; import java.util.Objects; import java.util.Set; import java.util.function.Function; @@ -93,7 +92,7 @@ public void start() { } @Override - public void close() throws IOException { + public void stop() { sharedInformer.close(); } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/internal/TimerEventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/internal/TimerEventSource.java index e8a1b7bc20..4fc78ebb44 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/internal/TimerEventSource.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/internal/TimerEventSource.java @@ -1,6 +1,5 @@ package io.javaoperatorsdk.operator.processing.event.internal; -import java.io.IOException; import java.util.Map; import java.util.Timer; import java.util.TimerTask; @@ -76,7 +75,7 @@ public void start() { } @Override - public void close() throws IOException { + public void stop() { running.set(false); onceTasks.keySet().forEach(this::cancelOnceSchedule); timerTasks.keySet().forEach(this::cancelSchedule); diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/DefaultEventHandlerTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/DefaultEventHandlerTest.java index 93a36d5304..2ffabd6fe1 100644 --- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/DefaultEventHandlerTest.java +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/DefaultEventHandlerTest.java @@ -194,7 +194,7 @@ public void reScheduleOnlyIfNotExecutedBufferedEvents() { @Test public void doNotFireEventsIfClosing() { - defaultEventHandler.close(); + defaultEventHandler.stop(); defaultEventHandler.handleEvent(prepareCREvent()); verify(eventDispatcherMock, timeout(50).times(0)).handleExecution(any()); diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/DefaultEventSourceManagerTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/DefaultEventSourceManagerTest.java index ad87000a52..d5b87bb5b5 100644 --- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/DefaultEventSourceManagerTest.java +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/DefaultEventSourceManagerTest.java @@ -42,10 +42,10 @@ public void closeShouldCascadeToEventSources() throws IOException { defaultEventSourceManager.registerEventSource(eventSource); defaultEventSourceManager.registerEventSource(eventSource2); - defaultEventSourceManager.close(); + defaultEventSourceManager.stop(); - verify(eventSource, times(1)).close(); - verify(eventSource2, times(1)).close(); + verify(eventSource, times(1)).stop(); + verify(eventSource2, times(1)).stop(); } @Test diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/internal/TimerEventSourceTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/internal/TimerEventSourceTest.java index 0d9c3b5a11..34bc6b2f92 100644 --- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/internal/TimerEventSourceTest.java +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/internal/TimerEventSourceTest.java @@ -112,7 +112,7 @@ public void deRegistersOnceEventSources() { public void eventNotRegisteredIfStopped() throws IOException { TestCustomResource customResource = TestUtils.testCustomResource(); - timerEventSource.close(); + timerEventSource.stop(); assertThatExceptionOfType(IllegalStateException.class).isThrownBy( () -> timerEventSource.scheduleOnce(customResource, PERIOD)); } @@ -120,7 +120,7 @@ public void eventNotRegisteredIfStopped() throws IOException { @Test public void eventNotFiredIfStopped() throws IOException { timerEventSource.scheduleOnce(TestUtils.testCustomResource(), PERIOD); - timerEventSource.close(); + timerEventSource.stop(); untilAsserted(() -> assertThat(eventHandlerMock.events).isEmpty()); }