Skip to content

Commit a873303

Browse files
authored
feat: make recv queue size configurable for chainsync/blockfetch (#956)
Signed-off-by: Aurora Gaffney <[email protected]>
1 parent 6aadb95 commit a873303

File tree

6 files changed

+30
-0
lines changed

6 files changed

+30
-0
lines changed

protocol/blockfetch/blockfetch.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,7 @@ type Config struct {
9292
RequestRangeFunc RequestRangeFunc
9393
BatchStartTimeout time.Duration
9494
BlockTimeout time.Duration
95+
RecvQueueSize int
9596
}
9697

9798
// Callback context
@@ -168,3 +169,11 @@ func WithBlockTimeout(timeout time.Duration) BlockFetchOptionFunc {
168169
c.BlockTimeout = timeout
169170
}
170171
}
172+
173+
// WithRecvQueueSize specifies the size of the received messages queue. This is useful to adjust
174+
// the number of blocks that can be fetched at once when acting as a client
175+
func WithRecvQueueSize(size int) BlockFetchOptionFunc {
176+
return func(c *Config) {
177+
c.RecvQueueSize = size
178+
}
179+
}

protocol/blockfetch/client.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,9 @@ func NewClient(protoOptions protocol.ProtocolOptions, cfg *Config) *Client {
7575
StateMap: stateMap,
7676
InitialState: StateIdle,
7777
}
78+
if c.config != nil {
79+
protoConfig.RecvQueueSize = c.config.RecvQueueSize
80+
}
7881
c.Protocol = protocol.New(protoConfig)
7982
return c
8083
}

protocol/blockfetch/server.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,9 @@ func (s *Server) initProtocol() {
5757
StateMap: StateMap,
5858
InitialState: StateIdle,
5959
}
60+
if s.config != nil {
61+
protoConfig.RecvQueueSize = s.config.RecvQueueSize
62+
}
6063
s.Protocol = protocol.New(protoConfig)
6164
}
6265

protocol/chainsync/chainsync.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -206,6 +206,7 @@ type Config struct {
206206
IntersectTimeout time.Duration
207207
BlockTimeout time.Duration
208208
PipelineLimit int
209+
RecvQueueSize int
209210
}
210211

211212
// Callback context
@@ -320,3 +321,11 @@ func WithPipelineLimit(limit int) ChainSyncOptionFunc {
320321
c.PipelineLimit = limit
321322
}
322323
}
324+
325+
// WithRecvQueueSize specifies the size of the received messages queue. This is useful to adjust
326+
// the number of pipelined messages that can be supported when acting as a server
327+
func WithRecvQueueSize(size int) ChainSyncOptionFunc {
328+
return func(c *Config) {
329+
c.RecvQueueSize = size
330+
}
331+
}

protocol/chainsync/client.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,9 @@ func NewClient(
112112
StateContext: stateContext,
113113
InitialState: stateIdle,
114114
}
115+
if c.config != nil {
116+
protoConfig.RecvQueueSize = c.config.RecvQueueSize
117+
}
115118
c.Protocol = protocol.New(protoConfig)
116119
return c
117120
}

protocol/chainsync/server.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,9 @@ func (s *Server) initProtocol() {
7575
StateContext: s.stateContext,
7676
InitialState: stateIdle,
7777
}
78+
if s.config != nil {
79+
protoConfig.RecvQueueSize = s.config.RecvQueueSize
80+
}
7881
s.Protocol = protocol.New(protoConfig)
7982
}
8083

0 commit comments

Comments
 (0)