Skip to content
This repository was archived by the owner on Oct 23, 2022. It is now read-only.

Commit 72ff94c

Browse files
Joonas KoivunenDavid Craven
authored andcommitted
wip salvage
This is originally from the non-merging branch by David in #126. Co-authored-by: David Craven <[email protected]>
1 parent 44e91e2 commit 72ff94c

File tree

7 files changed

+282
-9
lines changed

7 files changed

+282
-9
lines changed

bitswap/src/block.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,4 +18,8 @@ impl Block {
1818
pub fn data(&self) -> &[u8] {
1919
&self.data
2020
}
21+
22+
pub fn to_vec(self) -> Vec<u8> {
23+
self.data.into()
24+
}
2125
}

http/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,3 +27,5 @@ percent-encoding = "*"
2727
openssl = "0.10"
2828

2929
prost = "0.6.1"
30+
cid = "0.4"
31+
multihash = "0.10"

http/src/v0.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@ pub mod id;
55
pub mod pubsub;
66
pub mod swarm;
77
pub mod version;
8+
pub mod block;
9+
pub mod bitswap;
810

911
pub mod support;
1012
pub use support::recover_as_message_response;
@@ -32,7 +34,10 @@ where
3234
// Placeholder paths
3335
// https://docs.rs/warp/0.2.2/warp/macro.path.html#path-prefixes
3436
.or(warp::path!("add").and_then(not_implemented))
35-
.or(warp::path!("bitswap" / ..).and_then(not_implemented))
37+
.or(bitswap::wantlist(ipfs))
38+
.or(bitswap::stat(ipfs))
39+
.or(block::get(ipfs))
40+
.or(block::put(ipfs))
3641
.or(warp::path!("block" / ..).and_then(not_implemented))
3742
.or(warp::path!("bootstrap" / ..).and_then(not_implemented))
3843
.or(warp::path!("config" / ..).and_then(not_implemented))

http/src/v0/bitswap.rs

Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
use crate::v0::support::{with_ipfs, InvalidPeerId, StringError};
2+
use ipfs::{BitswapStats, Ipfs, IpfsTypes};
3+
use serde::{Deserialize, Serialize};
4+
use serde_json::{json, Value};
5+
use warp::{path, query, reply, Filter, Rejection, Reply};
6+
7+
#[derive(Debug, Deserialize)]
8+
pub struct WantlistQuery {
9+
peer: Option<String>,
10+
}
11+
12+
#[derive(Debug, Serialize)]
13+
#[serde(rename_all = "PascalCase")]
14+
pub struct WantlistResponse {
15+
keys: Vec<Value>,
16+
}
17+
18+
async fn wantlist_query<T: IpfsTypes>(ipfs: Ipfs<T>, query: WantlistQuery) -> Result<impl Reply, Rejection> {
19+
let peer_id = if let Some(peer_id) = query.peer {
20+
let peer_id = peer_id.parse().map_err(|_| InvalidPeerId)?;
21+
Some(peer_id)
22+
} else {
23+
None
24+
};
25+
let cids = ipfs
26+
.bitswap_wantlist(peer_id)
27+
.await
28+
.map_err(StringError::from)?;
29+
let keys = cids
30+
.into_iter()
31+
.map(|(cid, _)| json!({"/": cid.to_string()}))
32+
.collect();
33+
let response = WantlistResponse { keys };
34+
Ok(reply::json(&response))
35+
}
36+
37+
pub fn wantlist<T: IpfsTypes>(ipfs: &Ipfs<T>) -> impl Filter<Extract = impl Reply, Error = Rejection> + Clone {
38+
path!("bitswap" / "wantlist")
39+
.and(with_ipfs(ipfs))
40+
.and(query::<WantlistQuery>())
41+
.and_then(wantlist_query)
42+
}
43+
44+
#[derive(Debug, Serialize)]
45+
#[serde(rename_all = "PascalCase")]
46+
pub struct StatResponse {
47+
blocks_received: u64,
48+
blocks_sent: u64,
49+
data_received: u64,
50+
data_sent: u64,
51+
dup_blks_received: u64,
52+
dup_data_received: u64,
53+
messages_received: u64,
54+
peers: Vec<String>,
55+
provide_buf_len: i32,
56+
wantlist: Vec<Value>,
57+
}
58+
59+
impl From<BitswapStats> for StatResponse {
60+
fn from(stats: BitswapStats) -> Self {
61+
let wantlist = stats
62+
.wantlist
63+
.into_iter()
64+
.map(|(cid, _)| json!({"/": cid.to_string()}))
65+
.collect();
66+
let peers = stats
67+
.peers
68+
.into_iter()
69+
.map(|peer_id| peer_id.to_string())
70+
.collect();
71+
Self {
72+
blocks_received: stats.blocks_received,
73+
blocks_sent: stats.blocks_sent,
74+
data_received: stats.data_received,
75+
data_sent: stats.data_sent,
76+
dup_blks_received: stats.dup_blks_received,
77+
dup_data_received: stats.dup_data_received,
78+
peers,
79+
wantlist,
80+
messages_received: 0,
81+
provide_buf_len: 0,
82+
}
83+
}
84+
}
85+
86+
async fn stat_query<T: IpfsTypes>(ipfs: Ipfs<T>) -> Result<impl Reply, Rejection> {
87+
let stats: StatResponse = ipfs
88+
.bitswap_stats()
89+
.await
90+
.map_err(StringError::from)?
91+
.into();
92+
Ok(reply::json(&stats))
93+
}
94+
95+
pub fn stat<T: IpfsTypes>(ipfs: &Ipfs<T>) -> impl Filter<Extract = impl Reply, Error = Rejection> + Clone {
96+
path!("bitswap" / "stat")
97+
.and(with_ipfs(ipfs))
98+
.and_then(stat_query)
99+
}

http/src/v0/block.rs

Lines changed: 133 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,133 @@
1+
use crate::v0::support::{with_ipfs, InvalidMultipartFormData, StringError};
2+
use cid::{Codec, Version};
3+
use futures::stream::StreamExt;
4+
use ipfs::{Cid, Ipfs, IpfsTypes};
5+
use serde::{Deserialize, Serialize};
6+
use warp::{http::Response, multipart, path, query, reply, Filter, Rejection, Reply, Buf};
7+
8+
#[derive(Debug, Deserialize)]
9+
pub struct GetQuery {
10+
arg: String,
11+
}
12+
13+
async fn get_query<T: IpfsTypes>(mut ipfs: Ipfs<T>, query: GetQuery) -> Result<impl Reply, Rejection> {
14+
let cid: Cid = query.arg.parse().map_err(StringError::from)?;
15+
let data = ipfs
16+
.get_block(&cid)
17+
.await
18+
.map_err(StringError::from)?
19+
.to_vec();
20+
21+
let response = Response::builder().body(data);
22+
Ok(response)
23+
}
24+
25+
pub fn get<T: IpfsTypes>(ipfs: &Ipfs<T>) -> impl Filter<Extract = impl Reply, Error = Rejection> + Clone {
26+
path!("block" / "get")
27+
.and(with_ipfs(ipfs))
28+
.and(query::<GetQuery>())
29+
.and_then(get_query)
30+
}
31+
32+
#[derive(Debug, Deserialize)]
33+
pub struct PutQuery {
34+
format: Option<String>,
35+
mhtype: Option<String>,
36+
version: Option<u8>,
37+
}
38+
39+
#[derive(Debug, Serialize)]
40+
#[serde(rename_all = "PascalCase")]
41+
pub struct PutResponse {
42+
key: String,
43+
size: usize,
44+
}
45+
46+
async fn put_query<T: IpfsTypes>(
47+
mut ipfs: Ipfs<T>,
48+
query: PutQuery,
49+
mut form: multipart::FormData,
50+
) -> Result<impl Reply, Rejection> {
51+
let format = match query
52+
.format
53+
.as_ref()
54+
.map(|s| s.as_str())
55+
.unwrap_or("dag-pb")
56+
{
57+
"dag-cbor" => Codec::DagCBOR,
58+
"dag-pb" => Codec::DagProtobuf,
59+
"dag-json" => Codec::DagJSON,
60+
"raw" => Codec::Raw,
61+
_ => return Err(StringError::from("unknown codec").into()),
62+
};
63+
let hasher = match query
64+
.mhtype
65+
.as_ref()
66+
.map(|s| s.as_str())
67+
.unwrap_or("sha2-256")
68+
{
69+
"sha2-256" => multihash::Sha2_256::digest,
70+
"sha2-512" => multihash::Sha2_512::digest,
71+
_ => return Err(StringError::from("unknown hash").into()),
72+
};
73+
let version = match query.version.unwrap_or(0) {
74+
0 => Version::V0,
75+
1 => Version::V1,
76+
_ => return Err(StringError::from("invalid cid version").into()),
77+
};
78+
let mut buf = form
79+
.next()
80+
.await
81+
.ok_or(InvalidMultipartFormData)?
82+
.map_err(|_| InvalidMultipartFormData)?
83+
.data()
84+
.await
85+
.ok_or(InvalidMultipartFormData)?
86+
.map_err(|_| InvalidMultipartFormData)?;
87+
let data = buf.to_bytes().as_ref().to_vec().into_boxed_slice();
88+
let digest = hasher(&data);
89+
let cid = Cid::new(version, format, digest).map_err(StringError::from)?;
90+
let response = PutResponse {
91+
key: cid.to_string(),
92+
size: data.len(),
93+
};
94+
let block = ipfs::Block { cid, data };
95+
ipfs.put_block(block).await.map_err(StringError::from)?;
96+
Ok(reply::json(&response))
97+
}
98+
99+
pub fn put<T: IpfsTypes>(ipfs: &Ipfs<T>) -> impl Filter<Extract = impl Reply, Error = Rejection> + Clone {
100+
path!("block" / "put")
101+
.and(with_ipfs(ipfs))
102+
.and(query::<PutQuery>())
103+
.and(multipart::form())
104+
.and_then(put_query)
105+
}
106+
107+
#[derive(Debug, Deserialize)]
108+
pub struct RmQuery {}
109+
110+
#[derive(Debug, Serialize)]
111+
#[serde(rename_all = "PascalCase")]
112+
pub struct RmResponse {}
113+
114+
async fn rm_query<T: IpfsTypes>(_ipfs: Ipfs<T>, _query: RmQuery) -> Result<impl Reply, Rejection> {
115+
//let _data = ipfs.put_block(query.cid).await
116+
// .map_err(|e| reject::custom(StringError::from(e)))?;
117+
let response = RmResponse {};
118+
Ok(reply::json(&response))
119+
}
120+
121+
pub fn rm<T: IpfsTypes>(ipfs: &Ipfs<T>) -> impl Filter<Extract = impl Reply, Error = Rejection> + Clone {
122+
path!("block" / "rm")
123+
.and(with_ipfs(ipfs))
124+
.and(query::<RmQuery>())
125+
.and_then(rm_query)
126+
}
127+
128+
pub fn stat<T: IpfsTypes>(ipfs: &Ipfs<T>) -> impl Filter<Extract = impl Reply, Error = Rejection> + Clone {
129+
path!("block" / "stat")
130+
.and(with_ipfs(ipfs))
131+
.and(query::<RmQuery>())
132+
.and_then(rm_query)
133+
}

http/src/v0/support.rs

Lines changed: 26 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -69,32 +69,50 @@ impl warp::reject::Reject for NonUtf8Topic {}
6969
pub(crate) struct RequiredArgumentMissing(pub(crate) &'static [u8]);
7070
impl warp::reject::Reject for RequiredArgumentMissing {}
7171

72+
#[derive(Debug)]
73+
pub(crate) struct InvalidMultipartFormData;
74+
impl warp::reject::Reject for InvalidMultipartFormData {}
75+
impl From<InvalidMultipartFormData> for warp::Rejection {
76+
fn from(err: InvalidMultipartFormData) -> warp::Rejection {
77+
warp::reject::custom(err)
78+
}
79+
}
80+
7281
/// Marker for `warp` specific rejections when something is unimplemented
7382
#[derive(Debug)]
7483
pub(crate) struct NotImplemented;
7584
impl warp::reject::Reject for NotImplemented {}
76-
impl Into<warp::reject::Rejection> for NotImplemented {
77-
fn into(self) -> warp::reject::Rejection {
78-
warp::reject::custom(self)
85+
impl From<NotImplemented> for warp::Rejection {
86+
fn from(err: NotImplemented) -> warp::Rejection {
87+
warp::reject::custom(err)
7988
}
8089
}
8190

8291
/// PeerId parsing error, details from `libp2p::identity::ParseError` are lost.
8392
#[derive(Debug)]
8493
pub(crate) struct InvalidPeerId;
8594
impl warp::reject::Reject for InvalidPeerId {}
95+
impl From<InvalidPeerId> for warp::Rejection {
96+
fn from(err: InvalidPeerId) -> warp::Rejection {
97+
warp::reject::custom(err)
98+
}
99+
}
86100

87101
/// Default placeholder for ipfs::Error but once we get more typed errors we could start making
88102
/// them more readable, if needed.
103+
// TODO: needs to be considered if this is even needed..
89104
#[derive(Debug)]
90105
pub(crate) struct StringError(Cow<'static, str>);
91106
impl warp::reject::Reject for StringError {}
107+
impl From<StringError> for warp::Rejection {
108+
fn from(err: StringError) -> warp::Rejection {
109+
warp::reject::custom(err)
110+
}
111+
}
92112

93-
// FIXME: it's a bit questionable to keep this but in the beginning it might help us glide in the
94-
// right direction.
95-
impl From<ipfs::Error> for StringError {
96-
fn from(e: ipfs::Error) -> Self {
97-
Self(format!("{}", e).into())
113+
impl<D: std::fmt::Display> From<D> for StringError {
114+
fn from(d: D) -> Self {
115+
Self(format!("{}", d).into())
98116
}
99117
}
100118

src/lib.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -632,6 +632,18 @@ impl<Types: SwarmTypes> Future for IpfsFuture<Types> {
632632
}
633633
}
634634

635+
#[derive(Clone, Debug, Eq, PartialEq)]
636+
pub struct BitswapStats {
637+
pub blocks_sent: u64,
638+
pub data_sent: u64,
639+
pub blocks_received: u64,
640+
pub data_received: u64,
641+
pub dup_blks_received: u64,
642+
pub dup_data_received: u64,
643+
pub peers: Vec<PeerId>,
644+
pub wantlist: Vec<(Cid, bitswap::Priority)>,
645+
}
646+
635647
#[doc(hidden)]
636648
pub use node::Node;
637649

0 commit comments

Comments
 (0)