@@ -1005,12 +1005,14 @@ mod tests {
1005
1005
//! An iroh node that just has the blobs transport
1006
1006
use std:: { path:: Path , sync:: Arc } ;
1007
1007
1008
- use iroh_net:: { NodeAddr , NodeId } ;
1009
- use quic_rpc :: transport :: { Connector , Listener } ;
1008
+ use iroh_net:: { Endpoint , NodeAddr , NodeId } ;
1009
+ use iroh_router :: Router ;
1010
1010
use tokio_util:: task:: AbortOnDropHandle ;
1011
1011
1012
1012
use super :: RpcService ;
1013
1013
use crate :: {
1014
+ downloader:: Downloader ,
1015
+ net_protocol:: Blobs ,
1014
1016
provider:: { CustomEventSender , EventSender } ,
1015
1017
rpc:: client:: { blobs, tags} ,
1016
1018
util:: local_pool:: LocalPool ,
@@ -1054,40 +1056,20 @@ mod tests {
1054
1056
1055
1057
/// Spawns the node
1056
1058
pub async fn spawn ( self ) -> anyhow:: Result < Node > {
1057
- let ( client, router, rpc_task, _local_pool) = self . setup_router ( ) . await ?;
1058
- Ok ( Node {
1059
- router,
1060
- client,
1061
- _rpc_task : AbortOnDropHandle :: new ( rpc_task) ,
1062
- _local_pool,
1063
- } )
1064
- }
1065
-
1066
- async fn setup_router (
1067
- self ,
1068
- ) -> anyhow:: Result < (
1069
- RpcClient ,
1070
- iroh_router:: Router ,
1071
- tokio:: task:: JoinHandle < ( ) > ,
1072
- LocalPool ,
1073
- ) > {
1074
1059
let store = self . store ;
1075
1060
let events = self . events ;
1076
1061
let endpoint = self
1077
1062
. endpoint
1078
- . unwrap_or_else ( || iroh_net :: Endpoint :: builder ( ) . discovery_n0 ( ) )
1063
+ . unwrap_or_else ( || Endpoint :: builder ( ) . discovery_n0 ( ) )
1079
1064
. bind ( )
1080
1065
. await ?;
1081
1066
let local_pool = LocalPool :: single ( ) ;
1082
- let mut router = iroh_router :: Router :: builder ( endpoint. clone ( ) ) ;
1067
+ let mut router = Router :: builder ( endpoint. clone ( ) ) ;
1083
1068
1084
1069
// Setup blobs
1085
- let downloader = crate :: downloader:: Downloader :: new (
1086
- store. clone ( ) ,
1087
- endpoint. clone ( ) ,
1088
- local_pool. handle ( ) . clone ( ) ,
1089
- ) ;
1090
- let blobs = Arc :: new ( crate :: net_protocol:: Blobs :: new_with_events (
1070
+ let downloader =
1071
+ Downloader :: new ( store. clone ( ) , endpoint. clone ( ) , local_pool. handle ( ) . clone ( ) ) ;
1072
+ let blobs = Arc :: new ( Blobs :: new_with_events (
1091
1073
store. clone ( ) ,
1092
1074
local_pool. handle ( ) . clone ( ) ,
1093
1075
events,
@@ -1101,31 +1083,17 @@ mod tests {
1101
1083
1102
1084
// Setup RPC
1103
1085
let ( internal_rpc, controller) = quic_rpc:: transport:: flume:: channel ( 32 ) ;
1104
- let controller = controller. boxed ( ) ;
1105
- let internal_rpc = internal_rpc. boxed ( ) ;
1106
- let internal_rpc = quic_rpc:: RpcServer :: new ( internal_rpc) ;
1107
-
1108
- let rpc_server_task: tokio:: task:: JoinHandle < ( ) > = tokio:: task:: spawn ( async move {
1109
- loop {
1110
- let request = internal_rpc. accept ( ) . await ;
1111
- match request {
1112
- Ok ( accepting) => {
1113
- let blobs = blobs. clone ( ) ;
1114
- tokio:: task:: spawn ( async move {
1115
- let ( msg, chan) = accepting. read_first ( ) . await . unwrap ( ) ;
1116
- blobs. handle_rpc_request ( msg, chan) . await . unwrap ( ) ;
1117
- } ) ;
1118
- }
1119
- Err ( err) => {
1120
- tracing:: warn!( "rpc error: {:?}" , err) ;
1121
- }
1122
- }
1123
- }
1086
+ let internal_rpc = quic_rpc:: RpcServer :: new ( internal_rpc) . boxed ( ) ;
1087
+ let _rpc_task = internal_rpc. spawn_accept_loop ( move |msg, chan| {
1088
+ blobs. clone ( ) . handle_rpc_request ( msg, chan)
1124
1089
} ) ;
1125
-
1126
- let client = quic_rpc:: RpcClient :: new ( controller) ;
1127
-
1128
- Ok ( ( client, router, rpc_server_task, local_pool) )
1090
+ let client = quic_rpc:: RpcClient :: new ( controller) . boxed ( ) ;
1091
+ Ok ( Node {
1092
+ router,
1093
+ client,
1094
+ _rpc_task,
1095
+ _local_pool : local_pool,
1096
+ } )
1129
1097
}
1130
1098
}
1131
1099
0 commit comments