From d77644f7bc7b990d6066d2b51e05311c37752965 Mon Sep 17 00:00:00 2001 From: csviri Date: Wed, 9 Mar 2022 08:57:35 +0100 Subject: [PATCH 1/5] fix: synchonized event source methods --- .../controller/ControllerResourceEventSource.java | 14 ++++++-------- .../event/source/informer/InformerEventSource.java | 10 +++++----- .../informer/ManagedInformerEventSource.java | 10 +++++----- 3 files changed, 16 insertions(+), 18 deletions(-) diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/controller/ControllerResourceEventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/controller/ControllerResourceEventSource.java index 801eae09ee..f4791ce307 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/controller/ControllerResourceEventSource.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/controller/ControllerResourceEventSource.java @@ -48,15 +48,13 @@ public ControllerResourceEventSource(Controller controller) { public void start() { try { super.start(); - } catch (Exception e) { - if (e instanceof KubernetesClientException) { - handleKubernetesClientException(e); - } + } catch (KubernetesClientException e) { + handleKubernetesClientException(e); throw e; } } - public void eventReceived(ResourceAction action, T resource, T oldResource) { + public synchronized void eventReceived(ResourceAction action, T resource, T oldResource) { try { log.debug("Event received for resource: {}", getName(resource)); MDCUtils.addResourceInfo(resource); @@ -74,19 +72,19 @@ public void eventReceived(ResourceAction action, T resource, T oldResource) { } @Override - public void onAdd(T resource) { + public synchronized void onAdd(T resource) { super.onAdd(resource); eventReceived(ResourceAction.ADDED, resource, null); } @Override - public void onUpdate(T oldCustomResource, T newCustomResource) { + public synchronized void onUpdate(T oldCustomResource, T newCustomResource) { super.onUpdate(oldCustomResource, newCustomResource); eventReceived(ResourceAction.UPDATED, newCustomResource, oldCustomResource); } @Override - public void onDelete(T resource, boolean b) { + public synchronized void onDelete(T resource, boolean b) { super.onDelete(resource, b); eventReceived(ResourceAction.DELETED, resource, null); } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSource.java index 4a23bd9363..bfbf0f3599 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSource.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSource.java @@ -83,12 +83,12 @@ public InformerEventSource(InformerConfiguration configuration, Kubernetes } @Override - public void onAdd(R resource) { + public synchronized void onAdd(R resource) { onAddOrUpdate("add", resource, () -> InformerEventSource.super.onAdd(resource)); } @Override - public void onUpdate(R oldObject, R newObject) { + public synchronized void onUpdate(R oldObject, R newObject) { onAddOrUpdate("update", newObject, () -> InformerEventSource.super.onUpdate(oldObject, newObject)); } @@ -117,7 +117,7 @@ private synchronized void onAddOrUpdate(String operation, R newObject, Runnable } @Override - public void onDelete(R r, boolean b) { + public synchronized void onDelete(R r, boolean b) { super.onDelete(r, b); propagateEvent(r); } @@ -161,7 +161,7 @@ public InformerConfiguration getConfiguration() { } @Override - public void handleRecentResourceUpdate(ResourceID resourceID, R resource, + public synchronized void handleRecentResourceUpdate(ResourceID resourceID, R resource, R previousResourceVersion) { handleRecentCreateOrUpdate(resource, () -> super.handleRecentResourceUpdate(resourceID, resource, @@ -169,7 +169,7 @@ public void handleRecentResourceUpdate(ResourceID resourceID, R resource, } @Override - public void handleRecentResourceCreate(ResourceID resourceID, R resource) { + public synchronized void handleRecentResourceCreate(ResourceID resourceID, R resource) { handleRecentCreateOrUpdate(resource, () -> super.handleRecentResourceCreate(resourceID, resource)); } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java index 063207b0e7..070d34e38d 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java @@ -34,17 +34,17 @@ protected ManagedInformerEventSource( } @Override - public void onAdd(R resource) { + public synchronized void onAdd(R resource) { temporaryResourceCache.removeResourceFromCache(resource); } @Override - public void onUpdate(R oldObj, R newObj) { + public synchronized void onUpdate(R oldObj, R newObj) { temporaryResourceCache.removeResourceFromCache(newObj); } @Override - public void onDelete(R obj, boolean deletedFinalStateUnknown) { + public synchronized void onDelete(R obj, boolean deletedFinalStateUnknown) { temporaryResourceCache.removeResourceFromCache(obj); } @@ -70,14 +70,14 @@ public void stop() { } @Override - public void handleRecentResourceUpdate(ResourceID resourceID, R resource, + public synchronized void handleRecentResourceUpdate(ResourceID resourceID, R resource, R previousResourceVersion) { temporaryResourceCache.putUpdatedResource(resource, previousResourceVersion.getMetadata().getResourceVersion()); } @Override - public void handleRecentResourceCreate(ResourceID resourceID, R resource) { + public synchronized void handleRecentResourceCreate(ResourceID resourceID, R resource) { temporaryResourceCache.putAddedResource(resource); } From 6f35775b5382b13d7a39f576c5a65e44c7e548a0 Mon Sep 17 00:00:00 2001 From: csviri Date: Wed, 9 Mar 2022 10:40:24 +0100 Subject: [PATCH 2/5] fix: sync approach --- .../ControllerResourceEventSource.java | 8 +-- .../event/source/informer/EventRecorder.java | 27 +++++--- .../source/informer/InformerEventSource.java | 26 ++++---- .../informer/ManagedInformerEventSource.java | 66 +++++++++++++------ 4 files changed, 81 insertions(+), 46 deletions(-) diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/controller/ControllerResourceEventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/controller/ControllerResourceEventSource.java index f4791ce307..464cc3e500 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/controller/ControllerResourceEventSource.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/controller/ControllerResourceEventSource.java @@ -54,7 +54,7 @@ public void start() { } } - public synchronized void eventReceived(ResourceAction action, T resource, T oldResource) { + public void eventReceived(ResourceAction action, T resource, T oldResource) { try { log.debug("Event received for resource: {}", getName(resource)); MDCUtils.addResourceInfo(resource); @@ -72,19 +72,19 @@ public synchronized void eventReceived(ResourceAction action, T resource, T oldR } @Override - public synchronized void onAdd(T resource) { + public void onAdd(T resource) { super.onAdd(resource); eventReceived(ResourceAction.ADDED, resource, null); } @Override - public synchronized void onUpdate(T oldCustomResource, T newCustomResource) { + public void onUpdate(T oldCustomResource, T newCustomResource) { super.onUpdate(oldCustomResource, newCustomResource); eventReceived(ResourceAction.UPDATED, newCustomResource, oldCustomResource); } @Override - public synchronized void onDelete(T resource, boolean b) { + public void onDelete(T resource, boolean b) { super.onDelete(resource, b); eventReceived(ResourceAction.DELETED, resource, null); } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/EventRecorder.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/EventRecorder.java index 284b749f07..f010cc8d95 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/EventRecorder.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/EventRecorder.java @@ -1,34 +1,35 @@ package io.javaoperatorsdk.operator.processing.event.source.informer; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; import io.fabric8.kubernetes.api.model.HasMetadata; import io.javaoperatorsdk.operator.processing.event.ResourceID; public class EventRecorder { - private final Map> resourceEvents = new ConcurrentHashMap<>(); + private final Map> resourceEvents = new HashMap<>(); - void startEventRecording(ResourceID resourceID) { + synchronized void startEventRecording(ResourceID resourceID) { resourceEvents.putIfAbsent(resourceID, new ArrayList<>(5)); } - public boolean isRecordingFor(ResourceID resourceID) { + public synchronized boolean isRecordingFor(ResourceID resourceID) { return resourceEvents.get(resourceID) != null; } - public void stopEventRecording(ResourceID resourceID) { + public synchronized void stopEventRecording(ResourceID resourceID) { resourceEvents.remove(resourceID); } - public void recordEvent(R resource) { + public synchronized void recordEvent(R resource) { resourceEvents.get(ResourceID.fromResource(resource)).add(resource); } - public boolean containsEventWithResourceVersion(ResourceID resourceID, String resourceVersion) { + public synchronized boolean containsEventWithResourceVersion(ResourceID resourceID, + String resourceVersion) { List events = resourceEvents.get(resourceID); if (events == null) { return false; @@ -41,7 +42,7 @@ public boolean containsEventWithResourceVersion(ResourceID resourceID, String re } } - public boolean containsEventWithVersionButItsNotLastOne( + public synchronized boolean containsEventWithVersionButItsNotLastOne( ResourceID resourceID, String resourceVersion) { List resources = resourceEvents.get(resourceID); if (resources == null) { @@ -59,7 +60,7 @@ public boolean containsEventWithVersionButItsNotLastOne( .equals(resourceVersion); } - public R getLastEvent(ResourceID resourceID) { + public synchronized R getLastEvent(ResourceID resourceID) { List resources = resourceEvents.get(resourceID); if (resources == null) { throw new IllegalStateException( @@ -68,4 +69,12 @@ public R getLastEvent(ResourceID resourceID) { } return resources.get(resources.size() - 1); } + + public synchronized boolean recordEventIfStartedRecording(R resource) { + if (isRecordingFor(ResourceID.fromResource(resource))) { + recordEvent(resource); + return true; + } + return false; + } } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSource.java index bfbf0f3599..785f6d88ba 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSource.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSource.java @@ -1,6 +1,7 @@ package io.javaoperatorsdk.operator.processing.event.source.informer; import java.util.Optional; +import java.util.concurrent.locks.ReentrantLock; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -70,6 +71,7 @@ public class InformerEventSource private final InformerConfiguration configuration; private final EventRecorder eventRecorder = new EventRecorder<>(); + private final ReentrantLock eventRecorderLock = new ReentrantLock(); public InformerEventSource( InformerConfiguration configuration, EventSourceContext

context) { @@ -83,21 +85,19 @@ public InformerEventSource(InformerConfiguration configuration, Kubernetes } @Override - public synchronized void onAdd(R resource) { + public void onAdd(R resource) { onAddOrUpdate("add", resource, () -> InformerEventSource.super.onAdd(resource)); } @Override - public synchronized void onUpdate(R oldObject, R newObject) { + public void onUpdate(R oldObject, R newObject) { onAddOrUpdate("update", newObject, () -> InformerEventSource.super.onUpdate(oldObject, newObject)); } - private synchronized void onAddOrUpdate(String operation, R newObject, Runnable superOnOp) { + private void onAddOrUpdate(String operation, R newObject, Runnable superOnOp) { var resourceID = ResourceID.fromResource(newObject); - if (eventRecorder.isRecordingFor(resourceID)) { - log.info("Recording event for: " + resourceID); - eventRecorder.recordEvent(newObject); + if (eventRecorder.recordEventIfStartedRecording(newObject)) { return; } if (temporalCacheHasResourceWithVersionAs(newObject)) { @@ -117,7 +117,7 @@ private synchronized void onAddOrUpdate(String operation, R newObject, Runnable } @Override - public synchronized void onDelete(R r, boolean b) { + public void onDelete(R r, boolean b) { super.onDelete(r, b); propagateEvent(r); } @@ -161,7 +161,7 @@ public InformerConfiguration getConfiguration() { } @Override - public synchronized void handleRecentResourceUpdate(ResourceID resourceID, R resource, + public void handleRecentResourceUpdate(ResourceID resourceID, R resource, R previousResourceVersion) { handleRecentCreateOrUpdate(resource, () -> super.handleRecentResourceUpdate(resourceID, resource, @@ -169,12 +169,12 @@ public synchronized void handleRecentResourceUpdate(ResourceID resourceID, R res } @Override - public synchronized void handleRecentResourceCreate(ResourceID resourceID, R resource) { + public void handleRecentResourceCreate(ResourceID resourceID, R resource) { handleRecentCreateOrUpdate(resource, () -> super.handleRecentResourceCreate(resourceID, resource)); } - private synchronized void handleRecentCreateOrUpdate(R resource, Runnable runnable) { + private void handleRecentCreateOrUpdate(R resource, Runnable runnable) { if (eventRecorder.isRecordingFor(ResourceID.fromResource(resource))) { handleRecentResourceOperationAndStopEventRecording(resource); } else { @@ -199,7 +199,7 @@ private synchronized void handleRecentCreateOrUpdate(R resource, Runnable runnab * * @param resource just created or updated resource */ - private synchronized void handleRecentResourceOperationAndStopEventRecording(R resource) { + private void handleRecentResourceOperationAndStopEventRecording(R resource) { ResourceID resourceID = ResourceID.fromResource(resource); try { if (!eventRecorder.containsEventWithResourceVersion( @@ -221,7 +221,7 @@ private synchronized void handleRecentResourceOperationAndStopEventRecording(R r } @Override - public synchronized void prepareForCreateOrUpdateEventFiltering(ResourceID resourceID, + public void prepareForCreateOrUpdateEventFiltering(ResourceID resourceID, R resource) { log.info("Starting event recording for: {}", resourceID); eventRecorder.startEventRecording(resourceID); @@ -234,7 +234,7 @@ public synchronized void prepareForCreateOrUpdateEventFiltering(ResourceID resou * @param resource handled by the informer */ @Override - public synchronized void cleanupOnCreateOrUpdateEventFiltering(ResourceID resourceID, + public void cleanupOnCreateOrUpdateEventFiltering(ResourceID resourceID, R resource) { log.info("Stopping event recording for: {}", resourceID); eventRecorder.stopEventRecording(resourceID); diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java index 070d34e38d..690536ed62 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java @@ -1,6 +1,7 @@ package io.javaoperatorsdk.operator.processing.event.source.informer; import java.util.Optional; +import java.util.concurrent.locks.ReentrantLock; import java.util.function.Predicate; import java.util.stream.Stream; @@ -26,6 +27,7 @@ public abstract class ManagedInformerEventSource temporaryResourceCache = new TemporaryResourceCache<>(this); + protected ReentrantLock tempCacheUpdateLock = new ReentrantLock(); protected ManagedInformerEventSource( MixedOperation, Resource> client, C configuration) { @@ -34,18 +36,27 @@ protected ManagedInformerEventSource( } @Override - public synchronized void onAdd(R resource) { - temporaryResourceCache.removeResourceFromCache(resource); + public void onAdd(R resource) { + syncedRemoveResourceFromCache(resource); } @Override - public synchronized void onUpdate(R oldObj, R newObj) { - temporaryResourceCache.removeResourceFromCache(newObj); + public void onUpdate(R oldObj, R newObj) { + syncedRemoveResourceFromCache(newObj); } @Override - public synchronized void onDelete(R obj, boolean deletedFinalStateUnknown) { - temporaryResourceCache.removeResourceFromCache(obj); + public void onDelete(R obj, boolean deletedFinalStateUnknown) { + syncedRemoveResourceFromCache(obj); + } + + protected void syncedRemoveResourceFromCache(R obj) { + tempCacheUpdateLock.lock(); + try { + temporaryResourceCache.removeResourceFromCache(obj); + } finally { + tempCacheUpdateLock.unlock(); + } } @Override @@ -70,15 +81,25 @@ public void stop() { } @Override - public synchronized void handleRecentResourceUpdate(ResourceID resourceID, R resource, + public void handleRecentResourceUpdate(ResourceID resourceID, R resource, R previousResourceVersion) { - temporaryResourceCache.putUpdatedResource(resource, - previousResourceVersion.getMetadata().getResourceVersion()); + tempCacheUpdateLock.lock(); + try { + temporaryResourceCache.putUpdatedResource(resource, + previousResourceVersion.getMetadata().getResourceVersion()); + } finally { + tempCacheUpdateLock.unlock(); + } } @Override - public synchronized void handleRecentResourceCreate(ResourceID resourceID, R resource) { - temporaryResourceCache.putAddedResource(resource); + public void handleRecentResourceCreate(ResourceID resourceID, R resource) { + tempCacheUpdateLock.lock(); + try { + temporaryResourceCache.putAddedResource(resource); + } finally { + tempCacheUpdateLock.unlock(); + } } @Override @@ -103,15 +124,20 @@ public Optional getCachedValue(ResourceID resourceID) { } protected boolean temporalCacheHasResourceWithVersionAs(R resource) { - var resourceID = ResourceID.fromResource(resource); - var res = temporaryResourceCache.getResourceFromCache(resourceID); - return res.map(r -> { - boolean resVersionsEqual = r.getMetadata().getResourceVersion() - .equals(resource.getMetadata().getResourceVersion()); - log.debug("Resource found in temporal cache for id: {} resource versions equal: {}", - resourceID, resVersionsEqual); - return resVersionsEqual; - }).orElse(false); + tempCacheUpdateLock.lock(); + try { + var resourceID = ResourceID.fromResource(resource); + var res = temporaryResourceCache.getResourceFromCache(resourceID); + return res.map(r -> { + boolean resVersionsEqual = r.getMetadata().getResourceVersion() + .equals(resource.getMetadata().getResourceVersion()); + log.debug("Resource found in temporal cache for id: {} resource versions equal: {}", + resourceID, resVersionsEqual); + return resVersionsEqual; + }).orElse(false); + } finally { + tempCacheUpdateLock.unlock(); + } } @Override From 55db3ff733367fa632aaa8ecd6cf29852e5db3ce Mon Sep 17 00:00:00 2001 From: csviri Date: Wed, 9 Mar 2022 10:49:06 +0100 Subject: [PATCH 3/5] fix: removed unnecessary syncing --- .../source/informer/InformerEventSource.java | 2 - .../informer/ManagedInformerEventSource.java | 56 ++++--------- .../informer/TemporaryResourceCache.java | 83 +++++++------------ 3 files changed, 43 insertions(+), 98 deletions(-) diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSource.java index 785f6d88ba..fac9153d16 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSource.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSource.java @@ -1,7 +1,6 @@ package io.javaoperatorsdk.operator.processing.event.source.informer; import java.util.Optional; -import java.util.concurrent.locks.ReentrantLock; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -71,7 +70,6 @@ public class InformerEventSource private final InformerConfiguration configuration; private final EventRecorder eventRecorder = new EventRecorder<>(); - private final ReentrantLock eventRecorderLock = new ReentrantLock(); public InformerEventSource( InformerConfiguration configuration, EventSourceContext

context) { diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java index 690536ed62..063207b0e7 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java @@ -1,7 +1,6 @@ package io.javaoperatorsdk.operator.processing.event.source.informer; import java.util.Optional; -import java.util.concurrent.locks.ReentrantLock; import java.util.function.Predicate; import java.util.stream.Stream; @@ -27,7 +26,6 @@ public abstract class ManagedInformerEventSource temporaryResourceCache = new TemporaryResourceCache<>(this); - protected ReentrantLock tempCacheUpdateLock = new ReentrantLock(); protected ManagedInformerEventSource( MixedOperation, Resource> client, C configuration) { @@ -37,26 +35,17 @@ protected ManagedInformerEventSource( @Override public void onAdd(R resource) { - syncedRemoveResourceFromCache(resource); + temporaryResourceCache.removeResourceFromCache(resource); } @Override public void onUpdate(R oldObj, R newObj) { - syncedRemoveResourceFromCache(newObj); + temporaryResourceCache.removeResourceFromCache(newObj); } @Override public void onDelete(R obj, boolean deletedFinalStateUnknown) { - syncedRemoveResourceFromCache(obj); - } - - protected void syncedRemoveResourceFromCache(R obj) { - tempCacheUpdateLock.lock(); - try { - temporaryResourceCache.removeResourceFromCache(obj); - } finally { - tempCacheUpdateLock.unlock(); - } + temporaryResourceCache.removeResourceFromCache(obj); } @Override @@ -83,23 +72,13 @@ public void stop() { @Override public void handleRecentResourceUpdate(ResourceID resourceID, R resource, R previousResourceVersion) { - tempCacheUpdateLock.lock(); - try { - temporaryResourceCache.putUpdatedResource(resource, - previousResourceVersion.getMetadata().getResourceVersion()); - } finally { - tempCacheUpdateLock.unlock(); - } + temporaryResourceCache.putUpdatedResource(resource, + previousResourceVersion.getMetadata().getResourceVersion()); } @Override public void handleRecentResourceCreate(ResourceID resourceID, R resource) { - tempCacheUpdateLock.lock(); - try { - temporaryResourceCache.putAddedResource(resource); - } finally { - tempCacheUpdateLock.unlock(); - } + temporaryResourceCache.putAddedResource(resource); } @Override @@ -124,20 +103,15 @@ public Optional getCachedValue(ResourceID resourceID) { } protected boolean temporalCacheHasResourceWithVersionAs(R resource) { - tempCacheUpdateLock.lock(); - try { - var resourceID = ResourceID.fromResource(resource); - var res = temporaryResourceCache.getResourceFromCache(resourceID); - return res.map(r -> { - boolean resVersionsEqual = r.getMetadata().getResourceVersion() - .equals(resource.getMetadata().getResourceVersion()); - log.debug("Resource found in temporal cache for id: {} resource versions equal: {}", - resourceID, resVersionsEqual); - return resVersionsEqual; - }).orElse(false); - } finally { - tempCacheUpdateLock.unlock(); - } + var resourceID = ResourceID.fromResource(resource); + var res = temporaryResourceCache.getResourceFromCache(resourceID); + return res.map(r -> { + boolean resVersionsEqual = r.getMetadata().getResourceVersion() + .equals(resource.getMetadata().getResourceVersion()); + log.debug("Resource found in temporal cache for id: {} resource versions equal: {}", + resourceID, resVersionsEqual); + return resVersionsEqual; + }).orElse(false); } @Override diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCache.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCache.java index fb14ea6847..51a5a73bd4 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCache.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCache.java @@ -3,7 +3,6 @@ import java.util.Map; import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.locks.ReentrantLock; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -36,75 +35,49 @@ public class TemporaryResourceCache { private static final Logger log = LoggerFactory.getLogger(TemporaryResourceCache.class); private final Map cache = new ConcurrentHashMap<>(); - private final ReentrantLock lock = new ReentrantLock(); private final ManagedInformerEventSource managedInformerEventSource; public TemporaryResourceCache(ManagedInformerEventSource managedInformerEventSource) { this.managedInformerEventSource = managedInformerEventSource; } - public void removeResourceFromCache(T resource) { - lock.lock(); - try { - cache.remove(ResourceID.fromResource(resource)); - } finally { - lock.unlock(); - } + public synchronized void removeResourceFromCache(T resource) { + cache.remove(ResourceID.fromResource(resource)); } - public void unconditionallyCacheResource(T newResource) { - lock.lock(); - try { - cache.put(ResourceID.fromResource(newResource), newResource); - } finally { - lock.unlock(); - } + public synchronized void unconditionallyCacheResource(T newResource) { + cache.put(ResourceID.fromResource(newResource), newResource); } - public void putAddedResource(T newResource) { - lock.lock(); - try { - ResourceID resourceID = ResourceID.fromResource(newResource); - if (managedInformerEventSource.get(resourceID).isEmpty()) { - log.debug("Putting resource to cache with ID: {}", resourceID); - cache.put(ResourceID.fromResource(newResource), newResource); - } else { - log.debug("Won't put resource into cache found already informer cache: {}", resourceID); - } - } finally { - lock.unlock(); + public synchronized void putAddedResource(T newResource) { + ResourceID resourceID = ResourceID.fromResource(newResource); + if (managedInformerEventSource.get(resourceID).isEmpty()) { + log.debug("Putting resource to cache with ID: {}", resourceID); + cache.put(ResourceID.fromResource(newResource), newResource); + } else { + log.debug("Won't put resource into cache found already informer cache: {}", resourceID); } } - public void putUpdatedResource(T newResource, String previousResourceVersion) { - lock.lock(); - try { - var resourceId = ResourceID.fromResource(newResource); - var informerCacheResource = managedInformerEventSource.get(resourceId); - if (informerCacheResource.isEmpty()) { - log.debug("No cached value present for resource: {}", newResource); - return; - } - // if this is not true that means the cache was already updated - if (informerCacheResource.get().getMetadata().getResourceVersion() - .equals(previousResourceVersion)) { - log.debug("Putting resource to temporal cache with id: {}", resourceId); - cache.put(resourceId, newResource); - } else { - // if something is in cache it's surely obsolete now - cache.remove(resourceId); - } - } finally { - lock.unlock(); + public synchronized void putUpdatedResource(T newResource, String previousResourceVersion) { + var resourceId = ResourceID.fromResource(newResource); + var informerCacheResource = managedInformerEventSource.get(resourceId); + if (informerCacheResource.isEmpty()) { + log.debug("No cached value present for resource: {}", newResource); + return; + } + // if this is not true that means the cache was already updated + if (informerCacheResource.get().getMetadata().getResourceVersion() + .equals(previousResourceVersion)) { + log.debug("Putting resource to temporal cache with id: {}", resourceId); + cache.put(resourceId, newResource); + } else { + // if something is in cache it's surely obsolete now + cache.remove(resourceId); } } - public Optional getResourceFromCache(ResourceID resourceID) { - try { - lock.lock(); - return Optional.ofNullable(cache.get(resourceID)); - } finally { - lock.unlock(); - } + public synchronized Optional getResourceFromCache(ResourceID resourceID) { + return Optional.ofNullable(cache.get(resourceID)); } } From b9fd4449293e3c7d0c9cca3d53d6287390662a19 Mon Sep 17 00:00:00 2001 From: csviri Date: Wed, 9 Mar 2022 10:54:57 +0100 Subject: [PATCH 4/5] fix: remove unnecessary syn --- .../event/source/informer/EventRecorder.java | 16 ++++++++-------- .../source/informer/InformerEventSource.java | 11 ++++++----- 2 files changed, 14 insertions(+), 13 deletions(-) diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/EventRecorder.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/EventRecorder.java index f010cc8d95..85c1894f63 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/EventRecorder.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/EventRecorder.java @@ -12,23 +12,23 @@ public class EventRecorder { private final Map> resourceEvents = new HashMap<>(); - synchronized void startEventRecording(ResourceID resourceID) { + public void startEventRecording(ResourceID resourceID) { resourceEvents.putIfAbsent(resourceID, new ArrayList<>(5)); } - public synchronized boolean isRecordingFor(ResourceID resourceID) { + public boolean isRecordingFor(ResourceID resourceID) { return resourceEvents.get(resourceID) != null; } - public synchronized void stopEventRecording(ResourceID resourceID) { + public void stopEventRecording(ResourceID resourceID) { resourceEvents.remove(resourceID); } - public synchronized void recordEvent(R resource) { + public void recordEvent(R resource) { resourceEvents.get(ResourceID.fromResource(resource)).add(resource); } - public synchronized boolean containsEventWithResourceVersion(ResourceID resourceID, + public boolean containsEventWithResourceVersion(ResourceID resourceID, String resourceVersion) { List events = resourceEvents.get(resourceID); if (events == null) { @@ -42,7 +42,7 @@ public synchronized boolean containsEventWithResourceVersion(ResourceID resource } } - public synchronized boolean containsEventWithVersionButItsNotLastOne( + public boolean containsEventWithVersionButItsNotLastOne( ResourceID resourceID, String resourceVersion) { List resources = resourceEvents.get(resourceID); if (resources == null) { @@ -60,7 +60,7 @@ public synchronized boolean containsEventWithVersionButItsNotLastOne( .equals(resourceVersion); } - public synchronized R getLastEvent(ResourceID resourceID) { + public R getLastEvent(ResourceID resourceID) { List resources = resourceEvents.get(resourceID); if (resources == null) { throw new IllegalStateException( @@ -70,7 +70,7 @@ public synchronized R getLastEvent(ResourceID resourceID) { return resources.get(resources.size() - 1); } - public synchronized boolean recordEventIfStartedRecording(R resource) { + public boolean recordEventIfStartedRecording(R resource) { if (isRecordingFor(ResourceID.fromResource(resource))) { recordEvent(resource); return true; diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSource.java index fac9153d16..f95719885c 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSource.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSource.java @@ -69,6 +69,7 @@ public class InformerEventSource private static final Logger log = LoggerFactory.getLogger(InformerEventSource.class); private final InformerConfiguration configuration; + // always called from a synchronized method private final EventRecorder eventRecorder = new EventRecorder<>(); public InformerEventSource( @@ -93,7 +94,7 @@ public void onUpdate(R oldObject, R newObject) { () -> InformerEventSource.super.onUpdate(oldObject, newObject)); } - private void onAddOrUpdate(String operation, R newObject, Runnable superOnOp) { + private synchronized void onAddOrUpdate(String operation, R newObject, Runnable superOnOp) { var resourceID = ResourceID.fromResource(newObject); if (eventRecorder.recordEventIfStartedRecording(newObject)) { return; @@ -159,7 +160,7 @@ public InformerConfiguration getConfiguration() { } @Override - public void handleRecentResourceUpdate(ResourceID resourceID, R resource, + public synchronized void handleRecentResourceUpdate(ResourceID resourceID, R resource, R previousResourceVersion) { handleRecentCreateOrUpdate(resource, () -> super.handleRecentResourceUpdate(resourceID, resource, @@ -167,7 +168,7 @@ public void handleRecentResourceUpdate(ResourceID resourceID, R resource, } @Override - public void handleRecentResourceCreate(ResourceID resourceID, R resource) { + public synchronized void handleRecentResourceCreate(ResourceID resourceID, R resource) { handleRecentCreateOrUpdate(resource, () -> super.handleRecentResourceCreate(resourceID, resource)); } @@ -219,7 +220,7 @@ private void handleRecentResourceOperationAndStopEventRecording(R resource) { } @Override - public void prepareForCreateOrUpdateEventFiltering(ResourceID resourceID, + public synchronized void prepareForCreateOrUpdateEventFiltering(ResourceID resourceID, R resource) { log.info("Starting event recording for: {}", resourceID); eventRecorder.startEventRecording(resourceID); @@ -232,7 +233,7 @@ public void prepareForCreateOrUpdateEventFiltering(ResourceID resourceID, * @param resource handled by the informer */ @Override - public void cleanupOnCreateOrUpdateEventFiltering(ResourceID resourceID, + public synchronized void cleanupOnCreateOrUpdateEventFiltering(ResourceID resourceID, R resource) { log.info("Stopping event recording for: {}", resourceID); eventRecorder.stopEventRecording(resourceID); From 8c6ed0074fef1c3598ae4dc7dd03bad7cd57eb49 Mon Sep 17 00:00:00 2001 From: csviri Date: Wed, 9 Mar 2022 10:56:59 +0100 Subject: [PATCH 5/5] fix: revert --- .../processing/event/source/informer/EventRecorder.java | 8 -------- .../event/source/informer/InformerEventSource.java | 4 +++- 2 files changed, 3 insertions(+), 9 deletions(-) diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/EventRecorder.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/EventRecorder.java index 85c1894f63..5d23d870aa 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/EventRecorder.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/EventRecorder.java @@ -69,12 +69,4 @@ public R getLastEvent(ResourceID resourceID) { } return resources.get(resources.size() - 1); } - - public boolean recordEventIfStartedRecording(R resource) { - if (isRecordingFor(ResourceID.fromResource(resource))) { - recordEvent(resource); - return true; - } - return false; - } } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSource.java index f95719885c..e1ef859549 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSource.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSource.java @@ -96,7 +96,9 @@ public void onUpdate(R oldObject, R newObject) { private synchronized void onAddOrUpdate(String operation, R newObject, Runnable superOnOp) { var resourceID = ResourceID.fromResource(newObject); - if (eventRecorder.recordEventIfStartedRecording(newObject)) { + if (eventRecorder.isRecordingFor(resourceID)) { + log.info("Recording event for: " + resourceID); + eventRecorder.recordEvent(newObject); return; } if (temporalCacheHasResourceWithVersionAs(newObject)) {