Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1200,8 +1200,15 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
.chain
.custody_columns_for_epoch(Some(request_start_epoch));

let indices_to_retrieve = req
.columns
.iter()
.copied()
.filter(|c| available_columns.contains(c))
.collect::<Vec<_>>();

for root in block_roots {
for index in available_columns {
for index in &indices_to_retrieve {
match self.chain.get_data_column(&root, index) {
Ok(Some(data_column_sidecar)) => {
// Due to skip slots, data columns could be out of the range, we ensure they
Expand Down
72 changes: 71 additions & 1 deletion beacon_node/network/src/network_beacon_processor/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@ use beacon_processor::{work_reprocessing_queue::*, *};
use gossipsub::MessageAcceptance;
use itertools::Itertools;
use lighthouse_network::rpc::InboundRequestId;
use lighthouse_network::rpc::methods::{BlobsByRangeRequest, MetaDataV3};
use lighthouse_network::rpc::methods::{
BlobsByRangeRequest, DataColumnsByRangeRequest, MetaDataV3,
};
use lighthouse_network::{
Client, MessageId, NetworkConfig, NetworkGlobals, PeerId, Response,
discv5::enr::{self, CombinedKey},
Expand All @@ -30,6 +32,7 @@ use lighthouse_network::{
};
use matches::assert_matches;
use slot_clock::SlotClock;
use std::collections::HashSet;
use std::iter::Iterator;
use std::sync::Arc;
use std::time::Duration;
Expand Down Expand Up @@ -432,6 +435,20 @@ impl TestRig {
.unwrap();
}

pub fn enqueue_data_columns_by_range_request(&self, count: u64, columns: Vec<u64>) {
self.network_beacon_processor
.send_data_columns_by_range_request(
PeerId::random(),
InboundRequestId::new_unchecked(42, 24),
DataColumnsByRangeRequest {
start_slot: 0,
count,
columns,
},
)
.unwrap();
}

pub fn enqueue_backfill_batch(&self) {
self.network_beacon_processor
.send_chain_segment(
Expand Down Expand Up @@ -1365,3 +1382,56 @@ async fn test_blobs_by_range() {
}
assert_eq!(blob_count, actual_count);
}

#[tokio::test]
async fn test_data_columns_by_range_request_only_returns_requested_columns() {
if test_spec::<E>().fulu_fork_epoch.is_none() {
return;
};

let mut rig = TestRig::new(64).await;
let slot_count = 4;

let all_custody_columns = rig
.chain
.sampling_columns_for_epoch(rig.chain.epoch().unwrap());
let available_columns: Vec<u64> = all_custody_columns.into_iter().copied().collect();

let requested_columns = vec![available_columns[0], available_columns[2]];

rig.enqueue_data_columns_by_range_request(slot_count, requested_columns.clone());

let mut received_columns = Vec::new();

while let Some(next) = rig.network_rx.recv().await {
if let NetworkMessage::SendResponse {
peer_id: _,
response: Response::DataColumnsByRange(data_column),
inbound_request_id: _,
} = next
{
if let Some(column) = data_column {
received_columns.push(column.index);
} else {
break;
}
} else {
panic!("unexpected message {:?}", next);
}
}

for received_index in &received_columns {
assert!(
requested_columns.contains(received_index),
"Received column index {} was not in requested columns {:?}",
received_index,
requested_columns
);
}

let unique_received: HashSet<_> = received_columns.into_iter().collect();
assert!(
!unique_received.is_empty(),
"Should have received at least some data columns"
);
}
Loading