Skip to content

Commit 692f01f

Browse files
committed
Implement cached schema & variant materialized views infrastructure
This change establishes the foundation for materialized views (MVs) that provide cached access to schema and variant data from uninstalled packages. The implementation uses the MV building pattern established by the change set MVs and supports graceful fallback chains in API endpoints. Key components: - CachedSchema and CachedDefaultVariant MV types for schema metadata access - Deployment MV discovery system with direct key lookup in content- addressable store vs O(nm) traversal through layers - JoinSet task scheduling for parallel MV building in edda-server - Luminork API endpoints updated with MV-first lookups and DAL fallbacks - Function collection system with error handling for variant data assembly The MV system enables efficient access to schema information without requiring full package installation, supporting API integrations and MCP servers.
1 parent 0b41846 commit 692f01f

File tree

26 files changed

+1509
-457
lines changed

26 files changed

+1509
-457
lines changed

Cargo.lock

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

lib/dal-materialized-views/BUCK

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ rust_library(
66
"//lib/dal:dal",
77
"//lib/si-frontend-mv-types-rs:si-frontend-mv-types",
88
"//lib/si-id:si-id",
9+
"//lib/si-pkg:si-pkg",
910
"//lib/telemetry-rs:telemetry",
1011
"//third-party/rust:remain",
1112
"//third-party/rust:serde_json",

lib/dal-materialized-views/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ remain = { workspace = true }
1414
serde_json = { workspace = true }
1515
si-frontend-mv-types = { path = "../../lib/si-frontend-mv-types-rs" }
1616
si-id = { path = "../../lib/si-id" }
17+
si-pkg = { path = "../../lib/si-pkg" }
1718
telemetry = { path = "../../lib/telemetry-rs" }
1819
thiserror = { workspace = true }
1920

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
use si_id::FuncId;
2+
use si_pkg::HasUniqueId;
3+
use telemetry::prelude::*;
4+
5+
pub mod schema;
6+
pub mod schemas;
7+
8+
/// Collects function IDs from SiPkg function types, handling invalid IDs gracefully.
9+
/// Logs warnings for function IDs that cannot be parsed as ULIDs and skips them.
10+
pub(crate) fn collect_function_ids<T, E, I, F>(
11+
get_functions: F,
12+
collector: &mut Vec<FuncId>,
13+
schema_id: &str,
14+
variant_id: &str,
15+
func_type: &str,
16+
) -> Result<(), E>
17+
where
18+
F: FnOnce() -> Result<I, E>,
19+
I: IntoIterator<Item = T>,
20+
T: HasUniqueId,
21+
E: std::error::Error,
22+
{
23+
for func in get_functions()? {
24+
if let Some(unique_id) = func.unique_id() {
25+
match unique_id.parse::<FuncId>() {
26+
Ok(func_id) => collector.push(func_id),
27+
Err(e) => {
28+
debug!(
29+
"Skipping invalid function ID '{}' in {} for schema {} variant {}: {}. Expected ULID format.",
30+
unique_id, func_type, schema_id, variant_id, e
31+
);
32+
}
33+
}
34+
}
35+
}
36+
Ok(())
37+
}
Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
use dal::{
2+
DalContext,
3+
SchemaId,
4+
SchemaVariantId,
5+
cached_module::CachedModule,
6+
};
7+
use si_frontend_mv_types::cached_schema::CachedSchema as CachedSchemaMv;
8+
use telemetry::prelude::*;
9+
10+
#[instrument(
11+
name = "dal_materialized_views.cached_schema",
12+
level = "debug",
13+
skip_all
14+
)]
15+
pub async fn assemble(ctx: DalContext, id: SchemaId) -> crate::Result<CachedSchemaMv> {
16+
let mut module = CachedModule::find_latest_for_schema_id(&ctx, id)
17+
.await?
18+
.ok_or_else(|| crate::Error::Schema(dal::SchemaError::UninstalledSchemaNotFound(id)))?;
19+
20+
// Get the SiPkg data to extract variant information
21+
let si_pkg = module.si_pkg(&ctx).await?;
22+
let schemas = si_pkg.schemas()?;
23+
let schema = schemas
24+
.into_iter()
25+
.next()
26+
.ok_or_else(|| crate::Error::Schema(dal::SchemaError::UninstalledSchemaNotFound(id)))?;
27+
28+
let variants = schema.variants()?;
29+
30+
// Find the default variant based on the schema data
31+
let schema_data = schema
32+
.data()
33+
.ok_or_else(|| crate::Error::Schema(dal::SchemaError::UninstalledSchemaNotFound(id)))?;
34+
35+
let default_variant =
36+
if let Some(default_variant_unique_id) = schema_data.default_schema_variant() {
37+
// Find the variant with the matching unique_id
38+
variants
39+
.iter()
40+
.find(|v| v.unique_id() == Some(default_variant_unique_id))
41+
.or_else(|| variants.first()) // Fallback to first variant
42+
} else {
43+
// No default specified, use first variant
44+
variants.first()
45+
}
46+
.ok_or_else(|| crate::Error::Schema(dal::SchemaError::UninstalledSchemaNotFound(id)))?;
47+
48+
// Extract the variant unique_id and convert to SchemaVariantId
49+
let default_variant_id: SchemaVariantId = default_variant
50+
.unique_id()
51+
.ok_or_else(|| crate::Error::Schema(dal::SchemaError::UninstalledSchemaNotFound(id)))?
52+
.parse::<SchemaVariantId>()
53+
.map_err(|_| crate::Error::Schema(dal::SchemaError::UninstalledSchemaNotFound(id)))?;
54+
55+
// Collect all variant IDs from their unique_ids
56+
let mut variant_ids = Vec::new();
57+
for variant in &variants {
58+
if let Some(unique_id) = variant.unique_id() {
59+
if let Ok(variant_id) = unique_id.parse::<SchemaVariantId>() {
60+
variant_ids.push(variant_id);
61+
}
62+
}
63+
}
64+
65+
Ok(CachedSchemaMv::new(
66+
id,
67+
module.schema_name,
68+
default_variant_id,
69+
variant_ids,
70+
))
71+
}
72+
73+
pub mod variant;
Lines changed: 200 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,200 @@
1+
use dal::{
2+
DalContext,
3+
SchemaVariantId,
4+
cached_module::CachedModule,
5+
schema::variant::DEFAULT_SCHEMA_VARIANT_COLOR,
6+
};
7+
use si_frontend_mv_types::cached_schema_variant::CachedSchemaVariant as CachedSchemaVariantMv;
8+
use si_id::FuncId;
9+
use si_pkg::{
10+
SiPkgSchema,
11+
SiPkgSchemaVariant,
12+
};
13+
use telemetry::prelude::*;
14+
15+
use crate::cached::collect_function_ids;
16+
17+
/// Data structure for assembled variant information
18+
pub(crate) struct AssembledVariantData {
19+
pub variant_id: SchemaVariantId,
20+
pub display_name: String,
21+
pub category: String,
22+
pub color: String,
23+
pub description: Option<String>,
24+
pub link: Option<String>,
25+
pub asset_func_id: FuncId,
26+
pub variant_func_ids: Vec<FuncId>,
27+
}
28+
29+
/// Assembles variant data from schema and variant components
30+
pub(crate) async fn assemble_variant_data(
31+
schema: &SiPkgSchema<'_>,
32+
variant: &SiPkgSchemaVariant<'_>,
33+
variant_id: SchemaVariantId,
34+
) -> crate::Result<AssembledVariantData> {
35+
// Get variant data
36+
let variant_data = variant.data().ok_or_else(|| {
37+
crate::Error::SchemaVariant(dal::SchemaVariantError::NotFound(variant_id))
38+
})?;
39+
40+
// Get the asset func unique_id and convert to FuncId
41+
let asset_func_id = variant_data
42+
.func_unique_id()
43+
.parse::<FuncId>()
44+
.map_err(|_| crate::Error::SchemaVariant(dal::SchemaVariantError::NotFound(variant_id)))?;
45+
46+
// Get category from schema data
47+
let category = schema
48+
.data
49+
.as_ref()
50+
.map(|d| d.category())
51+
.unwrap_or("Component") // default category
52+
.to_string();
53+
54+
// Get display name from variant spec
55+
let variant_spec = variant.to_spec().await?;
56+
let display_name = variant_spec
57+
.data
58+
.as_ref()
59+
.and_then(|d| d.display_name.as_ref())
60+
.map(|d| d.to_string())
61+
.unwrap_or_else(|| schema.name().to_string()); // fallback to schema name
62+
63+
// Collect all function IDs attached to this variant
64+
let mut variant_func_ids = Vec::with_capacity(100); // Pre-allocate based on typical size
65+
let schema_id_str = schema.unique_id().unwrap_or("unknown");
66+
let variant_id_str = variant_id.to_string();
67+
68+
// All function types use the same helper with closure syntax
69+
collect_function_ids(
70+
|| variant.leaf_functions(),
71+
&mut variant_func_ids,
72+
schema_id_str,
73+
&variant_id_str,
74+
"leaf function",
75+
)?;
76+
collect_function_ids(
77+
|| variant.action_funcs(),
78+
&mut variant_func_ids,
79+
schema_id_str,
80+
&variant_id_str,
81+
"action function",
82+
)?;
83+
collect_function_ids(
84+
|| variant.auth_funcs(),
85+
&mut variant_func_ids,
86+
schema_id_str,
87+
&variant_id_str,
88+
"auth function",
89+
)?;
90+
collect_function_ids(
91+
|| variant.management_funcs(),
92+
&mut variant_func_ids,
93+
schema_id_str,
94+
&variant_id_str,
95+
"management function",
96+
)?;
97+
collect_function_ids(
98+
|| variant.si_prop_funcs(),
99+
&mut variant_func_ids,
100+
schema_id_str,
101+
&variant_id_str,
102+
"si prop function",
103+
)?;
104+
collect_function_ids(
105+
|| variant.root_prop_funcs(),
106+
&mut variant_func_ids,
107+
schema_id_str,
108+
&variant_id_str,
109+
"root prop function",
110+
)?;
111+
112+
// Remove duplicates efficiently, and ensure we have stable output for the MV.
113+
variant_func_ids.sort_unstable();
114+
variant_func_ids.dedup();
115+
116+
Ok(AssembledVariantData {
117+
variant_id,
118+
display_name,
119+
category,
120+
color: variant_data
121+
.color()
122+
.unwrap_or(DEFAULT_SCHEMA_VARIANT_COLOR)
123+
.to_string(),
124+
description: variant_data.description().map(|d| d.to_string()),
125+
link: variant_data.link().map(|l| l.to_string()),
126+
asset_func_id,
127+
variant_func_ids,
128+
})
129+
}
130+
131+
#[instrument(
132+
name = "dal_materialized_views.cached_schema_variant",
133+
level = "debug",
134+
skip_all
135+
)]
136+
pub async fn assemble(
137+
ctx: DalContext,
138+
id: SchemaVariantId,
139+
) -> crate::Result<CachedSchemaVariantMv> {
140+
// Find the cached module containing this variant by storing the module info
141+
for mut module in CachedModule::latest_modules(&ctx).await? {
142+
let si_pkg = module.si_pkg(&ctx).await?;
143+
let schemas = si_pkg.schemas()?;
144+
for schema in schemas {
145+
let variants = schema.variants()?;
146+
for variant in variants {
147+
if let Some(unique_id) = variant.unique_id() {
148+
if let Ok(variant_id) = unique_id.parse::<SchemaVariantId>() {
149+
if variant_id == id {
150+
// Found the variant, assemble its data
151+
let assembled_data =
152+
assemble_variant_data(&schema, &variant, id).await?;
153+
154+
// Determine if this variant is the default variant for the schema
155+
let is_default_variant = if let Some(schema_data) = schema.data.as_ref()
156+
{
157+
if let Some(default_variant_unique_id) =
158+
schema_data.default_schema_variant()
159+
{
160+
// Check if this variant's unique_id matches the default
161+
variant.unique_id() == Some(default_variant_unique_id)
162+
} else {
163+
// No default specified in schema, check if this is the first variant
164+
let all_variants = schema.variants()?;
165+
all_variants.first().map(|first| first.unique_id())
166+
== variant.unique_id().map(Some)
167+
}
168+
} else {
169+
// No schema data, assume first variant is default
170+
let all_variants = schema.variants()?;
171+
all_variants.first().map(|first| first.unique_id())
172+
== variant.unique_id().map(Some)
173+
};
174+
175+
return Ok(CachedSchemaVariantMv::new(
176+
assembled_data.variant_id,
177+
assembled_data.display_name,
178+
assembled_data.category,
179+
assembled_data.color,
180+
true, // is_locked - cached modules are locked until installed
181+
assembled_data.description,
182+
assembled_data.link,
183+
assembled_data.asset_func_id,
184+
assembled_data.variant_func_ids,
185+
is_default_variant,
186+
));
187+
}
188+
}
189+
}
190+
}
191+
}
192+
}
193+
194+
// Variant not found
195+
Err(crate::Error::SchemaVariant(
196+
dal::SchemaVariantError::NotFound(id),
197+
))
198+
}
199+
200+
pub mod default;

0 commit comments

Comments
 (0)