Skip to content

Improve cache interface for CustomResourceEventSource #684

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 25 commits into from
Nov 29, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
d29e50b
feat: Add tomcat operator sample (#659)
adam-sandor Nov 12, 2021
10e6595
feature: simplified cached access
csviri Nov 15, 2021
106e5f0
Merge branch 'v2' into improve-cache-interface
csviri Nov 16, 2021
9a9740c
fix: improved cache stream handling?
csviri Nov 17, 2021
e4824a4
Merge branch 'v2' into improve-cache-interface
csviri Nov 19, 2021
ea77813
feat: Add tomcat operator sample (#659)
adam-sandor Nov 12, 2021
b0c190b
feature: simplified cached access
csviri Nov 15, 2021
07cf523
fix: improved cache stream handling?
csviri Nov 17, 2021
78dc9de
fix: rebase on v2
csviri Nov 22, 2021
86f77fe
Merge branch 'improve-cache-interface' of github.com:java-operator-sd…
csviri Nov 22, 2021
a8549dd
feat: added additional cache methods
csviri Nov 22, 2021
721fec0
fix: formatting
csviri Nov 22, 2021
a681a1b
feat: Add tomcat operator sample (#659)
adam-sandor Nov 12, 2021
5089ebf
feature: simplified cached access
csviri Nov 15, 2021
ae08281
fix: improved cache stream handling?
csviri Nov 17, 2021
453b999
refactor: simplify getCachedCustomResources
metacosm Nov 26, 2021
02e5557
refactor: move cache-related methods to ResourceCache
metacosm Nov 26, 2021
574daaa
refactor: expose only ResourceCache and not controller EventSource
metacosm Nov 26, 2021
058dca0
refactor: streamline method names
metacosm Nov 26, 2021
084e39a
refactor: more renaming CustomResource -> Resource
metacosm Nov 26, 2021
a99d6bb
Merge remote-tracking branch 'origin/improve-cache-interface' into im…
csviri Nov 26, 2021
23bede1
fix: add more methods
csviri Nov 26, 2021
36bbd4b
fix: improve robustness (in particular if no informer exist for NS)
metacosm Nov 26, 2021
b01c5b9
Revert "refactor: expose only ResourceCache and not controller EventS…
metacosm Nov 29, 2021
0e18f29
refactor: separate cache impl
csviri Nov 29, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,17 @@ public class MDCUtils {
private static final String GENERATION = "resource.generation";
private static final String UID = "resource.uid";

public static void addCustomResourceIDInfo(ResourceID resourceID) {
public static void addResourceIDInfo(ResourceID resourceID) {
MDC.put(NAME, resourceID.getName());
MDC.put(NAMESPACE, resourceID.getNamespace().orElse("no namespace"));
}

public static void removeCustomResourceIDInfo() {
public static void removeResourceIDInfo() {
MDC.remove(NAME);
MDC.remove(NAMESPACE);
}

public static void addCustomResourceInfo(HasMetadata resource) {
public static void addResourceInfo(HasMetadata resource) {
MDC.put(API_VERSION, resource.getApiVersion());
MDC.put(KIND, resource.getKind());
MDC.put(NAME, resource.getMetadata().getName());
Expand All @@ -37,7 +37,7 @@ public static void addCustomResourceInfo(HasMetadata resource) {
MDC.put(UID, resource.getMetadata().getUid());
}

public static void removeCustomResourceInfo() {
public static void removeResourceInfo() {
MDC.remove(API_VERSION);
MDC.remove(KIND);
MDC.remove(NAME);
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@
import io.javaoperatorsdk.operator.api.reconciler.RetryInfo;
import io.javaoperatorsdk.operator.processing.LifecycleAware;
import io.javaoperatorsdk.operator.processing.MDCUtils;
import io.javaoperatorsdk.operator.processing.ResourceCache;
import io.javaoperatorsdk.operator.processing.event.source.ResourceAction;
import io.javaoperatorsdk.operator.processing.event.source.ResourceCache;
import io.javaoperatorsdk.operator.processing.event.source.ResourceEvent;
import io.javaoperatorsdk.operator.processing.event.source.TimerEventSource;
import io.javaoperatorsdk.operator.processing.retry.GenericRetry;
Expand Down Expand Up @@ -54,7 +54,7 @@ class EventProcessor<R extends HasMetadata> implements EventHandler, LifecycleAw

EventProcessor(EventSourceManager<R> eventSourceManager) {
this(
eventSourceManager.getControllerResourceEventSource(),
eventSourceManager.getControllerResourceEventSource().getResourceCache(),
ExecutorServiceManager.instance().executorService(),
eventSourceManager.getController().getConfiguration().getName(),
new ReconciliationDispatcher<>(eventSourceManager.getController()),
Expand All @@ -69,7 +69,8 @@ class EventProcessor<R extends HasMetadata> implements EventHandler, LifecycleAw
EventSourceManager<R> eventSourceManager,
String relatedControllerName,
Retry retry) {
this(eventSourceManager.getControllerResourceEventSource(), null, relatedControllerName,
this(eventSourceManager.getControllerResourceEventSource().getResourceCache(), null,
relatedControllerName,
reconciliationDispatcher, retry, null, eventSourceManager);
}

Expand Down Expand Up @@ -105,7 +106,7 @@ public void handleEvent(Event event) {
return;
}
final var resourceID = event.getRelatedCustomResourceID();
MDCUtils.addCustomResourceIDInfo(resourceID);
MDCUtils.addResourceIDInfo(resourceID);
metrics.receivedEvent(event);

handleEventMarking(event);
Expand All @@ -116,42 +117,35 @@ public void handleEvent(Event event) {
}
} finally {
lock.unlock();
MDCUtils.removeCustomResourceIDInfo();
MDCUtils.removeResourceIDInfo();
}
}

private void submitReconciliationExecution(ResourceID customResourceUid) {
private void submitReconciliationExecution(ResourceID resourceID) {
try {
boolean controllerUnderExecution = isControllerUnderExecution(customResourceUid);
Optional<R> latestCustomResource =
resourceCache.getCustomResource(customResourceUid);
latestCustomResource.ifPresent(MDCUtils::addCustomResourceInfo);
if (!controllerUnderExecution
&& latestCustomResource.isPresent()) {
setUnderExecutionProcessing(customResourceUid);
final var retryInfo = retryInfo(customResourceUid);
ExecutionScope<R> executionScope =
new ExecutionScope<>(
latestCustomResource.get(),
retryInfo);
eventMarker.unMarkEventReceived(customResourceUid);
metrics.reconcileCustomResource(customResourceUid, retryInfo);
boolean controllerUnderExecution = isControllerUnderExecution(resourceID);
Optional<R> latest = resourceCache.get(resourceID);
latest.ifPresent(MDCUtils::addResourceInfo);
if (!controllerUnderExecution && latest.isPresent()) {
setUnderExecutionProcessing(resourceID);
final var retryInfo = retryInfo(resourceID);
ExecutionScope<R> executionScope = new ExecutionScope<>(latest.get(), retryInfo);
eventMarker.unMarkEventReceived(resourceID);
metrics.reconcileCustomResource(resourceID, retryInfo);
log.debug("Executing events for custom resource. Scope: {}", executionScope);
executor.execute(new ControllerExecution(executionScope));
} else {
log.debug(
"Skipping executing controller for resource id: {}."
+ " Controller in execution: {}. Latest CustomResource present: {}",
customResourceUid,
"Skipping executing controller for resource id: {}. Controller in execution: {}. Latest Resource present: {}",
resourceID,
controllerUnderExecution,
latestCustomResource.isPresent());
if (latestCustomResource.isEmpty()) {
log.warn("no custom resource found in cache for CustomResourceID: {}",
customResourceUid);
latest.isPresent());
if (latest.isEmpty()) {
log.warn("no custom resource found in cache for ResourceID: {}", resourceID);
}
}
} finally {
MDCUtils.removeCustomResourceInfo();
MDCUtils.removeResourceInfo();
}
}

Expand Down Expand Up @@ -227,7 +221,7 @@ private boolean isCacheReadyForInstantReconciliation(ExecutionScope<R> execution
.orElseThrow(() -> new IllegalStateException(
"Updated custom resource must be present at this point of time")));
String cachedCustomResourceVersion = getVersion(resourceCache
.getCustomResource(executionScope.getCustomResourceID())
.get(executionScope.getCustomResourceID())
.orElseThrow(() -> new IllegalStateException(
"Cached custom resource must be present at this point")));

Expand Down Expand Up @@ -357,15 +351,15 @@ public void run() {
final var thread = Thread.currentThread();
final var name = thread.getName();
try {
MDCUtils.addCustomResourceInfo(executionScope.getResource());
MDCUtils.addResourceInfo(executionScope.getResource());
thread.setName("EventHandler-" + controllerName);
PostExecutionControl<R> postExecutionControl =
reconciliationDispatcher.handleExecution(executionScope);
eventProcessingFinished(executionScope, postExecutionControl);
} finally {
// restore original name
thread.setName(name);
MDCUtils.removeCustomResourceInfo();
MDCUtils.removeResourceInfo();
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package io.javaoperatorsdk.operator.processing.event.source;

import java.util.Map;
import java.util.Optional;
import java.util.function.Predicate;
import java.util.stream.Stream;

import io.fabric8.kubernetes.api.model.HasMetadata;
import io.fabric8.kubernetes.client.informers.SharedIndexInformer;
import io.fabric8.kubernetes.client.informers.cache.Cache;
import io.javaoperatorsdk.operator.api.config.Cloner;
import io.javaoperatorsdk.operator.processing.event.ResourceID;

import static io.javaoperatorsdk.operator.processing.event.source.ControllerResourceEventSource.ANY_NAMESPACE_MAP_KEY;

public class ControllerResourceCache<T extends HasMetadata> implements ResourceCache<T> {

private final Map<String, SharedIndexInformer<T>> sharedIndexInformers;
private final Cloner cloner;

public ControllerResourceCache(Map<String, SharedIndexInformer<T>> sharedIndexInformers,
Cloner cloner) {
this.sharedIndexInformers = sharedIndexInformers;
this.cloner = cloner;
}

@Override
public Stream<T> list(Predicate<T> predicate) {
return sharedIndexInformers.values().stream()
.flatMap(i -> i.getStore().list().stream().filter(predicate));
}

@Override
public Stream<T> list(String namespace, Predicate<T> predicate) {
if (isWatchingAllNamespaces()) {
final var stream = sharedIndexInformers.get(ANY_NAMESPACE_MAP_KEY).getStore().list().stream()
.filter(r -> r.getMetadata().getNamespace().equals(namespace));
return predicate != null ? stream.filter(predicate) : stream;
} else {
final var informer = sharedIndexInformers.get(namespace);
return informer != null ? informer.getStore().list().stream().filter(predicate)
: Stream.empty();
}
}

@Override
public Optional<T> get(ResourceID resourceID) {
var sharedIndexInformer = sharedIndexInformers.get(ANY_NAMESPACE_MAP_KEY);
if (sharedIndexInformer == null) {
sharedIndexInformer =
sharedIndexInformers.get(resourceID.getNamespace().orElse(ANY_NAMESPACE_MAP_KEY));
}
var resource = sharedIndexInformer.getStore()
.getByKey(Cache.namespaceKeyFunc(resourceID.getNamespace().orElse(null),
resourceID.getName()));
if (resource == null) {
return Optional.empty();
} else {
return Optional.of(cloner.clone(resource));
}
}

private boolean isWatchingAllNamespaces() {
return sharedIndexInformers.containsKey(ANY_NAMESPACE_MAP_KEY);
}

}
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package io.javaoperatorsdk.operator.processing.event.source;

import java.util.*;
import java.util.Collections;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

import org.slf4j.Logger;
Expand All @@ -12,13 +15,10 @@
import io.fabric8.kubernetes.client.dsl.FilterWatchListDeletable;
import io.fabric8.kubernetes.client.informers.ResourceEventHandler;
import io.fabric8.kubernetes.client.informers.SharedIndexInformer;
import io.fabric8.kubernetes.client.informers.cache.Cache;
import io.javaoperatorsdk.operator.MissingCRDException;
import io.javaoperatorsdk.operator.api.config.Cloner;
import io.javaoperatorsdk.operator.api.config.ControllerConfiguration;
import io.javaoperatorsdk.operator.processing.Controller;
import io.javaoperatorsdk.operator.processing.MDCUtils;
import io.javaoperatorsdk.operator.processing.ResourceCache;
import io.javaoperatorsdk.operator.processing.event.ResourceID;

import static io.javaoperatorsdk.operator.processing.KubernetesResourceUtils.getName;
Expand All @@ -29,7 +29,7 @@
* This is a special case since is not bound to a single custom resource
*/
public class ControllerResourceEventSource<T extends HasMetadata> extends AbstractEventSource
implements ResourceEventHandler<T>, ResourceCache<T> {
implements ResourceEventHandler<T> {

public static final String ANY_NAMESPACE_MAP_KEY = "anyNamespace";

Expand All @@ -41,11 +41,12 @@ public class ControllerResourceEventSource<T extends HasMetadata> extends Abstra

private final ResourceEventFilter<T> filter;
private final OnceWhitelistEventFilterEventFilter<T> onceWhitelistEventFilterEventFilter;
private final Cloner cloner;
private final ControllerResourceCache<T> cache;

public ControllerResourceEventSource(Controller<T> controller) {
this.controller = controller;
this.cloner = controller.getConfiguration().getConfigurationService().getResourceCloner();
var cloner = controller.getConfiguration().getConfigurationService().getResourceCloner();
this.cache = new ControllerResourceCache<>(sharedIndexInformers, cloner);

var filters = new ResourceEventFilter[] {
ResourceEventFilters.finalizerNeededAndApplied(),
Expand Down Expand Up @@ -128,7 +129,7 @@ public void eventReceived(ResourceAction action, T customResource, T oldResource
try {
log.debug(
"Event received for resource: {}", getName(customResource));
MDCUtils.addCustomResourceInfo(customResource);
MDCUtils.addResourceInfo(customResource);
if (filter.acceptChange(controller.getConfiguration(), oldResource, customResource)) {
eventHandler.handleEvent(
new ResourceEvent(action, ResourceID.fromResource(customResource)));
Expand All @@ -139,7 +140,7 @@ public void eventReceived(ResourceAction action, T customResource, T oldResource
getVersion(customResource));
}
} finally {
MDCUtils.removeCustomResourceInfo();
MDCUtils.removeResourceInfo();
}
}

Expand All @@ -158,24 +159,13 @@ public void onDelete(T resource, boolean b) {
eventReceived(ResourceAction.DELETED, resource, null);
}

@Override
public Optional<T> getCustomResource(ResourceID resourceID) {
var sharedIndexInformer = sharedIndexInformers.get(ANY_NAMESPACE_MAP_KEY);
if (sharedIndexInformer == null) {
sharedIndexInformer =
sharedIndexInformers.get(resourceID.getNamespace().orElse(ANY_NAMESPACE_MAP_KEY));
}
var resource = sharedIndexInformer.getStore()
.getByKey(Cache.namespaceKeyFunc(resourceID.getNamespace().orElse(null),
resourceID.getName()));
if (resource == null) {
return Optional.empty();
} else {
return Optional.of(cloner.clone(resource));
}
public Optional<T> get(ResourceID resourceID) {
return cache.get(resourceID);
}


public ControllerResourceCache<T> getResourceCache() {
return cache;
}

/**
* @return shared informers by namespace. If custom resource is not namespace scoped use
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package io.javaoperatorsdk.operator.processing.event.source;

import java.util.Optional;
import java.util.function.Predicate;
import java.util.stream.Stream;

import io.fabric8.kubernetes.api.model.HasMetadata;
import io.javaoperatorsdk.operator.processing.event.ResourceID;

@SuppressWarnings({"rawtypes", "unchecked"})
public interface ResourceCache<T extends HasMetadata> {
Predicate TRUE = (a) -> true;

Optional<T> get(ResourceID resourceID);

default Stream<T> list() {
return list(TRUE);
}

Stream<T> list(Predicate<T> predicate);

default Stream<T> list(String namespace) {
return list(namespace, TRUE);
}

Stream<T> list(String namespace, Predicate<T> predicate);
}
Loading