diff --git a/p2p/host/basic/addrs_manager.go b/p2p/host/basic/addrs_manager.go index f561877434..2674535f72 100644 --- a/p2p/host/basic/addrs_manager.go +++ b/p2p/host/basic/addrs_manager.go @@ -11,8 +11,12 @@ import ( "sync/atomic" "time" + "github.com/libp2p/go-libp2p/core/crypto" "github.com/libp2p/go-libp2p/core/event" "github.com/libp2p/go-libp2p/core/network" + "github.com/libp2p/go-libp2p/core/peer" + "github.com/libp2p/go-libp2p/core/peerstore" + "github.com/libp2p/go-libp2p/core/record" "github.com/libp2p/go-libp2p/p2p/host/basic/internal/backoff" "github.com/libp2p/go-libp2p/p2p/host/eventbus" "github.com/libp2p/go-netroute" @@ -26,6 +30,13 @@ const maxObservedAddrsPerListenAddr = 3 // addrChangeTickrInterval is the interval to recompute host addrs. var addrChangeTickrInterval = 5 * time.Second +const maxPeerRecordSize = 8 * 1024 // 8k to be compatible with identify's limit + +// addrStore is a minimal interface for storing peer addresses +type addrStore interface { + SetAddrs(peer.ID, []ma.Multiaddr, time.Duration) +} + // ObservedAddrsManager maps our local listen addrs to externally observed addrs. type ObservedAddrsManager interface { Addrs(minObservers int) []ma.Multiaddr @@ -51,9 +62,6 @@ type addrsManager struct { interfaceAddrs *interfaceAddrsCache addrsReachabilityTracker *addrsReachabilityTracker - // addrsUpdatedChan is notified when addrs change. This is provided by the caller. - addrsUpdatedChan chan struct{} - // triggerAddrsUpdateChan is used to trigger an addresses update. triggerAddrsUpdateChan chan chan struct{} // started is used to check whether the addrsManager has started. @@ -66,6 +74,11 @@ type addrsManager struct { addrsMx sync.RWMutex currentAddrs hostAddrs + signKey crypto.PrivKey + addrStore addrStore + signedRecordStore peerstore.CertifiedAddrBook + hostID peer.ID + wg sync.WaitGroup ctx context.Context ctxCancel context.CancelFunc @@ -78,10 +91,13 @@ func newAddrsManager( listenAddrs func() []ma.Multiaddr, addCertHashes func([]ma.Multiaddr) []ma.Multiaddr, observedAddrsManager ObservedAddrsManager, - addrsUpdatedChan chan struct{}, client autonatv2Client, enableMetrics bool, registerer prometheus.Registerer, + disableSignedPeerRecord bool, + signKey crypto.PrivKey, + addrStore addrStore, + hostID peer.ID, ) (*addrsManager, error) { ctx, cancel := context.WithCancel(context.Background()) as := &addrsManager{ @@ -93,14 +109,24 @@ func newAddrsManager( addrsFactory: addrsFactory, triggerAddrsUpdateChan: make(chan chan struct{}, 1), triggerReachabilityUpdate: make(chan struct{}, 1), - addrsUpdatedChan: addrsUpdatedChan, interfaceAddrs: &interfaceAddrsCache{}, + signKey: signKey, + addrStore: addrStore, + hostID: hostID, ctx: ctx, ctxCancel: cancel, } unknownReachability := network.ReachabilityUnknown as.hostReachability.Store(&unknownReachability) + if !disableSignedPeerRecord { + var ok bool + as.signedRecordStore, ok = as.addrStore.(peerstore.CertifiedAddrBook) + if !ok { + return nil, errors.New("peerstore doesn't implement CertifiedAddrBook interface") + } + } + if client != nil { var metricsTracker MetricsTracker if enableMetrics { @@ -118,7 +144,14 @@ func (a *addrsManager) Start() error { return fmt.Errorf("error starting addrs reachability tracker: %s", err) } } - return a.startBackgroundWorker() + if err := a.startBackgroundWorker(); err != nil { + return fmt.Errorf("error starting background worker: %s", err) + } + + // this ensures that listens concurrent with Start are reflected correctly after Start exits. + a.started.Store(true) + a.updateAddrsSync() + return nil } func (a *addrsManager) Close() { @@ -183,6 +216,15 @@ func (a *addrsManager) startBackgroundWorker() (retErr error) { mc.Close(), ) } + mc = append(mc, emitter) + + localAddrsEmitter, err := a.bus.Emitter(new(event.EvtLocalAddressesUpdated), eventbus.Stateful) + if err != nil { + return errors.Join( + fmt.Errorf("error creating local addrs emitter: %s", err), + mc.Close(), + ) + } var relayAddrs []ma.Multiaddr // update relay addrs in case we're private @@ -201,19 +243,18 @@ func (a *addrsManager) startBackgroundWorker() (retErr error) { } default: } - // this ensures that listens concurrent with Start are reflected correctly after Start exits. - a.started.Store(true) - // update addresses before starting the worker loop. This ensures that any address updates - // before calling addrsManager.Start are correctly reported after Start returns. - a.updateAddrs(relayAddrs) a.wg.Add(1) - go a.background(autoRelayAddrsSub, autonatReachabilitySub, emitter, relayAddrs) + go a.background(autoRelayAddrsSub, autonatReachabilitySub, emitter, localAddrsEmitter, relayAddrs) return nil } -func (a *addrsManager) background(autoRelayAddrsSub, autonatReachabilitySub event.Subscription, - emitter event.Emitter, relayAddrs []ma.Multiaddr, +func (a *addrsManager) background( + autoRelayAddrsSub, + autonatReachabilitySub event.Subscription, + emitter event.Emitter, + localAddrsEmitter event.Emitter, + relayAddrs []ma.Multiaddr, ) { defer a.wg.Done() defer func() { @@ -229,20 +270,17 @@ func (a *addrsManager) background(autoRelayAddrsSub, autonatReachabilitySub even if err != nil { log.Warn("error closing host reachability emitter", "err", err) } + err = localAddrsEmitter.Close() + if err != nil { + log.Warn("error closing local addrs emitter", "err", err) + } }() ticker := time.NewTicker(addrChangeTickrInterval) defer ticker.Stop() var previousAddrs hostAddrs - var notifCh chan struct{} + notifCh := make(chan struct{}) for { - currAddrs := a.updateAddrs(relayAddrs) - if notifCh != nil { - close(notifCh) - notifCh = nil - } - a.notifyAddrsChanged(emitter, previousAddrs, currAddrs) - previousAddrs = currAddrs select { case <-ticker.C: case notifCh = <-a.triggerAddrsUpdateChan: @@ -258,13 +296,21 @@ func (a *addrsManager) background(autoRelayAddrsSub, autonatReachabilitySub even case <-a.ctx.Done(): return } + + currAddrs := a.updateAddrs(previousAddrs, relayAddrs) + if notifCh != nil { + close(notifCh) + notifCh = nil + } + a.notifyAddrsUpdated(emitter, localAddrsEmitter, previousAddrs, currAddrs) + previousAddrs = currAddrs } } // updateAddrs updates the addresses of the host and returns the new updated // addrs. This must only be called from the background goroutine or from the Start method otherwise // we may end up with stale addrs. -func (a *addrsManager) updateAddrs(relayAddrs []ma.Multiaddr) hostAddrs { +func (a *addrsManager) updateAddrs(prevHostAddrs hostAddrs, relayAddrs []ma.Multiaddr) hostAddrs { localAddrs := a.getLocalAddrs() var currReachableAddrs, currUnreachableAddrs, currUnknownAddrs []ma.Multiaddr if a.addrsReachabilityTracker != nil { @@ -273,6 +319,11 @@ func (a *addrsManager) updateAddrs(relayAddrs []ma.Multiaddr) hostAddrs { relayAddrs = slices.Clone(relayAddrs) currAddrs := a.getAddrs(slices.Clone(localAddrs), relayAddrs) + if areAddrsDifferent(prevHostAddrs.addrs, currAddrs) { + _, _, removed := diffAddrs(prevHostAddrs.addrs, currAddrs) + a.updatePeerStore(currAddrs, removed) + } + a.addrsMx.Lock() a.currentAddrs = hostAddrs{ addrs: append(a.currentAddrs.addrs[:0], currAddrs...), @@ -294,7 +345,32 @@ func (a *addrsManager) updateAddrs(relayAddrs []ma.Multiaddr) hostAddrs { } } -func (a *addrsManager) notifyAddrsChanged(emitter event.Emitter, previous, current hostAddrs) { +// updatePeerStore updates the peer store for the host +func (a *addrsManager) updatePeerStore(currentAddrs []ma.Multiaddr, removedAddrs []ma.Multiaddr) { + // update host addresses in the peer store + a.addrStore.SetAddrs(a.hostID, currentAddrs, peerstore.PermanentAddrTTL) + a.addrStore.SetAddrs(a.hostID, removedAddrs, 0) + + var sr *record.Envelope + // Our addresses have changed. + // store the signed peer record in the peer store. + if a.signedRecordStore != nil { + var err error + // add signed peer record to the event + // in case of an error drop this event. + sr, err = a.makeSignedPeerRecord(currentAddrs) + if err != nil { + log.Error("error creating a signed peer record from the set of current addresses", "err", err) + return + } + if _, err := a.signedRecordStore.ConsumePeerRecord(sr, peerstore.PermanentAddrTTL); err != nil { + log.Error("failed to persist signed peer record in peer store", "err", err) + return + } + } +} + +func (a *addrsManager) notifyAddrsUpdated(emitter event.Emitter, localAddrsEmitter event.Emitter, previous, current hostAddrs) { if areAddrsDifferent(previous.localAddrs, current.localAddrs) { log.Debug("host local addresses updated", "addrs", current.localAddrs) if a.addrsReachabilityTracker != nil { @@ -303,10 +379,7 @@ func (a *addrsManager) notifyAddrsChanged(emitter event.Emitter, previous, curre } if areAddrsDifferent(previous.addrs, current.addrs) { log.Debug("host addresses updated", "addrs", current.localAddrs) - select { - case a.addrsUpdatedChan <- struct{}{}: - default: - } + a.emitLocalAddrsUpdated(localAddrsEmitter, current.addrs, previous.addrs) } // We *must* send both reachability changed and addrs changed events from the @@ -489,6 +562,76 @@ func (a *addrsManager) appendObservedAddrs(dst []ma.Multiaddr, listenAddrs, ifac return dst } +// makeSignedPeerRecord creates a signed peer record for the given addresses +func (a *addrsManager) makeSignedPeerRecord(addrs []ma.Multiaddr) (*record.Envelope, error) { + if a.signKey == nil { + return nil, errors.New("signKey is nil") + } + // Limit the length of currentAddrs to ensure that our signed peer records aren't rejected + peerRecordSize := 64 // HostID + k, err := a.signKey.Raw() + var nk int + if err == nil { + nk = len(k) + } else { + nk = 1024 // In case of error, use a large enough value. + } + peerRecordSize += 2 * nk // 1 for signature, 1 for public key + // we want the final address list to be small for keeping the signed peer record in size + addrs = trimHostAddrList(addrs, maxPeerRecordSize-peerRecordSize-256) // 256 B of buffer + rec := peer.PeerRecordFromAddrInfo(peer.AddrInfo{ + ID: a.hostID, + Addrs: addrs, + }) + return record.Seal(rec, a.signKey) +} + +// emitLocalAddrsUpdated emits an EvtLocalAddressesUpdated event and updates the addresses in the peerstore. +func (a *addrsManager) emitLocalAddrsUpdated(emitter event.Emitter, currentAddrs []ma.Multiaddr, lastAddrs []ma.Multiaddr) { + added, maintained, removed := diffAddrs(lastAddrs, currentAddrs) + if len(added) == 0 && len(removed) == 0 { + return + } + + var sr *record.Envelope + if a.signedRecordStore != nil { + sr = a.signedRecordStore.GetPeerRecord(a.hostID) + } + + evt := &event.EvtLocalAddressesUpdated{ + Diffs: true, + Current: make([]event.UpdatedAddress, 0, len(currentAddrs)), + Removed: make([]event.UpdatedAddress, 0, len(removed)), + SignedPeerRecord: sr, + } + + for _, addr := range maintained { + evt.Current = append(evt.Current, event.UpdatedAddress{ + Address: addr, + Action: event.Maintained, + }) + } + + for _, addr := range added { + evt.Current = append(evt.Current, event.UpdatedAddress{ + Address: addr, + Action: event.Added, + }) + } + + for _, addr := range removed { + evt.Removed = append(evt.Removed, event.UpdatedAddress{ + Address: addr, + Action: event.Removed, + }) + } + + // emit addr change event + if err := emitter.Emit(*evt); err != nil { + log.Warn("error emitting event for updated addrs", "err", err) + } +} + func areAddrsDifferent(prev, current []ma.Multiaddr) bool { // TODO: make the sorted nature of ma.Unique a guarantee in multiaddrs prev = ma.Unique(prev) @@ -506,6 +649,88 @@ func areAddrsDifferent(prev, current []ma.Multiaddr) bool { return false } +// diffAddrs diffs prev and current addrs and returns added, maintained, and removed addrs. +// Both prev and current are expected to be sorted using ma.Compare() +func diffAddrs(prev, current []ma.Multiaddr) (added, maintained, removed []ma.Multiaddr) { + i, j := 0, 0 + for i < len(prev) && j < len(current) { + cmp := prev[i].Compare(current[j]) + switch { + case cmp < 0: + // prev < current + removed = append(removed, prev[i]) + i++ + case cmp > 0: + // current < prev + added = append(added, current[j]) + j++ + default: + maintained = append(maintained, current[j]) + i++ + j++ + } + } + // All remaining current addresses are added + added = append(added, current[j:]...) + + // All remaining previous addresses are removed + removed = append(removed, prev[i:]...) + return +} + +// trimHostAddrList trims the address list to fit within the maximum size +func trimHostAddrList(addrs []ma.Multiaddr, maxSize int) []ma.Multiaddr { + totalSize := 0 + for _, a := range addrs { + totalSize += len(a.Bytes()) + } + if totalSize <= maxSize { + return addrs + } + + score := func(addr ma.Multiaddr) int { + var res int + if manet.IsPublicAddr(addr) { + res |= 1 << 12 + } else if !manet.IsIPLoopback(addr) { + res |= 1 << 11 + } + var protocolWeight int + ma.ForEach(addr, func(c ma.Component) bool { + switch c.Protocol().Code { + case ma.P_QUIC_V1: + protocolWeight = 5 + case ma.P_TCP: + protocolWeight = 4 + case ma.P_WSS: + protocolWeight = 3 + case ma.P_WEBTRANSPORT: + protocolWeight = 2 + case ma.P_WEBRTC_DIRECT: + protocolWeight = 1 + case ma.P_P2P: + return false + } + return true + }) + res |= 1 << protocolWeight + return res + } + + slices.SortStableFunc(addrs, func(a, b ma.Multiaddr) int { + return score(b) - score(a) // b-a for reverse order + }) + totalSize = 0 + for i, a := range addrs { + totalSize += len(a.Bytes()) + if totalSize > maxSize { + addrs = addrs[:i] + break + } + } + return addrs +} + const interfaceAddrsCacheTTL = time.Minute type interfaceAddrsCache struct { diff --git a/p2p/host/basic/addrs_manager_test.go b/p2p/host/basic/addrs_manager_test.go index 442979dc16..23daee115c 100644 --- a/p2p/host/basic/addrs_manager_test.go +++ b/p2p/host/basic/addrs_manager_test.go @@ -2,6 +2,7 @@ package basichost import ( "context" + "crypto/rand" "errors" "fmt" "slices" @@ -9,9 +10,13 @@ import ( "testing" "time" + "github.com/libp2p/go-libp2p/core/crypto" "github.com/libp2p/go-libp2p/core/event" "github.com/libp2p/go-libp2p/core/network" + "github.com/libp2p/go-libp2p/core/peer" + "github.com/libp2p/go-libp2p/core/peerstore" "github.com/libp2p/go-libp2p/p2p/host/eventbus" + "github.com/libp2p/go-libp2p/p2p/host/peerstore/pstoremem" "github.com/libp2p/go-libp2p/p2p/protocol/autonatv2" ma "github.com/multiformats/go-multiaddr" "github.com/multiformats/go-multiaddr/matest" @@ -52,6 +57,13 @@ func (m *mockObservedAddrs) AddrsFor(local ma.Multiaddr) []ma.Multiaddr { return var _ ObservedAddrsManager = &mockObservedAddrs{} +type addrStoreArgs struct { + AddrStore addrStore + SignKey crypto.PrivKey + HostID peer.ID + DisableSignedPeerRecord bool +} + type addrsManagerArgs struct { NATManager NATManager AddrsFactory AddrsFactory @@ -60,6 +72,7 @@ type addrsManagerArgs struct { AddCertHashes func([]ma.Multiaddr) []ma.Multiaddr AutoNATClient autonatv2Client Bus event.Bus + AddrStoreArgs addrStoreArgs } type addrsManagerTestCase struct { @@ -76,7 +89,6 @@ func newAddrsManagerTestCase(tb testing.TB, args addrsManagerArgs) addrsManagerT if args.AddrsFactory == nil { args.AddrsFactory = func(addrs []ma.Multiaddr) []ma.Multiaddr { return addrs } } - addrsUpdatedChan := make(chan struct{}, 1) addCertHashes := func(addrs []ma.Multiaddr) []ma.Multiaddr { return addrs @@ -84,6 +96,18 @@ func newAddrsManagerTestCase(tb testing.TB, args addrsManagerArgs) addrsManagerT if args.AddCertHashes != nil { addCertHashes = args.AddCertHashes } + signKey := args.AddrStoreArgs.SignKey + addrStore := args.AddrStoreArgs.AddrStore + pid := args.AddrStoreArgs.HostID + if args.AddrStoreArgs == (addrStoreArgs{}) { + var err error + signKey, _, err = crypto.GenerateEd25519Key(rand.Reader) + require.NoError(tb, err) + addrStore, err = pstoremem.NewPeerstore() + require.NoError(tb, err) + pid, err = peer.IDFromPrivateKey(signKey) + require.NoError(tb, err) + } am, err := newAddrsManager( eb, args.NATManager, @@ -91,10 +115,13 @@ func newAddrsManagerTestCase(tb testing.TB, args addrsManagerArgs) addrsManagerT args.ListenAddrs, addCertHashes, args.ObservedAddrsManager, - addrsUpdatedChan, args.AutoNATClient, true, prometheus.DefaultRegisterer, + false, + signKey, + addrStore, + pid, ) require.NoError(tb, err) @@ -176,6 +203,7 @@ func TestAddrsManager(t *testing.T) { assert.ElementsMatch(collect, am.Addrs(), expected, "%s\n%s", am.Addrs(), expected) }, 5*time.Second, 50*time.Millisecond) }) + t.Run("nat returns unspecified addr", func(t *testing.T) { quicPort1 := ma.StringCast("/ip4/3.3.3.3/udp/1/quic-v1") // port from nat, IP from observed addr @@ -417,6 +445,47 @@ func TestAddrsManagerReachabilityEvent(t *testing.T) { } } +func TestAddrsManagerPeerstoreUpdated(t *testing.T) { + quic1 := ma.StringCast("/ip4/1.2.3.4/udp/1234/quic-v1") + quic2 := ma.StringCast("/ip4/1.2.3.5/udp/1/quic-v1") + + pstore, err := pstoremem.NewPeerstore() + require.NoError(t, err) + cab, _ := peerstore.GetCertifiedAddrBook(pstore) + signKey, _, err := crypto.GenerateEd25519Key(rand.Reader) + require.NoError(t, err) + pid, err := peer.IDFromPrivateKey(signKey) + require.NoError(t, err) + + var update atomic.Bool + am := newAddrsManagerTestCase(t, addrsManagerArgs{ + ListenAddrs: func() []ma.Multiaddr { return nil }, + AddrsFactory: func([]ma.Multiaddr) []ma.Multiaddr { + if !update.Load() { + return []ma.Multiaddr{quic1} + } + return []ma.Multiaddr{quic2} + }, + AddrStoreArgs: addrStoreArgs{ + AddrStore: pstore, + HostID: pid, + SignKey: signKey, + }, + }) + defer am.Close() + matest.AssertEqualMultiaddrs(t, []ma.Multiaddr{quic1}, pstore.Addrs(pid)) + ev := cab.GetPeerRecord(pid) + pr := peerRecordFromEnvelope(t, ev) + require.Equal(t, pr.Addrs, []ma.Multiaddr{quic1}) + update.Store(true) + am.updateAddrsSync() + matest.AssertEqualMultiaddrs(t, []ma.Multiaddr{quic2}, pstore.Addrs(pid)) + ev = cab.GetPeerRecord(pid) + pr = peerRecordFromEnvelope(t, ev) + require.Equal(t, pr.Addrs, []ma.Multiaddr{quic2}) + +} + func TestRemoveIfNotInSource(t *testing.T) { var addrs []ma.Multiaddr for i := 0; i < 10; i++ { diff --git a/p2p/host/basic/basic_host.go b/p2p/host/basic/basic_host.go index 96e7ed8be1..d98aa0e337 100644 --- a/p2p/host/basic/basic_host.go +++ b/p2p/host/basic/basic_host.go @@ -6,19 +6,16 @@ import ( "fmt" "io" "log/slog" - "slices" "sync" "time" "github.com/libp2p/go-libp2p/core/connmgr" - "github.com/libp2p/go-libp2p/core/crypto" "github.com/libp2p/go-libp2p/core/event" "github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/network" "github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/core/peerstore" "github.com/libp2p/go-libp2p/core/protocol" - "github.com/libp2p/go-libp2p/core/record" "github.com/libp2p/go-libp2p/p2p/host/autonat" "github.com/libp2p/go-libp2p/p2p/host/eventbus" "github.com/libp2p/go-libp2p/p2p/host/pstoremanager" @@ -32,7 +29,6 @@ import ( logging "github.com/libp2p/go-libp2p/gologshim" ma "github.com/multiformats/go-multiaddr" - manet "github.com/multiformats/go-multiaddr/net" msmux "github.com/multiformats/go-multistream" ) @@ -46,8 +42,6 @@ var ( DefaultAddrsFactory = func(addrs []ma.Multiaddr) []ma.Multiaddr { return addrs } ) -const maxPeerRecordSize = 8 * 1024 // 8k to be compatible with identify's limit - // AddrsFactory functions can be passed to New in order to override // addresses returned by Addrs. type AddrsFactory func([]ma.Multiaddr) []ma.Multiaddr @@ -79,19 +73,13 @@ type BasicHost struct { emitters struct { evtLocalProtocolsUpdated event.Emitter - evtLocalAddrsUpdated event.Emitter } - disableSignedPeerRecord bool - signKey crypto.PrivKey - caBook peerstore.CertifiedAddrBook - autoNATMx sync.RWMutex autoNat autonat.AutoNAT - autonatv2 *autonatv2.AutoNAT - addressManager *addrsManager - addrsUpdatedChan chan struct{} + autonatv2 *autonatv2.AutoNAT + addressManager *addrsManager } var _ host.Host = (*BasicHost)(nil) @@ -173,23 +161,18 @@ func NewHost(n network.Network, opts *HostOpts) (*BasicHost, error) { hostCtx, cancel := context.WithCancel(context.Background()) h := &BasicHost{ - network: n, - psManager: psManager, - mux: msmux.NewMultistreamMuxer[protocol.ID](), - negtimeout: DefaultNegotiationTimeout, - eventbus: opts.EventBus, - ctx: hostCtx, - ctxCancel: cancel, - disableSignedPeerRecord: opts.DisableSignedPeerRecord, - addrsUpdatedChan: make(chan struct{}, 1), + network: n, + psManager: psManager, + mux: msmux.NewMultistreamMuxer[protocol.ID](), + negtimeout: DefaultNegotiationTimeout, + eventbus: opts.EventBus, + ctx: hostCtx, + ctxCancel: cancel, } if h.emitters.evtLocalProtocolsUpdated, err = h.eventbus.Emitter(&event.EvtLocalProtocolsUpdated{}, eventbus.Stateful); err != nil { return nil, err } - if h.emitters.evtLocalAddrsUpdated, err = h.eventbus.Emitter(&event.EvtLocalAddressesUpdated{}, eventbus.Stateful); err != nil { - return nil, err - } if opts.MultistreamMuxer != nil { h.mux = opts.MultistreamMuxer @@ -201,7 +184,7 @@ func NewHost(n network.Network, opts *HostOpts) (*BasicHost, error) { } // we can't set this as a default above because it depends on the *BasicHost. - if h.disableSignedPeerRecord { + if opts.DisableSignedPeerRecord { idOpts = append(idOpts, identify.DisableSignedPeerRecord()) } if opts.EnableMetrics { @@ -251,10 +234,13 @@ func NewHost(n network.Network, opts *HostOpts) (*BasicHost, error) { h.Network().ListenAddresses, addCertHashesFunc, opts.ObservedAddrsManager, - h.addrsUpdatedChan, autonatv2Client, opts.EnableMetrics, opts.PrometheusRegisterer, + opts.DisableSignedPeerRecord, + h.Peerstore().PrivKey(h.ID()), + h.Peerstore(), + h.ID(), ) if err != nil { return nil, fmt.Errorf("failed to create address service: %w", err) @@ -299,22 +285,6 @@ func NewHost(n network.Network, opts *HostOpts) (*BasicHost, error) { h.pings = ping.NewPingService(h) } - if !h.disableSignedPeerRecord { - h.signKey = h.Peerstore().PrivKey(h.ID()) - cab, ok := peerstore.GetCertifiedAddrBook(h.Peerstore()) - if !ok { - return nil, errors.New("peerstore should also be a certified address book") - } - h.caBook = cab - - rec, err := h.makeSignedPeerRecord(h.addressManager.Addrs()) - if err != nil { - return nil, fmt.Errorf("failed to create signed record for self: %w", err) - } - if _, err := h.caBook.ConsumePeerRecord(rec, peerstore.PermanentAddrTTL); err != nil { - return nil, fmt.Errorf("failed to persist signed record to peerstore: %w", err) - } - } n.SetStreamHandler(h.newStreamHandler) return h, nil @@ -337,21 +307,7 @@ func (h *BasicHost) Start() { log.Error("address service failed to start", "err", err) } - if !h.disableSignedPeerRecord { - // Ensure we have the correct peer record after Start returns - rec, err := h.makeSignedPeerRecord(h.addressManager.Addrs()) - if err != nil { - log.Error("failed to create signed record", "err", err) - } - if _, err := h.caBook.ConsumePeerRecord(rec, peerstore.PermanentAddrTTL); err != nil { - log.Error("failed to persist signed record to peerstore", "err", err) - } - } - h.ids.Start() - - h.refCount.Add(1) - go h.background() } // newStreamHandler is the remote-opened stream handler for network.Network @@ -402,117 +358,6 @@ func (h *BasicHost) newStreamHandler(s network.Stream) { handle(protoID, s) } -func (h *BasicHost) makeUpdatedAddrEvent(prev, current []ma.Multiaddr) *event.EvtLocalAddressesUpdated { - if prev == nil && current == nil { - return nil - } - prevmap := make(map[string]ma.Multiaddr, len(prev)) - currmap := make(map[string]ma.Multiaddr, len(current)) - evt := &event.EvtLocalAddressesUpdated{Diffs: true} - addrsAdded := false - - for _, addr := range prev { - prevmap[string(addr.Bytes())] = addr - } - for _, addr := range current { - currmap[string(addr.Bytes())] = addr - } - for _, addr := range currmap { - _, ok := prevmap[string(addr.Bytes())] - updated := event.UpdatedAddress{Address: addr} - if ok { - updated.Action = event.Maintained - } else { - updated.Action = event.Added - addrsAdded = true - } - evt.Current = append(evt.Current, updated) - delete(prevmap, string(addr.Bytes())) - } - for _, addr := range prevmap { - updated := event.UpdatedAddress{Action: event.Removed, Address: addr} - evt.Removed = append(evt.Removed, updated) - } - - if !addrsAdded && len(evt.Removed) == 0 { - return nil - } - - // Our addresses have changed. Make a new signed peer record. - if !h.disableSignedPeerRecord { - // add signed peer record to the event - sr, err := h.makeSignedPeerRecord(current) - if err != nil { - log.Error("error creating a signed peer record from the set of current addresses", "err", err) - // drop this change - return nil - } - evt.SignedPeerRecord = sr - } - - return evt -} - -func (h *BasicHost) makeSignedPeerRecord(addrs []ma.Multiaddr) (*record.Envelope, error) { - // Limit the length of currentAddrs to ensure that our signed peer records aren't rejected - peerRecordSize := 64 // HostID - k, err := h.signKey.Raw() - if err != nil { - peerRecordSize += 2 * len(k) // 1 for signature, 1 for public key - } - // we want the final address list to be small for keeping the signed peer record in size - addrs = trimHostAddrList(addrs, maxPeerRecordSize-peerRecordSize-256) // 256 B of buffer - rec := peer.PeerRecordFromAddrInfo(peer.AddrInfo{ - ID: h.ID(), - Addrs: addrs, - }) - return record.Seal(rec, h.signKey) -} - -func (h *BasicHost) background() { - defer h.refCount.Done() - var lastAddrs []ma.Multiaddr - - emitAddrChange := func(currentAddrs []ma.Multiaddr, lastAddrs []ma.Multiaddr) { - changeEvt := h.makeUpdatedAddrEvent(lastAddrs, currentAddrs) - if changeEvt == nil { - return - } - // Our addresses have changed. - // store the signed peer record in the peer store. - if !h.disableSignedPeerRecord { - if _, err := h.caBook.ConsumePeerRecord(changeEvt.SignedPeerRecord, peerstore.PermanentAddrTTL); err != nil { - log.Error("failed to persist signed peer record in peer store", "err", err) - return - } - } - // update host addresses in the peer store - removedAddrs := make([]ma.Multiaddr, 0, len(changeEvt.Removed)) - for _, ua := range changeEvt.Removed { - removedAddrs = append(removedAddrs, ua.Address) - } - h.Peerstore().SetAddrs(h.ID(), currentAddrs, peerstore.PermanentAddrTTL) - h.Peerstore().SetAddrs(h.ID(), removedAddrs, 0) - - // emit addr change event - if err := h.emitters.evtLocalAddrsUpdated.Emit(*changeEvt); err != nil { - log.Warn("error emitting event for updated addrs", "err", err) - } - } - - for { - curr := h.Addrs() - emitAddrChange(curr, lastAddrs) - lastAddrs = curr - - select { - case <-h.addrsUpdatedChan: - case <-h.ctx.Done(): - return - } - } -} - // ID returns the (local) peer.ID associated with this Host func (h *BasicHost) ID() peer.ID { return h.Network().LocalPeer() @@ -755,58 +600,6 @@ func (h *BasicHost) ConfirmedAddrs() (reachable []ma.Multiaddr, unreachable []ma return h.addressManager.ConfirmedAddrs() } -func trimHostAddrList(addrs []ma.Multiaddr, maxSize int) []ma.Multiaddr { - totalSize := 0 - for _, a := range addrs { - totalSize += len(a.Bytes()) - } - if totalSize <= maxSize { - return addrs - } - - score := func(addr ma.Multiaddr) int { - var res int - if manet.IsPublicAddr(addr) { - res |= 1 << 12 - } else if !manet.IsIPLoopback(addr) { - res |= 1 << 11 - } - var protocolWeight int - ma.ForEach(addr, func(c ma.Component) bool { - switch c.Protocol().Code { - case ma.P_QUIC_V1: - protocolWeight = 5 - case ma.P_TCP: - protocolWeight = 4 - case ma.P_WSS: - protocolWeight = 3 - case ma.P_WEBTRANSPORT: - protocolWeight = 2 - case ma.P_WEBRTC_DIRECT: - protocolWeight = 1 - case ma.P_P2P: - return false - } - return true - }) - res |= 1 << protocolWeight - return res - } - - slices.SortStableFunc(addrs, func(a, b ma.Multiaddr) int { - return score(b) - score(a) // b-a for reverse order - }) - totalSize = 0 - for i, a := range addrs { - totalSize += len(a.Bytes()) - if totalSize > maxSize { - addrs = addrs[:i] - break - } - } - return addrs -} - // SetAutoNat sets the autonat service for the host. func (h *BasicHost) SetAutoNat(a autonat.AutoNAT) { h.autoNATMx.Lock() @@ -855,7 +648,6 @@ func (h *BasicHost) Close() error { } _ = h.emitters.evtLocalProtocolsUpdated.Close() - _ = h.emitters.evtLocalAddrsUpdated.Close() if err := h.network.Close(); err != nil { log.Error("swarm close failed", "err", err) diff --git a/p2p/host/basic/basic_host_test.go b/p2p/host/basic/basic_host_test.go index b8c69a0b28..648356a5a4 100644 --- a/p2p/host/basic/basic_host_test.go +++ b/p2p/host/basic/basic_host_test.go @@ -8,7 +8,6 @@ import ( "reflect" "strings" "sync" - "sync/atomic" "testing" "time" @@ -600,14 +599,7 @@ func TestAddrChangeImmediatelyIfAddressNonEmpty(t *testing.T) { ctx := context.Background() taddrs := []ma.Multiaddr{ma.StringCast("/ip4/1.2.3.4/tcp/1234")} - starting := make(chan struct{}, 1) - var count atomic.Int32 h, err := NewHost(swarmt.GenSwarm(t), &HostOpts{AddrsFactory: func(addrs []ma.Multiaddr) []ma.Multiaddr { - // The first call here is made from the constructor. Don't block. - if count.Add(1) == 1 { - return addrs - } - <-starting return taddrs }}) require.NoError(t, err) @@ -618,7 +610,6 @@ func TestAddrChangeImmediatelyIfAddressNonEmpty(t *testing.T) { t.Error(err) } defer sub.Close() - close(starting) h.Start() expected := event.EvtLocalAddressesUpdated{