-
Notifications
You must be signed in to change notification settings - Fork 2.1k
Allow multi-read in CollectionView
#4609
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 10 commits
11f4b70
c0aa375
a0283da
a63565d
f0667a9
ae214d5
420b46a
91403f2
6c4fdd1
204a713
ca1d0b5
2f0b025
bfe15d4
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -9,7 +9,7 @@ use std::{ | |
mem, | ||
}; | ||
|
||
use async_lock::{RwLock, RwLockReadGuard, RwLockWriteGuard}; | ||
use async_lock::{RwLock, RwLockReadGuard}; | ||
#[cfg(with_metrics)] | ||
use linera_base::prometheus_util::MeasureLatency as _; | ||
use serde::{de::DeserializeOwned, Serialize}; | ||
|
@@ -51,19 +51,37 @@ pub struct ByteCollectionView<C, W> { | |
} | ||
|
||
/// A read-only accessor for a particular subview in a [`CollectionView`]. | ||
pub struct ReadGuardedView<'a, W> { | ||
guard: RwLockReadGuard<'a, BTreeMap<Vec<u8>, Update<W>>>, | ||
short_key: Vec<u8>, | ||
pub enum ReadGuardedView<'a, W> { | ||
/// The view is loaded in the updates | ||
Loaded { | ||
/// The guard for the updates. | ||
updates: RwLockReadGuard<'a, BTreeMap<Vec<u8>, Update<W>>>, | ||
/// The key in question. | ||
short_key: Vec<u8>, | ||
}, | ||
/// The view is not loaded in the updates | ||
NotLoaded { | ||
/// The guard for the updates. It is needed so that it prevents | ||
/// opening the view as write separately. | ||
_updates: RwLockReadGuard<'a, BTreeMap<Vec<u8>, Update<W>>>, | ||
/// The view obtained from the storage | ||
view: W, | ||
}, | ||
} | ||
|
||
impl<W> std::ops::Deref for ReadGuardedView<'_, W> { | ||
type Target = W; | ||
|
||
fn deref(&self) -> &W { | ||
let Update::Set(view) = self.guard.get(&self.short_key).unwrap() else { | ||
unreachable!(); | ||
}; | ||
view | ||
match self { | ||
ReadGuardedView::Loaded { updates, short_key } => { | ||
let Update::Set(view) = updates.get(short_key).unwrap() else { | ||
unreachable!(); | ||
}; | ||
view | ||
} | ||
ReadGuardedView::NotLoaded { _updates, view } => view, | ||
} | ||
} | ||
} | ||
|
||
|
@@ -269,25 +287,16 @@ impl<W: View> ByteCollectionView<W::Context, W> { | |
&self, | ||
short_key: &[u8], | ||
) -> Result<Option<ReadGuardedView<W>>, ViewError> { | ||
let mut updates = self | ||
.updates | ||
.try_write() | ||
.ok_or(ViewError::CannotAcquireCollectionEntry)?; | ||
match updates.entry(short_key.to_vec()) { | ||
btree_map::Entry::Occupied(entry) => { | ||
let entry = entry.into_mut(); | ||
match entry { | ||
Update::Set(_) => { | ||
let guard = RwLockWriteGuard::downgrade(updates); | ||
Ok(Some(ReadGuardedView { | ||
guard, | ||
short_key: short_key.to_vec(), | ||
})) | ||
} | ||
Update::Removed => Ok(None), | ||
} | ||
} | ||
btree_map::Entry::Vacant(entry) => { | ||
let updates = self.updates.read().await; | ||
match updates.get(short_key) { | ||
Some(update) => match update { | ||
Update::Removed => Ok(None), | ||
_ => Ok(Some(ReadGuardedView::Loaded { | ||
|
||
updates, | ||
short_key: short_key.to_vec(), | ||
})), | ||
}, | ||
None => { | ||
let key_index = self | ||
.context | ||
.base_key() | ||
|
@@ -301,11 +310,9 @@ impl<W: View> ByteCollectionView<W::Context, W> { | |
.base_tag_index(KeyTag::Subview as u8, short_key); | ||
let context = self.context.clone_with_base_key(key); | ||
let view = W::load(context).await?; | ||
entry.insert(Update::Set(view)); | ||
let guard = RwLockWriteGuard::downgrade(updates); | ||
Ok(Some(ReadGuardedView { | ||
guard, | ||
short_key: short_key.to_vec(), | ||
Ok(Some(ReadGuardedView::NotLoaded { | ||
_updates: updates, | ||
view, | ||
ma2bd marked this conversation as resolved.
Show resolved
Hide resolved
|
||
})) | ||
} else { | ||
Ok(None) | ||
|
@@ -314,6 +321,95 @@ impl<W: View> ByteCollectionView<W::Context, W> { | |
} | ||
} | ||
|
||
/// Load multiple entries for reading at once. | ||
/// The entries in `short_keys` have to be all distinct. | ||
/// ```rust | ||
/// # tokio_test::block_on(async { | ||
/// # use linera_views::context::MemoryContext; | ||
/// # use linera_views::collection_view::ByteCollectionView; | ||
/// # use linera_views::register_view::RegisterView; | ||
/// # use linera_views::views::View; | ||
/// # let context = MemoryContext::new_for_testing(()); | ||
/// let mut view: ByteCollectionView<_, RegisterView<_, String>> = | ||
/// ByteCollectionView::load(context).await.unwrap(); | ||
/// { | ||
/// let _subview = view.load_entry_or_insert(&[0, 1]).await.unwrap(); | ||
/// } | ||
/// let short_keys = vec![vec![0, 1], vec![2, 3]]; | ||
/// let subviews = view.try_load_entries(short_keys).await.unwrap(); | ||
/// let value0 = subviews[0].as_ref().unwrap().get(); | ||
/// assert_eq!(*value0, String::default()); | ||
/// # }) | ||
/// ``` | ||
pub async fn try_load_entries( | ||
&self, | ||
short_keys: Vec<Vec<u8>>, | ||
) -> Result<Vec<Option<ReadGuardedView<W>>>, ViewError> { | ||
let mut results = Vec::with_capacity(short_keys.len()); | ||
let mut keys_to_check = Vec::new(); | ||
let mut keys_to_check_metadata = Vec::new(); | ||
let updates = self.updates.read().await; | ||
|
||
for (position, short_key) in short_keys.into_iter().enumerate() { | ||
match updates.get(&short_key) { | ||
Some(update) => match update { | ||
Update::Removed => { | ||
results.push(None); | ||
} | ||
_ => { | ||
let updates = self.updates.read().await; | ||
results.push(Some(ReadGuardedView::Loaded { | ||
updates, | ||
short_key: short_key.clone(), | ||
})); | ||
} | ||
}, | ||
None => { | ||
results.push(None); // Placeholder, may be updated later | ||
if !self.delete_storage_first { | ||
let key = self | ||
.context | ||
.base_key() | ||
.base_tag_index(KeyTag::Index as u8, &short_key); | ||
let subview_context = self.context.clone_with_base_key(key.clone()); | ||
keys_to_check.push(key); | ||
keys_to_check_metadata.push((position, subview_context)); | ||
} | ||
} | ||
} | ||
} | ||
|
||
let found_keys = self.context.store().contains_keys(keys_to_check).await?; | ||
let entries_to_load = keys_to_check_metadata | ||
.into_iter() | ||
.zip(found_keys) | ||
.filter_map(|(metadata, found)| found.then_some(metadata)) | ||
.collect::<Vec<_>>(); | ||
|
||
let mut keys_to_load = Vec::with_capacity(entries_to_load.len() * W::NUM_INIT_KEYS); | ||
for (_, context) in &entries_to_load { | ||
keys_to_load.extend(W::pre_load(context)?); | ||
} | ||
let values = self | ||
.context | ||
.store() | ||
.read_multi_values_bytes(keys_to_load) | ||
.await?; | ||
|
||
for (loaded_values, (position, context)) in | ||
values.chunks_exact(W::NUM_INIT_KEYS).zip(entries_to_load) | ||
{ | ||
let view = W::post_load(context, loaded_values)?; | ||
let updates = self.updates.read().await; | ||
results[position] = Some(ReadGuardedView::NotLoaded { | ||
_updates: updates, | ||
view, | ||
}); | ||
} | ||
|
||
Ok(results) | ||
} | ||
|
||
/// Resets an entry to the default value. | ||
/// ```rust | ||
/// # tokio_test::block_on(async { | ||
|
@@ -831,6 +927,41 @@ impl<I: Serialize, W: View> CollectionView<W::Context, I, W> { | |
self.collection.try_load_entry(&short_key).await | ||
} | ||
|
||
/// Load multiple entries for reading at once. | ||
/// The entries in indices have to be all distinct. | ||
/// ```rust | ||
/// # tokio_test::block_on(async { | ||
/// # use linera_views::context::MemoryContext; | ||
/// # use linera_views::collection_view::CollectionView; | ||
/// # use linera_views::register_view::RegisterView; | ||
/// # use linera_views::views::View; | ||
/// # let context = MemoryContext::new_for_testing(()); | ||
/// let mut view: CollectionView<_, u64, RegisterView<_, String>> = | ||
/// CollectionView::load(context).await.unwrap(); | ||
/// { | ||
/// let _subview = view.load_entry_or_insert(&23).await.unwrap(); | ||
/// } | ||
/// let indices = vec![23, 24]; | ||
/// let subviews = view.try_load_entries(&indices).await.unwrap(); | ||
/// let value0 = subviews[0].as_ref().unwrap().get(); | ||
/// assert_eq!(*value0, String::default()); | ||
/// # }) | ||
/// ``` | ||
pub async fn try_load_entries<'a, Q>( | ||
&self, | ||
indices: impl IntoIterator<Item = &'a Q>, | ||
) -> Result<Vec<Option<ReadGuardedView<W>>>, ViewError> | ||
where | ||
I: Borrow<Q>, | ||
Q: Serialize + 'a, | ||
{ | ||
let short_keys = indices | ||
.into_iter() | ||
.map(|index| BaseKey::derive_short_key(index)) | ||
.collect::<Result<_, _>>()?; | ||
self.collection.try_load_entries(short_keys).await | ||
} | ||
|
||
/// Resets an entry to the default value. | ||
/// ```rust | ||
/// # tokio_test::block_on(async { | ||
|
@@ -895,7 +1026,7 @@ impl<I: Serialize, W: View> CollectionView<W::Context, I, W> { | |
|
||
impl<I, W: View> CollectionView<W::Context, I, W> | ||
where | ||
I: Sync + Clone + Send + Serialize + DeserializeOwned, | ||
I: Sync + Send + Serialize + DeserializeOwned, | ||
{ | ||
/// Returns the list of indices in the collection in the order determined by | ||
/// the serialization. | ||
|
@@ -1181,6 +1312,40 @@ impl<I: CustomSerialize, W: View> CustomCollectionView<W::Context, I, W> { | |
self.collection.try_load_entry(&short_key).await | ||
} | ||
|
||
/// Load multiple entries for reading at once. | ||
/// The entries in indices have to be all distinct. | ||
/// ```rust | ||
/// # tokio_test::block_on(async { | ||
/// # use linera_views::context::MemoryContext; | ||
/// # use linera_views::collection_view::CustomCollectionView; | ||
/// # use linera_views::register_view::RegisterView; | ||
/// # use linera_views::views::View; | ||
/// # let context = MemoryContext::new_for_testing(()); | ||
/// let mut view: CustomCollectionView<_, u128, RegisterView<_, String>> = | ||
/// CustomCollectionView::load(context).await.unwrap(); | ||
/// { | ||
/// let _subview = view.load_entry_or_insert(&23).await.unwrap(); | ||
/// } | ||
/// let subviews = view.try_load_entries(&[23, 42]).await.unwrap(); | ||
/// let value0 = subviews[0].as_ref().unwrap().get(); | ||
/// assert_eq!(*value0, String::default()); | ||
/// # }) | ||
/// ``` | ||
pub async fn try_load_entries<'a, Q>( | ||
&self, | ||
indices: impl IntoIterator<Item = &'a Q>, | ||
) -> Result<Vec<Option<ReadGuardedView<W>>>, ViewError> | ||
where | ||
I: Borrow<Q>, | ||
Q: CustomSerialize + 'a, | ||
{ | ||
let short_keys = indices | ||
.into_iter() | ||
.map(|index| index.to_custom_bytes()) | ||
.collect::<Result<_, _>>()?; | ||
self.collection.try_load_entries(short_keys).await | ||
} | ||
|
||
/// Marks the entry so that it is removed in the next flush. | ||
/// ```rust | ||
/// # tokio_test::block_on(async { | ||
|
@@ -1443,8 +1608,7 @@ mod graphql { | |
+ async_graphql::OutputType | ||
+ serde::ser::Serialize | ||
+ serde::de::DeserializeOwned | ||
+ std::fmt::Debug | ||
+ Clone, | ||
+ std::fmt::Debug, | ||
V: View + async_graphql::OutputType, | ||
MapInput<K>: async_graphql::InputType, | ||
MapFilters<K>: async_graphql::InputType, | ||
|
@@ -1477,16 +1641,15 @@ mod graphql { | |
self.indices().await? | ||
}; | ||
|
||
let mut values = vec![]; | ||
for key in keys { | ||
let value = self | ||
.try_load_entry(&key) | ||
.await? | ||
.ok_or_else(|| missing_key_error(&key))?; | ||
values.push(Entry { value, key }) | ||
} | ||
|
||
Ok(values) | ||
let values = self.try_load_entries(&keys).await?; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nice! |
||
values | ||
.into_iter() | ||
.zip(keys) | ||
.map(|(value, key)| match value { | ||
None => Err(missing_key_error(&key)), | ||
|
||
Some(value) => Ok(Entry { value, key }), | ||
}) | ||
.collect() | ||
} | ||
} | ||
|
||
|
@@ -1543,16 +1706,15 @@ mod graphql { | |
self.indices().await? | ||
}; | ||
|
||
let mut values = vec![]; | ||
for key in keys { | ||
let value = self | ||
.try_load_entry(&key) | ||
.await? | ||
.ok_or_else(|| missing_key_error(&key))?; | ||
values.push(Entry { value, key }) | ||
} | ||
|
||
Ok(values) | ||
let values = self.try_load_entries(&keys).await?; | ||
values | ||
.into_iter() | ||
.zip(keys) | ||
.map(|(value, key)| match value { | ||
None => Err(missing_key_error(&key)), | ||
Some(value) => Ok(Entry { value, key }), | ||
}) | ||
.collect() | ||
} | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's great to finally remove this! We can remove the comment "May fail if one subview is already being visited."
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's make sure to do another (non-backportable) PR after this one to remove
CannotAcquireCollectionEntry
.