Skip to content

Commit 80509aa

Browse files
authored
feat: merge waits and recvs to wrCounter (#809)
* feat: merge waits and recvs to wrCounter * feat: use helpers * feat: fix pipe * feat: convert to private variable * feat: self review * feat: address comments * feat: incrWaits
1 parent f426952 commit 80509aa

File tree

2 files changed

+72
-47
lines changed

2 files changed

+72
-47
lines changed

pipe.go

Lines changed: 60 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -85,8 +85,7 @@ type pipe struct {
8585
_ [10]int32
8686
blcksig int32
8787
state int32
88-
waits int32
89-
recvs int32
88+
wrCounter atomic.Uint64
9089
bgState int32
9190
r2ps bool // identify this pipe is used for resp2 pubsub or not
9291
noNoDelay bool
@@ -372,10 +371,10 @@ func (p *pipe) _background() {
372371
select {
373372
case <-p.close:
374373
default:
375-
atomic.AddInt32(&p.waits, 1)
374+
p.incrWaits()
376375
go func() {
377376
<-p.queue.PutOne(cmds.PingCmd) // avoid _backgroundWrite hanging at p.queue.WaitForWrite()
378-
atomic.AddInt32(&p.waits, -1)
377+
p.decrWaits()
379378
}()
380379
}
381380
}
@@ -406,7 +405,7 @@ func (p *pipe) _background() {
406405
}
407406

408407
resp := newErrResult(err)
409-
for atomic.LoadInt32(&p.waits) != 0 {
408+
for p.loadWaits() != 0 {
410409
select {
411410
case <-p.close: // p.queue.NextWriteCmd() can only be called after _backgroundWrite
412411
_, _, _ = p.queue.NextWriteCmd()
@@ -450,7 +449,7 @@ func (p *pipe) _backgroundWrite() (err error) {
450449
}
451450
}
452451
ones[0], multi, ch = p.queue.WaitForWrite()
453-
if flushDelay != 0 && atomic.LoadInt32(&p.waits) > 1 { // do not delay for sequential usage
452+
if flushDelay != 0 && p.loadWaits() > 1 { // do not delay for sequential usage
454453
// Blocking commands are executed in dedicated client which is acquired from pool.
455454
// So, there is no sense to wait other commands to be written.
456455
// https://github.com/redis/rueidis/issues/379
@@ -637,17 +636,17 @@ func (p *pipe) _backgroundRead() (err error) {
637636
func (p *pipe) backgroundPing() {
638637
var prev, recv int32
639638

640-
prev = atomic.LoadInt32(&p.recvs)
639+
prev = p.loadRecvs()
641640
p.pingTimer = time.AfterFunc(p.pinggap, func() {
642641
var err error
643-
recv = atomic.LoadInt32(&p.recvs)
642+
recv = p.loadRecvs()
644643
defer func() {
645644
if err == nil && p.Error() == nil {
646-
prev = atomic.LoadInt32(&p.recvs)
645+
prev = p.loadRecvs()
647646
p.pingTimer.Reset(p.pinggap)
648647
}
649648
}()
650-
if recv != prev || atomic.LoadInt32(&p.blcksig) != 0 || (atomic.LoadInt32(&p.state) == 0 && atomic.LoadInt32(&p.waits) != 0) {
649+
if recv != prev || atomic.LoadInt32(&p.blcksig) != 0 || (atomic.LoadInt32(&p.state) == 0 && p.loadWaits() != 0) {
651650
return
652651
}
653652
ch := make(chan error, 1)
@@ -840,10 +839,10 @@ func (p *pipe) SetPubSubHooks(hooks PubSubHooks) <-chan error {
840839
close(old.close)
841840
}
842841
}
843-
if atomic.AddInt32(&p.waits, 1) == 1 && atomic.LoadInt32(&p.state) == 0 {
842+
if p.incrWaits() == 1 && atomic.LoadInt32(&p.state) == 0 {
844843
p.background()
845844
}
846-
atomic.AddInt32(&p.waits, -1)
845+
p.decrWaits()
847846
return ch
848847
}
849848

@@ -884,7 +883,7 @@ func (p *pipe) Do(ctx context.Context, cmd Completed) (resp RedisResult) {
884883
return p._r2pipe(ctx).Do(ctx, cmd)
885884
}
886885
}
887-
waits := atomic.AddInt32(&p.waits, 1) // if this is 1, and background worker is not started, no need to queue
886+
waits := p.incrWaits() // if this is 1, and background worker is not started, no need to queue
888887
state := atomic.LoadInt32(&p.state)
889888

890889
if state == 1 {
@@ -908,10 +907,10 @@ func (p *pipe) Do(ctx context.Context, cmd Completed) (resp RedisResult) {
908907
} else {
909908
resp = newErrResult(p.Error())
910909
}
911-
if left := atomic.AddInt32(&p.waits, -1); state == 0 && left != 0 {
910+
911+
if _, left := p.decrWaitsAndIncrRecvs(); state == 0 && left != 0 {
912912
p.background()
913913
}
914-
atomic.AddInt32(&p.recvs, 1)
915914
return resp
916915

917916
queue:
@@ -925,14 +924,12 @@ queue:
925924
goto abort
926925
}
927926
}
928-
atomic.AddInt32(&p.waits, -1)
929-
atomic.AddInt32(&p.recvs, 1)
927+
p.decrWaitsAndIncrRecvs()
930928
return resp
931929
abort:
932930
go func(ch chan RedisResult) {
933931
<-ch
934-
atomic.AddInt32(&p.waits, -1)
935-
atomic.AddInt32(&p.recvs, 1)
932+
p.decrWaitsAndIncrRecvs()
936933
}(ch)
937934
return newErrResult(ctx.Err())
938935
}
@@ -990,7 +987,7 @@ func (p *pipe) DoMulti(ctx context.Context, multi ...Completed) *redisresults {
990987
}
991988
}
992989

993-
waits := atomic.AddInt32(&p.waits, 1) // if this is 1, and background worker is not started, no need to queue
990+
waits := p.incrWaits() // if this is 1, and background worker is not started, no need to queue
994991
state := atomic.LoadInt32(&p.state)
995992

996993
if state == 1 {
@@ -1017,10 +1014,9 @@ func (p *pipe) DoMulti(ctx context.Context, multi ...Completed) *redisresults {
10171014
resp.s[i] = err
10181015
}
10191016
}
1020-
if left := atomic.AddInt32(&p.waits, -1); state == 0 && left != 0 {
1017+
if _, left := p.decrWaitsAndIncrRecvs(); state == 0 && left != 0 {
10211018
p.background()
10221019
}
1023-
atomic.AddInt32(&p.recvs, 1)
10241020
return resp
10251021

10261022
queue:
@@ -1034,15 +1030,13 @@ queue:
10341030
goto abort
10351031
}
10361032
}
1037-
atomic.AddInt32(&p.waits, -1)
1038-
atomic.AddInt32(&p.recvs, 1)
1033+
p.decrWaitsAndIncrRecvs()
10391034
return resp
10401035
abort:
10411036
go func(resp *redisresults, ch chan RedisResult) {
10421037
<-ch
10431038
resultsp.Put(resp)
1044-
atomic.AddInt32(&p.waits, -1)
1045-
atomic.AddInt32(&p.recvs, 1)
1039+
p.decrWaitsAndIncrRecvs()
10461040
}(resp, ch)
10471041
resp = resultsp.Get(len(multi), len(multi))
10481042
err := newErrResult(ctx.Err())
@@ -1084,7 +1078,7 @@ func (s *RedisResultStream) WriteTo(w io.Writer) (n int64, err error) {
10841078
}
10851079
if s.n--; s.n == 0 {
10861080
atomic.AddInt32(&s.w.blcksig, -1)
1087-
atomic.AddInt32(&s.w.waits, -1)
1081+
s.w.decrWaits()
10881082
if s.e == nil {
10891083
s.e = io.EOF
10901084
} else {
@@ -1111,7 +1105,7 @@ func (p *pipe) DoStream(ctx context.Context, pool *pool, cmd Completed) RedisRes
11111105

11121106
if state == 0 {
11131107
atomic.AddInt32(&p.blcksig, 1)
1114-
waits := atomic.AddInt32(&p.waits, 1)
1108+
waits := p.incrWaits()
11151109
if waits != 1 {
11161110
panic("DoStream with racing is a bug")
11171111
}
@@ -1139,7 +1133,7 @@ func (p *pipe) DoStream(ctx context.Context, pool *pool, cmd Completed) RedisRes
11391133
}
11401134
}
11411135
atomic.AddInt32(&p.blcksig, -1)
1142-
atomic.AddInt32(&p.waits, -1)
1136+
p.decrWaits()
11431137
pool.Store(p)
11441138
return RedisResultStream{e: p.Error()}
11451139
}
@@ -1161,7 +1155,7 @@ func (p *pipe) DoMultiStream(ctx context.Context, pool *pool, multi ...Completed
11611155

11621156
if state == 0 {
11631157
atomic.AddInt32(&p.blcksig, 1)
1164-
waits := atomic.AddInt32(&p.waits, 1)
1158+
waits := p.incrWaits()
11651159
if waits != 1 {
11661160
panic("DoMultiStream with racing is a bug")
11671161
}
@@ -1204,7 +1198,7 @@ func (p *pipe) DoMultiStream(ctx context.Context, pool *pool, multi ...Completed
12041198
}
12051199
}
12061200
atomic.AddInt32(&p.blcksig, -1)
1207-
atomic.AddInt32(&p.waits, -1)
1201+
p.decrWaits()
12081202
pool.Store(p)
12091203
return RedisResultStream{e: p.Error()}
12101204
}
@@ -1559,6 +1553,36 @@ func (p *pipe) DoMultiCache(ctx context.Context, multi ...CacheableTTL) *redisre
15591553
return results
15601554
}
15611555

1556+
// incrWaits increments the lower 32 bits (waits).
1557+
func (p *pipe) incrWaits() uint32 {
1558+
// Increment the lower 32 bits (waits)
1559+
return uint32(p.wrCounter.Add(1))
1560+
}
1561+
1562+
// decrWaits decrements the lower 32 bits (waits).
1563+
func (p *pipe) decrWaits() uint32 {
1564+
// Decrement the lower 32 bits (waits)
1565+
return uint32(p.wrCounter.Add(^uint64(0)) & 0xFFFFFFFF)
1566+
}
1567+
1568+
// decrWaitsAndIncrRecvs decrements the lower 32 bits (waits) and increments the upper 32 bits (recvs).
1569+
func (p *pipe) decrWaitsAndIncrRecvs() (uint32, uint32) {
1570+
newValue := p.wrCounter.Add((1 << 32) - 1)
1571+
return uint32(newValue >> 32), uint32(newValue & 0xFFFFFFFF)
1572+
}
1573+
1574+
// loadRecvs loads the upper 32 bits (recvs).
1575+
func (p *pipe) loadRecvs() int32 {
1576+
// Load the upper 32 bits (recvs)
1577+
return int32(p.wrCounter.Load() >> 32)
1578+
}
1579+
1580+
// loadWaits loads the lower 32 bits (waits).
1581+
func (p *pipe) loadWaits() uint32 {
1582+
// Load the lower 32 bits (waits)
1583+
return uint32(p.wrCounter.Load() & 0xFFFFFFFF)
1584+
}
1585+
15621586
func (p *pipe) Error() error {
15631587
if err := p.error.Load(); err != nil {
15641588
return err.error
@@ -1569,28 +1593,28 @@ func (p *pipe) Error() error {
15691593
func (p *pipe) Close() {
15701594
p.error.CompareAndSwap(nil, errClosing)
15711595
block := atomic.AddInt32(&p.blcksig, 1)
1572-
waits := atomic.AddInt32(&p.waits, 1)
1596+
waits := p.incrWaits()
15731597
stopping1 := atomic.CompareAndSwapInt32(&p.state, 0, 2)
15741598
stopping2 := atomic.CompareAndSwapInt32(&p.state, 1, 2)
15751599
if p.queue != nil {
15761600
if stopping1 && waits == 1 { // make sure there is no sync read
15771601
p.background()
15781602
}
15791603
if block == 1 && (stopping1 || stopping2) { // make sure there is no block cmd
1580-
atomic.AddInt32(&p.waits, 1)
1604+
p.incrWaits()
15811605
ch := p.queue.PutOne(cmds.PingCmd)
15821606
select {
15831607
case <-ch:
1584-
atomic.AddInt32(&p.waits, -1)
1608+
p.decrWaits()
15851609
case <-time.After(time.Second):
15861610
go func(ch chan RedisResult) {
15871611
<-ch
1588-
atomic.AddInt32(&p.waits, -1)
1612+
p.decrWaits()
15891613
}(ch)
15901614
}
15911615
}
15921616
}
1593-
atomic.AddInt32(&p.waits, -1)
1617+
p.decrWaits()
15941618
atomic.AddInt32(&p.blcksig, -1)
15951619
if p.pingTimer != nil {
15961620
p.pingTimer.Stop()

pipe_test.go

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -4951,8 +4951,8 @@ func TestOngoingCancelContextInPipelineMode_Do(t *testing.T) {
49514951
}()
49524952
}
49534953

4954-
for atomic.LoadInt32(&p.waits) != 5 {
4955-
t.Logf("wait p.waits to be 5 %v", atomic.LoadInt32(&p.waits))
4954+
for p.loadWaits() != 5 {
4955+
t.Logf("wait p.waits to be 5 %v", p.loadWaits())
49564956
time.Sleep(time.Millisecond * 100)
49574957
}
49584958

@@ -4991,8 +4991,8 @@ func TestOngoingWriteTimeoutInPipelineMode_Do(t *testing.T) {
49914991
}
49924992
}()
49934993
}
4994-
for atomic.LoadInt32(&p.waits) != 5 {
4995-
t.Logf("wait p.waits to be 5 %v", atomic.LoadInt32(&p.waits))
4994+
for p.loadWaits() != 5 {
4995+
t.Logf("wait p.waits to be 5 %v", p.loadWaits())
49964996
time.Sleep(time.Millisecond * 100)
49974997
}
49984998
for atomic.LoadInt32(&timeout) != 5 {
@@ -5025,8 +5025,8 @@ func TestOngoingCancelContextInPipelineMode_DoMulti(t *testing.T) {
50255025
}()
50265026
}
50275027

5028-
for atomic.LoadInt32(&p.waits) != 5 {
5029-
t.Logf("wait p.waits to be 5 %v", atomic.LoadInt32(&p.waits))
5028+
for p.loadWaits() != 5 {
5029+
t.Logf("wait p.waits to be 5 %v", p.loadWaits())
50305030
time.Sleep(time.Millisecond * 100)
50315031
}
50325032

@@ -5065,8 +5065,8 @@ func TestOngoingWriteTimeoutInPipelineMode_DoMulti(t *testing.T) {
50655065
}
50665066
}()
50675067
}
5068-
for atomic.LoadInt32(&p.waits) != 5 {
5069-
t.Logf("wait p.waits to be 5 %v", atomic.LoadInt32(&p.waits))
5068+
for p.loadWaits() != 5 {
5069+
t.Logf("wait p.waits to be 5 %v", p.loadWaits())
50705070
time.Sleep(time.Millisecond * 100)
50715071
}
50725072
for atomic.LoadInt32(&timeout) != 5 {
@@ -5325,13 +5325,13 @@ func TestBackgroundPing(t *testing.T) {
53255325
p, mock, cancel, _ := setup(t, opt)
53265326
defer cancel()
53275327
time.Sleep(50 * time.Millisecond)
5328-
prev := atomic.LoadInt32(&p.recvs)
5328+
prev := p.loadRecvs()
53295329

53305330
for i := range 10 {
53315331
atomic.AddInt32(&p.blcksig, 1) // block
53325332
time.Sleep(timeout)
53335333
atomic.AddInt32(&p.blcksig, -1) // unblock
5334-
recv := atomic.LoadInt32(&p.recvs)
5334+
recv := p.loadRecvs()
53355335
if prev != recv {
53365336
t.Fatalf("round %d unexpect recv %v, need be equal to prev %v", i, recv, prev)
53375337
}
@@ -5344,7 +5344,8 @@ func TestBackgroundPing(t *testing.T) {
53445344
}()
53455345
for i := range 10 {
53465346
time.Sleep(timeout)
5347-
recv := atomic.LoadInt32(&p.recvs)
5347+
recv := p.loadRecvs()
5348+
53485349
if prev == recv {
53495350
t.Fatalf("round %d unexpect recv %v, need be different from prev %v", i, recv, prev)
53505351
}

0 commit comments

Comments
 (0)