Skip to content

Commit 3f79ab8

Browse files
committed
Backends: improvements of properties management and traits documentation
1 parent 9d9f514 commit 3f79ab8

File tree

8 files changed

+118
-71
lines changed

8 files changed

+118
-71
lines changed

backends/influxdb/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ log = "0.4"
3636
uuid = { version = "0.8", features = ["v4"] }
3737
serde = { version = "1.0", features = ["derive"] }
3838
serde_json = "1.0"
39-
influxdb = "0.3"
39+
influxdb = { version = "0.3", features = ["h1-client"] }
4040

4141
[dependencies.async-std]
4242
version = "=1.6.5"

backends/influxdb/src/lib.rs

Lines changed: 43 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,14 @@ use zenoh_backend_traits::*;
3434
use zenoh_util::collections::{Timed, TimedEvent, TimedHandle, Timer};
3535
use zenoh_util::{zerror, zerror2};
3636

37+
// Properies used by the Backend
38+
pub const PROP_BACKEND_URL: &str = "url";
39+
40+
// Properies used by the Storage
41+
pub const PROP_STORAGE_DB: &str = "db";
42+
pub const PROP_STORAGE_CREATE_DB: &str = "create_db";
43+
pub const PROP_STORAGE_ON_CLOSURE: &str = "on_closure";
44+
3745
// delay after deletion to drop a measurement
3846
const DROP_MEASUREMENT_TIMEOUT_MS: u64 = 5000;
3947

@@ -43,15 +51,15 @@ pub fn create_backend(props: &Properties) -> ZResult<Box<dyn Backend>> {
4351
// Try to activate it here, ignoring failures.
4452
let _ = env_logger::try_init();
4553

46-
match props.get("url") {
54+
match props.get(PROP_BACKEND_URL) {
4755
Some(url) => {
4856
// Check connectivity to InfluxDB, no need for a database for this
4957
let client = Client::new(url, "UNUSED");
5058
match async_std::task::block_on(async move { client.ping().await }) {
5159
Ok(_) => {
52-
let mut admin_status_props = props.clone();
53-
let _ = admin_status_props.insert("kind".into(), "time series".into());
54-
let admin_status = zenoh::utils::properties_to_json_value(&admin_status_props);
60+
let mut p = props.clone();
61+
p.insert(PROP_BACKEND_TYPE.into(), "InfluxDB".into());
62+
let admin_status = zenoh::utils::properties_to_json_value(&p);
5563
Ok(Box::new(InfluxDbBackend {
5664
url: url.clone(),
5765
admin_status,
@@ -101,7 +109,7 @@ enum OnClosure {
101109
impl TryFrom<&Properties> for OnClosure {
102110
type Error = ZError;
103111
fn try_from(p: &Properties) -> ZResult<OnClosure> {
104-
match p.get("on_closure") {
112+
match p.get(PROP_STORAGE_ON_CLOSURE) {
105113
Some(s) => {
106114
if s == "drop_db" {
107115
Ok(OnClosure::DropDB)
@@ -129,14 +137,14 @@ struct InfluxDbStorage {
129137

130138
impl InfluxDbStorage {
131139
async fn new(props: Properties, url: &str) -> ZResult<InfluxDbStorage> {
132-
let path_expr = props.get("path_expr").unwrap();
133-
let path_prefix = match props.get("path_prefix") {
140+
let path_expr = props.get(PROP_STORAGE_PATH_EXPR).unwrap();
141+
let path_prefix = match props.get(PROP_STORAGE_PATH_PREFIX) {
134142
Some(p) => {
135143
if !path_expr.starts_with(p) {
136144
return zerror!(ZErrorKind::Other {
137145
descr: format!(
138-
"The specified path_prefix={} is not a prefix of pah_expr={}",
139-
p, path_expr
146+
"The specified {}={} is not a prefix of {}={}",
147+
PROP_STORAGE_PATH_PREFIX, p, PROP_STORAGE_PATH_EXPR, path_expr
140148
)
141149
});
142150
}
@@ -145,9 +153,9 @@ impl InfluxDbStorage {
145153
None => None,
146154
};
147155
let on_closure = OnClosure::try_from(&props)?;
148-
let createdb = props.contains_key("create_db");
156+
let createdb = props.contains_key(PROP_STORAGE_CREATE_DB);
149157

150-
let client = match props.get("db") {
158+
let client = match props.get(PROP_STORAGE_DB) {
151159
Some(db) => {
152160
check_db_existence(url, db, createdb).await?;
153161
Client::new(url, db)
@@ -160,7 +168,7 @@ impl InfluxDbStorage {
160168
};
161169

162170
let mut admin_status_props = props.clone();
163-
admin_status_props.insert("db".into(), client.database_name().into());
171+
admin_status_props.insert(PROP_STORAGE_DB.into(), client.database_name().into());
164172
let admin_status = zenoh::utils::properties_to_json_value(&admin_status_props);
165173
Ok(InfluxDbStorage {
166174
admin_status,
@@ -268,9 +276,12 @@ impl Storage for InfluxDbStorage {
268276
}
269277
}
270278

279+
// encode the value as a string to be stored in InfluxDB
271280
let (encoding, base64, value) = change.value.unwrap().encode_to_string();
281+
272282
// Note: tags are stored as strings in InfluxDB, while fileds are typed.
273283
// For simpler/faster deserialization, we store encoding, timestamp and base64 as fields.
284+
// while the kind is stored as a tag to be indexed by InfluxDB and have faster queries on it.
274285
let query =
275286
InfluxWQuery::new(InfluxTimestamp::Nanoseconds(influx_time), measurement)
276287
.add_tag("kind", "PUT")
@@ -333,16 +344,10 @@ impl Storage for InfluxDbStorage {
333344

334345
// When receiving a Query (i.e. on GET operations)
335346
async fn on_query(&mut self, query: Query) -> ZResult<()> {
336-
#[derive(Deserialize, Debug)]
337-
struct ZenohPoint {
338-
kind: String,
339-
timestamp: String,
340-
encoding: zenoh::net::ZInt,
341-
base64: bool,
342-
value: String,
343-
}
344-
347+
// get the query's Selector
345348
let selector = Selector::try_from(&query)?;
349+
350+
// if a path_prefix is used
346351
let regex = if let Some(prefix) = &self.path_prefix {
347352
// get the list of sub-path expressions that will match the same stored keys than
348353
// the selector, if those keys had the path_prefix.
@@ -351,28 +356,45 @@ impl Storage for InfluxDbStorage {
351356
"Query on {} with path_expr={} => sub_path_exprs = {:?}",
352357
selector.path_expr, prefix, path_exprs
353358
);
359+
// convert the sub-path expressions into an Influx regex
354360
path_exprs_to_influx_regex(&path_exprs)
355361
} else {
362+
// convert the Selector's path expression into an Influx regex
356363
path_exprs_to_influx_regex(&[selector.path_expr.as_str()])
357364
};
365+
366+
// construct the Influx query clauses from the Selector
358367
let clauses = clauses_from_selector(&selector);
368+
369+
// the Influx query
359370
let influx_query_str = format!("SELECT * FROM {} {}", regex, clauses);
360371
let influx_query = InfluxQuery::raw_read_query(&influx_query_str);
361372

373+
// the expected JSon type resulting from the query
374+
#[derive(Deserialize, Debug)]
375+
struct ZenohPoint {
376+
kind: String,
377+
timestamp: String,
378+
encoding: zenoh::net::ZInt,
379+
base64: bool,
380+
value: String,
381+
}
362382
debug!("Get {} with Influx query: {}", selector, influx_query_str);
363383
match self.client.json_query(influx_query).await {
364384
Ok(mut query_result) => {
365385
while !query_result.results.is_empty() {
366386
match query_result.deserialize_next::<ZenohPoint>() {
367387
Ok(retn) => {
368388
for serie in retn.series {
389+
// reconstruct the path from the measurement name (same as serie.name)
369390
let mut res_name = String::with_capacity(serie.name.len());
370391
if let Some(p) = &self.path_prefix {
371392
res_name.push_str(&p);
372393
}
373394
res_name.push_str(&serie.name);
374395
debug!("Replying {} values for {}", serie.values.len(), res_name);
375396
for zpoint in serie.values {
397+
// decode the value and the timestamp
376398
match (
377399
Value::decode_from_string(
378400
zpoint.encoding,

backends/traits/src/lib.rs

Lines changed: 40 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -15,14 +15,27 @@ use async_std::sync::{Arc, RwLock};
1515
use async_trait::async_trait;
1616
use std::convert::TryFrom;
1717
use zenoh::net::Sample;
18-
use zenoh::{Properties, Selector, Timestamp, Value, ZError, ZResult};
18+
use zenoh::{Properties, Selector, Value, ZError, ZResult};
1919

2020
pub mod utils;
2121

22-
pub const STORAGE_PATH_EXPR_PROPERTY: &str = "path_expr";
22+
/// The `"type"` property key to be used in admin status reported by Backends.
23+
pub const PROP_BACKEND_TYPE: &str = "type";
2324

24-
/// Trait to be implemented by a Backend.
25-
/// A library implementing the Backend and Storage traits must also declare a [`create_backend()`] operation
25+
/// The `"path_expr"` property key to be used for configuration of each storage.
26+
pub const PROP_STORAGE_PATH_EXPR: &str = "path_expr";
27+
28+
/// The `"path_prefix"` property key that could be used to specify the common path prefix
29+
/// to be stripped from Paths before storing them as keys in the Storage.
30+
///
31+
/// Note that it shall be a prefix of the `"path_expr"`.
32+
/// If you use it, you should also adapt in [`Storage::on_query()`] implementation the incoming
33+
/// queries' path expression to the stored keys calling [`crate::utils::get_sub_path_exprs()`].
34+
pub const PROP_STORAGE_PATH_PREFIX: &str = "path_prefix";
35+
36+
/// Trait to be implemented by a Backend.
37+
///
38+
/// A library implementing the Backend and Storage traits must also declare a `create_backend()` operation
2639
/// with the `#[no_mangle]` attribute as an entrypoint to be called for the Backend creation.
2740
///
2841
/// # Example
@@ -33,18 +46,27 @@ pub const STORAGE_PATH_EXPR_PROPERTY: &str = "path_expr";
3346
/// use zenoh_backend_traits::*;
3447
///
3548
/// #[no_mangle]
36-
/// pub fn create_backend(_unused: &Properties) -> ZResult<Box<dyn Backend>> {
37-
/// Ok(Box::new(MyBackend {}))
49+
/// pub fn create_backend(properties: &Properties) -> ZResult<Box<dyn Backend>> {
50+
/// // The properties are the ones passed via a PUT in the admin space for Backend creation
51+
/// // Here we re-expose them in the admin space for GET operations, adding the PROP_BACKEND_TYPE entry.
52+
/// let mut p = properties.clone();
53+
/// p.insert(PROP_BACKEND_TYPE.into(), "my_backend_type".into());
54+
/// let admin_status = utils::properties_to_json_value(&p);
55+
/// Ok(Box::new(MyBackend { admin_status }))
3856
/// }
3957
///
4058
/// // Your Backend implementation
41-
/// pub struct MyBackend {}
59+
/// struct MyBackend {
60+
/// admin_status: Value,
61+
/// }
4262
///
4363
/// #[async_trait]
4464
/// impl Backend for MyBackend {
4565
/// async fn get_admin_status(&self) -> Value {
46-
/// // TODO: possibly add more properties in returned Value for more information about this storage
47-
/// Value::Json(r#"{"kind"="some kind"}"#.to_string())
66+
/// // This operation is called on GET operation on the admin space for the Backend
67+
/// // Here we reply with a static status (containing the configuration properties).
68+
/// // But we could add dynamic properties for Backend monitoring.
69+
/// self.admin_status.clone()
4870
/// }
4971
///
5072
/// async fn create_storage(&mut self, properties: Properties) -> ZResult<Box<dyn Storage>> {
@@ -67,6 +89,9 @@ pub const STORAGE_PATH_EXPR_PROPERTY: &str = "path_expr";
6789
///
6890
/// impl MyStorage {
6991
/// async fn new(properties: Properties) -> ZResult<MyStorage> {
92+
/// // The properties are the ones passed via a PUT in the admin space for Storage creation.
93+
/// // They contain at least a PROP_STORAGE_PATH_EXPR entry (i.e. "path_expr").
94+
/// // Here we choose to re-expose them as they are in the admin space for GET operations.
7095
/// let admin_status = utils::properties_to_json_value(&properties);
7196
/// Ok(MyStorage { admin_status })
7297
/// }
@@ -75,7 +100,9 @@ pub const STORAGE_PATH_EXPR_PROPERTY: &str = "path_expr";
75100
/// #[async_trait]
76101
/// impl Storage for MyStorage {
77102
/// async fn get_admin_status(&self) -> Value {
78-
/// // TODO: possibly add more properties in returned Value for more information about this storage
103+
/// // This operation is called on GET operation on the admin space for the Storage
104+
/// // Here we reply with a static status (containing the configuration properties).
105+
/// // But we could add dynamic properties for Storage monitoring.
79106
/// self.admin_status.clone()
80107
/// }
81108
///
@@ -87,11 +114,11 @@ pub const STORAGE_PATH_EXPR_PROPERTY: &str = "path_expr";
87114
/// info.kind.map_or(ChangeKind::PUT, ChangeKind::from),
88115
/// match &info.timestamp {
89116
/// Some(ts) => ts.clone(),
90-
/// None => new_reception_timestamp(),
117+
/// None => zenoh::utils::new_reception_timestamp(),
91118
/// },
92119
/// )
93120
/// } else {
94-
/// (ChangeKind::PUT, new_reception_timestamp())
121+
/// (ChangeKind::PUT, zenoh::utils::new_reception_timestamp())
95122
/// };
96123
/// // Store or delete the sample depending the ChangeKind
97124
/// match kind {
@@ -106,7 +133,7 @@ pub const STORAGE_PATH_EXPR_PROPERTY: &str = "path_expr";
106133
/// let _key = sample.res_name;
107134
/// // TODO:
108135
/// // - check if timestamp is newer than the stored one for the same key
109-
/// // - if yes: mark (key, sample) as deleted (possibly scheduling definitive removal for later)
136+
/// // - if yes: mark key as deleted (possibly scheduling definitive removal for later)
110137
/// // - if not: drop the sample
111138
/// }
112139
/// ChangeKind::PATCH => {
@@ -219,14 +246,3 @@ impl TryFrom<&Query> for Selector {
219246
Selector::try_from(&q.q)
220247
}
221248
}
222-
223-
/// Generates a reception timestamp with id=0x00
224-
pub fn new_reception_timestamp() -> Timestamp {
225-
use std::time::{SystemTime, UNIX_EPOCH};
226-
use zenoh::TimestampID;
227-
let now = SystemTime::now().duration_since(UNIX_EPOCH).unwrap();
228-
Timestamp::new(
229-
now.into(),
230-
TimestampID::new(1, [0u8; TimestampID::MAX_SIZE]),
231-
)
232-
}

plugins/zenoh-storages/src/backends_mgt.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,11 @@ use log::{debug, error, trace, warn};
2020
use std::collections::HashMap;
2121
use std::convert::TryFrom;
2222
use zenoh::{ChangeKind, Path, PathExpr, Selector, Value, ZError, ZErrorKind, ZResult, Zenoh};
23-
use zenoh_backend_traits::{IncomingDataInterceptor, OutgoingDataInterceptor};
23+
use zenoh_backend_traits::{
24+
IncomingDataInterceptor, OutgoingDataInterceptor, PROP_STORAGE_PATH_EXPR,
25+
};
2426
use zenoh_util::{zerror, zerror2};
2527

26-
pub(crate) const STORAGE_PATH_EXPR_PROPERTY: &str = "path_expr";
27-
2828
pub(crate) async fn start_backend(
2929
backend: Box<dyn zenoh_backend_traits::Backend>,
3030
admin_path: Path,
@@ -135,11 +135,11 @@ async fn create_and_start_storage(
135135
) -> ZResult<Sender<bool>> {
136136
trace!("Create storage {}", admin_path);
137137
if let Value::Properties(props) = value {
138-
let path_expr_str = props.get(STORAGE_PATH_EXPR_PROPERTY).ok_or_else(|| {
138+
let path_expr_str = props.get(PROP_STORAGE_PATH_EXPR).ok_or_else(|| {
139139
zerror2!(ZErrorKind::Other {
140140
descr: format!(
141141
"Can't create storage {}: no {} property",
142-
admin_path, STORAGE_PATH_EXPR_PROPERTY
142+
admin_path, PROP_STORAGE_PATH_EXPR
143143
)
144144
})
145145
})?;

plugins/zenoh-storages/src/lib.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ use log::{debug, error, warn};
2121
use std::collections::HashMap;
2222
use std::convert::TryFrom;
2323
use zenoh::{ChangeKind, Path, Properties, Selector, Value, ZError, ZErrorKind, ZResult, Zenoh};
24-
use zenoh_backend_traits::Backend;
24+
use zenoh_backend_traits::{Backend, PROP_STORAGE_PATH_EXPR};
2525
use zenoh_router::runtime::Runtime;
2626
use zenoh_util::{zerror, LibLoader};
2727

@@ -108,7 +108,7 @@ async fn run(runtime: Runtime, args: &'static ArgMatches<'_>) {
108108
mem_backend_path, MEMORY_STORAGE_NAME, i
109109
))
110110
.unwrap();
111-
let props = Properties::from([(STORAGE_PATH_EXPR_PROPERTY, path_expr)].as_ref());
111+
let props = Properties::from([(PROP_STORAGE_PATH_EXPR, path_expr)].as_ref());
112112
workspace
113113
.put(&storage_admin_path, Value::Properties(props))
114114
.await

plugins/zenoh-storages/src/memory_backend/mod.rs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,9 @@ use zenoh_backend_traits::*;
2424
use zenoh_util::collections::{Timed, TimedEvent, TimedHandle, Timer};
2525

2626
pub fn create_backend(_unused: Properties) -> ZResult<Box<dyn Backend>> {
27-
// For now admin status is static and only contains a "kind"
28-
let admin_status = Value::Json(r#"{"kind"="memory"}"#.to_string());
27+
// For now admin status is static and only contains a PROP_BACKEND_TYPE entry
28+
let properties = Properties::from(&[(PROP_BACKEND_TYPE, "memory")][..]);
29+
let admin_status = utils::properties_to_json_value(&properties);
2930
Ok(Box::new(MemoryBackend { admin_status }))
3031
}
3132

@@ -157,11 +158,11 @@ impl Storage for MemoryStorage {
157158
info.kind.map_or(ChangeKind::PUT, ChangeKind::from),
158159
match &info.timestamp {
159160
Some(ts) => ts.clone(),
160-
None => new_reception_timestamp(),
161+
None => utils::new_reception_timestamp(),
161162
},
162163
)
163164
} else {
164-
(ChangeKind::PUT, new_reception_timestamp())
165+
(ChangeKind::PUT, utils::new_reception_timestamp())
165166
};
166167
match kind {
167168
ChangeKind::PUT => match self.map.write().await.entry(sample.res_name.clone()) {

zenoh/src/utils.rs

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,27 @@
1111
// Contributors:
1212
// ADLINK zenoh team, <[email protected]>
1313
//
14-
use crate::{Properties, Value};
1514

15+
//! Some useful operations for the zenoh API.
16+
17+
use crate::{Properties, Timestamp, TimestampID, Value};
18+
19+
/// Generates a reception [`Timestamp`] with id=0x00.
20+
/// This operation should be called if a timestamp is required for an incoming [`zenoh::net::Sample`](crate::net::Sample)
21+
/// that doesn't contain any data_info or timestamp within its data_info.
22+
pub fn new_reception_timestamp() -> Timestamp {
23+
use std::time::{SystemTime, UNIX_EPOCH};
24+
25+
let now = SystemTime::now().duration_since(UNIX_EPOCH).unwrap();
26+
Timestamp::new(
27+
now.into(),
28+
TimestampID::new(1, [0u8; TimestampID::MAX_SIZE]),
29+
)
30+
}
31+
32+
/// Convert a set of [`Properties`] into a [`Value::Json`].
33+
/// For instance such Properties: `[("k1", "v1"), ("k2, v2")]`
34+
/// are converted into such Json: `{ "k1": "v1", "k2": "v2" }`
1635
pub fn properties_to_json_value(props: &Properties) -> Value {
1736
let json_map = props
1837
.iter()

0 commit comments

Comments
 (0)