Skip to content

Commit 366f2b7

Browse files
authored
Merge pull request #925 from CortexFoundation/dev
RPC sync shutdown & track deletions more accurately
2 parents e388608 + 15b1b05 commit 366f2b7

File tree

7 files changed

+199
-43
lines changed

7 files changed

+199
-43
lines changed

core/state/snapshot/snapshot.go

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -481,8 +481,17 @@ func diffToDisk(bottom *diffLayer) *diskLayer {
481481
if key := it.Key(); len(key) == 65 { // TODO(karalabe): Yuck, we should move this into the iterator
482482
batch.Delete(key)
483483
base.cache.Del(key[1:])
484-
485484
snapshotFlushStorageItemMeter.Mark(1)
485+
486+
// Ensure we don't delete too much data blindly (contract can be
487+
// huge). It's ok to flush, the root will go missing in case of a
488+
// crash and we'll detect and regenerate the snapshot.
489+
if batch.ValueSize() > ctxcdb.IdealBatchSize {
490+
if err := batch.Write(); err != nil {
491+
log.Crit("Failed to write storage deletions", "err", err)
492+
}
493+
batch.Reset()
494+
}
486495
}
487496
}
488497
it.Release()
@@ -500,6 +509,16 @@ func diffToDisk(bottom *diffLayer) *diskLayer {
500509

501510
snapshotFlushAccountItemMeter.Mark(1)
502511
snapshotFlushAccountSizeMeter.Mark(int64(len(data)))
512+
513+
// Ensure we don't write too much data blindly. It's ok to flush, the
514+
// root will go missing in case of a crash and we'll detect and regen
515+
// the snapshot.
516+
if batch.ValueSize() > ctxcdb.IdealBatchSize {
517+
if err := batch.Write(); err != nil {
518+
log.Crit("Failed to write storage deletions", "err", err)
519+
}
520+
batch.Reset()
521+
}
503522
}
504523
// Push all the storage slots into the database
505524
for accountHash, storage := range bottom.storageData {

ctxcdb/leveldb/leveldb.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -432,7 +432,7 @@ func (b *batch) Put(key, value []byte) error {
432432
// Delete inserts the a key removal into the batch for later committing.
433433
func (b *batch) Delete(key []byte) error {
434434
b.b.Delete(key)
435-
b.size++
435+
b.size += len(key)
436436
return nil
437437
}
438438

ctxcdb/memorydb/memorydb.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -206,7 +206,7 @@ func (b *batch) Put(key, value []byte) error {
206206
// Delete inserts the a key removal into the batch for later committing.
207207
func (b *batch) Delete(key []byte) error {
208208
b.writes = append(b.writes, keyvalue{common.CopyBytes(key), nil, true})
209-
b.size += 1
209+
b.size += len(key)
210210
return nil
211211
}
212212

rpc/client_test.go

Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ package rpc
1818

1919
import (
2020
"context"
21+
"encoding/json"
2122
"fmt"
2223
"math/rand"
2324
"net"
@@ -376,6 +377,93 @@ func TestClientCloseUnsubscribeRace(t *testing.T) {
376377
}
377378
}
378379

380+
// unsubscribeRecorder collects the subscription IDs of *_unsubscribe calls.
381+
type unsubscribeRecorder struct {
382+
ServerCodec
383+
unsubscribes map[string]bool
384+
}
385+
386+
func (r *unsubscribeRecorder) readBatch() ([]*jsonrpcMessage, bool, error) {
387+
if r.unsubscribes == nil {
388+
r.unsubscribes = make(map[string]bool)
389+
}
390+
391+
msgs, batch, err := r.ServerCodec.readBatch()
392+
for _, msg := range msgs {
393+
if msg.isUnsubscribe() {
394+
var params []string
395+
if err := json.Unmarshal(msg.Params, &params); err != nil {
396+
panic("unsubscribe decode error: " + err.Error())
397+
}
398+
r.unsubscribes[params[0]] = true
399+
}
400+
}
401+
return msgs, batch, err
402+
}
403+
404+
// This checks that Client calls the _unsubscribe method on the server when Unsubscribe is
405+
// called on a subscription.
406+
func TestClientSubscriptionUnsubscribeServer(t *testing.T) {
407+
t.Parallel()
408+
409+
// Create the server.
410+
srv := NewServer()
411+
srv.RegisterName("nftest", new(notificationTestService))
412+
p1, p2 := net.Pipe()
413+
recorder := &unsubscribeRecorder{ServerCodec: NewCodec(p1)}
414+
go srv.ServeCodec(recorder, OptionMethodInvocation|OptionSubscriptions)
415+
defer srv.Stop()
416+
417+
// Create the client on the other end of the pipe.
418+
client, _ := newClient(context.Background(), func(context.Context) (ServerCodec, error) {
419+
return NewCodec(p2), nil
420+
})
421+
defer client.Close()
422+
423+
// Create the subscription.
424+
ch := make(chan int)
425+
sub, err := client.Subscribe(context.Background(), "nftest", ch, "someSubscription", 1, 1)
426+
if err != nil {
427+
t.Fatal(err)
428+
}
429+
430+
// Unsubscribe and check that unsubscribe was called.
431+
sub.Unsubscribe()
432+
if !recorder.unsubscribes[sub.subid] {
433+
t.Fatal("client did not call unsubscribe method")
434+
}
435+
if _, open := <-sub.Err(); open {
436+
t.Fatal("subscription error channel not closed after unsubscribe")
437+
}
438+
}
439+
440+
// This checks that the subscribed channel can be closed after Unsubscribe.
441+
// It is the reproducer for https://github.com/ethereum/go-ethereum/issues/22322
442+
func TestClientSubscriptionChannelClose(t *testing.T) {
443+
t.Parallel()
444+
445+
var (
446+
srv = NewServer()
447+
httpsrv = httptest.NewServer(srv.WebsocketHandler(nil))
448+
wsURL = "ws:" + strings.TrimPrefix(httpsrv.URL, "http:")
449+
)
450+
defer srv.Stop()
451+
defer httpsrv.Close()
452+
453+
srv.RegisterName("nftest", new(notificationTestService))
454+
client, _ := Dial(wsURL)
455+
456+
for i := 0; i < 100; i++ {
457+
ch := make(chan int, 100)
458+
sub, err := client.Subscribe(context.Background(), "nftest", ch, "someSubscription", maxClientSubscriptionBuffer-1, 1)
459+
if err != nil {
460+
t.Fatal(err)
461+
}
462+
sub.Unsubscribe()
463+
close(ch)
464+
}
465+
}
466+
379467
// This test checks that Client doesn't lock up when a single subscriber
380468
// doesn't read subscription events.
381469
func TestClientNotificationStorm(t *testing.T) {

rpc/handler.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -189,7 +189,7 @@ func (h *handler) cancelAllRequests(err error, inflightReq *requestOp) {
189189
}
190190
for id, sub := range h.clientSubs {
191191
delete(h.clientSubs, id)
192-
sub.quitWithError(false, err)
192+
sub.close(err)
193193
}
194194
}
195195

@@ -281,7 +281,7 @@ func (h *handler) handleResponse(msg *jsonrpcMessage) {
281281
return
282282
}
283283
if op.err = json.Unmarshal(msg.Result, &op.sub.subid); op.err == nil {
284-
go op.sub.start()
284+
go op.sub.run()
285285
h.clientSubs[op.sub.subid] = op.sub
286286
}
287287
}

rpc/subscription.go

Lines changed: 83 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -208,33 +208,47 @@ type ClientSubscription struct {
208208
channel reflect.Value
209209
namespace string
210210
subid string
211-
in chan json.RawMessage
212211

213-
quitOnce sync.Once // ensures quit is closed once
214-
quit chan struct{} // quit is closed when the subscription exits
215-
errOnce sync.Once // ensures err is closed once
216-
err chan error
212+
// The in channel receives notification values from client dispatcher.
213+
in chan json.RawMessage
214+
215+
// The error channel receives the error from the forwarding loop.
216+
// It is closed by Unsubscribe.
217+
err chan error
218+
errOnce sync.Once
219+
220+
// Closing of the subscription is requested by sending on 'quit'. This is handled by
221+
// the forwarding loop, which closes 'forwardDone' when it has stopped sending to
222+
// sub.channel. Finally, 'unsubDone' is closed after unsubscribing on the server side.
223+
quit chan error
224+
forwardDone chan struct{}
225+
unsubDone chan struct{}
217226
}
218227

228+
// This is the sentinel value sent on sub.quit when Unsubscribe is called.
229+
var errUnsubscribed = errors.New("unsubscribed")
230+
219231
func newClientSubscription(c *Client, namespace string, channel reflect.Value) *ClientSubscription {
220232
sub := &ClientSubscription{
221-
client: c,
222-
namespace: namespace,
223-
etype: channel.Type().Elem(),
224-
channel: channel,
225-
quit: make(chan struct{}),
226-
err: make(chan error, 1),
227-
in: make(chan json.RawMessage),
233+
client: c,
234+
namespace: namespace,
235+
etype: channel.Type().Elem(),
236+
channel: channel,
237+
in: make(chan json.RawMessage),
238+
quit: make(chan error),
239+
forwardDone: make(chan struct{}),
240+
unsubDone: make(chan struct{}),
241+
err: make(chan error, 1),
228242
}
229243
return sub
230244
}
231245

232246
// Err returns the subscription error channel. The intended use of Err is to schedule
233247
// resubscription when the client connection is closed unexpectedly.
234248
//
235-
// The error channel receives a value when the subscription has ended due
236-
// to an error. The received error is nil if Close has been called
237-
// on the underlying client and no other error has occurred.
249+
// The error channel receives a value when the subscription has ended due to an error. The
250+
// received error is nil if Close has been called on the underlying client and no other
251+
// error has occurred.
238252
//
239253
// The error channel is closed when Unsubscribe is called on the subscription.
240254
func (sub *ClientSubscription) Err() <-chan error {
@@ -244,49 +258,71 @@ func (sub *ClientSubscription) Err() <-chan error {
244258
// Unsubscribe unsubscribes the notification and closes the error channel.
245259
// It can safely be called more than once.
246260
func (sub *ClientSubscription) Unsubscribe() {
247-
sub.quitWithError(true, nil)
248-
sub.errOnce.Do(func() { close(sub.err) })
249-
}
250-
251-
func (sub *ClientSubscription) quitWithError(unsubscribeServer bool, err error) {
252-
sub.quitOnce.Do(func() {
253-
// The dispatch loop won't be able to execute the unsubscribe call
254-
// if it is blocked on deliver. Close sub.quit first because it
255-
// unblocks deliver.
256-
close(sub.quit)
257-
if unsubscribeServer {
258-
sub.requestUnsubscribe()
259-
}
260-
if err != nil {
261-
if err == ErrClientQuit {
262-
err = nil // Adhere to subscription semantics.
263-
}
264-
sub.err <- err
261+
sub.errOnce.Do(func() {
262+
select {
263+
case sub.quit <- errUnsubscribed:
264+
<-sub.unsubDone
265+
case <-sub.unsubDone:
265266
}
267+
close(sub.err)
266268
})
267269
}
268270

271+
// deliver is called by the client's message dispatcher to send a notification value.
269272
func (sub *ClientSubscription) deliver(result json.RawMessage) (ok bool) {
270273
select {
271274
case sub.in <- result:
272275
return true
273-
case <-sub.quit:
276+
case <-sub.forwardDone:
274277
return false
275278
}
276279
}
277280

278-
func (sub *ClientSubscription) start() {
279-
sub.quitWithError(sub.forward())
281+
// close is called by the client's message dispatcher when the connection is closed.
282+
func (sub *ClientSubscription) close(err error) {
283+
select {
284+
case sub.quit <- err:
285+
case <-sub.forwardDone:
286+
}
287+
}
288+
289+
// run is the forwarding loop of the subscription. It runs in its own goroutine and
290+
// is launched by the client's handler after the subscription has been created.
291+
func (sub *ClientSubscription) run() {
292+
defer close(sub.unsubDone)
293+
294+
unsubscribe, err := sub.forward()
295+
296+
// The client's dispatch loop won't be able to execute the unsubscribe call if it is
297+
// blocked in sub.deliver() or sub.close(). Closing forwardDone unblocks them.
298+
close(sub.forwardDone)
299+
300+
// Call the unsubscribe method on the server.
301+
if unsubscribe {
302+
sub.requestUnsubscribe()
303+
}
304+
305+
// Send the error.
306+
if err != nil {
307+
if err == ErrClientQuit {
308+
// ErrClientQuit gets here when Client.Close is called. This is reported as a
309+
// nil error because it's not an error, but we can't close sub.err here.
310+
err = nil
311+
}
312+
sub.err <- err
313+
}
280314
}
281315

316+
// forward is the forwarding loop. It takes in RPC notifications and sends them
317+
// on the subscription channel.
282318
func (sub *ClientSubscription) forward() (unsubscribeServer bool, err error) {
283319
cases := []reflect.SelectCase{
284320
{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(sub.quit)},
285321
{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(sub.in)},
286322
{Dir: reflect.SelectSend, Chan: sub.channel},
287323
}
288324
buffer := list.New()
289-
defer buffer.Init()
325+
290326
for {
291327
var chosen int
292328
var recv reflect.Value
@@ -301,7 +337,15 @@ func (sub *ClientSubscription) forward() (unsubscribeServer bool, err error) {
301337

302338
switch chosen {
303339
case 0: // <-sub.quit
304-
return false, nil
340+
if !recv.IsNil() {
341+
err = recv.Interface().(error)
342+
}
343+
if err == errUnsubscribed {
344+
// Exiting because Unsubscribe was called, unsubscribe on server.
345+
return true, nil
346+
}
347+
return false, err
348+
305349
case 1: // <-sub.in
306350
val, err := sub.unmarshal(recv.Interface().(json.RawMessage))
307351
if err != nil {
@@ -311,6 +355,7 @@ func (sub *ClientSubscription) forward() (unsubscribeServer bool, err error) {
311355
return true, ErrSubscriptionQueueOverflow
312356
}
313357
buffer.PushBack(val)
358+
314359
case 2: // sub.channel<-
315360
cases[2].Send = reflect.Value{} // Don't hold onto the value.
316361
buffer.Remove(buffer.Front())

rpc/websocket_test.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -129,11 +129,15 @@ func TestClientWebsocketPing(t *testing.T) {
129129
if err != nil {
130130
t.Fatalf("client dial error: %v", err)
131131
}
132+
defer client.Close()
133+
132134
resultChan := make(chan int)
133135
sub, err := client.CortexSubscribe(ctx, resultChan, "foo")
134136
if err != nil {
135137
t.Fatalf("client subscribe error: %v", err)
136138
}
139+
// Note: Unsubscribe is not called on this subscription because the mockup
140+
// server can't handle the request.
137141

138142
// Wait for the context's deadline to be reached before proceeding.
139143
// This is important for reproducing https://github.com/CortexFoundation/CortexTheseus/issues/19798

0 commit comments

Comments
 (0)