diff --git a/Cargo.lock b/Cargo.lock index d4204da7..fbc0415c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3086,9 +3086,9 @@ dependencies = [ [[package]] name = "quic-rpc" -version = "0.15.0" +version = "0.15.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "61e131f594054d27d077162815db3b5e9ddd76a28fbb9091b68095971e75c286" +checksum = "dc623a188942fc875926f7baeb2cb08ed4288b64f29072656eb051e360ee7623" dependencies = [ "anyhow", "derive_more", @@ -3102,6 +3102,7 @@ dependencies = [ "serde", "slab", "tokio", + "tokio-util", "tracing", ] diff --git a/Cargo.toml b/Cargo.toml index c5b4f817..8ff53308 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -54,7 +54,7 @@ tracing = "0.1" # rpc dependencies (optional) nested_enum_utils = { version = "0.1.0", optional = true } -quic-rpc = { version = "0.15.0", optional = true } +quic-rpc = { version = "0.15.1", optional = true } quic-rpc-derive = { version = "0.15.0", optional = true } strum = { version = "0.26", optional = true } serde-error = "0.1.3" diff --git a/deny.toml b/deny.toml index 1b3068ff..6f7ba486 100644 --- a/deny.toml +++ b/deny.toml @@ -34,6 +34,7 @@ license-files = [ [advisories] ignore = [ "RUSTSEC-2024-0370", # unmaintained, no upgrade available + "RUSTSEC-2024-0384", # unmaintained, no upgrade available ] [sources] diff --git a/src/net.rs b/src/net.rs index ca916535..3aaf8a7f 100644 --- a/src/net.rs +++ b/src/net.rs @@ -92,6 +92,8 @@ pub struct Gossip { to_actor_tx: mpsc::Sender, _actor_handle: Arc>, max_message_size: usize, + #[cfg(feature = "rpc")] + pub(crate) rpc_handler: Arc>, } impl ProtocolHandler for Gossip { @@ -143,6 +145,8 @@ impl Gossip { to_actor_tx, _actor_handle: Arc::new(AbortOnDropHandle::new(actor_handle)), max_message_size, + #[cfg(feature = "rpc")] + rpc_handler: Default::default(), } } diff --git a/src/rpc.rs b/src/rpc.rs index 5fc8000b..874a355b 100644 --- a/src/rpc.rs +++ b/src/rpc.rs @@ -1,22 +1,50 @@ //! Provides a rpc protocol as well as a client for the protocol -use proto::RpcService; -use quic_rpc::server::ChannelTypes; +use client::MemClient; +use proto::{Request, Response, RpcService}; +use quic_rpc::{server::ChannelTypes, transport::flume::FlumeConnector, RpcClient, RpcServer}; +use tokio_util::task::AbortOnDropHandle; use crate::net::Gossip; pub use crate::net::{Command as SubscribeUpdate, Event as SubscribeResponse}; pub mod client; pub mod proto; +#[derive(Debug)] +pub(crate) struct RpcHandler { + /// Client to hand out + client: MemClient, + /// Handler task + _handler: AbortOnDropHandle<()>, +} + +impl RpcHandler { + fn new(gossip: &Gossip) -> Self { + let gossip = gossip.clone(); + let (listener, connector) = quic_rpc::transport::flume::channel(1); + let listener = RpcServer::new(listener); + let client = MemClient::new(RpcClient::new(connector)); + let _handler = listener + .spawn_accept_loop(move |req, chan| gossip.clone().handle_rpc_request(req, chan)); + + Self { client, _handler } + } +} + impl Gossip { + /// Get an in-memory gossip client + pub fn client(&self) -> &client::Client> { + let handler = self.rpc_handler.get_or_init(|| RpcHandler::new(self)); + &handler.client + } + /// Handle a gossip request from the RPC server. pub async fn handle_rpc_request>( - &self, - msg: crate::rpc::proto::Request, + self, + msg: Request, chan: quic_rpc::server::RpcChannel, ) -> Result<(), quic_rpc::server::RpcServerError> { use quic_rpc::server::RpcServerError; - - use crate::rpc::proto::Request::*; + use Request::*; match msg { Subscribe(msg) => { let this = self.clone(); diff --git a/src/rpc/client.rs b/src/rpc/client.rs index 458a903a..b384ec9e 100644 --- a/src/rpc/client.rs +++ b/src/rpc/client.rs @@ -8,7 +8,7 @@ use anyhow::Result; use futures_lite::{Stream, StreamExt}; use futures_util::{Sink, SinkExt}; use iroh_net::NodeId; -use quic_rpc::client::BoxedConnector; +use quic_rpc::{client::BoxedConnector, transport::flume::FlumeConnector}; use crate::{ net::{Command as SubscribeUpdate, Event as SubscribeResponse}, @@ -22,6 +22,10 @@ pub struct Client> { pub(super) rpc: quic_rpc::RpcClient, } +/// Type alias for a memory-backed client. +pub type MemClient = + Client>; + /// Options for subscribing to a gossip topic. #[derive(Debug, Clone)] pub struct SubscribeOpts {