Skip to content
8 changes: 5 additions & 3 deletions conn/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ type Node struct {
_raft raft.Node

// Fields which are never changed after init.
BeginTime time.Time
Cfg *raft.Config
MyAddr string
Id uint64
Expand All @@ -84,9 +85,10 @@ func NewNode(rc *pb.RaftContext, store *raftwal.DiskStorage) *Node {
x.Check(err)

n := &Node{
Id: rc.Id,
MyAddr: rc.Addr,
Store: store,
BeginTime: time.Now(),
Id: rc.Id,
MyAddr: rc.Addr,
Store: store,
Cfg: &raft.Config{
ID: rc.Id,
ElectionTick: 20, // 2s if we call Tick() every 100 ms.
Expand Down
25 changes: 17 additions & 8 deletions conn/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"time"

"github.com/dgraph-io/badger/v2/y"
"github.com/dgraph-io/dgo/v2/protos/api"
"github.com/dgraph-io/dgraph/protos/pb"
"github.com/dgraph-io/dgraph/x"
"github.com/golang/glog"
Expand All @@ -48,9 +47,10 @@ type Pool struct {
// messages in the same TCP stream.
conn *grpc.ClientConn

lastEcho time.Time
Addr string
closer *y.Closer
lastEcho time.Time
Addr string
closer *y.Closer
healthInfo pb.HealthInfo
}

// Pools manages a concurrency-safe set of Pool.
Expand Down Expand Up @@ -197,7 +197,7 @@ func (p *Pool) listenToHeartbeat() error {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

s, err := c.Heartbeat(ctx, &api.Payload{})
s, err := c.Heartbeat(ctx, &pb.HealthInfo{})
if err != nil {
return err
}
Expand All @@ -212,13 +212,15 @@ func (p *Pool) listenToHeartbeat() error {

// This loop can block indefinitely as long as it keeps on receiving pings back.
for {
_, err := s.Recv()
if err != nil {
res, err := s.Recv()
if err != nil || res == nil {
return err
}

// We do this periodic stream receive based approach to defend against network partitions.
p.Lock()
p.lastEcho = time.Now()
p.healthInfo = *res
p.Unlock()
}
}
Expand All @@ -235,7 +237,7 @@ func (p *Pool) MonitorHealth() {
default:
err := p.listenToHeartbeat()
if lastErr != nil && err == nil {
glog.Infof("Connection established with %v\n", p.Addr)
glog.Infof("Connection re-established with %v\n", p.Addr)
} else if err != nil && lastErr == nil {
glog.Warningf("Connection lost with %v. Error: %v\n", p.Addr, err)
}
Expand All @@ -255,3 +257,10 @@ func (p *Pool) IsHealthy() bool {
defer p.RUnlock()
return time.Since(p.lastEcho) < 4*echoDuration
}

// GetHealthInfo returns the healthinfo.
func (p *Pool) GetHealthInfo() pb.HealthInfo {
p.RLock()
defer p.RUnlock()
return p.healthInfo
}
23 changes: 20 additions & 3 deletions conn/raft_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"context"
"encoding/binary"
"math/rand"
"strconv"
"sync"
"sync/atomic"
"time"
Expand Down Expand Up @@ -271,18 +272,34 @@ func (w *RaftServer) RaftMessage(server pb.Raft_RaftMessageServer) error {

// Heartbeat rpc call is used to check connection with other workers after worker
// tcp server for this instance starts.
func (w *RaftServer) Heartbeat(in *api.Payload, stream pb.Raft_HeartbeatServer) error {
func (w *RaftServer) Heartbeat(in *pb.HealthInfo, stream pb.Raft_HeartbeatServer) error {
ticker := time.NewTicker(echoDuration)
defer ticker.Stop()

node := w.GetNode()
if node == nil {
return ErrNoNode
}
info := pb.HealthInfo{
Instance: "alpha",
Group: strconv.Itoa(int(node.RaftContext.GetGroup())),
Addr: node.MyAddr,
Version: x.Version(),
Uptime: uint64(time.Since(node.BeginTime)),
}
if info.Group == "0" {
info.Instance = "zero"
}

ctx := stream.Context()
out := &api.Payload{Data: []byte("beat")}

for {
info.Uptime = uint64(time.Since(node.BeginTime))
select {
case <-ctx.Done():
return ctx.Err()
case <-ticker.C:
if err := stream.Send(out); err != nil {
if err := stream.Send(&info); err != nil {
return err
}
}
Expand Down
22 changes: 22 additions & 0 deletions dgraph/cmd/alpha/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,27 @@ func grpcPort() int {
func healthCheck(w http.ResponseWriter, r *http.Request) {
x.AddCorsHeaders(w)

if _, ok := r.URL.Query()["all"]; ok {
var err error
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)

ctx := context.Background()
ctx = attachAccessJwt(ctx, r)

var aResp *api.Response
if aResp, err = (&edgraph.Server{}).HealthAll(ctx); err != nil {
x.SetStatus(w, x.Error, err.Error())
return
}
if aResp == nil {
x.SetStatus(w, x.ErrorNoData, "No state information available.")
return
}
_, _ = w.Write(aResp.Json)
return
}

_, ok := r.URL.Query()["live"]
if !ok {
if err := x.HealthCheck(); err != nil {
Expand Down Expand Up @@ -532,6 +553,7 @@ func run() {
AclEnabled: secretFile != "",
SnapshotAfter: Alpha.Conf.GetInt("snapshot_after"),
AbortOlderThan: abortDur,
BeginTime: beginTime,
}

setupCustomTokenizers()
Expand Down
2 changes: 1 addition & 1 deletion edgraph/access.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func authorizeQuery(ctx context.Context, parsedReq *gql.Result) error {
return nil
}

func authorizeState(ctx context.Context) error {
func authorizeForGroot(ctx context.Context) error {
// always allow access
return nil
}
8 changes: 4 additions & 4 deletions edgraph/access_ee.go
Original file line number Diff line number Diff line change
Expand Up @@ -784,16 +784,16 @@ func authorizeQuery(ctx context.Context, parsedReq *gql.Result) error {
return nil
}

// authorizeState authorizes the State operation
func authorizeState(ctx context.Context) error {
// authorizeForGroot authorizes the operation for Groot users.
func authorizeForGroot(ctx context.Context) error {
if len(worker.Config.HmacSecret) == 0 {
// the user has not turned on the acl feature
return nil
}

var userID string
// doAuthorizeState checks if the user is authorized to perform this API request
doAuthorizeState := func() error {
doAuthorizeGroot := func() error {
userData, err := extractUserAndGroups(ctx)
switch {
case err == errNoJwt:
Expand All @@ -811,7 +811,7 @@ func authorizeState(ctx context.Context) error {
}
}

return doAuthorizeState()
return doAuthorizeGroot()
}

func removePredsFromQuery(gqs []*gql.GraphQuery,
Expand Down
71 changes: 70 additions & 1 deletion edgraph/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/dgraph-io/dgo/v2/protos/api"

"github.com/dgraph-io/dgraph/chunker"
"github.com/dgraph-io/dgraph/conn"
"github.com/dgraph-io/dgraph/dgraph/cmd/zero"
"github.com/dgraph-io/dgraph/gql"
"github.com/dgraph-io/dgraph/posting"
Expand Down Expand Up @@ -598,6 +599,74 @@ type queryContext struct {
span *trace.Span
}

// HealthAll handles health?all requests
func (s *Server) HealthAll(ctx context.Context) (*api.Response, error) {
return s.doHealthAll(ctx, NeedAuthorize)
}

func (s *Server) doHealthAll(ctx context.Context, authorize int) (*api.Response, error) {
var err error

if ctx.Err() != nil {
return nil, ctx.Err()
}

if authorize == NeedAuthorize {
if err := authorizeForGroot(ctx); err != nil {
return nil, err
}
}

ms := worker.GetMembershipState()
if ms == nil {
return nil, errors.Errorf("No membership state found")
}

health := make(map[string][]pb.HealthInfo)

process := func(vm *pb.Member) {
curr := pb.HealthInfo{
Addr: vm.GetAddr(),
}

// get health locally if self.
if vm.GetAddr() == x.WorkerConfig.MyAddr {
curr.Instance = "alpha"
curr.Group = strconv.Itoa(int(vm.GroupId))
curr.Version = x.Version()
curr.Uptime = uint64(time.Since(x.WorkerConfig.BeginTime))
} else {
p, err := conn.GetPools().Get(vm.GetAddr())
if err != nil {
health["unhealthy"] = append(health["unhealthy"], curr)
return
}
curr = p.GetHealthInfo()
}
health["healthy"] = append(health["healthy"], curr)
return

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

S1023: redundant return statement (from gosimple)

}

// get health from each group member
for _, vg := range ms.Groups {
for _, vm := range vg.Members {
process(vm)
}
}

// get health from zeros.
for _, vz := range ms.Zeros {
process(vz)
}

var jsonOut []byte
if jsonOut, err = json.Marshal(health); err != nil {
return nil, errors.Errorf("Unable to Marshal. Err %v", err)
}

return &api.Response{Json: jsonOut}, nil
}

// State handles state requests
func (s *Server) State(ctx context.Context) (*api.Response, error) {
return s.doState(ctx, NeedAuthorize)
Expand All @@ -611,7 +680,7 @@ func (s *Server) doState(ctx context.Context, authorize int) (
}

if authorize == NeedAuthorize {
if err := authorizeState(ctx); err != nil {
if err := authorizeForGroot(ctx); err != nil {
return nil, err
}
}
Expand Down
10 changes: 9 additions & 1 deletion protos/pb.proto
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,14 @@ message ConnectionState {
uint64 max_pending = 3; // Used to determine the timstamp for reading after bulk load
}

message HealthInfo {
string Instance = 1;
string Group = 2; // string so group = 0 can be printed in JSON.
string Addr = 3;
string Version = 4;
uint64 Uptime = 5;
}

message Tablet {
uint32 group_id = 1; // Served by which group.
string predicate = 2;
Expand Down Expand Up @@ -467,7 +475,7 @@ message RaftBatch {
}

service Raft {
rpc Heartbeat (api.Payload) returns (stream api.Payload) {}
rpc Heartbeat (HealthInfo) returns (stream HealthInfo) {}
rpc RaftMessage (stream RaftBatch) returns (api.Payload) {}
rpc JoinCluster (RaftContext) returns (api.Payload) {}
rpc IsPeer (RaftContext) returns (PeerResponse) {}
Expand Down
Loading