Skip to content

Commit 1cc1f09

Browse files
committed
WIP
1 parent efec262 commit 1cc1f09

File tree

4 files changed

+124
-0
lines changed

4 files changed

+124
-0
lines changed

pkg/icingadb/sla.go

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
package icingadb
2+
3+
import (
4+
"context"
5+
"crypto/rand"
6+
"github.com/icinga/icingadb/pkg/com"
7+
"github.com/icinga/icingadb/pkg/contracts"
8+
v1 "github.com/icinga/icingadb/pkg/icingadb/v1"
9+
"github.com/icinga/icingadb/pkg/types"
10+
"github.com/icinga/icingadb/pkg/utils"
11+
"github.com/pkg/errors"
12+
"golang.org/x/sync/errgroup"
13+
)
14+
15+
type SlaHistoryTrail struct {
16+
v1.EntityWithoutChecksum `json:",inline"`
17+
EnvironmentId types.Binary `json:"environment_id"`
18+
ObjectType string `json:"object_type"`
19+
HostId types.Binary `json:"host_id"`
20+
ServiceId types.Binary `json:"service_id"`
21+
EventTime types.UnixMilli `json:"event_time"`
22+
EventType string `json:"event_type"`
23+
}
24+
25+
func CheckableToSlaTrailEntities(ctx context.Context, checkables <-chan contracts.Entity, eventType string) (<-chan contracts.Entity, <-chan error) {
26+
entities := make(chan contracts.Entity)
27+
g, ctx := errgroup.WithContext(ctx)
28+
29+
g.Go(func() error {
30+
defer close(entities)
31+
32+
for {
33+
select {
34+
case checkable, ok := <-checkables:
35+
if !ok {
36+
return nil
37+
}
38+
39+
id, err := generateBinaryId()
40+
if err != nil {
41+
return errors.Wrap(err, "can't generate sla history trail ID")
42+
}
43+
44+
entity := &SlaHistoryTrail{
45+
EntityWithoutChecksum: v1.EntityWithoutChecksum{
46+
IdMeta: v1.IdMeta{Id: id},
47+
},
48+
ObjectType: utils.Name(checkable),
49+
EventTime: types.UnixMilli{},
50+
EventType: eventType,
51+
}
52+
53+
switch ptr := checkable.(type) {
54+
case *v1.Host:
55+
entity.HostId = ptr.Id
56+
entity.EnvironmentId = ptr.EnvironmentId
57+
case *v1.Service:
58+
entity.HostId = ptr.HostId
59+
entity.ServiceId = ptr.Id
60+
entity.EnvironmentId = ptr.EnvironmentId
61+
}
62+
63+
entities <- entity
64+
case <-ctx.Done():
65+
return ctx.Err()
66+
}
67+
}
68+
})
69+
70+
return entities, com.WaitAsync(g)
71+
}
72+
73+
// GenerateBinaryId generates a 20 byte length random id
74+
func generateBinaryId() (types.Binary, error) {
75+
id := make([]byte, 20)
76+
_, err := rand.Read(id)
77+
78+
return id, err
79+
}

pkg/icingadb/sync.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,20 @@ func (s Sync) ApplyDelta(ctx context.Context, delta *Delta) error {
131131
g.Go(func() error {
132132
return s.db.CreateStreamed(ctx, entities, OnSuccessIncrement[contracts.Entity](stat))
133133
})
134+
135+
subjectType := delta.Subject.Entity()
136+
switch subjectType.(type) {
137+
case *v1.Host, *v1.Service:
138+
s.logger.Infof("Inserting %d items of type %s sla history trails", len(delta.Create), utils.Key(utils.Name(delta.Subject.Entity()), ' '))
139+
140+
var slaTrails <-chan contracts.Entity
141+
slaTrails, errs := CheckableToSlaTrailEntities(ctx, entities, "create")
142+
com.ErrgroupReceive(g, errs)
143+
144+
g.Go(func() error {
145+
return s.db.CreateStreamed(ctx, slaTrails, OnSuccessIncrement[contracts.Entity](stat))
146+
})
147+
}
134148
}
135149

136150
// Update

schema/mysql/schema.sql

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1321,6 +1321,19 @@ CREATE TABLE sla_history_downtime (
13211321
INDEX idx_sla_history_downtime_env_downtime_end (environment_id, downtime_end) COMMENT 'Filter for sla history retention'
13221322
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin ROW_FORMAT=DYNAMIC;
13231323

1324+
CREATE TABLE sla_history_trail (
1325+
id binary(20) NOT NULL,
1326+
environment_id binary(20) NOT NULL COMMENT 'environment.id',
1327+
object_type enum('host', 'service') NOT NULL,
1328+
host_id binary(20) NOT NULL COMMENT 'host.id (may reference already deleted hosts)',
1329+
service_id binary(20) NOT NULL COMMENT 'service.id (may reference already deleted services)',
1330+
1331+
PRIMARY KEY (id),
1332+
1333+
event_type enum('delete', 'create') NOT NULL,
1334+
event_time bigint unsigned NOT NULL COMMENT 'unix timestamp the event occurred'
1335+
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin ROW_FORMAT=DYNAMIC;
1336+
13241337
CREATE TABLE icingadb_schema (
13251338
id int unsigned NOT NULL AUTO_INCREMENT,
13261339
version smallint unsigned NOT NULL,

tests/object_sync_test.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -310,6 +310,24 @@ func TestObjectSync(t *testing.T) {
310310
t.Skip()
311311
})
312312

313+
t.Run("Sla History Trail", func(t *testing.T) {
314+
t.Parallel()
315+
316+
assert.Eventuallyf(t, func() bool {
317+
var count int
318+
err := db.Get(&count, db.Rebind("SELECT COUNT(*) FROM sla_history_trail WHERE service_id IS NULL"))
319+
require.NoError(t, err, "querying hosts sla history trail should not fail")
320+
return count == len(data.Hosts)
321+
}, 20*time.Second, 200*time.Millisecond, "Newly created hosts should exists in database")
322+
323+
assert.Eventuallyf(t, func() bool {
324+
var count int
325+
err := db.Get(&count, db.Rebind("SELECT COUNT(*) FROM sla_history_trail WHERE service_id IS NOT NULL"))
326+
require.NoError(t, err, "querying services sla history trail should not fail")
327+
return count == len(data.Services)
328+
}, 20*time.Second, 200*time.Millisecond, "Newly created services should exists in database")
329+
})
330+
313331
t.Run("RuntimeUpdates", func(t *testing.T) {
314332
t.Parallel()
315333

0 commit comments

Comments
 (0)