Skip to content

fix: making sure there is not race condition with temporal cache #1009

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Mar 9, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,8 @@ public ControllerResourceEventSource(Controller<T> controller) {
public void start() {
try {
super.start();
} catch (Exception e) {
if (e instanceof KubernetesClientException) {
handleKubernetesClientException(e);
}
} catch (KubernetesClientException e) {
handleKubernetesClientException(e);
throw e;
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,18 +1,18 @@
package io.javaoperatorsdk.operator.processing.event.source.informer;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import io.fabric8.kubernetes.api.model.HasMetadata;
import io.javaoperatorsdk.operator.processing.event.ResourceID;

public class EventRecorder<R extends HasMetadata> {

private final Map<ResourceID, ArrayList<R>> resourceEvents = new ConcurrentHashMap<>();
private final Map<ResourceID, ArrayList<R>> resourceEvents = new HashMap<>();

void startEventRecording(ResourceID resourceID) {
public void startEventRecording(ResourceID resourceID) {
resourceEvents.putIfAbsent(resourceID, new ArrayList<>(5));
}

Expand All @@ -28,7 +28,8 @@ public void recordEvent(R resource) {
resourceEvents.get(ResourceID.fromResource(resource)).add(resource);
}

public boolean containsEventWithResourceVersion(ResourceID resourceID, String resourceVersion) {
public boolean containsEventWithResourceVersion(ResourceID resourceID,
String resourceVersion) {
List<R> events = resourceEvents.get(resourceID);
if (events == null) {
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ public class InformerEventSource<R extends HasMetadata, P extends HasMetadata>
private static final Logger log = LoggerFactory.getLogger(InformerEventSource.class);

private final InformerConfiguration<R, P> configuration;
// always called from a synchronized method
private final EventRecorder<R> eventRecorder = new EventRecorder<>();

public InformerEventSource(
Expand Down Expand Up @@ -161,20 +162,20 @@ public InformerConfiguration<R, P> getConfiguration() {
}

@Override
public void handleRecentResourceUpdate(ResourceID resourceID, R resource,
public synchronized void handleRecentResourceUpdate(ResourceID resourceID, R resource,
R previousResourceVersion) {
handleRecentCreateOrUpdate(resource,
() -> super.handleRecentResourceUpdate(resourceID, resource,
previousResourceVersion));
}

@Override
public void handleRecentResourceCreate(ResourceID resourceID, R resource) {
public synchronized void handleRecentResourceCreate(ResourceID resourceID, R resource) {
handleRecentCreateOrUpdate(resource,
() -> super.handleRecentResourceCreate(resourceID, resource));
}

private synchronized void handleRecentCreateOrUpdate(R resource, Runnable runnable) {
private void handleRecentCreateOrUpdate(R resource, Runnable runnable) {
if (eventRecorder.isRecordingFor(ResourceID.fromResource(resource))) {
handleRecentResourceOperationAndStopEventRecording(resource);
} else {
Expand All @@ -199,7 +200,7 @@ private synchronized void handleRecentCreateOrUpdate(R resource, Runnable runnab
*
* @param resource just created or updated resource
*/
private synchronized void handleRecentResourceOperationAndStopEventRecording(R resource) {
private void handleRecentResourceOperationAndStopEventRecording(R resource) {
ResourceID resourceID = ResourceID.fromResource(resource);
try {
if (!eventRecorder.containsEventWithResourceVersion(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -36,75 +35,49 @@ public class TemporaryResourceCache<T extends HasMetadata> {
private static final Logger log = LoggerFactory.getLogger(TemporaryResourceCache.class);

private final Map<ResourceID, T> cache = new ConcurrentHashMap<>();
private final ReentrantLock lock = new ReentrantLock();
private final ManagedInformerEventSource<T, ?, ?> managedInformerEventSource;

public TemporaryResourceCache(ManagedInformerEventSource<T, ?, ?> managedInformerEventSource) {
this.managedInformerEventSource = managedInformerEventSource;
}

public void removeResourceFromCache(T resource) {
lock.lock();
try {
cache.remove(ResourceID.fromResource(resource));
} finally {
lock.unlock();
}
public synchronized void removeResourceFromCache(T resource) {
cache.remove(ResourceID.fromResource(resource));
}

public void unconditionallyCacheResource(T newResource) {
lock.lock();
try {
cache.put(ResourceID.fromResource(newResource), newResource);
} finally {
lock.unlock();
}
public synchronized void unconditionallyCacheResource(T newResource) {
cache.put(ResourceID.fromResource(newResource), newResource);
}

public void putAddedResource(T newResource) {
lock.lock();
try {
ResourceID resourceID = ResourceID.fromResource(newResource);
if (managedInformerEventSource.get(resourceID).isEmpty()) {
log.debug("Putting resource to cache with ID: {}", resourceID);
cache.put(ResourceID.fromResource(newResource), newResource);
} else {
log.debug("Won't put resource into cache found already informer cache: {}", resourceID);
}
} finally {
lock.unlock();
public synchronized void putAddedResource(T newResource) {
ResourceID resourceID = ResourceID.fromResource(newResource);
if (managedInformerEventSource.get(resourceID).isEmpty()) {
log.debug("Putting resource to cache with ID: {}", resourceID);
cache.put(ResourceID.fromResource(newResource), newResource);
} else {
log.debug("Won't put resource into cache found already informer cache: {}", resourceID);
}
}

public void putUpdatedResource(T newResource, String previousResourceVersion) {
lock.lock();
try {
var resourceId = ResourceID.fromResource(newResource);
var informerCacheResource = managedInformerEventSource.get(resourceId);
if (informerCacheResource.isEmpty()) {
log.debug("No cached value present for resource: {}", newResource);
return;
}
// if this is not true that means the cache was already updated
if (informerCacheResource.get().getMetadata().getResourceVersion()
.equals(previousResourceVersion)) {
log.debug("Putting resource to temporal cache with id: {}", resourceId);
cache.put(resourceId, newResource);
} else {
// if something is in cache it's surely obsolete now
cache.remove(resourceId);
}
} finally {
lock.unlock();
public synchronized void putUpdatedResource(T newResource, String previousResourceVersion) {
var resourceId = ResourceID.fromResource(newResource);
var informerCacheResource = managedInformerEventSource.get(resourceId);
if (informerCacheResource.isEmpty()) {
log.debug("No cached value present for resource: {}", newResource);
return;
}
// if this is not true that means the cache was already updated
if (informerCacheResource.get().getMetadata().getResourceVersion()
.equals(previousResourceVersion)) {
log.debug("Putting resource to temporal cache with id: {}", resourceId);
cache.put(resourceId, newResource);
} else {
// if something is in cache it's surely obsolete now
cache.remove(resourceId);
}
}

public Optional<T> getResourceFromCache(ResourceID resourceID) {
try {
lock.lock();
return Optional.ofNullable(cache.get(resourceID));
} finally {
lock.unlock();
}
public synchronized Optional<T> getResourceFromCache(ResourceID resourceID) {
return Optional.ofNullable(cache.get(resourceID));
}
}