|
| 1 | +//! This example shows how to create tags that expire after a certain time. |
| 2 | +//! |
| 3 | +//! We use a prefix so we can distinguish between expiring and normal tags, and |
| 4 | +//! then encode the expiry date in the tag name after the prefix, in a format |
| 5 | +//! that sorts in the same order as the expiry date. |
| 6 | +//! |
| 7 | +//! The example creates a number of blobs and protects them directly or indirectly |
| 8 | +//! with expiring tags. Watch as the expired tags are deleted and the blobs |
| 9 | +//! are removed from the store. |
| 10 | +use std::{ |
| 11 | + ops::Deref, |
| 12 | + time::{Duration, SystemTime}, |
| 13 | +}; |
| 14 | + |
| 15 | +use chrono::Utc; |
| 16 | +use futures_lite::StreamExt; |
| 17 | +use iroh_blobs::{ |
| 18 | + api::{blobs::AddBytesOptions, Store, Tag}, |
| 19 | + hashseq::HashSeq, |
| 20 | + store::fs::options::{BatchOptions, GcConfig, InlineOptions, Options, PathOptions}, |
| 21 | + BlobFormat, Hash, |
| 22 | +}; |
| 23 | +use tokio::signal::ctrl_c; |
| 24 | + |
| 25 | +/// Using an iroh rpc client, create a tag that is marked to expire at `expiry` for all the given hashes. |
| 26 | +/// |
| 27 | +/// The tag name will be `prefix`- followed by the expiry date in iso8601 format (e.g. `expiry-2025-01-01T12:00:00Z`). |
| 28 | +async fn create_expiring_tag( |
| 29 | + store: &Store, |
| 30 | + hashes: &[Hash], |
| 31 | + prefix: &str, |
| 32 | + expiry: SystemTime, |
| 33 | +) -> anyhow::Result<()> { |
| 34 | + let expiry = chrono::DateTime::<chrono::Utc>::from(expiry); |
| 35 | + let expiry = expiry.to_rfc3339_opts(chrono::SecondsFormat::Secs, true); |
| 36 | + let tagname = format!("{prefix}-{expiry}"); |
| 37 | + if hashes.is_empty() { |
| 38 | + return Ok(()); |
| 39 | + } else if hashes.len() == 1 { |
| 40 | + let hash = hashes[0]; |
| 41 | + store.tags().set(&tagname, hash).await?; |
| 42 | + } else { |
| 43 | + let hs = hashes.iter().copied().collect::<HashSeq>(); |
| 44 | + store |
| 45 | + .add_bytes_with_opts(AddBytesOptions { |
| 46 | + data: hs.into(), |
| 47 | + format: BlobFormat::HashSeq, |
| 48 | + }) |
| 49 | + .with_named_tag(&tagname) |
| 50 | + .await?; |
| 51 | + }; |
| 52 | + println!("Created tag {tagname}"); |
| 53 | + Ok(()) |
| 54 | +} |
| 55 | + |
| 56 | +async fn delete_expired_tags(blobs: &Store, prefix: &str, bulk: bool) -> anyhow::Result<()> { |
| 57 | + let prefix = format!("{prefix}-"); |
| 58 | + let now = chrono::Utc::now(); |
| 59 | + let end = format!( |
| 60 | + "{}-{}", |
| 61 | + prefix, |
| 62 | + now.to_rfc3339_opts(chrono::SecondsFormat::Secs, true) |
| 63 | + ); |
| 64 | + if bulk { |
| 65 | + // delete all tags with the prefix and an expiry date before now |
| 66 | + // |
| 67 | + // this should be very efficient, since it is just a single database operation |
| 68 | + blobs |
| 69 | + .tags() |
| 70 | + .delete_range(Tag::from(prefix.clone())..Tag::from(end)) |
| 71 | + .await?; |
| 72 | + } else { |
| 73 | + // find tags to delete one by one and then delete them |
| 74 | + // |
| 75 | + // this allows us to print the tags before deleting them |
| 76 | + let mut tags = blobs.tags().list().await?; |
| 77 | + let mut to_delete = Vec::new(); |
| 78 | + while let Some(tag) = tags.next().await { |
| 79 | + let tag = tag?.name; |
| 80 | + if let Some(rest) = tag.0.strip_prefix(prefix.as_bytes()) { |
| 81 | + let Ok(expiry) = std::str::from_utf8(rest) else { |
| 82 | + tracing::warn!("Tag {} does have non utf8 expiry", tag); |
| 83 | + continue; |
| 84 | + }; |
| 85 | + let Ok(expiry) = chrono::DateTime::parse_from_rfc3339(expiry) else { |
| 86 | + tracing::warn!("Tag {} does have invalid expiry date", tag); |
| 87 | + continue; |
| 88 | + }; |
| 89 | + let expiry = expiry.with_timezone(&Utc); |
| 90 | + if expiry < now { |
| 91 | + to_delete.push(tag); |
| 92 | + } |
| 93 | + } |
| 94 | + } |
| 95 | + for tag in to_delete { |
| 96 | + println!("Deleting expired tag {tag}\n"); |
| 97 | + blobs.tags().delete(tag).await?; |
| 98 | + } |
| 99 | + } |
| 100 | + Ok(()) |
| 101 | +} |
| 102 | + |
| 103 | +async fn print_store_info(store: &Store) -> anyhow::Result<()> { |
| 104 | + let now = chrono::Utc::now(); |
| 105 | + let mut tags = store.tags().list().await?; |
| 106 | + println!( |
| 107 | + "Current time: {}", |
| 108 | + now.to_rfc3339_opts(chrono::SecondsFormat::Secs, true) |
| 109 | + ); |
| 110 | + println!("Tags:"); |
| 111 | + while let Some(tag) = tags.next().await { |
| 112 | + let tag = tag?; |
| 113 | + println!(" {tag:?}"); |
| 114 | + } |
| 115 | + let mut blobs = store.list().stream().await?; |
| 116 | + println!("Blobs:"); |
| 117 | + while let Some(item) = blobs.next().await { |
| 118 | + println!(" {}", item?); |
| 119 | + } |
| 120 | + println!(); |
| 121 | + Ok(()) |
| 122 | +} |
| 123 | + |
| 124 | +async fn info_task(store: Store) -> anyhow::Result<()> { |
| 125 | + tokio::time::sleep(Duration::from_secs(1)).await; |
| 126 | + loop { |
| 127 | + print_store_info(&store).await?; |
| 128 | + tokio::time::sleep(Duration::from_secs(5)).await; |
| 129 | + } |
| 130 | +} |
| 131 | + |
| 132 | +async fn delete_expired_tags_task(store: Store, prefix: &str) -> anyhow::Result<()> { |
| 133 | + loop { |
| 134 | + delete_expired_tags(&store, prefix, false).await?; |
| 135 | + tokio::time::sleep(Duration::from_secs(5)).await; |
| 136 | + } |
| 137 | +} |
| 138 | + |
| 139 | +#[tokio::main] |
| 140 | +async fn main() -> anyhow::Result<()> { |
| 141 | + tracing_subscriber::fmt::init(); |
| 142 | + let path = std::env::current_dir()?.join("blobs"); |
| 143 | + let options = Options { |
| 144 | + path: PathOptions::new(&path), |
| 145 | + gc: Some(GcConfig { |
| 146 | + add_protected: None, |
| 147 | + interval: Duration::from_secs(10), |
| 148 | + }), |
| 149 | + inline: InlineOptions::default(), |
| 150 | + batch: BatchOptions::default(), |
| 151 | + }; |
| 152 | + let store = |
| 153 | + iroh_blobs::store::fs::FsStore::load_with_opts(path.join("blobs.db"), options).await?; |
| 154 | + |
| 155 | + // setup: add some data and tag it |
| 156 | + { |
| 157 | + // add several blobs and tag them with an expiry date 10 seconds in the future |
| 158 | + let batch = store.batch().await?; |
| 159 | + let a = batch.add_bytes("blob 1".as_bytes()).await?; |
| 160 | + let b = batch.add_bytes("blob 2".as_bytes()).await?; |
| 161 | + |
| 162 | + let expires_at = SystemTime::now() |
| 163 | + .checked_add(Duration::from_secs(10)) |
| 164 | + .unwrap(); |
| 165 | + create_expiring_tag(&store, &[*a.hash(), *b.hash()], "expiring", expires_at).await?; |
| 166 | + |
| 167 | + // add a single blob and tag it with an expiry date 60 seconds in the future |
| 168 | + let c = batch.add_bytes("blob 3".as_bytes()).await?; |
| 169 | + let expires_at = SystemTime::now() |
| 170 | + .checked_add(Duration::from_secs(60)) |
| 171 | + .unwrap(); |
| 172 | + create_expiring_tag(&store, &[*c.hash()], "expiring", expires_at).await?; |
| 173 | + // batch goes out of scope, so data is only protected by the tags we created |
| 174 | + } |
| 175 | + |
| 176 | + // delete expired tags every 5 seconds |
| 177 | + let delete_task = tokio::spawn(delete_expired_tags_task(store.deref().clone(), "expiring")); |
| 178 | + // print all tags and blobs every 5 seconds |
| 179 | + let info_task = tokio::spawn(info_task(store.deref().clone())); |
| 180 | + |
| 181 | + ctrl_c().await?; |
| 182 | + delete_task.abort(); |
| 183 | + info_task.abort(); |
| 184 | + store.shutdown().await?; |
| 185 | + Ok(()) |
| 186 | +} |
0 commit comments