From 188372779d209e3aca5eaecfb224172740f4c9d3 Mon Sep 17 00:00:00 2001 From: csviri Date: Tue, 8 Dec 2020 15:48:43 +0100 Subject: [PATCH 01/10] retry basic impl --- .../io/javaoperatorsdk/operator/Operator.java | 11 ++-- .../processing/DefaultEventHandler.java | 57 ++++++++++++++++--- .../operator/processing/EventBuffer.java | 8 +++ .../operator/processing/EventDispatcher.java | 14 ++--- .../processing/ExecutionConsumer.java | 2 +- .../processing/PostExecutionControl.java | 24 ++++++-- .../event/DefaultEventSourceManager.java | 20 +++++-- .../processing/retry/GenericRetry.java | 2 +- .../retry/GenericRetryExecution.java | 7 --- .../operator/EventDispatcherTest.java | 28 +++++---- .../processing/DefaultEventHandlerTest.java | 12 ++-- .../event/DefaultEventSourceManagerTest.java | 2 +- .../retry/GenericRetryExecutionTest.java | 18 +++--- 13 files changed, 135 insertions(+), 70 deletions(-) diff --git a/operator-framework/src/main/java/io/javaoperatorsdk/operator/Operator.java b/operator-framework/src/main/java/io/javaoperatorsdk/operator/Operator.java index 7eb63b18f9..faf4cf6b45 100644 --- a/operator-framework/src/main/java/io/javaoperatorsdk/operator/Operator.java +++ b/operator-framework/src/main/java/io/javaoperatorsdk/operator/Operator.java @@ -15,6 +15,7 @@ import io.javaoperatorsdk.operator.processing.CustomResourceCache; import io.javaoperatorsdk.operator.processing.event.DefaultEventSourceManager; import io.javaoperatorsdk.operator.processing.event.internal.CustomResourceEventSource; +import io.javaoperatorsdk.operator.processing.retry.Retry; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -38,16 +39,16 @@ public Operator(KubernetesClient k8sClient) { public void registerControllerForAllNamespaces(ResourceController controller) throws OperatorException { - registerController(controller, true); + registerController(controller, true, null); } public void registerController(ResourceController controller, String... targetNamespaces) throws OperatorException { - registerController(controller, false, targetNamespaces); + registerController(controller, false, null, targetNamespaces); } @SuppressWarnings("rawtypes") private void registerController(ResourceController controller, - boolean watchAllNamespaces, String... targetNamespaces) throws OperatorException { + boolean watchAllNamespaces, Retry retry, String... targetNamespaces) throws OperatorException { Class resClass = getCustomResourceClass(controller); CustomResourceDefinitionContext crd = getCustomResourceDefinitionForController(controller); KubernetesDeserializer.registerCustomKind(crd.getVersion(), crd.getKind(), resClass); @@ -58,8 +59,8 @@ private void registerController(ResourceController CustomResourceCache customResourceCache = new CustomResourceCache(); - DefaultEventHandler defaultEventHandler = new DefaultEventHandler(customResourceCache, eventDispatcher, controller.getClass().getName()); - DefaultEventSourceManager eventSourceManager = new DefaultEventSourceManager(defaultEventHandler); + DefaultEventHandler defaultEventHandler = new DefaultEventHandler(customResourceCache, eventDispatcher, controller.getClass().getName(), retry); + DefaultEventSourceManager eventSourceManager = new DefaultEventSourceManager(defaultEventHandler, retry != null); defaultEventHandler.setDefaultEventSourceManager(eventSourceManager); eventDispatcher.setEventSourceManager(eventSourceManager); diff --git a/operator-framework/src/main/java/io/javaoperatorsdk/operator/processing/DefaultEventHandler.java b/operator-framework/src/main/java/io/javaoperatorsdk/operator/processing/DefaultEventHandler.java index fb225aae65..8b605c8a54 100644 --- a/operator-framework/src/main/java/io/javaoperatorsdk/operator/processing/DefaultEventHandler.java +++ b/operator-framework/src/main/java/io/javaoperatorsdk/operator/processing/DefaultEventHandler.java @@ -5,16 +5,15 @@ import io.javaoperatorsdk.operator.processing.event.DefaultEventSourceManager; import io.javaoperatorsdk.operator.processing.event.Event; import io.javaoperatorsdk.operator.processing.event.EventHandler; +import io.javaoperatorsdk.operator.processing.retry.Retry; +import io.javaoperatorsdk.operator.processing.retry.RetryExecution; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.HashSet; -import java.util.Optional; -import java.util.Set; +import java.util.*; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.ThreadFactory; import java.util.concurrent.locks.ReentrantLock; -import java.util.function.Predicate; import static io.javaoperatorsdk.operator.EventListUtils.containsCustomResourceDeletedEvent; import static io.javaoperatorsdk.operator.processing.KubernetesResourceUtils.getUID; @@ -34,13 +33,17 @@ public class DefaultEventHandler implements EventHandler { private final Set underProcessing = new HashSet<>(); private final ScheduledThreadPoolExecutor executor; private final EventDispatcher eventDispatcher; + private final Retry retry; + private final Map retryState = new HashMap<>(); private DefaultEventSourceManager defaultEventSourceManager; private final ReentrantLock lock = new ReentrantLock(); - public DefaultEventHandler(CustomResourceCache customResourceCache, EventDispatcher eventDispatcher, String relatedControllerName) { + public DefaultEventHandler(CustomResourceCache customResourceCache, EventDispatcher eventDispatcher, String relatedControllerName, + Retry retry) { this.customResourceCache = customResourceCache; this.eventDispatcher = eventDispatcher; + this.retry = retry; eventBuffer = new EventBuffer(); executor = new ScheduledThreadPoolExecutor(5, new ThreadFactory() { @Override @@ -90,6 +93,13 @@ void eventProcessingFinished(ExecutionScope executionScope, PostExecutionControl lock.lock(); log.debug("Event processing finished. Scope: {}", executionScope); unsetUnderExecution(executionScope.getCustomResourceUid()); + + if (retry != null && postExecutionControl.exceptionDuringExecution()) { + handleRetryOnException(executionScope, postExecutionControl); + } else if (retry != null) { + handleSuccessfulExecutionRegardingRetry(executionScope); + } + if (containsCustomResourceDeletedEvent(executionScope.getEvents())) { cleanupAfterDeletedEvent(executionScope.getCustomResourceUid()); } else { @@ -101,16 +111,49 @@ void eventProcessingFinished(ExecutionScope executionScope, PostExecutionControl } } + /** + * Regarding the events there are 2 approaches we can take. Either retry always when there are new events (received meanwhile retry + * is in place or already in buffer) instantly or always wait according to the retry timing if there was an exception. + */ + private void handleRetryOnException(ExecutionScope executionScope, PostExecutionControl postExecutionControl) { + RetryExecution execution = getOrInitRetryExecution(executionScope); + boolean newEventsExists = eventBuffer.newEventsExists(executionScope.getCustomResourceUid()); + eventBuffer.putBackEvents(executionScope.getCustomResourceUid(), executionScope.getEvents()); + + Optional nextDelay = execution.nextDelay(); + if (newEventsExists) { + executeBufferedEvents(executionScope.getCustomResourceUid()); + return; + } + nextDelay.ifPresent(delay -> + defaultEventSourceManager.getRetryTimerEventSource() + .scheduleOnce(executionScope.getCustomResource(), delay)); + } + + private void handleSuccessfulExecutionRegardingRetry(ExecutionScope executionScope) { + retryState.remove(executionScope.getCustomResourceUid()); + defaultEventSourceManager.getRetryTimerEventSource().cancelOnceSchedule(executionScope.getCustomResourceUid()); + } + + private RetryExecution getOrInitRetryExecution(ExecutionScope executionScope) { + RetryExecution retryExecution = retryState.get(executionScope.getCustomResourceUid()); + if (retryExecution == null) { + retryExecution = retry.initExecution(); + retryState.put(executionScope.getCustomResourceUid(), retryExecution); + } + return retryExecution; + } + /** * Here we try to cache the latest resource after an update. The goal is to solve a concurrency issue we've seen: * If an execution is finished, where we updated a custom resource, but there are other events already buffered for next * execution, we might not get the newest custom resource from CustomResource event source in time. Thus we execute * the next batch of events but with a non up to date CR. Here we cache the latest CustomResource from the update * execution so we make sure its already used in the up-coming execution. - * + *

* Note that this is an improvement, not a bug fix. This situation can happen naturally, we just make the execution more * efficient, and avoid questions about conflicts. - * + *

* Note that without the conditional locking in the cache, there is a very minor chance that we would override an * additional change coming from a different client. */ diff --git a/operator-framework/src/main/java/io/javaoperatorsdk/operator/processing/EventBuffer.java b/operator-framework/src/main/java/io/javaoperatorsdk/operator/processing/EventBuffer.java index a7be15a934..0f51a9d979 100644 --- a/operator-framework/src/main/java/io/javaoperatorsdk/operator/processing/EventBuffer.java +++ b/operator-framework/src/main/java/io/javaoperatorsdk/operator/processing/EventBuffer.java @@ -15,6 +15,14 @@ public void addEvent(Event event) { crEvents.add(event); } + public boolean newEventsExists(String resourceId) { + return !events.get(resourceId).isEmpty(); + } + + public void putBackEvents(String resourceUid, List oldEvents) { + events.get(resourceUid).addAll(0, oldEvents); + } + public boolean containsEvents(String customResourceId) { return events.get(customResourceId) != null; } diff --git a/operator-framework/src/main/java/io/javaoperatorsdk/operator/processing/EventDispatcher.java b/operator-framework/src/main/java/io/javaoperatorsdk/operator/processing/EventDispatcher.java index f73777df60..f6f3b4694c 100644 --- a/operator-framework/src/main/java/io/javaoperatorsdk/operator/processing/EventDispatcher.java +++ b/operator-framework/src/main/java/io/javaoperatorsdk/operator/processing/EventDispatcher.java @@ -5,16 +5,12 @@ import io.fabric8.kubernetes.client.dsl.MixedOperation; import io.fabric8.kubernetes.client.dsl.Resource; import io.javaoperatorsdk.operator.api.*; -import io.javaoperatorsdk.operator.processing.event.Event; import io.javaoperatorsdk.operator.processing.event.EventList; import io.javaoperatorsdk.operator.processing.event.EventSourceManager; -import io.javaoperatorsdk.operator.processing.event.internal.CustomResourceEvent; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.ArrayList; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; import static io.javaoperatorsdk.operator.EventListUtils.containsCustomResourceDeletedEvent; import static io.javaoperatorsdk.operator.processing.KubernetesResourceUtils.*; @@ -43,16 +39,16 @@ public void setEventSourceManager(EventSourceManager eventSourceManager) { this.eventSourceManager = eventSourceManager; } - public PostExecutionControl handleEvent(ExecutionScope event) { + public PostExecutionControl handleExecution(ExecutionScope executionScope) { try { - return handDispatch(event); + return handleDispatch(executionScope); } catch (RuntimeException e) { - log.error("Error during event processing {} failed.", event, e); - return PostExecutionControl.defaultDispatch(); + log.error("Error during event processing {} failed.", executionScope, e); + return PostExecutionControl.exceptionDuringExecution(e); } } - private PostExecutionControl handDispatch(ExecutionScope executionScope) { + private PostExecutionControl handleDispatch(ExecutionScope executionScope) { CustomResource resource = executionScope.getCustomResource(); log.debug("Handling events: {} for resource {}", executionScope.getEvents(), resource.getMetadata()); diff --git a/operator-framework/src/main/java/io/javaoperatorsdk/operator/processing/ExecutionConsumer.java b/operator-framework/src/main/java/io/javaoperatorsdk/operator/processing/ExecutionConsumer.java index 58c5767432..218036b9c6 100644 --- a/operator-framework/src/main/java/io/javaoperatorsdk/operator/processing/ExecutionConsumer.java +++ b/operator-framework/src/main/java/io/javaoperatorsdk/operator/processing/ExecutionConsumer.java @@ -19,7 +19,7 @@ class ExecutionConsumer implements Runnable { @Override public void run() { - PostExecutionControl postExecutionControl = eventDispatcher.handleEvent(executionScope); + PostExecutionControl postExecutionControl = eventDispatcher.handleExecution(executionScope); defaultEventHandler.eventProcessingFinished(executionScope, postExecutionControl); } diff --git a/operator-framework/src/main/java/io/javaoperatorsdk/operator/processing/PostExecutionControl.java b/operator-framework/src/main/java/io/javaoperatorsdk/operator/processing/PostExecutionControl.java index aa4dc6df93..07b163cf55 100644 --- a/operator-framework/src/main/java/io/javaoperatorsdk/operator/processing/PostExecutionControl.java +++ b/operator-framework/src/main/java/io/javaoperatorsdk/operator/processing/PostExecutionControl.java @@ -1,7 +1,6 @@ package io.javaoperatorsdk.operator.processing; import io.fabric8.kubernetes.client.CustomResource; -import io.javaoperatorsdk.operator.api.UpdateControl; import java.util.Optional; @@ -11,21 +10,28 @@ public final class PostExecutionControl { private final CustomResource updatedCustomResource; - private PostExecutionControl(boolean onlyFinalizerHandled, CustomResource updatedCustomResource) { + private final RuntimeException runtimeException; + + private PostExecutionControl(boolean onlyFinalizerHandled, CustomResource updatedCustomResource, RuntimeException runtimeException) { this.onlyFinalizerHandled = onlyFinalizerHandled; this.updatedCustomResource = updatedCustomResource; + this.runtimeException = runtimeException; } public static PostExecutionControl onlyFinalizerAdded() { - return new PostExecutionControl(true, null); + return new PostExecutionControl(true, null, null); } public static PostExecutionControl defaultDispatch() { - return new PostExecutionControl(false, null); + return new PostExecutionControl(false, null, null); } public static PostExecutionControl customResourceUpdated(CustomResource updatedCustomResource) { - return new PostExecutionControl(false, updatedCustomResource); + return new PostExecutionControl(false, updatedCustomResource, null); + } + + public static PostExecutionControl exceptionDuringExecution(RuntimeException exception) { + return new PostExecutionControl(false, null, exception); } public boolean isOnlyFinalizerHandled() { @@ -39,4 +45,12 @@ public Optional getUpdatedCustomResource() { public boolean customResourceUpdatedDuringExecution() { return updatedCustomResource != null; } + + public boolean exceptionDuringExecution() { + return runtimeException != null; + } + + public Optional getRuntimeException() { + return Optional.ofNullable(runtimeException); + } } diff --git a/operator-framework/src/main/java/io/javaoperatorsdk/operator/processing/event/DefaultEventSourceManager.java b/operator-framework/src/main/java/io/javaoperatorsdk/operator/processing/event/DefaultEventSourceManager.java index b1babc7f70..a3b4c51379 100644 --- a/operator-framework/src/main/java/io/javaoperatorsdk/operator/processing/event/DefaultEventSourceManager.java +++ b/operator-framework/src/main/java/io/javaoperatorsdk/operator/processing/event/DefaultEventSourceManager.java @@ -2,6 +2,7 @@ import io.javaoperatorsdk.operator.processing.DefaultEventHandler; import io.javaoperatorsdk.operator.processing.event.internal.CustomResourceEventSource; +import io.javaoperatorsdk.operator.processing.event.internal.TimerEventSource; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -12,15 +13,22 @@ public class DefaultEventSourceManager implements EventSourceManager { + public static final String RETRY_TIMER_EVENT_SOURCE_NAME = "retry-timer-event-source"; private static final Logger log = LoggerFactory.getLogger(DefaultEventSourceManager.class); + private final ReentrantLock lock = new ReentrantLock(); private Map eventSources = new ConcurrentHashMap<>(); private CustomResourceEventSource customResourceEventSource; private DefaultEventHandler defaultEventHandler; + private TimerEventSource retryTimerEventSource; - public DefaultEventSourceManager(DefaultEventHandler defaultEventHandler) { + public DefaultEventSourceManager(DefaultEventHandler defaultEventHandler, boolean supportRetry) { this.defaultEventHandler = defaultEventHandler; + if (supportRetry) { + this.retryTimerEventSource = new TimerEventSource(); + registerEventSource(RETRY_TIMER_EVENT_SOURCE_NAME, retryTimerEventSource); + } } public void registerCustomResourceEventSource(CustomResourceEventSource customResourceEventSource) { @@ -36,7 +44,7 @@ public void registerEventSource(String name, T eventSour if (currentEventSource != null) { throw new IllegalStateException("Event source with name already registered. Event source name: " + name); } - eventSources.put(name,eventSource); + eventSources.put(name, eventSource); eventSource.setEventHandler(defaultEventHandler); } finally { lock.unlock(); @@ -44,7 +52,7 @@ public void registerEventSource(String name, T eventSour } @Override - public Optional deRegisterCustomResourceFromEventSource(String eventSourceName,String customResourceUid) { + public Optional deRegisterCustomResourceFromEventSource(String eventSourceName, String customResourceUid) { try { lock.lock(); EventSource eventSource = this.eventSources.get(eventSourceName); @@ -60,13 +68,17 @@ public Optional deRegisterCustomResourceFromEventSource(String even } } + public TimerEventSource getRetryTimerEventSource() { + return retryTimerEventSource; + } + @Override public Map getRegisteredEventSources() { return Collections.unmodifiableMap(eventSources); } public void cleanup(String customResourceUid) { - getRegisteredEventSources().keySet().forEach(k -> deRegisterCustomResourceFromEventSource(k,customResourceUid)); + getRegisteredEventSources().keySet().forEach(k -> deRegisterCustomResourceFromEventSource(k, customResourceUid)); eventSources.remove(customResourceUid); } diff --git a/operator-framework/src/main/java/io/javaoperatorsdk/operator/processing/retry/GenericRetry.java b/operator-framework/src/main/java/io/javaoperatorsdk/operator/processing/retry/GenericRetry.java index 31f65bf21e..b3fc6a2873 100644 --- a/operator-framework/src/main/java/io/javaoperatorsdk/operator/processing/retry/GenericRetry.java +++ b/operator-framework/src/main/java/io/javaoperatorsdk/operator/processing/retry/GenericRetry.java @@ -16,7 +16,7 @@ public static GenericRetry defaultLimitedExponentialRetry() { } public static GenericRetry noRetry() { - return new GenericRetry().setMaxAttempts(1); + return new GenericRetry().setMaxAttempts(0); } public static GenericRetry every10second10TimesRetry() { diff --git a/operator-framework/src/main/java/io/javaoperatorsdk/operator/processing/retry/GenericRetryExecution.java b/operator-framework/src/main/java/io/javaoperatorsdk/operator/processing/retry/GenericRetryExecution.java index c39ae93709..b206fbc960 100644 --- a/operator-framework/src/main/java/io/javaoperatorsdk/operator/processing/retry/GenericRetryExecution.java +++ b/operator-framework/src/main/java/io/javaoperatorsdk/operator/processing/retry/GenericRetryExecution.java @@ -15,14 +15,7 @@ public GenericRetryExecution(GenericRetry genericRetry) { this.currentInterval = genericRetry.getInitialInterval(); } - /** - * Note that first attempt is always 0. Since this implementation is tailored for event scheduling. - */ public Optional nextDelay() { - if (lastAttemptIndex == 0) { - lastAttemptIndex++; - return Optional.of(0L); - } if (genericRetry.getMaxAttempts() > -1 && lastAttemptIndex >= genericRetry.getMaxAttempts()) { return Optional.empty(); } diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/EventDispatcherTest.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/EventDispatcherTest.java index fba1240f38..abc1945ec5 100644 --- a/operator-framework/src/test/java/io/javaoperatorsdk/operator/EventDispatcherTest.java +++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/EventDispatcherTest.java @@ -9,7 +9,6 @@ import io.javaoperatorsdk.operator.processing.ExecutionScope; import io.javaoperatorsdk.operator.processing.event.Event; import io.javaoperatorsdk.operator.processing.event.internal.CustomResourceEvent; -import io.javaoperatorsdk.operator.processing.event.internal.TimerEvent; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.mockito.ArgumentMatchers; @@ -19,7 +18,6 @@ import java.util.Collections; import java.util.List; -import static io.javaoperatorsdk.operator.processing.KubernetesResourceUtils.getUID; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.mockito.Mockito.*; @@ -46,7 +44,7 @@ void setup() { @Test void callCreateOrUpdateOnNewResource() { - eventDispatcher.handleEvent(executionScopeWithCREvent(Watcher.Action.ADDED, testCustomResource)); + eventDispatcher.handleExecution(executionScopeWithCREvent(Watcher.Action.ADDED, testCustomResource)); verify(controller, times(1)).createOrUpdateResource(ArgumentMatchers.eq(testCustomResource), any()); } @@ -55,7 +53,7 @@ void updatesOnlyStatusSubResource() { when(controller.createOrUpdateResource(eq(testCustomResource), any())) .thenReturn(UpdateControl.updateStatusSubResource(testCustomResource)); - eventDispatcher.handleEvent(executionScopeWithCREvent(Watcher.Action.ADDED, testCustomResource)); + eventDispatcher.handleExecution(executionScopeWithCREvent(Watcher.Action.ADDED, testCustomResource)); verify(customResourceFacade, times(1)).updateStatus(testCustomResource); verify(customResourceFacade, never()).replaceWithLock(any()); @@ -64,13 +62,13 @@ void updatesOnlyStatusSubResource() { @Test void callCreateOrUpdateOnModifiedResource() { - eventDispatcher.handleEvent(executionScopeWithCREvent(Watcher.Action.MODIFIED, testCustomResource)); + eventDispatcher.handleExecution(executionScopeWithCREvent(Watcher.Action.MODIFIED, testCustomResource)); verify(controller, times(1)).createOrUpdateResource(ArgumentMatchers.eq(testCustomResource), any()); } @Test void adsDefaultFinalizerOnCreateIfNotThere() { - eventDispatcher.handleEvent(executionScopeWithCREvent(Watcher.Action.MODIFIED, testCustomResource)); + eventDispatcher.handleExecution(executionScopeWithCREvent(Watcher.Action.MODIFIED, testCustomResource)); verify(controller, times(1)) .createOrUpdateResource(argThat(testCustomResource -> testCustomResource.getMetadata().getFinalizers().contains(DEFAULT_FINALIZER)), any()); @@ -81,7 +79,7 @@ void callsDeleteIfObjectHasFinalizerAndMarkedForDelete() { testCustomResource.getMetadata().setDeletionTimestamp("2019-8-10"); testCustomResource.getMetadata().getFinalizers().add(DEFAULT_FINALIZER); - eventDispatcher.handleEvent(executionScopeWithCREvent(Watcher.Action.MODIFIED, testCustomResource)); + eventDispatcher.handleExecution(executionScopeWithCREvent(Watcher.Action.MODIFIED, testCustomResource)); verify(controller, times(1)).deleteResource(eq(testCustomResource), any()); } @@ -93,7 +91,7 @@ void callsDeleteIfObjectHasFinalizerAndMarkedForDelete() { void callDeleteOnControllerIfMarkedForDeletionButThereIsNoDefaultFinalizer() { markForDeletion(testCustomResource); - eventDispatcher.handleEvent(executionScopeWithCREvent(Watcher.Action.MODIFIED, testCustomResource)); + eventDispatcher.handleExecution(executionScopeWithCREvent(Watcher.Action.MODIFIED, testCustomResource)); verify(controller).deleteResource(eq(testCustomResource), any()); } @@ -102,7 +100,7 @@ void callDeleteOnControllerIfMarkedForDeletionButThereIsNoDefaultFinalizer() { void removesDefaultFinalizerOnDelete() { markForDeletion(testCustomResource); - eventDispatcher.handleEvent(executionScopeWithCREvent(Watcher.Action.MODIFIED, testCustomResource)); + eventDispatcher.handleExecution(executionScopeWithCREvent(Watcher.Action.MODIFIED, testCustomResource)); assertEquals(0, testCustomResource.getMetadata().getFinalizers().size()); verify(customResourceFacade, times(1)).replaceWithLock(any()); @@ -113,7 +111,7 @@ void doesNotRemovesTheFinalizerIfTheDeleteNotMethodInstructsIt() { when(controller.deleteResource(eq(testCustomResource), any())).thenReturn(DeleteControl.NO_FINALIZER_REMOVAL); markForDeletion(testCustomResource); - eventDispatcher.handleEvent(executionScopeWithCREvent(Watcher.Action.MODIFIED, testCustomResource)); + eventDispatcher.handleExecution(executionScopeWithCREvent(Watcher.Action.MODIFIED, testCustomResource)); assertEquals(1, testCustomResource.getMetadata().getFinalizers().size()); verify(customResourceFacade, never()).replaceWithLock(any()); @@ -123,7 +121,7 @@ void doesNotRemovesTheFinalizerIfTheDeleteNotMethodInstructsIt() { void doesNotUpdateTheResourceIfNoUpdateUpdateControl() { when(controller.createOrUpdateResource(eq(testCustomResource), any())).thenReturn(UpdateControl.noUpdate()); - eventDispatcher.handleEvent(executionScopeWithCREvent(Watcher.Action.MODIFIED, testCustomResource)); + eventDispatcher.handleExecution(executionScopeWithCREvent(Watcher.Action.MODIFIED, testCustomResource)); verify(customResourceFacade, never()).replaceWithLock(any()); verify(customResourceFacade, never()).updateStatus(testCustomResource); } @@ -133,7 +131,7 @@ void addsFinalizerIfNotMarkedForDeletionAndEmptyCustomResourceReturned() { removeFinalizers(testCustomResource); when(controller.createOrUpdateResource(eq(testCustomResource), any())).thenReturn(UpdateControl.noUpdate()); - eventDispatcher.handleEvent(executionScopeWithCREvent(Watcher.Action.MODIFIED, testCustomResource)); + eventDispatcher.handleExecution(executionScopeWithCREvent(Watcher.Action.MODIFIED, testCustomResource)); assertEquals(1, testCustomResource.getMetadata().getFinalizers().size()); verify(customResourceFacade, times(1)).replaceWithLock(any()); @@ -144,7 +142,7 @@ void doesNotCallDeleteIfMarkedForDeletionButNotOurFinalizer() { removeFinalizers(testCustomResource); markForDeletion(testCustomResource); - eventDispatcher.handleEvent(executionScopeWithCREvent(Watcher.Action.MODIFIED, testCustomResource)); + eventDispatcher.handleExecution(executionScopeWithCREvent(Watcher.Action.MODIFIED, testCustomResource)); verify(customResourceFacade, never()).replaceWithLock(any()); verify(controller, never()).deleteResource(eq(testCustomResource), any()); @@ -152,8 +150,8 @@ void doesNotCallDeleteIfMarkedForDeletionButNotOurFinalizer() { @Test void executeControllerRegardlessGenerationInNonGenerationAwareMode() { - eventDispatcher.handleEvent(executionScopeWithCREvent(Watcher.Action.MODIFIED, testCustomResource)); - eventDispatcher.handleEvent(executionScopeWithCREvent(Watcher.Action.MODIFIED, testCustomResource)); + eventDispatcher.handleExecution(executionScopeWithCREvent(Watcher.Action.MODIFIED, testCustomResource)); + eventDispatcher.handleExecution(executionScopeWithCREvent(Watcher.Action.MODIFIED, testCustomResource)); verify(controller, times(2)).createOrUpdateResource(eq(testCustomResource), any()); } diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/processing/DefaultEventHandlerTest.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/processing/DefaultEventHandlerTest.java index 7f8d9e9291..afc4d6093e 100644 --- a/operator-framework/src/test/java/io/javaoperatorsdk/operator/processing/DefaultEventHandlerTest.java +++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/processing/DefaultEventHandlerTest.java @@ -24,7 +24,7 @@ class DefaultEventHandlerTest { public static final int SEPARATE_EXECUTION_TIMEOUT = 450; private EventDispatcher eventDispatcherMock = mock(EventDispatcher.class); private CustomResourceCache customResourceCache = new CustomResourceCache(); - private DefaultEventHandler defaultEventHandler = new DefaultEventHandler(customResourceCache, eventDispatcherMock, "Test"); + private DefaultEventHandler defaultEventHandler = new DefaultEventHandler(customResourceCache, eventDispatcherMock, "Test", null); private DefaultEventSourceManager defaultEventSourceManagerMock = mock(DefaultEventSourceManager.class); @BeforeEach @@ -36,7 +36,7 @@ public void setup() { public void dispatchesEventsIfNoExecutionInProgress() { defaultEventHandler.handleEvent(prepareCREvent()); - verify(eventDispatcherMock, timeout(50).times(1)).handleEvent(any()); + verify(eventDispatcherMock, timeout(50).times(1)).handleExecution(any()); } @Test @@ -46,7 +46,7 @@ public void skipProcessingIfLatestCustomResourceNotInCache() { defaultEventHandler.handleEvent(event); - verify(eventDispatcherMock, timeout(50).times(0)).handleEvent(any()); + verify(eventDispatcherMock, timeout(50).times(0)).handleExecution(any()); } @Test @@ -55,7 +55,7 @@ public void ifExecutionInProgressWaitsUntilItsFinished() throws InterruptedExcep defaultEventHandler.handleEvent(nonCREvent(resourceUid)); - verify(eventDispatcherMock, timeout(SEPARATE_EXECUTION_TIMEOUT).times(1)).handleEvent(any()); + verify(eventDispatcherMock, timeout(SEPARATE_EXECUTION_TIMEOUT).times(1)).handleExecution(any()); } @Test @@ -66,7 +66,7 @@ public void buffersAllIncomingEventsWhileControllerInExecution() { defaultEventHandler.handleEvent(prepareCREvent(resourceUid)); ArgumentCaptor captor = ArgumentCaptor.forClass(ExecutionScope.class); - verify(eventDispatcherMock, timeout(SEPARATE_EXECUTION_TIMEOUT).times(2)).handleEvent(captor.capture()); + verify(eventDispatcherMock, timeout(SEPARATE_EXECUTION_TIMEOUT).times(2)).handleExecution(captor.capture()); List events = captor.getAllValues().get(1).getEvents(); assertThat(events).hasSize(2); assertThat(events.get(0)).isInstanceOf(TimerEvent.class); @@ -97,7 +97,7 @@ private void waitMinimalTime() { } private String eventAlreadyUnderProcessing() { - when(eventDispatcherMock.handleEvent(any())).then((Answer) invocationOnMock -> { + when(eventDispatcherMock.handleExecution(any())).then((Answer) invocationOnMock -> { Thread.sleep(FAKE_CONTROLLER_EXECUTION_DURATION); return PostExecutionControl.defaultDispatch(); }); diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/processing/event/DefaultEventSourceManagerTest.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/processing/event/DefaultEventSourceManagerTest.java index 0a9074c5d0..5b6313c77f 100644 --- a/operator-framework/src/test/java/io/javaoperatorsdk/operator/processing/event/DefaultEventSourceManagerTest.java +++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/processing/event/DefaultEventSourceManagerTest.java @@ -18,7 +18,7 @@ class DefaultEventSourceManagerTest { public static final String CUSTOM_EVENT_SOURCE_NAME = "CustomEventSource"; private DefaultEventHandler defaultEventHandlerMock = mock(DefaultEventHandler.class); - private DefaultEventSourceManager defaultEventSourceManager = new DefaultEventSourceManager(defaultEventHandlerMock); + private DefaultEventSourceManager defaultEventSourceManager = new DefaultEventSourceManager(defaultEventHandlerMock, false); @Test public void registersEventSource() { diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/processing/retry/GenericRetryExecutionTest.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/processing/retry/GenericRetryExecutionTest.java index ffc5238998..f7df735677 100644 --- a/operator-framework/src/test/java/io/javaoperatorsdk/operator/processing/retry/GenericRetryExecutionTest.java +++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/processing/retry/GenericRetryExecutionTest.java @@ -4,33 +4,34 @@ import java.util.Optional; +import static io.javaoperatorsdk.operator.processing.retry.GenericRetry.DEFAULT_INITIAL_INTERVAL; import static org.assertj.core.api.Assertions.assertThat; public class GenericRetryExecutionTest { @Test - public void forFirstBackOffAlwaysReturnsZero() { - assertThat(getDefaultRetryExecution().nextDelay().get()).isEqualTo(0); + public void forFirstBackOffAlwaysReturnsInitialInterval() { + assertThat(getDefaultRetryExecution().nextDelay().get()).isEqualTo(DEFAULT_INITIAL_INTERVAL); } @Test public void delayIsMultipliedEveryNextDelayCall() { RetryExecution retryExecution = getDefaultRetryExecution(); - Optional res = callNextDelayNTimes(retryExecution, 2); - assertThat(res.get()).isEqualTo(GenericRetry.DEFAULT_INITIAL_INTERVAL); + Optional res = callNextDelayNTimes(retryExecution, 1); + assertThat(res.get()).isEqualTo(DEFAULT_INITIAL_INTERVAL); res = retryExecution.nextDelay(); - assertThat(res.get()).isEqualTo((long) (GenericRetry.DEFAULT_INITIAL_INTERVAL * GenericRetry.DEFAULT_MULTIPLIER)); + assertThat(res.get()).isEqualTo((long) (DEFAULT_INITIAL_INTERVAL * GenericRetry.DEFAULT_MULTIPLIER)); res = retryExecution.nextDelay(); - assertThat(res.get()).isEqualTo((long) (GenericRetry.DEFAULT_INITIAL_INTERVAL * GenericRetry.DEFAULT_MULTIPLIER * GenericRetry.DEFAULT_MULTIPLIER)); + assertThat(res.get()).isEqualTo((long) (DEFAULT_INITIAL_INTERVAL * GenericRetry.DEFAULT_MULTIPLIER * GenericRetry.DEFAULT_MULTIPLIER)); } @Test public void noNextDelayIfMaxAttemptLimitReached() { RetryExecution retryExecution = GenericRetry.defaultLimitedExponentialRetry().setMaxAttempts(3).initExecution(); - Optional res = callNextDelayNTimes(retryExecution, 3); + Optional res = callNextDelayNTimes(retryExecution, 2); assertThat(res).isNotEmpty(); res = retryExecution.nextDelay(); @@ -53,7 +54,6 @@ public void canLimitMaxIntervalLength() { @Test public void supportsNoRetry() { RetryExecution retryExecution = GenericRetry.noRetry().initExecution(); - assertThat(retryExecution.nextDelay().get()).isZero(); assertThat(retryExecution.nextDelay()).isEmpty(); } @@ -72,7 +72,7 @@ private RetryExecution getDefaultRetryExecution() { } public Optional callNextDelayNTimes(RetryExecution retryExecution, int n) { - for (int i = 0; i < n - 1; i++) { + for (int i = 0; i < n ; i++) { retryExecution.nextDelay(); } return retryExecution.nextDelay(); From 95450aa1f73f98a203a4df89999667163ae59f69 Mon Sep 17 00:00:00 2001 From: csviri Date: Wed, 9 Dec 2020 09:56:55 +0100 Subject: [PATCH 02/10] merged master, google formatter --- .../io/javaoperatorsdk/operator/Operator.java | 64 ++-- .../processing/DefaultEventHandler.java | 197 ++++++----- .../operator/processing/EventBuffer.java | 46 ++- .../operator/processing/EventDispatcher.java | 27 +- .../processing/ExecutionConsumer.java | 32 +- .../processing/PostExecutionControl.java | 58 +-- .../event/DefaultEventSourceManager.java | 116 +++--- .../processing/retry/GenericRetry.java | 153 ++++---- .../retry/GenericRetryExecution.java | 47 ++- .../operator/EventDispatcherTest.java | 333 ++++++++++-------- .../processing/DefaultEventHandlerTest.java | 63 ++-- .../event/DefaultEventSourceManagerTest.java | 99 +++--- .../retry/GenericRetryExecutionTest.java | 150 ++++---- 13 files changed, 719 insertions(+), 666 deletions(-) diff --git a/operator-framework/src/main/java/io/javaoperatorsdk/operator/Operator.java b/operator-framework/src/main/java/io/javaoperatorsdk/operator/Operator.java index dcc80fc2b4..03ec95f5e9 100644 --- a/operator-framework/src/main/java/io/javaoperatorsdk/operator/Operator.java +++ b/operator-framework/src/main/java/io/javaoperatorsdk/operator/Operator.java @@ -18,9 +18,6 @@ import io.javaoperatorsdk.operator.processing.event.DefaultEventSourceManager; import io.javaoperatorsdk.operator.processing.event.internal.CustomResourceEventSource; import io.javaoperatorsdk.operator.processing.retry.Retry; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import java.util.Arrays; import java.util.HashMap; import java.util.Map; @@ -39,32 +36,45 @@ public Operator(KubernetesClient k8sClient) { this.k8sClient = k8sClient; } + public void registerControllerForAllNamespaces( + ResourceController controller) throws OperatorException { + registerController(controller, true, null); + } - public void registerControllerForAllNamespaces(ResourceController controller) throws OperatorException { - registerController(controller, true, null); - } - - public void registerController(ResourceController controller, String... targetNamespaces) throws OperatorException { - registerController(controller, false, null, targetNamespaces); - } + public void registerController( + ResourceController controller, String... targetNamespaces) throws OperatorException { + registerController(controller, false, null, targetNamespaces); + } - @SuppressWarnings("rawtypes") - private void registerController(ResourceController controller, - boolean watchAllNamespaces, Retry retry, String... targetNamespaces) throws OperatorException { - Class resClass = getCustomResourceClass(controller); - CustomResourceDefinitionContext crd = getCustomResourceDefinitionForController(controller); - KubernetesDeserializer.registerCustomKind(crd.getVersion(), crd.getKind(), resClass); - String finalizer = ControllerUtils.getFinalizer(controller); - MixedOperation client = k8sClient.customResources(crd, resClass, CustomResourceList.class, ControllerUtils.getCustomResourceDoneableClass(controller)); - EventDispatcher eventDispatcher = new EventDispatcher(controller, - finalizer, new EventDispatcher.CustomResourceFacade(client)); - - - CustomResourceCache customResourceCache = new CustomResourceCache(); - DefaultEventHandler defaultEventHandler = new DefaultEventHandler(customResourceCache, eventDispatcher, controller.getClass().getName(), retry); - DefaultEventSourceManager eventSourceManager = new DefaultEventSourceManager(defaultEventHandler, retry != null); - defaultEventHandler.setDefaultEventSourceManager(eventSourceManager); - eventDispatcher.setEventSourceManager(eventSourceManager); + @SuppressWarnings("rawtypes") + private void registerController( + ResourceController controller, + boolean watchAllNamespaces, + Retry retry, + String... targetNamespaces) + throws OperatorException { + Class resClass = getCustomResourceClass(controller); + CustomResourceDefinitionContext crd = getCustomResourceDefinitionForController(controller); + KubernetesDeserializer.registerCustomKind(crd.getVersion(), crd.getKind(), resClass); + String finalizer = ControllerUtils.getFinalizer(controller); + MixedOperation client = + k8sClient.customResources( + crd, + resClass, + CustomResourceList.class, + ControllerUtils.getCustomResourceDoneableClass(controller)); + EventDispatcher eventDispatcher = + new EventDispatcher( + controller, finalizer, new EventDispatcher.CustomResourceFacade(client)); + + CustomResourceCache customResourceCache = new CustomResourceCache(); + DefaultEventHandler defaultEventHandler = + new DefaultEventHandler( + customResourceCache, eventDispatcher, controller.getClass().getName(), retry); + DefaultEventSourceManager eventSourceManager = + new DefaultEventSourceManager(defaultEventHandler, retry != null); + defaultEventHandler.setDefaultEventSourceManager(eventSourceManager); + eventDispatcher.setEventSourceManager(eventSourceManager); customResourceClients.put(resClass, (CustomResourceOperationsImpl) client); diff --git a/operator-framework/src/main/java/io/javaoperatorsdk/operator/processing/DefaultEventHandler.java b/operator-framework/src/main/java/io/javaoperatorsdk/operator/processing/DefaultEventHandler.java index 3bbffef700..1051f6c692 100644 --- a/operator-framework/src/main/java/io/javaoperatorsdk/operator/processing/DefaultEventHandler.java +++ b/operator-framework/src/main/java/io/javaoperatorsdk/operator/processing/DefaultEventHandler.java @@ -8,25 +8,18 @@ import io.javaoperatorsdk.operator.processing.event.DefaultEventSourceManager; import io.javaoperatorsdk.operator.processing.event.Event; import io.javaoperatorsdk.operator.processing.event.EventHandler; -import java.util.HashSet; -import java.util.Optional; -import java.util.Set; import io.javaoperatorsdk.operator.processing.retry.Retry; import io.javaoperatorsdk.operator.processing.retry.RetryExecution; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import java.util.*; +import java.util.HashSet; +import java.util.Optional; +import java.util.Set; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.ThreadFactory; import java.util.concurrent.locks.ReentrantLock; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import static io.javaoperatorsdk.operator.EventListUtils.containsCustomResourceDeletedEvent; -import static io.javaoperatorsdk.operator.processing.KubernetesResourceUtils.getUID; -import static io.javaoperatorsdk.operator.processing.KubernetesResourceUtils.getVersion; - /** * Event handler that makes sure that events are processed in a "single threaded" way per resource * UID, while buffering events which are received during an execution. @@ -35,26 +28,32 @@ public class DefaultEventHandler implements EventHandler { private static final Logger log = LoggerFactory.getLogger(DefaultEventHandler.class); - private final CustomResourceCache customResourceCache; - private final EventBuffer eventBuffer; - private final Set underProcessing = new HashSet<>(); - private final ScheduledThreadPoolExecutor executor; - private final EventDispatcher eventDispatcher; - private final Retry retry; - private final Map retryState = new HashMap<>(); - private DefaultEventSourceManager defaultEventSourceManager; + private final CustomResourceCache customResourceCache; + private final EventBuffer eventBuffer; + private final Set underProcessing = new HashSet<>(); + private final ScheduledThreadPoolExecutor executor; + private final EventDispatcher eventDispatcher; + private final Retry retry; + private final Map retryState = new HashMap<>(); + private DefaultEventSourceManager defaultEventSourceManager; private final ReentrantLock lock = new ReentrantLock(); - public DefaultEventHandler(CustomResourceCache customResourceCache, EventDispatcher eventDispatcher, String relatedControllerName, - Retry retry) { - this.customResourceCache = customResourceCache; - this.eventDispatcher = eventDispatcher; - this.retry = retry; - eventBuffer = new EventBuffer(); - executor = new ScheduledThreadPoolExecutor(5, new ThreadFactory() { - @Override - public Thread newThread(Runnable runnable) { + public DefaultEventHandler( + CustomResourceCache customResourceCache, + EventDispatcher eventDispatcher, + String relatedControllerName, + Retry retry) { + this.customResourceCache = customResourceCache; + this.eventDispatcher = eventDispatcher; + this.retry = retry; + eventBuffer = new EventBuffer(); + executor = + new ScheduledThreadPoolExecutor( + 5, + new ThreadFactory() { + @Override + public Thread newThread(Runnable runnable) { return new Thread(runnable, "EventHandler-" + relatedControllerName); } }); @@ -101,80 +100,90 @@ private void executeBufferedEvents(String customResourceUid) { } } - void eventProcessingFinished(ExecutionScope executionScope, PostExecutionControl postExecutionControl) { - try { - lock.lock(); - log.debug("Event processing finished. Scope: {}", executionScope); - unsetUnderExecution(executionScope.getCustomResourceUid()); - - if (retry != null && postExecutionControl.exceptionDuringExecution()) { - handleRetryOnException(executionScope, postExecutionControl); - } else if (retry != null) { - handleSuccessfulExecutionRegardingRetry(executionScope); - } - - if (containsCustomResourceDeletedEvent(executionScope.getEvents())) { - cleanupAfterDeletedEvent(executionScope.getCustomResourceUid()); - } else { - cacheUpdatedResourceIfChanged(executionScope, postExecutionControl); - executeBufferedEvents(executionScope.getCustomResourceUid()); - } - } finally { - lock.unlock(); - } + void eventProcessingFinished( + ExecutionScope executionScope, PostExecutionControl postExecutionControl) { + try { + lock.lock(); + log.debug("Event processing finished. Scope: {}", executionScope); + unsetUnderExecution(executionScope.getCustomResourceUid()); + + if (retry != null && postExecutionControl.exceptionDuringExecution()) { + handleRetryOnException(executionScope, postExecutionControl); + } else if (retry != null) { + handleSuccessfulExecutionRegardingRetry(executionScope); + } + + if (containsCustomResourceDeletedEvent(executionScope.getEvents())) { + cleanupAfterDeletedEvent(executionScope.getCustomResourceUid()); + } else { + cacheUpdatedResourceIfChanged(executionScope, postExecutionControl); + executeBufferedEvents(executionScope.getCustomResourceUid()); + } + } finally { + lock.unlock(); } + } - /** - * Regarding the events there are 2 approaches we can take. Either retry always when there are new events (received meanwhile retry - * is in place or already in buffer) instantly or always wait according to the retry timing if there was an exception. - */ - private void handleRetryOnException(ExecutionScope executionScope, PostExecutionControl postExecutionControl) { - RetryExecution execution = getOrInitRetryExecution(executionScope); - boolean newEventsExists = eventBuffer.newEventsExists(executionScope.getCustomResourceUid()); - eventBuffer.putBackEvents(executionScope.getCustomResourceUid(), executionScope.getEvents()); - - Optional nextDelay = execution.nextDelay(); - if (newEventsExists) { - executeBufferedEvents(executionScope.getCustomResourceUid()); - return; - } - nextDelay.ifPresent(delay -> - defaultEventSourceManager.getRetryTimerEventSource() - .scheduleOnce(executionScope.getCustomResource(), delay)); + /** + * Regarding the events there are 2 approaches we can take. Either retry always when there are new + * events (received meanwhile retry is in place or already in buffer) instantly or always wait + * according to the retry timing if there was an exception. + */ + private void handleRetryOnException( + ExecutionScope executionScope, PostExecutionControl postExecutionControl) { + RetryExecution execution = getOrInitRetryExecution(executionScope); + boolean newEventsExists = eventBuffer.newEventsExists(executionScope.getCustomResourceUid()); + eventBuffer.putBackEvents(executionScope.getCustomResourceUid(), executionScope.getEvents()); + + Optional nextDelay = execution.nextDelay(); + if (newEventsExists) { + executeBufferedEvents(executionScope.getCustomResourceUid()); + return; } + nextDelay.ifPresent( + delay -> + defaultEventSourceManager + .getRetryTimerEventSource() + .scheduleOnce(executionScope.getCustomResource(), delay)); + } - private void handleSuccessfulExecutionRegardingRetry(ExecutionScope executionScope) { - retryState.remove(executionScope.getCustomResourceUid()); - defaultEventSourceManager.getRetryTimerEventSource().cancelOnceSchedule(executionScope.getCustomResourceUid()); - } + private void handleSuccessfulExecutionRegardingRetry(ExecutionScope executionScope) { + retryState.remove(executionScope.getCustomResourceUid()); + defaultEventSourceManager + .getRetryTimerEventSource() + .cancelOnceSchedule(executionScope.getCustomResourceUid()); + } - private RetryExecution getOrInitRetryExecution(ExecutionScope executionScope) { - RetryExecution retryExecution = retryState.get(executionScope.getCustomResourceUid()); - if (retryExecution == null) { - retryExecution = retry.initExecution(); - retryState.put(executionScope.getCustomResourceUid(), retryExecution); - } - return retryExecution; + private RetryExecution getOrInitRetryExecution(ExecutionScope executionScope) { + RetryExecution retryExecution = retryState.get(executionScope.getCustomResourceUid()); + if (retryExecution == null) { + retryExecution = retry.initExecution(); + retryState.put(executionScope.getCustomResourceUid(), retryExecution); } + return retryExecution; + } - /** - * Here we try to cache the latest resource after an update. The goal is to solve a concurrency issue we've seen: - * If an execution is finished, where we updated a custom resource, but there are other events already buffered for next - * execution, we might not get the newest custom resource from CustomResource event source in time. Thus we execute - * the next batch of events but with a non up to date CR. Here we cache the latest CustomResource from the update - * execution so we make sure its already used in the up-coming execution. - *

- * Note that this is an improvement, not a bug fix. This situation can happen naturally, we just make the execution more - * efficient, and avoid questions about conflicts. - *

- * Note that without the conditional locking in the cache, there is a very minor chance that we would override an - * additional change coming from a different client. - */ - private void cacheUpdatedResourceIfChanged(ExecutionScope executionScope, PostExecutionControl postExecutionControl) { - if (postExecutionControl.customResourceUpdatedDuringExecution()) { - CustomResource originalCustomResource = executionScope.getCustomResource(); - CustomResource customResourceAfterExecution = postExecutionControl.getUpdatedCustomResource().get(); - String originalResourceVersion = getVersion(originalCustomResource); + /** + * Here we try to cache the latest resource after an update. The goal is to solve a concurrency + * issue we've seen: If an execution is finished, where we updated a custom resource, but there + * are other events already buffered for next execution, we might not get the newest custom + * resource from CustomResource event source in time. Thus we execute the next batch of events but + * with a non up to date CR. Here we cache the latest CustomResource from the update execution so + * we make sure its already used in the up-coming execution. + * + *

Note that this is an improvement, not a bug fix. This situation can happen naturally, we + * just make the execution more efficient, and avoid questions about conflicts. + * + *

Note that without the conditional locking in the cache, there is a very minor chance that we + * would override an additional change coming from a different client. + */ + private void cacheUpdatedResourceIfChanged( + ExecutionScope executionScope, PostExecutionControl postExecutionControl) { + if (postExecutionControl.customResourceUpdatedDuringExecution()) { + CustomResource originalCustomResource = executionScope.getCustomResource(); + CustomResource customResourceAfterExecution = + postExecutionControl.getUpdatedCustomResource().get(); + String originalResourceVersion = getVersion(originalCustomResource); log.debug( "Trying to update resource cache from update response for resource uid: {} new version: {} old version: {}", diff --git a/operator-framework/src/main/java/io/javaoperatorsdk/operator/processing/EventBuffer.java b/operator-framework/src/main/java/io/javaoperatorsdk/operator/processing/EventBuffer.java index 0f51a9d979..914a08000b 100644 --- a/operator-framework/src/main/java/io/javaoperatorsdk/operator/processing/EventBuffer.java +++ b/operator-framework/src/main/java/io/javaoperatorsdk/operator/processing/EventBuffer.java @@ -1,38 +1,36 @@ package io.javaoperatorsdk.operator.processing; import io.javaoperatorsdk.operator.processing.event.Event; -import io.javaoperatorsdk.operator.processing.event.internal.CustomResourceEvent; - import java.util.*; class EventBuffer { - private final Map> events = new HashMap<>(); + private final Map> events = new HashMap<>(); - public void addEvent(Event event) { - String uid = event.getRelatedCustomResourceUid(); - List crEvents = events.computeIfAbsent(uid, (id) -> new ArrayList<>(1)); - crEvents.add(event); - } + public void addEvent(Event event) { + String uid = event.getRelatedCustomResourceUid(); + List crEvents = events.computeIfAbsent(uid, (id) -> new ArrayList<>(1)); + crEvents.add(event); + } - public boolean newEventsExists(String resourceId) { - return !events.get(resourceId).isEmpty(); - } + public boolean newEventsExists(String resourceId) { + return !events.get(resourceId).isEmpty(); + } - public void putBackEvents(String resourceUid, List oldEvents) { - events.get(resourceUid).addAll(0, oldEvents); - } + public void putBackEvents(String resourceUid, List oldEvents) { + events.get(resourceUid).addAll(0, oldEvents); + } - public boolean containsEvents(String customResourceId) { - return events.get(customResourceId) != null; - } + public boolean containsEvents(String customResourceId) { + return events.get(customResourceId) != null; + } - public List getAndRemoveEventsForExecution(String resourceUid) { - List crEvents = events.remove(resourceUid); - return crEvents == null ? Collections.emptyList() : crEvents; - } + public List getAndRemoveEventsForExecution(String resourceUid) { + List crEvents = events.remove(resourceUid); + return crEvents == null ? Collections.emptyList() : crEvents; + } - public void cleanup(String resourceUid) { - events.remove(resourceUid); - } + public void cleanup(String resourceUid) { + events.remove(resourceUid); + } } diff --git a/operator-framework/src/main/java/io/javaoperatorsdk/operator/processing/EventDispatcher.java b/operator-framework/src/main/java/io/javaoperatorsdk/operator/processing/EventDispatcher.java index 9722e6122b..07f432b769 100644 --- a/operator-framework/src/main/java/io/javaoperatorsdk/operator/processing/EventDispatcher.java +++ b/operator-framework/src/main/java/io/javaoperatorsdk/operator/processing/EventDispatcher.java @@ -10,14 +10,10 @@ import io.javaoperatorsdk.operator.api.*; import io.javaoperatorsdk.operator.processing.event.EventList; import io.javaoperatorsdk.operator.processing.event.EventSourceManager; +import java.util.ArrayList; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.ArrayList; - -import static io.javaoperatorsdk.operator.EventListUtils.containsCustomResourceDeletedEvent; -import static io.javaoperatorsdk.operator.processing.KubernetesResourceUtils.*; - /** * Dispatches events to the Controller and handles Finalizers for a single type of Custom Resource. */ @@ -41,18 +37,19 @@ public void setEventSourceManager(EventSourceManager eventSourceManager) { this.eventSourceManager = eventSourceManager; } - public PostExecutionControl handleExecution(ExecutionScope executionScope) { - try { - return handleDispatch(executionScope); - } catch (RuntimeException e) { - log.error("Error during event processing {} failed.", executionScope, e); - return PostExecutionControl.exceptionDuringExecution(e); - } + public PostExecutionControl handleExecution(ExecutionScope executionScope) { + try { + return handleDispatch(executionScope); + } catch (RuntimeException e) { + log.error("Error during event processing {} failed.", executionScope, e); + return PostExecutionControl.exceptionDuringExecution(e); } + } - private PostExecutionControl handleDispatch(ExecutionScope executionScope) { - CustomResource resource = executionScope.getCustomResource(); - log.debug("Handling events: {} for resource {}", executionScope.getEvents(), resource.getMetadata()); + private PostExecutionControl handleDispatch(ExecutionScope executionScope) { + CustomResource resource = executionScope.getCustomResource(); + log.debug( + "Handling events: {} for resource {}", executionScope.getEvents(), resource.getMetadata()); if (containsCustomResourceDeletedEvent(executionScope.getEvents())) { log.debug( diff --git a/operator-framework/src/main/java/io/javaoperatorsdk/operator/processing/ExecutionConsumer.java b/operator-framework/src/main/java/io/javaoperatorsdk/operator/processing/ExecutionConsumer.java index 218036b9c6..2b66ad68d4 100644 --- a/operator-framework/src/main/java/io/javaoperatorsdk/operator/processing/ExecutionConsumer.java +++ b/operator-framework/src/main/java/io/javaoperatorsdk/operator/processing/ExecutionConsumer.java @@ -4,23 +4,25 @@ import org.slf4j.LoggerFactory; class ExecutionConsumer implements Runnable { - - private final static Logger log = LoggerFactory.getLogger(ExecutionConsumer.class); - private final ExecutionScope executionScope; - private final EventDispatcher eventDispatcher; - private final DefaultEventHandler defaultEventHandler; + private static final Logger log = LoggerFactory.getLogger(ExecutionConsumer.class); - ExecutionConsumer(ExecutionScope executionScope, EventDispatcher eventDispatcher, DefaultEventHandler defaultEventHandler) { - this.executionScope = executionScope; - this.eventDispatcher = eventDispatcher; - this.defaultEventHandler = defaultEventHandler; - } + private final ExecutionScope executionScope; + private final EventDispatcher eventDispatcher; + private final DefaultEventHandler defaultEventHandler; - @Override - public void run() { - PostExecutionControl postExecutionControl = eventDispatcher.handleExecution(executionScope); - defaultEventHandler.eventProcessingFinished(executionScope, postExecutionControl); - } + ExecutionConsumer( + ExecutionScope executionScope, + EventDispatcher eventDispatcher, + DefaultEventHandler defaultEventHandler) { + this.executionScope = executionScope; + this.eventDispatcher = eventDispatcher; + this.defaultEventHandler = defaultEventHandler; + } + @Override + public void run() { + PostExecutionControl postExecutionControl = eventDispatcher.handleExecution(executionScope); + defaultEventHandler.eventProcessingFinished(executionScope, postExecutionControl); + } } diff --git a/operator-framework/src/main/java/io/javaoperatorsdk/operator/processing/PostExecutionControl.java b/operator-framework/src/main/java/io/javaoperatorsdk/operator/processing/PostExecutionControl.java index 568b3dc766..072136e046 100644 --- a/operator-framework/src/main/java/io/javaoperatorsdk/operator/processing/PostExecutionControl.java +++ b/operator-framework/src/main/java/io/javaoperatorsdk/operator/processing/PostExecutionControl.java @@ -1,7 +1,6 @@ package io.javaoperatorsdk.operator.processing; import io.fabric8.kubernetes.client.CustomResource; - import java.util.Optional; public final class PostExecutionControl { @@ -10,29 +9,32 @@ public final class PostExecutionControl { private final CustomResource updatedCustomResource; - private final RuntimeException runtimeException; + private final RuntimeException runtimeException; - private PostExecutionControl(boolean onlyFinalizerHandled, CustomResource updatedCustomResource, RuntimeException runtimeException) { - this.onlyFinalizerHandled = onlyFinalizerHandled; - this.updatedCustomResource = updatedCustomResource; - this.runtimeException = runtimeException; - } + private PostExecutionControl( + boolean onlyFinalizerHandled, + CustomResource updatedCustomResource, + RuntimeException runtimeException) { + this.onlyFinalizerHandled = onlyFinalizerHandled; + this.updatedCustomResource = updatedCustomResource; + this.runtimeException = runtimeException; + } - public static PostExecutionControl onlyFinalizerAdded() { - return new PostExecutionControl(true, null, null); - } + public static PostExecutionControl onlyFinalizerAdded() { + return new PostExecutionControl(true, null, null); + } - public static PostExecutionControl defaultDispatch() { - return new PostExecutionControl(false, null, null); - } + public static PostExecutionControl defaultDispatch() { + return new PostExecutionControl(false, null, null); + } - public static PostExecutionControl customResourceUpdated(CustomResource updatedCustomResource) { - return new PostExecutionControl(false, updatedCustomResource, null); - } + public static PostExecutionControl customResourceUpdated(CustomResource updatedCustomResource) { + return new PostExecutionControl(false, updatedCustomResource, null); + } - public static PostExecutionControl exceptionDuringExecution(RuntimeException exception) { - return new PostExecutionControl(false, null, exception); - } + public static PostExecutionControl exceptionDuringExecution(RuntimeException exception) { + return new PostExecutionControl(false, null, exception); + } public boolean isOnlyFinalizerHandled() { return onlyFinalizerHandled; @@ -42,15 +44,15 @@ public Optional getUpdatedCustomResource() { return Optional.ofNullable(updatedCustomResource); } - public boolean customResourceUpdatedDuringExecution() { - return updatedCustomResource != null; - } + public boolean customResourceUpdatedDuringExecution() { + return updatedCustomResource != null; + } - public boolean exceptionDuringExecution() { - return runtimeException != null; - } + public boolean exceptionDuringExecution() { + return runtimeException != null; + } - public Optional getRuntimeException() { - return Optional.ofNullable(runtimeException); - } + public Optional getRuntimeException() { + return Optional.ofNullable(runtimeException); + } } diff --git a/operator-framework/src/main/java/io/javaoperatorsdk/operator/processing/event/DefaultEventSourceManager.java b/operator-framework/src/main/java/io/javaoperatorsdk/operator/processing/event/DefaultEventSourceManager.java index 772163d863..87f249a097 100644 --- a/operator-framework/src/main/java/io/javaoperatorsdk/operator/processing/event/DefaultEventSourceManager.java +++ b/operator-framework/src/main/java/io/javaoperatorsdk/operator/processing/event/DefaultEventSourceManager.java @@ -3,9 +3,6 @@ import io.javaoperatorsdk.operator.processing.DefaultEventHandler; import io.javaoperatorsdk.operator.processing.event.internal.CustomResourceEventSource; import io.javaoperatorsdk.operator.processing.event.internal.TimerEventSource; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.locks.ReentrantLock; @@ -14,23 +11,22 @@ public class DefaultEventSourceManager implements EventSourceManager { - public static final String RETRY_TIMER_EVENT_SOURCE_NAME = "retry-timer-event-source"; - private static final Logger log = LoggerFactory.getLogger(DefaultEventSourceManager.class); - + public static final String RETRY_TIMER_EVENT_SOURCE_NAME = "retry-timer-event-source"; + private static final Logger log = LoggerFactory.getLogger(DefaultEventSourceManager.class); - private final ReentrantLock lock = new ReentrantLock(); - private Map eventSources = new ConcurrentHashMap<>(); - private CustomResourceEventSource customResourceEventSource; - private DefaultEventHandler defaultEventHandler; - private TimerEventSource retryTimerEventSource; + private final ReentrantLock lock = new ReentrantLock(); + private Map eventSources = new ConcurrentHashMap<>(); + private CustomResourceEventSource customResourceEventSource; + private DefaultEventHandler defaultEventHandler; + private TimerEventSource retryTimerEventSource; - public DefaultEventSourceManager(DefaultEventHandler defaultEventHandler, boolean supportRetry) { - this.defaultEventHandler = defaultEventHandler; - if (supportRetry) { - this.retryTimerEventSource = new TimerEventSource(); - registerEventSource(RETRY_TIMER_EVENT_SOURCE_NAME, retryTimerEventSource); - } + public DefaultEventSourceManager(DefaultEventHandler defaultEventHandler, boolean supportRetry) { + this.defaultEventHandler = defaultEventHandler; + if (supportRetry) { + this.retryTimerEventSource = new TimerEventSource(); + registerEventSource(RETRY_TIMER_EVENT_SOURCE_NAME, retryTimerEventSource); } + } public void registerCustomResourceEventSource( CustomResourceEventSource customResourceEventSource) { @@ -38,50 +34,56 @@ public void registerCustomResourceEventSource( this.customResourceEventSource.addedToEventManager(); } - @Override - public void registerEventSource(String name, T eventSource) { - try { - lock.lock(); - EventSource currentEventSource = eventSources.get(name); - if (currentEventSource != null) { - throw new IllegalStateException("Event source with name already registered. Event source name: " + name); - } - eventSources.put(name, eventSource); - eventSource.setEventHandler(defaultEventHandler); - } finally { - lock.unlock(); - } - } - - @Override - public Optional deRegisterCustomResourceFromEventSource(String eventSourceName, String customResourceUid) { - try { - lock.lock(); - EventSource eventSource = this.eventSources.get(eventSourceName); - if (eventSource == null) { - log.warn("Event producer: {} not found for custom resource: {}", eventSourceName, customResourceUid); - return Optional.empty(); - } else { - eventSource.eventSourceDeRegisteredForResource(customResourceUid); - return Optional.of(eventSource); - } - } finally { - lock.unlock(); - } + @Override + public void registerEventSource(String name, T eventSource) { + try { + lock.lock(); + EventSource currentEventSource = eventSources.get(name); + if (currentEventSource != null) { + throw new IllegalStateException( + "Event source with name already registered. Event source name: " + name); + } + eventSources.put(name, eventSource); + eventSource.setEventHandler(defaultEventHandler); + } finally { + lock.unlock(); } + } - public TimerEventSource getRetryTimerEventSource() { - return retryTimerEventSource; + @Override + public Optional deRegisterCustomResourceFromEventSource( + String eventSourceName, String customResourceUid) { + try { + lock.lock(); + EventSource eventSource = this.eventSources.get(eventSourceName); + if (eventSource == null) { + log.warn( + "Event producer: {} not found for custom resource: {}", + eventSourceName, + customResourceUid); + return Optional.empty(); + } else { + eventSource.eventSourceDeRegisteredForResource(customResourceUid); + return Optional.of(eventSource); + } + } finally { + lock.unlock(); } + } - @Override - public Map getRegisteredEventSources() { - return Collections.unmodifiableMap(eventSources); - } + public TimerEventSource getRetryTimerEventSource() { + return retryTimerEventSource; + } - public void cleanup(String customResourceUid) { - getRegisteredEventSources().keySet().forEach(k -> deRegisterCustomResourceFromEventSource(k, customResourceUid)); - eventSources.remove(customResourceUid); - } + @Override + public Map getRegisteredEventSources() { + return Collections.unmodifiableMap(eventSources); + } + public void cleanup(String customResourceUid) { + getRegisteredEventSources() + .keySet() + .forEach(k -> deRegisterCustomResourceFromEventSource(k, customResourceUid)); + eventSources.remove(customResourceUid); + } } diff --git a/operator-framework/src/main/java/io/javaoperatorsdk/operator/processing/retry/GenericRetry.java b/operator-framework/src/main/java/io/javaoperatorsdk/operator/processing/retry/GenericRetry.java index b3fc6a2873..ba8396ddeb 100644 --- a/operator-framework/src/main/java/io/javaoperatorsdk/operator/processing/retry/GenericRetry.java +++ b/operator-framework/src/main/java/io/javaoperatorsdk/operator/processing/retry/GenericRetry.java @@ -2,82 +2,79 @@ public class GenericRetry implements Retry { - public static final int DEFAULT_MAX_ATTEMPTS = 5; - public static final long DEFAULT_INITIAL_INTERVAL = 2000L; - public static final double DEFAULT_MULTIPLIER = 1.5D; - - private int maxAttempts = DEFAULT_MAX_ATTEMPTS; - private long initialInterval = DEFAULT_INITIAL_INTERVAL; - private double intervalMultiplier = DEFAULT_MULTIPLIER; - private long maxInterval = -1; - - public static GenericRetry defaultLimitedExponentialRetry() { - return new GenericRetry(); - } - - public static GenericRetry noRetry() { - return new GenericRetry().setMaxAttempts(0); - } - - public static GenericRetry every10second10TimesRetry() { - return new GenericRetry() - .withLinearRetry() - .setMaxAttempts(10) - .setInitialInterval(10000); - } - - @Override - public GenericRetryExecution initExecution() { - return new GenericRetryExecution(this); - } - - public int getMaxAttempts() { - return maxAttempts; - } - - public GenericRetry setMaxAttempts(int maxRetryAttempts) { - this.maxAttempts = maxRetryAttempts; - return this; - } - - public long getInitialInterval() { - return initialInterval; - } - - public GenericRetry setInitialInterval(long initialInterval) { - this.initialInterval = initialInterval; - return this; - } - - public double getIntervalMultiplier() { - return intervalMultiplier; - } - - public GenericRetry setIntervalMultiplier(double intervalMultiplier) { - this.intervalMultiplier = intervalMultiplier; - return this; - } - - public long getMaxInterval() { - return maxInterval; - } - - public GenericRetry setMaxInterval(long maxInterval) { - this.maxInterval = maxInterval; - return this; - } - - public GenericRetry withoutMaxInterval() { - this.maxInterval = -1; - return this; - } - - public GenericRetry withoutMaxAttempts() { - return this.setMaxAttempts(-1); - } - - public GenericRetry withLinearRetry() { - this.intervalMultiplier = 1; - return this; - } + public static final int DEFAULT_MAX_ATTEMPTS = 5; + public static final long DEFAULT_INITIAL_INTERVAL = 2000L; + public static final double DEFAULT_MULTIPLIER = 1.5D; + + private int maxAttempts = DEFAULT_MAX_ATTEMPTS; + private long initialInterval = DEFAULT_INITIAL_INTERVAL; + private double intervalMultiplier = DEFAULT_MULTIPLIER; + private long maxInterval = -1; + + public static GenericRetry defaultLimitedExponentialRetry() { + return new GenericRetry(); + } + + public static GenericRetry noRetry() { + return new GenericRetry().setMaxAttempts(0); + } + + public static GenericRetry every10second10TimesRetry() { + return new GenericRetry().withLinearRetry().setMaxAttempts(10).setInitialInterval(10000); + } + + @Override + public GenericRetryExecution initExecution() { + return new GenericRetryExecution(this); + } + + public int getMaxAttempts() { + return maxAttempts; + } + + public GenericRetry setMaxAttempts(int maxRetryAttempts) { + this.maxAttempts = maxRetryAttempts; + return this; + } + + public long getInitialInterval() { + return initialInterval; + } + + public GenericRetry setInitialInterval(long initialInterval) { + this.initialInterval = initialInterval; + return this; + } + + public double getIntervalMultiplier() { + return intervalMultiplier; + } + + public GenericRetry setIntervalMultiplier(double intervalMultiplier) { + this.intervalMultiplier = intervalMultiplier; + return this; + } + + public long getMaxInterval() { + return maxInterval; + } + + public GenericRetry setMaxInterval(long maxInterval) { + this.maxInterval = maxInterval; + return this; + } + + public GenericRetry withoutMaxInterval() { + this.maxInterval = -1; + return this; + } + + public GenericRetry withoutMaxAttempts() { + return this.setMaxAttempts(-1); + } + + public GenericRetry withLinearRetry() { + this.intervalMultiplier = 1; + return this; + } } diff --git a/operator-framework/src/main/java/io/javaoperatorsdk/operator/processing/retry/GenericRetryExecution.java b/operator-framework/src/main/java/io/javaoperatorsdk/operator/processing/retry/GenericRetryExecution.java index b206fbc960..b4618dc017 100644 --- a/operator-framework/src/main/java/io/javaoperatorsdk/operator/processing/retry/GenericRetryExecution.java +++ b/operator-framework/src/main/java/io/javaoperatorsdk/operator/processing/retry/GenericRetryExecution.java @@ -2,35 +2,34 @@ import java.util.Optional; - public class GenericRetryExecution implements RetryExecution { - private final GenericRetry genericRetry; + private final GenericRetry genericRetry; - private int lastAttemptIndex = 0; - private long currentInterval; + private int lastAttemptIndex = 0; + private long currentInterval; - public GenericRetryExecution(GenericRetry genericRetry) { - this.genericRetry = genericRetry; - this.currentInterval = genericRetry.getInitialInterval(); - } + public GenericRetryExecution(GenericRetry genericRetry) { + this.genericRetry = genericRetry; + this.currentInterval = genericRetry.getInitialInterval(); + } - public Optional nextDelay() { - if (genericRetry.getMaxAttempts() > -1 && lastAttemptIndex >= genericRetry.getMaxAttempts()) { - return Optional.empty(); - } - if (lastAttemptIndex > 1) { - currentInterval = (long) (currentInterval * genericRetry.getIntervalMultiplier()); - if (genericRetry.getMaxInterval() > -1 && currentInterval > genericRetry.getMaxInterval()) { - currentInterval = genericRetry.getMaxInterval(); - } - } - lastAttemptIndex++; - return Optional.of(currentInterval); + public Optional nextDelay() { + if (genericRetry.getMaxAttempts() > -1 && lastAttemptIndex >= genericRetry.getMaxAttempts()) { + return Optional.empty(); } - - @Override - public boolean isLastExecution() { - return genericRetry.getMaxAttempts() > -1 && lastAttemptIndex >= genericRetry.getMaxAttempts(); + if (lastAttemptIndex > 1) { + currentInterval = (long) (currentInterval * genericRetry.getIntervalMultiplier()); + if (genericRetry.getMaxInterval() > -1 && currentInterval > genericRetry.getMaxInterval()) { + currentInterval = genericRetry.getMaxInterval(); + } } + lastAttemptIndex++; + return Optional.of(currentInterval); + } + + @Override + public boolean isLastExecution() { + return genericRetry.getMaxAttempts() > -1 && lastAttemptIndex >= genericRetry.getMaxAttempts(); + } } diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/EventDispatcherTest.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/EventDispatcherTest.java index abc1945ec5..63f7dcbf2f 100644 --- a/operator-framework/src/test/java/io/javaoperatorsdk/operator/EventDispatcherTest.java +++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/EventDispatcherTest.java @@ -1,5 +1,8 @@ package io.javaoperatorsdk.operator; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.Mockito.*; + import io.fabric8.kubernetes.client.CustomResource; import io.fabric8.kubernetes.client.Watcher; import io.javaoperatorsdk.operator.api.DeleteControl; @@ -9,167 +12,185 @@ import io.javaoperatorsdk.operator.processing.ExecutionScope; import io.javaoperatorsdk.operator.processing.event.Event; import io.javaoperatorsdk.operator.processing.event.internal.CustomResourceEvent; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; -import org.mockito.ArgumentMatchers; - import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.List; - -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.mockito.Mockito.*; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.ArgumentMatchers; class EventDispatcherTest { - private static final String DEFAULT_FINALIZER = "finalizer"; - private CustomResource testCustomResource; - private EventDispatcher eventDispatcher; - private ResourceController controller = mock(ResourceController.class); - private EventDispatcher.CustomResourceFacade customResourceFacade = mock(EventDispatcher.CustomResourceFacade.class); - - @BeforeEach - void setup() { - eventDispatcher = new EventDispatcher(controller, - DEFAULT_FINALIZER, customResourceFacade); - - testCustomResource = TestUtils.testCustomResource(); - testCustomResource.getMetadata().setFinalizers(new ArrayList<>(Collections.singletonList(DEFAULT_FINALIZER))); - - when(controller.createOrUpdateResource(eq(testCustomResource), any())).thenReturn(UpdateControl.updateCustomResource(testCustomResource)); - when(controller.deleteResource(eq(testCustomResource), any())).thenReturn(DeleteControl.DEFAULT_DELETE); - when(customResourceFacade.replaceWithLock(any())).thenReturn(null); - } - - @Test - void callCreateOrUpdateOnNewResource() { - eventDispatcher.handleExecution(executionScopeWithCREvent(Watcher.Action.ADDED, testCustomResource)); - verify(controller, times(1)).createOrUpdateResource(ArgumentMatchers.eq(testCustomResource), any()); - } - - @Test - void updatesOnlyStatusSubResource() { - when(controller.createOrUpdateResource(eq(testCustomResource), any())) - .thenReturn(UpdateControl.updateStatusSubResource(testCustomResource)); - - eventDispatcher.handleExecution(executionScopeWithCREvent(Watcher.Action.ADDED, testCustomResource)); - - verify(customResourceFacade, times(1)).updateStatus(testCustomResource); - verify(customResourceFacade, never()).replaceWithLock(any()); - } - - - @Test - void callCreateOrUpdateOnModifiedResource() { - eventDispatcher.handleExecution(executionScopeWithCREvent(Watcher.Action.MODIFIED, testCustomResource)); - verify(controller, times(1)).createOrUpdateResource(ArgumentMatchers.eq(testCustomResource), any()); - } - - @Test - void adsDefaultFinalizerOnCreateIfNotThere() { - eventDispatcher.handleExecution(executionScopeWithCREvent(Watcher.Action.MODIFIED, testCustomResource)); - verify(controller, times(1)) - .createOrUpdateResource(argThat(testCustomResource -> - testCustomResource.getMetadata().getFinalizers().contains(DEFAULT_FINALIZER)), any()); - } - - @Test - void callsDeleteIfObjectHasFinalizerAndMarkedForDelete() { - testCustomResource.getMetadata().setDeletionTimestamp("2019-8-10"); - testCustomResource.getMetadata().getFinalizers().add(DEFAULT_FINALIZER); - - eventDispatcher.handleExecution(executionScopeWithCREvent(Watcher.Action.MODIFIED, testCustomResource)); - - verify(controller, times(1)).deleteResource(eq(testCustomResource), any()); - } - - /** - * Note that there could be more finalizers. Out of our control. - */ - @Test - void callDeleteOnControllerIfMarkedForDeletionButThereIsNoDefaultFinalizer() { - markForDeletion(testCustomResource); - - eventDispatcher.handleExecution(executionScopeWithCREvent(Watcher.Action.MODIFIED, testCustomResource)); - - verify(controller).deleteResource(eq(testCustomResource), any()); - } - - @Test - void removesDefaultFinalizerOnDelete() { - markForDeletion(testCustomResource); - - eventDispatcher.handleExecution(executionScopeWithCREvent(Watcher.Action.MODIFIED, testCustomResource)); - - assertEquals(0, testCustomResource.getMetadata().getFinalizers().size()); - verify(customResourceFacade, times(1)).replaceWithLock(any()); - } - - @Test - void doesNotRemovesTheFinalizerIfTheDeleteNotMethodInstructsIt() { - when(controller.deleteResource(eq(testCustomResource), any())).thenReturn(DeleteControl.NO_FINALIZER_REMOVAL); - markForDeletion(testCustomResource); - - eventDispatcher.handleExecution(executionScopeWithCREvent(Watcher.Action.MODIFIED, testCustomResource)); - - assertEquals(1, testCustomResource.getMetadata().getFinalizers().size()); - verify(customResourceFacade, never()).replaceWithLock(any()); - } - - @Test - void doesNotUpdateTheResourceIfNoUpdateUpdateControl() { - when(controller.createOrUpdateResource(eq(testCustomResource), any())).thenReturn(UpdateControl.noUpdate()); - - eventDispatcher.handleExecution(executionScopeWithCREvent(Watcher.Action.MODIFIED, testCustomResource)); - verify(customResourceFacade, never()).replaceWithLock(any()); - verify(customResourceFacade, never()).updateStatus(testCustomResource); - } - - @Test - void addsFinalizerIfNotMarkedForDeletionAndEmptyCustomResourceReturned() { - removeFinalizers(testCustomResource); - when(controller.createOrUpdateResource(eq(testCustomResource), any())).thenReturn(UpdateControl.noUpdate()); - - eventDispatcher.handleExecution(executionScopeWithCREvent(Watcher.Action.MODIFIED, testCustomResource)); - - assertEquals(1, testCustomResource.getMetadata().getFinalizers().size()); - verify(customResourceFacade, times(1)).replaceWithLock(any()); - } - - @Test - void doesNotCallDeleteIfMarkedForDeletionButNotOurFinalizer() { - removeFinalizers(testCustomResource); - markForDeletion(testCustomResource); - - eventDispatcher.handleExecution(executionScopeWithCREvent(Watcher.Action.MODIFIED, testCustomResource)); - - verify(customResourceFacade, never()).replaceWithLock(any()); - verify(controller, never()).deleteResource(eq(testCustomResource), any()); - } - - @Test - void executeControllerRegardlessGenerationInNonGenerationAwareMode() { - eventDispatcher.handleExecution(executionScopeWithCREvent(Watcher.Action.MODIFIED, testCustomResource)); - eventDispatcher.handleExecution(executionScopeWithCREvent(Watcher.Action.MODIFIED, testCustomResource)); - - verify(controller, times(2)).createOrUpdateResource(eq(testCustomResource), any()); - } - - private void markForDeletion(CustomResource customResource) { - customResource.getMetadata().setDeletionTimestamp("2019-8-10"); - } - - private void removeFinalizers(CustomResource customResource) { - customResource.getMetadata().getFinalizers().clear(); - } - - public ExecutionScope executionScopeWithCREvent(Watcher.Action action, CustomResource resource, Event... otherEvents) { - CustomResourceEvent event = new CustomResourceEvent(action, resource, null); - List eventList = new ArrayList<>(1 + otherEvents.length); - eventList.add(event); - eventList.addAll(Arrays.asList(otherEvents)); - return new ExecutionScope(eventList, resource); - } - + private static final String DEFAULT_FINALIZER = "finalizer"; + private CustomResource testCustomResource; + private EventDispatcher eventDispatcher; + private ResourceController controller = mock(ResourceController.class); + private EventDispatcher.CustomResourceFacade customResourceFacade = + mock(EventDispatcher.CustomResourceFacade.class); + + @BeforeEach + void setup() { + eventDispatcher = new EventDispatcher(controller, DEFAULT_FINALIZER, customResourceFacade); + + testCustomResource = TestUtils.testCustomResource(); + testCustomResource + .getMetadata() + .setFinalizers(new ArrayList<>(Collections.singletonList(DEFAULT_FINALIZER))); + + when(controller.createOrUpdateResource(eq(testCustomResource), any())) + .thenReturn(UpdateControl.updateCustomResource(testCustomResource)); + when(controller.deleteResource(eq(testCustomResource), any())) + .thenReturn(DeleteControl.DEFAULT_DELETE); + when(customResourceFacade.replaceWithLock(any())).thenReturn(null); + } + + @Test + void callCreateOrUpdateOnNewResource() { + eventDispatcher.handleExecution( + executionScopeWithCREvent(Watcher.Action.ADDED, testCustomResource)); + verify(controller, times(1)) + .createOrUpdateResource(ArgumentMatchers.eq(testCustomResource), any()); + } + + @Test + void updatesOnlyStatusSubResource() { + when(controller.createOrUpdateResource(eq(testCustomResource), any())) + .thenReturn(UpdateControl.updateStatusSubResource(testCustomResource)); + + eventDispatcher.handleExecution( + executionScopeWithCREvent(Watcher.Action.ADDED, testCustomResource)); + + verify(customResourceFacade, times(1)).updateStatus(testCustomResource); + verify(customResourceFacade, never()).replaceWithLock(any()); + } + + @Test + void callCreateOrUpdateOnModifiedResource() { + eventDispatcher.handleExecution( + executionScopeWithCREvent(Watcher.Action.MODIFIED, testCustomResource)); + verify(controller, times(1)) + .createOrUpdateResource(ArgumentMatchers.eq(testCustomResource), any()); + } + + @Test + void adsDefaultFinalizerOnCreateIfNotThere() { + eventDispatcher.handleExecution( + executionScopeWithCREvent(Watcher.Action.MODIFIED, testCustomResource)); + verify(controller, times(1)) + .createOrUpdateResource( + argThat( + testCustomResource -> + testCustomResource.getMetadata().getFinalizers().contains(DEFAULT_FINALIZER)), + any()); + } + + @Test + void callsDeleteIfObjectHasFinalizerAndMarkedForDelete() { + testCustomResource.getMetadata().setDeletionTimestamp("2019-8-10"); + testCustomResource.getMetadata().getFinalizers().add(DEFAULT_FINALIZER); + + eventDispatcher.handleExecution( + executionScopeWithCREvent(Watcher.Action.MODIFIED, testCustomResource)); + + verify(controller, times(1)).deleteResource(eq(testCustomResource), any()); + } + + /** Note that there could be more finalizers. Out of our control. */ + @Test + void callDeleteOnControllerIfMarkedForDeletionButThereIsNoDefaultFinalizer() { + markForDeletion(testCustomResource); + + eventDispatcher.handleExecution( + executionScopeWithCREvent(Watcher.Action.MODIFIED, testCustomResource)); + + verify(controller).deleteResource(eq(testCustomResource), any()); + } + + @Test + void removesDefaultFinalizerOnDelete() { + markForDeletion(testCustomResource); + + eventDispatcher.handleExecution( + executionScopeWithCREvent(Watcher.Action.MODIFIED, testCustomResource)); + + assertEquals(0, testCustomResource.getMetadata().getFinalizers().size()); + verify(customResourceFacade, times(1)).replaceWithLock(any()); + } + + @Test + void doesNotRemovesTheFinalizerIfTheDeleteNotMethodInstructsIt() { + when(controller.deleteResource(eq(testCustomResource), any())) + .thenReturn(DeleteControl.NO_FINALIZER_REMOVAL); + markForDeletion(testCustomResource); + + eventDispatcher.handleExecution( + executionScopeWithCREvent(Watcher.Action.MODIFIED, testCustomResource)); + + assertEquals(1, testCustomResource.getMetadata().getFinalizers().size()); + verify(customResourceFacade, never()).replaceWithLock(any()); + } + + @Test + void doesNotUpdateTheResourceIfNoUpdateUpdateControl() { + when(controller.createOrUpdateResource(eq(testCustomResource), any())) + .thenReturn(UpdateControl.noUpdate()); + + eventDispatcher.handleExecution( + executionScopeWithCREvent(Watcher.Action.MODIFIED, testCustomResource)); + verify(customResourceFacade, never()).replaceWithLock(any()); + verify(customResourceFacade, never()).updateStatus(testCustomResource); + } + + @Test + void addsFinalizerIfNotMarkedForDeletionAndEmptyCustomResourceReturned() { + removeFinalizers(testCustomResource); + when(controller.createOrUpdateResource(eq(testCustomResource), any())) + .thenReturn(UpdateControl.noUpdate()); + + eventDispatcher.handleExecution( + executionScopeWithCREvent(Watcher.Action.MODIFIED, testCustomResource)); + + assertEquals(1, testCustomResource.getMetadata().getFinalizers().size()); + verify(customResourceFacade, times(1)).replaceWithLock(any()); + } + + @Test + void doesNotCallDeleteIfMarkedForDeletionButNotOurFinalizer() { + removeFinalizers(testCustomResource); + markForDeletion(testCustomResource); + + eventDispatcher.handleExecution( + executionScopeWithCREvent(Watcher.Action.MODIFIED, testCustomResource)); + + verify(customResourceFacade, never()).replaceWithLock(any()); + verify(controller, never()).deleteResource(eq(testCustomResource), any()); + } + + @Test + void executeControllerRegardlessGenerationInNonGenerationAwareMode() { + eventDispatcher.handleExecution( + executionScopeWithCREvent(Watcher.Action.MODIFIED, testCustomResource)); + eventDispatcher.handleExecution( + executionScopeWithCREvent(Watcher.Action.MODIFIED, testCustomResource)); + + verify(controller, times(2)).createOrUpdateResource(eq(testCustomResource), any()); + } + + private void markForDeletion(CustomResource customResource) { + customResource.getMetadata().setDeletionTimestamp("2019-8-10"); + } + + private void removeFinalizers(CustomResource customResource) { + customResource.getMetadata().getFinalizers().clear(); + } + + public ExecutionScope executionScopeWithCREvent( + Watcher.Action action, CustomResource resource, Event... otherEvents) { + CustomResourceEvent event = new CustomResourceEvent(action, resource, null); + List eventList = new ArrayList<>(1 + otherEvents.length); + eventList.add(event); + eventList.addAll(Arrays.asList(otherEvents)); + return new ExecutionScope(eventList, resource); + } } diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/processing/DefaultEventHandlerTest.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/processing/DefaultEventHandlerTest.java index 208de0c705..e1443ddf3d 100644 --- a/operator-framework/src/test/java/io/javaoperatorsdk/operator/processing/DefaultEventHandlerTest.java +++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/processing/DefaultEventHandlerTest.java @@ -19,12 +19,14 @@ class DefaultEventHandlerTest { - public static final int FAKE_CONTROLLER_EXECUTION_DURATION = 250; - public static final int SEPARATE_EXECUTION_TIMEOUT = 450; - private EventDispatcher eventDispatcherMock = mock(EventDispatcher.class); - private CustomResourceCache customResourceCache = new CustomResourceCache(); - private DefaultEventHandler defaultEventHandler = new DefaultEventHandler(customResourceCache, eventDispatcherMock, "Test", null); - private DefaultEventSourceManager defaultEventSourceManagerMock = mock(DefaultEventSourceManager.class); + public static final int FAKE_CONTROLLER_EXECUTION_DURATION = 250; + public static final int SEPARATE_EXECUTION_TIMEOUT = 450; + private EventDispatcher eventDispatcherMock = mock(EventDispatcher.class); + private CustomResourceCache customResourceCache = new CustomResourceCache(); + private DefaultEventHandler defaultEventHandler = + new DefaultEventHandler(customResourceCache, eventDispatcherMock, "Test", null); + private DefaultEventSourceManager defaultEventSourceManagerMock = + mock(DefaultEventSourceManager.class); @BeforeEach public void setup() { @@ -35,8 +37,8 @@ public void setup() { public void dispatchesEventsIfNoExecutionInProgress() { defaultEventHandler.handleEvent(prepareCREvent()); - verify(eventDispatcherMock, timeout(50).times(1)).handleExecution(any()); - } + verify(eventDispatcherMock, timeout(50).times(1)).handleExecution(any()); + } @Test public void skipProcessingIfLatestCustomResourceNotInCache() { @@ -45,8 +47,8 @@ public void skipProcessingIfLatestCustomResourceNotInCache() { defaultEventHandler.handleEvent(event); - verify(eventDispatcherMock, timeout(50).times(0)).handleExecution(any()); - } + verify(eventDispatcherMock, timeout(50).times(0)).handleExecution(any()); + } @Test public void ifExecutionInProgressWaitsUntilItsFinished() throws InterruptedException { @@ -54,8 +56,9 @@ public void ifExecutionInProgressWaitsUntilItsFinished() throws InterruptedExcep defaultEventHandler.handleEvent(nonCREvent(resourceUid)); - verify(eventDispatcherMock, timeout(SEPARATE_EXECUTION_TIMEOUT).times(1)).handleExecution(any()); - } + verify(eventDispatcherMock, timeout(SEPARATE_EXECUTION_TIMEOUT).times(1)) + .handleExecution(any()); + } @Test public void buffersAllIncomingEventsWhileControllerInExecution() { @@ -64,13 +67,14 @@ public void buffersAllIncomingEventsWhileControllerInExecution() { defaultEventHandler.handleEvent(nonCREvent(resourceUid)); defaultEventHandler.handleEvent(prepareCREvent(resourceUid)); - ArgumentCaptor captor = ArgumentCaptor.forClass(ExecutionScope.class); - verify(eventDispatcherMock, timeout(SEPARATE_EXECUTION_TIMEOUT).times(2)).handleExecution(captor.capture()); - List events = captor.getAllValues().get(1).getEvents(); - assertThat(events).hasSize(2); - assertThat(events.get(0)).isInstanceOf(TimerEvent.class); - assertThat(events.get(1)).isInstanceOf(CustomResourceEvent.class); - } + ArgumentCaptor captor = ArgumentCaptor.forClass(ExecutionScope.class); + verify(eventDispatcherMock, timeout(SEPARATE_EXECUTION_TIMEOUT).times(2)) + .handleExecution(captor.capture()); + List events = captor.getAllValues().get(1).getEvents(); + assertThat(events).hasSize(2); + assertThat(events.get(0)).isInstanceOf(TimerEvent.class); + assertThat(events.get(1)).isInstanceOf(CustomResourceEvent.class); + } @Test public void cleanUpAfterDeleteEvent() { @@ -96,15 +100,18 @@ private void waitMinimalTime() { } } - private String eventAlreadyUnderProcessing() { - when(eventDispatcherMock.handleExecution(any())).then((Answer) invocationOnMock -> { - Thread.sleep(FAKE_CONTROLLER_EXECUTION_DURATION); - return PostExecutionControl.defaultDispatch(); - }); - Event event = prepareCREvent(); - defaultEventHandler.handleEvent(event); - return event.getRelatedCustomResourceUid(); - } + private String eventAlreadyUnderProcessing() { + when(eventDispatcherMock.handleExecution(any())) + .then( + (Answer) + invocationOnMock -> { + Thread.sleep(FAKE_CONTROLLER_EXECUTION_DURATION); + return PostExecutionControl.defaultDispatch(); + }); + Event event = prepareCREvent(); + defaultEventHandler.handleEvent(event); + return event.getRelatedCustomResourceUid(); + } private CustomResourceEvent prepareCREvent() { return prepareCREvent(UUID.randomUUID().toString()); diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/processing/event/DefaultEventSourceManagerTest.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/processing/event/DefaultEventSourceManagerTest.java index 5b6313c77f..edc477c734 100644 --- a/operator-framework/src/test/java/io/javaoperatorsdk/operator/processing/event/DefaultEventSourceManagerTest.java +++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/processing/event/DefaultEventSourceManagerTest.java @@ -1,58 +1,61 @@ package io.javaoperatorsdk.operator.processing.event; +import static io.javaoperatorsdk.operator.processing.KubernetesResourceUtils.getUID; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatExceptionOfType; +import static org.mockito.Mockito.*; + import io.fabric8.kubernetes.client.CustomResource; import io.javaoperatorsdk.operator.TestUtils; import io.javaoperatorsdk.operator.processing.DefaultEventHandler; import io.javaoperatorsdk.operator.processing.KubernetesResourceUtils; -import org.junit.jupiter.api.Test; - import java.util.Map; - -import static io.javaoperatorsdk.operator.processing.KubernetesResourceUtils.getUID; -import static org.assertj.core.api.Assertions.assertThat; -import static org.assertj.core.api.Assertions.assertThatExceptionOfType; -import static org.mockito.Mockito.*; +import org.junit.jupiter.api.Test; class DefaultEventSourceManagerTest { - public static final String CUSTOM_EVENT_SOURCE_NAME = "CustomEventSource"; - - private DefaultEventHandler defaultEventHandlerMock = mock(DefaultEventHandler.class); - private DefaultEventSourceManager defaultEventSourceManager = new DefaultEventSourceManager(defaultEventHandlerMock, false); - - @Test - public void registersEventSource() { - EventSource eventSource = mock(EventSource.class); - - defaultEventSourceManager.registerEventSource(CUSTOM_EVENT_SOURCE_NAME, eventSource); - - Map registeredSources = - defaultEventSourceManager.getRegisteredEventSources(); - assertThat(registeredSources.entrySet()).hasSize(1); - assertThat(registeredSources.get(CUSTOM_EVENT_SOURCE_NAME)).isEqualTo(eventSource); - verify(eventSource, times(1)).setEventHandler(eq(defaultEventHandlerMock)); - } - - @Test - public void throwExceptionIfRegisteringEventSourceWithSameName() { - EventSource eventSource = mock(EventSource.class); - EventSource eventSource2 = mock(EventSource.class); - - defaultEventSourceManager.registerEventSource(CUSTOM_EVENT_SOURCE_NAME, eventSource); - assertThatExceptionOfType(IllegalStateException.class).isThrownBy(() -> { - defaultEventSourceManager.registerEventSource(CUSTOM_EVENT_SOURCE_NAME, eventSource2); - }); - } - - @Test - public void deRegistersEventSources() { - CustomResource customResource = TestUtils.testCustomResource(); - EventSource eventSource = mock(EventSource.class); - defaultEventSourceManager.registerEventSource(CUSTOM_EVENT_SOURCE_NAME, eventSource); - - defaultEventSourceManager.deRegisterCustomResourceFromEventSource(CUSTOM_EVENT_SOURCE_NAME, getUID(customResource)); - - verify(eventSource, times(1)).eventSourceDeRegisteredForResource(eq(KubernetesResourceUtils.getUID(customResource))); - } - -} \ No newline at end of file + public static final String CUSTOM_EVENT_SOURCE_NAME = "CustomEventSource"; + + private DefaultEventHandler defaultEventHandlerMock = mock(DefaultEventHandler.class); + private DefaultEventSourceManager defaultEventSourceManager = + new DefaultEventSourceManager(defaultEventHandlerMock, false); + + @Test + public void registersEventSource() { + EventSource eventSource = mock(EventSource.class); + + defaultEventSourceManager.registerEventSource(CUSTOM_EVENT_SOURCE_NAME, eventSource); + + Map registeredSources = + defaultEventSourceManager.getRegisteredEventSources(); + assertThat(registeredSources.entrySet()).hasSize(1); + assertThat(registeredSources.get(CUSTOM_EVENT_SOURCE_NAME)).isEqualTo(eventSource); + verify(eventSource, times(1)).setEventHandler(eq(defaultEventHandlerMock)); + } + + @Test + public void throwExceptionIfRegisteringEventSourceWithSameName() { + EventSource eventSource = mock(EventSource.class); + EventSource eventSource2 = mock(EventSource.class); + + defaultEventSourceManager.registerEventSource(CUSTOM_EVENT_SOURCE_NAME, eventSource); + assertThatExceptionOfType(IllegalStateException.class) + .isThrownBy( + () -> { + defaultEventSourceManager.registerEventSource(CUSTOM_EVENT_SOURCE_NAME, eventSource2); + }); + } + + @Test + public void deRegistersEventSources() { + CustomResource customResource = TestUtils.testCustomResource(); + EventSource eventSource = mock(EventSource.class); + defaultEventSourceManager.registerEventSource(CUSTOM_EVENT_SOURCE_NAME, eventSource); + + defaultEventSourceManager.deRegisterCustomResourceFromEventSource( + CUSTOM_EVENT_SOURCE_NAME, getUID(customResource)); + + verify(eventSource, times(1)) + .eventSourceDeRegisteredForResource(eq(KubernetesResourceUtils.getUID(customResource))); + } +} diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/processing/retry/GenericRetryExecutionTest.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/processing/retry/GenericRetryExecutionTest.java index f7df735677..5feadecafa 100644 --- a/operator-framework/src/test/java/io/javaoperatorsdk/operator/processing/retry/GenericRetryExecutionTest.java +++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/processing/retry/GenericRetryExecutionTest.java @@ -1,81 +1,87 @@ package io.javaoperatorsdk.operator.processing.retry; -import org.junit.jupiter.api.Test; - -import java.util.Optional; - import static io.javaoperatorsdk.operator.processing.retry.GenericRetry.DEFAULT_INITIAL_INTERVAL; import static org.assertj.core.api.Assertions.assertThat; -public class GenericRetryExecutionTest { - - @Test - public void forFirstBackOffAlwaysReturnsInitialInterval() { - assertThat(getDefaultRetryExecution().nextDelay().get()).isEqualTo(DEFAULT_INITIAL_INTERVAL); - } - - @Test - public void delayIsMultipliedEveryNextDelayCall() { - RetryExecution retryExecution = getDefaultRetryExecution(); - - Optional res = callNextDelayNTimes(retryExecution, 1); - assertThat(res.get()).isEqualTo(DEFAULT_INITIAL_INTERVAL); - - res = retryExecution.nextDelay(); - assertThat(res.get()).isEqualTo((long) (DEFAULT_INITIAL_INTERVAL * GenericRetry.DEFAULT_MULTIPLIER)); - - res = retryExecution.nextDelay(); - assertThat(res.get()).isEqualTo((long) (DEFAULT_INITIAL_INTERVAL * GenericRetry.DEFAULT_MULTIPLIER * GenericRetry.DEFAULT_MULTIPLIER)); - } - - @Test - public void noNextDelayIfMaxAttemptLimitReached() { - RetryExecution retryExecution = GenericRetry.defaultLimitedExponentialRetry().setMaxAttempts(3).initExecution(); - Optional res = callNextDelayNTimes(retryExecution, 2); - assertThat(res).isNotEmpty(); - - res = retryExecution.nextDelay(); - assertThat(res).isEmpty(); - } - - @Test - public void canLimitMaxIntervalLength() { - RetryExecution retryExecution = GenericRetry.defaultLimitedExponentialRetry() - .setInitialInterval(2000) - .setMaxInterval(4500) - .setIntervalMultiplier(2) - .initExecution(); - - Optional res = callNextDelayNTimes(retryExecution, 4); - - assertThat(res.get()).isEqualTo(4500); - } - - @Test - public void supportsNoRetry() { - RetryExecution retryExecution = GenericRetry.noRetry().initExecution(); - assertThat(retryExecution.nextDelay()).isEmpty(); - } - - @Test - public void supportsIsLastExecution() { - GenericRetryExecution execution = new GenericRetry().setMaxAttempts(2).initExecution(); - assertThat(execution.isLastExecution()).isFalse(); - - execution.nextDelay(); - execution.nextDelay(); - assertThat(execution.isLastExecution()).isTrue(); - } +import java.util.Optional; +import org.junit.jupiter.api.Test; - private RetryExecution getDefaultRetryExecution() { - return GenericRetry.defaultLimitedExponentialRetry().initExecution(); - } +public class GenericRetryExecutionTest { - public Optional callNextDelayNTimes(RetryExecution retryExecution, int n) { - for (int i = 0; i < n ; i++) { - retryExecution.nextDelay(); - } - return retryExecution.nextDelay(); + @Test + public void forFirstBackOffAlwaysReturnsInitialInterval() { + assertThat(getDefaultRetryExecution().nextDelay().get()).isEqualTo(DEFAULT_INITIAL_INTERVAL); + } + + @Test + public void delayIsMultipliedEveryNextDelayCall() { + RetryExecution retryExecution = getDefaultRetryExecution(); + + Optional res = callNextDelayNTimes(retryExecution, 1); + assertThat(res.get()).isEqualTo(DEFAULT_INITIAL_INTERVAL); + + res = retryExecution.nextDelay(); + assertThat(res.get()) + .isEqualTo((long) (DEFAULT_INITIAL_INTERVAL * GenericRetry.DEFAULT_MULTIPLIER)); + + res = retryExecution.nextDelay(); + assertThat(res.get()) + .isEqualTo( + (long) + (DEFAULT_INITIAL_INTERVAL + * GenericRetry.DEFAULT_MULTIPLIER + * GenericRetry.DEFAULT_MULTIPLIER)); + } + + @Test + public void noNextDelayIfMaxAttemptLimitReached() { + RetryExecution retryExecution = + GenericRetry.defaultLimitedExponentialRetry().setMaxAttempts(3).initExecution(); + Optional res = callNextDelayNTimes(retryExecution, 2); + assertThat(res).isNotEmpty(); + + res = retryExecution.nextDelay(); + assertThat(res).isEmpty(); + } + + @Test + public void canLimitMaxIntervalLength() { + RetryExecution retryExecution = + GenericRetry.defaultLimitedExponentialRetry() + .setInitialInterval(2000) + .setMaxInterval(4500) + .setIntervalMultiplier(2) + .initExecution(); + + Optional res = callNextDelayNTimes(retryExecution, 4); + + assertThat(res.get()).isEqualTo(4500); + } + + @Test + public void supportsNoRetry() { + RetryExecution retryExecution = GenericRetry.noRetry().initExecution(); + assertThat(retryExecution.nextDelay()).isEmpty(); + } + + @Test + public void supportsIsLastExecution() { + GenericRetryExecution execution = new GenericRetry().setMaxAttempts(2).initExecution(); + assertThat(execution.isLastExecution()).isFalse(); + + execution.nextDelay(); + execution.nextDelay(); + assertThat(execution.isLastExecution()).isTrue(); + } + + private RetryExecution getDefaultRetryExecution() { + return GenericRetry.defaultLimitedExponentialRetry().initExecution(); + } + + public Optional callNextDelayNTimes(RetryExecution retryExecution, int n) { + for (int i = 0; i < n; i++) { + retryExecution.nextDelay(); } - + return retryExecution.nextDelay(); + } } From 45117281288f6bff1c021574a1d9106b37a68d4c Mon Sep 17 00:00:00 2001 From: csviri Date: Wed, 9 Dec 2020 10:58:30 +0100 Subject: [PATCH 03/10] fix, operator api update for retry --- .../java/io/javaoperatorsdk/operator/Operator.java | 11 +++++++++++ .../operator/processing/DefaultEventHandler.java | 1 + 2 files changed, 12 insertions(+) diff --git a/operator-framework/src/main/java/io/javaoperatorsdk/operator/Operator.java b/operator-framework/src/main/java/io/javaoperatorsdk/operator/Operator.java index 03ec95f5e9..b51d317463 100644 --- a/operator-framework/src/main/java/io/javaoperatorsdk/operator/Operator.java +++ b/operator-framework/src/main/java/io/javaoperatorsdk/operator/Operator.java @@ -36,11 +36,22 @@ public Operator(KubernetesClient k8sClient) { this.k8sClient = k8sClient; } + public void registerControllerForAllNamespaces( + ResourceController controller, Retry retry) throws OperatorException { + registerController(controller, true, retry); + } + public void registerControllerForAllNamespaces( ResourceController controller) throws OperatorException { registerController(controller, true, null); } + public void registerController( + ResourceController controller, Retry retry, String... targetNamespaces) + throws OperatorException { + registerController(controller, false, retry, targetNamespaces); + } + public void registerController( ResourceController controller, String... targetNamespaces) throws OperatorException { registerController(controller, false, null, targetNamespaces); diff --git a/operator-framework/src/main/java/io/javaoperatorsdk/operator/processing/DefaultEventHandler.java b/operator-framework/src/main/java/io/javaoperatorsdk/operator/processing/DefaultEventHandler.java index 1051f6c692..def60e7265 100644 --- a/operator-framework/src/main/java/io/javaoperatorsdk/operator/processing/DefaultEventHandler.java +++ b/operator-framework/src/main/java/io/javaoperatorsdk/operator/processing/DefaultEventHandler.java @@ -109,6 +109,7 @@ void eventProcessingFinished( if (retry != null && postExecutionControl.exceptionDuringExecution()) { handleRetryOnException(executionScope, postExecutionControl); + return; } else if (retry != null) { handleSuccessfulExecutionRegardingRetry(executionScope); } From 115e44d47124ee478c9632d337385fcd1e74fcdb Mon Sep 17 00:00:00 2001 From: csviri Date: Fri, 11 Dec 2020 09:25:27 +0100 Subject: [PATCH 04/10] retry tests, fixes --- .../processing/DefaultEventHandler.java | 14 ++-- .../operator/processing/EventBuffer.java | 6 +- .../processing/DefaultEventHandlerTest.java | 65 ++++++++++++++++++- 3 files changed, 73 insertions(+), 12 deletions(-) diff --git a/operator-framework/src/main/java/io/javaoperatorsdk/operator/processing/DefaultEventHandler.java b/operator-framework/src/main/java/io/javaoperatorsdk/operator/processing/DefaultEventHandler.java index def60e7265..fd3cfbc8ad 100644 --- a/operator-framework/src/main/java/io/javaoperatorsdk/operator/processing/DefaultEventHandler.java +++ b/operator-framework/src/main/java/io/javaoperatorsdk/operator/processing/DefaultEventHandler.java @@ -108,12 +108,13 @@ void eventProcessingFinished( unsetUnderExecution(executionScope.getCustomResourceUid()); if (retry != null && postExecutionControl.exceptionDuringExecution()) { - handleRetryOnException(executionScope, postExecutionControl); + handleRetryOnException(executionScope); return; - } else if (retry != null) { - handleSuccessfulExecutionRegardingRetry(executionScope); } + if (retry != null) { + markSuccessfulExecutionRegardingRetry(executionScope); + } if (containsCustomResourceDeletedEvent(executionScope.getEvents())) { cleanupAfterDeletedEvent(executionScope.getCustomResourceUid()); } else { @@ -130,17 +131,16 @@ void eventProcessingFinished( * events (received meanwhile retry is in place or already in buffer) instantly or always wait * according to the retry timing if there was an exception. */ - private void handleRetryOnException( - ExecutionScope executionScope, PostExecutionControl postExecutionControl) { + private void handleRetryOnException(ExecutionScope executionScope) { RetryExecution execution = getOrInitRetryExecution(executionScope); boolean newEventsExists = eventBuffer.newEventsExists(executionScope.getCustomResourceUid()); eventBuffer.putBackEvents(executionScope.getCustomResourceUid(), executionScope.getEvents()); - Optional nextDelay = execution.nextDelay(); if (newEventsExists) { executeBufferedEvents(executionScope.getCustomResourceUid()); return; } + Optional nextDelay = execution.nextDelay(); nextDelay.ifPresent( delay -> defaultEventSourceManager @@ -148,7 +148,7 @@ private void handleRetryOnException( .scheduleOnce(executionScope.getCustomResource(), delay)); } - private void handleSuccessfulExecutionRegardingRetry(ExecutionScope executionScope) { + private void markSuccessfulExecutionRegardingRetry(ExecutionScope executionScope) { retryState.remove(executionScope.getCustomResourceUid()); defaultEventSourceManager .getRetryTimerEventSource() diff --git a/operator-framework/src/main/java/io/javaoperatorsdk/operator/processing/EventBuffer.java b/operator-framework/src/main/java/io/javaoperatorsdk/operator/processing/EventBuffer.java index 914a08000b..ba6619b172 100644 --- a/operator-framework/src/main/java/io/javaoperatorsdk/operator/processing/EventBuffer.java +++ b/operator-framework/src/main/java/io/javaoperatorsdk/operator/processing/EventBuffer.java @@ -14,11 +14,13 @@ public void addEvent(Event event) { } public boolean newEventsExists(String resourceId) { - return !events.get(resourceId).isEmpty(); + return events.get(resourceId) != null && !events.get(resourceId).isEmpty(); } public void putBackEvents(String resourceUid, List oldEvents) { - events.get(resourceUid).addAll(0, oldEvents); + List crEvents = + events.computeIfAbsent(resourceUid, (id) -> new ArrayList<>(oldEvents.size())); + crEvents.addAll(0, oldEvents); } public boolean containsEvents(String customResourceId) { diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/processing/DefaultEventHandlerTest.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/processing/DefaultEventHandlerTest.java index e1443ddf3d..2e83141421 100644 --- a/operator-framework/src/test/java/io/javaoperatorsdk/operator/processing/DefaultEventHandlerTest.java +++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/processing/DefaultEventHandlerTest.java @@ -9,7 +9,10 @@ import io.javaoperatorsdk.operator.processing.event.Event; import io.javaoperatorsdk.operator.processing.event.internal.CustomResourceEvent; import io.javaoperatorsdk.operator.processing.event.internal.TimerEvent; +import io.javaoperatorsdk.operator.processing.event.internal.TimerEventSource; +import io.javaoperatorsdk.operator.processing.retry.GenericRetry; import io.javaoperatorsdk.operator.sample.simple.TestCustomResource; +import java.util.Arrays; import java.util.List; import java.util.UUID; import org.junit.jupiter.api.BeforeEach; @@ -23,14 +26,26 @@ class DefaultEventHandlerTest { public static final int SEPARATE_EXECUTION_TIMEOUT = 450; private EventDispatcher eventDispatcherMock = mock(EventDispatcher.class); private CustomResourceCache customResourceCache = new CustomResourceCache(); - private DefaultEventHandler defaultEventHandler = - new DefaultEventHandler(customResourceCache, eventDispatcherMock, "Test", null); private DefaultEventSourceManager defaultEventSourceManagerMock = mock(DefaultEventSourceManager.class); + private TimerEventSource retryTimerEventSourceMock = mock(TimerEventSource.class); + + private DefaultEventHandler defaultEventHandler = + new DefaultEventHandler(customResourceCache, eventDispatcherMock, "Test", null); + + private DefaultEventHandler defaultEventHandlerWithRetry = + new DefaultEventHandler( + customResourceCache, + eventDispatcherMock, + "Test", + GenericRetry.defaultLimitedExponentialRetry()); @BeforeEach public void setup() { + when(defaultEventSourceManagerMock.getRetryTimerEventSource()) + .thenReturn(retryTimerEventSourceMock); defaultEventHandler.setDefaultEventSourceManager(defaultEventSourceManagerMock); + defaultEventHandlerWithRetry.setDefaultEventSourceManager(defaultEventSourceManagerMock); } @Test @@ -85,13 +100,57 @@ public void cleanUpAfterDeleteEvent() { String uid = customResource.getMetadata().getUid(); defaultEventHandler.handleEvent(event); - // todo awaitility? + waitMinimalTime(); verify(defaultEventSourceManagerMock, times(1)).cleanup(uid); assertThat(customResourceCache.getLatestResource(uid)).isNotPresent(); } + @Test + public void schedulesAnEventRetryOnException() { + Event event = prepareCREvent(); + TestCustomResource customResource = testCustomResource(); + + ExecutionScope executionScope = new ExecutionScope(Arrays.asList(event), customResource); + PostExecutionControl postExecutionControl = + PostExecutionControl.exceptionDuringExecution(new RuntimeException("test")); + + defaultEventHandlerWithRetry.eventProcessingFinished(executionScope, postExecutionControl); + + verify(retryTimerEventSourceMock, times(1)) + .scheduleOnce(eq(customResource), eq(GenericRetry.DEFAULT_INITIAL_INTERVAL)); + } + + @Test + public void executesTheControllerInstantlyAfterErrorIfEventsBuffered() { + Event event = prepareCREvent(); + TestCustomResource customResource = testCustomResource(); + customResource.getMetadata().setUid(event.getRelatedCustomResourceUid()); + ExecutionScope executionScope = new ExecutionScope(Arrays.asList(event), customResource); + PostExecutionControl postExecutionControl = + PostExecutionControl.exceptionDuringExecution(new RuntimeException("test")); + + // start processing an event + defaultEventHandlerWithRetry.handleEvent(event); + // buffer an another event + defaultEventHandlerWithRetry.handleEvent(event); + verify(eventDispatcherMock, timeout(SEPARATE_EXECUTION_TIMEOUT).times(1)) + .handleExecution(any()); + + defaultEventHandlerWithRetry.eventProcessingFinished(executionScope, postExecutionControl); + + ArgumentCaptor executionScopeArgumentCaptor = + ArgumentCaptor.forClass(ExecutionScope.class); + verify(eventDispatcherMock, timeout(SEPARATE_EXECUTION_TIMEOUT).times(2)) + .handleExecution(executionScopeArgumentCaptor.capture()); + List allValues = executionScopeArgumentCaptor.getAllValues(); + assertThat(allValues).hasSize(2); + assertThat(allValues.get(1).getEvents()).hasSize(2); + verify(retryTimerEventSourceMock, never()) + .scheduleOnce(eq(customResource), eq(GenericRetry.DEFAULT_INITIAL_INTERVAL)); + } + private void waitMinimalTime() { try { Thread.sleep(50); From 10c97e19a605641c3805ce0b23834623c9837e5e Mon Sep 17 00:00:00 2001 From: csviri Date: Fri, 11 Dec 2020 14:22:35 +0100 Subject: [PATCH 05/10] retry improvements and tests --- .../io/javaoperatorsdk/operator/Operator.java | 2 +- .../javaoperatorsdk/operator/api/Context.java | 3 + .../operator/api/DefaultContext.java | 11 ++- .../operator/api/RetryInfo.java | 13 ++-- .../processing/DefaultEventHandler.java | 25 +++++-- .../operator/processing/EventDispatcher.java | 5 +- .../operator/processing/ExecutionScope.java | 10 ++- .../retry/GenericRetryExecution.java | 7 +- .../processing/retry/RetryExecution.java | 4 +- .../operator/EventDispatcherTest.java | 22 ++++-- .../operator/IntegrationTestSupport.java | 8 ++- .../io/javaoperatorsdk/operator/RetryIT.java | 72 +++++++++++++++++++ .../processing/DefaultEventHandlerTest.java | 41 +++++++++-- .../retry/GenericRetryExecutionTest.java | 13 +++- .../sample/retry/RetryTestCustomResource.java | 38 ++++++++++ .../RetryTestCustomResourceController.java | 61 ++++++++++++++++ .../retry/RetryTestCustomResourceSpec.java | 15 ++++ .../retry/RetryTestCustomResourceStatus.java | 20 ++++++ .../operator/retry-test-crd.yaml | 16 +++++ 19 files changed, 358 insertions(+), 28 deletions(-) create mode 100644 operator-framework/src/test/java/io/javaoperatorsdk/operator/RetryIT.java create mode 100644 operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/retry/RetryTestCustomResource.java create mode 100644 operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/retry/RetryTestCustomResourceController.java create mode 100644 operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/retry/RetryTestCustomResourceSpec.java create mode 100644 operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/retry/RetryTestCustomResourceStatus.java create mode 100644 operator-framework/src/test/resources/io/javaoperatorsdk/operator/retry-test-crd.yaml diff --git a/operator-framework/src/main/java/io/javaoperatorsdk/operator/Operator.java b/operator-framework/src/main/java/io/javaoperatorsdk/operator/Operator.java index b51d317463..8f10731726 100644 --- a/operator-framework/src/main/java/io/javaoperatorsdk/operator/Operator.java +++ b/operator-framework/src/main/java/io/javaoperatorsdk/operator/Operator.java @@ -84,7 +84,7 @@ private void registerController( customResourceCache, eventDispatcher, controller.getClass().getName(), retry); DefaultEventSourceManager eventSourceManager = new DefaultEventSourceManager(defaultEventHandler, retry != null); - defaultEventHandler.setDefaultEventSourceManager(eventSourceManager); + defaultEventHandler.setEventSourceManager(eventSourceManager); eventDispatcher.setEventSourceManager(eventSourceManager); customResourceClients.put(resClass, (CustomResourceOperationsImpl) client); diff --git a/operator-framework/src/main/java/io/javaoperatorsdk/operator/api/Context.java b/operator-framework/src/main/java/io/javaoperatorsdk/operator/api/Context.java index 2b79a9d6ae..7d04a5db93 100644 --- a/operator-framework/src/main/java/io/javaoperatorsdk/operator/api/Context.java +++ b/operator-framework/src/main/java/io/javaoperatorsdk/operator/api/Context.java @@ -3,10 +3,13 @@ import io.fabric8.kubernetes.client.CustomResource; import io.javaoperatorsdk.operator.processing.event.EventList; import io.javaoperatorsdk.operator.processing.event.EventSourceManager; +import java.util.Optional; public interface Context { EventSourceManager getEventSourceManager(); EventList getEvents(); + + Optional getRetryInfo(); } diff --git a/operator-framework/src/main/java/io/javaoperatorsdk/operator/api/DefaultContext.java b/operator-framework/src/main/java/io/javaoperatorsdk/operator/api/DefaultContext.java index 7e472324d6..c922d674da 100644 --- a/operator-framework/src/main/java/io/javaoperatorsdk/operator/api/DefaultContext.java +++ b/operator-framework/src/main/java/io/javaoperatorsdk/operator/api/DefaultContext.java @@ -3,13 +3,17 @@ import io.fabric8.kubernetes.client.CustomResource; import io.javaoperatorsdk.operator.processing.event.EventList; import io.javaoperatorsdk.operator.processing.event.EventSourceManager; +import java.util.Optional; public class DefaultContext implements Context { + private final RetryInfo retryInfo; private final EventList events; private final EventSourceManager eventSourceManager; - public DefaultContext(EventSourceManager eventSourceManager, EventList events) { + public DefaultContext( + EventSourceManager eventSourceManager, EventList events, RetryInfo retryInfo) { + this.retryInfo = retryInfo; this.events = events; this.eventSourceManager = eventSourceManager; } @@ -23,4 +27,9 @@ public EventSourceManager getEventSourceManager() { public EventList getEvents() { return events; } + + @Override + public Optional getRetryInfo() { + return Optional.ofNullable(retryInfo); + } } diff --git a/operator-framework/src/main/java/io/javaoperatorsdk/operator/api/RetryInfo.java b/operator-framework/src/main/java/io/javaoperatorsdk/operator/api/RetryInfo.java index 9c3b3c05cc..ce90e04f8a 100644 --- a/operator-framework/src/main/java/io/javaoperatorsdk/operator/api/RetryInfo.java +++ b/operator-framework/src/main/java/io/javaoperatorsdk/operator/api/RetryInfo.java @@ -2,19 +2,24 @@ public class RetryInfo { - private int retryNumber; + private int attemptIndex; private boolean lastAttempt; public RetryInfo(int retryNumber, boolean lastAttempt) { - this.retryNumber = retryNumber; + this.attemptIndex = retryNumber; this.lastAttempt = lastAttempt; } - public int getRetryNumber() { - return retryNumber; + public int getAttemptIndex() { + return attemptIndex; } public boolean isLastAttempt() { return lastAttempt; } + + @Override + public String toString() { + return "RetryInfo{" + "attemptIndex=" + attemptIndex + ", lastAttempt=" + lastAttempt + '}'; + } } diff --git a/operator-framework/src/main/java/io/javaoperatorsdk/operator/processing/DefaultEventHandler.java b/operator-framework/src/main/java/io/javaoperatorsdk/operator/processing/DefaultEventHandler.java index fd3cfbc8ad..ba14738a61 100644 --- a/operator-framework/src/main/java/io/javaoperatorsdk/operator/processing/DefaultEventHandler.java +++ b/operator-framework/src/main/java/io/javaoperatorsdk/operator/processing/DefaultEventHandler.java @@ -5,6 +5,7 @@ import static io.javaoperatorsdk.operator.processing.KubernetesResourceUtils.getVersion; import io.fabric8.kubernetes.client.CustomResource; +import io.javaoperatorsdk.operator.api.RetryInfo; import io.javaoperatorsdk.operator.processing.event.DefaultEventSourceManager; import io.javaoperatorsdk.operator.processing.event.Event; import io.javaoperatorsdk.operator.processing.event.EventHandler; @@ -35,7 +36,7 @@ public class DefaultEventHandler implements EventHandler { private final EventDispatcher eventDispatcher; private final Retry retry; private final Map retryState = new HashMap<>(); - private DefaultEventSourceManager defaultEventSourceManager; + private DefaultEventSourceManager eventSourceManager; private final ReentrantLock lock = new ReentrantLock(); @@ -59,8 +60,8 @@ public Thread newThread(Runnable runnable) { }); } - public void setDefaultEventSourceManager(DefaultEventSourceManager defaultEventSourceManager) { - this.defaultEventSourceManager = defaultEventSourceManager; + public void setEventSourceManager(DefaultEventSourceManager eventSourceManager) { + this.eventSourceManager = eventSourceManager; } @Override @@ -86,7 +87,8 @@ private void executeBufferedEvents(String customResourceUid) { ExecutionScope executionScope = new ExecutionScope( eventBuffer.getAndRemoveEventsForExecution(customResourceUid), - latestCustomResource.get()); + latestCustomResource.get(), + retryInfo(customResourceUid)); log.debug("Executing events for custom resource. Scope: {}", executionScope); executor.execute(new ExecutionConsumer(executionScope, eventDispatcher, this)); } else { @@ -100,6 +102,15 @@ private void executeBufferedEvents(String customResourceUid) { } } + private RetryInfo retryInfo(String customResourceUid) { + RetryExecution retryExecution = retryState.get(customResourceUid); + if (retryExecution != null) { + return new RetryInfo(retryExecution.getCurrentAttemptIndex(), retryExecution.isLastAttempt()); + } else { + return null; + } + } + void eventProcessingFinished( ExecutionScope executionScope, PostExecutionControl postExecutionControl) { try { @@ -143,14 +154,14 @@ private void handleRetryOnException(ExecutionScope executionScope) { Optional nextDelay = execution.nextDelay(); nextDelay.ifPresent( delay -> - defaultEventSourceManager + eventSourceManager .getRetryTimerEventSource() .scheduleOnce(executionScope.getCustomResource(), delay)); } private void markSuccessfulExecutionRegardingRetry(ExecutionScope executionScope) { retryState.remove(executionScope.getCustomResourceUid()); - defaultEventSourceManager + eventSourceManager .getRetryTimerEventSource() .cancelOnceSchedule(executionScope.getCustomResourceUid()); } @@ -200,7 +211,7 @@ private void cacheUpdatedResourceIfChanged( } private void cleanupAfterDeletedEvent(String customResourceUid) { - defaultEventSourceManager.cleanup(customResourceUid); + eventSourceManager.cleanup(customResourceUid); eventBuffer.cleanup(customResourceUid); customResourceCache.cleanup(customResourceUid); } diff --git a/operator-framework/src/main/java/io/javaoperatorsdk/operator/processing/EventDispatcher.java b/operator-framework/src/main/java/io/javaoperatorsdk/operator/processing/EventDispatcher.java index 07f432b769..d5c2df9772 100644 --- a/operator-framework/src/main/java/io/javaoperatorsdk/operator/processing/EventDispatcher.java +++ b/operator-framework/src/main/java/io/javaoperatorsdk/operator/processing/EventDispatcher.java @@ -66,7 +66,10 @@ private PostExecutionControl handleDispatch(ExecutionScope executionScope) { return PostExecutionControl.defaultDispatch(); } Context context = - new DefaultContext(eventSourceManager, new EventList(executionScope.getEvents())); + new DefaultContext( + eventSourceManager, + new EventList(executionScope.getEvents()), + executionScope.getRetryInfo()); if (markedForDeletion(resource)) { return handleDelete(resource, context); } else { diff --git a/operator-framework/src/main/java/io/javaoperatorsdk/operator/processing/ExecutionScope.java b/operator-framework/src/main/java/io/javaoperatorsdk/operator/processing/ExecutionScope.java index a4b65f5f95..73603bc7b3 100644 --- a/operator-framework/src/main/java/io/javaoperatorsdk/operator/processing/ExecutionScope.java +++ b/operator-framework/src/main/java/io/javaoperatorsdk/operator/processing/ExecutionScope.java @@ -1,6 +1,7 @@ package io.javaoperatorsdk.operator.processing; import io.fabric8.kubernetes.client.CustomResource; +import io.javaoperatorsdk.operator.api.RetryInfo; import io.javaoperatorsdk.operator.processing.event.Event; import java.util.List; @@ -10,9 +11,12 @@ public class ExecutionScope { // the latest custom resource from cache private CustomResource customResource; - public ExecutionScope(List list, CustomResource customResource) { + private RetryInfo retryInfo; + + public ExecutionScope(List list, CustomResource customResource, RetryInfo retryInfo) { this.events = list; this.customResource = customResource; + this.retryInfo = retryInfo; } public List getEvents() { @@ -38,4 +42,8 @@ public String toString() { + customResource.getMetadata().getResourceVersion() + '}'; } + + public RetryInfo getRetryInfo() { + return retryInfo; + } } diff --git a/operator-framework/src/main/java/io/javaoperatorsdk/operator/processing/retry/GenericRetryExecution.java b/operator-framework/src/main/java/io/javaoperatorsdk/operator/processing/retry/GenericRetryExecution.java index b4618dc017..172328c72d 100644 --- a/operator-framework/src/main/java/io/javaoperatorsdk/operator/processing/retry/GenericRetryExecution.java +++ b/operator-framework/src/main/java/io/javaoperatorsdk/operator/processing/retry/GenericRetryExecution.java @@ -29,7 +29,12 @@ public Optional nextDelay() { } @Override - public boolean isLastExecution() { + public boolean isLastAttempt() { return genericRetry.getMaxAttempts() > -1 && lastAttemptIndex >= genericRetry.getMaxAttempts(); } + + @Override + public int getCurrentAttemptIndex() { + return lastAttemptIndex; + } } diff --git a/operator-framework/src/main/java/io/javaoperatorsdk/operator/processing/retry/RetryExecution.java b/operator-framework/src/main/java/io/javaoperatorsdk/operator/processing/retry/RetryExecution.java index b14a5966fb..30087282de 100644 --- a/operator-framework/src/main/java/io/javaoperatorsdk/operator/processing/retry/RetryExecution.java +++ b/operator-framework/src/main/java/io/javaoperatorsdk/operator/processing/retry/RetryExecution.java @@ -16,5 +16,7 @@ public interface RetryExecution { * @return true, if the last returned delay is, the last returned values, thus there will be no * further retry */ - boolean isLastExecution(); + boolean isLastAttempt(); + + int getCurrentAttemptIndex(); } diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/EventDispatcherTest.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/EventDispatcherTest.java index 63f7dcbf2f..37301727e7 100644 --- a/operator-framework/src/test/java/io/javaoperatorsdk/operator/EventDispatcherTest.java +++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/EventDispatcherTest.java @@ -1,13 +1,12 @@ package io.javaoperatorsdk.operator; +import static org.assertj.core.api.Assertions.assertThat; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.mockito.Mockito.*; import io.fabric8.kubernetes.client.CustomResource; import io.fabric8.kubernetes.client.Watcher; -import io.javaoperatorsdk.operator.api.DeleteControl; -import io.javaoperatorsdk.operator.api.ResourceController; -import io.javaoperatorsdk.operator.api.UpdateControl; +import io.javaoperatorsdk.operator.api.*; import io.javaoperatorsdk.operator.processing.EventDispatcher; import io.javaoperatorsdk.operator.processing.ExecutionScope; import io.javaoperatorsdk.operator.processing.event.Event; @@ -18,6 +17,7 @@ import java.util.List; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.mockito.ArgumentCaptor; import org.mockito.ArgumentMatchers; class EventDispatcherTest { @@ -177,6 +177,20 @@ void executeControllerRegardlessGenerationInNonGenerationAwareMode() { verify(controller, times(2)).createOrUpdateResource(eq(testCustomResource), any()); } + @Test + void propagatesRetryInfoToContext() { + eventDispatcher.handleExecution( + new ExecutionScope(Arrays.asList(), testCustomResource, new RetryInfo(2, true))); + + ArgumentCaptor> contextArgumentCaptor = + ArgumentCaptor.forClass(Context.class); + verify(controller, times(1)) + .createOrUpdateResource(eq(testCustomResource), contextArgumentCaptor.capture()); + Context context = contextArgumentCaptor.getValue(); + assertThat(context.getRetryInfo().get().getAttemptIndex()).isEqualTo(2); + assertThat(context.getRetryInfo().get().isLastAttempt()).isEqualTo(true); + } + private void markForDeletion(CustomResource customResource) { customResource.getMetadata().setDeletionTimestamp("2019-8-10"); } @@ -191,6 +205,6 @@ public ExecutionScope executionScopeWithCREvent( List eventList = new ArrayList<>(1 + otherEvents.length); eventList.add(event); eventList.addAll(Arrays.asList(otherEvents)); - return new ExecutionScope(eventList, resource); + return new ExecutionScope(eventList, resource, null); } } diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/IntegrationTestSupport.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/IntegrationTestSupport.java index bc6ef2db01..78512f6fb0 100644 --- a/operator-framework/src/test/java/io/javaoperatorsdk/operator/IntegrationTestSupport.java +++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/IntegrationTestSupport.java @@ -16,6 +16,7 @@ import io.fabric8.kubernetes.client.dsl.base.CustomResourceDefinitionContext; import io.fabric8.kubernetes.client.utils.Serialization; import io.javaoperatorsdk.operator.api.ResourceController; +import io.javaoperatorsdk.operator.processing.retry.Retry; import io.javaoperatorsdk.operator.sample.simple.TestCustomResource; import io.javaoperatorsdk.operator.sample.simple.TestCustomResourceSpec; import java.io.IOException; @@ -41,6 +42,11 @@ public class IntegrationTestSupport { public void initialize( KubernetesClient k8sClient, ResourceController controller, String crdPath) { + initialize(k8sClient, controller, crdPath, null); + } + + public void initialize( + KubernetesClient k8sClient, ResourceController controller, String crdPath, Retry retry) { log.info("Initializing integration test in namespace {}", TEST_NAMESPACE); this.k8sClient = k8sClient; CustomResourceDefinition crd = loadCRDAndApplyToCluster(crdPath); @@ -62,7 +68,7 @@ public void initialize( .build()); } operator = new Operator(k8sClient); - operator.registerController(controller, TEST_NAMESPACE); + operator.registerController(controller, retry, TEST_NAMESPACE); log.info("Operator is running with {}", controller.getClass().getCanonicalName()); } diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/RetryIT.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/RetryIT.java new file mode 100644 index 0000000000..3b1a8bc82c --- /dev/null +++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/RetryIT.java @@ -0,0 +1,72 @@ +package io.javaoperatorsdk.operator; + +import static io.javaoperatorsdk.operator.IntegrationTestSupport.TEST_NAMESPACE; +import static io.javaoperatorsdk.operator.sample.event.EventSourceTestCustomResourceController.*; +import static io.javaoperatorsdk.operator.sample.retry.RetryTestCustomResourceStatus.State.SUCCESS; +import static org.assertj.core.api.Assertions.assertThat; + +import io.fabric8.kubernetes.api.model.ObjectMetaBuilder; +import io.fabric8.kubernetes.client.DefaultKubernetesClient; +import io.fabric8.kubernetes.client.KubernetesClient; +import io.javaoperatorsdk.operator.processing.retry.GenericRetry; +import io.javaoperatorsdk.operator.processing.retry.Retry; +import io.javaoperatorsdk.operator.sample.retry.RetryTestCustomResource; +import io.javaoperatorsdk.operator.sample.retry.RetryTestCustomResourceController; +import io.javaoperatorsdk.operator.sample.retry.RetryTestCustomResourceSpec; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +public class RetryIT { + + public static final int RETRY_INTERVAL = 150; + private IntegrationTestSupport integrationTestSupport = new IntegrationTestSupport(); + + @BeforeEach + public void initAndCleanup() { + Retry retry = + new GenericRetry().setInitialInterval(RETRY_INTERVAL).withLinearRetry().setMaxAttempts(5); + KubernetesClient k8sClient = new DefaultKubernetesClient(); + integrationTestSupport.initialize( + k8sClient, new RetryTestCustomResourceController(), "retry-test-crd.yaml", retry); + integrationTestSupport.cleanup(); + } + + @Test + public void retryFailedExecution() { + integrationTestSupport.teardownIfSuccess( + () -> { + RetryTestCustomResource resource = createTestCustomResource("1"); + integrationTestSupport.getCrOperations().inNamespace(TEST_NAMESPACE).create(resource); + + Thread.sleep( + RETRY_INTERVAL * (RetryTestCustomResourceController.NUMBER_FAILED_EXECUTIONS + 2)); + + assertThat(integrationTestSupport.numberOfControllerExecutions()) + .isGreaterThanOrEqualTo( + RetryTestCustomResourceController.NUMBER_FAILED_EXECUTIONS + 1); + + RetryTestCustomResource finalResource = + (RetryTestCustomResource) + integrationTestSupport + .getCrOperations() + .inNamespace(TEST_NAMESPACE) + .withName(resource.getMetadata().getName()) + .get(); + assertThat(finalResource.getStatus().getState()).isEqualTo(SUCCESS); + }); + } + + public RetryTestCustomResource createTestCustomResource(String id) { + RetryTestCustomResource resource = new RetryTestCustomResource(); + resource.setMetadata( + new ObjectMetaBuilder() + .withName("retrysource-" + id) + .withNamespace(TEST_NAMESPACE) + .withFinalizers(RetryTestCustomResourceController.FINALIZER_NAME) + .build()); + resource.setKind("retrysample"); + resource.setSpec(new RetryTestCustomResourceSpec()); + resource.getSpec().setValue(id); + return resource; + } +} diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/processing/DefaultEventHandlerTest.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/processing/DefaultEventHandlerTest.java index 2e83141421..4f37d16b13 100644 --- a/operator-framework/src/test/java/io/javaoperatorsdk/operator/processing/DefaultEventHandlerTest.java +++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/processing/DefaultEventHandlerTest.java @@ -44,8 +44,8 @@ class DefaultEventHandlerTest { public void setup() { when(defaultEventSourceManagerMock.getRetryTimerEventSource()) .thenReturn(retryTimerEventSourceMock); - defaultEventHandler.setDefaultEventSourceManager(defaultEventSourceManagerMock); - defaultEventHandlerWithRetry.setDefaultEventSourceManager(defaultEventSourceManagerMock); + defaultEventHandler.setEventSourceManager(defaultEventSourceManagerMock); + defaultEventHandlerWithRetry.setEventSourceManager(defaultEventSourceManagerMock); } @Test @@ -112,7 +112,7 @@ public void schedulesAnEventRetryOnException() { Event event = prepareCREvent(); TestCustomResource customResource = testCustomResource(); - ExecutionScope executionScope = new ExecutionScope(Arrays.asList(event), customResource); + ExecutionScope executionScope = new ExecutionScope(Arrays.asList(event), customResource, null); PostExecutionControl postExecutionControl = PostExecutionControl.exceptionDuringExecution(new RuntimeException("test")); @@ -127,7 +127,7 @@ public void executesTheControllerInstantlyAfterErrorIfEventsBuffered() { Event event = prepareCREvent(); TestCustomResource customResource = testCustomResource(); customResource.getMetadata().setUid(event.getRelatedCustomResourceUid()); - ExecutionScope executionScope = new ExecutionScope(Arrays.asList(event), customResource); + ExecutionScope executionScope = new ExecutionScope(Arrays.asList(event), customResource, null); PostExecutionControl postExecutionControl = PostExecutionControl.exceptionDuringExecution(new RuntimeException("test")); @@ -151,6 +151,39 @@ public void executesTheControllerInstantlyAfterErrorIfEventsBuffered() { .scheduleOnce(eq(customResource), eq(GenericRetry.DEFAULT_INITIAL_INTERVAL)); } + @Test + public void successfulExecutionResetsTheRetry() { + Event event = prepareCREvent(); + TestCustomResource customResource = testCustomResource(); + customResource.getMetadata().setUid(event.getRelatedCustomResourceUid()); + ExecutionScope executionScope = new ExecutionScope(Arrays.asList(event), customResource, null); + PostExecutionControl postExecutionControlWithException = + PostExecutionControl.exceptionDuringExecution(new RuntimeException("test")); + PostExecutionControl defaultDispatchControl = PostExecutionControl.defaultDispatch(); + + defaultEventHandlerWithRetry.handleEvent(event); + defaultEventHandlerWithRetry.eventProcessingFinished( + executionScope, postExecutionControlWithException); + + defaultEventHandlerWithRetry.handleEvent(event); + defaultEventHandlerWithRetry.eventProcessingFinished(executionScope, defaultDispatchControl); + + defaultEventHandlerWithRetry.handleEvent(event); + + ArgumentCaptor executionScopeArgumentCaptor = + ArgumentCaptor.forClass(ExecutionScope.class); + verify(eventDispatcherMock, timeout(SEPARATE_EXECUTION_TIMEOUT).times(3)) + .handleExecution(executionScopeArgumentCaptor.capture()); + + List executionScopes = executionScopeArgumentCaptor.getAllValues(); + + assertThat(executionScopes).hasSize(3); + assertThat(executionScopes.get(0).getRetryInfo()).isNull(); + assertThat(executionScopes.get(2).getRetryInfo()).isNull(); + assertThat(executionScopes.get(1).getRetryInfo().getAttemptIndex()).isEqualTo(1); + assertThat(executionScopes.get(1).getRetryInfo().isLastAttempt()).isEqualTo(false); + } + private void waitMinimalTime() { try { Thread.sleep(50); diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/processing/retry/GenericRetryExecutionTest.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/processing/retry/GenericRetryExecutionTest.java index 5feadecafa..a84a7f00f1 100644 --- a/operator-framework/src/test/java/io/javaoperatorsdk/operator/processing/retry/GenericRetryExecutionTest.java +++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/processing/retry/GenericRetryExecutionTest.java @@ -67,11 +67,20 @@ public void supportsNoRetry() { @Test public void supportsIsLastExecution() { GenericRetryExecution execution = new GenericRetry().setMaxAttempts(2).initExecution(); - assertThat(execution.isLastExecution()).isFalse(); + assertThat(execution.isLastAttempt()).isFalse(); execution.nextDelay(); execution.nextDelay(); - assertThat(execution.isLastExecution()).isTrue(); + assertThat(execution.isLastAttempt()).isTrue(); + } + + @Test + public void returnAttemptIndex() { + RetryExecution retryExecution = GenericRetry.defaultLimitedExponentialRetry().initExecution(); + + assertThat(retryExecution.getCurrentAttemptIndex()).isEqualTo(0); + retryExecution.nextDelay(); + assertThat(retryExecution.getCurrentAttemptIndex()).isEqualTo(1); } private RetryExecution getDefaultRetryExecution() { diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/retry/RetryTestCustomResource.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/retry/RetryTestCustomResource.java new file mode 100644 index 0000000000..a351a78a33 --- /dev/null +++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/retry/RetryTestCustomResource.java @@ -0,0 +1,38 @@ +package io.javaoperatorsdk.operator.sample.retry; + +import io.fabric8.kubernetes.client.CustomResource; + +public class RetryTestCustomResource extends CustomResource { + + private RetryTestCustomResourceSpec spec; + + private RetryTestCustomResourceStatus status; + + public RetryTestCustomResourceSpec getSpec() { + return spec; + } + + public void setSpec(RetryTestCustomResourceSpec spec) { + this.spec = spec; + } + + public RetryTestCustomResourceStatus getStatus() { + return status; + } + + public void setStatus(RetryTestCustomResourceStatus status) { + this.status = status; + } + + @Override + public String toString() { + return "TestCustomResource{" + + "spec=" + + spec + + ", status=" + + status + + ", extendedFrom=" + + super.toString() + + '}'; + } +} diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/retry/RetryTestCustomResourceController.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/retry/RetryTestCustomResourceController.java new file mode 100644 index 0000000000..6554446922 --- /dev/null +++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/retry/RetryTestCustomResourceController.java @@ -0,0 +1,61 @@ +package io.javaoperatorsdk.operator.sample.retry; + +import io.javaoperatorsdk.operator.TestExecutionInfoProvider; +import io.javaoperatorsdk.operator.api.*; +import java.util.concurrent.atomic.AtomicInteger; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@Controller(crdName = RetryTestCustomResourceController.CRD_NAME) +public class RetryTestCustomResourceController + implements ResourceController, TestExecutionInfoProvider { + + public static final int NUMBER_FAILED_EXECUTIONS = 2; + + public static final String CRD_NAME = "retrysamples.sample.javaoperatorsdk"; + public static final String FINALIZER_NAME = CRD_NAME + "/finalizer"; + private static final Logger log = + LoggerFactory.getLogger(RetryTestCustomResourceController.class); + private final AtomicInteger numberOfExecutions = new AtomicInteger(0); + + @Override + public DeleteControl deleteResource( + RetryTestCustomResource resource, Context context) { + return DeleteControl.DEFAULT_DELETE; + } + + @Override + public UpdateControl createOrUpdateResource( + RetryTestCustomResource resource, Context context) { + numberOfExecutions.addAndGet(1); + + if (!resource.getMetadata().getFinalizers().contains(FINALIZER_NAME)) { + throw new IllegalStateException("Finalizer is not present."); + } + log.info("Value: " + resource.getSpec().getValue()); + + if (numberOfExecutions.get() < NUMBER_FAILED_EXECUTIONS + 1) { + throw new RuntimeException("Testing Retry"); + } + if (context.getRetryInfo().isEmpty() || context.getRetryInfo().get().isLastAttempt() == true) { + throw new IllegalStateException("Not expected retry info: " + context.getRetryInfo()); + } + + ensureStatusExists(resource); + resource.getStatus().setState(RetryTestCustomResourceStatus.State.SUCCESS); + + return UpdateControl.updateStatusSubResource(resource); + } + + private void ensureStatusExists(RetryTestCustomResource resource) { + RetryTestCustomResourceStatus status = resource.getStatus(); + if (status == null) { + status = new RetryTestCustomResourceStatus(); + resource.setStatus(status); + } + } + + public int getNumberOfExecutions() { + return numberOfExecutions.get(); + } +} diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/retry/RetryTestCustomResourceSpec.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/retry/RetryTestCustomResourceSpec.java new file mode 100644 index 0000000000..ec34d11df8 --- /dev/null +++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/retry/RetryTestCustomResourceSpec.java @@ -0,0 +1,15 @@ +package io.javaoperatorsdk.operator.sample.retry; + +public class RetryTestCustomResourceSpec { + + private String value; + + public String getValue() { + return value; + } + + public RetryTestCustomResourceSpec setValue(String value) { + this.value = value; + return this; + } +} diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/retry/RetryTestCustomResourceStatus.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/retry/RetryTestCustomResourceStatus.java new file mode 100644 index 0000000000..5514c04465 --- /dev/null +++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/retry/RetryTestCustomResourceStatus.java @@ -0,0 +1,20 @@ +package io.javaoperatorsdk.operator.sample.retry; + +public class RetryTestCustomResourceStatus { + + private State state; + + public State getState() { + return state; + } + + public RetryTestCustomResourceStatus setState(State state) { + this.state = state; + return this; + } + + public enum State { + SUCCESS, + ERROR + } +} diff --git a/operator-framework/src/test/resources/io/javaoperatorsdk/operator/retry-test-crd.yaml b/operator-framework/src/test/resources/io/javaoperatorsdk/operator/retry-test-crd.yaml new file mode 100644 index 0000000000..e61f3202ca --- /dev/null +++ b/operator-framework/src/test/resources/io/javaoperatorsdk/operator/retry-test-crd.yaml @@ -0,0 +1,16 @@ +apiVersion: apiextensions.k8s.io/v1beta1 +kind: CustomResourceDefinition +metadata: + name: retrysamples.sample.javaoperatorsdk +spec: + group: sample.javaoperatorsdk + version: v1 + subresources: + status: {} + scope: Namespaced + names: + plural: retrysamples + singular: retrysample + kind: retrysample + shortNames: + - rs From 1affee484a7d8b338e31d01216a6839936269014 Mon Sep 17 00:00:00 2001 From: Soroosh Sarabadani Date: Fri, 11 Dec 2020 16:58:05 +0100 Subject: [PATCH 06/10] resolve conflict --- .../operator/processing/DefaultEventHandlerTest.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/processing/DefaultEventHandlerTest.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/processing/DefaultEventHandlerTest.java index b4c3cdd888..9731736be8 100644 --- a/operator-framework/src/test/java/io/javaoperatorsdk/operator/processing/DefaultEventHandlerTest.java +++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/processing/DefaultEventHandlerTest.java @@ -2,8 +2,10 @@ import static io.javaoperatorsdk.operator.TestUtils.testCustomResource; import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.any; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; import static org.mockito.Mockito.timeout; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; From b1d9f1d4f88a4fa09b5c4bb5d5527881271d2169 Mon Sep 17 00:00:00 2001 From: csviri Date: Mon, 14 Dec 2020 14:03:01 +0100 Subject: [PATCH 07/10] renaming from PR comments, test info message --- .../operator/api/RetryInfo.java | 23 +++---------------- .../processing/DefaultEventHandler.java | 7 +----- .../retry/GenericRetryExecution.java | 2 +- .../processing/retry/RetryExecution.java | 11 ++------- .../operator/EventDispatcherTest.java | 17 ++++++++++++-- .../processing/DefaultEventHandlerTest.java | 8 ++++++- .../retry/GenericRetryExecutionTest.java | 4 ++-- 7 files changed, 31 insertions(+), 41 deletions(-) diff --git a/operator-framework/src/main/java/io/javaoperatorsdk/operator/api/RetryInfo.java b/operator-framework/src/main/java/io/javaoperatorsdk/operator/api/RetryInfo.java index ce90e04f8a..92149012a7 100644 --- a/operator-framework/src/main/java/io/javaoperatorsdk/operator/api/RetryInfo.java +++ b/operator-framework/src/main/java/io/javaoperatorsdk/operator/api/RetryInfo.java @@ -1,25 +1,8 @@ package io.javaoperatorsdk.operator.api; -public class RetryInfo { +public interface RetryInfo { - private int attemptIndex; - private boolean lastAttempt; + int getAttemptCount(); - public RetryInfo(int retryNumber, boolean lastAttempt) { - this.attemptIndex = retryNumber; - this.lastAttempt = lastAttempt; - } - - public int getAttemptIndex() { - return attemptIndex; - } - - public boolean isLastAttempt() { - return lastAttempt; - } - - @Override - public String toString() { - return "RetryInfo{" + "attemptIndex=" + attemptIndex + ", lastAttempt=" + lastAttempt + '}'; - } + boolean isLastAttempt(); } diff --git a/operator-framework/src/main/java/io/javaoperatorsdk/operator/processing/DefaultEventHandler.java b/operator-framework/src/main/java/io/javaoperatorsdk/operator/processing/DefaultEventHandler.java index ba14738a61..f42be0f9eb 100644 --- a/operator-framework/src/main/java/io/javaoperatorsdk/operator/processing/DefaultEventHandler.java +++ b/operator-framework/src/main/java/io/javaoperatorsdk/operator/processing/DefaultEventHandler.java @@ -103,12 +103,7 @@ private void executeBufferedEvents(String customResourceUid) { } private RetryInfo retryInfo(String customResourceUid) { - RetryExecution retryExecution = retryState.get(customResourceUid); - if (retryExecution != null) { - return new RetryInfo(retryExecution.getCurrentAttemptIndex(), retryExecution.isLastAttempt()); - } else { - return null; - } + return retryState.get(customResourceUid); } void eventProcessingFinished( diff --git a/operator-framework/src/main/java/io/javaoperatorsdk/operator/processing/retry/GenericRetryExecution.java b/operator-framework/src/main/java/io/javaoperatorsdk/operator/processing/retry/GenericRetryExecution.java index 172328c72d..a2c7a9a609 100644 --- a/operator-framework/src/main/java/io/javaoperatorsdk/operator/processing/retry/GenericRetryExecution.java +++ b/operator-framework/src/main/java/io/javaoperatorsdk/operator/processing/retry/GenericRetryExecution.java @@ -34,7 +34,7 @@ public boolean isLastAttempt() { } @Override - public int getCurrentAttemptIndex() { + public int getAttemptCount() { return lastAttemptIndex; } } diff --git a/operator-framework/src/main/java/io/javaoperatorsdk/operator/processing/retry/RetryExecution.java b/operator-framework/src/main/java/io/javaoperatorsdk/operator/processing/retry/RetryExecution.java index 30087282de..e5fb22ae62 100644 --- a/operator-framework/src/main/java/io/javaoperatorsdk/operator/processing/retry/RetryExecution.java +++ b/operator-framework/src/main/java/io/javaoperatorsdk/operator/processing/retry/RetryExecution.java @@ -1,8 +1,9 @@ package io.javaoperatorsdk.operator.processing.retry; +import io.javaoperatorsdk.operator.api.RetryInfo; import java.util.Optional; -public interface RetryExecution { +public interface RetryExecution extends RetryInfo { /** * Calculates the delay for the next execution. This method should return 0, when called first @@ -11,12 +12,4 @@ public interface RetryExecution { * @return */ Optional nextDelay(); - - /** - * @return true, if the last returned delay is, the last returned values, thus there will be no - * further retry - */ - boolean isLastAttempt(); - - int getCurrentAttemptIndex(); } diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/EventDispatcherTest.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/EventDispatcherTest.java index 9a58bb0194..4e37bc573d 100644 --- a/operator-framework/src/test/java/io/javaoperatorsdk/operator/EventDispatcherTest.java +++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/EventDispatcherTest.java @@ -187,14 +187,27 @@ void executeControllerRegardlessGenerationInNonGenerationAwareMode() { @Test void propagatesRetryInfoToContext() { eventDispatcher.handleExecution( - new ExecutionScope(Arrays.asList(), testCustomResource, new RetryInfo(2, true))); + new ExecutionScope( + Arrays.asList(), + testCustomResource, + new RetryInfo() { + @Override + public int getAttemptCount() { + return 2; + } + + @Override + public boolean isLastAttempt() { + return true; + } + })); ArgumentCaptor> contextArgumentCaptor = ArgumentCaptor.forClass(Context.class); verify(controller, times(1)) .createOrUpdateResource(eq(testCustomResource), contextArgumentCaptor.capture()); Context context = contextArgumentCaptor.getValue(); - assertThat(context.getRetryInfo().get().getAttemptIndex()).isEqualTo(2); + assertThat(context.getRetryInfo().get().getAttemptCount()).isEqualTo(2); assertThat(context.getRetryInfo().get().isLastAttempt()).isEqualTo(true); } diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/processing/DefaultEventHandlerTest.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/processing/DefaultEventHandlerTest.java index 9731736be8..8595c40348 100644 --- a/operator-framework/src/test/java/io/javaoperatorsdk/operator/processing/DefaultEventHandlerTest.java +++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/processing/DefaultEventHandlerTest.java @@ -26,9 +26,13 @@ import org.junit.jupiter.api.Test; import org.mockito.ArgumentCaptor; import org.mockito.stubbing.Answer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; class DefaultEventHandlerTest { + private static final Logger log = LoggerFactory.getLogger(DefaultEventHandlerTest.class); + public static final int FAKE_CONTROLLER_EXECUTION_DURATION = 250; public static final int SEPARATE_EXECUTION_TIMEOUT = 450; private EventDispatcher eventDispatcherMock = mock(EventDispatcher.class); @@ -160,6 +164,8 @@ public void executesTheControllerInstantlyAfterErrorIfEventsBuffered() { @Test public void successfulExecutionResetsTheRetry() { + log.info("Starting successfulExecutionResetsTheRetry"); + Event event = prepareCREvent(); TestCustomResource customResource = testCustomResource(); customResource.getMetadata().setUid(event.getRelatedCustomResourceUid()); @@ -187,7 +193,7 @@ public void successfulExecutionResetsTheRetry() { assertThat(executionScopes).hasSize(3); assertThat(executionScopes.get(0).getRetryInfo()).isNull(); assertThat(executionScopes.get(2).getRetryInfo()).isNull(); - assertThat(executionScopes.get(1).getRetryInfo().getAttemptIndex()).isEqualTo(1); + assertThat(executionScopes.get(1).getRetryInfo().getAttemptCount()).isEqualTo(1); assertThat(executionScopes.get(1).getRetryInfo().isLastAttempt()).isEqualTo(false); } diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/processing/retry/GenericRetryExecutionTest.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/processing/retry/GenericRetryExecutionTest.java index a84a7f00f1..df934fdec5 100644 --- a/operator-framework/src/test/java/io/javaoperatorsdk/operator/processing/retry/GenericRetryExecutionTest.java +++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/processing/retry/GenericRetryExecutionTest.java @@ -78,9 +78,9 @@ public void supportsIsLastExecution() { public void returnAttemptIndex() { RetryExecution retryExecution = GenericRetry.defaultLimitedExponentialRetry().initExecution(); - assertThat(retryExecution.getCurrentAttemptIndex()).isEqualTo(0); + assertThat(retryExecution.getAttemptCount()).isEqualTo(0); retryExecution.nextDelay(); - assertThat(retryExecution.getCurrentAttemptIndex()).isEqualTo(1); + assertThat(retryExecution.getAttemptCount()).isEqualTo(1); } private RetryExecution getDefaultRetryExecution() { From 6ac3ad0c26022ae0d778f78bda741d1be76170a5 Mon Sep 17 00:00:00 2001 From: csviri Date: Mon, 14 Dec 2020 14:24:14 +0100 Subject: [PATCH 08/10] improved logging --- .../processing/DefaultEventHandler.java | 22 ++++++++++++++----- .../processing/DefaultEventHandlerTest.java | 2 ++ 2 files changed, 19 insertions(+), 5 deletions(-) diff --git a/operator-framework/src/main/java/io/javaoperatorsdk/operator/processing/DefaultEventHandler.java b/operator-framework/src/main/java/io/javaoperatorsdk/operator/processing/DefaultEventHandler.java index f42be0f9eb..1d9b21194f 100644 --- a/operator-framework/src/main/java/io/javaoperatorsdk/operator/processing/DefaultEventHandler.java +++ b/operator-framework/src/main/java/io/javaoperatorsdk/operator/processing/DefaultEventHandler.java @@ -110,7 +110,10 @@ void eventProcessingFinished( ExecutionScope executionScope, PostExecutionControl postExecutionControl) { try { lock.lock(); - log.debug("Event processing finished. Scope: {}", executionScope); + log.debug( + "Event processing finished. Scope: {}, PostExecutionControl: {}", + executionScope, + postExecutionControl); unsetUnderExecution(executionScope.getCustomResourceUid()); if (retry != null && postExecutionControl.exceptionDuringExecution()) { @@ -143,18 +146,27 @@ private void handleRetryOnException(ExecutionScope executionScope) { eventBuffer.putBackEvents(executionScope.getCustomResourceUid(), executionScope.getEvents()); if (newEventsExists) { + log.debug("New events exists for for resource id: {}", executionScope.getCustomResourceUid()); executeBufferedEvents(executionScope.getCustomResourceUid()); return; } Optional nextDelay = execution.nextDelay(); + nextDelay.ifPresent( - delay -> - eventSourceManager - .getRetryTimerEventSource() - .scheduleOnce(executionScope.getCustomResource(), delay)); + delay -> { + log.debug( + "Scheduling timer event for retry with delay:{} for resource: {}", + delay, + executionScope.getCustomResourceUid()); + eventSourceManager + .getRetryTimerEventSource() + .scheduleOnce(executionScope.getCustomResource(), delay); + }); } private void markSuccessfulExecutionRegardingRetry(ExecutionScope executionScope) { + log.debug( + "Marking successful execution for resource: {}", executionScope.getCustomResourceUid()); retryState.remove(executionScope.getCustomResourceUid()); eventSourceManager .getRetryTimerEventSource() diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/processing/DefaultEventHandlerTest.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/processing/DefaultEventHandlerTest.java index 8595c40348..f9ec01393d 100644 --- a/operator-framework/src/test/java/io/javaoperatorsdk/operator/processing/DefaultEventHandlerTest.java +++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/processing/DefaultEventHandlerTest.java @@ -183,6 +183,8 @@ public void successfulExecutionResetsTheRetry() { defaultEventHandlerWithRetry.handleEvent(event); + log.info("Finished successfulExecutionResetsTheRetry"); + ArgumentCaptor executionScopeArgumentCaptor = ArgumentCaptor.forClass(ExecutionScope.class); verify(eventDispatcherMock, timeout(SEPARATE_EXECUTION_TIMEOUT).times(3)) From 4850606b741be8eab0e20ae3141dee4c8521b7d3 Mon Sep 17 00:00:00 2001 From: csviri Date: Mon, 14 Dec 2020 14:37:34 +0100 Subject: [PATCH 09/10] retry test fix --- .../processing/PostExecutionControl.java | 9 ++++++++ .../processing/DefaultEventHandlerTest.java | 23 +++++++++++-------- 2 files changed, 23 insertions(+), 9 deletions(-) diff --git a/operator-framework/src/main/java/io/javaoperatorsdk/operator/processing/PostExecutionControl.java b/operator-framework/src/main/java/io/javaoperatorsdk/operator/processing/PostExecutionControl.java index 072136e046..35a39574d4 100644 --- a/operator-framework/src/main/java/io/javaoperatorsdk/operator/processing/PostExecutionControl.java +++ b/operator-framework/src/main/java/io/javaoperatorsdk/operator/processing/PostExecutionControl.java @@ -55,4 +55,13 @@ public boolean exceptionDuringExecution() { public Optional getRuntimeException() { return Optional.ofNullable(runtimeException); } + + @Override + public String toString() { + return "PostExecutionControl{" + + "onlyFinalizerHandled=" + onlyFinalizerHandled + + ", updatedCustomResource=" + updatedCustomResource + + ", runtimeException=" + runtimeException + + '}'; + } } diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/processing/DefaultEventHandlerTest.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/processing/DefaultEventHandlerTest.java index f9ec01393d..8a66a4594c 100644 --- a/operator-framework/src/test/java/io/javaoperatorsdk/operator/processing/DefaultEventHandlerTest.java +++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/processing/DefaultEventHandlerTest.java @@ -34,7 +34,7 @@ class DefaultEventHandlerTest { private static final Logger log = LoggerFactory.getLogger(DefaultEventHandlerTest.class); public static final int FAKE_CONTROLLER_EXECUTION_DURATION = 250; - public static final int SEPARATE_EXECUTION_TIMEOUT = 450; + public static final int SEPARATE_EXECUTION_TIMEOUT = 150; private EventDispatcher eventDispatcherMock = mock(EventDispatcher.class); private CustomResourceCache customResourceCache = new CustomResourceCache(); private DefaultEventSourceManager defaultEventSourceManagerMock = @@ -174,21 +174,26 @@ public void successfulExecutionResetsTheRetry() { PostExecutionControl.exceptionDuringExecution(new RuntimeException("test")); PostExecutionControl defaultDispatchControl = PostExecutionControl.defaultDispatch(); - defaultEventHandlerWithRetry.handleEvent(event); - defaultEventHandlerWithRetry.eventProcessingFinished( - executionScope, postExecutionControlWithException); + when(eventDispatcherMock.handleExecution(any())) + .thenReturn(postExecutionControlWithException) + .thenReturn(defaultDispatchControl); + + ArgumentCaptor executionScopeArgumentCaptor = + ArgumentCaptor.forClass(ExecutionScope.class); defaultEventHandlerWithRetry.handleEvent(event); - defaultEventHandlerWithRetry.eventProcessingFinished(executionScope, defaultDispatchControl); + verify(eventDispatcherMock, timeout(SEPARATE_EXECUTION_TIMEOUT).times(1)) + .handleExecution(any()); defaultEventHandlerWithRetry.handleEvent(event); - log.info("Finished successfulExecutionResetsTheRetry"); + verify(eventDispatcherMock, timeout(SEPARATE_EXECUTION_TIMEOUT).times(2)) + .handleExecution(any()); + defaultEventHandlerWithRetry.handleEvent(event); - ArgumentCaptor executionScopeArgumentCaptor = - ArgumentCaptor.forClass(ExecutionScope.class); verify(eventDispatcherMock, timeout(SEPARATE_EXECUTION_TIMEOUT).times(3)) - .handleExecution(executionScopeArgumentCaptor.capture()); + .handleExecution(executionScopeArgumentCaptor.capture()); + log.info("Finished successfulExecutionResetsTheRetry"); List executionScopes = executionScopeArgumentCaptor.getAllValues(); From f30f35e77318117fc185cf9d5b66402a7d21ab9c Mon Sep 17 00:00:00 2001 From: csviri Date: Mon, 14 Dec 2020 15:31:01 +0100 Subject: [PATCH 10/10] test fix --- .../operator/processing/PostExecutionControl.java | 13 ++++++++----- .../processing/DefaultEventHandlerTest.java | 14 +++++++------- 2 files changed, 15 insertions(+), 12 deletions(-) diff --git a/operator-framework/src/main/java/io/javaoperatorsdk/operator/processing/PostExecutionControl.java b/operator-framework/src/main/java/io/javaoperatorsdk/operator/processing/PostExecutionControl.java index 35a39574d4..1e0c82f1b2 100644 --- a/operator-framework/src/main/java/io/javaoperatorsdk/operator/processing/PostExecutionControl.java +++ b/operator-framework/src/main/java/io/javaoperatorsdk/operator/processing/PostExecutionControl.java @@ -58,10 +58,13 @@ public Optional getRuntimeException() { @Override public String toString() { - return "PostExecutionControl{" + - "onlyFinalizerHandled=" + onlyFinalizerHandled + - ", updatedCustomResource=" + updatedCustomResource + - ", runtimeException=" + runtimeException + - '}'; + return "PostExecutionControl{" + + "onlyFinalizerHandled=" + + onlyFinalizerHandled + + ", updatedCustomResource=" + + updatedCustomResource + + ", runtimeException=" + + runtimeException + + '}'; } } diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/processing/DefaultEventHandlerTest.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/processing/DefaultEventHandlerTest.java index 8a66a4594c..b998518d53 100644 --- a/operator-framework/src/test/java/io/javaoperatorsdk/operator/processing/DefaultEventHandlerTest.java +++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/processing/DefaultEventHandlerTest.java @@ -34,7 +34,7 @@ class DefaultEventHandlerTest { private static final Logger log = LoggerFactory.getLogger(DefaultEventHandlerTest.class); public static final int FAKE_CONTROLLER_EXECUTION_DURATION = 250; - public static final int SEPARATE_EXECUTION_TIMEOUT = 150; + public static final int SEPARATE_EXECUTION_TIMEOUT = 450; private EventDispatcher eventDispatcherMock = mock(EventDispatcher.class); private CustomResourceCache customResourceCache = new CustomResourceCache(); private DefaultEventSourceManager defaultEventSourceManagerMock = @@ -175,24 +175,24 @@ public void successfulExecutionResetsTheRetry() { PostExecutionControl defaultDispatchControl = PostExecutionControl.defaultDispatch(); when(eventDispatcherMock.handleExecution(any())) - .thenReturn(postExecutionControlWithException) - .thenReturn(defaultDispatchControl); + .thenReturn(postExecutionControlWithException) + .thenReturn(defaultDispatchControl); ArgumentCaptor executionScopeArgumentCaptor = - ArgumentCaptor.forClass(ExecutionScope.class); + ArgumentCaptor.forClass(ExecutionScope.class); defaultEventHandlerWithRetry.handleEvent(event); verify(eventDispatcherMock, timeout(SEPARATE_EXECUTION_TIMEOUT).times(1)) - .handleExecution(any()); + .handleExecution(any()); defaultEventHandlerWithRetry.handleEvent(event); verify(eventDispatcherMock, timeout(SEPARATE_EXECUTION_TIMEOUT).times(2)) - .handleExecution(any()); + .handleExecution(any()); defaultEventHandlerWithRetry.handleEvent(event); verify(eventDispatcherMock, timeout(SEPARATE_EXECUTION_TIMEOUT).times(3)) - .handleExecution(executionScopeArgumentCaptor.capture()); + .handleExecution(executionScopeArgumentCaptor.capture()); log.info("Finished successfulExecutionResetsTheRetry"); List executionScopes = executionScopeArgumentCaptor.getAllValues();