Skip to content

Commit 70513d4

Browse files
committed
feat: [interop/orcalb] Delegate SubConn management to pick_first
1 parent ad2d82e commit 70513d4

File tree

2 files changed

+487
-74
lines changed

2 files changed

+487
-74
lines changed

interop/orcalb.go

Lines changed: 126 additions & 74 deletions
Original file line numberDiff line numberDiff line change
@@ -24,135 +24,187 @@ import (
2424
"sync"
2525
"time"
2626

27-
v3orcapb "github.com/cncf/xds/go/xds/data/orca/v3"
2827
"google.golang.org/grpc/balancer"
29-
"google.golang.org/grpc/balancer/base"
28+
"google.golang.org/grpc/balancer/endpointsharding"
29+
"google.golang.org/grpc/balancer/pickfirst"
3030
"google.golang.org/grpc/connectivity"
31+
"google.golang.org/grpc/grpclog"
32+
internalgrpclog "google.golang.org/grpc/internal/grpclog"
3133
"google.golang.org/grpc/orca"
3234
"google.golang.org/grpc/resolver"
35+
36+
v3orcapb "github.com/cncf/xds/go/xds/data/orca/v3"
3337
)
3438

39+
var orcaLogger = grpclog.Component("orca")
40+
3541
func init() {
3642
balancer.Register(orcabb{})
3743
}
3844

3945
type orcabb struct{}
4046

41-
func (orcabb) Build(cc balancer.ClientConn, _ balancer.BuildOptions) balancer.Balancer {
42-
return &orcab{cc: cc}
47+
// Build creates a new orcab balancer.
48+
func (orcabb) Build(cc balancer.ClientConn, bOpts balancer.BuildOptions) balancer.Balancer {
49+
b := &orcab{
50+
ClientConn: cc,
51+
stopOOBListeners: make(map[balancer.SubConn]func()),
52+
oobState: &oobState{},
53+
}
54+
b.logger = internalgrpclog.NewPrefixLogger(orcaLogger, fmt.Sprintf("[%p] ", b))
55+
b.child = endpointsharding.NewBalancer(b, bOpts, balancer.Get(pickfirst.Name).Build, endpointsharding.Options{})
56+
b.logger.Infof("Created")
57+
return b
4358
}
4459

4560
func (orcabb) Name() string {
4661
return "test_backend_metrics_load_balancer"
4762
}
4863

64+
// orcab is the balancer for the test_backend_metrics_load_balancer policy.
65+
// It delegates SubConn management to endpointsharding + pick_first and
66+
// intercepts NewSubConn calls to register OOB listeners on READY SubConns.
4967
type orcab struct {
50-
cc balancer.ClientConn
51-
sc balancer.SubConn
68+
// Embeds balancer.ClientConn to intercept NewSubConn and UpdateState
69+
// calls from child balancers.
70+
balancer.ClientConn
71+
child balancer.Balancer
72+
// mu guards stopOOBListeners.
73+
mu sync.Mutex
74+
stopOOBListeners map[balancer.SubConn]func()
5275

53-
reportMu sync.Mutex
54-
report *v3orcapb.OrcaLoadReport
76+
oobState *oobState
77+
logger *internalgrpclog.PrefixLogger
5578
}
5679

57-
func (o *orcab) ExitIdle() {
58-
if o.sc != nil {
59-
o.sc.Connect()
60-
}
80+
func (b *orcab) UpdateClientConnState(s balancer.ClientConnState) error {
81+
// Delegate to the child endpoint sharding balancer, which distributes
82+
// state updates to its pick_first children.
83+
return b.child.UpdateClientConnState(s)
6184
}
6285

63-
// endpointsToAddrs flattens a list of endpoints to addresses to maintain
64-
// existing behavior.
65-
// TODO: https://github.com/grpc/grpc-go/issues/8809 - delegate subchannel
66-
// management to the pickfirst balancer using the endpoint sharding balancer.
67-
func endpointsToAddrs(eps []resolver.Endpoint) []resolver.Address {
68-
addrs := make([]resolver.Address, 0, len(eps))
69-
for _, ep := range eps {
70-
if len(ep.Addresses) == 0 {
71-
continue
72-
}
73-
addrs = append(addrs, ep.Addresses[0])
74-
}
75-
return addrs
86+
func (b *orcab) ResolverError(err error) {
87+
// Will cause an inline picker update from endpoint sharding.
88+
b.child.ResolverError(err)
7689
}
7790

78-
func (o *orcab) UpdateClientConnState(s balancer.ClientConnState) error {
79-
if o.sc != nil {
80-
o.sc.UpdateAddresses(endpointsToAddrs(s.ResolverState.Endpoints))
81-
return nil
82-
}
91+
func (b *orcab) UpdateSubConnState(sc balancer.SubConn, state balancer.SubConnState) {
92+
orcaLogger.Errorf("UpdateSubConnState(%v, %+v) called unexpectedly", sc, state)
93+
}
94+
95+
func (b *orcab) ExitIdle() {
96+
// Propagate to the child endpoint sharding balancer.
97+
b.child.ExitIdle()
98+
}
8399

84-
if len(s.ResolverState.Endpoints) == 0 {
85-
o.ResolverError(fmt.Errorf("produced no endpoints"))
86-
return fmt.Errorf("resolver produced no endpoints")
100+
func (b *orcab) Close() {
101+
b.mu.Lock()
102+
for _, stop := range b.stopOOBListeners {
103+
stop()
104+
}
105+
b.stopOOBListeners = nil
106+
b.mu.Unlock()
107+
b.child.Close()
108+
}
109+
110+
// NewSubConn intercepts SubConn creation from pick_first children to wrap the
111+
// StateListener for OOB listener management on READY transitions.
112+
func (b *orcab) NewSubConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (balancer.SubConn, error) {
113+
// The variable sc is declared before the closure so the closure captures
114+
// the variable, not its (zero) value. After ClientConn.NewSubConn assigns
115+
// to sc, the closure sees the real SubConn when invoked.
116+
var sc balancer.SubConn
117+
oldListener := opts.StateListener
118+
opts.StateListener = func(state balancer.SubConnState) {
119+
b.updateSubConnState(sc, state)
120+
if oldListener != nil {
121+
oldListener(state)
122+
}
87123
}
88-
var err error
89-
o.sc, err = o.cc.NewSubConn(endpointsToAddrs(s.ResolverState.Endpoints), balancer.NewSubConnOptions{StateListener: o.updateSubConnState})
124+
sc, err := b.ClientConn.NewSubConn(addrs, opts)
90125
if err != nil {
91-
o.cc.UpdateState(balancer.State{ConnectivityState: connectivity.TransientFailure, Picker: base.NewErrPicker(fmt.Errorf("error creating subconn: %v", err))})
92-
return nil
126+
return nil, err
93127
}
94-
o.sc.Connect()
95-
o.cc.UpdateState(balancer.State{ConnectivityState: connectivity.Connecting, Picker: base.NewErrPicker(balancer.ErrNoSubConnAvailable)})
96-
return nil
128+
return sc, nil
97129
}
98130

99-
func (o *orcab) ResolverError(err error) {
100-
if o.sc == nil {
101-
o.cc.UpdateState(balancer.State{ConnectivityState: connectivity.TransientFailure, Picker: base.NewErrPicker(fmt.Errorf("resolver error: %v", err))})
131+
func (b *orcab) updateSubConnState(sc balancer.SubConn, state balancer.SubConnState) {
132+
b.mu.Lock()
133+
defer b.mu.Unlock()
134+
if b.stopOOBListeners == nil {
135+
// Already closed; drop the state update.
136+
return
137+
}
138+
if state.ConnectivityState == connectivity.Ready {
139+
// Register an OOB listener when the SubConn becomes READY.
140+
stop := orca.RegisterOOBListener(sc, b, orca.OOBListenerOptions{ReportInterval: time.Second})
141+
b.stopOOBListeners[sc] = stop
142+
return
143+
}
144+
// For any other state (including Shutdown), stop and remove the OOB
145+
// listener if one was registered for this SubConn.
146+
if stop, ok := b.stopOOBListeners[sc]; ok {
147+
stop()
148+
delete(b.stopOOBListeners, sc)
102149
}
103150
}
104151

105-
func (o *orcab) UpdateSubConnState(sc balancer.SubConn, state balancer.SubConnState) {
106-
logger.Errorf("UpdateSubConnState(%v, %+v) called unexpectedly", sc, state)
107-
}
108-
109-
func (o *orcab) updateSubConnState(state balancer.SubConnState) {
110-
switch state.ConnectivityState {
111-
case connectivity.Ready:
112-
orca.RegisterOOBListener(o.sc, o, orca.OOBListenerOptions{ReportInterval: time.Second})
113-
o.cc.UpdateState(balancer.State{ConnectivityState: connectivity.Ready, Picker: &orcaPicker{o: o}})
114-
case connectivity.TransientFailure:
115-
o.cc.UpdateState(balancer.State{ConnectivityState: connectivity.TransientFailure, Picker: base.NewErrPicker(fmt.Errorf("all subchannels in transient failure: %v", state.ConnectionError))})
116-
case connectivity.Connecting:
117-
// Ignore; picker already set to "connecting".
118-
case connectivity.Idle:
119-
o.sc.Connect()
120-
o.cc.UpdateState(balancer.State{ConnectivityState: connectivity.Connecting, Picker: base.NewErrPicker(balancer.ErrNoSubConnAvailable)})
121-
case connectivity.Shutdown:
122-
// Ignore; we are closing but handle that in Close instead.
152+
// UpdateState intercepts state updates from endpointsharding to wrap the
153+
// picker with ORCA load report handling.
154+
func (b *orcab) UpdateState(state balancer.State) {
155+
// If READY, wrap the picker to inject the ORCA Done callback; otherwise,
156+
// pass through the child picker as-is.
157+
if state.ConnectivityState == connectivity.Ready {
158+
state = balancer.State{
159+
ConnectivityState: state.ConnectivityState,
160+
Picker: &orcaPicker{childPicker: state.Picker, oobState: b.oobState},
161+
}
123162
}
163+
b.ClientConn.UpdateState(state)
124164
}
125165

126-
func (o *orcab) Close() {}
127-
128-
func (o *orcab) OnLoadReport(r *v3orcapb.OrcaLoadReport) {
129-
o.reportMu.Lock()
130-
defer o.reportMu.Unlock()
131-
logger.Infof("received OOB load report: %v", r)
132-
o.report = r
166+
// OnLoadReport implements orca.OOBListener.
167+
func (b *orcab) OnLoadReport(r *v3orcapb.OrcaLoadReport) {
168+
b.oobState.mu.Lock()
169+
defer b.oobState.mu.Unlock()
170+
b.logger.Infof("Received OOB load report: %v", r)
171+
b.oobState.report = r
133172
}
134173

174+
type oobState struct {
175+
mu sync.Mutex
176+
report *v3orcapb.OrcaLoadReport
177+
}
135178
type orcaPicker struct {
136-
o *orcab
179+
childPicker balancer.Picker
180+
oobState *oobState
137181
}
138182

139183
func (p *orcaPicker) Pick(info balancer.PickInfo) (balancer.PickResult, error) {
140-
doneCB := func(di balancer.DoneInfo) {
184+
res, err := p.childPicker.Pick(info)
185+
if err != nil {
186+
return res, err
187+
}
188+
origDone := res.Done
189+
res.Done = func(di balancer.DoneInfo) {
141190
if lr, _ := di.ServerLoad.(*v3orcapb.OrcaLoadReport); lr != nil &&
142191
(lr.CpuUtilization != 0 || lr.MemUtilization != 0 || len(lr.Utilization) > 0 || len(lr.RequestCost) > 0) {
143192
// Since all RPCs will respond with a load report due to the
144193
// presence of the DialOption, we need to inspect every field and
145194
// use the out-of-band report instead if all are unset/zero.
146195
setContextCMR(info.Ctx, lr)
147196
} else {
148-
p.o.reportMu.Lock()
149-
defer p.o.reportMu.Unlock()
150-
if lr := p.o.report; lr != nil {
197+
p.oobState.mu.Lock()
198+
defer p.oobState.mu.Unlock()
199+
if lr := p.oobState.report; lr != nil {
151200
setContextCMR(info.Ctx, lr)
152201
}
153202
}
203+
if origDone != nil {
204+
origDone(di)
205+
}
154206
}
155-
return balancer.PickResult{SubConn: p.o.sc, Done: doneCB}, nil
207+
return res, nil
156208
}
157209

158210
func setContextCMR(ctx context.Context, lr *v3orcapb.OrcaLoadReport) {

0 commit comments

Comments
 (0)