Skip to content

Commit 3608786

Browse files
manishrjain authored and danielmai committed
fix(conn): JoinCluster loop should use latest conn (#7950)
JoinCluster loop was getting the connection from the pool upfront, and then looping over it. This opened up a bug because in #7918, we close the connection in case it becomes unhealthy. This PR gets the latest connection available in the loop. This was the only place in the codebase where I found this issue. (cherry picked from commit 7531e95) (cherry picked from commit c4098ed)
1 parent d03d5ad commit 3608786

6 files changed

Lines changed: 6 additions & 12 deletions

File tree

dgraph/cmd/zero/raft.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -576,10 +576,9 @@ func (n *node) initAndStartNode() error {
576576
return errors.Errorf("Unhealthy connection to %v", opts.peer)
577577
}
578578

579-
gconn := p.Get()
580-
c := pb.NewRaftClient(gconn)
581579
timeout := 8 * time.Second
582580
for {
581+
c := pb.NewRaftClient(p.Get())
583582
ctx, cancel := context.WithTimeout(n.ctx, timeout)
584583
// JoinCluster can block indefinitely, raft ignores conf change proposal
585584
// if it has pending configuration.

worker/draft.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1628,8 +1628,7 @@ func (n *node) joinPeers() error {
16281628
return err
16291629
}
16301630

1631-
gconn := pl.Get()
1632-
c := pb.NewRaftClient(gconn)
1631+
c := pb.NewRaftClient(pl.Get())
16331632
glog.Infof("Calling JoinCluster via leader: %s", pl.Addr)
16341633
if _, err := c.JoinCluster(n.ctx, n.RaftContext); err != nil {
16351634
return errors.Wrapf(err, "error while joining cluster")

worker/mutation.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -546,10 +546,9 @@ func proposeOrSend(ctx context.Context, gid uint32, m *pb.Mutations, chr chan re
546546
chr <- res
547547
return
548548
}
549-
con := pl.Get()
550549

551550
var tc *api.TxnContext
552-
c := pb.NewWorkerClient(con)
551+
c := pb.NewWorkerClient(pl.Get())
553552

554553
ch := make(chan error, 1)
555554
go func() {

worker/schema.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -173,8 +173,7 @@ func getSchemaOverNetwork(ctx context.Context, gid uint32, s *pb.SchemaRequest,
173173
ch <- resultErr{err: conn.ErrNoConnection}
174174
return
175175
}
176-
conn := pl.Get()
177-
c := pb.NewWorkerClient(conn)
176+
c := pb.NewWorkerClient(pl.Get())
178177
schema, e := c.Schema(ctx, s)
179178
ch <- resultErr{result: schema, err: e}
180179
}

worker/snapshot.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,8 +47,7 @@ type badgerWriter interface {
4747

4848
// populateSnapshot gets data for a shard from the leader and writes it to BadgerDB on the follower.
4949
func (n *node) populateSnapshot(snap pb.Snapshot, pl *conn.Pool) error {
50-
con := pl.Get()
51-
c := pb.NewWorkerClient(con)
50+
c := pb.NewWorkerClient(pl.Get())
5251

5352
// We should absolutely cancel the context when we return from this function, that way, the
5453
// leader who is sending the snapshot would stop sending.

worker/task.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -53,11 +53,10 @@ func invokeNetworkRequest(ctx context.Context, addr string,
5353
return nil, errors.Wrapf(err, "dispatchTaskOverNetwork: while retrieving connection.")
5454
}
5555

56-
con := pl.Get()
5756
if span := otrace.FromContext(ctx); span != nil {
5857
span.Annotatef(nil, "invokeNetworkRequest: Sending request to %v", addr)
5958
}
60-
c := pb.NewWorkerClient(con)
59+
c := pb.NewWorkerClient(pl.Get())
6160
return f(ctx, c)
6261
}
6362

0 commit comments

Comments
 (0)