diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/controller/ControllerResourceEventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/controller/ControllerResourceEventSource.java index 801eae09ee..464cc3e500 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/controller/ControllerResourceEventSource.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/controller/ControllerResourceEventSource.java @@ -48,10 +48,8 @@ public ControllerResourceEventSource(Controller controller) { public void start() { try { super.start(); - } catch (Exception e) { - if (e instanceof KubernetesClientException) { - handleKubernetesClientException(e); - } + } catch (KubernetesClientException e) { + handleKubernetesClientException(e); throw e; } } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/EventRecorder.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/EventRecorder.java index 284b749f07..5d23d870aa 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/EventRecorder.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/EventRecorder.java @@ -1,18 +1,18 @@ package io.javaoperatorsdk.operator.processing.event.source.informer; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; import io.fabric8.kubernetes.api.model.HasMetadata; import io.javaoperatorsdk.operator.processing.event.ResourceID; public class EventRecorder { - private final Map> resourceEvents = new ConcurrentHashMap<>(); + private final Map> resourceEvents = new HashMap<>(); - void startEventRecording(ResourceID resourceID) { + public void startEventRecording(ResourceID resourceID) { resourceEvents.putIfAbsent(resourceID, new ArrayList<>(5)); } @@ -28,7 +28,8 @@ public void recordEvent(R resource) { resourceEvents.get(ResourceID.fromResource(resource)).add(resource); } - public boolean containsEventWithResourceVersion(ResourceID resourceID, String resourceVersion) { + public boolean containsEventWithResourceVersion(ResourceID resourceID, + String resourceVersion) { List events = resourceEvents.get(resourceID); if (events == null) { return false; diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSource.java index 4a23bd9363..e1ef859549 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSource.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSource.java @@ -69,6 +69,7 @@ public class InformerEventSource private static final Logger log = LoggerFactory.getLogger(InformerEventSource.class); private final InformerConfiguration configuration; + // always called from a synchronized method private final EventRecorder eventRecorder = new EventRecorder<>(); public InformerEventSource( @@ -161,7 +162,7 @@ public InformerConfiguration getConfiguration() { } @Override - public void handleRecentResourceUpdate(ResourceID resourceID, R resource, + public synchronized void handleRecentResourceUpdate(ResourceID resourceID, R resource, R previousResourceVersion) { handleRecentCreateOrUpdate(resource, () -> super.handleRecentResourceUpdate(resourceID, resource, @@ -169,12 +170,12 @@ public void handleRecentResourceUpdate(ResourceID resourceID, R resource, } @Override - public void handleRecentResourceCreate(ResourceID resourceID, R resource) { + public synchronized void handleRecentResourceCreate(ResourceID resourceID, R resource) { handleRecentCreateOrUpdate(resource, () -> super.handleRecentResourceCreate(resourceID, resource)); } - private synchronized void handleRecentCreateOrUpdate(R resource, Runnable runnable) { + private void handleRecentCreateOrUpdate(R resource, Runnable runnable) { if (eventRecorder.isRecordingFor(ResourceID.fromResource(resource))) { handleRecentResourceOperationAndStopEventRecording(resource); } else { @@ -199,7 +200,7 @@ private synchronized void handleRecentCreateOrUpdate(R resource, Runnable runnab * * @param resource just created or updated resource */ - private synchronized void handleRecentResourceOperationAndStopEventRecording(R resource) { + private void handleRecentResourceOperationAndStopEventRecording(R resource) { ResourceID resourceID = ResourceID.fromResource(resource); try { if (!eventRecorder.containsEventWithResourceVersion( diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCache.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCache.java index fb14ea6847..51a5a73bd4 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCache.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCache.java @@ -3,7 +3,6 @@ import java.util.Map; import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.locks.ReentrantLock; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -36,75 +35,49 @@ public class TemporaryResourceCache { private static final Logger log = LoggerFactory.getLogger(TemporaryResourceCache.class); private final Map cache = new ConcurrentHashMap<>(); - private final ReentrantLock lock = new ReentrantLock(); private final ManagedInformerEventSource managedInformerEventSource; public TemporaryResourceCache(ManagedInformerEventSource managedInformerEventSource) { this.managedInformerEventSource = managedInformerEventSource; } - public void removeResourceFromCache(T resource) { - lock.lock(); - try { - cache.remove(ResourceID.fromResource(resource)); - } finally { - lock.unlock(); - } + public synchronized void removeResourceFromCache(T resource) { + cache.remove(ResourceID.fromResource(resource)); } - public void unconditionallyCacheResource(T newResource) { - lock.lock(); - try { - cache.put(ResourceID.fromResource(newResource), newResource); - } finally { - lock.unlock(); - } + public synchronized void unconditionallyCacheResource(T newResource) { + cache.put(ResourceID.fromResource(newResource), newResource); } - public void putAddedResource(T newResource) { - lock.lock(); - try { - ResourceID resourceID = ResourceID.fromResource(newResource); - if (managedInformerEventSource.get(resourceID).isEmpty()) { - log.debug("Putting resource to cache with ID: {}", resourceID); - cache.put(ResourceID.fromResource(newResource), newResource); - } else { - log.debug("Won't put resource into cache found already informer cache: {}", resourceID); - } - } finally { - lock.unlock(); + public synchronized void putAddedResource(T newResource) { + ResourceID resourceID = ResourceID.fromResource(newResource); + if (managedInformerEventSource.get(resourceID).isEmpty()) { + log.debug("Putting resource to cache with ID: {}", resourceID); + cache.put(ResourceID.fromResource(newResource), newResource); + } else { + log.debug("Won't put resource into cache found already informer cache: {}", resourceID); } } - public void putUpdatedResource(T newResource, String previousResourceVersion) { - lock.lock(); - try { - var resourceId = ResourceID.fromResource(newResource); - var informerCacheResource = managedInformerEventSource.get(resourceId); - if (informerCacheResource.isEmpty()) { - log.debug("No cached value present for resource: {}", newResource); - return; - } - // if this is not true that means the cache was already updated - if (informerCacheResource.get().getMetadata().getResourceVersion() - .equals(previousResourceVersion)) { - log.debug("Putting resource to temporal cache with id: {}", resourceId); - cache.put(resourceId, newResource); - } else { - // if something is in cache it's surely obsolete now - cache.remove(resourceId); - } - } finally { - lock.unlock(); + public synchronized void putUpdatedResource(T newResource, String previousResourceVersion) { + var resourceId = ResourceID.fromResource(newResource); + var informerCacheResource = managedInformerEventSource.get(resourceId); + if (informerCacheResource.isEmpty()) { + log.debug("No cached value present for resource: {}", newResource); + return; + } + // if this is not true that means the cache was already updated + if (informerCacheResource.get().getMetadata().getResourceVersion() + .equals(previousResourceVersion)) { + log.debug("Putting resource to temporal cache with id: {}", resourceId); + cache.put(resourceId, newResource); + } else { + // if something is in cache it's surely obsolete now + cache.remove(resourceId); } } - public Optional getResourceFromCache(ResourceID resourceID) { - try { - lock.lock(); - return Optional.ofNullable(cache.get(resourceID)); - } finally { - lock.unlock(); - } + public synchronized Optional getResourceFromCache(ResourceID resourceID) { + return Optional.ofNullable(cache.get(resourceID)); } }