@@ -12,16 +12,14 @@ use {
12
12
tracer_packet_stats:: TracerPacketStats ,
13
13
unprocessed_packet_batches:: { self , * } ,
14
14
} ,
15
+ core:: iter:: repeat,
15
16
crossbeam_channel:: {
16
17
Receiver as CrossbeamReceiver , RecvTimeoutError , Sender as CrossbeamSender ,
17
18
} ,
18
19
histogram:: Histogram ,
19
20
itertools:: Itertools ,
20
21
min_max_heap:: MinMaxHeap ,
21
- solana_client:: {
22
- connection_cache:: ConnectionCache , tpu_connection:: TpuConnection ,
23
- udp_client:: UdpTpuConnection ,
24
- } ,
22
+ solana_client:: { connection_cache:: ConnectionCache , tpu_connection:: TpuConnection } ,
25
23
solana_entry:: entry:: hash_transactions,
26
24
solana_gossip:: { cluster_info:: ClusterInfo , contact_info:: ContactInfo } ,
27
25
solana_ledger:: blockstore_processor:: TransactionStatusSender ,
@@ -59,14 +57,15 @@ use {
59
57
} ,
60
58
transport:: TransportError ,
61
59
} ,
60
+ solana_streamer:: sendmmsg:: batch_send,
62
61
solana_transaction_status:: token_balances:: {
63
62
collect_token_balances, TransactionTokenBalancesSet ,
64
63
} ,
65
64
std:: {
66
65
cmp,
67
66
collections:: HashMap ,
68
67
env,
69
- net:: SocketAddr ,
68
+ net:: { SocketAddr , UdpSocket } ,
70
69
rc:: Rc ,
71
70
sync:: {
72
71
atomic:: { AtomicU64 , AtomicUsize , Ordering } ,
@@ -539,6 +538,7 @@ impl BankingStage {
539
538
forward_option : & ForwardOption ,
540
539
cluster_info : & ClusterInfo ,
541
540
poh_recorder : & Arc < Mutex < PohRecorder > > ,
541
+ socket : & UdpSocket ,
542
542
filter_forwarding_results : & FilterForwardingResults ,
543
543
data_budget : & DataBudget ,
544
544
banking_stage_stats : & BankingStageStats ,
@@ -600,21 +600,22 @@ impl BankingStage {
600
600
601
601
let mut measure = Measure :: start ( "banking_stage-forward-us" ) ;
602
602
603
- let conn = if let ForwardOption :: ForwardTpuVote = forward_option {
604
- // The vote must be forwarded using only UDP. Let's get the UDP connection.
603
+ let res = if let ForwardOption :: ForwardTpuVote = forward_option {
604
+ // The vote must be forwarded using only UDP.
605
605
banking_stage_stats
606
606
. forwarded_vote_count
607
607
. fetch_add ( packet_vec_len, Ordering :: Relaxed ) ;
608
- Arc :: new ( UdpTpuConnection :: new_from_addr ( addr) . into ( ) )
608
+ let pkts: Vec < _ > = packet_vec. into_iter ( ) . zip ( repeat ( addr) ) . collect ( ) ;
609
+ batch_send ( socket, & pkts) . map_err ( |err| err. into ( ) )
609
610
} else {
610
611
// All other transactions can be forwarded using QUIC, get_connection() will use
611
612
// system wide setting to pick the correct connection object.
612
613
banking_stage_stats
613
614
. forwarded_transaction_count
614
615
. fetch_add ( packet_vec_len, Ordering :: Relaxed ) ;
615
- connection_cache. get_connection ( & addr)
616
+ let conn = connection_cache. get_connection ( & addr) ;
617
+ conn. send_wire_transaction_batch_async ( packet_vec)
616
618
} ;
617
- let res = conn. send_wire_transaction_batch_async ( packet_vec) ;
618
619
619
620
measure. stop ( ) ;
620
621
inc_new_counter_info ! (
@@ -903,6 +904,7 @@ impl BankingStage {
903
904
#[ allow( clippy:: too_many_arguments) ]
904
905
fn process_buffered_packets (
905
906
my_pubkey : & Pubkey ,
907
+ socket : & UdpSocket ,
906
908
poh_recorder : & Arc < Mutex < PohRecorder > > ,
907
909
cluster_info : & ClusterInfo ,
908
910
buffered_packet_batches : & mut UnprocessedPacketBatches ,
@@ -988,6 +990,7 @@ impl BankingStage {
988
990
cluster_info,
989
991
buffered_packet_batches,
990
992
poh_recorder,
993
+ socket,
991
994
false ,
992
995
data_budget,
993
996
slot_metrics_tracker,
@@ -1009,6 +1012,7 @@ impl BankingStage {
1009
1012
cluster_info,
1010
1013
buffered_packet_batches,
1011
1014
poh_recorder,
1015
+ socket,
1012
1016
true ,
1013
1017
data_budget,
1014
1018
slot_metrics_tracker,
@@ -1032,6 +1036,7 @@ impl BankingStage {
1032
1036
cluster_info : & ClusterInfo ,
1033
1037
buffered_packet_batches : & mut UnprocessedPacketBatches ,
1034
1038
poh_recorder : & Arc < Mutex < PohRecorder > > ,
1039
+ socket : & UdpSocket ,
1035
1040
hold : bool ,
1036
1041
data_budget : & DataBudget ,
1037
1042
slot_metrics_tracker : & mut LeaderSlotMetricsTracker ,
@@ -1055,6 +1060,7 @@ impl BankingStage {
1055
1060
forward_option,
1056
1061
cluster_info,
1057
1062
poh_recorder,
1063
+ socket,
1058
1064
& filter_forwarding_result,
1059
1065
data_budget,
1060
1066
banking_stage_stats,
@@ -1105,6 +1111,7 @@ impl BankingStage {
1105
1111
connection_cache : Arc < ConnectionCache > ,
1106
1112
) {
1107
1113
let recorder = poh_recorder. lock ( ) . unwrap ( ) . recorder ( ) ;
1114
+ let socket = UdpSocket :: bind ( "0.0.0.0:0" ) . unwrap ( ) ;
1108
1115
let mut buffered_packet_batches = UnprocessedPacketBatches :: with_capacity ( batch_limit) ;
1109
1116
let mut banking_stage_stats = BankingStageStats :: new ( id) ;
1110
1117
let mut tracer_packet_stats = TracerPacketStats :: new ( id) ;
@@ -1121,6 +1128,7 @@ impl BankingStage {
1121
1128
let ( _, process_buffered_packets_time) = measure ! (
1122
1129
Self :: process_buffered_packets(
1123
1130
& my_pubkey,
1131
+ & socket,
1124
1132
poh_recorder,
1125
1133
cluster_info,
1126
1134
& mut buffered_packet_batches,
@@ -4160,6 +4168,7 @@ mod tests {
4160
4168
] ;
4161
4169
4162
4170
let connection_cache = ConnectionCache :: default ( ) ;
4171
+ let socket = UdpSocket :: bind ( "0.0.0.0:0" ) . unwrap ( ) ;
4163
4172
for ( name, data_budget, expected_num_forwarded) in test_cases {
4164
4173
let mut unprocessed_packet_batches: UnprocessedPacketBatches =
4165
4174
UnprocessedPacketBatches :: from_iter (
@@ -4172,6 +4181,7 @@ mod tests {
4172
4181
& cluster_info,
4173
4182
& mut unprocessed_packet_batches,
4174
4183
& poh_recorder,
4184
+ & socket,
4175
4185
true ,
4176
4186
& data_budget,
4177
4187
& mut LeaderSlotMetricsTracker :: new ( 0 ) ,
@@ -4277,13 +4287,15 @@ mod tests {
4277
4287
) ,
4278
4288
] ;
4279
4289
4290
+ let socket = UdpSocket :: bind ( "0.0.0.0:0" ) . unwrap ( ) ;
4280
4291
for ( name, forward_option, hold, expected_ids, expected_num_unprocessed) in test_cases {
4281
4292
let stats = BankingStageStats :: default ( ) ;
4282
4293
BankingStage :: handle_forwarding (
4283
4294
& forward_option,
4284
4295
& cluster_info,
4285
4296
& mut unprocessed_packet_batches,
4286
4297
& poh_recorder,
4298
+ & socket,
4287
4299
hold,
4288
4300
& DataBudget :: default ( ) ,
4289
4301
& mut LeaderSlotMetricsTracker :: new ( 0 ) ,
0 commit comments