diff --git a/influxdb/src/integrations/serde_integration/de.rs b/influxdb/src/integrations/serde_integration/de.rs new file mode 100644 index 0000000..0a1343c --- /dev/null +++ b/influxdb/src/integrations/serde_integration/de.rs @@ -0,0 +1,346 @@ +use super::Series; +use serde::de::{ + value, DeserializeSeed, Deserializer, Error, IntoDeserializer, MapAccess, SeqAccess, Visitor, +}; +use serde::Deserialize; +use std::fmt; +use std::marker::PhantomData; + +// Based on https://serde.rs/deserialize-struct.html +impl<'de, T> Deserialize<'de> for Series +where + T: Deserialize<'de>, +{ + fn deserialize(deserializer: D) -> Result + where + D: Deserializer<'de>, + { + // Field name deserializer + #[derive(Deserialize)] + #[serde(field_identifier, rename_all = "lowercase")] + enum Field { + Name, + Columns, + Values, + }; + + struct SeriesVisitor { + _inner_type: PhantomData, + }; + + impl<'de, T> Visitor<'de> for SeriesVisitor + where + T: Deserialize<'de>, + { + type Value = Series; + + fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result { + formatter.write_str("struct Series") + } + + fn visit_map(self, mut map: V) -> Result, V::Error> + where + V: MapAccess<'de>, + { + let mut name = None; + let mut columns: Option> = None; + let mut values: Option> = None; + while let Some(key) = map.next_key()? { + match key { + Field::Name => { + if name.is_some() { + return Err(Error::duplicate_field("name")); + } + name = Some(map.next_value()?); + } + Field::Columns => { + if columns.is_some() { + return Err(Error::duplicate_field("columns")); + } + columns = Some(map.next_value()?); + } + Field::Values => { + if values.is_some() { + return Err(Error::duplicate_field("values")); + } + // Error out if "values" is encountered before "columns" + // Hopefully, InfluxDB never does this. + if columns.is_none() { + return Err(Error::custom( + "series values encountered before columns", + )); + } + // Deserialize using a HeaderVec deserializer + // seeded with the headers from the "columns" field + values = Some(map.next_value_seed(HeaderVec:: { + header: columns.as_ref().unwrap(), + _inner_type: PhantomData, + })?); + } + } + } + let name = name.ok_or_else(|| Error::missing_field("name"))?; + let values = values.ok_or_else(|| Error::missing_field("values"))?; + Ok(Series { name, values }) + } + } + + const FIELDS: &[&str] = &["name", "values"]; + deserializer.deserialize_struct( + "Series", + FIELDS, + SeriesVisitor:: { + _inner_type: PhantomData, + }, + ) + } +} + +// Deserializer that takes a header as a seed +// and deserializes an array of arrays into a +// Vec of map-like values using the header as +// keys and the values as values. +struct HeaderVec<'h, T> { + header: &'h [String], + _inner_type: PhantomData, +} + +impl<'de, 'h, T> DeserializeSeed<'de> for HeaderVec<'h, T> +where + T: Deserialize<'de>, +{ + type Value = Vec; + + fn deserialize(self, deserializer: D) -> Result + where + D: Deserializer<'de>, + { + struct HeaderVecVisitor<'h, T> { + header: &'h [String], + _inner_type: PhantomData, + } + impl<'de, 'h, T> Visitor<'de> for HeaderVecVisitor<'h, T> + where + T: Deserialize<'de>, + { + type Value = Vec; + + fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result { + write!(formatter, "an array of arrays") + } + + fn visit_seq(self, mut seq: A) -> Result, A::Error> + where + A: SeqAccess<'de>, + { + let mut vec = Vec::new(); + + while let Some(v) = seq.next_element_seed(RowWithHeader { + header: self.header, + _inner_type: PhantomData, + })? { + vec.push(v); + } + + Ok(vec) + } + } + deserializer.deserialize_seq(HeaderVecVisitor { + header: self.header, + _inner_type: PhantomData, + }) + } +} + +// Deserializer that takes a header as a seed +// and deserializes an array into a map-like +// value using the header as keys and the values +// as values. +struct RowWithHeader<'h, T> { + header: &'h [String], + _inner_type: PhantomData, +} + +impl<'de, 'h, T> DeserializeSeed<'de> for RowWithHeader<'h, T> +where + T: Deserialize<'de>, +{ + type Value = T; + + fn deserialize(self, deserializer: D) -> Result + where + D: Deserializer<'de>, + { + struct RowWithHeaderVisitor<'h, T> { + header: &'h [String], + _inner: PhantomData T>, + } + + impl<'de, 'h, T> Visitor<'de> for RowWithHeaderVisitor<'h, T> + where + T: Deserialize<'de>, + { + type Value = T; + + fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result { + formatter.write_str("array") + } + + fn visit_seq(self, seq: A) -> Result + where + A: SeqAccess<'de>, + { + // `MapAccessDeserializer` is a wrapper that turns a `MapAccess` + // into a `Deserializer`, allowing it to be used as the input to T's + // `Deserialize` implementation. T then deserializes itself using + // the entries from the map visitor. + Deserialize::deserialize(value::MapAccessDeserializer::new(HeaderMapAccess { + header: self.header, + field: 0, + data: seq, + })) + } + } + + deserializer.deserialize_seq(RowWithHeaderVisitor { + header: self.header, + _inner: PhantomData, + }) + } +} + +// MapAccess implementation that holds a reference to +// the header for keys and a serde sequence for values. +// When asked for a key, it returns the next header and +// advances its header field index. When asked for a value, +// it tries to deserialize the next element in the serde +// sequence into the desired type, and returns an error +// if no element is returned (the sequence is exhausted). +struct HeaderMapAccess<'h, A> { + header: &'h [String], + field: usize, + data: A, +} + +impl<'de, 'h, A> MapAccess<'de> for HeaderMapAccess<'h, A> +where + A: SeqAccess<'de>, +{ + type Error = >::Error; + + fn next_key_seed>( + &mut self, + seed: K, + ) -> Result, Self::Error> { + let field = match self.header.get(self.field) { + None => return Ok(None), + Some(field) => field, + }; + self.field += 1; + seed.deserialize(field.clone().into_deserializer()) + .map(Some) + } + + fn next_value_seed>( + &mut self, + seed: K, + ) -> Result { + match self.data.next_element_seed(seed)? { + Some(value) => Ok(value), + None => Err(Error::custom("next_value_seed called but no value")), + } + } +} + +#[cfg(test)] +mod tests { + use super::Series; + use std::borrow::Cow; + use std::collections::HashMap; + + const TEST_DATA: &str = r#" + { + "name": "series_name", + "columns": ["foo", "bar"], + "values": [ + ["foo_a", "bar_a"], + ["foo_b", "bar_b"] + ] + } + "#; + + // we can derive all the impls we want here + #[derive(Debug, PartialEq, Eq)] + struct EqSeries { + pub name: String, + pub values: Vec, + } + + impl From> for EqSeries { + fn from(Series { name, values }: Series) -> Self { + EqSeries { name, values } + } + } + + #[test] + fn test_deserialize_cow() { + // Unfortunately, Cow is not automatically borrowed, + // so this is basically equivalent to String, String + let result = serde_json::from_str::, Cow>>>(TEST_DATA); + assert!(result.is_ok()); + assert_eq!( + EqSeries::from(result.unwrap()), + EqSeries { + name: "series_name".into(), + values: vec![ + { + let mut h = std::collections::HashMap::new(); + h.insert("foo".into(), "foo_a".into()); + h.insert("bar".into(), "bar_a".into()); + h + }, + { + let mut h = std::collections::HashMap::new(); + h.insert("foo".into(), "foo_b".into()); + h.insert("bar".into(), "bar_b".into()); + h + }, + ], + }, + ); + } + + #[test] + fn test_deserialize_borrowed() { + use serde::Deserialize; + + // Deserializing a string that cannot be passed through + // without escaping will result in an error like this: + // `invalid type: string "\n", expected a borrowed string at line 6 column 43` + // but if it doesn't need escaping it's fine. + #[derive(Deserialize, Debug, PartialEq, Eq)] + struct BorrowingStruct<'a> { + foo: &'a str, + bar: &'a str, + } + + let result = serde_json::from_str::>(TEST_DATA); + assert!(result.is_ok(), "{}", result.unwrap_err()); + assert_eq!( + EqSeries::from(result.unwrap()), + EqSeries { + name: "series_name".into(), + values: vec![ + BorrowingStruct { + foo: "foo_a", + bar: "bar_a", + }, + BorrowingStruct { + foo: "foo_b", + bar: "bar_b", + }, + ], + }, + ); + } +} diff --git a/influxdb/src/integrations/serde_integration.rs b/influxdb/src/integrations/serde_integration/mod.rs similarity index 99% rename from influxdb/src/integrations/serde_integration.rs rename to influxdb/src/integrations/serde_integration/mod.rs index bc52dfd..91dddde 100644 --- a/influxdb/src/integrations/serde_integration.rs +++ b/influxdb/src/integrations/serde_integration/mod.rs @@ -46,6 +46,8 @@ //! # } //! ``` +mod de; + use reqwest::{Client as ReqwestClient, StatusCode, Url}; use serde::{de::DeserializeOwned, Deserialize}; @@ -84,7 +86,7 @@ pub struct Return { pub series: Vec>, } -#[derive(Deserialize, Debug)] +#[derive(Debug)] /// Represents a returned series from InfluxDB pub struct Series { pub name: String, diff --git a/influxdb/tests/integration_tests.rs b/influxdb/tests/integration_tests.rs index 20c0c8d..44d519a 100644 --- a/influxdb/tests/integration_tests.rs +++ b/influxdb/tests/integration_tests.rs @@ -274,8 +274,10 @@ async fn test_write_and_read_option() { #[derive(Deserialize, Debug, PartialEq)] struct Weather { time: String, - temperature: i32, + // different order to verify field names + // are being used instead of just order wind_strength: Option, + temperature: i32, } let query =