Skip to content

Commit dc6dea4

Browse files
authored
Merge branch 'main' into rustfmt-2024
2 parents d3caa7e + b14f3d7 commit dc6dea4

File tree

11 files changed

+157
-24
lines changed

11 files changed

+157
-24
lines changed

Cargo.toml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,6 @@ either = "1.6.1"
4747
form_urlencoded = "1.2.0"
4848
futures = { version = "0.3.17", default-features = false }
4949
hashbrown = "0.16.0"
50-
home = "0.5.4"
5150
hostname = "0.4"
5251
http = "1.1.0"
5352
http-body = "1.0.1"

examples/node_reflector.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,10 @@ async fn main() -> anyhow::Result<()> {
2424
.default_backoff()
2525
.reflect(writer)
2626
.applied_objects()
27-
.predicate_filter(predicates::labels.combine(predicates::annotations));
27+
.predicate_filter(
28+
predicates::labels.combine(predicates::annotations),
29+
Default::default(),
30+
);
2831
let mut stream = pin!(stream);
2932

3033
// Periodically read our state in the background

examples/pod_reflector.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ async fn main() -> anyhow::Result<()> {
4141
})
4242
.reflect(writer)
4343
.applied_objects()
44-
.predicate_filter(predicates::resource_version);
44+
.predicate_filter(predicates::resource_version, Default::default());
4545
let mut stream = pin!(stream);
4646

4747
while let Some(pod) = stream.try_next().await? {

examples/shared_stream_controllers.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ async fn main() -> anyhow::Result<()> {
6262
let filtered = subscriber
6363
.clone()
6464
.map(|r| Ok(r.deref().clone()))
65-
.predicate_filter(predicates::resource_version)
65+
.predicate_filter(predicates::resource_version, Default::default())
6666
.filter_map(|r| future::ready(r.ok().map(Arc::new)));
6767

6868
// Reflect a stream of pod watch events into the store and apply a backoff. For subscribers to

kube-client/Cargo.toml

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ gzip = ["client", "tower-http/decompression-gzip"]
2626
client = ["config", "__non_core", "hyper", "hyper-util", "http-body", "http-body-util", "tower", "tower-http", "hyper-timeout", "chrono", "jsonpath-rust", "bytes", "futures", "tokio", "tokio-util", "either"]
2727
jsonpatch = ["kube-core/jsonpatch"]
2828
admission = ["kube-core/admission"]
29-
config = ["__non_core", "pem", "home"]
29+
config = ["__non_core", "pem"]
3030
socks5 = ["hyper-util/client-proxy"]
3131
http-proxy = ["hyper-util/client-proxy"]
3232
unstable-client = []
@@ -45,7 +45,6 @@ workspace = true
4545
[dependencies]
4646
base64 = { workspace = true, optional = true }
4747
chrono = { workspace = true, optional = true }
48-
home = { workspace = true, optional = true }
4948
serde = { workspace = true, features = ["derive"] }
5049
serde_json.workspace = true
5150
serde_yaml = { workspace = true, optional = true }

kube-client/src/config/file_config.rs

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -660,7 +660,15 @@ fn ensure_trailing_newline(mut data: Vec<u8>) -> Vec<u8> {
660660

661661
/// Returns kubeconfig path from `$HOME/.kube/config`.
662662
fn default_kube_path() -> Option<PathBuf> {
663-
home::home_dir().map(|h| h.join(".kube").join("config"))
663+
// Before Rust 1.85.0, `home_dir` would return wrong results on Windows, usage of the crate
664+
// `home` was encouraged (and is what kube-rs did).
665+
// Rust 1.85.0 fixed the problem (https://doc.rust-lang.org/1.85.0/std/env/fn.home_dir.html),
666+
// Rust 1.87.0 removed the function deprecation.
667+
// As the MSRV was bumped to 1.85.0 we are safe to use the fixed std function.
668+
// Note: We intentionally use `allow` over `expect` to support compilation on Rust >= 1.87.0
669+
// Note: This can be removed once the MSRV is bumped to >= 1.87.0
670+
#[allow(deprecated)]
671+
std::env::home_dir().map(|h| h.join(".kube").join("config"))
664672
}
665673

666674
mod base64serde {

kube-runtime/src/controller/mod.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -730,7 +730,7 @@ where
730730
/// .default_backoff()
731731
/// .reflect(writer)
732732
/// .applied_objects()
733-
/// .predicate_filter(predicates::generation);
733+
/// .predicate_filter(predicates::generation, Default::default());
734734
///
735735
/// Controller::for_stream(deploys, reader)
736736
/// .run(reconcile, error_policy, Arc::new(()))
@@ -993,7 +993,7 @@ where
993993
/// # async fn doc(client: kube::Client) {
994994
/// let sts_stream = metadata_watcher(Api::<StatefulSet>::all(client.clone()), watcher::Config::default())
995995
/// .touched_objects()
996-
/// .predicate_filter(predicates::generation);
996+
/// .predicate_filter(predicates::generation, Default::default());
997997
///
998998
/// Controller::new(Api::<CustomResource>::all(client), watcher::Config::default())
999999
/// .owns_stream(sts_stream)
@@ -1271,7 +1271,7 @@ where
12711271
/// let cr: Api<CustomResource> = Api::all(client.clone());
12721272
/// let daemons = watcher(api, watcher::Config::default())
12731273
/// .touched_objects()
1274-
/// .predicate_filter(predicates::generation);
1274+
/// .predicate_filter(predicates::generation, Default::default());
12751275
///
12761276
/// Controller::new(cr, watcher::Config::default())
12771277
/// .watches_stream(daemons, mapper)

kube-runtime/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,5 +37,5 @@ pub use scheduler::scheduler;
3737
pub use utils::WatchStreamExt;
3838
pub use watcher::{metadata_watcher, watcher};
3939

40-
pub use utils::{predicates, Predicate};
40+
pub use utils::{predicates, Predicate, PredicateConfig};
4141
pub use wait::conditions;

kube-runtime/src/utils/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ mod watch_ext;
1212
pub use backoff_reset_timer::{Backoff, ResetTimerBackoff};
1313
pub use event_decode::EventDecode;
1414
pub use event_modify::EventModify;
15-
pub use predicate::{predicates, Predicate, PredicateFilter};
15+
pub use predicate::{predicates, Config as PredicateConfig, Predicate, PredicateFilter};
1616
pub use reflect::Reflect;
1717
pub use stream_backoff::StreamBackoff;
1818
pub use watch_ext::WatchStreamExt;

kube-runtime/src/utils/predicate.rs

Lines changed: 128 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ use std::{
1010
collections::{hash_map::DefaultHasher, HashMap},
1111
hash::{Hash, Hasher},
1212
marker::PhantomData,
13+
time::{Duration, Instant},
1314
};
1415

1516
fn hash<T: Hash + ?Sized>(t: &T) -> u64 {
@@ -114,6 +115,44 @@ where
114115
}
115116
}
116117

118+
/// Configuration for predicate filtering with cache TTL
119+
#[derive(Debug, Clone)]
120+
pub struct Config {
121+
/// Time-to-live for cache entries
122+
///
123+
/// Entries not seen for at least this long will be evicted from the cache.
124+
/// Default is 1 hour.
125+
ttl: Duration,
126+
}
127+
128+
impl Config {
129+
/// Set the time-to-live for cache entries
130+
///
131+
/// Entries not seen for at least this long will be evicted from the cache.
132+
#[must_use]
133+
pub fn ttl(mut self, ttl: Duration) -> Self {
134+
self.ttl = ttl;
135+
self
136+
}
137+
}
138+
139+
impl Default for Config {
140+
fn default() -> Self {
141+
Self {
142+
// Default to 1 hour TTL - long enough to avoid unnecessary reconciles
143+
// but short enough to prevent unbounded memory growth
144+
ttl: Duration::from_secs(3600),
145+
}
146+
}
147+
}
148+
149+
/// Cache entry storing predicate hash and last access time
150+
#[derive(Debug, Clone)]
151+
struct CacheEntry {
152+
hash: u64,
153+
last_seen: Instant,
154+
}
155+
117156
#[allow(clippy::pedantic)]
118157
#[pin_project]
119158
/// Stream returned by the [`predicate_filter`](super::WatchStreamExt::predicate_filter) method.
@@ -122,7 +161,8 @@ pub struct PredicateFilter<St, K: Resource, P: Predicate<K>> {
122161
#[pin]
123162
stream: St,
124163
predicate: P,
125-
cache: HashMap<PredicateCacheKey, u64>,
164+
cache: HashMap<PredicateCacheKey, CacheEntry>,
165+
config: Config,
126166
// K: Resource necessary to get .meta() of an object during polling
127167
_phantom: PhantomData<K>,
128168
}
@@ -132,11 +172,12 @@ where
132172
K: Resource,
133173
P: Predicate<K>,
134174
{
135-
pub(super) fn new(stream: St, predicate: P) -> Self {
175+
pub(super) fn new(stream: St, predicate: P, config: Config) -> Self {
136176
Self {
137177
stream,
138178
predicate,
139179
cache: HashMap::new(),
180+
config,
140181
_phantom: PhantomData,
141182
}
142183
}
@@ -152,13 +193,29 @@ where
152193

153194
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
154195
let mut me = self.project();
196+
197+
// Evict expired entries before processing new events
198+
let now = Instant::now();
199+
let ttl = me.config.ttl;
200+
me.cache
201+
.retain(|_, entry| now.duration_since(entry.last_seen) < ttl);
202+
155203
Poll::Ready(loop {
156204
break match ready!(me.stream.as_mut().poll_next(cx)) {
157205
Some(Ok(obj)) => {
158206
if let Some(val) = me.predicate.hash_property(&obj) {
159207
let key = PredicateCacheKey::from(obj.meta());
160-
let changed = me.cache.get(&key) != Some(&val);
161-
me.cache.insert(key, val);
208+
let now = Instant::now();
209+
210+
// Check if the predicate value changed or entry doesn't exist
211+
let changed = me.cache.get(&key).map(|entry| entry.hash) != Some(val);
212+
213+
// Upsert the cache entry with new hash and timestamp
214+
me.cache.insert(key, CacheEntry {
215+
hash: val,
216+
last_seen: now,
217+
});
218+
162219
if changed {
163220
Some(Ok(obj))
164221
} else {
@@ -216,7 +273,7 @@ pub mod predicates {
216273
pub(crate) mod tests {
217274
use std::{pin::pin, task::Poll};
218275

219-
use super::{predicates, Error, PredicateFilter};
276+
use super::{predicates, Config, Error, PredicateFilter};
220277
use futures::{poll, stream, FutureExt, StreamExt};
221278
use kube_client::Resource;
222279
use serde_json::json;
@@ -248,7 +305,11 @@ pub(crate) mod tests {
248305
Ok(mkobj(1)),
249306
Ok(mkobj(2)),
250307
]);
251-
let mut rx = pin!(PredicateFilter::new(data, predicates::generation));
308+
let mut rx = pin!(PredicateFilter::new(
309+
data,
310+
predicates::generation,
311+
Config::default()
312+
));
252313

253314
// mkobj(1) passed through
254315
let first = rx.next().now_or_never().unwrap().unwrap().unwrap();
@@ -299,7 +360,11 @@ pub(crate) mod tests {
299360
Ok(mkobj(1, "uid-2")),
300361
Ok(mkobj(2, "uid-3")),
301362
]);
302-
let mut rx = pin!(PredicateFilter::new(data, predicates::generation));
363+
let mut rx = pin!(PredicateFilter::new(
364+
data,
365+
predicates::generation,
366+
Config::default()
367+
));
303368

304369
// mkobj(1, uid-1) passed through
305370
let first = rx.next().now_or_never().unwrap().unwrap().unwrap();
@@ -319,4 +384,60 @@ pub(crate) mod tests {
319384

320385
assert!(matches!(poll!(rx.next()), Poll::Ready(None)));
321386
}
387+
388+
#[tokio::test]
389+
async fn predicate_cache_ttl_evicts_expired_entries() {
390+
use futures::{channel::mpsc, SinkExt};
391+
use k8s_openapi::api::core::v1::Pod;
392+
use std::time::Duration;
393+
394+
let mkobj = |g: i32, uid: &str| {
395+
let p: Pod = serde_json::from_value(json!({
396+
"apiVersion": "v1",
397+
"kind": "Pod",
398+
"metadata": {
399+
"name": "blog",
400+
"namespace": "default",
401+
"generation": Some(g),
402+
"uid": uid,
403+
},
404+
"spec": {
405+
"containers": [{
406+
"name": "blog",
407+
"image": "clux/blog:0.1.0"
408+
}],
409+
}
410+
}))
411+
.unwrap();
412+
p
413+
};
414+
415+
// Use a very short TTL for testing
416+
let config = Config::default().ttl(Duration::from_millis(50));
417+
418+
// Use a channel so we can send items with delays
419+
let (mut tx, rx) = mpsc::unbounded();
420+
let mut filtered = pin!(PredicateFilter::new(
421+
rx.map(Ok::<_, Error>),
422+
predicates::generation,
423+
config
424+
));
425+
426+
// Send first object
427+
tx.send(mkobj(1, "uid-1")).await.unwrap();
428+
let first = filtered.next().now_or_never().unwrap().unwrap().unwrap();
429+
assert_eq!(first.meta().generation, Some(1));
430+
431+
// Send same object immediately - should be filtered
432+
tx.send(mkobj(1, "uid-1")).await.unwrap();
433+
assert!(matches!(poll!(filtered.next()), Poll::Pending));
434+
435+
// Wait for TTL to expire
436+
tokio::time::sleep(Duration::from_millis(100)).await;
437+
438+
// Send same object after TTL - should pass through due to eviction
439+
tx.send(mkobj(1, "uid-1")).await.unwrap();
440+
let second = filtered.next().now_or_never().unwrap().unwrap().unwrap();
441+
assert_eq!(second.meta().generation, Some(1));
442+
}
322443
}

0 commit comments

Comments
 (0)