Skip to content

Commit d65888c

Browse files
authored
plugins/status: FIFO buffer channel for status events to prevent slow status API blocking (#7522)
If a status API is slow to respond it can cause OPA to be blocked writing to an unbuffered channel. This fixes it by using a buffered channel that never blocks but drops the oldest status update if full. Signed-off-by: Sebastian Spaink <[email protected]>
1 parent eb77d10 commit d65888c

File tree

5 files changed

+144
-43
lines changed

5 files changed

+144
-43
lines changed

docs/content/configuration.md

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -767,15 +767,15 @@ included in the actual bundle gzipped tarball.
767767

768768
## Status
769769

770-
| Field | Type | Required | Description |
771-
| --- | --- | --- | --- |
772-
| `status.service` | `string` | Yes | Name of service to use to contact remote server. |
773-
| `status.partition_name` | `string` | No | Path segment to include in status updates. |
774-
| `status.console` | `boolean` | No (default: `false`) | Log the status updates locally to the console. When enabled alongside a remote status update API the `service` must be configured, the default `service` selection will be disabled. |
775-
| `status.prometheus` | `boolean` | No (default: `false`) | Export the status (bundle and plugin) metrics to prometheus (see [the monitoring documentation](../monitoring/#prometheus)). When enabled alongside a remote status update API the `service` must be configured, the default `service` selection will be disabled. |
776-
| `status.prometheus_config.collectors.bundle_loading_duration_ns.buckets` | `[]float64` | No, (Only use when status.prometheus true, default: [1000, 2000, 4000, 8000, 16_000, 32_000, 64_000, 128_000, 256_000, 512_000, 1_024_000, 2_048_000, 4_096_000, 8_192_000, 16_384_000, 32_768_000, 65_536_000, 131_072_000, 262_144_000, 524_288_000]) | Specifies the buckets for the `bundle_loading_duration_ns` metric. Each value is a float, it is expressed in nanoseconds. |
777-
| `status.plugin` | `string` | No | Use the named plugin for status updates. If this field exists, the other configuration fields are not required. |
778-
| `status.trigger` | `string` (default: `periodic`) | No | Controls how status updates are reported to the remote server. Allowed values are `periodic` and `manual` (`manual` triggers are only possible when using OPA as a Go package). |
770+
| Field | Type | Required | Description |
771+
|--------------------------------------------------------------------------|-------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
772+
| `status.service` | `string` | Yes | Name of service to use to contact remote server. |
773+
| `status.partition_name` | `string` | No | Path segment to include in status updates. |
774+
| `status.console` | `boolean` | No (default: `false`) | Log the status updates locally to the console. When enabled alongside a remote status update API the `service` must be configured, the default `service` selection will be disabled. |
775+
| `status.prometheus` | `boolean` | No (default: `false`) | Export the status (bundle and plugin) metrics to prometheus (see [the monitoring documentation](../monitoring/#prometheus)). When enabled alongside a remote status update API the `service` must be configured, the default `service` selection will be disabled. |
776+
| `status.prometheus_config.collectors.bundle_loading_duration_ns.buckets` | `[]float64` | No (only used when `status.prometheus` is `true`; default: [1000, 2000, 4000, 8000, 16_000, 32_000, 64_000, 128_000, 256_000, 512_000, 1_024_000, 2_048_000, 4_096_000, 8_192_000, 16_384_000, 32_768_000, 65_536_000, 131_072_000, 262_144_000, 524_288_000]) | Specifies the buckets for the `bundle_loading_duration_ns` metric. Each value is a float expressed in nanoseconds. |
777+
| `status.plugin` | `string` | No | Use the named plugin for status updates. If this field exists, the other configuration fields are not required. |
778+
| `status.trigger` | `string` | No (default: `periodic`) | Controls how status updates are reported to the remote server. Allowed values are `periodic` and `manual` (`manual` triggers are only possible when using OPA as a Go package). |
779779

780780
## Decision Logs
781781

v1/plugins/logs/eventBuffer.go

Lines changed: 2 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313
"github.com/open-policy-agent/opa/v1/logging"
1414
"github.com/open-policy-agent/opa/v1/metrics"
1515
"github.com/open-policy-agent/opa/v1/plugins/rest"
16+
"github.com/open-policy-agent/opa/v1/util"
1617
)
1718

1819
type bufferItem struct {
@@ -94,26 +95,7 @@ func (b *eventBuffer) Push(event *EventV1) {
9495
}
9596

9697
func (b *eventBuffer) push(event *bufferItem) {
97-
// This prevents getting blocked forever writing to a full buffer, in case another routine fills the last space.
98-
// Retrying maxEventRetry times to drop the oldest event. Dropping the incoming event if there still isn't room.
99-
maxEventRetry := 1000
100-
101-
for range maxEventRetry {
102-
// non-blocking send to the buffer, to prevent blocking if buffer is full so room can be made.
103-
select {
104-
case b.buffer <- event:
105-
return
106-
default:
107-
}
108-
109-
// non-blocking drop from the buffer to make room for incoming event.
110-
// the buffer could have emptied due to an upload.
111-
select {
112-
case <-b.buffer:
113-
b.incrMetric(logBufferEventDropCounterName)
114-
default:
115-
}
116-
}
98+
util.PushFIFO(b.buffer, event, b.metrics, logBufferEventDropCounterName)
11799
}
118100

119101
// Upload reads events from the buffer and uploads them to the configured client.

v1/plugins/status/plugin.go

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,11 @@ import (
2323
"github.com/open-policy-agent/opa/v1/util"
2424
)
2525

26+
const (
27+
// statusBufferLimit is the capacity given to the bundleCh and bulkBundleCh
// channels when the plugin is constructed in New.
statusBufferLimit = int64(10)
28+
// statusBufferDropCounterName is the metric name handed to util.PushFIFO by
// UpdateBundleStatus and BulkUpdateBundleStatus for counting dropped updates.
statusBufferDropCounterName = "status_dropped_buffer_limit_exceeded"
29+
)
30+
2631
// Logger defines the interface for status plugins.
2732
type Logger interface {
2833
plugins.Plugin
@@ -207,8 +212,8 @@ func New(parsedConfig *Config, manager *plugins.Manager) *Plugin {
207212
p := &Plugin{
208213
manager: manager,
209214
config: *parsedConfig,
210-
bundleCh: make(chan bundle.Status),
211-
bulkBundleCh: make(chan map[string]*bundle.Status),
215+
bundleCh: make(chan bundle.Status, statusBufferLimit),
216+
bulkBundleCh: make(chan map[string]*bundle.Status, statusBufferLimit),
212217
discoCh: make(chan bundle.Status),
213218
decisionLogsCh: make(chan lstat.Status),
214219
stop: make(chan chan struct{}),
@@ -278,12 +283,12 @@ func (p *Plugin) Stop(_ context.Context) {
278283
// UpdateBundleStatus notifies the plugin that the policy bundle was updated.
279284
// Deprecated: Use BulkUpdateBundleStatus instead.
280285
func (p *Plugin) UpdateBundleStatus(status bundle.Status) {
281-
p.bundleCh <- status
286+
util.PushFIFO(p.bundleCh, status, p.metrics, statusBufferDropCounterName)
282287
}
283288

284289
// BulkUpdateBundleStatus notifies the plugin that the policy bundle was updated.
285290
func (p *Plugin) BulkUpdateBundleStatus(status map[string]*bundle.Status) {
286-
p.bulkBundleCh <- status
291+
util.PushFIFO(p.bulkBundleCh, status, p.metrics, statusBufferDropCounterName)
287292
}
288293

289294
// UpdateDiscoveryStatus notifies the plugin that the discovery bundle was updated.
@@ -436,7 +441,7 @@ func (p *Plugin) oneShot(ctx context.Context) error {
436441
WithJSON(req).
437442
Do(ctx, "POST", fmt.Sprintf("/status/%v", p.config.PartitionName))
438443
if err != nil {
439-
return fmt.Errorf("Status update failed: %w", err)
444+
return fmt.Errorf("status update failed: %w", err)
440445
}
441446

442447
defer util.Close(resp)

v1/plugins/status/plugin_test.go

Lines changed: 91 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import (
1414
"os"
1515
"reflect"
1616
"slices"
17+
"strconv"
1718
"strings"
1819
"testing"
1920
"time"
@@ -38,6 +39,60 @@ func TestMain(m *testing.M) {
3839
os.Exit(m.Run())
3940
}
4041

42+
func TestStatusUpdateBuffer(t *testing.T) {
43+
44+
tests := []struct {
45+
name string
46+
numberOfStatusUpdates int
47+
expectedStatusUpdates int
48+
expectedNameDropped string
49+
}{
50+
{
51+
name: "add one over the limit and drop oldest",
52+
numberOfStatusUpdates: 11,
53+
expectedStatusUpdates: 10,
54+
expectedNameDropped: "0",
55+
},
56+
{
57+
name: "don't drop anything",
58+
numberOfStatusUpdates: 5,
59+
expectedStatusUpdates: 5,
60+
},
61+
}
62+
63+
for _, tc := range tests {
64+
t.Run(tc.name, func(t *testing.T) {
65+
fixture := newTestFixture(t, nil)
66+
ctx := context.Background()
67+
68+
err := fixture.plugin.Start(ctx)
69+
if err != nil {
70+
t.Fatal(err)
71+
}
72+
defer fixture.plugin.Stop(ctx)
73+
74+
for i := range tc.numberOfStatusUpdates {
75+
s := bundle.Status{
76+
Name: strconv.Itoa(i),
77+
}
78+
fixture.plugin.BulkUpdateBundleStatus(map[string]*bundle.Status{
79+
"test": &s,
80+
})
81+
}
82+
83+
if len(fixture.plugin.bulkBundleCh) != tc.expectedStatusUpdates {
84+
t.Fatalf("expected %d updates, got %d", tc.expectedStatusUpdates, len(fixture.plugin.bulkBundleCh))
85+
}
86+
for _, v := range <-fixture.plugin.bulkBundleCh {
87+
if v.Name == tc.expectedNameDropped {
88+
t.Fatalf("expected %s dropped", tc.expectedNameDropped)
89+
}
90+
}
91+
})
92+
}
93+
94+
}
95+
4196
func TestConfigValueParse(t *testing.T) {
4297
tests := []struct {
4398
note string
@@ -87,17 +142,17 @@ func TestConfigValueParse(t *testing.T) {
87142
},
88143
}
89144

90-
for _, test := range tests {
91-
t.Run(test.note, func(t *testing.T) {
92-
config, err := ParseConfig([]byte(test.input), []string{}, []string{"status"})
145+
for _, tc := range tests {
146+
t.Run(tc.note, func(t *testing.T) {
147+
config, err := ParseConfig([]byte(tc.input), []string{}, []string{"status"})
93148
if err != nil {
94149
t.Errorf("expected no error: %v", err)
95150
}
96-
if test.expectedNoConfig && config != nil {
151+
if tc.expectedNoConfig && config != nil {
97152
t.Errorf("expected parsed config is nil, got %v", config)
98153
}
99-
if !test.expectedNoConfig && !slices.Equal(config.PrometheusConfig.Collectors.BundleLoadDurationNanoseconds.Buckets, test.expectedValue) {
100-
t.Errorf("expected %v, got %v", test.expectedValue, config.PrometheusConfig.Collectors.BundleLoadDurationNanoseconds.Buckets)
154+
if !tc.expectedNoConfig && !slices.Equal(config.PrometheusConfig.Collectors.BundleLoadDurationNanoseconds.Buckets, tc.expectedValue) {
155+
t.Errorf("expected %v, got %v", tc.expectedValue, config.PrometheusConfig.Collectors.BundleLoadDurationNanoseconds.Buckets)
101156
}
102157
})
103158
}
@@ -385,7 +440,7 @@ func TestPluginNoLogging(t *testing.T) {
385440
}
386441
}
387442

388-
func TestPluginStartTriggerManual(t *testing.T) {
443+
func TestPluginStartTriggerManualStart(t *testing.T) {
389444

390445
fixture := newTestFixture(t, nil)
391446
fixture.server.ch = make(chan UpdateRequestV1)
@@ -421,18 +476,45 @@ func TestPluginStartTriggerManual(t *testing.T) {
421476
if !maps.Equal(result.Labels, exp.Labels) {
422477
t.Fatalf("Expected: %v but got: %v", exp, result)
423478
}
479+
}
480+
481+
func TestPluginStartTriggerManual(t *testing.T) {
482+
fixture := newTestFixture(t, nil)
483+
fixture.server.ch = make(chan UpdateRequestV1)
484+
defer fixture.server.stop()
485+
tr := plugins.TriggerManual
486+
fixture.plugin.config.Trigger = &tr
424487

425488
status := testStatus()
426489

427490
fixture.plugin.BulkUpdateBundleStatus(map[string]*bundle.Status{"test": status})
428491

492+
statuses := <-fixture.plugin.bulkBundleCh
493+
fixture.plugin.lastBundleStatuses = statuses
494+
429495
// trigger the status update
430496
go func() {
431-
_ = fixture.plugin.Trigger(ctx)
497+
_ = fixture.plugin.Trigger(context.Background())
432498
}()
433499

434-
result = <-fixture.server.ch
500+
go func() {
501+
update := <-fixture.plugin.trigger
502+
err := fixture.plugin.oneShot(update.ctx)
503+
if err != nil {
504+
t.Error(err)
505+
return
506+
}
507+
}()
435508

509+
result := <-fixture.server.ch
510+
511+
exp := UpdateRequestV1{
512+
Labels: map[string]string{
513+
"id": "test-instance-id",
514+
"app": "example-app",
515+
"version": version.Version,
516+
},
517+
}
436518
exp.Bundles = map[string]*bundle.Status{"test": status}
437519

438520
if !maps.EqualFunc(result.Bundles, exp.Bundles, (*bundle.Status).Equal) {

v1/util/channel.go

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
package util
2+
3+
import (
4+
"github.com/open-policy-agent/opa/v1/metrics"
5+
)
6+
7+
// This prevents getting blocked forever writing to a full buffer, in case another routine fills the last space.
8+
// Retrying maxEventRetry times to drop the oldest event. Dropping the incoming event if there still isn't room.
9+
const maxEventRetry = 1000
10+
11+
// PushFIFO pushes data into a buffered channel without blocking when full, making room by dropping the oldest data.
12+
// An optional metric can be recorded when data is dropped.
13+
func PushFIFO[T any](buffer chan T, data T, metrics metrics.Metrics, metricName string) {
14+
15+
for range maxEventRetry {
16+
// non-blocking send to the buffer, to prevent blocking if buffer is full so room can be made.
17+
select {
18+
case buffer <- data:
19+
return
20+
default:
21+
}
22+
23+
// non-blocking drop from the buffer to make room for incoming event
24+
select {
25+
case <-buffer:
26+
if metrics != nil && metricName != "" {
27+
metrics.Counter(metricName).Incr()
28+
}
29+
default:
30+
}
31+
}
32+
}

0 commit comments

Comments
 (0)