Skip to content

Commit 41da858

Browse files
committed
feat: [interop/orcalb] Delegate SubConn management to pick_first
1 parent c1a9239 commit 41da858

File tree

2 files changed

+528
-72
lines changed

2 files changed

+528
-72
lines changed

interop/orcalb.go

Lines changed: 119 additions & 72 deletions
Original file line numberDiff line numberDiff line change
@@ -24,12 +24,15 @@ import (
2424
"sync"
2525
"time"
2626

27-
v3orcapb "github.com/cncf/xds/go/xds/data/orca/v3"
2827
"google.golang.org/grpc/balancer"
29-
"google.golang.org/grpc/balancer/base"
28+
"google.golang.org/grpc/balancer/endpointsharding"
29+
"google.golang.org/grpc/balancer/pickfirst"
3030
"google.golang.org/grpc/connectivity"
31+
internalgrpclog "google.golang.org/grpc/internal/grpclog"
3132
"google.golang.org/grpc/orca"
3233
"google.golang.org/grpc/resolver"
34+
35+
v3orcapb "github.com/cncf/xds/go/xds/data/orca/v3"
3336
)
3437

3538
func init() {
@@ -38,106 +41,147 @@ func init() {
3841

3942
type orcabb struct{}
4043

41-
func (orcabb) Build(cc balancer.ClientConn, _ balancer.BuildOptions) balancer.Balancer {
42-
return &orcab{cc: cc}
44+
// Build creates a new orcab balancer.
45+
func (orcabb) Build(cc balancer.ClientConn, bOpts balancer.BuildOptions) balancer.Balancer {
46+
b := &orcab{
47+
ClientConn: cc,
48+
stopOOBListeners: make(map[balancer.SubConn]func()),
49+
}
50+
b.child = endpointsharding.NewBalancer(b, bOpts, balancer.Get(pickfirst.Name).Build, endpointsharding.Options{})
51+
b.logger = internalgrpclog.NewPrefixLogger(logger, fmt.Sprintf("[%p] ", b))
52+
b.logger.Infof("Created")
53+
return b
4354
}
4455

4556
func (orcabb) Name() string {
4657
return "test_backend_metrics_load_balancer"
4758
}
4859

60+
// orcab is the balancer for the test_backend_metrics_load_balancer policy.
61+
// It delegates SubConn management to endpointsharding + pick_first and
62+
// intercepts NewSubConn calls to register OOB listeners on READY SubConns.
4963
type orcab struct {
50-
cc balancer.ClientConn
51-
sc balancer.SubConn
52-
64+
// Embeds balancer.ClientConn to intercept NewSubConn and UpdateState
65+
// calls from child balancers.
66+
balancer.ClientConn
67+
child balancer.Balancer
68+
logger *internalgrpclog.PrefixLogger
69+
70+
// mu guards stopOOBListeners.
71+
mu sync.Mutex
72+
stopOOBListeners map[balancer.SubConn]func()
73+
74+
// reportMu guards report. It is held by OnLoadReport (called
75+
// asynchronously by the ORCA producer) and by the picker's Done callback.
5376
reportMu sync.Mutex
5477
report *v3orcapb.OrcaLoadReport
5578
}
5679

57-
func (o *orcab) ExitIdle() {
58-
if o.sc != nil {
59-
o.sc.Connect()
60-
}
80+
func (b *orcab) UpdateClientConnState(s balancer.ClientConnState) error {
81+
// Delegate to the child endpoint sharding balancer, which distributes
82+
// state updates to its pick_first children.
83+
return b.child.UpdateClientConnState(s)
6184
}
6285

63-
// endpointsToAddrs flattens a list of endpoints to addresses to maintain
64-
// existing behavior.
65-
// TODO: https://github.com/grpc/grpc-go/issues/8809 - delegate subchannel
66-
// management to the pickfirst balancer using the endpoint sharding balancer.
67-
func endpointsToAddrs(eps []resolver.Endpoint) []resolver.Address {
68-
addrs := make([]resolver.Address, 0, len(eps))
69-
for _, ep := range eps {
70-
if len(ep.Addresses) == 0 {
71-
continue
72-
}
73-
addrs = append(addrs, ep.Addresses[0])
74-
}
75-
return addrs
86+
func (b *orcab) ResolverError(err error) {
87+
// Will cause an inline picker update from endpoint sharding.
88+
b.child.ResolverError(err)
7689
}
7790

78-
func (o *orcab) UpdateClientConnState(s balancer.ClientConnState) error {
79-
if o.sc != nil {
80-
o.sc.UpdateAddresses(endpointsToAddrs(s.ResolverState.Endpoints))
81-
return nil
82-
}
91+
func (b *orcab) UpdateSubConnState(sc balancer.SubConn, state balancer.SubConnState) {
92+
b.logger.Errorf("UpdateSubConnState(%v, %+v) called unexpectedly", sc, state)
93+
}
94+
95+
func (b *orcab) ExitIdle() {
96+
// Propagate to the child endpoint sharding balancer.
97+
b.child.ExitIdle()
98+
}
8399

84-
if len(s.ResolverState.Endpoints) == 0 {
85-
o.ResolverError(fmt.Errorf("produced no endpoints"))
86-
return fmt.Errorf("resolver produced no endpoints")
100+
func (b *orcab) Close() {
101+
b.mu.Lock()
102+
for _, stop := range b.stopOOBListeners {
103+
stop()
104+
}
105+
b.stopOOBListeners = nil
106+
b.mu.Unlock()
107+
b.child.Close()
108+
}
109+
110+
// NewSubConn intercepts SubConn creation from pick_first children to wrap the
111+
// StateListener for OOB listener management on READY transitions.
112+
func (b *orcab) NewSubConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (balancer.SubConn, error) {
113+
// The variable sc is declared before the closure so the closure captures
114+
// the variable, not its (zero) value. After ClientConn.NewSubConn assigns
115+
// to sc, the closure sees the real SubConn when invoked.
116+
var sc balancer.SubConn
117+
oldListener := opts.StateListener
118+
opts.StateListener = func(state balancer.SubConnState) {
119+
b.updateSubConnState(sc, state)
120+
if oldListener != nil {
121+
oldListener(state)
122+
}
87123
}
88-
var err error
89-
o.sc, err = o.cc.NewSubConn(endpointsToAddrs(s.ResolverState.Endpoints), balancer.NewSubConnOptions{StateListener: o.updateSubConnState})
124+
sc, err := b.ClientConn.NewSubConn(addrs, opts)
90125
if err != nil {
91-
o.cc.UpdateState(balancer.State{ConnectivityState: connectivity.TransientFailure, Picker: base.NewErrPicker(fmt.Errorf("error creating subconn: %v", err))})
92-
return nil
126+
return nil, err
93127
}
94-
o.sc.Connect()
95-
o.cc.UpdateState(balancer.State{ConnectivityState: connectivity.Connecting, Picker: base.NewErrPicker(balancer.ErrNoSubConnAvailable)})
96-
return nil
97-
}
98-
99-
func (o *orcab) ResolverError(err error) {
100-
if o.sc == nil {
101-
o.cc.UpdateState(balancer.State{ConnectivityState: connectivity.TransientFailure, Picker: base.NewErrPicker(fmt.Errorf("resolver error: %v", err))})
128+
return sc, nil
129+
}
130+
131+
func (b *orcab) updateSubConnState(sc balancer.SubConn, state balancer.SubConnState) {
132+
b.mu.Lock()
133+
defer b.mu.Unlock()
134+
if state.ConnectivityState == connectivity.Ready {
135+
// Register an OOB listener when the SubConn becomes READY.
136+
stop := orca.RegisterOOBListener(sc, b, orca.OOBListenerOptions{
137+
ReportInterval: time.Second,
138+
})
139+
b.stopOOBListeners[sc] = stop
140+
return
141+
}
142+
// For any other state (including Shutdown), stop and remove the OOB
143+
// listener if one was registered for this SubConn.
144+
if stop, ok := b.stopOOBListeners[sc]; ok {
145+
stop()
146+
delete(b.stopOOBListeners, sc)
102147
}
103148
}
104149

105-
func (o *orcab) UpdateSubConnState(sc balancer.SubConn, state balancer.SubConnState) {
106-
logger.Errorf("UpdateSubConnState(%v, %+v) called unexpectedly", sc, state)
107-
}
108-
109-
func (o *orcab) updateSubConnState(state balancer.SubConnState) {
110-
switch state.ConnectivityState {
111-
case connectivity.Ready:
112-
orca.RegisterOOBListener(o.sc, o, orca.OOBListenerOptions{ReportInterval: time.Second})
113-
o.cc.UpdateState(balancer.State{ConnectivityState: connectivity.Ready, Picker: &orcaPicker{o: o}})
114-
case connectivity.TransientFailure:
115-
o.cc.UpdateState(balancer.State{ConnectivityState: connectivity.TransientFailure, Picker: base.NewErrPicker(fmt.Errorf("all subchannels in transient failure: %v", state.ConnectionError))})
116-
case connectivity.Connecting:
117-
// Ignore; picker already set to "connecting".
118-
case connectivity.Idle:
119-
o.sc.Connect()
120-
o.cc.UpdateState(balancer.State{ConnectivityState: connectivity.Connecting, Picker: base.NewErrPicker(balancer.ErrNoSubConnAvailable)})
121-
case connectivity.Shutdown:
122-
// Ignore; we are closing but handle that in Close instead.
150+
// UpdateState intercepts state updates from endpointsharding to wrap the
151+
// picker with ORCA load report handling.
152+
func (b *orcab) UpdateState(state balancer.State) {
153+
// When at least one child is READY, wrap the picker to inject the ORCA
154+
// load report Done callback. For non-READY states, pass through the
155+
// endpointsharding picker as-is.
156+
if state.ConnectivityState == connectivity.Ready {
157+
state.Picker = &orcaPicker{
158+
childPicker: state.Picker,
159+
o: b,
160+
}
123161
}
162+
b.ClientConn.UpdateState(state)
124163
}
125164

126-
func (o *orcab) Close() {}
127-
128-
func (o *orcab) OnLoadReport(r *v3orcapb.OrcaLoadReport) {
129-
o.reportMu.Lock()
130-
defer o.reportMu.Unlock()
131-
logger.Infof("received OOB load report: %v", r)
132-
o.report = r
165+
// OnLoadReport implements orca.OOBListener.
166+
func (b *orcab) OnLoadReport(r *v3orcapb.OrcaLoadReport) {
167+
b.reportMu.Lock()
168+
defer b.reportMu.Unlock()
169+
b.logger.Infof("received OOB load report: %v", r)
170+
b.report = r
133171
}
134172

135173
type orcaPicker struct {
136-
o *orcab
174+
childPicker balancer.Picker
175+
o *orcab
137176
}
138177

139178
func (p *orcaPicker) Pick(info balancer.PickInfo) (balancer.PickResult, error) {
140-
doneCB := func(di balancer.DoneInfo) {
179+
res, err := p.childPicker.Pick(info)
180+
if err != nil {
181+
return res, err
182+
}
183+
origDone := res.Done
184+
res.Done = func(di balancer.DoneInfo) {
141185
if lr, _ := di.ServerLoad.(*v3orcapb.OrcaLoadReport); lr != nil &&
142186
(lr.CpuUtilization != 0 || lr.MemUtilization != 0 || len(lr.Utilization) > 0 || len(lr.RequestCost) > 0) {
143187
// Since all RPCs will respond with a load report due to the
@@ -151,8 +195,11 @@ func (p *orcaPicker) Pick(info balancer.PickInfo) (balancer.PickResult, error) {
151195
setContextCMR(info.Ctx, lr)
152196
}
153197
}
198+
if origDone != nil {
199+
origDone(di)
200+
}
154201
}
155-
return balancer.PickResult{SubConn: p.o.sc, Done: doneCB}, nil
202+
return res, nil
156203
}
157204

158205
func setContextCMR(ctx context.Context, lr *v3orcapb.OrcaLoadReport) {

0 commit comments

Comments
 (0)