14
14
import org .slf4j .LoggerFactory ;
15
15
16
16
import io .fabric8 .kubernetes .client .CustomResource ;
17
+ import io .javaoperatorsdk .operator .Metrics ;
17
18
import io .javaoperatorsdk .operator .api .RetryInfo ;
18
19
import io .javaoperatorsdk .operator .api .config .ConfigurationService ;
19
20
import io .javaoperatorsdk .operator .api .config .ExecutorServiceManager ;
35
36
public class DefaultEventHandler <R extends CustomResource <?, ?>> implements EventHandler {
36
37
37
38
private static final Logger log = LoggerFactory .getLogger (DefaultEventHandler .class );
38
- private static EventMonitor monitor = new EventMonitor () {
39
- @ Override
40
- public void processedEvent (String uid , Event event ) {}
41
39
42
- @ Override
43
- public void failedEvent (String uid , Event event ) {}
44
- };
40
+ @ Deprecated
41
+ private static EventMonitor monitor = EventMonitor .NOOP ;
45
42
46
43
private final EventBuffer eventBuffer ;
47
44
private final Set <String > underProcessing = new HashSet <>();
@@ -51,22 +48,24 @@ public void failedEvent(String uid, Event event) {}
51
48
private final ExecutorService executor ;
52
49
private final String controllerName ;
53
50
private final ReentrantLock lock = new ReentrantLock ();
51
+ private final EventMonitor eventMonitor ;
54
52
private DefaultEventSourceManager <R > eventSourceManager ;
55
53
56
54
public DefaultEventHandler (ConfiguredController <R > controller ) {
57
55
this (ExecutorServiceManager .instance ().executorService (),
58
56
controller .getConfiguration ().getName (),
59
57
new EventDispatcher <>(controller ),
60
- GenericRetry .fromConfiguration (controller .getConfiguration ().getRetryConfiguration ()));
58
+ GenericRetry .fromConfiguration (controller .getConfiguration ().getRetryConfiguration ()),
59
+ controller .getConfiguration ().getConfigurationService ().getMetrics ().getEventMonitor ());
61
60
}
62
61
63
62
DefaultEventHandler (EventDispatcher <R > eventDispatcher , String relatedControllerName ,
64
63
Retry retry ) {
65
- this (null , relatedControllerName , eventDispatcher , retry );
64
+ this (null , relatedControllerName , eventDispatcher , retry , null );
66
65
}
67
66
68
67
private DefaultEventHandler (ExecutorService executor , String relatedControllerName ,
69
- EventDispatcher <R > eventDispatcher , Retry retry ) {
68
+ EventDispatcher <R > eventDispatcher , Retry retry , EventMonitor monitor ) {
70
69
this .executor =
71
70
executor == null
72
71
? new ScheduledThreadPoolExecutor (
@@ -76,29 +75,54 @@ private DefaultEventHandler(ExecutorService executor, String relatedControllerNa
76
75
this .eventDispatcher = eventDispatcher ;
77
76
this .retry = retry ;
78
77
eventBuffer = new EventBuffer ();
78
+ this .eventMonitor = monitor != null ? monitor : EventMonitor .NOOP ;
79
79
}
80
80
81
81
public void setEventSourceManager (DefaultEventSourceManager <R > eventSourceManager ) {
82
82
this .eventSourceManager = eventSourceManager ;
83
83
}
84
84
85
+ /**
86
+ * @deprecated the EventMonitor to be used should now be retrieved from
87
+ * {@link Metrics#getEventMonitor()}
88
+ * @param monitor
89
+ */
90
+ @ Deprecated
85
91
public static void setEventMonitor (EventMonitor monitor ) {
86
92
DefaultEventHandler .monitor = monitor ;
87
93
}
88
94
95
+ /*
96
+ * TODO: promote this interface to top-level, probably create a `monitoring` package?
97
+ */
89
98
public interface EventMonitor {
99
+ EventMonitor NOOP = new EventMonitor () {
100
+ @ Override
101
+ public void processedEvent (String uid , Event event ) {}
102
+
103
+ @ Override
104
+ public void failedEvent (String uid , Event event ) {}
105
+ };
106
+
90
107
void processedEvent (String uid , Event event );
91
108
92
109
void failedEvent (String uid , Event event );
93
110
}
94
111
112
+ private EventMonitor monitor () {
113
+ // todo: remove us of static monitor, only here for backwards compatibility
114
+ return DefaultEventHandler .monitor != EventMonitor .NOOP ? DefaultEventHandler .monitor
115
+ : eventMonitor ;
116
+ }
117
+
95
118
@ Override
96
119
public void handleEvent (Event event ) {
97
120
try {
98
121
lock .lock ();
99
122
log .debug ("Received event: {}" , event );
100
123
101
124
final Predicate <CustomResource > selector = event .getCustomResourcesSelector ();
125
+ final var monitor = monitor ();
102
126
for (String uid : eventSourceManager .getLatestResourceUids (selector )) {
103
127
eventBuffer .addEvent (uid , event );
104
128
monitor .processedEvent (uid , event );
@@ -151,6 +175,7 @@ void eventProcessingFinished(
151
175
152
176
if (retry != null && postExecutionControl .exceptionDuringExecution ()) {
153
177
handleRetryOnException (executionScope );
178
+ final var monitor = monitor ();
154
179
executionScope .getEvents ()
155
180
.forEach (e -> monitor .failedEvent (executionScope .getCustomResourceUid (), e ));
156
181
return ;
0 commit comments