@@ -7,19 +7,17 @@ use fnv::FnvHashMap;
 use lighthouse_network::PeerId;
 use lighthouse_network::service::api_types::{CustodyId, DataColumnsByRootRequester};
 use lighthouse_tracing::SPAN_OUTGOING_CUSTODY_REQUEST;
-use lru_cache::LRUTimeCache;
 use parking_lot::RwLock;
-use rand::Rng;
 use std::collections::HashSet;
+use std::hash::{BuildHasher, RandomState};
 use std::time::{Duration, Instant};
 use std::{collections::HashMap, marker::PhantomData, sync::Arc};
-use tracing::{Span, debug, debug_span, field, warn};
+use tracing::{Span, debug, debug_span, warn};
 use types::{DataColumnSidecar, Hash256, data_column_sidecar::ColumnIndex};
 use types::{DataColumnSidecarList, EthSpec};

 use super::{LookupRequestResult, PeerGroup, RpcResponseResult, SyncNetworkContext};

-const FAILED_PEERS_CACHE_EXPIRY_SECONDS: u64 = 5;
 const MAX_STALE_NO_PEERS_DURATION: Duration = Duration::from_secs(30);

 pub struct ActiveCustodyRequest<T: BeaconChainTypes> {
@@ -30,9 +28,7 @@ pub struct ActiveCustodyRequest<T: BeaconChainTypes> {
     /// Active requests for 1 or more columns each
     active_batch_columns_requests:
         FnvHashMap<DataColumnsByRootRequestId, ActiveBatchColumnsRequest>,
-    /// Peers that have recently failed to successfully respond to a columns by root request.
-    /// Having a LRUTimeCache allows this request to not have to track disconnecting peers.
-    failed_peers: LRUTimeCache<PeerId>,
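+    /// Number of column download attempts made to each peer for this request, used to
+    /// de-prioritize (and eventually skip) peers that have already been tried.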
+    peer_attempts: HashMap<PeerId, usize>,
     /// Set of peers that claim to have imported this block and their custody columns
     lookup_peers: Arc<RwLock<HashSet<PeerId>>>,
     /// Span for tracing the lifetime of this request.
@@ -71,7 +67,11 @@ impl<T: BeaconChainTypes> ActiveCustodyRequest<T> {
         column_indices: &[ColumnIndex],
         lookup_peers: Arc<RwLock<HashSet<PeerId>>>,
     ) -> Self {
-        let span = debug_span!(parent: None, SPAN_OUTGOING_CUSTODY_REQUEST, %block_root);
+        let span = debug_span!(
+            parent: Span::current(),
+            SPAN_OUTGOING_CUSTODY_REQUEST,
+            %block_root,
+        );
         Self {
             block_root,
             custody_id,
@@ -81,7 +81,7 @@ impl<T: BeaconChainTypes> ActiveCustodyRequest<T> {
                     .map(|index| (*index, ColumnRequest::new())),
             ),
             active_batch_columns_requests: <_>::default(),
-            failed_peers: LRUTimeCache::new(Duration::from_secs(FAILED_PEERS_CACHE_EXPIRY_SECONDS)),
+            peer_attempts: HashMap::new(),
             lookup_peers,
             span,
             _phantom: PhantomData,
@@ -170,13 +170,6 @@ impl<T: BeaconChainTypes> ActiveCustodyRequest<T> {
                         ?missing_column_indexes,
                         "Custody column peer claims to not have some data"
                     );
-
-                    batch_request.span.record(
-                        "missing_column_indexes",
-                        field::debug(missing_column_indexes),
-                    );
-
-                    self.failed_peers.insert(peer_id);
                 }
             }
             Err(err) => {
@@ -195,13 +188,6 @@ impl<T: BeaconChainTypes> ActiveCustodyRequest<T> {
                         .ok_or(Error::BadState("unknown column_index".to_owned()))?
                         .on_download_error_and_mark_failure(req_id)?;
                 }
-
-                batch_request.span.record(
-                    "missing_column_indexes",
-                    field::debug(&batch_request.indices),
-                );
-
-                self.failed_peers.insert(peer_id);
             }
         };

@@ -238,52 +224,29 @@ impl<T: BeaconChainTypes> ActiveCustodyRequest<T> {
         let active_request_count_by_peer = cx.active_request_count_by_peer();
         let mut columns_to_request_by_peer = HashMap::<PeerId, Vec<ColumnIndex>>::new();
         let lookup_peers = self.lookup_peers.read();
+        // Create deterministic hasher per request to ensure consistent peer ordering within
+        // this request (avoiding fragmentation) while varying selection across different requests
+        let random_state = RandomState::new();

-        // Need to:
-        // - track how many active requests a peer has for load balancing
-        // - which peers have failures to attempt others
-        // - which peer returned what to have PeerGroup attributability
-
-        for (column_index, request) in self.column_requests.iter_mut() {
+        for (column_index, request) in self.column_requests.iter() {
             if let Some(wait_duration) = request.is_awaiting_download() {
+                // Note: an empty response is considered a successful response, so we may end up
+                // retrying many more times than `MAX_CUSTODY_COLUMN_DOWNLOAD_ATTEMPTS`.
                 if request.download_failures > MAX_CUSTODY_COLUMN_DOWNLOAD_ATTEMPTS {
                     return Err(Error::TooManyFailures);
                 }

-                // TODO(das): When is a fork and only a subset of your peers know about a block, we should
-                // only query the peers on that fork. Should this case be handled? How to handle it?
-                let custodial_peers = cx.get_custodial_peers(*column_index);
+                let peer_to_request = self.select_column_peer(
+                    cx,
+                    &active_request_count_by_peer,
+                    &lookup_peers,
+                    *column_index,
+                    &random_state,
+                );

-                // We draw from the total set of peers, but prioritize those peers who we have
-                // received an attestation / status / block message claiming to have imported the
-                // lookup. The frequency of those messages is low, so drawing only from lookup_peers
-                // could cause many lookups to take much longer or fail as they don't have enough
-                // custody peers on a given column
-                let mut priorized_peers = custodial_peers
-                    .iter()
-                    .map(|peer| {
-                        (
-                            // Prioritize peers that claim to know have imported this block
-                            if lookup_peers.contains(peer) { 0 } else { 1 },
-                            // De-prioritize peers that have failed to successfully respond to
-                            // requests recently
-                            self.failed_peers.contains(peer),
-                            // Prefer peers with fewer requests to load balance across peers.
-                            // We batch requests to the same peer, so count existence in the
-                            // `columns_to_request_by_peer` as a single 1 request.
-                            active_request_count_by_peer.get(peer).copied().unwrap_or(0)
-                                + columns_to_request_by_peer.get(peer).map(|_| 1).unwrap_or(0),
-                            // Random factor to break ties, otherwise the PeerID breaks ties
-                            rand::rng().random::<u32>(),
-                            *peer,
-                        )
-                    })
-                    .collect::<Vec<_>>();
-                priorized_peers.sort_unstable();
-
-                if let Some((_, _, _, _, peer_id)) = priorized_peers.first() {
+                if let Some(peer_id) = peer_to_request {
                     columns_to_request_by_peer
-                        .entry(*peer_id)
+                        .entry(peer_id)
                         .or_default()
                         .push(*column_index);
                 } else if wait_duration > MAX_STALE_NO_PEERS_DURATION {
@@ -298,6 +261,23 @@ impl<T: BeaconChainTypes> ActiveCustodyRequest<T> {
             }
         }

+        let peer_requests = columns_to_request_by_peer.len();
+        if peer_requests > 0 {
+            let columns_requested_count = columns_to_request_by_peer
+                .values()
+                .map(|v| v.len())
+                .sum::<usize>();
+            debug!(
+                lookup_peers = lookup_peers.len(),
+                "Requesting {} columns from {} peers", columns_requested_count, peer_requests,
+            );
+        } else {
+            debug!(
+                lookup_peers = lookup_peers.len(),
277+ "No column peers found for look up" ,
+            );
+        }
+
         for (peer_id, indices) in columns_to_request_by_peer.into_iter() {
             let request_result = cx
                 .data_column_lookup_request(
@@ -317,8 +297,14 @@ impl<T: BeaconChainTypes> ActiveCustodyRequest<T> {

             match request_result {
                 LookupRequestResult::RequestSent(req_id) => {
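+                    // Record the attempt against this peer so later selections de-prioritize it.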
+                    *self.peer_attempts.entry(peer_id).or_insert(0) += 1;
+
                     let client = cx.network_globals().client(&peer_id).kind;
-                    let batch_columns_req_span = debug_span!("batch_columns_req", %peer_id, %client, missing_column_indexes = tracing::field::Empty);
+                    let batch_columns_req_span = debug_span!(
+                        "batch_columns_req",
+                        %peer_id,
+                        %client,
+                    );
                     let _guard = batch_columns_req_span.clone().entered();
                     for column_index in &indices {
                         let column_request = self
@@ -345,11 +331,54 @@ impl<T: BeaconChainTypes> ActiveCustodyRequest<T> {

         Ok(None)
     }
+
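+    /// Pick the best peer to request `column_index` from, or `None` if no suitable peer is found.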
+    fn select_column_peer(
+        &self,
+        cx: &mut SyncNetworkContext<T>,
+        active_request_count_by_peer: &HashMap<PeerId, usize>,
+        lookup_peers: &HashSet<PeerId>,
+        column_index: ColumnIndex,
+        random_state: &RandomState,
+    ) -> Option<PeerId> {
+        // We draw from the total set of peers, but prioritize those peers who we have
+        // received an attestation or a block from (`lookup_peers`), as the `lookup_peers` may take
+        // time to build up and we are likely to not find any column peers initially.
+        let custodial_peers = cx.get_custodial_peers(column_index);
+        let mut prioritized_peers = custodial_peers
+            .iter()
+            .filter(|peer| {
+                // Exclude peers that we have already made too many attempts to.
+                self.peer_attempts.get(peer).copied().unwrap_or(0) <= MAX_CUSTODY_PEER_ATTEMPTS
+            })
+            .map(|peer| {
+                (
+                    // Prioritize peers that claim to have imported this block
+                    if lookup_peers.contains(peer) { 0 } else { 1 },
+                    // De-prioritize peers that we have already attempted to download from
+                    self.peer_attempts.get(peer).copied().unwrap_or(0),
+                    // Prefer peers with fewer requests to load balance across peers.
+                    active_request_count_by_peer.get(peer).copied().unwrap_or(0),
+                    // The hash ensures consistent peer ordering within this request
+                    // to avoid fragmentation while varying selection across different requests.
+                    random_state.hash_one(peer),
+                    *peer,
+                )
+            })
+            .collect::<Vec<_>>();
+        prioritized_peers.sort_unstable();
+
+        prioritized_peers
+            .first()
+            .map(|(_, _, _, _, peer_id)| *peer_id)
+    }
 }

 /// TODO(das): this attempt count is nested into the existing lookup request count.
 const MAX_CUSTODY_COLUMN_DOWNLOAD_ATTEMPTS: usize = 3;

+/// Max number of attempts to request custody columns from a single peer.
+const MAX_CUSTODY_PEER_ATTEMPTS: usize = 3;
+
 struct ColumnRequest<E: EthSpec> {
     status: Status<E>,
     download_failures: usize,