Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 0 additions & 5 deletions share/shwap/p2p/shrex/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,7 @@ package shrex

import (
"context"
"errors"
"fmt"
"io"
"time"

"github.com/libp2p/go-libp2p/core/host"
Expand Down Expand Up @@ -104,9 +102,6 @@ func (c *Client) doRequest(
var statusResp shrexpb.Response
_, err = serde.Read(stream, &statusResp)
if err != nil {
if errors.Is(err, io.EOF) {
return statusRateLimited, fmt.Errorf("reading a response: %w", ErrRateLimited)
}
return statusReadStatusErr, fmt.Errorf("unexpected error during reading the status from stream: %w", err)
}

Expand Down
2 changes: 0 additions & 2 deletions share/shwap/p2p/shrex/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,6 @@ import (
// available at the moment. The request may be retried later, but it's unlikely to succeed.
var ErrNotFound = errors.New("the requested data or resource could not be found")

var ErrRateLimited = errors.New("server is overloaded and rate limited the request")

// ErrInvalidResponse is returned when a peer returns an invalid response or caused an internal
// error. It is used to signal that the peer couldn't serve the data successfully, and should not be
// retried.
Expand Down
68 changes: 0 additions & 68 deletions share/shwap/p2p/shrex/exchange_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,11 @@ package shrex

import (
"context"
"sync"
"sync/atomic"
"testing"
"time"

libhost "github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/network"
mocknet "github.com/libp2p/go-libp2p/p2p/net/mock"
"github.com/stretchr/testify/require"

Expand Down Expand Up @@ -66,72 +64,6 @@ func TestExchange_RequestND_NotFound(t *testing.T) {
})
}

func TestExchange_RequestND(t *testing.T) {
t.Run("ND_concurrency_limit", func(t *testing.T) {
net, err := mocknet.FullMeshConnected(2)
require.NoError(t, err)

client, err := NewClient(DefaultClientParameters(), net.Hosts()[0])
require.NoError(t, err)
server, err := NewServer(DefaultServerParameters(), net.Hosts()[1], nil)
require.NoError(t, err)

ctx, cancel := context.WithTimeout(context.Background(), time.Second)
t.Cleanup(cancel)

rateLimit := 2
wg := sync.WaitGroup{}
wg.Add(rateLimit)

// mockHandler will block requests on server side until test is over
lock := make(chan struct{})
defer close(lock)
mockHandler := func(network.Stream) {
wg.Done()
select {
case <-lock:
case <-ctx.Done():
t.Fatal("timeout")
}
}
middleware, err := newMiddleware(rateLimit)
require.NoError(t, err)
for _, reqID := range registry {
server.host.SetStreamHandler(
ProtocolID(server.params.NetworkID(), reqID().Name()),
middleware.rateLimitHandler(
ctx,
mockHandler,
nil,
reqID().Name(),
),
)
}

// take server concurrency slots with blocked requests
for i := range rateLimit {
go func(i int) {
namespace := libshare.RandomNamespace()
id, err := shwap.NewNamespaceDataID(1, namespace)
data := shwap.NamespaceData{}
require.NoError(t, err)

client.Get(ctx, &id, &data, server.host.ID()) //nolint:errcheck
}(i)
}

// wait until all server slots are taken
wg.Wait()
namespace := libshare.RandomNamespace()
id, err := shwap.NewNamespaceDataID(1, namespace)
data := shwap.NamespaceData{}
require.NoError(t, err)

err = client.Get(ctx, &id, &data, server.host.ID())
require.ErrorIs(t, err, ErrRateLimited)
})
}

func createMocknet(t *testing.T, amount int) []libhost.Host {
t.Helper()

Expand Down
11 changes: 1 addition & 10 deletions share/shwap/p2p/shrex/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ const (
statusSendReqErr status = "send_req_err"
statusReadStatusErr status = "read_status_err"
statusReadRespErr status = "read_resp_err"
statusRateLimited status = "rate_limited"

// statuses used by the server
statusReadReqErr status = "read_req_err"
Expand All @@ -37,7 +36,6 @@ const (

type Metrics struct {
totalRequestCounter metric.Int64Counter
rateLimiterCounter metric.Int64Counter
requestDuration metric.Float64Histogram
}

Expand Down Expand Up @@ -81,6 +79,7 @@ func InitClientMetrics() (*Metrics, error) {
if err != nil {
return nil, err
}

return &Metrics{
totalRequestCounter: totalRequestCounter,
requestDuration: requestDuration,
Expand All @@ -96,13 +95,6 @@ func InitServerMetrics() (*Metrics, error) {
return nil, err
}

rateLimiter, err := meter.Int64Counter("shrex_rate_limit_counter",
metric.WithDescription("concurrency limit of the shrex server"),
)
if err != nil {
return nil, err
}

requestDuration, err := meter.Float64Histogram(
"shrex_server_request_duration",
metric.WithDescription("Time taken to handle a shrex client request"),
Expand All @@ -112,7 +104,6 @@ func InitServerMetrics() (*Metrics, error) {
}
return &Metrics{
totalRequestCounter: totalRequestCounter,
rateLimiterCounter: rateLimiter,
requestDuration: requestDuration,
}, nil
}
52 changes: 0 additions & 52 deletions share/shwap/p2p/shrex/middleware.go

This file was deleted.

25 changes: 8 additions & 17 deletions share/shwap/p2p/shrex/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,8 @@ type Server struct {

store *store.Store

params *ServerParams
// TODO: decouple middleware metrics from shrex and remove middleware from Server
middleware *Middleware
metrics *Metrics
params *ServerParams
metrics *Metrics
}

// NewServer creates a new shrEx-Server. It configures the server with the provided
Expand All @@ -47,19 +45,13 @@ func NewServer(
return nil, fmt.Errorf("shrex/server: parameters are not valid: %w", err)
}

middleware, err := newMiddleware(params.ConcurrencyLimit)
if err != nil {
return nil, fmt.Errorf("shrex/server: could not initialize middleware: %w", err)
}

ctx, cancel := context.WithCancel(context.Background())
srv := &Server{
ctx: ctx,
cancel: cancel,
store: store,
host: host,
middleware: middleware,
params: params,
ctx: ctx,
cancel: cancel,
store: store,
host: host,
params: params,
}
return srv, nil
}
Expand All @@ -72,8 +64,7 @@ func (srv *Server) Start(_ context.Context) error {
for _, reqID := range registry {
id := reqID()
handler := srv.streamHandler(srv.ctx, reqID)
withRateLimit := srv.middleware.rateLimitHandler(srv.ctx, handler, srv.metrics, id.Name())
withRecovery := RecoveryMiddleware(withRateLimit)
withRecovery := RecoveryMiddleware(handler)

p := ProtocolID(srv.params.NetworkID(), id.Name())

Expand Down
Loading