Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ tracing = "0.1"

# rpc dependencies (optional)
nested_enum_utils = { version = "0.1.0", optional = true }
quic-rpc = { version = "0.15.0", optional = true }
quic-rpc = { version = "0.15.1", optional = true }
quic-rpc-derive = { version = "0.15.0", optional = true }
strum = { version = "0.26", optional = true }
serde-error = "0.1.3"
Expand Down
1 change: 1 addition & 0 deletions deny.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ license-files = [
[advisories]
ignore = [
"RUSTSEC-2024-0370", # unmaintained, no upgrade available
"RUSTSEC-2024-0384", # unmaintained, no upgrade available
]

[sources]
Expand Down
4 changes: 4 additions & 0 deletions src/net.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,8 @@ pub struct Gossip {
to_actor_tx: mpsc::Sender<ToActor>,
_actor_handle: Arc<AbortOnDropHandle<()>>,
max_message_size: usize,
#[cfg(feature = "rpc")]
pub(crate) rpc_handler: Arc<std::sync::OnceLock<crate::rpc::RpcHandler>>,
}

impl ProtocolHandler for Gossip {
Expand Down Expand Up @@ -143,6 +145,8 @@ impl Gossip {
to_actor_tx,
_actor_handle: Arc::new(AbortOnDropHandle::new(actor_handle)),
max_message_size,
#[cfg(feature = "rpc")]
rpc_handler: Default::default(),
}
}

Expand Down
40 changes: 34 additions & 6 deletions src/rpc.rs
Original file line number Diff line number Diff line change
@@ -1,22 +1,50 @@
//! Provides a rpc protocol as well as a client for the protocol
use proto::RpcService;
use quic_rpc::server::ChannelTypes;
use client::MemClient;
use proto::{Request, Response, RpcService};
use quic_rpc::{server::ChannelTypes, transport::flume::FlumeConnector, RpcClient, RpcServer};
use tokio_util::task::AbortOnDropHandle;

use crate::net::Gossip;
pub use crate::net::{Command as SubscribeUpdate, Event as SubscribeResponse};
pub mod client;
pub mod proto;

#[derive(Debug)]
pub(crate) struct RpcHandler {
/// Client to hand out
client: MemClient,
/// Handler task
_handler: AbortOnDropHandle<()>,
}

impl RpcHandler {
fn new(gossip: &Gossip) -> Self {
let gossip = gossip.clone();
let (listener, connector) = quic_rpc::transport::flume::channel(1);
let listener = RpcServer::new(listener);
let client = MemClient::new(RpcClient::new(connector));
let _handler = listener
.spawn_accept_loop(move |req, chan| gossip.clone().handle_rpc_request(req, chan));

Self { client, _handler }
}
}

impl Gossip {
/// Get an in-memory gossip client
pub fn client(&self) -> &client::Client<FlumeConnector<Response, Request>> {
let handler = self.rpc_handler.get_or_init(|| RpcHandler::new(self));
&handler.client
}

/// Handle a gossip request from the RPC server.
pub async fn handle_rpc_request<C: ChannelTypes<RpcService>>(
&self,
msg: crate::rpc::proto::Request,
self,
msg: Request,
chan: quic_rpc::server::RpcChannel<RpcService, C>,
) -> Result<(), quic_rpc::server::RpcServerError<C>> {
use quic_rpc::server::RpcServerError;

use crate::rpc::proto::Request::*;
use Request::*;
match msg {
Subscribe(msg) => {
let this = self.clone();
Expand Down
6 changes: 5 additions & 1 deletion src/rpc/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use anyhow::Result;
use futures_lite::{Stream, StreamExt};
use futures_util::{Sink, SinkExt};
use iroh_net::NodeId;
use quic_rpc::client::BoxedConnector;
use quic_rpc::{client::BoxedConnector, transport::flume::FlumeConnector};

use crate::{
net::{Command as SubscribeUpdate, Event as SubscribeResponse},
Expand All @@ -22,6 +22,10 @@ pub struct Client<C = BoxedConnector<RpcService>> {
pub(super) rpc: quic_rpc::RpcClient<RpcService, C>,
}

/// Type alias for a memory-backed client.
pub type MemClient =
Client<FlumeConnector<crate::rpc::proto::Response, crate::rpc::proto::Request>>;

/// Options for subscribing to a gossip topic.
#[derive(Debug, Clone)]
pub struct SubscribeOpts {
Expand Down
Loading