Skip to content

Commit a45d78b

Browse files
authored
Merge pull request #22 from at-microcosm/collection-stats
collection stats
2 parents b59da61 + f99725e commit a45d78b

File tree

11 files changed

+332
-1933
lines changed

11 files changed

+332
-1933
lines changed

Cargo.lock

Lines changed: 14 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

ufos/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ schemars = { version = "0.8.22", features = ["raw_value", "chrono"] }
2323
semver = "1.0.26"
2424
serde = "1.0.219"
2525
serde_json = "1.0.140"
26+
serde_qs = "1.0.0-rc.3"
2627
sha2 = "0.10.9"
2728
thiserror = "2.0.12"
2829
tokio = { version = "1.44.2", features = ["full", "sync", "time"] }

ufos/src/lib.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@ pub mod index_html;
66
pub mod server;
77
pub mod storage;
88
pub mod storage_fjall;
9-
pub mod storage_mem;
109
pub mod store_types;
1110

1211
use crate::error::BatchInsertError;
@@ -287,6 +286,8 @@ pub struct NsidCount {
287286
#[derive(Debug, Serialize, JsonSchema)]
288287
pub struct JustCount {
289288
creates: u64,
289+
updates: u64,
290+
deletes: u64,
290291
dids_estimate: u64,
291292
}
292293

ufos/src/main.rs

Lines changed: 17 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@ use ufos::file_consumer;
77
use ufos::server;
88
use ufos::storage::{StorageWhatever, StoreBackground, StoreReader, StoreWriter};
99
use ufos::storage_fjall::FjallStorage;
10-
use ufos::storage_mem::MemStorage;
1110
use ufos::store_types::SketchSecretPrefix;
1211
use ufos::{nice_duration, ConsumerInfo};
1312

@@ -19,7 +18,7 @@ use tikv_jemallocator::Jemalloc;
1918
static GLOBAL: Jemalloc = Jemalloc;
2019

2120
/// Aggregate links in the at-mosphere
22-
#[derive(Parser, Debug)]
21+
#[derive(Parser, Debug, Clone)]
2322
#[command(version, about, long_about = None)]
2423
struct Args {
2524
/// Jetstream server to connect to (exclusive with --fixture). Provide either a wss:// URL, or a shorthand value:
@@ -47,9 +46,6 @@ struct Args {
4746
/// todo: restore this
4847
#[arg(long, action)]
4948
pause_rw: bool,
50-
/// DEBUG: use an in-memory store instead of fjall
51-
#[arg(long, action)]
52-
in_mem: bool,
5349
/// reset the rollup cursor, scrape through missed things in the past (backfill)
5450
#[arg(long, action)]
5551
reroll: bool,
@@ -64,56 +60,18 @@ async fn main() -> anyhow::Result<()> {
6460

6561
let args = Args::parse();
6662
let jetstream = args.jetstream.clone();
67-
if args.in_mem {
68-
let (read_store, write_store, cursor, sketch_secret) = MemStorage::init(
69-
args.data,
70-
jetstream,
71-
args.jetstream_force,
72-
Default::default(),
73-
)?;
74-
go(
75-
args.jetstream,
76-
args.jetstream_fixture,
77-
args.pause_writer,
78-
args.backfill,
79-
args.reroll,
80-
read_store,
81-
write_store,
82-
cursor,
83-
sketch_secret,
84-
)
85-
.await?;
86-
} else {
87-
let (read_store, write_store, cursor, sketch_secret) = FjallStorage::init(
88-
args.data,
89-
jetstream,
90-
args.jetstream_force,
91-
Default::default(),
92-
)?;
93-
go(
94-
args.jetstream,
95-
args.jetstream_fixture,
96-
args.pause_writer,
97-
args.backfill,
98-
args.reroll,
99-
read_store,
100-
write_store,
101-
cursor,
102-
sketch_secret,
103-
)
104-
.await?;
105-
}
106-
63+
let (read_store, write_store, cursor, sketch_secret) = FjallStorage::init(
64+
args.data.clone(),
65+
jetstream,
66+
args.jetstream_force,
67+
Default::default(),
68+
)?;
69+
go(args, read_store, write_store, cursor, sketch_secret).await?;
10770
Ok(())
10871
}
10972

110-
#[allow(clippy::too_many_arguments)]
11173
async fn go<B: StoreBackground>(
112-
jetstream: String,
113-
jetstream_fixture: bool,
114-
pause_writer: bool,
115-
backfill: bool,
116-
reroll: bool,
74+
args: Args,
11775
read_store: impl StoreReader + 'static + Clone,
11876
mut write_store: impl StoreWriter<B> + 'static,
11977
cursor: Option<Cursor>,
@@ -122,24 +80,26 @@ async fn go<B: StoreBackground>(
12280
println!("starting server with storage...");
12381
let serving = server::serve(read_store.clone());
12482

125-
if pause_writer {
83+
if args.pause_writer {
12684
log::info!("not starting jetstream or the write loop.");
12785
serving.await.map_err(|e| anyhow::anyhow!(e))?;
12886
return Ok(());
12987
}
13088

131-
let batches = if jetstream_fixture {
132-
log::info!("starting with jestream file fixture: {jetstream:?}");
133-
file_consumer::consume(jetstream.into(), sketch_secret, cursor).await?
89+
let batches = if args.jetstream_fixture {
90+
log::info!("starting with jestream file fixture: {:?}", args.jetstream);
91+
file_consumer::consume(args.jetstream.into(), sketch_secret, cursor).await?
13492
} else {
13593
log::info!(
13694
"starting consumer with cursor: {cursor:?} from {:?} ago",
13795
cursor.map(|c| c.elapsed())
13896
);
139-
consumer::consume(&jetstream, cursor, false, sketch_secret).await?
97+
consumer::consume(&args.jetstream, cursor, false, sketch_secret).await?
14098
};
14199

142-
let rolling = write_store.background_tasks(reroll)?.run(backfill);
100+
let rolling = write_store
101+
.background_tasks(args.reroll)?
102+
.run(args.backfill);
143103
let storing = write_store.receive_batches(batches);
144104

145105
let stating = do_update_stuff(read_store);
Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
use crate::Nsid;
2+
use async_trait::async_trait;
3+
use dropshot::{
4+
ApiEndpointBodyContentType, ExtractorMetadata, HttpError, Query, RequestContext, ServerContext,
5+
SharedExtractor,
6+
};
7+
use schemars::JsonSchema;
8+
use serde::Deserialize;
9+
use std::collections::HashSet;
10+
11+
/// The real type that gets deserialized
12+
#[derive(Debug, Deserialize, JsonSchema)]
13+
pub struct MultiCollectionQuery {
14+
pub collection: Vec<String>,
15+
}
16+
17+
/// The fake corresponding type for docs that dropshot won't freak out about a
18+
/// vec for
19+
#[derive(Deserialize, JsonSchema)]
20+
#[allow(dead_code)]
21+
struct MultiCollectionQueryForDocs {
22+
/// One or more collection [NSID](https://atproto.com/specs/nsid)s
23+
///
24+
/// Pass this parameter multiple times to specify multiple collections, like
25+
/// `collection=app.bsky.feed.like&collection=app.bsky.feed.post`
26+
collection: String,
27+
}
28+
29+
impl TryFrom<MultiCollectionQuery> for HashSet<Nsid> {
30+
type Error = HttpError;
31+
fn try_from(mcq: MultiCollectionQuery) -> Result<Self, Self::Error> {
32+
let mut out = HashSet::with_capacity(mcq.collection.len());
33+
for c in mcq.collection {
34+
let nsid = Nsid::new(c).map_err(|e| {
35+
HttpError::for_bad_request(
36+
None,
37+
format!("failed to convert collection to an NSID: {e:?}"),
38+
)
39+
})?;
40+
out.insert(nsid);
41+
}
42+
Ok(out)
43+
}
44+
}
45+
46+
// The `SharedExtractor` implementation for Query<QueryType> describes how to
47+
// construct an instance of `Query<QueryType>` from an HTTP request: namely, by
48+
// parsing the query string to an instance of `QueryType`.
49+
#[async_trait]
50+
impl SharedExtractor for MultiCollectionQuery {
51+
async fn from_request<Context: ServerContext>(
52+
ctx: &RequestContext<Context>,
53+
) -> Result<MultiCollectionQuery, HttpError> {
54+
let raw_query = ctx.request.uri().query().unwrap_or("");
55+
let q = serde_qs::from_str(raw_query).map_err(|e| {
56+
HttpError::for_bad_request(None, format!("unable to parse query string: {}", e))
57+
})?;
58+
Ok(q)
59+
}
60+
61+
fn metadata(body_content_type: ApiEndpointBodyContentType) -> ExtractorMetadata {
62+
// HACK: query type switcheroo: passing MultiCollectionQuery to
63+
// `metadata` would "helpfully" panic because dropshot believes we can
64+
// only have scalar types in a query.
65+
//
66+
// so instead we have a fake second type whose only job is to look the
67+
// same as MultiCollectionQuery except that it has `String` instead of
68+
// `Vec<String>`, which dropshot will accept, and generate ~close-enough
69+
// docs for.
70+
<Query<MultiCollectionQueryForDocs> as SharedExtractor>::metadata(body_content_type)
71+
}
72+
}

ufos/src/server/cors.rs

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
use dropshot::{HttpError, HttpResponseHeaders, HttpResponseOk};
2+
use schemars::JsonSchema;
3+
use serde::Serialize;
4+
5+
pub type OkCorsResponse<T> = Result<HttpResponseHeaders<HttpResponseOk<T>>, HttpError>;
6+
7+
/// Helper for constructing Ok responses: return OkCors(T).into()
8+
/// (not happy with this yet)
9+
pub struct OkCors<T: Serialize + JsonSchema + Send + Sync>(pub T);
10+
11+
impl<T> From<OkCors<T>> for OkCorsResponse<T>
12+
where
13+
T: Serialize + JsonSchema + Send + Sync,
14+
{
15+
fn from(ok: OkCors<T>) -> OkCorsResponse<T> {
16+
let mut res = HttpResponseHeaders::new_unnamed(HttpResponseOk(ok.0));
17+
res.headers_mut()
18+
.insert("access-control-allow-origin", "*".parse().unwrap());
19+
Ok(res)
20+
}
21+
}
22+
23+
// TODO: cors for HttpError

0 commit comments

Comments
 (0)