Skip to content

Commit dfa5289

Browse files
authored
refactor(share/shwap/p2p/shrex): Remove rate-limiting middleware (#4661)
1 parent e90fa29 commit dfa5289

File tree

6 files changed

+9
-154
lines changed

6 files changed

+9
-154
lines changed

share/shwap/p2p/shrex/client.go

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,7 @@ package shrex
22

33
import (
44
"context"
5-
"errors"
65
"fmt"
7-
"io"
86
"time"
97

108
"github.com/libp2p/go-libp2p/core/host"
@@ -104,9 +102,6 @@ func (c *Client) doRequest(
104102
var statusResp shrexpb.Response
105103
_, err = serde.Read(stream, &statusResp)
106104
if err != nil {
107-
if errors.Is(err, io.EOF) {
108-
return statusRateLimited, fmt.Errorf("reading a response: %w", ErrRateLimited)
109-
}
110105
return statusReadStatusErr, fmt.Errorf("unexpected error during reading the status from stream: %w", err)
111106
}
112107

share/shwap/p2p/shrex/errors.go

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,6 @@ import (
99
// available at the moment. The request may be retried later, but it's unlikely to succeed.
1010
var ErrNotFound = errors.New("the requested data or resource could not be found")
1111

12-
var ErrRateLimited = errors.New("server is overloaded and rate limited the request")
13-
1412
// ErrInvalidResponse is returned when a peer returns an invalid response or caused an internal
1513
// error. It is used to signal that the peer couldn't serve the data successfully, and should not be
1614
// retried.

share/shwap/p2p/shrex/exchange_test.go

Lines changed: 0 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,11 @@ package shrex
22

33
import (
44
"context"
5-
"sync"
65
"sync/atomic"
76
"testing"
87
"time"
98

109
libhost "github.com/libp2p/go-libp2p/core/host"
11-
"github.com/libp2p/go-libp2p/core/network"
1210
mocknet "github.com/libp2p/go-libp2p/p2p/net/mock"
1311
"github.com/stretchr/testify/require"
1412

@@ -66,72 +64,6 @@ func TestExchange_RequestND_NotFound(t *testing.T) {
6664
})
6765
}
6866

69-
func TestExchange_RequestND(t *testing.T) {
70-
t.Run("ND_concurrency_limit", func(t *testing.T) {
71-
net, err := mocknet.FullMeshConnected(2)
72-
require.NoError(t, err)
73-
74-
client, err := NewClient(DefaultClientParameters(), net.Hosts()[0])
75-
require.NoError(t, err)
76-
server, err := NewServer(DefaultServerParameters(), net.Hosts()[1], nil)
77-
require.NoError(t, err)
78-
79-
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
80-
t.Cleanup(cancel)
81-
82-
rateLimit := 2
83-
wg := sync.WaitGroup{}
84-
wg.Add(rateLimit)
85-
86-
// mockHandler will block requests on server side until test is over
87-
lock := make(chan struct{})
88-
defer close(lock)
89-
mockHandler := func(network.Stream) {
90-
wg.Done()
91-
select {
92-
case <-lock:
93-
case <-ctx.Done():
94-
t.Fatal("timeout")
95-
}
96-
}
97-
middleware, err := newMiddleware(rateLimit)
98-
require.NoError(t, err)
99-
for _, reqID := range registry {
100-
server.host.SetStreamHandler(
101-
ProtocolID(server.params.NetworkID(), reqID().Name()),
102-
middleware.rateLimitHandler(
103-
ctx,
104-
mockHandler,
105-
nil,
106-
reqID().Name(),
107-
),
108-
)
109-
}
110-
111-
// take server concurrency slots with blocked requests
112-
for i := range rateLimit {
113-
go func(i int) {
114-
namespace := libshare.RandomNamespace()
115-
id, err := shwap.NewNamespaceDataID(1, namespace)
116-
data := shwap.NamespaceData{}
117-
require.NoError(t, err)
118-
119-
client.Get(ctx, &id, &data, server.host.ID()) //nolint:errcheck
120-
}(i)
121-
}
122-
123-
// wait until all server slots are taken
124-
wg.Wait()
125-
namespace := libshare.RandomNamespace()
126-
id, err := shwap.NewNamespaceDataID(1, namespace)
127-
data := shwap.NamespaceData{}
128-
require.NoError(t, err)
129-
130-
err = client.Get(ctx, &id, &data, server.host.ID())
131-
require.ErrorIs(t, err, ErrRateLimited)
132-
})
133-
}
134-
13567
func createMocknet(t *testing.T, amount int) []libhost.Host {
13668
t.Helper()
13769

share/shwap/p2p/shrex/metrics.go

Lines changed: 1 addition & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@ const (
2121
statusSendReqErr status = "send_req_err"
2222
statusReadStatusErr status = "read_status_err"
2323
statusReadRespErr status = "read_resp_err"
24-
statusRateLimited status = "rate_limited"
2524

2625
// statuses used by the server
2726
statusReadReqErr status = "read_req_err"
@@ -37,7 +36,6 @@ const (
3736

3837
type Metrics struct {
3938
totalRequestCounter metric.Int64Counter
40-
rateLimiterCounter metric.Int64Counter
4139
requestDuration metric.Float64Histogram
4240
}
4341

@@ -81,6 +79,7 @@ func InitClientMetrics() (*Metrics, error) {
8179
if err != nil {
8280
return nil, err
8381
}
82+
8483
return &Metrics{
8584
totalRequestCounter: totalRequestCounter,
8685
requestDuration: requestDuration,
@@ -96,13 +95,6 @@ func InitServerMetrics() (*Metrics, error) {
9695
return nil, err
9796
}
9897

99-
rateLimiter, err := meter.Int64Counter("shrex_rate_limit_counter",
100-
metric.WithDescription("concurrency limit of the shrex server"),
101-
)
102-
if err != nil {
103-
return nil, err
104-
}
105-
10698
requestDuration, err := meter.Float64Histogram(
10799
"shrex_server_request_duration",
108100
metric.WithDescription("Time taken to handle a shrex client request"),
@@ -112,7 +104,6 @@ func InitServerMetrics() (*Metrics, error) {
112104
}
113105
return &Metrics{
114106
totalRequestCounter: totalRequestCounter,
115-
rateLimiterCounter: rateLimiter,
116107
requestDuration: requestDuration,
117108
}, nil
118109
}

share/shwap/p2p/shrex/middleware.go

Lines changed: 0 additions & 52 deletions
This file was deleted.

share/shwap/p2p/shrex/server.go

Lines changed: 8 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -29,10 +29,8 @@ type Server struct {
2929

3030
store *store.Store
3131

32-
params *ServerParams
33-
// TODO: decouple middleware metrics from shrex and remove middleware from Server
34-
middleware *Middleware
35-
metrics *Metrics
32+
params *ServerParams
33+
metrics *Metrics
3634
}
3735

3836
// NewServer creates a new shrEx-Server. It configures the server with the provided
@@ -47,19 +45,13 @@ func NewServer(
4745
return nil, fmt.Errorf("shrex/server: parameters are not valid: %w", err)
4846
}
4947

50-
middleware, err := newMiddleware(params.ConcurrencyLimit)
51-
if err != nil {
52-
return nil, fmt.Errorf("shrex/server: could not initialize middleware: %w", err)
53-
}
54-
5548
ctx, cancel := context.WithCancel(context.Background())
5649
srv := &Server{
57-
ctx: ctx,
58-
cancel: cancel,
59-
store: store,
60-
host: host,
61-
middleware: middleware,
62-
params: params,
50+
ctx: ctx,
51+
cancel: cancel,
52+
store: store,
53+
host: host,
54+
params: params,
6355
}
6456
return srv, nil
6557
}
@@ -72,8 +64,7 @@ func (srv *Server) Start(_ context.Context) error {
7264
for _, reqID := range registry {
7365
id := reqID()
7466
handler := srv.streamHandler(srv.ctx, reqID)
75-
withRateLimit := srv.middleware.rateLimitHandler(srv.ctx, handler, srv.metrics, id.Name())
76-
withRecovery := RecoveryMiddleware(withRateLimit)
67+
withRecovery := RecoveryMiddleware(handler)
7768

7869
p := ProtocolID(srv.params.NetworkID(), id.Name())
7970

0 commit comments

Comments
 (0)