1 change: 1 addition & 0 deletions Cargo.toml
@@ -44,6 +44,7 @@ thiserror = "2.0.2"
tracing = { version = "0.1" }
url = "2.2"
walkdir = { version = "2", optional = true }
arc-swap = "1.7.1"

# Cloud storage support
base64 = { version = "0.22", default-features = false, features = ["std"], optional = true }
66 changes: 48 additions & 18 deletions src/client/token.rs
@@ -15,7 +15,9 @@
// specific language governing permissions and limitations
// under the License.

use arc_swap::ArcSwapOption;
use std::future::Future;
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::Mutex;

@@ -33,15 +35,17 @@ pub(crate) struct TemporaryToken<T> {
/// [`TemporaryToken`] based on its expiry
#[derive(Debug)]
pub(crate) struct TokenCache<T> {
cache: Mutex<Option<(TemporaryToken<T>, Instant)>>,
cache: ArcSwapOption<CacheEntry<T>>,
Contributor

So in general, unless we have benchmark results that show arc-swap is necessary, I am opposed to adding a new dependency

Did you try an RwLock before reaching for a new crate? I always worry about adding new crates like arc-swap, as I don't want to have to deal with a RUSTSEC report if/when it becomes abandoned.

I do see there are many other users

RwLocks would allow multiple concurrent readers, but if you had a lot of writers you might still have contention. If you find update contention is too much, you could change to use RwLock<Arc<..>> so that the lock only needs to be held to clone an Arc.

I understand the docs for arc-swap (https://docs.rs/arc-swap/latest/arc_swap/) claim:

> Better option would be to have RwLock<Arc>. Then one would lock, clone the Arc and unlock. This suffers from CPU-level contention (on the lock and on the reference count of the Arc) which makes it relatively slow. Depending on the implementation, an update may be blocked for arbitrary long time by a steady inflow of readers.

I would imagine the overhead of actually using the token (making an HTTP request) is pretty huge compared to getting a lock.
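
(For illustration, a minimal sketch of the `RwLock<Arc<..>>` pattern suggested above; the `CachedToken` and `Cache` names and fields here are hypothetical, not code from this PR.)

```rust
use std::sync::{Arc, RwLock};
use std::time::Instant;

// Hypothetical cached value; fields are placeholders, not the PR's types.
struct CachedToken {
    token: String,
    expiry: Option<Instant>,
}

#[derive(Default)]
struct Cache {
    inner: RwLock<Option<Arc<CachedToken>>>,
}

impl Cache {
    // Readers hold the read lock only long enough to clone the Arc,
    // so the critical section stays tiny even with many concurrent readers.
    fn load(&self) -> Option<Arc<CachedToken>> {
        self.inner.read().unwrap().clone()
    }

    // Writers take the write lock briefly to publish a freshly fetched token.
    fn store(&self, entry: CachedToken) {
        *self.inner.write().unwrap() = Some(Arc::new(entry));
    }
}
```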

Author

@ankrgyl Nov 10, 2025

Sorry, I just saw this after writing my other comment.

> I would imagine the overhead of actually using the token (making an HTTP request) is pretty huge compared to getting a lock.

The problem with the previous design, which may not apply to an RwLock (and sure, I will benchmark it and report back), is that "waiting in line" for the mutex became so expensive with a high number of concurrent requests (e.g. HEAD requests with 8ms p50 latencies) that it overwhelmed tokio's worker threads and dominated the execution time (we saw p50 HEAD operation latency spike to 700ms, and realized the mutex was the root cause).

Let me run a benchmark with arc-swap vs. RwLock and report back.
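
(For comparison, a minimal sketch of the lock-free read path that `ArcSwapOption` provides, roughly the shape this PR adopts; the `Cache` wrapper here is illustrative only, not the actual `TokenCache` code in the diff below.)

```rust
use arc_swap::ArcSwapOption;
use std::sync::Arc;

// Illustrative wrapper; the real cache in this PR stores a CacheEntry
// carrying the token, its expiry, and the fetch timestamp.
struct Cache<T> {
    inner: ArcSwapOption<T>,
}

impl<T> Cache<T> {
    fn new() -> Self {
        Self { inner: ArcSwapOption::new(None) }
    }

    // `load_full` atomically clones the current Arc without taking a lock,
    // so a steady stream of concurrent readers never queues up.
    fn load(&self) -> Option<Arc<T>> {
        self.inner.load_full()
    }

    // Writers publish a new value with a single atomic swap.
    fn store(&self, value: T) {
        self.inner.store(Some(Arc::new(value)));
    }
}
```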

Contributor

> Let me run a benchmark with arc-swap vs. RwLock and report back.

👍

refresh_lock: Mutex<()>,
min_ttl: Duration,
fetch_backoff: Duration,
}

impl<T> Default for TokenCache<T> {
fn default() -> Self {
Self {
cache: Default::default(),
cache: ArcSwapOption::new(None),
refresh_lock: Default::default(),
min_ttl: Duration::from_secs(300),
// How long to wait before re-attempting a token fetch after receiving one that
// is still within the min-ttl
@@ -62,32 +66,58 @@ impl<T: Clone + Send> TokenCache<T> {
F: FnOnce() -> Fut + Send,
Fut: Future<Output = Result<TemporaryToken<T>, E>> + Send,
{
let now = Instant::now();
let mut locked = self.cache.lock().await;
if let Some(token) = self.try_get_cached() {
return Ok(token);
}

// Only one fetch at a time
let _refresh = self.refresh_lock.lock().await;

// Re-check after acquiring lock in case another task refreshed already
if let Some(token) = self.try_get_cached() {
return Ok(token);
}

let fetched = f().await?;
let token_clone = fetched.token.clone();
let entry = Arc::new(CacheEntry {
token: fetched.token,
expiry: fetched.expiry,
fetched_at: Instant::now(),
});
self.cache.store(Some(entry));
Ok(token_clone)
}

if let Some((cached, fetched_at)) = locked.as_ref() {
match cached.expiry {
fn try_get_cached(&self) -> Option<T> {
let now = Instant::now();
if let Some(entry) = self.cache.load_full() {
match entry.expiry {
Some(ttl) => {
if ttl.checked_duration_since(now).unwrap_or_default() > self.min_ttl ||
// if we've recently attempted to fetch this token and it's not actually
// expired, we'll wait to re-fetch it and return the cached one
(fetched_at.elapsed() < self.fetch_backoff && ttl.checked_duration_since(now).is_some())
let remaining = ttl.checked_duration_since(now).unwrap_or_default();
if remaining > self.min_ttl
|| (entry.fetched_at.elapsed() < self.fetch_backoff
&& ttl.checked_duration_since(now).is_some())
{
return Ok(cached.token.clone());
return Some(entry.token.clone());
}
}
None => return Ok(cached.token.clone()),
None => {
return Some(entry.token.clone());
}
}
}

let cached = f().await?;
let token = cached.token.clone();
*locked = Some((cached, Instant::now()));

Ok(token)
None
}
}

#[derive(Debug)]
struct CacheEntry<T> {
token: T,
expiry: Option<Instant>,
fetched_at: Instant,
}

#[cfg(test)]
mod test {
use crate::client::token::{TemporaryToken, TokenCache};