File tree Expand file tree Collapse file tree 3 files changed +29
-21
lines changed Expand file tree Collapse file tree 3 files changed +29
-21
lines changed Original file line number Diff line number Diff line change @@ -212,9 +212,8 @@ func (c *Connection) shutdown() {
212
212
}
213
213
// Wait for other goroutines to finish
214
214
c .waitGroup .Wait ()
215
- // Close channels
215
+ // Close consumer error channel to signify connection shutdown
216
216
close (c .errorChan )
217
- close (c .protoErrorChan )
218
217
// We can only close a channel once, so we have to jump through a few hoops
219
218
select {
220
219
// The channel is either closed or has an item pending
Original file line number Diff line number Diff line change @@ -103,13 +103,6 @@ func (m *Muxer) Stop() {
103
103
_ = m .conn .Close ()
104
104
// Wait for other goroutines to shutdown
105
105
m .waitGroup .Wait ()
106
- // Close protocol receive channels
107
- // We rely on the individual mini-protocols to close the sender channel
108
- for _ , protocolRoles := range m .protocolReceivers {
109
- for _ , recvChan := range protocolRoles {
110
- close (recvChan )
111
- }
112
- }
113
106
// Close ErrorChan to signify to consumer that we're shutting down
114
107
close (m .errorChan )
115
108
})
@@ -161,7 +154,10 @@ func (m *Muxer) RegisterProtocol(
161
154
if ! ok {
162
155
return
163
156
}
164
- case msg := <- senderChan :
157
+ case msg , ok := <- senderChan :
158
+ if ! ok {
159
+ return
160
+ }
165
161
if err := m .Send (msg ); err != nil {
166
162
m .sendError (err )
167
163
return
@@ -200,7 +196,15 @@ func (m *Muxer) Send(msg *Segment) error {
200
196
// readLoop waits for incoming data on the connection, parses the segment, and passes it to the appropriate
201
197
// protocol
202
198
func (m * Muxer ) readLoop () {
203
- defer m .waitGroup .Done ()
199
+ defer func () {
200
+ m .waitGroup .Done ()
201
+ // Close receiver channels
202
+ for _ , protocolRoles := range m .protocolReceivers {
203
+ for _ , recvChan := range protocolRoles {
204
+ close (recvChan )
205
+ }
206
+ }
207
+ }()
204
208
started := false
205
209
for {
206
210
// Break out of read loop if we're shutting down
Original file line number Diff line number Diff line change @@ -131,11 +131,6 @@ func (p *Protocol) Start() {
131
131
<- p .doneChan
132
132
// Wait for all other goroutines to finish
133
133
p .waitGroup .Wait ()
134
- // Close channels
135
- close (p .sendQueueChan )
136
- close (p .sendStateQueueChan )
137
- close (p .recvReadyChan )
138
- close (p .sendReadyChan )
139
134
// Cancel any timer
140
135
if p .stateTransitionTimer != nil {
141
136
p .stateTransitionTimer .Stop ()
@@ -174,20 +169,30 @@ func (p *Protocol) SendMessage(msg Message) error {
174
169
175
170
// SendError sends an error to the handler in the Ouroboros object
176
171
func (p * Protocol ) SendError (err error ) {
177
- p .config .ErrorChan <- err
172
+ select {
173
+ case p .config .ErrorChan <- err :
174
+ default :
175
+ // Discard error if the buffer is full
176
+ // The connection will get closed on the first error, so any
177
+ // additional errors are unnecessary
178
+ return
179
+ }
178
180
}
179
181
180
182
func (p * Protocol ) sendLoop () {
181
- defer p .waitGroup .Done ()
183
+ defer func () {
184
+ p .waitGroup .Done ()
185
+ // Close muxer send channel
186
+ // We are responsible for closing this channel as the sender, even through it
187
+ // was created by the muxer
188
+ close (p .muxerSendChan )
189
+ }()
182
190
var setNewState bool
183
191
var newState State
184
192
var err error
185
193
for {
186
194
select {
187
195
case <- p .doneChan :
188
- // We are responsible for closing this channel as the sender, even through it
189
- // was created by the muxer
190
- close (p .muxerSendChan )
191
196
// Break out of send loop if we're shutting down
192
197
return
193
198
case <- p .sendReadyChan :
You can’t perform that action at this time.
0 commit comments