diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/informer/InformerConfiguration.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/informer/InformerConfiguration.java index acb5bd23c4..0d08b65f99 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/informer/InformerConfiguration.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/informer/InformerConfiguration.java @@ -37,6 +37,7 @@ protected DefaultInformerConfiguration(ConfigurationService service, String labe Objects.requireNonNullElseGet(associatedWith, () -> ResourceID::fromResource); } + public PrimaryResourcesRetriever getPrimaryResourcesRetriever() { return secondaryToPrimaryResourcesIdSet; } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/Controller.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/Controller.java index 89aabe0d9b..e8996c351c 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/Controller.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/Controller.java @@ -214,7 +214,7 @@ public void start() throws OperatorException { } final var context = new EventSourceContext<>( - eventSourceManager.getControllerResourceEventSource().getResourceCache(), + eventSourceManager.getControllerResourceEventSource(), configurationService(), kubernetesClient); prepareEventSources(context).forEach(eventSourceManager::registerEventSource); diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/kubernetes/KubernetesDependent.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/kubernetes/KubernetesDependent.java index c0922f569f..45d2095d1a 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/kubernetes/KubernetesDependent.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/kubernetes/KubernetesDependent.java @@ -32,5 +32,4 @@ * @return the label selector */ String labelSelector() default EMPTY_STRING; - } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/kubernetes/KubernetesDependentResource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/kubernetes/KubernetesDependentResource.java index a0513b1234..8ce3560fdc 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/kubernetes/KubernetesDependentResource.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/kubernetes/KubernetesDependentResource.java @@ -75,17 +75,16 @@ private void configureWith(ConfigurationService configService, String labelSelec .withPrimaryResourcesRetriever(primaryResourcesRetriever) .withAssociatedSecondaryResourceIdentifier(secondaryResourceIdentifier) .build(); - configureWith(configService, new InformerEventSource<>(ic, client), addOwnerReference); + configureWith(new InformerEventSource<>(ic, client), addOwnerReference); } /** * Use to share informers between event more resources. - * - * @param configurationService get configs + * * @param informerEventSource informer to use * @param addOwnerReference to the created resource */ - public void configureWith(ConfigurationService configurationService, + public void configureWith( InformerEventSource informerEventSource, boolean addOwnerReference) { this.informerEventSource = informerEventSource; @@ -93,12 +92,29 @@ public void configureWith(ConfigurationService configurationService, } public void create(R target, P primary, Context context) { - prepare(target, primary, "Creating").create(target); + var resourceID = ResourceID.fromResource(target); + try { + informerEventSource.prepareForCreateOrUpdateEventFiltering(resourceID); + var created = prepare(target, primary, "Creating").create(target); + informerEventSource.handleRecentResourceCreate(created); + } catch (RuntimeException e) { + informerEventSource.cleanupOnCreateOrUpdateEventFiltering(resourceID); + throw e; + } } public void update(R actual, R target, P primary, Context context) { - var updatedActual = processor.replaceSpecOnActual(actual, target, context); - prepare(target, primary, "Updating").replace(updatedActual); + var resourceID = ResourceID.fromResource(target); + try { + var updatedActual = processor.replaceSpecOnActual(actual, target, context); + informerEventSource.prepareForCreateOrUpdateEventFiltering(resourceID); + var updated = prepare(target, primary, "Updating").replace(updatedActual); + informerEventSource.handleRecentResourceUpdate(updated, + actual.getMetadata().getResourceVersion()); + } catch (RuntimeException e) { + informerEventSource.cleanupOnCreateOrUpdateEventFiltering(resourceID); + throw e; + } } public boolean match(R actualResource, R desiredResource, Context context) { diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventProcessor.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventProcessor.java index 29f9adab55..81e637a2f6 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventProcessor.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventProcessor.java @@ -29,7 +29,6 @@ import io.javaoperatorsdk.operator.processing.retry.RetryExecution; import static io.javaoperatorsdk.operator.processing.KubernetesResourceUtils.getName; -import static io.javaoperatorsdk.operator.processing.KubernetesResourceUtils.getVersion; class EventProcessor implements EventHandler, LifecycleAware { @@ -50,7 +49,7 @@ class EventProcessor implements EventHandler, LifecycleAw EventProcessor(EventSourceManager eventSourceManager) { this( - eventSourceManager.getControllerResourceEventSource().getResourceCache(), + eventSourceManager.getControllerResourceEventSource(), ExecutorServiceManager.instance().executorService(), eventSourceManager.getController().getConfiguration().getName(), new ReconciliationDispatcher<>(eventSourceManager.getController()), @@ -73,7 +72,7 @@ class EventProcessor implements EventHandler, LifecycleAw Retry retry, Metrics metrics) { this( - eventSourceManager.getControllerResourceEventSource().getResourceCache(), + eventSourceManager.getControllerResourceEventSource(), null, relatedControllerName, reconciliationDispatcher, @@ -208,12 +207,12 @@ void eventProcessingFinished( if (eventMarker.deleteEventPresent(resourceID)) { cleanupForDeletedEvent(executionScope.getCustomResourceID()); } else { + postExecutionControl.getUpdatedCustomResource().ifPresent(r -> { + eventSourceManager.getControllerResourceEventSource().handleRecentResourceUpdate(r, + executionScope.getResource().getMetadata().getResourceVersion()); + }); if (eventMarker.eventPresent(resourceID)) { - if (isCacheReadyForInstantReconciliation(executionScope, postExecutionControl)) { - submitReconciliationExecution(resourceID); - } else { - postponeReconciliationAndHandleCacheSyncEvent(resourceID); - } + submitReconciliationExecution(resourceID); } else { reScheduleExecutionIfInstructed(postExecutionControl, executionScope.getResource()); } @@ -223,41 +222,6 @@ void eventProcessingFinished( } } - private void postponeReconciliationAndHandleCacheSyncEvent(ResourceID resourceID) { - eventSourceManager.getControllerResourceEventSource().whitelistNextEvent(resourceID); - } - - private boolean isCacheReadyForInstantReconciliation( - ExecutionScope executionScope, PostExecutionControl postExecutionControl) { - if (!postExecutionControl.customResourceUpdatedDuringExecution()) { - return true; - } - String originalResourceVersion = getVersion(executionScope.getResource()); - String customResourceVersionAfterExecution = - getVersion( - postExecutionControl - .getUpdatedCustomResource() - .orElseThrow( - () -> new IllegalStateException( - "Updated custom resource must be present at this point of time"))); - String cachedCustomResourceVersion = - getVersion( - cache - .get(executionScope.getCustomResourceID()) - .orElseThrow( - () -> new IllegalStateException( - "Cached custom resource must be present at this point"))); - - if (cachedCustomResourceVersion.equals(customResourceVersionAfterExecution)) { - return true; - } - // If the cached resource version equals neither the version before nor after execution - // probably an update happened on the custom resource independent of the framework during - // reconciliation. We cannot tell at this point if it happened before our update or before. - // (Well we could if we would parse resource version, but that should not be done by definition) - return !cachedCustomResourceVersion.equals(originalResourceVersion); - } - private void reScheduleExecutionIfInstructed( PostExecutionControl postExecutionControl, R customResource) { postExecutionControl diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/controller/ControllerResourceEventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/controller/ControllerResourceEventSource.java index 2a224a87df..801eae09ee 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/controller/ControllerResourceEventSource.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/controller/ControllerResourceEventSource.java @@ -13,7 +13,6 @@ import io.javaoperatorsdk.operator.processing.Controller; import io.javaoperatorsdk.operator.processing.MDCUtils; import io.javaoperatorsdk.operator.processing.event.ResourceID; -import io.javaoperatorsdk.operator.processing.event.source.ResourceCache; import io.javaoperatorsdk.operator.processing.event.source.informer.ManagedInformerEventSource; import static io.javaoperatorsdk.operator.processing.KubernetesResourceUtils.getName; @@ -25,30 +24,19 @@ public class ControllerResourceEventSource implements ResourceEventHandler { public static final String ANY_NAMESPACE_MAP_KEY = "anyNamespace"; - private static final Logger log = LoggerFactory.getLogger(ControllerResourceEventSource.class); private final Controller controller; private final ResourceEventFilter filter; - private final OnceWhitelistEventFilterEventFilter onceWhitelistEventFilterEventFilter; public ControllerResourceEventSource(Controller controller) { super(controller.getCRClient(), controller.getConfiguration()); this.controller = controller; - var filters = new ResourceEventFilter[] { ResourceEventFilters.finalizerNeededAndApplied(), ResourceEventFilters.markedForDeletion(), ResourceEventFilters.generationAware(), - null }; - - if (controller.getConfiguration().isGenerationAware()) { - onceWhitelistEventFilterEventFilter = new OnceWhitelistEventFilterEventFilter<>(); - filters[filters.length - 1] = onceWhitelistEventFilterEventFilter; - } else { - onceWhitelistEventFilterEventFilter = null; - } if (controller.getConfiguration().getEventFilter() != null) { filter = controller.getConfiguration().getEventFilter().and(ResourceEventFilters.or(filters)); } else { @@ -87,36 +75,22 @@ public void eventReceived(ResourceAction action, T resource, T oldResource) { @Override public void onAdd(T resource) { + super.onAdd(resource); eventReceived(ResourceAction.ADDED, resource, null); } @Override public void onUpdate(T oldCustomResource, T newCustomResource) { + super.onUpdate(oldCustomResource, newCustomResource); eventReceived(ResourceAction.UPDATED, newCustomResource, oldCustomResource); } @Override public void onDelete(T resource, boolean b) { + super.onDelete(resource, b); eventReceived(ResourceAction.DELETED, resource, null); } - public ResourceCache getResourceCache() { - return manager(); - } - - /** - * This will ensure that the next event received after this method is called will not be filtered - * out. - * - * @param resourceID - to which the event is related - */ - public void whitelistNextEvent(ResourceID resourceID) { - if (onceWhitelistEventFilterEventFilter != null) { - onceWhitelistEventFilterEventFilter.whitelistNextEvent(resourceID); - } - } - - private void handleKubernetesClientException(Exception e) { KubernetesClientException ke = (KubernetesClientException) e; if (404 == ke.getCode()) { diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/EventRecorder.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/EventRecorder.java new file mode 100644 index 0000000000..284b749f07 --- /dev/null +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/EventRecorder.java @@ -0,0 +1,71 @@ +package io.javaoperatorsdk.operator.processing.event.source.informer; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +import io.fabric8.kubernetes.api.model.HasMetadata; +import io.javaoperatorsdk.operator.processing.event.ResourceID; + +public class EventRecorder { + + private final Map> resourceEvents = new ConcurrentHashMap<>(); + + void startEventRecording(ResourceID resourceID) { + resourceEvents.putIfAbsent(resourceID, new ArrayList<>(5)); + } + + public boolean isRecordingFor(ResourceID resourceID) { + return resourceEvents.get(resourceID) != null; + } + + public void stopEventRecording(ResourceID resourceID) { + resourceEvents.remove(resourceID); + } + + public void recordEvent(R resource) { + resourceEvents.get(ResourceID.fromResource(resource)).add(resource); + } + + public boolean containsEventWithResourceVersion(ResourceID resourceID, String resourceVersion) { + List events = resourceEvents.get(resourceID); + if (events == null) { + return false; + } + if (events.isEmpty()) { + return false; + } else { + return events.stream() + .anyMatch(e -> e.getMetadata().getResourceVersion().equals(resourceVersion)); + } + } + + public boolean containsEventWithVersionButItsNotLastOne( + ResourceID resourceID, String resourceVersion) { + List resources = resourceEvents.get(resourceID); + if (resources == null) { + throw new IllegalStateException( + "Null events list, this is probably a result of invalid usage of the " + + "InformerEventSource. Resource ID: " + resourceID); + } + if (resources.isEmpty()) { + throw new IllegalStateException("No events for resource id: " + resourceID); + } + return !resources + .get(resources.size() - 1) + .getMetadata() + .getResourceVersion() + .equals(resourceVersion); + } + + public R getLastEvent(ResourceID resourceID) { + List resources = resourceEvents.get(resourceID); + if (resources == null) { + throw new IllegalStateException( + "Null events list, this is probably a result of invalid usage of the " + + "InformerEventSource. Resource ID: " + resourceID); + } + return resources.get(resources.size() - 1); + } +} diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSource.java index a706e244ab..2f8705b63a 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSource.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSource.java @@ -1,8 +1,9 @@ package io.javaoperatorsdk.operator.processing.event.source.informer; import java.util.Optional; -import java.util.function.Predicate; -import java.util.stream.Stream; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import io.fabric8.kubernetes.api.model.HasMetadata; import io.fabric8.kubernetes.client.KubernetesClient; @@ -11,87 +12,224 @@ import io.javaoperatorsdk.operator.api.reconciler.EventSourceContext; import io.javaoperatorsdk.operator.processing.event.Event; import io.javaoperatorsdk.operator.processing.event.EventHandler; +import io.javaoperatorsdk.operator.processing.event.ResourceID; import io.javaoperatorsdk.operator.processing.event.source.ResourceCache; -public class InformerEventSource - extends ManagedInformerEventSource> - implements ResourceCache, ResourceEventHandler { +/** + *

+ * Wraps informer(s) so it is connected to the eventing system of the framework. Note that since + * it's it is built on top of Informers, it also support caching resources using caching from + * fabric8 client Informer caches and additional caches described below. + *

+ *

+ * InformerEventSource also supports two features to better handle events and caching of resources + * on top of Informers from fabric8 Kubernetes client. These two features implementation wise are + * related to each other: + *

+ *
+ *

+ * 1. API that allows to make sure the cache contains the fresh resource after an update. This is + * important for {@link io.javaoperatorsdk.operator.api.reconciler.dependent.DependentResource} and + * mainly for + * {@link io.javaoperatorsdk.operator.processing.dependent.kubernetes.KubernetesDependentResource} + * so after reconcile if getResource() called always return the fresh resource. To achieve this + * handleRecentResourceUpdate() and handleRecentResourceCreate() needs to be called explicitly after + * resource created/updated using the kubernetes client. (These calls are done automatically by + * KubernetesDependentResource implementation.). In the background this will store the new resource + * in a temporary cache {@link TemporaryResourceCache} which do additional checks. After a new event + * is received the cachec object is removed from this cache, since in general then it is already in + * the cache of informer. + *

+ *
+ *

+ * 2. Additional API is provided that is ment to be used with the combination of the previous one, + * and the goal is to filter out events that are the results of updates and creates made by the + * controller itself. For example if in reconciler a ConfigMaps is created, there should be an + * Informer in place to handle change events of that ConfigMap, but since it has bean created (or + * updated) by the reconciler this should not trigger an additional reconciliation by default. In + * order to achieve this prepareForCreateOrUpdateEventFiltering(..) method needs to be called before + * the operation of the k8s client. And the operation from point 1. after the k8s client call. See + * it's usage in CreateUpdateEventFilterTestReconciler integration test for the usage. (Again this + * is managed for the developer if using dependent resources.)
+ * Roughly it works in a way that before the K8S API call is made, we set mark the resource ID, and + * from that point informer won't propagate events further just will start record them. After the + * client operation is done, it's checked and analysed what events were received and based on that + * it will propagate event or not and/or put the new resource into the temporal cache - so if the + * event not arrived yet about the update will be able to filter it in the future. + *

+ * + * @param resource type watching + * @param

type of the primary resource + */ +public class InformerEventSource + extends ManagedInformerEventSource> + implements ResourceCache, ResourceEventHandler { + + private static final Logger log = LoggerFactory.getLogger(InformerEventSource.class); - private final InformerConfiguration configuration; + private final InformerConfiguration configuration; + private final EventRecorder eventRecorder = new EventRecorder<>(); - public InformerEventSource(InformerConfiguration configuration, - EventSourceContext

context) { + public InformerEventSource( + InformerConfiguration configuration, EventSourceContext

context) { super(context.getClient().resources(configuration.getResourceClass()), configuration); this.configuration = configuration; } - public InformerEventSource(InformerConfiguration configuration, - KubernetesClient client) { + public InformerEventSource(InformerConfiguration configuration, KubernetesClient client) { super(client.resources(configuration.getResourceClass()), configuration); this.configuration = configuration; } @Override - public void onAdd(T t) { - propagateEvent(t); + public void onAdd(R resource) { + onAddOrUpdate("add", resource, () -> InformerEventSource.super.onAdd(resource)); } @Override - public void onUpdate(T oldObject, T newObject) { - if (newObject == null) { - // this is a fix for this potential issue with informer: - // https://github.com/java-operator-sdk/java-operator-sdk/issues/830 - propagateEvent(oldObject); - return; - } + public void onUpdate(R oldObject, R newObject) { + onAddOrUpdate("update", newObject, + () -> InformerEventSource.super.onUpdate(oldObject, newObject)); + } - if (oldObject.getMetadata().getResourceVersion() - .equals(newObject.getMetadata().getResourceVersion())) { + private synchronized void onAddOrUpdate(String operation, R newObject, Runnable superOnOp) { + var resourceID = ResourceID.fromResource(newObject); + if (eventRecorder.isRecordingFor(resourceID)) { + log.info("Recording event for: " + resourceID); + eventRecorder.recordEvent(newObject); return; } - propagateEvent(newObject); + if (temporalCacheHasResourceWithVersionAs(newObject)) { + log.debug( + "Skipping event propagation for {}, since was a result of a reconcile action. Resource ID: {}", + operation, + ResourceID.fromResource(newObject)); + superOnOp.run(); + } else { + superOnOp.run(); + log.debug( + "Propagating event for {}, resource with same version not result of a reconciliation. Resource ID: {}", + operation, + resourceID); + propagateEvent(newObject); + } } @Override - public void onDelete(T t, boolean b) { - propagateEvent(t); + public void onDelete(R r, boolean b) { + super.onDelete(r, b); + propagateEvent(r); } - private void propagateEvent(T object) { + private void propagateEvent(R object) { var primaryResourceIdSet = configuration.getPrimaryResourcesRetriever().associatedPrimaryResources(object); if (primaryResourceIdSet.isEmpty()) { return; } - primaryResourceIdSet.forEach(resourceId -> { - Event event = new Event(resourceId); - /* - * In fabric8 client for certain cases informers can be created on in a way that they are - * automatically started, what would cause a NullPointerException here, since an event might - * be received between creation and registration. - */ - final EventHandler eventHandler = getEventHandler(); - if (eventHandler != null) { - eventHandler.handleEvent(event); - } - }); + primaryResourceIdSet.forEach( + resourceId -> { + Event event = new Event(resourceId); + /* + * In fabric8 client for certain cases informers can be created on in a way that they are + * automatically started, what would cause a NullPointerException here, since an event + * might be received between creation and registration. + */ + final EventHandler eventHandler = getEventHandler(); + if (eventHandler != null) { + eventHandler.handleEvent(event); + } + }); } /** * Retrieves the informed resource associated with the specified primary resource as defined by * the function provided when this InformerEventSource was created - * + * * @param resource the primary resource we want to retrieve the associated resource for * @return the informed resource associated with the specified primary resource */ @Override - public Optional getAssociated(P resource) { + public Optional getAssociated(P resource) { final var id = configuration.getAssociatedResourceIdentifier().associatedSecondaryID(resource); return get(id); } + public InformerConfiguration getConfiguration() { + return configuration; + } + @Override - public Stream list(String namespace, Predicate predicate) { - return manager().list(namespace, predicate); + public void handleRecentResourceUpdate(R resource, String previousResourceVersion) { + handleRecentCreateOrUpdate(resource, + () -> super.handleRecentResourceUpdate(resource, previousResourceVersion)); + } + + @Override + public void handleRecentResourceCreate(R resource) { + handleRecentCreateOrUpdate(resource, () -> super.handleRecentResourceCreate(resource)); + } + + private synchronized void handleRecentCreateOrUpdate(R resource, Runnable runnable) { + if (eventRecorder.isRecordingFor(ResourceID.fromResource(resource))) { + handleRecentResourceOperationAndStopEventRecording(resource); + } else { + runnable.run(); + } + } + + /** + * There can be the following cases: + *

    + *
  • 1. Did not receive the event yet for the target resource, then we need to put it to temp + * cache. Because event will arrive. Note that this not necessary mean that the even is not sent + * yet (we are in sync context). Also does not mean that there are no more events received after + * that. But during the event processing (onAdd, onUpdate) we make sure that the propagation just + * skipped for the right event.
  • + *
  • 2. Received the event about the operation already, it was the last. This means already is + * on cache of informer. So we have to do nothing. Since it was just recorded and not propagated. + *
  • + *
  • 3. Received the event but more events received since, so those were not propagated yet. So + * an event needs to be propagated to compensate.
  • + *
+ * + * @param resource just created or updated resource + */ + private synchronized void handleRecentResourceOperationAndStopEventRecording(R resource) { + ResourceID resourceID = ResourceID.fromResource(resource); + try { + if (!eventRecorder.containsEventWithResourceVersion( + resourceID, resource.getMetadata().getResourceVersion())) { + log.debug( + "Did not found event in buffer with target version and resource id: {}", resourceID); + temporaryResourceCache.unconditionallyCacheResource(resource); + } else if (eventRecorder.containsEventWithVersionButItsNotLastOne( + resourceID, resource.getMetadata().getResourceVersion())) { + R lastEvent = eventRecorder.getLastEvent(resourceID); + log.debug( + "Found events in event buffer but the target event is not last for id: {}. Propagating event.", + resourceID); + propagateEvent(lastEvent); + } + } finally { + eventRecorder.stopEventRecording(resourceID); + } } + + public synchronized void prepareForCreateOrUpdateEventFiltering(ResourceID resourceID) { + log.info("Starting event recording for: {}", resourceID); + eventRecorder.startEventRecording(resourceID); + } + + /** + * Mean to be called to clean up in case of an exception from the client. Usually in a catch + * block. + * + * @param resourceID of the resource + */ + public synchronized void cleanupOnCreateOrUpdateEventFiltering(ResourceID resourceID) { + log.info("Stopping event recording for: {}", resourceID); + eventRecorder.stopEventRecording(resourceID); + } + } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerManager.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerManager.java index e65f34df48..33f962e7ed 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerManager.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerManager.java @@ -139,6 +139,10 @@ public T remove(ResourceID key) { @Override public void put(ResourceID key, T resource) { getSource(key.getNamespace().orElse(ANY_NAMESPACE_MAP_KEY)) - .ifPresent(c -> c.put(key, resource)); + .ifPresentOrElse(c -> c.put(key, resource), + () -> log.warn( + "Cannot put resource in the cache. No related cache found: {}. Resource: {}", + key, resource)); } + } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerWrapper.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerWrapper.java index c7de41f331..710f230580 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerWrapper.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerWrapper.java @@ -81,4 +81,5 @@ public T remove(ResourceID key) { public void put(ResourceID key, T resource) { cache.put(key, resource); } + } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java index 1dcfa2aa0e..47de996914 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java @@ -1,17 +1,30 @@ package io.javaoperatorsdk.operator.processing.event.source.informer; +import java.util.Optional; +import java.util.function.Predicate; +import java.util.stream.Stream; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import io.fabric8.kubernetes.api.model.HasMetadata; import io.fabric8.kubernetes.api.model.KubernetesResourceList; import io.fabric8.kubernetes.client.dsl.MixedOperation; import io.fabric8.kubernetes.client.dsl.Resource; import io.fabric8.kubernetes.client.informers.ResourceEventHandler; import io.javaoperatorsdk.operator.api.config.ResourceConfiguration; +import io.javaoperatorsdk.operator.processing.event.ResourceID; import io.javaoperatorsdk.operator.processing.event.source.CachingEventSource; +import io.javaoperatorsdk.operator.processing.event.source.ResourceCache; import io.javaoperatorsdk.operator.processing.event.source.UpdatableCache; public abstract class ManagedInformerEventSource> extends CachingEventSource - implements ResourceEventHandler { + implements ResourceEventHandler, ResourceCache { + + private static final Logger log = LoggerFactory.getLogger(ManagedInformerEventSource.class); + + protected TemporaryResourceCache temporaryResourceCache = new TemporaryResourceCache<>(this); protected ManagedInformerEventSource( MixedOperation, Resource> client, C configuration) { @@ -19,6 +32,21 @@ protected ManagedInformerEventSource( manager().initSources(client, configuration, this); } + @Override + public void onAdd(R resource) { + temporaryResourceCache.removeResourceFromCache(resource); + } + + @Override + public void onUpdate(R oldObj, R newObj) { + temporaryResourceCache.removeResourceFromCache(newObj); + } + + @Override + public void onDelete(R obj, boolean deletedFinalStateUnknown) { + temporaryResourceCache.removeResourceFromCache(obj); + } + @Override protected UpdatableCache initCache() { return new InformerManager<>(); @@ -39,4 +67,56 @@ public void stop() { super.stop(); manager().stop(); } + + public void handleRecentResourceUpdate(R resource, String previousResourceVersion) { + temporaryResourceCache.putUpdatedResource(resource, previousResourceVersion); + } + + public void handleRecentResourceCreate(R resource) { + temporaryResourceCache.putAddedResource(resource); + } + + @Override + public Optional get(ResourceID resourceID) { + Optional resource = temporaryResourceCache.getResourceFromCache(resourceID); + if (resource.isPresent()) { + log.debug("Resource found in temporal cache for Resource ID: {}", resourceID); + return resource; + } else { + return super.get(resourceID); + } + } + + @Override + public Optional getAssociated(P primary) { + return get(ResourceID.fromResource(primary)); + } + + @Override + public Optional getCachedValue(ResourceID resourceID) { + return get(resourceID); + } + + protected boolean temporalCacheHasResourceWithVersionAs(R resource) { + var resourceID = ResourceID.fromResource(resource); + var res = temporaryResourceCache.getResourceFromCache(resourceID); + return res.map(r -> { + boolean resVersionsEqual = r.getMetadata().getResourceVersion() + .equals(resource.getMetadata().getResourceVersion()); + log.debug("Resource found in temporal cache for id: {} resource versions equal: {}", + resourceID, resVersionsEqual); + return resVersionsEqual; + }).orElse(false); + } + + @Override + public Stream list(String namespace, Predicate predicate) { + return manager().list(namespace, predicate); + } + + ManagedInformerEventSource setTemporalResourceCache( + TemporaryResourceCache temporaryResourceCache) { + this.temporaryResourceCache = temporaryResourceCache; + return this; + } } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCache.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCache.java new file mode 100644 index 0000000000..64c93484b8 --- /dev/null +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCache.java @@ -0,0 +1,110 @@ +package io.javaoperatorsdk.operator.processing.event.source.informer; + +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.locks.ReentrantLock; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.fabric8.kubernetes.api.model.HasMetadata; +import io.javaoperatorsdk.operator.processing.dependent.kubernetes.KubernetesDependentResource; +import io.javaoperatorsdk.operator.processing.event.ResourceID; + +/** + *

+ * Temporal cache is used to solve the problem for {@link KubernetesDependentResource} that is, when + * a create or update is executed the subsequent getResource opeeration might not return the + * up-to-date resource from informer cache, since it is not received yet by webhook. + *

+ *

+ * The idea of the solution is, that since an update (for create is simpler) was done successfully, + * and optimistic locking is in place, there were no other operations between reading the resource + * from the cache and the actual update. So when the new resource is stored in the temporal cache + * only if the informer still has the previous resource version, from before the update. If not, + * that means there were already updates on the cache (either by the actual update from + * DependentResource or other) so the resource does not needs to be cached. Subsequently if event + * received from the informer, it means that the cache of the informer was updated, so it already + * contains a more fresh version of the resource. + *

+ * + * @param resource to cache. + */ +public class TemporaryResourceCache { + + private static final Logger log = LoggerFactory.getLogger(TemporaryResourceCache.class); + + private final Map cache = new ConcurrentHashMap<>(); + private final ReentrantLock lock = new ReentrantLock(); + private final ManagedInformerEventSource managedInformerEventSource; + + public TemporaryResourceCache(ManagedInformerEventSource managedInformerEventSource) { + this.managedInformerEventSource = managedInformerEventSource; + } + + public void removeResourceFromCache(T resource) { + lock.lock(); + try { + cache.remove(ResourceID.fromResource(resource)); + } finally { + lock.unlock(); + } + } + + public void unconditionallyCacheResource(T newResource) { + lock.lock(); + try { + cache.put(ResourceID.fromResource(newResource), newResource); + } finally { + lock.unlock(); + } + } + + public void putAddedResource(T newResource) { + lock.lock(); + try { + ResourceID resourceID = ResourceID.fromResource(newResource); + if (managedInformerEventSource.get(resourceID).isEmpty()) { + log.debug("Putting resource to cache with ID: {}", resourceID); + cache.put(ResourceID.fromResource(newResource), newResource); + } else { + log.debug("Won't put resource into cache found already informer cache: {}", resourceID); + } + } finally { + lock.unlock(); + } + } + + public void putUpdatedResource(T newResource, String previousResourceVersion) { + lock.lock(); + try { + var resourceId = ResourceID.fromResource(newResource); + var informerCacheResource = managedInformerEventSource.get(resourceId); + if (informerCacheResource.isEmpty()) { + log.debug("No cached value present for resource: {}", newResource); + return; + } + // if this is not true that means the cache was already updated + if (informerCacheResource.get().getMetadata().getResourceVersion() + .equals(previousResourceVersion)) { + log.debug("Putting resource to temporal cache with id: {}", resourceId); + cache.put(resourceId, newResource); + } else { + // if something is in cache it's surely obsolete now + cache.remove(resourceId); + } + } finally { + lock.unlock(); + } + } + + public Optional getResourceFromCache(ResourceID resourceID) { + try { + lock.lock(); + return Optional.ofNullable(cache.get(resourceID)); + } finally { + lock.unlock(); + } + } +} diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/dependent/kubernetes/KubernetesDependentResourceTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/dependent/kubernetes/KubernetesDependentResourceTest.java new file mode 100644 index 0000000000..e73244f7c6 --- /dev/null +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/dependent/kubernetes/KubernetesDependentResourceTest.java @@ -0,0 +1,77 @@ +package io.javaoperatorsdk.operator.processing.dependent.kubernetes; + +import static org.mockito.Mockito.*; + +class KubernetesDependentResourceTest { + + // private InformerEventSource informerEventSourceMock = mock(InformerEventSource.class); + // private AssociatedSecondaryResourceIdentifier associatedResourceIdentifierMock = + // mock(AssociatedSecondaryResourceIdentifier.class); + // private ResourceMatcher resourceMatcherMock = mock(ResourceMatcher.class); + // private KubernetesDependentResource.ClientFacade clientFacadeMock = + // mock(KubernetesDependentResource.ClientFacade.class); + // + // KubernetesDependentResource kubernetesDependentResource = + // new KubernetesDependentResource() { + // { + // this.informerEventSource = informerEventSourceMock; + // this.resourceMatcher = resourceMatcherMock; + // this.clientFacade = clientFacadeMock; + // this.resourceUpdatePreProcessor = mock(ResourceUpdatePreProcessor.class); + // } + // + // @Override + // protected Object desired(HasMetadata primary, Context context) { + // return testResource(); + // } + // }; + // + // @BeforeEach + // public void setup() { + // InformerConfiguration informerConfigurationMock = mock(InformerConfiguration.class); + // when(informerEventSourceMock.getConfiguration()).thenReturn(informerConfigurationMock); + // when(informerConfigurationMock.getAssociatedResourceIdentifier()) + // .thenReturn(associatedResourceIdentifierMock); + // when(associatedResourceIdentifierMock.associatedSecondaryID(any())) + // .thenReturn(ResourceID.fromResource(testResource())); + // } + // + // @Test + // void updateCallsInformerJustUpdatedHandler() { + // when(resourceMatcherMock.match(any(), any(), any())).thenReturn(false); + // when(clientFacadeMock.replaceResource(any(), any(), any())).thenReturn(testResource()); + // when(informerEventSourceMock.getAssociated(any())).thenReturn(Optional.of(testResource())); + // + // kubernetesDependentResource.reconcile(primaryResource(), null); + // + // verify(informerEventSourceMock, times(1)).handleRecentResourceUpdate(any(), any()); + // } + // + // @Test + // void createCallsInformerJustUpdatedHandler() { + // when(clientFacadeMock.createResource(any(), any(), any())).thenReturn(testResource()); + // when(informerEventSourceMock.getAssociated(any())).thenReturn(Optional.empty()); + // + // kubernetesDependentResource.reconcile(primaryResource(), null); + // + // verify(informerEventSourceMock, times(1)).handleRecentResourceAdd(any()); + // } + // + // TestCustomResource primaryResource() { + // TestCustomResource testCustomResource = new TestCustomResource(); + // testCustomResource.setMetadata(new ObjectMeta()); + // testCustomResource.getMetadata().setName("test"); + // testCustomResource.getMetadata().setNamespace("default"); + // return testCustomResource; + // } + // + // ConfigMap testResource() { + // ConfigMap configMap = new ConfigMap(); + // configMap.setMetadata(new ObjectMeta()); + // configMap.getMetadata().setName("test"); + // configMap.getMetadata().setNamespace("default"); + // configMap.getMetadata().setResourceVersion("0"); + // return configMap; + // } + +} diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/EventProcessorTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/EventProcessorTest.java index 5a3f73742e..c02a3b0da1 100644 --- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/EventProcessorTest.java +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/EventProcessorTest.java @@ -13,7 +13,6 @@ import io.fabric8.kubernetes.api.model.HasMetadata; import io.javaoperatorsdk.operator.api.monitoring.Metrics; -import io.javaoperatorsdk.operator.processing.event.source.controller.ControllerResourceCache; import io.javaoperatorsdk.operator.processing.event.source.controller.ControllerResourceEventSource; import io.javaoperatorsdk.operator.processing.event.source.controller.ResourceAction; import io.javaoperatorsdk.operator.processing.event.source.controller.ResourceEvent; @@ -44,8 +43,6 @@ class EventProcessorTest { private ReconciliationDispatcher reconciliationDispatcherMock = mock(ReconciliationDispatcher.class); private EventSourceManager eventSourceManagerMock = mock(EventSourceManager.class); - private ControllerResourceCache resourceCacheMock = - mock(ControllerResourceCache.class); private TimerEventSource retryTimerEventSourceMock = mock(TimerEventSource.class); private ControllerResourceEventSource controllerResourceEventSourceMock = mock(ControllerResourceEventSource.class); @@ -54,12 +51,9 @@ class EventProcessorTest { private EventProcessor eventProcessorWithRetry; @BeforeEach - public void setup() { - + void setup() { when(eventSourceManagerMock.getControllerResourceEventSource()) .thenReturn(controllerResourceEventSourceMock); - when(controllerResourceEventSourceMock.getResourceCache()).thenReturn(resourceCacheMock); - eventProcessor = spy(new EventProcessor(reconciliationDispatcherMock, eventSourceManagerMock, "Test", null, null)); @@ -68,22 +62,22 @@ public void setup() { spy(new EventProcessor(reconciliationDispatcherMock, eventSourceManagerMock, "Test", GenericRetry.defaultLimitedExponentialRetry(), null)); eventProcessorWithRetry.start(); - when(eventProcessor.retryEventSource()).thenReturn(retryTimerEventSourceMock); when(eventProcessorWithRetry.retryEventSource()).thenReturn(retryTimerEventSourceMock); } @Test - public void dispatchesEventsIfNoExecutionInProgress() { + void dispatchesEventsIfNoExecutionInProgress() { eventProcessor.handleEvent(prepareCREvent()); verify(reconciliationDispatcherMock, timeout(50).times(1)).handleExecution(any()); } @Test - public void skipProcessingIfLatestCustomResourceNotInCache() { + void skipProcessingIfLatestCustomResourceNotInCache() { Event event = prepareCREvent(); - when(resourceCacheMock.get(event.getRelatedCustomResourceID())).thenReturn(Optional.empty()); + when(controllerResourceEventSourceMock.get(event.getRelatedCustomResourceID())) + .thenReturn(Optional.empty()); eventProcessor.handleEvent(event); @@ -91,7 +85,7 @@ public void skipProcessingIfLatestCustomResourceNotInCache() { } @Test - public void ifExecutionInProgressWaitsUntilItsFinished() throws InterruptedException { + void ifExecutionInProgressWaitsUntilItsFinished() throws InterruptedException { ResourceID resourceUid = eventAlreadyUnderProcessing(); eventProcessor.handleEvent(nonCREvent(resourceUid)); @@ -101,7 +95,7 @@ public void ifExecutionInProgressWaitsUntilItsFinished() throws InterruptedExcep } @Test - public void schedulesAnEventRetryOnException() { + void schedulesAnEventRetryOnException() { TestCustomResource customResource = testCustomResource(); ExecutionScope executionScope = new ExecutionScope(customResource, null); @@ -115,7 +109,7 @@ public void schedulesAnEventRetryOnException() { } @Test - public void executesTheControllerInstantlyAfterErrorIfNewEventsReceived() { + void executesTheControllerInstantlyAfterErrorIfNewEventsReceived() { Event event = prepareCREvent(); TestCustomResource customResource = testCustomResource(); overrideData(event.getRelatedCustomResourceID(), customResource); @@ -142,7 +136,7 @@ public void executesTheControllerInstantlyAfterErrorIfNewEventsReceived() { } @Test - public void successfulExecutionResetsTheRetry() { + void successfulExecutionResetsTheRetry() { log.info("Starting successfulExecutionResetsTheRetry"); Event event = prepareCREvent(); @@ -182,7 +176,7 @@ public void successfulExecutionResetsTheRetry() { } @Test - public void scheduleTimedEventIfInstructedByPostExecutionControl() { + void scheduleTimedEventIfInstructedByPostExecutionControl() { var testDelay = 10000L; when(reconciliationDispatcherMock.handleExecution(any())) .thenReturn(PostExecutionControl.defaultDispatch().withReSchedule(testDelay)); @@ -194,7 +188,7 @@ public void scheduleTimedEventIfInstructedByPostExecutionControl() { } @Test - public void reScheduleOnlyIfNotExecutedEventsReceivedMeanwhile() { + void reScheduleOnlyIfNotExecutedEventsReceivedMeanwhile() { var testDelay = 10000L; when(reconciliationDispatcherMock.handleExecution(any())) .thenReturn(PostExecutionControl.defaultDispatch().withReSchedule(testDelay)); @@ -207,7 +201,7 @@ public void reScheduleOnlyIfNotExecutedEventsReceivedMeanwhile() { } @Test - public void doNotFireEventsIfClosing() { + void doNotFireEventsIfClosing() { eventProcessor.stop(); eventProcessor.handleEvent(prepareCREvent()); @@ -215,58 +209,7 @@ public void doNotFireEventsIfClosing() { } @Test - public void whitelistNextEventIfTheCacheIsNotPropagatedAfterAnUpdate() { - var crID = new ResourceID("test-cr", TEST_NAMESPACE); - var cr = testCustomResource(crID); - var updatedCr = testCustomResource(crID); - updatedCr.getMetadata().setResourceVersion("2"); - var mockCREventSource = mock(ControllerResourceEventSource.class); - eventProcessor.getEventMarker().markEventReceived(crID); - when(resourceCacheMock.get(eq(crID))).thenReturn(Optional.of(cr)); - when(eventSourceManagerMock.getControllerResourceEventSource()).thenReturn(mockCREventSource); - - eventProcessor.eventProcessingFinished(new ExecutionScope(cr, null), - PostExecutionControl.customResourceUpdated(updatedCr)); - - verify(mockCREventSource, times(1)).whitelistNextEvent(eq(crID)); - } - - @Test - public void dontWhitelistsEventWhenOtherChangeDuringExecution() { - var crID = new ResourceID("test-cr", TEST_NAMESPACE); - var cr = testCustomResource(crID); - var updatedCr = testCustomResource(crID); - updatedCr.getMetadata().setResourceVersion("2"); - var otherChangeCR = testCustomResource(crID); - otherChangeCR.getMetadata().setResourceVersion("3"); - var mockCREventSource = mock(ControllerResourceEventSource.class); - eventProcessor.getEventMarker().markEventReceived(crID); - when(resourceCacheMock.get(eq(crID))).thenReturn(Optional.of(otherChangeCR)); - when(eventSourceManagerMock.getControllerResourceEventSource()).thenReturn(mockCREventSource); - - eventProcessor.eventProcessingFinished(new ExecutionScope(cr, null), - PostExecutionControl.customResourceUpdated(updatedCr)); - - verify(mockCREventSource, times(0)).whitelistNextEvent(eq(crID)); - } - - @Test - public void dontWhitelistsEventIfUpdatedEventInCache() { - var crID = new ResourceID("test-cr", TEST_NAMESPACE); - var cr = testCustomResource(crID); - var mockCREventSource = mock(ControllerResourceEventSource.class); - eventProcessor.getEventMarker().markEventReceived(crID); - when(resourceCacheMock.get(eq(crID))).thenReturn(Optional.of(cr)); - when(eventSourceManagerMock.getControllerResourceEventSource()).thenReturn(mockCREventSource); - - eventProcessor.eventProcessingFinished(new ExecutionScope(cr, null), - PostExecutionControl.customResourceUpdated(cr)); - - verify(mockCREventSource, times(0)).whitelistNextEvent(eq(crID)); - } - - @Test - public void cancelScheduleOnceEventsOnSuccessfulExecution() { + void cancelScheduleOnceEventsOnSuccessfulExecution() { var crID = new ResourceID("test-cr", TEST_NAMESPACE); var cr = testCustomResource(crID); @@ -277,12 +220,13 @@ public void cancelScheduleOnceEventsOnSuccessfulExecution() { } @Test - public void startProcessedMarkedEventReceivedBefore() { + void startProcessedMarkedEventReceivedBefore() { var crID = new ResourceID("test-cr", TEST_NAMESPACE); eventProcessor = spy(new EventProcessor(reconciliationDispatcherMock, eventSourceManagerMock, "Test", null, metricsMock)); - when(resourceCacheMock.get(eq(crID))).thenReturn(Optional.of(testCustomResource())); + when(controllerResourceEventSourceMock.get(eq(crID))) + .thenReturn(Optional.of(testCustomResource())); eventProcessor.handleEvent(new Event(crID)); verify(reconciliationDispatcherMock, timeout(100).times(0)).handleExecution(any()); @@ -293,6 +237,19 @@ public void startProcessedMarkedEventReceivedBefore() { verify(metricsMock, times(1)).reconcileCustomResource(any(), isNull()); } + @Test + void updatesEventSourceHandlerIfResourceUpdated() { + TestCustomResource customResource = testCustomResource(); + ExecutionScope executionScope = new ExecutionScope(customResource, null); + PostExecutionControl postExecutionControl = + PostExecutionControl.customResourceUpdated(customResource); + + eventProcessorWithRetry.eventProcessingFinished(executionScope, postExecutionControl); + + + verify(controllerResourceEventSourceMock, times(1)).handleRecentResourceUpdate(any(), any()); + } + private ResourceID eventAlreadyUnderProcessing() { when(reconciliationDispatcherMock.handleExecution(any())) .then( @@ -311,7 +268,7 @@ private ResourceEvent prepareCREvent() { private ResourceEvent prepareCREvent(ResourceID uid) { TestCustomResource customResource = testCustomResource(uid); - when(resourceCacheMock.get(eq(uid))).thenReturn(Optional.of(customResource)); + when(controllerResourceEventSourceMock.get(eq(uid))).thenReturn(Optional.of(customResource)); return new ResourceEvent(ResourceAction.UPDATED, ResourceID.fromResource(customResource)); } diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/controller/ControllerResourceEventSourceTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/controller/ControllerResourceEventSourceTest.java index 83528256df..9782948dba 100644 --- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/controller/ControllerResourceEventSourceTest.java +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/controller/ControllerResourceEventSourceTest.java @@ -12,7 +12,6 @@ import io.javaoperatorsdk.operator.processing.Controller; import io.javaoperatorsdk.operator.processing.event.EventHandler; import io.javaoperatorsdk.operator.processing.event.EventSourceManager; -import io.javaoperatorsdk.operator.processing.event.ResourceID; import io.javaoperatorsdk.operator.processing.event.source.AbstractEventSourceTestBase; import io.javaoperatorsdk.operator.sample.simple.TestCustomResource; @@ -99,27 +98,6 @@ public void eventWithNoGenerationProcessedIfNoFinalizer() { verify(eventHandler, times(1)).handleEvent(any()); } - @Test - public void handlesNextEventIfWhitelisted() { - TestCustomResource customResource = TestUtils.testCustomResource(); - customResource.getMetadata().setFinalizers(List.of(FINALIZER)); - source.whitelistNextEvent(ResourceID.fromResource(customResource)); - - source.eventReceived(ResourceAction.UPDATED, customResource, customResource); - - verify(eventHandler, times(1)).handleEvent(any()); - } - - @Test - public void notHandlesNextEventIfNotWhitelisted() { - TestCustomResource customResource = TestUtils.testCustomResource(); - customResource.getMetadata().setFinalizers(List.of(FINALIZER)); - - source.eventReceived(ResourceAction.UPDATED, customResource, customResource); - - verify(eventHandler, times(0)).handleEvent(any()); - } - @Test public void callsBroadcastsOnResourceEvents() { TestCustomResource customResource1 = TestUtils.testCustomResource(); diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/EventRecorderTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/EventRecorderTest.java new file mode 100644 index 0000000000..3e25e1da3c --- /dev/null +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/EventRecorderTest.java @@ -0,0 +1,80 @@ +package io.javaoperatorsdk.operator.processing.event.source.informer; + +import org.junit.jupiter.api.Test; + +import io.fabric8.kubernetes.api.model.ConfigMap; +import io.fabric8.kubernetes.api.model.ObjectMeta; +import io.javaoperatorsdk.operator.processing.event.ResourceID; + +import static org.assertj.core.api.Assertions.assertThat; + +class EventRecorderTest { + + public static final String RESOURCE_VERSION = "0"; + public static final String RESOURCE_VERSION1 = "1"; + EventRecorder eventRecorder = new EventRecorder(); + + ConfigMap testConfigMap = testConfigMap(RESOURCE_VERSION); + ConfigMap testConfigMap2 = testConfigMap(RESOURCE_VERSION1); + + ResourceID id = ResourceID.fromResource(testConfigMap); + + @Test + void recordsEvents() { + + assertThat(eventRecorder.isRecordingFor(id)).isFalse(); + + eventRecorder.startEventRecording(id); + assertThat(eventRecorder.isRecordingFor(id)).isTrue(); + + eventRecorder.recordEvent(testConfigMap); + + eventRecorder.stopEventRecording(id); + assertThat(eventRecorder.isRecordingFor(id)).isFalse(); + } + + @Test + void getsLastRecorded() { + eventRecorder.startEventRecording(id); + + eventRecorder.recordEvent(testConfigMap); + eventRecorder.recordEvent(testConfigMap2); + + assertThat(eventRecorder.getLastEvent(id)).isEqualTo(testConfigMap2); + } + + @Test + void checksContainsWithResourceVersion() { + eventRecorder.startEventRecording(id); + + eventRecorder.recordEvent(testConfigMap); + eventRecorder.recordEvent(testConfigMap2); + + assertThat(eventRecorder.containsEventWithResourceVersion(id, RESOURCE_VERSION)).isTrue(); + assertThat(eventRecorder.containsEventWithResourceVersion(id, RESOURCE_VERSION1)).isTrue(); + assertThat(eventRecorder.containsEventWithResourceVersion(id, "xxx")).isFalse(); + } + + @Test + void checkLastItemVersion() { + eventRecorder.startEventRecording(id); + + eventRecorder.recordEvent(testConfigMap); + eventRecorder.recordEvent(testConfigMap2); + + assertThat(eventRecorder.containsEventWithVersionButItsNotLastOne(id, RESOURCE_VERSION)) + .isTrue(); + assertThat(eventRecorder.containsEventWithVersionButItsNotLastOne(id, RESOURCE_VERSION1)) + .isFalse(); + } + + ConfigMap testConfigMap(String resourceVersion) { + ConfigMap configMap = new ConfigMap(); + configMap.setMetadata(new ObjectMeta()); + configMap.getMetadata().setName("test"); + configMap.getMetadata().setResourceVersion(resourceVersion); + + return configMap; + } + +} diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java new file mode 100644 index 0000000000..1c901034e0 --- /dev/null +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java @@ -0,0 +1,194 @@ +package io.javaoperatorsdk.operator.processing.event.source.informer; + +import java.util.Optional; +import java.util.Set; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import io.fabric8.kubernetes.api.model.ObjectMeta; +import io.fabric8.kubernetes.api.model.apps.Deployment; +import io.fabric8.kubernetes.client.KubernetesClient; +import io.fabric8.kubernetes.client.dsl.FilterWatchListDeletable; +import io.fabric8.kubernetes.client.dsl.FilterWatchListMultiDeletable; +import io.fabric8.kubernetes.client.dsl.MixedOperation; +import io.fabric8.kubernetes.client.informers.SharedIndexInformer; +import io.javaoperatorsdk.operator.api.config.informer.InformerConfiguration; +import io.javaoperatorsdk.operator.processing.event.EventHandler; +import io.javaoperatorsdk.operator.processing.event.ResourceID; +import io.javaoperatorsdk.operator.processing.event.source.PrimaryResourcesRetriever; +import io.javaoperatorsdk.operator.sample.simple.TestCustomResource; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.*; + +class InformerEventSourceTest { + + private static final String PREV_RESOURCE_VERSION = "0"; + private static final String DEFAULT_RESOURCE_VERSION = "1"; + private static final String NEXT_RESOURCE_VERSION = "2"; + + private InformerEventSource informerEventSource; + private KubernetesClient clientMock = mock(KubernetesClient.class); + private TemporaryResourceCache temporaryResourceCacheMock = + mock(TemporaryResourceCache.class); + private EventHandler eventHandlerMock = mock(EventHandler.class); + private MixedOperation crClientMock = mock(MixedOperation.class); + private FilterWatchListMultiDeletable specificResourceClientMock = + mock(FilterWatchListMultiDeletable.class); + private FilterWatchListDeletable labeledResourceClientMock = mock(FilterWatchListDeletable.class); + private SharedIndexInformer informer = mock(SharedIndexInformer.class); + private InformerConfiguration informerConfiguration = + mock(InformerConfiguration.class); + + @BeforeEach + void setup() { + when(clientMock.resources(any())).thenReturn(crClientMock); + when(crClientMock.inAnyNamespace()).thenReturn(specificResourceClientMock); + when(specificResourceClientMock.withLabelSelector((String) null)) + .thenReturn(labeledResourceClientMock); + when(labeledResourceClientMock.runnableInformer(0)).thenReturn(informer); + + when(informerConfiguration.getPrimaryResourcesRetriever()) + .thenReturn(mock(PrimaryResourcesRetriever.class)); + + informerEventSource = new InformerEventSource<>(informerConfiguration, clientMock); + informerEventSource.setTemporalResourceCache(temporaryResourceCacheMock); + informerEventSource.setEventHandler(eventHandlerMock); + + PrimaryResourcesRetriever primaryResourcesRetriever = mock(PrimaryResourcesRetriever.class); + when(informerConfiguration.getPrimaryResourcesRetriever()) + .thenReturn(primaryResourcesRetriever); + when(primaryResourcesRetriever.associatedPrimaryResources(any())) + .thenReturn(Set.of(ResourceID.fromResource(testDeployment()))); + } + + @Test + void skipsEventPropagationIfResourceWithSameVersionInResourceCache() { + when(temporaryResourceCacheMock.getResourceFromCache(any())) + .thenReturn(Optional.of(testDeployment())); + + informerEventSource.onAdd(testDeployment()); + informerEventSource.onUpdate(testDeployment(), testDeployment()); + + verify(eventHandlerMock, never()).handleEvent(any()); + } + + @Test + void propagateEventAndRemoveResourceFromTempCacheIfResourceVersionMismatch() { + Deployment cachedDeployment = testDeployment(); + cachedDeployment.getMetadata().setResourceVersion(PREV_RESOURCE_VERSION); + when(temporaryResourceCacheMock.getResourceFromCache(any())) + .thenReturn(Optional.of(cachedDeployment)); + + + informerEventSource.onUpdate(cachedDeployment, testDeployment()); + + verify(eventHandlerMock, times(1)).handleEvent(any()); + verify(temporaryResourceCacheMock, times(1)).removeResourceFromCache(any()); + } + + @Test + void notPropagatesEventIfAfterUpdateReceivedJustTheRelatedEvent() { + var testDeployment = testDeployment(); + var prevTestDeployment = testDeployment(); + prevTestDeployment.getMetadata().setResourceVersion(PREV_RESOURCE_VERSION); + + + informerEventSource + .prepareForCreateOrUpdateEventFiltering(ResourceID.fromResource(testDeployment)); + informerEventSource.onUpdate(prevTestDeployment, testDeployment); + informerEventSource.handleRecentResourceUpdate(testDeployment, PREV_RESOURCE_VERSION); + + verify(eventHandlerMock, times(0)).handleEvent(any()); + verify(temporaryResourceCacheMock, times(0)).unconditionallyCacheResource(any()); + } + + + @Test + void notPropagatesEventIfAfterCreateReceivedJustTheRelatedEvent() { + var testDeployment = testDeployment(); + + informerEventSource + .prepareForCreateOrUpdateEventFiltering(ResourceID.fromResource(testDeployment)); + informerEventSource.onAdd(testDeployment); + informerEventSource.handleRecentResourceCreate(testDeployment); + + verify(eventHandlerMock, times(0)).handleEvent(any()); + verify(temporaryResourceCacheMock, times(0)).unconditionallyCacheResource(any()); + } + + @Test + void propagatesEventIfNewEventReceivedAfterTheCurrentTargetEvent() { + var testDeployment = testDeployment(); + var prevTestDeployment = testDeployment(); + prevTestDeployment.getMetadata().setResourceVersion(PREV_RESOURCE_VERSION); + var nextTestDeployment = testDeployment(); + nextTestDeployment.getMetadata().setResourceVersion(NEXT_RESOURCE_VERSION); + + informerEventSource + .prepareForCreateOrUpdateEventFiltering(ResourceID.fromResource(testDeployment)); + informerEventSource.onUpdate(prevTestDeployment, testDeployment); + informerEventSource.onUpdate(testDeployment, nextTestDeployment); + informerEventSource.handleRecentResourceUpdate(testDeployment, PREV_RESOURCE_VERSION); + + verify(eventHandlerMock, times(1)).handleEvent(any()); + verify(temporaryResourceCacheMock, times(0)).unconditionallyCacheResource(any()); + } + + @Test + void notPropagatesEventIfMoreReceivedButTheLastIsTheUpdated() { + var testDeployment = testDeployment(); + var prevTestDeployment = testDeployment(); + prevTestDeployment.getMetadata().setResourceVersion(PREV_RESOURCE_VERSION); + var prevPrevTestDeployment = testDeployment(); + prevPrevTestDeployment.getMetadata().setResourceVersion("-1"); + + informerEventSource + .prepareForCreateOrUpdateEventFiltering(ResourceID.fromResource(testDeployment)); + informerEventSource.onUpdate(prevPrevTestDeployment, prevTestDeployment); + informerEventSource.onUpdate(prevTestDeployment, testDeployment); + informerEventSource.handleRecentResourceUpdate(testDeployment, PREV_RESOURCE_VERSION); + + verify(eventHandlerMock, times(0)).handleEvent(any()); + verify(temporaryResourceCacheMock, times(0)).unconditionallyCacheResource(any()); + } + + @Test + void putsResourceOnTempCacheIfNoEventRecorded() { + var testDeployment = testDeployment(); + + informerEventSource + .prepareForCreateOrUpdateEventFiltering(ResourceID.fromResource(testDeployment)); + informerEventSource.handleRecentResourceUpdate(testDeployment, PREV_RESOURCE_VERSION); + + verify(eventHandlerMock, times(0)).handleEvent(any()); + verify(temporaryResourceCacheMock, times(1)).unconditionallyCacheResource(any()); + } + + @Test + void putsResourceOnTempCacheIfNoEventRecordedWithSameResourceVersion() { + var testDeployment = testDeployment(); + var prevTestDeployment = testDeployment(); + prevTestDeployment.getMetadata().setResourceVersion(PREV_RESOURCE_VERSION); + var prevPrevTestDeployment = testDeployment(); + prevPrevTestDeployment.getMetadata().setResourceVersion("-1"); + + informerEventSource + .prepareForCreateOrUpdateEventFiltering(ResourceID.fromResource(testDeployment)); + informerEventSource.onUpdate(prevPrevTestDeployment, prevTestDeployment); + informerEventSource.handleRecentResourceUpdate(testDeployment, PREV_RESOURCE_VERSION); + + verify(eventHandlerMock, times(0)).handleEvent(any()); + verify(temporaryResourceCacheMock, times(1)).unconditionallyCacheResource(any()); + } + + Deployment testDeployment() { + Deployment deployment = new Deployment(); + deployment.setMetadata(new ObjectMeta()); + deployment.getMetadata().setResourceVersion(DEFAULT_RESOURCE_VERSION); + deployment.getMetadata().setName("test"); + return deployment; + } + +} diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCacheTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCacheTest.java new file mode 100644 index 0000000000..2af66a4abe --- /dev/null +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCacheTest.java @@ -0,0 +1,100 @@ +package io.javaoperatorsdk.operator.processing.event.source.informer; + +import java.util.Optional; + +import org.junit.jupiter.api.Test; + +import io.fabric8.kubernetes.api.model.ConfigMap; +import io.fabric8.kubernetes.api.model.ObjectMeta; +import io.javaoperatorsdk.operator.processing.event.ResourceID; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +class TemporaryResourceCacheTest { + + public static final String RESOURCE_VERSION = "1"; + private InformerEventSource informerEventSource = mock(InformerEventSource.class); + private TemporaryResourceCache temporaryResourceCache = + new TemporaryResourceCache<>(informerEventSource); + + + @Test + void updateAddsTheResourceIntoCacheIfTheInformerHasThePreviousResourceVersion() { + var testResource = testResource(); + var prevTestResource = testResource(); + prevTestResource.getMetadata().setResourceVersion("0"); + when(informerEventSource.get(any())).thenReturn(Optional.of(prevTestResource)); + + temporaryResourceCache.putUpdatedResource(testResource, "0"); + + var cached = temporaryResourceCache.getResourceFromCache(ResourceID.fromResource(testResource)); + assertThat(cached).isPresent(); + } + + @Test + void updateNotAddsTheResourceIntoCacheIfTheInformerHasOtherVersion() { + var testResource = testResource(); + var informerCachedResource = testResource(); + informerCachedResource.getMetadata().setResourceVersion("x"); + when(informerEventSource.get(any())).thenReturn(Optional.of(informerCachedResource)); + + temporaryResourceCache.putUpdatedResource(testResource, "0"); + + var cached = temporaryResourceCache.getResourceFromCache(ResourceID.fromResource(testResource)); + assertThat(cached).isNotPresent(); + } + + @Test + void addOperationAddsTheResourceIfInformerCacheStillEmpty() { + var testResource = testResource(); + when(informerEventSource.get(any())).thenReturn(Optional.empty()); + + temporaryResourceCache.putAddedResource(testResource); + + var cached = temporaryResourceCache.getResourceFromCache(ResourceID.fromResource(testResource)); + assertThat(cached).isPresent(); + } + + @Test + void addOperationNotAddsTheResourceIfInformerCacheNotEmpty() { + var testResource = testResource(); + when(informerEventSource.get(any())).thenReturn(Optional.of(testResource())); + + temporaryResourceCache.putAddedResource(testResource); + + var cached = temporaryResourceCache.getResourceFromCache(ResourceID.fromResource(testResource)); + assertThat(cached).isNotPresent(); + } + + @Test + void removesResourceFromCache() { + ConfigMap testResource = propagateTestResourceToCache(); + + temporaryResourceCache.removeResourceFromCache(testResource()); + + assertThat(temporaryResourceCache.getResourceFromCache(ResourceID.fromResource(testResource))) + .isNotPresent(); + } + + private ConfigMap propagateTestResourceToCache() { + var testResource = testResource(); + when(informerEventSource.get(any())).thenReturn(Optional.empty()); + temporaryResourceCache.putAddedResource(testResource); + assertThat(temporaryResourceCache.getResourceFromCache(ResourceID.fromResource(testResource))) + .isPresent(); + return testResource; + } + + ConfigMap testResource() { + ConfigMap configMap = new ConfigMap(); + configMap.setMetadata(new ObjectMeta()); + configMap.getMetadata().setName("test"); + configMap.getMetadata().setNamespace("default"); + configMap.getMetadata().setResourceVersion(RESOURCE_VERSION); + return configMap; + } + +} diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/CreateUpdateDependentEventFilterIT.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/CreateUpdateDependentEventFilterIT.java new file mode 100644 index 0000000000..57fbc78838 --- /dev/null +++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/CreateUpdateDependentEventFilterIT.java @@ -0,0 +1,88 @@ +package io.javaoperatorsdk.operator; + +import java.time.Duration; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import io.fabric8.kubernetes.api.model.ConfigMap; +import io.fabric8.kubernetes.api.model.ObjectMeta; +import io.javaoperatorsdk.operator.config.runtime.DefaultConfigurationService; +import io.javaoperatorsdk.operator.junit.OperatorExtension; +import io.javaoperatorsdk.operator.sample.createupdateeventfilter.CreateUpdateEventFilterTestCustomResource; +import io.javaoperatorsdk.operator.sample.createupdateeventfilter.CreateUpdateEventFilterTestCustomResourceSpec; +import io.javaoperatorsdk.operator.sample.createupdateeventfilter.CreateUpdateEventFilterTestReconciler; + +import static io.javaoperatorsdk.operator.sample.createupdateeventfilter.CreateUpdateEventFilterTestReconciler.CONFIG_MAP_TEST_DATA_KEY; +import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; + +public class CreateUpdateDependentEventFilterIT { + + @RegisterExtension + OperatorExtension operator = + OperatorExtension.builder() + .withConfigurationService(DefaultConfigurationService.instance()) + .withReconciler(new CreateUpdateEventFilterTestReconciler()) + .build(); + + @Test + void updateEventNotReceivedAfterCreateOrUpdate() { + CreateUpdateEventFilterTestCustomResource resource = prepareTestResource(); + var createdResource = + operator.create(CreateUpdateEventFilterTestCustomResource.class, resource); + + await() + .atMost(Duration.ofSeconds(1)) + .until(() -> { + var cm = operator.get(ConfigMap.class, createdResource.getMetadata().getName()); + if (cm == null) { + return false; + } + return cm.getData() + .get(CONFIG_MAP_TEST_DATA_KEY) + .equals(createdResource.getSpec().getValue()); + }); + + assertThat( + ((CreateUpdateEventFilterTestReconciler) operator.getFirstReconciler()) + .getNumberOfExecutions()) + .isEqualTo(1); // this should be 1 usually but sometimes event is received + // faster than added resource added to the cache + + + CreateUpdateEventFilterTestCustomResource actualCreatedResource = + operator.get(CreateUpdateEventFilterTestCustomResource.class, + resource.getMetadata().getName()); + actualCreatedResource.getSpec().setValue("2"); + operator.replace(CreateUpdateEventFilterTestCustomResource.class, actualCreatedResource); + + + await().atMost(Duration.ofSeconds(1)) + .until(() -> { + var cm = operator.get(ConfigMap.class, createdResource.getMetadata().getName()); + if (cm == null) { + return false; + } + return cm.getData() + .get(CONFIG_MAP_TEST_DATA_KEY) + .equals(actualCreatedResource.getSpec().getValue()); + }); + + assertThat( + ((CreateUpdateEventFilterTestReconciler) operator.getFirstReconciler()) + .getNumberOfExecutions()) + // same as for previous assert (usually this should be 2) + .isEqualTo(2); + } + + private CreateUpdateEventFilterTestCustomResource prepareTestResource() { + CreateUpdateEventFilterTestCustomResource resource = + new CreateUpdateEventFilterTestCustomResource(); + resource.setMetadata(new ObjectMeta()); + resource.getMetadata().setName("test1"); + resource.setSpec(new CreateUpdateEventFilterTestCustomResourceSpec()); + resource.getSpec().setValue("1"); + return resource; + } +} diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/dependent/StandaloneDependentResourceIT.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/StandaloneDependentResourceIT.java similarity index 56% rename from operator-framework/src/test/java/io/javaoperatorsdk/operator/dependent/StandaloneDependentResourceIT.java rename to operator-framework/src/test/java/io/javaoperatorsdk/operator/StandaloneDependentResourceIT.java index 20c0f20b80..d86385f1f1 100644 --- a/operator-framework/src/test/java/io/javaoperatorsdk/operator/dependent/StandaloneDependentResourceIT.java +++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/StandaloneDependentResourceIT.java @@ -1,4 +1,4 @@ -package io.javaoperatorsdk.operator.dependent; +package io.javaoperatorsdk.operator; import java.time.Duration; @@ -10,8 +10,10 @@ import io.javaoperatorsdk.operator.config.runtime.DefaultConfigurationService; import io.javaoperatorsdk.operator.junit.OperatorExtension; import io.javaoperatorsdk.operator.sample.standalonedependent.StandaloneDependentTestCustomResource; +import io.javaoperatorsdk.operator.sample.standalonedependent.StandaloneDependentTestCustomResourceSpec; import io.javaoperatorsdk.operator.sample.standalonedependent.StandaloneDependentTestReconciler; +import static org.assertj.core.api.Assertions.assertThat; import static org.awaitility.Awaitility.await; class StandaloneDependentResourceIT { @@ -29,10 +31,38 @@ class StandaloneDependentResourceIT { void dependentResourceManagesDeployment() { StandaloneDependentTestCustomResource customResource = new StandaloneDependentTestCustomResource(); + customResource.setSpec(new StandaloneDependentTestCustomResourceSpec()); customResource.setMetadata(new ObjectMeta()); customResource.getMetadata().setName(DEPENDENT_TEST_NAME); var createdCR = operator.create(StandaloneDependentTestCustomResource.class, customResource); + awaitForDeploymentReadyReplicas(1); + assertThat( + ((StandaloneDependentTestReconciler) operator.getFirstReconciler()).isErrorOccurred()) + .isFalse(); + } + + @Test + void executeUpdateForTestingCacheUpdateForGetResource() { + StandaloneDependentTestCustomResource customResource = + new StandaloneDependentTestCustomResource(); + customResource.setSpec(new StandaloneDependentTestCustomResourceSpec()); + customResource.setMetadata(new ObjectMeta()); + customResource.getMetadata().setName(DEPENDENT_TEST_NAME); + var createdCR = operator.create(StandaloneDependentTestCustomResource.class, customResource); + + awaitForDeploymentReadyReplicas(1); + + createdCR.getSpec().setReplicaCount(2); + operator.replace(StandaloneDependentTestCustomResource.class, createdCR); + + awaitForDeploymentReadyReplicas(2); + assertThat( + ((StandaloneDependentTestReconciler) operator.getFirstReconciler()).isErrorOccurred()) + .isFalse(); + } + + void awaitForDeploymentReadyReplicas(int expectedReplicaCount) { await() .pollInterval(Duration.ofMillis(300)) .atMost(Duration.ofSeconds(50)) @@ -42,13 +72,13 @@ void dependentResourceManagesDeployment() { operator .getKubernetesClient() .resources(Deployment.class) - .inNamespace(createdCR.getMetadata().getNamespace()) + .inNamespace(operator.getNamespace()) .withName(DEPENDENT_TEST_NAME) .get(); return deployment != null && deployment.getStatus() != null && deployment.getStatus().getReadyReplicas() != null - && deployment.getStatus().getReadyReplicas() > 0; + && deployment.getStatus().getReadyReplicas() == expectedReplicaCount; }); } } diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/createupdateeventfilter/CreateUpdateEventFilterTestCustomResource.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/createupdateeventfilter/CreateUpdateEventFilterTestCustomResource.java new file mode 100644 index 0000000000..5797a44d9d --- /dev/null +++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/createupdateeventfilter/CreateUpdateEventFilterTestCustomResource.java @@ -0,0 +1,22 @@ +package io.javaoperatorsdk.operator.sample.createupdateeventfilter; + +import io.fabric8.kubernetes.api.model.Namespaced; +import io.fabric8.kubernetes.client.CustomResource; +import io.fabric8.kubernetes.model.annotation.Group; +import io.fabric8.kubernetes.model.annotation.ShortNames; +import io.fabric8.kubernetes.model.annotation.Version; + +@Group("sample.javaoperatorsdk") +@Version("v1") +@ShortNames("cue") +public class CreateUpdateEventFilterTestCustomResource + extends + CustomResource + implements Namespaced { + + @Override + protected CreateUpdateEventFilterTestCustomResourceStatus initStatus() { + return new CreateUpdateEventFilterTestCustomResourceStatus(); + } + +} diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/createupdateeventfilter/CreateUpdateEventFilterTestCustomResourceSpec.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/createupdateeventfilter/CreateUpdateEventFilterTestCustomResourceSpec.java new file mode 100644 index 0000000000..fcd3807bc8 --- /dev/null +++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/createupdateeventfilter/CreateUpdateEventFilterTestCustomResourceSpec.java @@ -0,0 +1,16 @@ +package io.javaoperatorsdk.operator.sample.createupdateeventfilter; + +public class CreateUpdateEventFilterTestCustomResourceSpec { + + private String value; + + public String getValue() { + return value; + } + + public CreateUpdateEventFilterTestCustomResourceSpec setValue(String value) { + this.value = value; + return this; + } + +} diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/createupdateeventfilter/CreateUpdateEventFilterTestCustomResourceStatus.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/createupdateeventfilter/CreateUpdateEventFilterTestCustomResourceStatus.java new file mode 100644 index 0000000000..6733e1a7cd --- /dev/null +++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/createupdateeventfilter/CreateUpdateEventFilterTestCustomResourceStatus.java @@ -0,0 +1,7 @@ +package io.javaoperatorsdk.operator.sample.createupdateeventfilter; + +import io.javaoperatorsdk.operator.api.ObservedGenerationAwareStatus; + +public class CreateUpdateEventFilterTestCustomResourceStatus extends ObservedGenerationAwareStatus { + +} diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/createupdateeventfilter/CreateUpdateEventFilterTestReconciler.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/createupdateeventfilter/CreateUpdateEventFilterTestReconciler.java new file mode 100644 index 0000000000..ae41eacb68 --- /dev/null +++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/createupdateeventfilter/CreateUpdateEventFilterTestReconciler.java @@ -0,0 +1,120 @@ +package io.javaoperatorsdk.operator.sample.createupdateeventfilter; + +import java.util.HashMap; +import java.util.List; +import java.util.Objects; +import java.util.concurrent.atomic.AtomicInteger; + +import io.fabric8.kubernetes.api.model.ConfigMap; +import io.fabric8.kubernetes.api.model.ObjectMeta; +import io.fabric8.kubernetes.client.KubernetesClient; +import io.javaoperatorsdk.operator.api.config.informer.InformerConfiguration; +import io.javaoperatorsdk.operator.api.reconciler.*; +import io.javaoperatorsdk.operator.junit.KubernetesClientAware; +import io.javaoperatorsdk.operator.processing.event.ResourceID; +import io.javaoperatorsdk.operator.processing.event.source.EventSource; +import io.javaoperatorsdk.operator.processing.event.source.informer.InformerEventSource; + +import static io.javaoperatorsdk.operator.api.reconciler.Constants.NO_FINALIZER; + +@ControllerConfiguration(finalizerName = NO_FINALIZER) +public class CreateUpdateEventFilterTestReconciler + implements Reconciler, + EventSourceInitializer, + KubernetesClientAware { + + public static final String CONFIG_MAP_TEST_DATA_KEY = "key"; + private KubernetesClient client; + private final AtomicInteger numberOfExecutions = new AtomicInteger(0); + private InformerEventSource informerEventSource; + + @Override + public UpdateControl reconcile( + CreateUpdateEventFilterTestCustomResource resource, Context context) { + numberOfExecutions.incrementAndGet(); + + ConfigMap configMap = + client + .configMaps() + .inNamespace(resource.getMetadata().getNamespace()) + .withName(resource.getMetadata().getName()) + .get(); + if (configMap == null) { + var configMapToCreate = createConfigMap(resource); + try { + informerEventSource.prepareForCreateOrUpdateEventFiltering( + ResourceID.fromResource(configMapToCreate)); + configMap = + client + .configMaps() + .inNamespace(resource.getMetadata().getNamespace()) + .create(configMapToCreate); + informerEventSource.handleRecentResourceCreate(configMap); + } catch (RuntimeException e) { + informerEventSource + .cleanupOnCreateOrUpdateEventFiltering(ResourceID.fromResource(configMapToCreate)); + throw e; + } + } else { + if (!Objects.equals( + configMap.getData().get(CONFIG_MAP_TEST_DATA_KEY), resource.getSpec().getValue())) { + configMap.getData().put(CONFIG_MAP_TEST_DATA_KEY, resource.getSpec().getValue()); + try { + informerEventSource + .prepareForCreateOrUpdateEventFiltering(ResourceID.fromResource(configMap)); + var newConfigMap = + client + .configMaps() + .inNamespace(resource.getMetadata().getNamespace()) + .replace(configMap); + informerEventSource.handleRecentResourceUpdate( + newConfigMap, configMap.getMetadata().getResourceVersion()); + } catch (RuntimeException e) { + informerEventSource + .cleanupOnCreateOrUpdateEventFiltering(ResourceID.fromResource(configMap)); + throw e; + } + } + } + return UpdateControl.noUpdate(); + } + + private ConfigMap createConfigMap(CreateUpdateEventFilterTestCustomResource resource) { + ConfigMap configMap = new ConfigMap(); + configMap.setMetadata(new ObjectMeta()); + configMap.getMetadata().setName(resource.getMetadata().getName()); + configMap.getMetadata().setLabels(new HashMap<>()); + configMap.getMetadata().getLabels().put("integrationtest", this.getClass().getSimpleName()); + configMap.getMetadata().setNamespace(resource.getMetadata().getNamespace()); + configMap.setData(new HashMap<>()); + configMap.getData().put(CONFIG_MAP_TEST_DATA_KEY, resource.getSpec().getValue()); + configMap.addOwnerReference(resource); + + return configMap; + } + + @Override + public List prepareEventSources( + EventSourceContext context) { + InformerConfiguration informerConfiguration = + InformerConfiguration.from(context, ConfigMap.class) + .withLabelSelector("integrationtest = " + this.getClass().getSimpleName()) + .build(); + informerEventSource = new InformerEventSource<>(informerConfiguration, client); + return List.of(informerEventSource); + } + + @Override + public KubernetesClient getKubernetesClient() { + return client; + } + + @Override + public void setKubernetesClient(KubernetesClient kubernetesClient) { + this.client = kubernetesClient; + } + + public int getNumberOfExecutions() { + return numberOfExecutions.get(); + } +} diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/standalonedependent/StandaloneDependentTestCustomResource.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/standalonedependent/StandaloneDependentTestCustomResource.java index 3e6737c83c..e88d00c985 100644 --- a/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/standalonedependent/StandaloneDependentTestCustomResource.java +++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/standalonedependent/StandaloneDependentTestCustomResource.java @@ -10,6 +10,7 @@ @Version("v1") @ShortNames("sdt") public class StandaloneDependentTestCustomResource - extends CustomResource + extends + CustomResource implements Namespaced { } diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/standalonedependent/StandaloneDependentTestCustomResourceSpec.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/standalonedependent/StandaloneDependentTestCustomResourceSpec.java new file mode 100644 index 0000000000..4a88cf6044 --- /dev/null +++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/standalonedependent/StandaloneDependentTestCustomResourceSpec.java @@ -0,0 +1,23 @@ +package io.javaoperatorsdk.operator.sample.standalonedependent; + +public class StandaloneDependentTestCustomResourceSpec { + + private int replicaCount; + + public StandaloneDependentTestCustomResourceSpec(int replicaCount) { + this.replicaCount = replicaCount; + } + + public StandaloneDependentTestCustomResourceSpec() { + this(1); + } + + public int getReplicaCount() { + return replicaCount; + } + + public StandaloneDependentTestCustomResourceSpec setReplicaCount(int replicaCount) { + this.replicaCount = replicaCount; + return this; + } +} diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/standalonedependent/StandaloneDependentTestReconciler.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/standalonedependent/StandaloneDependentTestReconciler.java index 21f02c6c3a..b6efb0f579 100644 --- a/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/standalonedependent/StandaloneDependentTestReconciler.java +++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/standalonedependent/StandaloneDependentTestReconciler.java @@ -1,12 +1,14 @@ package io.javaoperatorsdk.operator.sample.standalonedependent; import java.util.List; +import java.util.Optional; import io.fabric8.kubernetes.api.model.apps.Deployment; import io.fabric8.kubernetes.client.KubernetesClient; import io.javaoperatorsdk.operator.ReconcilerUtils; import io.javaoperatorsdk.operator.api.reconciler.*; import io.javaoperatorsdk.operator.api.reconciler.dependent.Creator; +import io.javaoperatorsdk.operator.api.reconciler.dependent.Updater; import io.javaoperatorsdk.operator.junit.KubernetesClientAware; import io.javaoperatorsdk.operator.processing.dependent.kubernetes.KubernetesDependentResource; import io.javaoperatorsdk.operator.processing.event.source.EventSource; @@ -17,11 +19,12 @@ public class StandaloneDependentTestReconciler implements Reconciler, EventSourceInitializer, - KubernetesClientAware { + KubernetesClientAware, ErrorStatusHandler { private KubernetesClient kubernetesClient; + private boolean errorOccurred = false; - KubernetesDependentResource deploymentDependent; + DeploymentDependentResource deploymentDependent; public StandaloneDependentTestReconciler() { deploymentDependent = new DeploymentDependentResource(); @@ -35,8 +38,17 @@ public List prepareEventSources( @Override public UpdateControl reconcile( - StandaloneDependentTestCustomResource resource, Context context) { - deploymentDependent.reconcile(resource, context); + StandaloneDependentTestCustomResource primary, Context context) { + deploymentDependent.reconcile(primary, context); + Optional deployment = deploymentDependent.getResource(primary); + if (deployment.isEmpty()) { + throw new IllegalStateException("Resource should not be empty after reconcile."); + } + + if (deployment.get().getSpec().getReplicas() != primary.getSpec().getReplicaCount()) { + // see https://github.com/java-operator-sdk/java-operator-sdk/issues/924 + throw new IllegalStateException("Something went wrong withe the cache mechanism."); + } return UpdateControl.noUpdate(); } @@ -51,15 +63,28 @@ public KubernetesClient getKubernetesClient() { return this.kubernetesClient; } + @Override + public Optional updateErrorStatus( + StandaloneDependentTestCustomResource resource, RetryInfo retryInfo, RuntimeException e) { + errorOccurred = true; + return Optional.empty(); + } + + public boolean isErrorOccurred() { + return errorOccurred; + } + private static class DeploymentDependentResource extends KubernetesDependentResource - implements Creator { + implements Creator, + Updater { @Override protected Deployment desired(StandaloneDependentTestCustomResource primary, Context context) { Deployment deployment = ReconcilerUtils.loadYaml(Deployment.class, getClass(), "nginx-deployment.yaml"); deployment.getMetadata().setName(primary.getMetadata().getName()); + deployment.getSpec().setReplicas(primary.getSpec().getReplicaCount()); deployment.getMetadata().setNamespace(primary.getMetadata().getNamespace()); return deployment; } diff --git a/sample-operators/webpage/src/main/java/io/javaoperatorsdk/operator/sample/WebPageReconciler.java b/sample-operators/webpage/src/main/java/io/javaoperatorsdk/operator/sample/WebPageReconciler.java index d7fd52f788..beef42599c 100644 --- a/sample-operators/webpage/src/main/java/io/javaoperatorsdk/operator/sample/WebPageReconciler.java +++ b/sample-operators/webpage/src/main/java/io/javaoperatorsdk/operator/sample/WebPageReconciler.java @@ -1,6 +1,5 @@ package io.javaoperatorsdk.operator.sample; -import java.time.Duration; import java.util.*; import org.apache.commons.lang3.StringUtils; @@ -20,7 +19,6 @@ import static io.javaoperatorsdk.operator.ReconcilerUtils.loadYaml; import static io.javaoperatorsdk.operator.api.reconciler.Constants.NO_FINALIZER; -import static org.awaitility.Awaitility.await; @ControllerConfiguration(finalizerName = NO_FINALIZER) public class WebPageReconciler @@ -59,7 +57,6 @@ public UpdateControl reconcile(WebPage webPage, Context context) { WebPageStatus status = new WebPageStatus(); - waitUntilConfigMapAvailable(webPage); status.setHtmlConfigMap(configMapDR.getResource(webPage).orElseThrow().getMetadata().getName()); status.setAreWeGood("Yes!"); status.setErrorMessage(null); @@ -68,12 +65,6 @@ public UpdateControl reconcile(WebPage webPage, Context context) { return UpdateControl.updateStatus(webPage); } - // todo after implemented we can remove this method: - // https://github.com/java-operator-sdk/java-operator-sdk/issues/924 - private void waitUntilConfigMapAvailable(WebPage webPage) { - await().atMost(Duration.ofSeconds(5)).until(() -> configMapDR.getResource(webPage).isPresent()); - } - @Override public Optional updateErrorStatus( WebPage resource, RetryInfo retryInfo, RuntimeException e) {