Skip to content

Commit 25e97d8

Browse files
csvirimetacosm
authored andcommitted
Add InformerEventSource
- this creates basically an adaptor between informers and event sources - add integration test serving as as a sample
1 parent a9ddde0 commit 25e97d8

File tree

9 files changed

+348
-1
lines changed

9 files changed

+348
-1
lines changed

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/DefaultEvent.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ public String toString() {
5050
+ " }";
5151
}
5252

53-
private static class UIDMatchingPredicate implements Predicate<CustomResource> {
53+
public static class UIDMatchingPredicate implements Predicate<CustomResource> {
5454
private final String uid;
5555

5656
public UIDMatchingPredicate(String uid) {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
package io.javaoperatorsdk.operator.processing.event.internal;
2+
3+
import java.util.Optional;
4+
import java.util.function.Predicate;
5+
6+
import io.fabric8.kubernetes.client.CustomResource;
7+
import io.javaoperatorsdk.operator.processing.event.DefaultEvent;
8+
import io.javaoperatorsdk.operator.processing.event.EventSource;
9+
10+
public class InformerEvent<T> extends DefaultEvent {
11+
12+
private Action action;
13+
private T resource;
14+
private T oldResource;
15+
16+
public InformerEvent(String relatedCustomResourceUid, EventSource eventSource, Action action,
17+
T resource,
18+
T oldResource) {
19+
this(new UIDMatchingPredicate(relatedCustomResourceUid), eventSource, action, resource,
20+
oldResource);
21+
22+
}
23+
24+
public InformerEvent(Predicate<CustomResource> customResourcesSelector, EventSource eventSource,
25+
Action action,
26+
T resource, T oldResource) {
27+
super(customResourcesSelector, eventSource);
28+
this.action = action;
29+
this.resource = resource;
30+
this.oldResource = oldResource;
31+
}
32+
33+
public T getResource() {
34+
return resource;
35+
}
36+
37+
public Optional<T> getOldResource() {
38+
return Optional.ofNullable(oldResource);
39+
}
40+
41+
public Action getAction() {
42+
return action;
43+
}
44+
45+
public enum Action {
46+
ADD, UPDATE, DELETE
47+
}
48+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
package io.javaoperatorsdk.operator.processing.event.internal;
2+
3+
import java.io.IOException;
4+
import java.util.Optional;
5+
6+
import io.fabric8.kubernetes.api.model.HasMetadata;
7+
import io.fabric8.kubernetes.client.KubernetesClient;
8+
import io.fabric8.kubernetes.client.informers.ResourceEventHandler;
9+
import io.fabric8.kubernetes.client.informers.SharedInformer;
10+
import io.fabric8.kubernetes.client.informers.cache.Store;
11+
import io.javaoperatorsdk.operator.processing.event.AbstractEventSource;
12+
13+
public class InformerEventSource<T extends HasMetadata> extends AbstractEventSource {
14+
15+
private final SharedInformer<T> sharedInformer;
16+
private final ResourceToRelatedCustomResourceUIDMapper<T> mapper;
17+
private final boolean skipUpdateEventPropagationIfNoChange;
18+
19+
public InformerEventSource(SharedInformer<T> sharedInformer,
20+
ResourceToRelatedCustomResourceUIDMapper<T> mapper) {
21+
this(sharedInformer, mapper, true);
22+
}
23+
24+
InformerEventSource(KubernetesClient client, Class<T> type,
25+
ResourceToRelatedCustomResourceUIDMapper<T> mapper) {
26+
this(client, type, mapper, false);
27+
}
28+
29+
InformerEventSource(KubernetesClient client, Class<T> type,
30+
ResourceToRelatedCustomResourceUIDMapper<T> mapper,
31+
boolean skipUpdateEventPropagationIfNoChange) {
32+
this(client.informers().sharedIndexInformerFor(type, 0), mapper,
33+
skipUpdateEventPropagationIfNoChange);
34+
}
35+
36+
public InformerEventSource(SharedInformer<T> sharedInformer,
37+
ResourceToRelatedCustomResourceUIDMapper<T> mapper,
38+
boolean skipUpdateEventPropagationIfNoChange) {
39+
this.sharedInformer = sharedInformer;
40+
this.mapper = mapper;
41+
this.skipUpdateEventPropagationIfNoChange = skipUpdateEventPropagationIfNoChange;
42+
43+
sharedInformer.addEventHandler(new ResourceEventHandler<T>() {
44+
@Override
45+
public void onAdd(T t) {
46+
propagateEvent(InformerEvent.Action.ADD, t, null);
47+
}
48+
49+
@Override
50+
public void onUpdate(T oldObject, T newObject) {
51+
if (InformerEventSource.this.skipUpdateEventPropagationIfNoChange &&
52+
oldObject.getMetadata().getResourceVersion()
53+
.equals(newObject.getMetadata().getResourceVersion())) {
54+
return;
55+
}
56+
propagateEvent(InformerEvent.Action.UPDATE, newObject, oldObject);
57+
}
58+
59+
@Override
60+
public void onDelete(T t, boolean b) {
61+
propagateEvent(InformerEvent.Action.DELETE, t, null);
62+
}
63+
});
64+
}
65+
66+
private void propagateEvent(InformerEvent.Action action, T object, T oldObject) {
67+
var uid = mapper.map(object);
68+
if (uid.isEmpty()) {
69+
return;
70+
}
71+
InformerEvent event = new InformerEvent(uid.get(), this, action, object, oldObject);
72+
this.eventHandler.handleEvent(event);
73+
}
74+
75+
@Override
76+
public void start() {
77+
sharedInformer.run();
78+
}
79+
80+
@Override
81+
public void close() throws IOException {
82+
sharedInformer.close();
83+
}
84+
85+
public Store<T> getStore() {
86+
return sharedInformer.getStore();
87+
}
88+
89+
public SharedInformer<T> getSharedInformer() {
90+
return sharedInformer;
91+
}
92+
93+
public interface ResourceToRelatedCustomResourceUIDMapper<T> {
94+
// in case cannot map to the related CR uid, skip the event processing
95+
Optional<String> map(T resource);
96+
}
97+
98+
}

operator-framework-junit5/src/main/java/io/javaoperatorsdk/operator/junit/OperatorExtension.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,10 @@ public <T extends HasMetadata> T create(Class<T> type, T resource) {
131131
return kubernetesClient.resources(type).inNamespace(namespace).create(resource);
132132
}
133133

134+
public <T extends HasMetadata> T replace(Class<T> type, T resource) {
135+
return kubernetesClient.resources(type).inNamespace(namespace).replace(resource);
136+
}
137+
134138
@SuppressWarnings("unchecked")
135139
protected void before(ExtensionContext context) {
136140
namespace = context.getRequiredTestClass().getSimpleName();
Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
package io.javaoperatorsdk.operator;
2+
3+
import java.util.HashMap;
4+
import java.util.concurrent.TimeUnit;
5+
6+
import org.junit.jupiter.api.Test;
7+
import org.junit.jupiter.api.extension.RegisterExtension;
8+
9+
import io.fabric8.kubernetes.api.model.ConfigMap;
10+
import io.fabric8.kubernetes.api.model.ObjectMeta;
11+
import io.javaoperatorsdk.operator.config.runtime.DefaultConfigurationService;
12+
import io.javaoperatorsdk.operator.junit.OperatorExtension;
13+
import io.javaoperatorsdk.operator.sample.informereventsource.InformerEventSourceTestCustomResource;
14+
import io.javaoperatorsdk.operator.sample.informereventsource.InformerEventSourceTestCustomResourceController;
15+
16+
import static io.javaoperatorsdk.operator.sample.informereventsource.InformerEventSourceTestCustomResourceController.RELATED_RESOURCE_UID;
17+
import static io.javaoperatorsdk.operator.sample.informereventsource.InformerEventSourceTestCustomResourceController.TARGET_CONFIG_MAP_KEY;
18+
import static org.assertj.core.api.Assertions.assertThat;
19+
import static org.awaitility.Awaitility.await;
20+
21+
public class InformerEventSourceIT {
22+
23+
public static final String RESOURCE_NAME = "informertestcr";
24+
public static final String INITIAL_STATUS_MESSAGE = "Initial Status";
25+
public static final String UPDATE_STATUS_MESSAGE = "Updated Status";
26+
27+
@RegisterExtension
28+
OperatorExtension operator =
29+
OperatorExtension.builder()
30+
.withConfigurationService(DefaultConfigurationService.instance())
31+
.withController(new InformerEventSourceTestCustomResourceController())
32+
.build();
33+
34+
@Test
35+
public void testUsingInformerToWatchChangesOfConfigMap() {
36+
var customResource = initialCustomResource();
37+
customResource = operator.create(InformerEventSourceTestCustomResource.class, customResource);
38+
ConfigMap configMap =
39+
operator.create(ConfigMap.class, relatedConfigMap(customResource.getMetadata().getUid()));
40+
waitForCRStatusValue(INITIAL_STATUS_MESSAGE);
41+
42+
configMap.getData().put(TARGET_CONFIG_MAP_KEY, UPDATE_STATUS_MESSAGE);
43+
operator.replace(ConfigMap.class, configMap);
44+
45+
waitForCRStatusValue(UPDATE_STATUS_MESSAGE);
46+
}
47+
48+
private ConfigMap relatedConfigMap(String relatedResourceAnnotation) {
49+
ConfigMap configMap = new ConfigMap();
50+
51+
ObjectMeta objectMeta = new ObjectMeta();
52+
objectMeta.setName(RESOURCE_NAME);
53+
objectMeta.setAnnotations(new HashMap<>());
54+
objectMeta.getAnnotations().put(RELATED_RESOURCE_UID, relatedResourceAnnotation);
55+
configMap.setMetadata(objectMeta);
56+
57+
configMap.setData(new HashMap<>());
58+
configMap.getData().put(TARGET_CONFIG_MAP_KEY, INITIAL_STATUS_MESSAGE);
59+
return configMap;
60+
}
61+
62+
private InformerEventSourceTestCustomResource initialCustomResource() {
63+
var customResource = new InformerEventSourceTestCustomResource();
64+
ObjectMeta objectMeta = new ObjectMeta();
65+
objectMeta.setName(RESOURCE_NAME);
66+
customResource.setMetadata(objectMeta);
67+
return customResource;
68+
}
69+
70+
private void waitForCRStatusValue(String value) {
71+
await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> {
72+
var cr =
73+
operator.getNamedResource(InformerEventSourceTestCustomResource.class, RESOURCE_NAME);
74+
assertThat(cr.getStatus()).isNotNull();
75+
assertThat(cr.getStatus().getConfigMapValue()).isEqualTo(value);
76+
});
77+
}
78+
79+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
package io.javaoperatorsdk.operator.sample.informereventsource;
2+
3+
import io.fabric8.kubernetes.api.model.Namespaced;
4+
import io.fabric8.kubernetes.client.CustomResource;
5+
import io.fabric8.kubernetes.model.annotation.Group;
6+
import io.fabric8.kubernetes.model.annotation.Kind;
7+
import io.fabric8.kubernetes.model.annotation.ShortNames;
8+
import io.fabric8.kubernetes.model.annotation.Version;
9+
10+
@Group("sample.javaoperatorsdk")
11+
@Version("v1")
12+
@Kind("Informereventsourcesample")
13+
@ShortNames("ies")
14+
public class InformerEventSourceTestCustomResource extends
15+
CustomResource<InformerEventSourceTestCustomResourceSpec, InformerEventSourceTestCustomResourceStatus>
16+
implements Namespaced {
17+
18+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
package io.javaoperatorsdk.operator.sample.informereventsource;
2+
3+
import java.util.Optional;
4+
5+
import org.slf4j.Logger;
6+
import org.slf4j.LoggerFactory;
7+
8+
import io.fabric8.kubernetes.api.model.ConfigMap;
9+
import io.fabric8.kubernetes.client.KubernetesClient;
10+
import io.fabric8.kubernetes.client.informers.SharedInformer;
11+
import io.fabric8.kubernetes.client.informers.SharedInformerFactory;
12+
import io.fabric8.kubernetes.client.informers.cache.Cache;
13+
import io.javaoperatorsdk.operator.api.Context;
14+
import io.javaoperatorsdk.operator.api.Controller;
15+
import io.javaoperatorsdk.operator.api.ResourceController;
16+
import io.javaoperatorsdk.operator.api.UpdateControl;
17+
import io.javaoperatorsdk.operator.junit.KubernetesClientAware;
18+
import io.javaoperatorsdk.operator.processing.event.EventSourceManager;
19+
import io.javaoperatorsdk.operator.processing.event.internal.InformerEventSource;
20+
21+
import static io.javaoperatorsdk.operator.api.Controller.NO_FINALIZER;
22+
23+
/**
24+
* Copies the config map value from spec into status. The main purpose is to test and demonstrate
25+
* sample usage of InformerEventSource
26+
*/
27+
@Controller(finalizerName = NO_FINALIZER)
28+
public class InformerEventSourceTestCustomResourceController implements
29+
ResourceController<InformerEventSourceTestCustomResource>, KubernetesClientAware {
30+
31+
private static final Logger LOGGER =
32+
LoggerFactory.getLogger(InformerEventSourceTestCustomResourceController.class);
33+
34+
public static final String RELATED_RESOURCE_UID = "relatedResourceUID";
35+
public static final String TARGET_CONFIG_MAP_KEY = "targetStatus";
36+
37+
private KubernetesClient kubernetesClient;
38+
private SharedInformer<ConfigMap> informer;
39+
40+
@Override
41+
public void init(EventSourceManager eventSourceManager) {
42+
SharedInformerFactory sharedInformerFactory = kubernetesClient.informers();
43+
informer = sharedInformerFactory.sharedIndexInformerFor(ConfigMap.class, 0);
44+
eventSourceManager.registerEventSource("configmap", new InformerEventSource<>(informer,
45+
resource -> {
46+
if (resource.getMetadata() == null || resource.getMetadata().getAnnotations() == null) {
47+
return Optional.empty();
48+
}
49+
return Optional
50+
.ofNullable(resource.getMetadata().getAnnotations().get(RELATED_RESOURCE_UID));
51+
}));
52+
}
53+
54+
@Override
55+
public UpdateControl<InformerEventSourceTestCustomResource> createOrUpdateResource(
56+
InformerEventSourceTestCustomResource resource,
57+
Context<InformerEventSourceTestCustomResource> context) {
58+
59+
// Reading the config map from the informer not from the API
60+
// name of the config map same as custom resource for sake of simplicity
61+
ConfigMap configMap =
62+
informer.getStore().getByKey(Cache.namespaceKeyFunc(resource.getMetadata().getNamespace(),
63+
resource.getMetadata().getName()));
64+
65+
String targetStatus = configMap.getData().get(TARGET_CONFIG_MAP_KEY);
66+
LOGGER.debug("Setting target status for CR: {}", targetStatus);
67+
resource.setStatus(new InformerEventSourceTestCustomResourceStatus());
68+
resource.getStatus().setConfigMapValue(targetStatus);
69+
return UpdateControl.updateStatusSubResource(resource);
70+
}
71+
72+
@Override
73+
public KubernetesClient getKubernetesClient() {
74+
return kubernetesClient;
75+
}
76+
77+
@Override
78+
public void setKubernetesClient(KubernetesClient kubernetesClient) {
79+
this.kubernetesClient = kubernetesClient;
80+
}
81+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
package io.javaoperatorsdk.operator.sample.informereventsource;
2+
3+
public class InformerEventSourceTestCustomResourceSpec {
4+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
package io.javaoperatorsdk.operator.sample.informereventsource;
2+
3+
public class InformerEventSourceTestCustomResourceStatus {
4+
5+
private String configMapValue;
6+
7+
public String getConfigMapValue() {
8+
return configMapValue;
9+
}
10+
11+
public InformerEventSourceTestCustomResourceStatus setConfigMapValue(String configMapValue) {
12+
this.configMapValue = configMapValue;
13+
return this;
14+
}
15+
}

0 commit comments

Comments
 (0)