Skip to content

Commit dbd51df

Browse files
authored
Add predicates to allow filtering watcher streams (#911)
* Add predicates to filter reconciler runs Signed-off-by: clux <[email protected]> * rewrite as a WatchStreamExt helper PoC on node_reflector to only show changed labels Signed-off-by: clux <[email protected]> * notes and build fix Signed-off-by: clux <[email protected]> * tests Signed-off-by: clux <[email protected]> * only cache hashes Signed-off-by: clux <[email protected]> * minor tweak Signed-off-by: clux <[email protected]> * clippy Signed-off-by: clux <[email protected]> * Fixup and wrap in unstable feature Signed-off-by: clux <[email protected]> * fmt Signed-off-by: clux <[email protected]> * fix docs Signed-off-by: clux <[email protected]> * fix example Signed-off-by: clux <[email protected]> * make predicate fns Send Signed-off-by: clux <[email protected]> * change pred input from impl Fn to generic over F Signed-off-by: clux <[email protected]> * simplify; less boxing, less complexity Signed-off-by: clux <[email protected]> * compile tests for watchstreamext + github having a rough one Signed-off-by: clux <[email protected]> --------- Signed-off-by: clux <[email protected]> Signed-off-by: Eirik A <[email protected]>
1 parent 4f633ee commit dbd51df

File tree

9 files changed

+257
-11
lines changed

9 files changed

+257
-11
lines changed

examples/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ default = ["openssl-tls", "kubederive", "ws", "latest", "runtime"]
1818
kubederive = ["kube/derive"]
1919
openssl-tls = ["kube/client", "kube/openssl-tls"]
2020
rustls-tls = ["kube/client", "kube/rustls-tls"]
21-
runtime = ["kube/runtime"]
21+
runtime = ["kube/runtime", "kube/unstable-runtime"]
2222
ws = ["kube/ws"]
2323
latest = ["k8s-openapi/v1_26"]
2424

examples/node_reflector.rs

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
1-
use futures::{StreamExt, TryStreamExt};
1+
use futures::{pin_mut, TryStreamExt};
22
use k8s_openapi::api::core::v1::Node;
33
use kube::{
44
api::{Api, ResourceExt},
5-
runtime::{reflector, watcher, WatchStreamExt},
5+
runtime::{predicates, reflector, watcher, WatchStreamExt},
66
Client,
77
};
88
use tracing::*;
@@ -18,7 +18,9 @@ async fn main() -> anyhow::Result<()> {
1818
.timeout(10); // short watch timeout in this example
1919

2020
let (reader, writer) = reflector::store();
21-
let rf = reflector(writer, watcher(nodes, wc));
21+
let rf = reflector(writer, watcher(nodes, wc))
22+
.applied_objects()
23+
.predicate_filter(predicates::labels); // NB: requires an unstable feature
2224

2325
// Periodically read our state in the background
2426
tokio::spawn(async move {
@@ -29,10 +31,10 @@ async fn main() -> anyhow::Result<()> {
2931
}
3032
});
3133

32-
// Drain and log applied events from the reflector
33-
let mut rfa = rf.applied_objects().boxed();
34-
while let Some(event) = rfa.try_next().await? {
35-
info!("saw {}", event.name_any());
34+
// Log applied events with changes from the reflector
35+
pin_mut!(rf);
36+
while let Some(node) = rf.try_next().await? {
37+
info!("saw node {} with hitherto unseen labels", node.name_any());
3638
}
3739

3840
Ok(())

justfile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ fmt:
1515
rustfmt +nightly --edition 2021 $(find . -type f -iname *.rs)
1616

1717
doc:
18-
RUSTDOCFLAGS="--cfg docsrs" cargo +nightly doc --lib --workspace --features=derive,ws,oauth,jsonpatch,client,derive,runtime,admission,k8s-openapi/v1_26 --open
18+
RUSTDOCFLAGS="--cfg docsrs" cargo +nightly doc --lib --workspace --features=derive,ws,oauth,jsonpatch,client,derive,runtime,admission,k8s-openapi/v1_26,unstable-runtime --open
1919

2020
deny:
2121
# might require rm Cargo.lock first to match CI

kube-runtime/Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,9 @@ rust-version = "1.63.0"
1515
edition = "2021"
1616

1717
[features]
18-
unstable-runtime = ["unstable-runtime-subscribe"]
18+
unstable-runtime = ["unstable-runtime-subscribe", "unstable-runtime-predicates"]
1919
unstable-runtime-subscribe = []
20+
unstable-runtime-predicates = []
2021

2122
[package.metadata.docs.rs]
2223
features = ["k8s-openapi/v1_26", "unstable-runtime"]

kube-runtime/src/lib.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,3 +35,6 @@ pub use reflector::reflector;
3535
pub use scheduler::scheduler;
3636
pub use utils::WatchStreamExt;
3737
pub use watcher::{metadata_watcher, watcher};
38+
39+
#[cfg(feature = "unstable-runtime-predicates")] pub use utils::predicates;
40+
pub use wait::conditions;

kube-runtime/src/utils/mod.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,15 @@
22
33
mod backoff_reset_timer;
44
mod event_flatten;
5+
#[cfg(feature = "unstable-runtime-predicates")] mod predicate;
56
mod stream_backoff;
67
#[cfg(feature = "unstable-runtime-subscribe")] pub mod stream_subscribe;
78
mod watch_ext;
89

910
pub use backoff_reset_timer::ResetTimerBackoff;
1011
pub use event_flatten::EventFlatten;
12+
#[cfg(feature = "unstable-runtime-predicates")]
13+
pub use predicate::{predicates, PredicateFilter};
1114
pub use stream_backoff::StreamBackoff;
1215
#[cfg(feature = "unstable-runtime-subscribe")]
1316
pub use stream_subscribe::StreamSubscribe;

kube-runtime/src/utils/predicate.rs

Lines changed: 163 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,163 @@
1+
use crate::{reflector::ObjectRef, watcher::Error};
2+
use core::{
3+
pin::Pin,
4+
task::{Context, Poll},
5+
};
6+
use futures::{ready, Stream};
7+
use kube_client::Resource;
8+
use pin_project::pin_project;
9+
use std::{collections::HashMap, hash::Hash};
10+
11+
#[allow(clippy::pedantic)]
12+
#[pin_project]
13+
/// Stream returned by the [`predicate_filter`](super::WatchStreamExt::predicate_filter) method.
14+
#[must_use = "streams do nothing unless polled"]
15+
pub struct PredicateFilter<St, K: Resource, Func> {
16+
#[pin]
17+
stream: St,
18+
predicate: Func,
19+
cache: HashMap<ObjectRef<K>, u64>,
20+
}
21+
impl<St, K, F> PredicateFilter<St, K, F>
22+
where
23+
St: Stream<Item = Result<K, Error>>,
24+
K: Resource,
25+
F: Fn(&K) -> Option<u64> + 'static,
26+
{
27+
pub(super) fn new(stream: St, predicate: F) -> Self {
28+
Self {
29+
stream,
30+
predicate,
31+
cache: HashMap::new(),
32+
}
33+
}
34+
}
35+
impl<St, K, F> Stream for PredicateFilter<St, K, F>
36+
where
37+
St: Stream<Item = Result<K, Error>>,
38+
K: Resource,
39+
K::DynamicType: Default + Eq + Hash,
40+
F: Fn(&K) -> Option<u64> + 'static,
41+
{
42+
type Item = Result<K, Error>;
43+
44+
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
45+
let mut me = self.project();
46+
Poll::Ready(loop {
47+
break match ready!(me.stream.as_mut().poll_next(cx)) {
48+
Some(Ok(obj)) => {
49+
if let Some(val) = (me.predicate)(&obj) {
50+
let key = ObjectRef::from_obj(&obj);
51+
let changed = if let Some(old) = me.cache.get(&key) {
52+
*old != val
53+
} else {
54+
true
55+
};
56+
if let Some(old) = me.cache.get_mut(&key) {
57+
*old = val;
58+
} else {
59+
me.cache.insert(key, val);
60+
}
61+
if changed {
62+
Some(Ok(obj))
63+
} else {
64+
continue;
65+
}
66+
} else {
67+
// if we can't evaluate predicate, always emit K
68+
Some(Ok(obj))
69+
}
70+
}
71+
Some(Err(err)) => Some(Err(err)),
72+
None => return Poll::Ready(None),
73+
};
74+
})
75+
}
76+
}
77+
78+
pub mod predicates {
79+
use kube_client::{Resource, ResourceExt};
80+
use std::{
81+
collections::hash_map::DefaultHasher,
82+
hash::{Hash, Hasher},
83+
};
84+
85+
// See: https://github.com/kubernetes-sigs/controller-runtime/blob/v0.12.0/pkg/predicate/predicate.go
86+
87+
fn hash<T: Hash>(t: &T) -> u64 {
88+
let mut hasher = DefaultHasher::new();
89+
t.hash(&mut hasher);
90+
hasher.finish()
91+
}
92+
93+
/// Hash the generation of a Resource K
94+
pub fn generation<K: Resource>(obj: &K) -> Option<u64> {
95+
obj.meta().generation.map(|g| hash(&g))
96+
}
97+
98+
/// Hash the labels of a Resource K
99+
pub fn labels<K: Resource>(obj: &K) -> Option<u64> {
100+
Some(hash(obj.labels()))
101+
}
102+
103+
/// Hash the annotations of a Resource K
104+
pub fn annotations<K: Resource>(obj: &K) -> Option<u64> {
105+
Some(hash(obj.annotations()))
106+
}
107+
}
108+
109+
#[cfg(test)]
110+
pub(crate) mod tests {
111+
use std::task::Poll;
112+
113+
use super::{predicates, Error, PredicateFilter};
114+
use futures::{pin_mut, poll, stream, FutureExt, StreamExt};
115+
use kube_client::Resource;
116+
use serde_json::json;
117+
118+
#[tokio::test]
119+
async fn predicate_filtering_hides_equal_predicate_values() {
120+
use k8s_openapi::api::core::v1::Pod;
121+
let mkobj = |gen: i32| {
122+
let p: Pod = serde_json::from_value(json!({
123+
"apiVersion": "v1",
124+
"kind": "Pod",
125+
"metadata": {
126+
"name": "blog",
127+
"generation": Some(gen),
128+
},
129+
"spec": {
130+
"containers": [{
131+
"name": "blog",
132+
"image": "clux/blog:0.1.0"
133+
}],
134+
}
135+
}))
136+
.unwrap();
137+
p
138+
};
139+
let data = stream::iter([
140+
Ok(mkobj(1)),
141+
Err(Error::TooManyObjects),
142+
Ok(mkobj(1)),
143+
Ok(mkobj(2)),
144+
]);
145+
let rx = PredicateFilter::new(data, predicates::generation);
146+
pin_mut!(rx);
147+
148+
// mkobj(1) passed through
149+
let first = rx.next().now_or_never().unwrap().unwrap().unwrap();
150+
assert_eq!(first.meta().generation, Some(1));
151+
152+
// Error passed through
153+
assert!(matches!(
154+
poll!(rx.next()),
155+
Poll::Ready(Some(Err(Error::TooManyObjects)))
156+
));
157+
// (no repeat mkobj(1) - same generation)
158+
// mkobj(2) next
159+
let second = rx.next().now_or_never().unwrap().unwrap().unwrap();
160+
assert_eq!(second.meta().generation, Some(2));
161+
assert!(matches!(poll!(rx.next()), Poll::Ready(None)));
162+
}
163+
}

kube-runtime/src/utils/watch_ext.rs

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,13 @@
1+
#[cfg(feature = "unstable-runtime-predicates")]
2+
use crate::utils::predicate::PredicateFilter;
13
#[cfg(feature = "unstable-runtime-subscribe")]
24
use crate::utils::stream_subscribe::StreamSubscribe;
35
use crate::{
46
utils::{event_flatten::EventFlatten, stream_backoff::StreamBackoff},
57
watcher,
68
};
9+
#[cfg(feature = "unstable-runtime-predicates")] use kube_client::Resource;
10+
711
use backoff::backoff::Backoff;
812
use futures::{Stream, TryStream};
913

@@ -38,6 +42,43 @@ pub trait WatchStreamExt: Stream {
3842
EventFlatten::new(self, true)
3943
}
4044

45+
46+
/// Filter out a flattened stream on [`predicates`](crate::predicates).
47+
///
48+
/// This will filter out repeat calls where the predicate returns the same result.
49+
/// Common use case for this is to avoid repeat events for status updates
50+
/// by filtering on []`predicates::generation`].
51+
///
52+
/// ## Usage
53+
/// ```no_run
54+
/// # use futures::{pin_mut, Stream, StreamExt, TryStreamExt};
55+
/// use kube::{Api, Client, ResourceExt};
56+
/// use kube_runtime::{watcher, WatchStreamExt, predicates};
57+
/// use k8s_openapi::api::core::v1::Pod;
58+
/// # async fn wrapper() -> Result<(), Box<dyn std::error::Error>> {
59+
/// # let client: kube::Client = todo!();
60+
/// let pods: Api<Pod> = Api::default_namespaced(client);
61+
/// let changed_pods = watcher(pods, watcher::Config::default())
62+
/// .applied_objects()
63+
/// .predicate_filter(predicates::generation);
64+
/// pin_mut!(changed_pods);
65+
///
66+
/// while let Some(pod) = changed_pods.try_next().await? {
67+
/// println!("saw Pod '{} with hitherto unseen generation", pod.name_any());
68+
/// }
69+
/// # Ok(())
70+
/// # }
71+
/// ```
72+
#[cfg(feature = "unstable-runtime-predicates")]
73+
fn predicate_filter<K, F>(self, predicate: F) -> PredicateFilter<Self, K, F>
74+
where
75+
Self: Stream<Item = Result<K, watcher::Error>> + Sized,
76+
K: Resource + 'static,
77+
F: Fn(&K) -> Option<u64> + 'static,
78+
{
79+
PredicateFilter::new(self, predicate)
80+
}
81+
4182
/// Create a [`StreamSubscribe`] from a [`watcher()`] stream.
4283
///
4384
/// The [`StreamSubscribe::subscribe()`] method which allows additional consumers
@@ -103,3 +144,36 @@ pub trait WatchStreamExt: Stream {
103144
}
104145

105146
impl<St: ?Sized> WatchStreamExt for St where St: Stream {}
147+
148+
// Compile tests
149+
#[cfg(feature = "unstable-runtime-predicates")]
150+
#[cfg(test)]
151+
pub(crate) mod tests {
152+
use super::*;
153+
use crate::predicates;
154+
use futures::StreamExt;
155+
use k8s_openapi::api::core::v1::Pod;
156+
use kube_client::{Api, Resource};
157+
158+
fn compile_type<T>() -> T {
159+
unimplemented!("not called - compile test only")
160+
}
161+
162+
pub fn assert_stream<T, K>(x: T) -> T
163+
where
164+
T: Stream<Item = watcher::Result<K>> + Send,
165+
K: Resource + Clone + Send + 'static,
166+
{
167+
x
168+
}
169+
170+
// not #[test] because this is only a compile check verification
171+
#[allow(dead_code, unused_must_use)]
172+
fn test_watcher_stream_type_drift() {
173+
let pred_watch = watcher(compile_type::<Api<Pod>>(), Default::default())
174+
.touched_objects()
175+
.predicate_filter(predicates::generation)
176+
.boxed();
177+
assert_stream(pred_watch);
178+
}
179+
}

kube/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ runtime = ["kube-runtime"]
3131
unstable-runtime = ["kube-runtime/unstable-runtime"]
3232

3333
[package.metadata.docs.rs]
34-
features = ["client", "rustls-tls", "openssl-tls", "derive", "ws", "oauth", "jsonpatch", "admission", "runtime", "k8s-openapi/v1_26"]
34+
features = ["client", "rustls-tls", "openssl-tls", "derive", "ws", "oauth", "jsonpatch", "admission", "runtime", "k8s-openapi/v1_26", "unstable-runtime"]
3535
# Define the configuration attribute `docsrs`. Used to enable `doc_cfg` feature.
3636
rustdoc-args = ["--cfg", "docsrs"]
3737

0 commit comments

Comments
 (0)