Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
225 changes: 147 additions & 78 deletions interop/orcalb.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,135 +24,204 @@ import (
"sync"
"time"

v3orcapb "github.com/cncf/xds/go/xds/data/orca/v3"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/balancer/base"
"google.golang.org/grpc/balancer/endpointsharding"
"google.golang.org/grpc/balancer/pickfirst"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/grpclog"
internalgrpclog "google.golang.org/grpc/internal/grpclog"
"google.golang.org/grpc/orca"
"google.golang.org/grpc/resolver"

v3orcapb "github.com/cncf/xds/go/xds/data/orca/v3"
)

var orcaLogger = grpclog.Component("orca")

// init registers the test_backend_metrics_load_balancer policy with the
// gRPC balancer registry so it can be selected via service config.
func init() {
balancer.Register(orcabb{})
}

type orcabb struct{}

func (orcabb) Build(cc balancer.ClientConn, _ balancer.BuildOptions) balancer.Balancer {
return &orcab{cc: cc}
// Build creates a new orcab balancer.
func (orcabb) Build(cc balancer.ClientConn, bOpts balancer.BuildOptions) balancer.Balancer {
b := &orcab{
ClientConn: cc,
stopOOBListeners: make(map[balancer.SubConn]func()),
oobState: &oobState{},
}
b.logger = internalgrpclog.NewPrefixLogger(orcaLogger, fmt.Sprintf("[%p] ", b))
b.child = endpointsharding.NewBalancer(b, bOpts, balancer.Get(pickfirst.Name).Build, endpointsharding.Options{})
b.logger.Infof("Created")
return b
}

// Name returns the registered name of this LB policy.
func (orcabb) Name() string {
return "test_backend_metrics_load_balancer"
}

// orcab is the balancer for the test_backend_metrics_load_balancer policy.
// It delegates SubConn management to endpointsharding + pick_first and
// intercepts NewSubConn calls to register OOB listeners on READY SubConns.
type orcab struct {
	// The following fields are set at build time and are read-only after
	// that; they may be accessed without synchronization.

	// Embeds balancer.ClientConn to intercept NewSubConn and UpdateState
	// calls from child balancers.
	balancer.ClientConn
	child    balancer.Balancer
	oobState *oobState
	logger   *internalgrpclog.PrefixLogger

	// mu guards stopOOBListeners. The map is set to nil on Close to signal
	// that no further OOB listeners may be registered.
	mu               sync.Mutex
	stopOOBListeners map[balancer.SubConn]func()
}
Comment on lines 67 to +78
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a good example of how we format our structs in grpc-go:

type DependencyManager struct {
. We split it up into different sections:

  • ones that are read-only after init
  • ones that don't need synchronization because of the way it is written and read from
  • ones that need to be synchronized, and a note on how they are synchronized

Please try to follow that pattern here.


// UpdateClientConnState implements balancer.Balancer.
func (b *orcab) UpdateClientConnState(s balancer.ClientConnState) error {
// Delegate to the child endpoint sharding balancer, which distributes
// state updates to its pick_first children.
return b.child.UpdateClientConnState(s)
}

reportMu sync.Mutex
report *v3orcapb.OrcaLoadReport
// ResolverError implements balancer.Balancer.
func (b *orcab) ResolverError(err error) {
// Will cause an inline picker update from endpoint sharding.
b.child.ResolverError(err)
}

func (o *orcab) ExitIdle() {
if o.sc != nil {
o.sc.Connect()
// UpdateSubConnState implements balancer.Balancer. It should never be called
// because SubConns created through NewSubConn carry a StateListener; log an
// error if it is.
func (b *orcab) UpdateSubConnState(sc balancer.SubConn, state balancer.SubConnState) {
orcaLogger.Errorf("UpdateSubConnState(%v, %+v) called unexpectedly", sc, state)
}

// ExitIdle implements balancer.Balancer.
func (b *orcab) ExitIdle() {
// Propagate to the child endpoint sharding balancer.
b.child.ExitIdle()
}

// Close stops all registered OOB listeners and closes the child balancer.
func (b *orcab) Close() {
	// Take ownership of the listener map under the lock and nil it out so
	// that any concurrent updateSubConnState call sees the balancer as
	// closed and registers nothing new.
	b.mu.Lock()
	stops := b.stopOOBListeners
	b.stopOOBListeners = nil
	b.mu.Unlock()

	// Invoke the stop functions outside the lock.
	for _, stopFn := range stops {
		stopFn()
	}
	b.child.Close()
}

// endpointsToAddrs flattens a list of endpoints to addresses to maintain
// existing behavior.
// TODO: https://github.com/grpc/grpc-go/issues/8809 - delegate subchannel
// management to the pickfirst balancer using the endpoint sharding balancer.
func endpointsToAddrs(eps []resolver.Endpoint) []resolver.Address {
addrs := make([]resolver.Address, 0, len(eps))
for _, ep := range eps {
if len(ep.Addresses) == 0 {
continue
// NewSubConn intercepts SubConn creation from pick_first children to wrap the
// StateListener for OOB listener management on READY transitions.
func (b *orcab) NewSubConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (balancer.SubConn, error) {
// The variable sc is declared before the closure so the closure captures
// the variable, not its (zero) value. After ClientConn.NewSubConn assigns
// to sc, the closure sees the real SubConn when invoked.
var sc balancer.SubConn
oldListener := opts.StateListener
opts.StateListener = func(state balancer.SubConnState) {
b.updateSubConnState(sc, state)
if oldListener != nil {
oldListener(state)
}
addrs = append(addrs, ep.Addresses[0])
}
return addrs
sc, err := b.ClientConn.NewSubConn(addrs, opts)
if err != nil {
return nil, err
}
return sc, nil
}

func (o *orcab) UpdateClientConnState(s balancer.ClientConnState) error {
if o.sc != nil {
o.sc.UpdateAddresses(endpointsToAddrs(s.ResolverState.Endpoints))
return nil
func (b *orcab) updateSubConnState(sc balancer.SubConn, state balancer.SubConnState) {
b.mu.Lock()
if b.stopOOBListeners == nil {
b.mu.Unlock()
return
}

if state.ConnectivityState == connectivity.Ready {
oldStop, exists := b.stopOOBListeners[sc]
stop := orca.RegisterOOBListener(sc, &orcaOOBListener{subConn: sc, balancer: b}, orca.OOBListenerOptions{ReportInterval: time.Second})
b.stopOOBListeners[sc] = stop
b.mu.Unlock()

if exists {
oldStop()
}
Comment on lines +145 to +147
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here and below, using if oldStop != nil { oldStop() } would read better than the current way of reading two return values from the map. What do you think?

return
}

if len(s.ResolverState.Endpoints) == 0 {
o.ResolverError(fmt.Errorf("produced no endpoints"))
return fmt.Errorf("resolver produced no endpoints")
stop, ok := b.stopOOBListeners[sc]
if ok {
delete(b.stopOOBListeners, sc)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can't we call stop from inside of this if?

Also, following up from the previous comment, we could just check for stop != nil.

}
var err error
o.sc, err = o.cc.NewSubConn(endpointsToAddrs(s.ResolverState.Endpoints), balancer.NewSubConnOptions{StateListener: o.updateSubConnState})
if err != nil {
o.cc.UpdateState(balancer.State{ConnectivityState: connectivity.TransientFailure, Picker: base.NewErrPicker(fmt.Errorf("error creating subconn: %v", err))})
return nil
b.mu.Unlock()

if ok {
stop()
}

if state.ConnectivityState == connectivity.Shutdown {
b.oobState.reports.Delete(sc)
}
o.sc.Connect()
o.cc.UpdateState(balancer.State{ConnectivityState: connectivity.Connecting, Picker: base.NewErrPicker(balancer.ErrNoSubConnAvailable)})
return nil
}

func (o *orcab) ResolverError(err error) {
if o.sc == nil {
o.cc.UpdateState(balancer.State{ConnectivityState: connectivity.TransientFailure, Picker: base.NewErrPicker(fmt.Errorf("resolver error: %v", err))})
// UpdateState intercepts state updates from endpointsharding to wrap the
// picker with ORCA load report handling.
func (b *orcab) UpdateState(state balancer.State) {
// If READY, wrap the picker to inject the ORCA Done callback; otherwise,
// pass through the child picker as-is.
if state.ConnectivityState == connectivity.Ready {
state = balancer.State{
ConnectivityState: state.ConnectivityState,
Picker: &orcaPicker{childPicker: state.Picker, oobState: b.oobState},
}
}
b.ClientConn.UpdateState(state)
}

func (o *orcab) UpdateSubConnState(sc balancer.SubConn, state balancer.SubConnState) {
logger.Errorf("UpdateSubConnState(%v, %+v) called unexpectedly", sc, state)
// orcaOOBListener receives out-of-band ORCA load reports for a single
// SubConn and stores them on the balancer's shared oobState.
type orcaOOBListener struct {
subConn balancer.SubConn
balancer *orcab
}

func (o *orcab) updateSubConnState(state balancer.SubConnState) {
switch state.ConnectivityState {
case connectivity.Ready:
orca.RegisterOOBListener(o.sc, o, orca.OOBListenerOptions{ReportInterval: time.Second})
o.cc.UpdateState(balancer.State{ConnectivityState: connectivity.Ready, Picker: &orcaPicker{o: o}})
case connectivity.TransientFailure:
o.cc.UpdateState(balancer.State{ConnectivityState: connectivity.TransientFailure, Picker: base.NewErrPicker(fmt.Errorf("all subchannels in transient failure: %v", state.ConnectionError))})
case connectivity.Connecting:
// Ignore; picker already set to "connecting".
case connectivity.Idle:
o.sc.Connect()
o.cc.UpdateState(balancer.State{ConnectivityState: connectivity.Connecting, Picker: base.NewErrPicker(balancer.ErrNoSubConnAvailable)})
case connectivity.Shutdown:
// Ignore; we are closing but handle that in Close instead.
// OnLoadReport implements orca.OOBListener. It records the most recent
// out-of-band load report for this listener's SubConn; nil reports are
// ignored so a stored report is never clobbered with nothing.
func (l *orcaOOBListener) OnLoadReport(r *v3orcapb.OrcaLoadReport) {
if r == nil {
return
}
l.balancer.oobState.reports.Store(l.subConn, r)
}

func (o *orcab) Close() {}

func (o *orcab) OnLoadReport(r *v3orcapb.OrcaLoadReport) {
o.reportMu.Lock()
defer o.reportMu.Unlock()
logger.Infof("received OOB load report: %v", r)
o.report = r
// oobState holds the latest out-of-band ORCA load report received for each
// SubConn, shared between the balancer (writer via orcaOOBListener) and the
// picker (reader in Pick).
// NOTE(review): sync.Map's documented sweet spots (write-once keys, or
// disjoint key sets per goroutine) may not match this usage — the same key
// is overwritten on every report and read concurrently; consider a plain
// map guarded by a sync.Mutex instead. TODO: confirm.
type oobState struct {
	reports sync.Map // map[balancer.SubConn]*v3orcapb.OrcaLoadReport
}

type orcaPicker struct {
o *orcab
childPicker balancer.Picker
oobState *oobState
}

func (p *orcaPicker) Pick(info balancer.PickInfo) (balancer.PickResult, error) {
doneCB := func(di balancer.DoneInfo) {
if lr, _ := di.ServerLoad.(*v3orcapb.OrcaLoadReport); lr != nil &&
(lr.CpuUtilization != 0 || lr.MemUtilization != 0 || len(lr.Utilization) > 0 || len(lr.RequestCost) > 0) {
// Since all RPCs will respond with a load report due to the
// presence of the DialOption, we need to inspect every field and
// use the out-of-band report instead if all are unset/zero.
res, err := p.childPicker.Pick(info)
if err != nil {
return res, err
}

var lr *v3orcapb.OrcaLoadReport
if val, ok := p.oobState.reports.Load(res.SubConn); ok {
lr = val.(*v3orcapb.OrcaLoadReport)
}

origDone := res.Done
res.Done = func(di balancer.DoneInfo) {
if perRPCLR, _ := di.ServerLoad.(*v3orcapb.OrcaLoadReport); perRPCLR != nil &&
(perRPCLR.CpuUtilization != 0 || perRPCLR.MemUtilization != 0 || len(perRPCLR.Utilization) > 0 || len(perRPCLR.RequestCost) > 0) {
Comment on lines +214 to +215
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I understand that some of this was existing code. But this is quite hard to read. Can we instead write this as:

perRPCLR, _ := di.ServerLoad.(*v3orcapb.OrcaLoadReport)
if perRPCLR != nil && (perRPCLR.CpuUtilization != 0 || perRPCLR.MemUtilization != 0 || len(perRPCLR.Utilization) > 0 || len(perRPCLR.RequestCost) > 0) {
    setContextCMR(info.Ctx, perRPCLR)
} else if lr != nil {
	setContextCMR(info.Ctx, lr)
}

See: https://google.github.io/styleguide/go/guide#line-length

setContextCMR(info.Ctx, perRPCLR)
} else if lr != nil {
setContextCMR(info.Ctx, lr)
} else {
p.o.reportMu.Lock()
defer p.o.reportMu.Unlock()
if lr := p.o.report; lr != nil {
setContextCMR(info.Ctx, lr)
}
}
if origDone != nil {
origDone(di)
}
}
return balancer.PickResult{SubConn: p.o.sc, Done: doneCB}, nil
return res, nil
}

func setContextCMR(ctx context.Context, lr *v3orcapb.OrcaLoadReport) {
Expand Down
Loading
Loading