Skip to content

fix: changing the metrics to reconciliation, removing events from pro… #621

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import java.util.Map;

import io.javaoperatorsdk.operator.api.monitoring.Metrics;
import io.javaoperatorsdk.operator.processing.event.CustomResourceID;
import io.javaoperatorsdk.operator.processing.event.Event;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Timer;
Expand Down Expand Up @@ -44,42 +45,45 @@ public <T> T timeControllerExecution(ControllerExecution<T> execution) {
}
}

public void incrementControllerRetriesNumber() {
registry
.counter(
PREFIX + "retry.on.exception", "retry", "retryCounter", "type",
"retryException")
.increment();

}

public void processingEvent(Event event) {
public void receivedEvent(Event event) {
incrementCounter(event, "events.received");
}

public void processedEvent(Event event) {
incrementCounter(event, "events.processed");
public void reconcileCustomResource(CustomResourceID customResourceID) {
incrementCounter(customResourceID, "reconciliation.times");
}

public void failedEvent(Event event, RuntimeException exception) {
public void failedReconciliation(CustomResourceID customResourceID, RuntimeException exception) {
var cause = exception.getCause();
if (cause == null) {
cause = exception;
} else if (cause instanceof RuntimeException) {
cause = cause.getCause() != null ? cause.getCause() : cause;
}
incrementCounter(event, "events.failed", "exception", cause.getClass().getSimpleName());
incrementCounter(customResourceID, "reconciliation.failed", "exception",
cause.getClass().getSimpleName());
}

public <T extends Map<?, ?>> T monitorSizeOf(T map, String name) {
return registry.gaugeMapSize(PREFIX + name + ".size", Collections.emptyList(), map);
}

private void incrementCounter(CustomResourceID id, String counterName, String... additionalTags) {
var tags = List.of("namespace", id.getNamespace().orElse(""),
"scope", id.getNamespace().isPresent() ? "namespace" : "cluster",
"type", "reconciliation");
if (additionalTags != null && additionalTags.length > 0) {
tags = new LinkedList<>(tags);
tags.addAll(List.of(additionalTags));
}
registry.counter(PREFIX + counterName, tags.toArray(new String[0])).increment();
}

private void incrementCounter(Event event, String counterName, String... additionalTags) {
final var id = event.getRelatedCustomResourceID();
var tags = List.of("namespace", id.getNamespace().orElse(""),
"scope", id.getNamespace().isPresent() ? "namespace" : "cluster",
"type", event.getType().name());
"type", event.getClass().getSimpleName());
if (additionalTags != null && additionalTags.length > 0) {
tags = new LinkedList<>(tags);
tags.addAll(List.of(additionalTags));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package io.javaoperatorsdk.operator.api;

import io.fabric8.kubernetes.client.CustomResource;
import io.javaoperatorsdk.operator.processing.event.EventSourceManager;

public interface EventSourceInitializer<T extends CustomResource<?, ?>> {

/**
* In this typically you might want to register event sources. But can access
* CustomResourceEventSource, what might be handy for some edge cases.
*
* @param eventSourceManager the {@link EventSourceManager} where event sources can be registered.
*/
void prepareEventSources(EventSourceManager<T> eventSourceManager);

}
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package io.javaoperatorsdk.operator.api;

import io.fabric8.kubernetes.client.CustomResource;
import io.javaoperatorsdk.operator.processing.event.EventSourceManager;

public interface ResourceController<R extends CustomResource> {

Expand Down Expand Up @@ -49,11 +48,4 @@ default DeleteControl deleteResource(R resource, Context<R> context) {
*/
UpdateControl<R> createOrUpdateResource(R resource, Context<R> context);

/**
* In init typically you might want to register event sources.
*
* @param eventSourceManager the {@link EventSourceManager} which handles this controller and with
* which event sources can be registered
*/
default void init(EventSourceManager eventSourceManager) {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,18 @@

import java.util.Map;

import io.javaoperatorsdk.operator.processing.event.CustomResourceID;
import io.javaoperatorsdk.operator.processing.event.Event;

public interface Metrics {
Metrics NOOP = new Metrics() {};

default void processingEvent(Event event) {}
default void receivedEvent(Event event) {}

default void processedEvent(Event event) {}
default void reconcileCustomResource(CustomResourceID customResourceID) {}

default void failedEvent(Event event, RuntimeException exception) {}
default void failedReconciliation(CustomResourceID customResourceID,
RuntimeException exception) {}


interface ControllerExecution<T> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,21 +13,18 @@
import io.javaoperatorsdk.operator.CustomResourceUtils;
import io.javaoperatorsdk.operator.MissingCRDException;
import io.javaoperatorsdk.operator.OperatorException;
import io.javaoperatorsdk.operator.api.Context;
import io.javaoperatorsdk.operator.api.DeleteControl;
import io.javaoperatorsdk.operator.api.ResourceController;
import io.javaoperatorsdk.operator.api.UpdateControl;
import io.javaoperatorsdk.operator.api.*;
import io.javaoperatorsdk.operator.api.config.ControllerConfiguration;
import io.javaoperatorsdk.operator.api.monitoring.Metrics.ControllerExecution;
import io.javaoperatorsdk.operator.processing.event.DefaultEventSourceManager;
import io.javaoperatorsdk.operator.processing.event.EventSourceManager;

public class ConfiguredController<R extends CustomResource<?, ?>> implements ResourceController<R>,
Closeable {
Closeable, EventSourceInitializer {
private final ResourceController<R> controller;
private final ControllerConfiguration<R> configuration;
private final KubernetesClient kubernetesClient;
private EventSourceManager eventSourceManager;
private DefaultEventSourceManager eventSourceManager;

public ConfiguredController(ResourceController<R> controller,
ControllerConfiguration<R> configuration,
Expand Down Expand Up @@ -97,7 +94,7 @@ public UpdateControl<R> execute() {
}

@Override
public void init(EventSourceManager eventSourceManager) {
public void prepareEventSources(EventSourceManager eventSourceManager) {
throw new UnsupportedOperationException("This method should never be called directly");
}

Expand Down Expand Up @@ -169,7 +166,9 @@ public void start() throws OperatorException {

try {
eventSourceManager = new DefaultEventSourceManager<>(this);
controller.init(eventSourceManager);
if (controller instanceof EventSourceInitializer) {
((EventSourceInitializer) controller).prepareEventSources(eventSourceManager);
}
} catch (MissingCRDException e) {
throwMissingCRDException(crdName, specVersion, controllerName);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import io.javaoperatorsdk.operator.processing.event.DefaultEventSourceManager;
import io.javaoperatorsdk.operator.processing.event.Event;
import io.javaoperatorsdk.operator.processing.event.EventHandler;
import io.javaoperatorsdk.operator.processing.event.internal.CustomResourceEvent;
import io.javaoperatorsdk.operator.processing.event.internal.ResourceAction;
import io.javaoperatorsdk.operator.processing.retry.GenericRetry;
import io.javaoperatorsdk.operator.processing.retry.Retry;
import io.javaoperatorsdk.operator.processing.retry.RetryExecution;
Expand Down Expand Up @@ -98,23 +100,21 @@ public void handleEvent(Event event) {
return;
}
final var resourceID = event.getRelatedCustomResourceID();
metrics.processingEvent(event);
metrics.receivedEvent(event);

handleEventMarking(event);
if (!eventMarker.deleteEventPresent(resourceID)) {
submitReconciliationExecution(event);
submitReconciliationExecution(resourceID);
} else {
cleanupForDeletedEvent(event);
cleanupForDeletedEvent(resourceID);
}

metrics.processedEvent(event);
} finally {
lock.unlock();
}
}

private void submitReconciliationExecution(Event event) {
final var customResourceUid = event.getRelatedCustomResourceID();
private void submitReconciliationExecution(CustomResourceID customResourceUid) {
boolean controllerUnderExecution = isControllerUnderExecution(customResourceUid);
Optional<R> latestCustomResource =
resourceCache.getCustomResource(customResourceUid);
Expand All @@ -125,8 +125,9 @@ private void submitReconciliationExecution(Event event) {
ExecutionScope<R> executionScope =
new ExecutionScope<>(
latestCustomResource.get(),
retryInfo(customResourceUid), event);
retryInfo(customResourceUid));
eventMarker.unMarkEventReceived(customResourceUid);
metrics.reconcileCustomResource(customResourceUid);
log.debug("Executing events for custom resource. Scope: {}", executionScope);
executor.execute(new ControllerExecution(executionScope));
} else {
Expand All @@ -144,7 +145,8 @@ private void submitReconciliationExecution(Event event) {
}

private void handleEventMarking(Event event) {
if (event.isDeleteEvent()) {
if (event instanceof CustomResourceEvent &&
((CustomResourceEvent) event).getAction() == ResourceAction.DELETED) {
eventMarker.markDeleteEventReceived(event);
} else if (!eventMarker.deleteEventPresent(event.getRelatedCustomResourceID())) {
eventMarker.markEventReceived(event);
Expand Down Expand Up @@ -180,11 +182,11 @@ void eventProcessingFinished(
}
cleanupOnSuccessfulExecution(executionScope);
if (eventMarker.deleteEventPresent(customResourceID)) {
cleanupForDeletedEvent(executionScope.getTriggeringEvent());
cleanupForDeletedEvent(executionScope.getCustomResourceID());
} else {
if (eventMarker.eventPresent(customResourceID)) {
if (isCacheReadyForInstantReconciliation(executionScope, postExecutionControl)) {
submitReconciliationExecution(executionScope.getTriggeringEvent());
submitReconciliationExecution(customResourceID);
} else {
postponeReconciliationAndHandleCacheSyncEvent(customResourceID);
}
Expand Down Expand Up @@ -249,7 +251,7 @@ private void handleRetryOnException(ExecutionScope<R> executionScope,
if (eventPresent) {
log.debug("New events exists for for resource id: {}",
customResourceID);
submitReconciliationExecution(executionScope.getTriggeringEvent());
submitReconciliationExecution(customResourceID);
return;
}
Optional<Long> nextDelay = execution.nextDelay();
Expand All @@ -260,7 +262,7 @@ private void handleRetryOnException(ExecutionScope<R> executionScope,
"Scheduling timer event for retry with delay:{} for resource: {}",
delay,
customResourceID);
metrics.failedEvent(executionScope.getTriggeringEvent(), exception);
metrics.failedReconciliation(customResourceID, exception);
eventSourceManager
.getRetryAndRescheduleTimerEventSource()
.scheduleOnce(executionScope.getCustomResource(), delay);
Expand Down Expand Up @@ -289,8 +291,7 @@ private RetryExecution getOrInitRetryExecution(ExecutionScope<R> executionScope)
return retryExecution;
}

private void cleanupForDeletedEvent(Event event) {
final var customResourceUid = event.getRelatedCustomResourceID();
private void cleanupForDeletedEvent(CustomResourceID customResourceUid) {
eventSourceManager.cleanupForCustomResource(customResourceUid);
eventMarker.cleanup(customResourceUid);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,16 @@
import io.fabric8.kubernetes.client.CustomResource;
import io.javaoperatorsdk.operator.api.RetryInfo;
import io.javaoperatorsdk.operator.processing.event.CustomResourceID;
import io.javaoperatorsdk.operator.processing.event.Event;

public class ExecutionScope<R extends CustomResource<?, ?>> {

// the latest custom resource from cache
private final R customResource;
private final RetryInfo retryInfo;
private final Event triggeringEvent;

ExecutionScope(R customResource, RetryInfo retryInfo, Event triggeringEvent) {
public ExecutionScope(R customResource, RetryInfo retryInfo) {
this.customResource = customResource;
this.retryInfo = retryInfo;
this.triggeringEvent = triggeringEvent;
}

public R getCustomResource() {
Expand All @@ -26,10 +23,6 @@ public CustomResourceID getCustomResourceID() {
return CustomResourceID.fromResource(customResource);
}

public Event getTriggeringEvent() {
return triggeringEvent;
}

@Override
public String toString() {
return "ExecutionScope{"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,29 +4,20 @@
public class DefaultEvent implements Event {

private final CustomResourceID relatedCustomResource;
private final Type type;

public DefaultEvent(CustomResourceID targetCustomResource,
Type type) {
public DefaultEvent(CustomResourceID targetCustomResource) {
this.relatedCustomResource = targetCustomResource;
this.type = type;
}

@Override
public CustomResourceID getRelatedCustomResourceID() {
return relatedCustomResource;
}

@Override
public Type getType() {
return type;
}

@Override
public String toString() {
return "DefaultEvent{" +
"relatedCustomResource=" + relatedCustomResource +
", type=" + type +
'}';
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.javaoperatorsdk.operator.processing.event;

import java.io.Closeable;
import java.util.*;
import java.util.concurrent.locks.ReentrantLock;

Expand All @@ -15,7 +16,8 @@
import io.javaoperatorsdk.operator.processing.event.internal.TimerEventSource;

public class DefaultEventSourceManager<R extends CustomResource<?, ?>>
implements EventSourceManager<R> {
implements EventSourceManager<R>, Closeable {


private static final Logger log = LoggerFactory.getLogger(DefaultEventSourceManager.class);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,4 @@ public interface Event {

CustomResourceID getRelatedCustomResourceID();

Type getType();

default boolean isDeleteEvent() {
return Type.DELETED == getType();
}

enum Type {
ADDED, UPDATED, DELETED, OTHER, UNKNOWN
}
}
Original file line number Diff line number Diff line change
@@ -1,14 +1,12 @@
package io.javaoperatorsdk.operator.processing.event;

import java.io.Closeable;
import java.io.IOException;
import java.util.Set;

import io.fabric8.kubernetes.client.CustomResource;
import io.javaoperatorsdk.operator.OperatorException;
import io.javaoperatorsdk.operator.processing.event.internal.CustomResourceEventSource;

public interface EventSourceManager<T extends CustomResource<?, ?>> extends Closeable {
public interface EventSourceManager<T extends CustomResource<?, ?>> {

/**
* Add the {@link EventSource} identified by the given <code>name</code> to the event manager.
Expand All @@ -25,6 +23,4 @@ void registerEventSource(EventSource eventSource)

CustomResourceEventSource<T> getCustomResourceEventSource();

@Override
default void close() throws IOException {}
}
Loading