Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion linera-views/src/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,11 @@ type HasherOutputSize = <sha3::Sha3_256 as sha3::digest::OutputSizeUser>::Output
pub type HasherOutput = generic_array::GenericArray<u8, HasherOutputSize>;

#[derive(Clone, Debug)]
pub(crate) enum Update<T> {
/// An update, for example to a view.
pub enum Update<T> {
/// The entry is removed.
Removed,
/// The entry is set to the following.
Set(T),
}

Expand Down
272 changes: 212 additions & 60 deletions linera-views/src/views/collection_view.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use std::{
mem,
};

use async_lock::{RwLock, RwLockReadGuard, RwLockWriteGuard};
use async_lock::{RwLock, RwLockReadGuard};
#[cfg(with_metrics)]
use linera_base::prometheus_util::MeasureLatency as _;
use serde::{de::DeserializeOwned, Serialize};
Expand Down Expand Up @@ -51,19 +51,37 @@ pub struct ByteCollectionView<C, W> {
}

/// A read-only accessor for a particular subview in a [`CollectionView`].
pub struct ReadGuardedView<'a, W> {
guard: RwLockReadGuard<'a, BTreeMap<Vec<u8>, Update<W>>>,
short_key: Vec<u8>,
pub enum ReadGuardedView<'a, W> {
/// The view is loaded in the updates
Loaded {
/// The guard for the updates.
updates: RwLockReadGuard<'a, BTreeMap<Vec<u8>, Update<W>>>,
/// The key in question.
short_key: Vec<u8>,
},
/// The view is not loaded in the updates
NotLoaded {
/// The guard for the updates. It is needed so that it prevents
/// opening the view as write separately.
_updates: RwLockReadGuard<'a, BTreeMap<Vec<u8>, Update<W>>>,
/// The view obtained from the storage
view: W,
},
}

impl<W> std::ops::Deref for ReadGuardedView<'_, W> {
type Target = W;

fn deref(&self) -> &W {
let Update::Set(view) = self.guard.get(&self.short_key).unwrap() else {
unreachable!();
};
view
match self {
ReadGuardedView::Loaded { updates, short_key } => {
let Update::Set(view) = updates.get(short_key).unwrap() else {
unreachable!();
};
view
}
ReadGuardedView::NotLoaded { _updates, view } => view,
}
}
}

Expand Down Expand Up @@ -269,25 +287,16 @@ impl<W: View> ByteCollectionView<W::Context, W> {
&self,
short_key: &[u8],
) -> Result<Option<ReadGuardedView<W>>, ViewError> {
let mut updates = self
.updates
.try_write()
.ok_or(ViewError::CannotAcquireCollectionEntry)?;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's great to finally remove this! We can remove the comment "May fail if one subview is already being visited."

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's make sure to do another (non-backportable) PR after this one to remove CannotAcquireCollectionEntry.

match updates.entry(short_key.to_vec()) {
btree_map::Entry::Occupied(entry) => {
let entry = entry.into_mut();
match entry {
Update::Set(_) => {
let guard = RwLockWriteGuard::downgrade(updates);
Ok(Some(ReadGuardedView {
guard,
short_key: short_key.to_vec(),
}))
}
Update::Removed => Ok(None),
}
}
btree_map::Entry::Vacant(entry) => {
let updates = self.updates.read().await;
match updates.get(short_key) {
Some(update) => match update {
Update::Removed => Ok(None),
Update::Set(_) => Ok(Some(ReadGuardedView::Loaded {
updates,
short_key: short_key.to_vec(),
})),
},
None => {
let key_index = self
.context
.base_key()
Expand All @@ -301,11 +310,9 @@ impl<W: View> ByteCollectionView<W::Context, W> {
.base_tag_index(KeyTag::Subview as u8, short_key);
let context = self.context.clone_with_base_key(key);
let view = W::load(context).await?;
entry.insert(Update::Set(view));
let guard = RwLockWriteGuard::downgrade(updates);
Ok(Some(ReadGuardedView {
guard,
short_key: short_key.to_vec(),
Ok(Some(ReadGuardedView::NotLoaded {
_updates: updates,
view,
}))
} else {
Ok(None)
Expand All @@ -314,6 +321,95 @@ impl<W: View> ByteCollectionView<W::Context, W> {
}
}

/// Load multiple entries for reading at once.
/// The entries in `short_keys` have to be all distinct.
/// ```rust
/// # tokio_test::block_on(async {
/// # use linera_views::context::MemoryContext;
/// # use linera_views::collection_view::ByteCollectionView;
/// # use linera_views::register_view::RegisterView;
/// # use linera_views::views::View;
/// # let context = MemoryContext::new_for_testing(());
/// let mut view: ByteCollectionView<_, RegisterView<_, String>> =
/// ByteCollectionView::load(context).await.unwrap();
/// {
/// let _subview = view.load_entry_or_insert(&[0, 1]).await.unwrap();
/// }
/// let short_keys = vec![vec![0, 1], vec![2, 3]];
/// let subviews = view.try_load_entries(short_keys).await.unwrap();
/// let value0 = subviews[0].as_ref().unwrap().get();
/// assert_eq!(*value0, String::default());
/// # })
/// ```
pub async fn try_load_entries(
&self,
short_keys: Vec<Vec<u8>>,
) -> Result<Vec<Option<ReadGuardedView<W>>>, ViewError> {
let mut results = Vec::with_capacity(short_keys.len());
let mut keys_to_check = Vec::new();
let mut keys_to_check_metadata = Vec::new();
let updates = self.updates.read().await;

for (position, short_key) in short_keys.into_iter().enumerate() {
match updates.get(&short_key) {
Some(update) => match update {
Update::Removed => {
results.push(None);
}
Update::Set(_) => {
let updates = self.updates.read().await;
results.push(Some(ReadGuardedView::Loaded {
updates,
short_key: short_key.clone(),
}));
}
},
None => {
results.push(None); // Placeholder, may be updated later
if !self.delete_storage_first {
let key = self
.context
.base_key()
.base_tag_index(KeyTag::Index as u8, &short_key);
let subview_context = self.context.clone_with_base_key(key.clone());
keys_to_check.push(key);
keys_to_check_metadata.push((position, subview_context));
}
}
}
}

let found_keys = self.context.store().contains_keys(keys_to_check).await?;
let entries_to_load = keys_to_check_metadata
.into_iter()
.zip(found_keys)
.filter_map(|(metadata, found)| found.then_some(metadata))
.collect::<Vec<_>>();

let mut keys_to_load = Vec::with_capacity(entries_to_load.len() * W::NUM_INIT_KEYS);
for (_, context) in &entries_to_load {
keys_to_load.extend(W::pre_load(context)?);
}
let values = self
.context
.store()
.read_multi_values_bytes(keys_to_load)
.await?;

for (loaded_values, (position, context)) in
values.chunks_exact(W::NUM_INIT_KEYS).zip(entries_to_load)
{
let view = W::post_load(context, loaded_values)?;
let updates = self.updates.read().await;
results[position] = Some(ReadGuardedView::NotLoaded {
_updates: updates,
view,
});
}

Ok(results)
}

/// Resets an entry to the default value.
/// ```rust
/// # tokio_test::block_on(async {
Expand Down Expand Up @@ -831,6 +927,41 @@ impl<I: Serialize, W: View> CollectionView<W::Context, I, W> {
self.collection.try_load_entry(&short_key).await
}

/// Load multiple entries for reading at once.
/// The entries in indices have to be all distinct.
/// ```rust
/// # tokio_test::block_on(async {
/// # use linera_views::context::MemoryContext;
/// # use linera_views::collection_view::CollectionView;
/// # use linera_views::register_view::RegisterView;
/// # use linera_views::views::View;
/// # let context = MemoryContext::new_for_testing(());
/// let mut view: CollectionView<_, u64, RegisterView<_, String>> =
/// CollectionView::load(context).await.unwrap();
/// {
/// let _subview = view.load_entry_or_insert(&23).await.unwrap();
/// }
/// let indices = vec![23, 24];
/// let subviews = view.try_load_entries(&indices).await.unwrap();
/// let value0 = subviews[0].as_ref().unwrap().get();
/// assert_eq!(*value0, String::default());
/// # })
/// ```
pub async fn try_load_entries<'a, Q>(
&self,
indices: impl IntoIterator<Item = &'a Q>,
) -> Result<Vec<Option<ReadGuardedView<W>>>, ViewError>
where
I: Borrow<Q>,
Q: Serialize + 'a,
{
let short_keys = indices
.into_iter()
.map(|index| BaseKey::derive_short_key(index))
.collect::<Result<_, _>>()?;
self.collection.try_load_entries(short_keys).await
}

/// Resets an entry to the default value.
/// ```rust
/// # tokio_test::block_on(async {
Expand Down Expand Up @@ -895,7 +1026,7 @@ impl<I: Serialize, W: View> CollectionView<W::Context, I, W> {

impl<I, W: View> CollectionView<W::Context, I, W>
where
I: Sync + Clone + Send + Serialize + DeserializeOwned,
I: Sync + Send + Serialize + DeserializeOwned,
{
/// Returns the list of indices in the collection in the order determined by
/// the serialization.
Expand Down Expand Up @@ -1181,6 +1312,40 @@ impl<I: CustomSerialize, W: View> CustomCollectionView<W::Context, I, W> {
self.collection.try_load_entry(&short_key).await
}

/// Load multiple entries for reading at once.
/// The entries in indices have to be all distinct.
/// ```rust
/// # tokio_test::block_on(async {
/// # use linera_views::context::MemoryContext;
/// # use linera_views::collection_view::CustomCollectionView;
/// # use linera_views::register_view::RegisterView;
/// # use linera_views::views::View;
/// # let context = MemoryContext::new_for_testing(());
/// let mut view: CustomCollectionView<_, u128, RegisterView<_, String>> =
/// CustomCollectionView::load(context).await.unwrap();
/// {
/// let _subview = view.load_entry_or_insert(&23).await.unwrap();
/// }
/// let subviews = view.try_load_entries(&[23, 42]).await.unwrap();
/// let value0 = subviews[0].as_ref().unwrap().get();
/// assert_eq!(*value0, String::default());
/// # })
/// ```
pub async fn try_load_entries<'a, Q>(
&self,
indices: impl IntoIterator<Item = &'a Q>,
) -> Result<Vec<Option<ReadGuardedView<W>>>, ViewError>
where
I: Borrow<Q>,
Q: CustomSerialize + 'a,
{
let short_keys = indices
.into_iter()
.map(|index| index.to_custom_bytes())
.collect::<Result<_, _>>()?;
self.collection.try_load_entries(short_keys).await
}

/// Marks the entry so that it is removed in the next flush.
/// ```rust
/// # tokio_test::block_on(async {
Expand Down Expand Up @@ -1400,7 +1565,7 @@ mod graphql {

use super::{CollectionView, CustomCollectionView, ReadGuardedView};
use crate::{
graphql::{hash_name, mangle, missing_key_error, Entry, MapFilters, MapInput},
graphql::{hash_name, mangle, missing_key_error, Entry, MapInput},
views::View,
};

Expand Down Expand Up @@ -1443,11 +1608,8 @@ mod graphql {
+ async_graphql::OutputType
+ serde::ser::Serialize
+ serde::de::DeserializeOwned
+ std::fmt::Debug
+ Clone,
+ std::fmt::Debug,
V: View + async_graphql::OutputType,
MapInput<K>: async_graphql::InputType,
MapFilters<K>: async_graphql::InputType,
{
async fn keys(&self) -> Result<Vec<K>, async_graphql::Error> {
Ok(self.indices().await?)
Expand Down Expand Up @@ -1477,16 +1639,12 @@ mod graphql {
self.indices().await?
};

let mut values = vec![];
for key in keys {
let value = self
.try_load_entry(&key)
.await?
.ok_or_else(|| missing_key_error(&key))?;
values.push(Entry { value, key })
}

Ok(values)
let values = self.try_load_entries(&keys).await?;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice!

Ok(values
.into_iter()
.zip(keys)
.filter_map(|(value, key)| value.map(|value| Entry { value, key }))
.collect())
}
}

Expand All @@ -1512,8 +1670,6 @@ mod graphql {
+ crate::common::CustomSerialize
+ std::fmt::Debug,
V: View + async_graphql::OutputType,
MapInput<K>: async_graphql::InputType,
MapFilters<K>: async_graphql::InputType,
{
async fn keys(&self) -> Result<Vec<K>, async_graphql::Error> {
Ok(self.indices().await?)
Expand Down Expand Up @@ -1543,16 +1699,12 @@ mod graphql {
self.indices().await?
};

let mut values = vec![];
for key in keys {
let value = self
.try_load_entry(&key)
.await?
.ok_or_else(|| missing_key_error(&key))?;
values.push(Entry { value, key })
}

Ok(values)
let values = self.try_load_entries(&keys).await?;
Ok(values
.into_iter()
.zip(keys)
.filter_map(|(value, key)| value.map(|value| Entry { value, key }))
.collect())
}
}
}
Loading
Loading