-
Notifications
You must be signed in to change notification settings - Fork 8
feat(gossipsub): switch internal async-channel, #570
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(gossipsub): switch internal async-channel, #570
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice! I like the new queue being able to modify it from the behaviour side.
However I think there is a drawback to this approach and also I didn't understand how this queue can remove the priority/non-priority logic.
In the current version, we have a priority and a non-priority queue. The priority is queue is reserved for messages that simply cannot fail, and timing is not important. For example, GRAFT/PRUNE/SUBSCRIBE/UNSUBSCRIBE. It's fine if these messages go out late, but its not okay if we just have some internal error and we never send them.
For example, if we have PRUNED someone from our mesh, but never tell them, the bi-directionality of the mesh is broken and peers can now never know if we are in other's peoples mesh's and a lot of the principles of the network break down.
If I've understood this PR, we are now grouping these messages into the same queue as normal publish/forward messages and this queue is bounded. We can now drop these priority messages if for example the user is sending lots of messages. This wasn't possible before and I think this is a big problem. I think we still need the priority queue, which is unbounded and cannot fail, so that these very important messages always get sent, albiet they could be sent late.
The second drawback to this approach is that I dont think we can actually stop true in-flight messages. We can remove messages that are being sent from the behaviour and awaiting for the handler to send out, but for large messages that we have started sending, we can't cancel them in the behaviour. I don't think this is a big issue tho, maybe its the queue that is the concern and not the actual sending of the messages.
When we were discussing this problem, I was imagining the handler when calling:
Some(OutboundSubstreamState::PendingFlush(mut substream)) => {
If that message has been canceled, that we close the substream and stop sending the in-flight message. However, now that I think about it, closing the substream would constitute an error I think, so perhaps there is no actual way of stopping partially sent messages with the current gossipsub spec.
I went back to look at this and realize the O(1) complexity in the binary heap for push(), which is really nice. It does the prioritization for us, negating the need for a second queue. 😍 The only thing I think we might need to modify is to allow priority messages to ignore the queue's capacity. We shouldn't be generating these messages in volumes that would cause significant memory concerns. If we wanted to cap the queue if this is a concern, we should drop the peer at some limit. i.e We are never in a state where we are connected to a peer and threw away a priority message. If we are worried about memory, we should at worst case kick/drop/ban the peer before we throw away a priority message. If we go this route, we should be able to bring back the |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I like the approach of filtering the IDONTWANT-ed messages from the queue directly!
But I am wondering if we really need a priority queue if only two priority levels are used.
What's the advantage of it, compared to having two separate FIFO queues for prio- and non-priority messages? The retain logic could still be implemented for them, but the push/pop operations would be faster, and we could directly use VecDequeue::retain_mut
.
protocols/gossipsub/src/handler.rs
Outdated
// Remove stale messages from the queue. | ||
let stale = self.message_queue.retain_mut(|rpc| match rpc { | ||
RpcOut::Publish { | ||
ref mut timeout, .. | ||
} | ||
| RpcOut::Forward { | ||
ref mut timeout, .. | ||
} => !timeout.poll_unpin(cx).is_ready(), | ||
_ => true, | ||
}); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I know that this is existing logic, but do we really need to remove stale messages from the queue here?
It results the the whole internal binary heap of the queue being rebuild every single time the handler is polled. Isn't it enough that the handler simply checks the timeout before sending out stale messages?
I know it shrinks the queue length, but I wonder how often in the real world a message actually times out, and whether in that case there is a larger issue at hand anyway. Do you have any data on that?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is a good point.
In terms of data, we often see non-priority messages being timed out on slow machines, or if bandwidth isn't adequate. But to your point, we can probably drop them when we pop them, without any harm.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@jxs - I think potentially the performance cost of this retain isn't worth it. Maybe should just drop the message when we read from the queue.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I guess you wanted to group all the stale messages. I feel like a O(n) search every poll might not be worth the grouping tho? What do you think?
protocols/gossipsub/src/types.rs
Outdated
Self::Publish { | ||
message: l_message, .. | ||
}, | ||
Self::Publish { | ||
message: r_message, .. | ||
}, | ||
) => l_message == r_message, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could we just compare the message ids?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yeah makes sense, thanks Elena
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is looking good to me.
From our discussions tho and to make sure my understanding is correct.
Grouping the priority and non-priority into a single queue, makes the code a bit nicer, but it costs us an O(log(n)) when pop'ing elements vs an O(1) with two queues right?
I'm fine with the trade-off if its intended and you guys are also.
/// The number of publish messages dropped by the sender. | ||
publish_messages_dropped: Family<TopicHash, Counter>, | ||
/// The number of forward messages dropped by the sender. | ||
forward_messages_dropped: Family<TopicHash, Counter>, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These were kind of useful, but if we can't have them in the new regime, all g
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we can still have them, wdyt of recording all of the types of messages lost instead?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Couldn't seem to link it in the review.
But line 371 in metrics.rs and below need to be removed:
let non_priority_queue_size = Histogram::new(linear_buckets(0.0, 25.0, 100));
registry.register(
"non_priority_queue_size",
"Histogram of observed non-priority queue sizes",
non_priority_queue_size.clone(),
);
f9ed2d2
to
53f00b6
Compare
for an internal priority queue.
4d02fea
to
36b3e4d
Compare
I think this mostly looks good to me. Just got to update it and we can think about merging? |
c84f2ab
to
2ea6ee7
Compare
2ea6ee7
to
abae6d6
Compare
f39b4e0
to
e36e5d4
Compare
…ork-behaviour-handler-message-dispatch
ce1da5a
to
164ddfb
Compare
…ork-behaviour-handler-message-dispatch
2d607e3
to
165a48f
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull Request Overview
This PR replaces the internal async-channel
used for dispatching Gossipsub messages with a custom priority queue. It consolidates separate priority/non-priority channels into a single Queue<T>
, simplifies cancellation and stale‐message removal, and updates types, metrics, and tests accordingly.
- Introduce
Queue<T>
as the new internal priority queue. - Update
PeerDetails
,RpcOut
, andFailedMessages
to work withQueue
. - Merge metrics for priority/non-priority queues into one
queue_size
histogram and update changelog/tests.
Reviewed Changes
Copilot reviewed 9 out of 9 changed files in this pull request and generated no comments.
Show a summary per file
File | Description |
---|---|
protocols/gossipsub/src/types.rs | Import and use Queue<RpcOut> in PeerDetails , refine FailedMessages , add PartialEq/Eq . |
protocols/gossipsub/src/queue.rs | Add new Queue<T> implementation to replace async-channel . |
protocols/gossipsub/src/handler.rs | Swap out Receiver for Queue<RpcOut> , rename MessageDropped to MessagesDropped . |
protocols/gossipsub/src/behaviour.rs | Replace channel send logic with Queue::try_push /push , update slow‐peer handling and metrics. |
protocols/gossipsub/src/metrics.rs | Remove separate histograms, add single queue_size histogram, adjust registration. |
protocols/gossipsub/src/lib.rs | Remove mod rpc , add mod queue . |
protocols/gossipsub/src/protocol.rs (tests) | Update tests to include message_id in RpcOut::Publish . |
protocols/gossipsub/CHANGELOG.md | Add entry for switching to internal priority queue (placeholder PR number). |
Comments suppressed due to low confidence (4)
protocols/gossipsub/CHANGELOG.md:7
- Replace the placeholder
PR XXXX
with the actual pull request number and update the link text accordingly.
with an internal priority queue. See [PR XXXX](https://github.com/libp2p/rust-libp2p/pull/XXXX)
protocols/gossipsub/src/metrics.rs:192
- Update this doc comment to reflect the unified queue (e.g. "The size of the internal message queue.").
/// The size of the priority queue.
protocols/gossipsub/src/queue.rs:112
- Remove or revise the phrase "Returns the cleared items." since
retain
does not return any value.
/// visited in unsorted (and unspecified) order. Returns the cleared items.
protocols/gossipsub/src/handler.rs:62
- [nitpick] Adjust this doc comment to match the renamed variant
MessagesDropped
, e.g. "A message that could not be sent in time."
/// A message to be published was dropped because it could not be sent in time.
use 3 way queue with only Subscribe and Unsubscribe messages uncapped, all else is capped
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice. I think this is good and something we should test. We loose some metrics, but we can test to see how necessary these are.
protocols/gossipsub/src/queue.rs
Outdated
high_priority: Shared::new(capacity), | ||
control: Shared::new(capacity), | ||
low_priority: Shared::new(capacity), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I can see how this works, but I think it might be confusing for future devs to follow.
I.e we can set a SharedQueue with a capacity
, but the standard push()
function violates the capacity and in our instance the high_priority
is actually unbounded but we initiate it with a capacity
. I think this might be a foot-gun in the future where we could get confused.
I'd suggest we add a with_capacity()
and without_capacity()
function to instantiate Shared
so it's clear. And make push()
always respect the capacity. Capacity would be an option<> and if None then push()
never fails, but for generality it would have to return an error. We could then remove the try_push()
.
I think the functionality here is identical, its just more logical to follow for someone new to the code skimming over looking for bugs etc.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
One other thing here, is that we have coupled the queue lengths. I was suggested we hardcode some number like 20,000 or something for the control
queue. Then in a future PR, if the control
queue gets full, we drop the peer. De-coupling here would mean that a user cannot just put a low number and we start dropping a bunch of peers.
If you don't think its necessary, all g.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yup both make sense age, thanks! adopted them
…ork-behaviour-handler-message-dispatch
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
CI is failing but changes look good to me!
Description
This started with an attempt to solve libp2p#5751 using the previous internal
async-channel
.After multiple ideas were discussed off band, replacing the
async-channel
with an internal more tailored priority queue seemed inevitable.This priority queue allows us to implement the cancellation of in flight IDONTWANT's very cleanly with the
retain_mut
function.Clearing the stale messages likwise becomes simpler as we also make use of
retain_mut
And this has the added advantage of being able to only have a single priority queue and making the code simpler.
If a peer is not making progress we can assume it's not delivering High priority messages and we can penalize it.
Notes & open questions
I haven't performance tested this, but plan to do so with
lighthouse
if you agree this should be the path forward.I am curious if iterating all the messages to remove the IDONTWANT'ed and stall ones affects the overall performance.
Will also add tests to the queue once the design is finished.
Change checklist