Skip to content

Commit 1d75ffd

Browse files
authored
bitswap/client: broadcast reduction and metrics (#937)
* Spam reduction with metrics Reduce bitswap spam by only sending broadcast wants to: - peers that have previously replied with a have block - peers that are on the local network - peers configured for peering * broadcast to peer if peer already has a pending message * count total blocks and haves * mark broadcast targets instead of keeping counts * unique blocks received metric * disable piggybacking spam on existing messages * bitswap client options for broadcast reduction configuration * Configure control to local and peered peers separately * assume local if peer has no addrs * docs(changelog): document new metrics
1 parent 064912d commit 1d75ffd

File tree

18 files changed

+412
-52
lines changed

18 files changed

+412
-52
lines changed

CHANGELOG.md

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,23 @@ The following emojis are used to highlight certain changes:
1616

1717
### Added
1818

19+
- `bitswap/client`: New metrics:
20+
- `ipfs_bitswap_wanthaves_broadcast`: Count of want-haves broadcasts
21+
- `ipfs_bitswap_haves_received`: Count of total have responses
22+
- `ipfs_bitswap_bcast_skips_total`: Count of broadcasts skipped as part of spam reduction logic (see "Changed" below)
23+
- `ipfs_bitswap_unique_blocks_received`: Count of non-duplicate blocks received
24+
1925
### Changed
2026

27+
- `bitswap/client`: Added an opt-in ability to reduce bitswap broadcast volume by limiting broadcasts to peers that have previously responded as having wanted blocks and peers on local network. The following bitswap client options are available to configure the behavior of broadcast reduction:
28+
- `BroadcastControlEnable` enables or disables broadcast reduction logic. Setting this to `false` restores the previous broadcast behavior of sending broadcasts to all peers, and ignores all other `BroadcastControl` options. Default is `false` (disabled).
29+
- `BroadcastControlMaxPeers` sets a hard limit on the number of peers to send broadcasts to. A value of `0` means no broadcasts are sent. A value of `-1` means there is no limit. Default is `-1` (unlimited).
30+
- `BroadcastControlLocalPeers` enables or disables broadcast control for peers on the local network. If `false`, then always broadcast to peers on the local network. If `true`, apply broadcast control to local peers. Default is `false` (always broadcast to local peers).
31+
- `BroadcastControlPeeredPeers` enables or disables broadcast control for peers configured for peering. If `false`, then always broadcast to peers configured for peering. If `true`, apply broadcast control to peered peers. Default is `false` (always broadcast to peered peers).
32+
- `BroadcastControlMaxRandomPeers` sets the number of peers to broadcast to anyway, even though broadcast control logic has determined that they are not broadcast targets. Setting this to a non-zero value ensures at least this number of random peers receives a broadcast. This may be helpful in cases where peers that are not receiving broadcasts may have wanted blocks. Default is `0` (no random broadcasts).
33+
- `BroadcastControlSendToPendingPeers` enables or disables sending broadcasts to any peers to which there is a pending message to send. When `true` (enabled), this sends broadcasts to many more peers, but does so in a way that does not increase the number of separate broadcast messages. There is still the increased cost of the recipients having to process and respond to the broadcasts. Default is `false`.
34+
35+
2136
### Removed
2237

2338
- `bitswap/server` do not allow override of peer ledger with `WithPeerLedger` [#938](https://github.com/ipfs/boxo/pull/938)

bitswap/client/client.go

Lines changed: 89 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -137,6 +137,69 @@ func WithDefaultProviderQueryManager(defaultProviderQueryManager bool) Option {
137137
}
138138
}
139139

140+
// BroadcastControlEnable enables or disables broadcast reduction logic.
141+
// Setting this to false restores the previous broadcast behavior of sending
142+
// broadcasts to all peers, and ignores all other BroadcastControl options.
143+
// Default is false (disabled).
144+
func BroadcastControlEnable(enable bool) Option {
145+
return func(bs *Client) {
146+
bs.bcastControl.Enable = enable
147+
}
148+
}
149+
150+
// BroadcastControlMaxPeers sets a hard limit on the number of peers to send
151+
// broadcasts to. A value of 0 means no broadcasts are sent. A value of -1
152+
// means there is no limit. Default is -1 (unlimited).
153+
func BroadcastControlMaxPeers(limit int) Option {
154+
return func(bs *Client) {
155+
bs.bcastControl.MaxPeers = limit
156+
}
157+
}
158+
159+
// BroadcastControlLocalPeers enables or disables broadcast control for peers
160+
// on the local network. If false, then always broadcast to peers on the local
161+
// network. If true, apply broadcast control to local peers. Default is false
162+
// (always broadcast to local peers).
163+
func BroadcastControlLocalPeers(enable bool) Option {
164+
return func(bs *Client) {
165+
bs.bcastControl.LocalPeers = enable
166+
}
167+
}
168+
169+
// BroadcastControlPeeredPeers enables or disables broadcast control for peers
170+
// configured for peering. If false, then always broadcast to peers configured
171+
// for peering. If true, apply broadcast control to peered peers. Default is
172+
// false (always broadcast to peered peers).
173+
func BroadcastControlPeeredPeers(enable bool) Option {
174+
return func(bs *Client) {
175+
bs.bcastControl.PeeredPeers = enable
176+
}
177+
}
178+
179+
// BroadcastControlMaxRandomPeers sets the number of peers to broadcast to
180+
// anyway, even though broadcast control logic has determined that they are
181+
// not broadcast targets. Setting this to a non-zero value ensures at least
182+
// this number of random peers receives a broadcast. This may be helpful in
183+
// cases where peers that are not receiving broadcasts may have wanted blocks.
184+
// Default is 0 (no random broadcasts).
185+
func BroadcastControlMaxRandomPeers(n int) Option {
186+
return func(bs *Client) {
187+
bs.bcastControl.MaxRandomPeers = n
188+
}
189+
}
190+
191+
// BroadcastControlSendToPendingPeers enables or disables sending broadcasts
192+
// to any peers to which there is a pending message to send. When enabled, this
193+
// sends broadcasts to many more peers, but does so in a way that does not
194+
// increase the number of separate broadcast messages. There is still the
195+
// increased cost of the recipients having to process and respond to the
196+
// broadcasts. Default is false.
197+
func BroadcastControlSendToPendingPeers(enable bool) Option {
198+
return func(bs *Client) {
199+
bs.bcastControl.SendToPendingPeers = enable
200+
}
201+
}
202+
140203
type BlockReceivedNotifier interface {
141204
// ReceivedBlocks notifies the decision engine that a peer is well-behaving
142205
// and gave us useful data, potentially increasing its score and making us
@@ -166,17 +229,30 @@ func New(parent context.Context, network bsnet.BitSwapNetwork, providerFinder ro
166229
counters: new(counters),
167230
dupMetric: bmetrics.DupHist(ctx),
168231
allMetric: bmetrics.AllHist(ctx),
232+
havesReceivedGauge: bmetrics.HavesReceivedGauge(ctx),
233+
blocksReceivedGauge: bmetrics.BlocksReceivedGauge(ctx),
169234
provSearchDelay: defaults.ProvSearchDelay,
170235
rebroadcastDelay: delay.Fixed(defaults.RebroadcastDelay),
171236
simulateDontHavesOnTimeout: true,
172237
defaultProviderQueryManager: true,
238+
239+
bcastControl: bspm.BroadcastControl{
240+
MaxPeers: -1,
241+
},
173242
}
174243

175244
// apply functional options before starting and running bitswap
176245
for _, option := range options {
177246
option(bs)
178247
}
179248

249+
if bs.bcastControl.Enable {
250+
if bs.bcastControl.NeedHost() {
251+
bs.bcastControl.Host = network.Host()
252+
}
253+
bs.bcastControl.SkipGauge = bmetrics.BroadcastSkipGauge(ctx)
254+
}
255+
180256
// onDontHaveTimeout is called when a want-block is sent to a peer that
181257
// has an old version of Bitswap that doesn't support DONT_HAVE messages,
182258
// or when no response is received within a timeout.
@@ -201,7 +277,7 @@ func New(parent context.Context, network bsnet.BitSwapNetwork, providerFinder ro
201277

202278
sim := bssim.New()
203279
bpm := bsbpm.New()
204-
pm := bspm.New(ctx, peerQueueFactory)
280+
pm := bspm.New(ctx, peerQueueFactory, bs.bcastControl)
205281

206282
if bs.providerFinder != nil && bs.defaultProviderQueryManager {
207283
// network can do dialing.
@@ -237,7 +313,7 @@ func New(parent context.Context, network bsnet.BitSwapNetwork, providerFinder ro
237313
} else if providerFinder != nil {
238314
sessionProvFinder = providerFinder
239315
}
240-
return bssession.New(sessctx, sessmgr, id, spm, sessionProvFinder, sim, pm, bpm, notif, provSearchDelay, rebroadcastDelay, self)
316+
return bssession.New(sessctx, sessmgr, id, spm, sessionProvFinder, sim, pm, bpm, notif, provSearchDelay, rebroadcastDelay, self, bs.havesReceivedGauge)
241317
}
242318
sessionPeerManagerFactory := func(ctx context.Context, id uint64) bssession.SessionPeerManager {
243319
return bsspm.New(id, network)
@@ -285,6 +361,9 @@ type Client struct {
285361
dupMetric metrics.Histogram
286362
allMetric metrics.Histogram
287363

364+
havesReceivedGauge bspm.Gauge
365+
blocksReceivedGauge bspm.Gauge
366+
288367
// External statistics interface
289368
tracer tracer.Tracer
290369

@@ -311,6 +390,9 @@ type Client struct {
311390
skipDuplicatedBlocksStats bool
312391

313392
perPeerSendDelay time.Duration
393+
394+
// Broadcast control configuration.
395+
bcastControl bspm.BroadcastControl
314396
}
315397

316398
type counters struct {
@@ -384,6 +466,10 @@ func (bs *Client) receiveBlocksFrom(ctx context.Context, from peer.ID, blks []bl
384466
default:
385467
}
386468

469+
if len(blks) != 0 || len(haves) != 0 {
470+
bs.pm.MarkBroadcastTarget(from)
471+
}
472+
387473
wanted, notWanted := bs.sim.SplitWantedUnwanted(blks)
388474
if log.Level().Enabled(zapcore.DebugLevel) {
389475
for _, b := range notWanted {
@@ -484,6 +570,7 @@ func (bs *Client) updateReceiveCounters(blocks []blocks.Block) {
484570
c.dupBlocksRecvd++
485571
c.dupDataRecvd += uint64(blkLen)
486572
}
573+
bs.blocksReceivedGauge.Inc()
487574
}
488575
}
489576

bitswap/client/internal/messagequeue/messagequeue.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,8 @@ type MessageQueue struct {
107107
events chan<- messageEvent
108108

109109
perPeerDelay time.Duration
110+
111+
BcastInc func()
110112
}
111113

112114
// recallWantlist keeps a list of pending wants and a list of sent wants
@@ -424,6 +426,13 @@ func (mq *MessageQueue) AddCancels(cancelKs []cid.Cid) {
424426
}
425427
}
426428

429+
func (mq *MessageQueue) HasMessage() bool {
430+
mq.wllock.Lock()
431+
defer mq.wllock.Unlock()
432+
433+
return mq.bcstWants.pending.Len() != 0 || mq.peerWants.pending.Len() != 0 || mq.cancels.Len() != 0
434+
}
435+
427436
// ResponseReceived is called when a message is received from the network.
428437
// ks is the set of blocks, HAVEs and DONT_HAVEs in the message
429438
// Note that this is just used to calculate latency.
@@ -862,6 +871,9 @@ FINISH:
862871
for _, e := range bcstEntries[:sentBcstEntries] {
863872
if e.Cid.Defined() { // Check if want was canceled in the interim
864873
mq.bcstWants.setSentAt(e.Cid, now)
874+
if mq.BcastInc != nil {
875+
mq.BcastInc()
876+
}
865877
}
866878
}
867879

bitswap/client/internal/peermanager/peermanager.go

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"sync"
66

7+
"github.com/ipfs/boxo/bitswap/client/internal/messagequeue"
78
cid "github.com/ipfs/go-cid"
89
logging "github.com/ipfs/go-log/v2"
910
"github.com/ipfs/go-metrics-interface"
@@ -18,6 +19,7 @@ type PeerQueue interface {
1819
AddWants([]cid.Cid, []cid.Cid)
1920
AddCancels([]cid.Cid)
2021
ResponseReceived(ks []cid.Cid)
22+
HasMessage() bool
2123
Startup()
2224
Shutdown()
2325
}
@@ -44,20 +46,25 @@ type PeerManager struct {
4446
psLk sync.RWMutex
4547
sessions map[uint64]Session
4648
peerSessions map[peer.ID]map[uint64]struct{}
49+
50+
bcastGauge Gauge
4751
}
4852

4953
// New creates a new PeerManager, given a context and a peerQueueFactory.
50-
func New(ctx context.Context, createPeerQueue PeerQueueFactory) *PeerManager {
54+
func New(ctx context.Context, createPeerQueue PeerQueueFactory, bcastControl BroadcastControl) *PeerManager {
5155
wantGauge := metrics.NewCtx(ctx, "wantlist_total", "Number of items in wantlist.").Gauge()
5256
wantBlockGauge := metrics.NewCtx(ctx, "want_blocks_total", "Number of want-blocks in wantlist.").Gauge()
57+
5358
return &PeerManager{
5459
peerQueues: make(map[peer.ID]PeerQueue),
55-
pwm: newPeerWantManager(wantGauge, wantBlockGauge),
60+
pwm: newPeerWantManager(wantGauge, wantBlockGauge, bcastControl),
5661
createPeerQueue: createPeerQueue,
5762
ctx: ctx,
5863

5964
sessions: make(map[uint64]Session),
6065
peerSessions: make(map[peer.ID]map[uint64]struct{}),
66+
67+
bcastGauge: metrics.NewCtx(ctx, "wanthaves_broadcast", "Number of want-haves broadcast.").Gauge(),
6168
}
6269
}
6370

@@ -189,6 +196,10 @@ func (pm *PeerManager) getOrCreate(p peer.ID) PeerQueue {
189196
pq, ok := pm.peerQueues[p]
190197
if !ok {
191198
pq = pm.createPeerQueue(pm.ctx, p)
199+
mq, ok := pq.(*messagequeue.MessageQueue)
200+
if ok {
201+
mq.BcastInc = pm.bcastGauge.Inc
202+
}
192203
pq.Startup()
193204
pm.peerQueues[p] = pq
194205
}
@@ -227,6 +238,10 @@ func (pm *PeerManager) UnregisterSession(ses uint64) {
227238
delete(pm.sessions, ses)
228239
}
229240

241+
func (pm *PeerManager) MarkBroadcastTarget(from peer.ID) {
242+
pm.pwm.markBroadcastTarget(from)
243+
}
244+
230245
// signalAvailability is called when a peer's connectivity changes.
231246
// It informs interested sessions.
232247
func (pm *PeerManager) signalAvailability(p peer.ID, isConnected bool) {

bitswap/client/internal/peermanager/peermanager_test.go

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,10 @@ func (fp *mockPeerQueue) AddCancels(cs []cid.Cid) {
3939
fp.msgs <- msg{fp.p, nil, nil, cs}
4040
}
4141

42+
func (fp *mockPeerQueue) HasMessage() bool {
43+
return true
44+
}
45+
4246
func (fp *mockPeerQueue) ResponseReceived(ks []cid.Cid) {
4347
}
4448

@@ -86,7 +90,7 @@ func TestAddingAndRemovingPeers(t *testing.T) {
8690

8791
tp := random.Peers(6)
8892
peer1, peer2, peer3, peer4, peer5 := tp[0], tp[1], tp[2], tp[3], tp[4]
89-
peerManager := New(ctx, peerQueueFactory)
93+
peerManager := New(ctx, peerQueueFactory, bcastAlways)
9094

9195
peerManager.Connected(peer1)
9296
peerManager.Connected(peer2)
@@ -129,7 +133,7 @@ func TestBroadcastOnConnect(t *testing.T) {
129133
peerQueueFactory := makePeerQueueFactory(msgs)
130134
tp := random.Peers(2)
131135
peer1 := tp[0]
132-
peerManager := New(ctx, peerQueueFactory)
136+
peerManager := New(ctx, peerQueueFactory, bcastAlways)
133137

134138
cids := random.Cids(2)
135139
peerManager.BroadcastWantHaves(ctx, cids)
@@ -150,7 +154,7 @@ func TestBroadcastWantHaves(t *testing.T) {
150154
peerQueueFactory := makePeerQueueFactory(msgs)
151155
tp := random.Peers(3)
152156
peer1, peer2 := tp[0], tp[1]
153-
peerManager := New(ctx, peerQueueFactory)
157+
peerManager := New(ctx, peerQueueFactory, bcastAlways)
154158

155159
cids := random.Cids(3)
156160

@@ -191,7 +195,7 @@ func TestSendWants(t *testing.T) {
191195
peerQueueFactory := makePeerQueueFactory(msgs)
192196
tp := random.Peers(2)
193197
peer1 := tp[0]
194-
peerManager := New(ctx, peerQueueFactory)
198+
peerManager := New(ctx, peerQueueFactory, bcastAlways)
195199
cids := random.Cids(4)
196200

197201
peerManager.Connected(peer1)
@@ -225,7 +229,7 @@ func TestSendCancels(t *testing.T) {
225229
peerQueueFactory := makePeerQueueFactory(msgs)
226230
tp := random.Peers(3)
227231
peer1, peer2 := tp[0], tp[1]
228-
peerManager := New(ctx, peerQueueFactory)
232+
peerManager := New(ctx, peerQueueFactory, bcastAlways)
229233
cids := random.Cids(4)
230234

231235
// Connect to peer1 and peer2
@@ -286,7 +290,7 @@ func TestSessionRegistration(t *testing.T) {
286290

287291
tp := random.Peers(3)
288292
p1, p2 := tp[0], tp[1]
289-
peerManager := New(ctx, peerQueueFactory)
293+
peerManager := New(ctx, peerQueueFactory, bcastAlways)
290294

291295
id := uint64(1)
292296
s := newSess(id)
@@ -332,6 +336,7 @@ func (*benchPeerQueue) Shutdown() {}
332336
func (*benchPeerQueue) AddBroadcastWantHaves(whs []cid.Cid) {}
333337
func (*benchPeerQueue) AddWants(wbs []cid.Cid, whs []cid.Cid) {}
334338
func (*benchPeerQueue) AddCancels(cs []cid.Cid) {}
339+
func (*benchPeerQueue) HasMessage() bool { return true }
335340
func (*benchPeerQueue) ResponseReceived(ks []cid.Cid) {}
336341

337342
// Simplistic benchmark to allow us to stress test
@@ -345,7 +350,7 @@ func BenchmarkPeerManager(b *testing.B) {
345350
}
346351

347352
peers := random.Peers(500)
348-
peerManager := New(ctx, peerQueueFactory)
353+
peerManager := New(ctx, peerQueueFactory, bcastAlways)
349354

350355
// Create a bunch of connections
351356
connected := 0

0 commit comments

Comments
 (0)