Skip to content

Commit 91d3854

Browse files
committed
basichost: move EvtLocalAddrsChanged to addrs_manager
We'll deprecate this event, but we still have to keep sending this for a few more releases. More importantly, we need to update the peerstore with the host's addresses and it's better to do this *before* sending update events so that consumers of the event can rely on the host addrs being updated in the peerstore.
1 parent 1b4f6c6 commit 91d3854

File tree

3 files changed

+252
-223
lines changed

3 files changed

+252
-223
lines changed

p2p/host/basic/addrs_manager.go

Lines changed: 227 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,12 @@ import (
1111
"sync/atomic"
1212
"time"
1313

14+
"github.com/libp2p/go-libp2p/core/crypto"
1415
"github.com/libp2p/go-libp2p/core/event"
1516
"github.com/libp2p/go-libp2p/core/network"
17+
"github.com/libp2p/go-libp2p/core/peer"
18+
"github.com/libp2p/go-libp2p/core/peerstore"
19+
"github.com/libp2p/go-libp2p/core/record"
1620
"github.com/libp2p/go-libp2p/p2p/host/basic/internal/backoff"
1721
"github.com/libp2p/go-libp2p/p2p/host/eventbus"
1822
"github.com/libp2p/go-netroute"
@@ -28,6 +32,13 @@ var (
2832
natTypeChangeTickrInterval = time.Minute
2933
)
3034

35+
const maxPeerRecordSize = 8 * 1024 // 8k to be compatible with identify's limit
36+
37+
// addrStore is a minimal interface for storing peer addresses
38+
type addrStore interface {
39+
SetAddrs(peer.ID, []ma.Multiaddr, time.Duration)
40+
}
41+
3142
type observedAddrsManager interface {
3243
Addrs(minObservers int) []ma.Multiaddr
3344
AddrsFor(local ma.Multiaddr) []ma.Multiaddr
@@ -58,9 +69,6 @@ type addrsManager struct {
5869
interfaceAddrs *interfaceAddrsCache
5970
addrsReachabilityTracker *addrsReachabilityTracker
6071

61-
// addrsUpdatedChan is notified when addrs change. This is provided by the caller.
62-
addrsUpdatedChan chan struct{}
63-
6472
// triggerAddrsUpdateChan is used to trigger an addresses update.
6573
triggerAddrsUpdateChan chan struct{}
6674
// triggerReachabilityUpdate is notified when reachable addrs are updated.
@@ -71,6 +79,11 @@ type addrsManager struct {
7179
addrsMx sync.RWMutex
7280
currentAddrs hostAddrs
7381

82+
signKey crypto.PrivKey
83+
addrStore addrStore
84+
signedRecordStore peerstore.CertifiedAddrBook
85+
hostID peer.ID
86+
7487
wg sync.WaitGroup
7588
ctx context.Context
7689
ctxCancel context.CancelFunc
@@ -84,10 +97,13 @@ func newAddrsManager(
8497
addCertHashes func([]ma.Multiaddr) []ma.Multiaddr,
8598
disableObservedAddrs bool,
8699
observedAddrsManager observedAddrsManager,
87-
addrsUpdatedChan chan struct{},
88100
client autonatv2Client,
89101
enableMetrics bool,
90102
registerer prometheus.Registerer,
103+
disableSignedPeerRecord bool,
104+
signKey crypto.PrivKey,
105+
addrStore addrStore,
106+
hostID peer.ID,
91107
) (*addrsManager, error) {
92108
ctx, cancel := context.WithCancel(context.Background())
93109
as := &addrsManager{
@@ -98,8 +114,10 @@ func newAddrsManager(
98114
addrsFactory: addrsFactory,
99115
triggerAddrsUpdateChan: make(chan struct{}, 1),
100116
triggerReachabilityUpdate: make(chan struct{}, 1),
101-
addrsUpdatedChan: addrsUpdatedChan,
102117
interfaceAddrs: &interfaceAddrsCache{},
118+
signKey: signKey,
119+
addrStore: addrStore,
120+
hostID: hostID,
103121
ctx: ctx,
104122
ctxCancel: cancel,
105123
}
@@ -125,6 +143,14 @@ func newAddrsManager(
125143
}
126144
}
127145

146+
if !disableSignedPeerRecord {
147+
var ok bool
148+
as.signedRecordStore, ok = as.addrStore.(peerstore.CertifiedAddrBook)
149+
if !ok {
150+
return nil, errors.New("peerstore doesn't implement CertifiedAddrBook interface")
151+
}
152+
}
153+
128154
if client != nil {
129155
var metricsTracker MetricsTracker
130156
if enableMetrics {
@@ -246,6 +272,12 @@ func (a *addrsManager) startBackgroundWorker() (retErr error) {
246272
return fmt.Errorf("error creating reachability subscriber: %s", err)
247273
}
248274

275+
localAddrsEmitter, err := a.bus.Emitter(new(event.EvtLocalAddressesUpdated), eventbus.Stateful)
276+
if err != nil {
277+
return fmt.Errorf("error creating local addrs emitter: %s", err)
278+
}
279+
defer func() { retErr = closeIfError(retErr, localAddrsEmitter, "local addrs emitter") }()
280+
249281
var relayAddrs []ma.Multiaddr
250282
// update relay addrs in case we're private
251283
select {
@@ -265,17 +297,19 @@ func (a *addrsManager) startBackgroundWorker() (retErr error) {
265297
}
266298
// update addresses before starting the worker loop. This ensures that any address updates
267299
// before calling addrsManager.Start are correctly reported after Start returns.
268-
a.updateAddrs(true, relayAddrs)
300+
ha := a.updateAddrs(true, relayAddrs)
301+
a.updatePeerStore(ha.addrs, nil)
269302

270303
a.wg.Add(1)
271-
go a.background(autoRelayAddrsSub, autonatReachabilitySub, emitter, relayAddrs)
304+
go a.background(autoRelayAddrsSub, autonatReachabilitySub, emitter, localAddrsEmitter, relayAddrs)
272305
return nil
273306
}
274307

275308
func (a *addrsManager) background(
276309
autoRelayAddrsSub,
277310
autonatReachabilitySub event.Subscription,
278311
emitter event.Emitter,
312+
localAddrsEmitter event.Emitter,
279313
relayAddrs []ma.Multiaddr,
280314
) {
281315
defer a.wg.Done()
@@ -292,14 +326,18 @@ func (a *addrsManager) background(
292326
if err != nil {
293327
log.Warnf("error closing host reachability emitter: %s", err)
294328
}
329+
err = localAddrsEmitter.Close()
330+
if err != nil {
331+
log.Warnf("error closing local addrs emitter: %s", err)
332+
}
295333
}()
296334

297335
ticker := time.NewTicker(addrChangeTickrInterval)
298336
defer ticker.Stop()
299337
var previousAddrs hostAddrs
300338
for {
301339
currAddrs := a.updateAddrs(true, relayAddrs)
302-
a.notifyAddrsChanged(emitter, previousAddrs, currAddrs)
340+
a.notifyAddrsChanged(emitter, localAddrsEmitter, previousAddrs, currAddrs)
303341
previousAddrs = currAddrs
304342
select {
305343
case <-ticker.C:
@@ -391,19 +429,18 @@ func (a *addrsManager) updateAddrs(updateRelayAddrs bool, relayAddrs []ma.Multia
391429
}
392430
}
393431

394-
func (a *addrsManager) notifyAddrsChanged(emitter event.Emitter, previous, current hostAddrs) {
432+
func (a *addrsManager) notifyAddrsChanged(emitter event.Emitter, localAddrsEmitter event.Emitter, previous, current hostAddrs) {
395433
if areAddrsDifferent(previous.localAddrs, current.localAddrs) {
396434
log.Debugf("host local addresses updated: %s", current.localAddrs)
397435
if a.addrsReachabilityTracker != nil {
398436
a.addrsReachabilityTracker.UpdateAddrs(current.localAddrs)
399437
}
400438
}
401439
if areAddrsDifferent(previous.addrs, current.addrs) {
402-
log.Debugf("host addresses updated: %s", current.localAddrs)
403-
select {
404-
case a.addrsUpdatedChan <- struct{}{}:
405-
default:
406-
}
440+
log.Debugf("host addresses updated: %s", current.addrs)
441+
442+
// Emit EvtLocalAddressesUpdated event and handle peerstore operations
443+
a.handleHostAddrsUpdated(localAddrsEmitter, current.addrs, previous.addrs)
407444
}
408445

409446
// We *must* send both reachability changed and addrs changed events from the
@@ -608,6 +645,182 @@ func areAddrsDifferent(prev, current []ma.Multiaddr) bool {
608645
return false
609646
}
610647

648+
// diffAddrs diffs prev and current addrs and returns added, maintained, and removed addrs.
649+
// Both prev and current are expected to be sorted using ma.Compare()
650+
func (a *addrsManager) diffAddrs(prev, current []ma.Multiaddr) (added, maintained, removed []ma.Multiaddr) {
651+
i, j := 0, 0
652+
for i < len(prev) && j < len(current) {
653+
cmp := prev[i].Compare(current[j])
654+
switch {
655+
case cmp < 0:
656+
// prev < current
657+
removed = append(removed, prev[i])
658+
i++
659+
case cmp > 0:
660+
// current < prev
661+
added = append(added, current[j])
662+
j++
663+
default:
664+
maintained = append(maintained, current[j])
665+
i++
666+
j++
667+
}
668+
}
669+
// All remaining current addresses are added
670+
added = append(added, current[j:]...)
671+
672+
// All remaining previous addresses are removed
673+
removed = append(removed, prev[i:]...)
674+
return
675+
}
676+
677+
// makeSignedPeerRecord creates a signed peer record for the given addresses
678+
func (a *addrsManager) makeSignedPeerRecord(addrs []ma.Multiaddr) (*record.Envelope, error) {
679+
if a.signKey == nil {
680+
return nil, fmt.Errorf("signKey is nil")
681+
}
682+
// Limit the length of currentAddrs to ensure that our signed peer records aren't rejected
683+
peerRecordSize := 64 // HostID
684+
k, err := a.signKey.Raw()
685+
var nk int
686+
if err == nil {
687+
nk = len(k)
688+
} else {
689+
nk = 1024 // In case of error, use a large enough value.
690+
}
691+
peerRecordSize += 2 * nk // 1 for signature, 1 for public key
692+
// we want the final address list to be small for keeping the signed peer record in size
693+
addrs = trimHostAddrList(addrs, maxPeerRecordSize-peerRecordSize-256) // 256 B of buffer
694+
rec := peer.PeerRecordFromAddrInfo(peer.AddrInfo{
695+
ID: a.hostID,
696+
Addrs: addrs,
697+
})
698+
return record.Seal(rec, a.signKey)
699+
}
700+
701+
// trimHostAddrList trims the address list to fit within the maximum size
702+
func trimHostAddrList(addrs []ma.Multiaddr, maxSize int) []ma.Multiaddr {
703+
totalSize := 0
704+
for _, a := range addrs {
705+
totalSize += len(a.Bytes())
706+
}
707+
if totalSize <= maxSize {
708+
return addrs
709+
}
710+
711+
score := func(addr ma.Multiaddr) int {
712+
var res int
713+
if manet.IsPublicAddr(addr) {
714+
res |= 1 << 12
715+
} else if !manet.IsIPLoopback(addr) {
716+
res |= 1 << 11
717+
}
718+
var protocolWeight int
719+
ma.ForEach(addr, func(c ma.Component) bool {
720+
switch c.Protocol().Code {
721+
case ma.P_QUIC_V1:
722+
protocolWeight = 5
723+
case ma.P_TCP:
724+
protocolWeight = 4
725+
case ma.P_WSS:
726+
protocolWeight = 3
727+
case ma.P_WEBTRANSPORT:
728+
protocolWeight = 2
729+
case ma.P_WEBRTC_DIRECT:
730+
protocolWeight = 1
731+
case ma.P_P2P:
732+
return false
733+
}
734+
return true
735+
})
736+
res |= 1 << protocolWeight
737+
return res
738+
}
739+
740+
slices.SortStableFunc(addrs, func(a, b ma.Multiaddr) int {
741+
return score(b) - score(a) // b-a for reverse order
742+
})
743+
totalSize = 0
744+
for i, a := range addrs {
745+
totalSize += len(a.Bytes())
746+
if totalSize > maxSize {
747+
addrs = addrs[:i]
748+
break
749+
}
750+
}
751+
return addrs
752+
}
753+
754+
// handleHostAddrsUpdated emits an EvtLocalAddressesUpdated event and updates the addresses in the peerstore.
755+
func (a *addrsManager) handleHostAddrsUpdated(emitter event.Emitter, currentAddrs []ma.Multiaddr, lastAddrs []ma.Multiaddr) {
756+
added, maintained, removed := a.diffAddrs(lastAddrs, currentAddrs)
757+
if len(added) == 0 && len(removed) == 0 {
758+
return
759+
}
760+
761+
sr := a.updatePeerStore(currentAddrs, removed)
762+
763+
evt := &event.EvtLocalAddressesUpdated{
764+
Diffs: true,
765+
Current: make([]event.UpdatedAddress, 0, len(currentAddrs)),
766+
Removed: make([]event.UpdatedAddress, 0, len(removed)),
767+
SignedPeerRecord: sr,
768+
}
769+
770+
for _, addr := range maintained {
771+
evt.Current = append(evt.Current, event.UpdatedAddress{
772+
Address: addr,
773+
Action: event.Maintained,
774+
})
775+
}
776+
777+
for _, addr := range added {
778+
evt.Current = append(evt.Current, event.UpdatedAddress{
779+
Address: addr,
780+
Action: event.Added,
781+
})
782+
}
783+
784+
for _, addr := range removed {
785+
evt.Removed = append(evt.Removed, event.UpdatedAddress{
786+
Address: addr,
787+
Action: event.Removed,
788+
})
789+
}
790+
791+
// emit addr change event
792+
if err := emitter.Emit(*evt); err != nil {
793+
log.Warnf("error emitting event for updated addrs: %s", err)
794+
}
795+
}
796+
797+
// updatePeerStore updates the peer store and returns the signed peer record.
798+
// If the signed peer record is not created, it returns nil.
799+
func (a *addrsManager) updatePeerStore(currentAddrs []ma.Multiaddr, removedAddrs []ma.Multiaddr) *record.Envelope {
800+
// update host addresses in the peer store
801+
a.addrStore.SetAddrs(a.hostID, currentAddrs, peerstore.PermanentAddrTTL)
802+
a.addrStore.SetAddrs(a.hostID, removedAddrs, 0)
803+
804+
var sr *record.Envelope
805+
// Our addresses have changed.
806+
// store the signed peer record in the peer store.
807+
if a.signedRecordStore != nil {
808+
var err error
809+
// add signed peer record to the event
810+
// in case of an error drop this event.
811+
sr, err = a.makeSignedPeerRecord(currentAddrs)
812+
if err != nil {
813+
log.Errorf("error creating a signed peer record from the set of current addresses, err=%s", err)
814+
return nil
815+
}
816+
if _, err := a.signedRecordStore.ConsumePeerRecord(sr, peerstore.PermanentAddrTTL); err != nil {
817+
log.Errorf("failed to persist signed peer record in peer store, err=%s", err)
818+
return nil
819+
}
820+
}
821+
return sr
822+
}
823+
611824
const interfaceAddrsCacheTTL = time.Minute
612825

613826
type interfaceAddrsCache struct {

0 commit comments

Comments
 (0)