@@ -13,6 +13,7 @@ use candid::{
1313 utils:: check_unique,
1414 Principal , TypeEnv ,
1515} ;
16+ use futures:: future:: try_join_all;
1617use std:: collections:: BTreeMap ;
1718
1819#[ derive( Debug , Clone ) ]
@@ -24,6 +25,9 @@ pub enum Exp {
2425 args : Option < Vec < Exp > > ,
2526 mode : CallMode ,
2627 } ,
28+ ParCall {
29+ calls : Vec < FuncCall > ,
30+ } ,
2731 Decode {
2832 method : Option < Method > ,
2933 blob : Box < Exp > ,
@@ -57,12 +61,18 @@ pub enum CallMode {
5761 Proxy ( String ) ,
5862}
5963#[ derive( Debug , Clone ) ]
64+ pub struct FuncCall {
65+ pub method : Method ,
66+ pub args : Vec < Exp > ,
67+ }
68+ #[ derive( Debug , Clone ) ]
6069pub struct Field {
6170 pub id : Label ,
6271 pub val : Exp ,
6372}
6473impl Exp {
6574 pub fn is_call ( & self ) -> bool {
75+ // Used to decide if we want to report profiling numbers. Ignore par_call for now
6676 matches ! (
6777 self ,
6878 Exp :: Call {
@@ -568,6 +578,42 @@ impl Exp {
568578 } ;
569579 args_to_value ( args)
570580 }
581+ Exp :: ParCall { calls } => {
582+ let mut futures = Vec :: with_capacity ( calls. len ( ) ) ;
583+ for call in calls {
584+ let mut args = Vec :: with_capacity ( call. args . len ( ) ) ;
585+ for arg in call. args . into_iter ( ) {
586+ args. push ( arg. eval ( helper) ?) ;
587+ }
588+ let args = IDLArgs { args } ;
589+ let info = call. method . get_info ( helper, false ) ?;
590+ let bytes = if let Some ( ( env, func) ) = & info. signature {
591+ args. to_bytes_with_types ( env, & func. args ) ?
592+ } else {
593+ args. to_bytes ( ) ?
594+ } ;
595+ let method = & call. method . method ;
596+ let effective_id = get_effective_canister_id ( info. canister_id , method, & bytes) ?;
597+ let mut builder = helper. agent . update ( & info. canister_id , method) ;
598+ builder = builder
599+ . with_arg ( bytes)
600+ . with_effective_canister_id ( effective_id) ;
601+ let call_future = async move {
602+ let res = builder. call_and_wait ( ) . await ?;
603+ if let Some ( ( env, func) ) = & info. signature {
604+ Ok ( IDLArgs :: from_bytes_with_types ( & res, env, & func. rets ) ?)
605+ } else {
606+ Ok ( IDLArgs :: from_bytes ( & res) ?)
607+ }
608+ } ;
609+ futures. push ( call_future) ;
610+ }
611+ let res = parallel_calls ( futures) ?;
612+ let res = IDLArgs {
613+ args : res. into_iter ( ) . map ( args_to_value) . collect ( ) ,
614+ } ;
615+ args_to_value ( res)
616+ }
571617 Exp :: Call { method, args, mode } => {
572618 let args = if let Some ( args) = args {
573619 let mut res = Vec :: with_capacity ( args. len ( ) ) ;
@@ -866,6 +912,13 @@ pub fn apply_func(helper: &MyHelper, func: &str, args: Vec<IDLValue>) -> Result<
866912 }
867913 }
868914}
915+ #[ tokio:: main( flavor = "multi_thread" , worker_threads = 10 ) ]
916+ async fn parallel_calls (
917+ futures : Vec < impl std:: future:: Future < Output = anyhow:: Result < IDLArgs > > > ,
918+ ) -> anyhow:: Result < Vec < IDLArgs > > {
919+ let res = try_join_all ( futures) . await ?;
920+ Ok ( res)
921+ }
869922#[ tokio:: main]
870923async fn call (
871924 helper : & MyHelper ,
0 commit comments