Skip to content

Commit fa2baed

Browse files
committed
Write create & delete sla events when initially syncing configs
1 parent 15c8ddc commit fa2baed

File tree

4 files changed

+233
-2
lines changed

4 files changed

+233
-2
lines changed

pkg/icingadb/sla.go

Lines changed: 137 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,137 @@
1+
package icingadb
2+
3+
import (
4+
"context"
5+
"github.com/icinga/icingadb/pkg/contracts"
6+
v1 "github.com/icinga/icingadb/pkg/icingadb/v1"
7+
"github.com/icinga/icingadb/pkg/types"
8+
"github.com/pkg/errors"
9+
"golang.org/x/sync/errgroup"
10+
"time"
11+
)
12+
13+
// SlaHistoryTrail is a single SLA history trail event. It records that a host or
// service was created or deleted, and when that was noticed.
// NOTE(review): presumably maps to the sla_history_trail table by naming convention — confirm.
type SlaHistoryTrail struct {
14+
// Id carries the db:"-" tag — presumably because the database generates it (TODO confirm).
Id types.Int `json:"id" db:"-"`
15+
v1.EnvironmentMeta `json:",inline"`
16+
// HostId is the host's own ID for host events and the ID of the owning host for service events.
HostId types.Binary `json:"host_id"`
17+
// ServiceId is only populated for service events.
ServiceId types.Binary `json:"service_id"`
18+
// EventType is the kind of event; the producers in this file pass it through unchanged from their caller.
EventType string `json:"event_type"`
19+
// EventTime is the time the event was recorded.
EventTime types.UnixMilli `json:"event_time"`
20+
}
21+
22+
// Fingerprint implements the contracts.Fingerprinter interface.
// The trail has no separate fingerprint type: it returns itself (a copy, as the receiver is a value).
23+
func (sht SlaHistoryTrail) Fingerprint() contracts.Fingerprinter {
24+
return sht
25+
}
26+
27+
// ID implements part of the contracts.IDer interface.
// It returns the value of the Id field.
28+
func (sht SlaHistoryTrail) ID() contracts.ID {
29+
return sht.Id
30+
}
31+
32+
// SetID implements part of the contracts.IDer interface.
// The passed id must be a types.Int; any other dynamic type makes the type assertion panic.
33+
func (sht *SlaHistoryTrail) SetID(id contracts.ID) {
34+
sht.Id = id.(types.Int)
35+
}
36+
37+
// SlaServiceHistoryTrailColumns describes the service columns needed to build a
// service SLA history trail: the embedded entity fields plus the ID of the owning host.
// NOTE(review): presumably v1.EntityWithoutChecksum contributes the id column — confirm.
type SlaServiceHistoryTrailColumns struct {
38+
v1.EntityWithoutChecksum `json:",inline"`
39+
// HostId is the ID of the host the service belongs to.
HostId types.Binary `json:"host_id"`
40+
}
41+
42+
func CheckableToSlaTrailEntities(ctx context.Context, g *errgroup.Group, checkables <-chan contracts.Entity, eventType string) <-chan contracts.Entity {
43+
entities := make(chan contracts.Entity, 1)
44+
45+
g.Go(func() error {
46+
defer close(entities)
47+
48+
env, err := getEnvironmentId(ctx)
49+
if err != nil {
50+
return err
51+
}
52+
53+
// Use the same event time for all chackables
54+
now := time.Now()
55+
56+
for {
57+
select {
58+
case checkable, ok := <-checkables:
59+
if !ok {
60+
return nil
61+
}
62+
63+
entity := &SlaHistoryTrail{
64+
EnvironmentMeta: v1.EnvironmentMeta{EnvironmentId: env},
65+
EventType: eventType,
66+
EventTime: types.UnixMilli(now),
67+
}
68+
69+
switch ptr := checkable.(type) {
70+
case *v1.Host:
71+
entity.HostId = ptr.Id
72+
case *v1.Service:
73+
entity.HostId = ptr.HostId
74+
entity.ServiceId = ptr.Id
75+
}
76+
77+
entities <- entity
78+
case <-ctx.Done():
79+
return ctx.Err()
80+
}
81+
}
82+
})
83+
84+
return entities
85+
}
86+
87+
// HostIdsToSlaHistoryTrail transforms the IDs from the passed channel into sla history trail struct
88+
// and streams them into a returned channel.
89+
func HostIdsToSlaHistoryTrail(ctx context.Context, g *errgroup.Group, ids <-chan any, eventType string) <-chan contracts.Entity {
90+
entities := make(chan contracts.Entity, 1)
91+
g.Go(func() error {
92+
defer close(entities)
93+
94+
env, err := getEnvironmentId(ctx)
95+
if err != nil {
96+
return err
97+
}
98+
99+
// Use the same event time for all hosts
100+
now := time.Now()
101+
102+
for {
103+
select {
104+
case id, ok := <-ids:
105+
if !ok {
106+
return nil
107+
}
108+
109+
entities <- &SlaHistoryTrail{
110+
EnvironmentMeta: v1.EnvironmentMeta{EnvironmentId: env},
111+
HostId: id.(types.Binary),
112+
EventType: eventType,
113+
EventTime: types.UnixMilli(now),
114+
}
115+
case <-ctx.Done():
116+
return ctx.Err()
117+
}
118+
}
119+
})
120+
121+
return entities
122+
}
123+
124+
// Get environment id from the given context
125+
func getEnvironmentId(ctx context.Context) (types.Binary, error) {
126+
env, ok := v1.EnvironmentFromContext(ctx)
127+
if !ok {
128+
return nil, errors.New("can't get environment from context")
129+
}
130+
131+
return env.Id, nil
132+
}
133+
134+
// Assert interface compliance.
// The build fails here if *SlaHistoryTrail stops satisfying contracts.Entity.
135+
var (
136+
_ contracts.Entity = (*SlaHistoryTrail)(nil)
137+
)

pkg/icingadb/sync.go

Lines changed: 66 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import (
1515
"go.uber.org/zap"
1616
"golang.org/x/sync/errgroup"
1717
"runtime"
18+
"strings"
1819
"time"
1920
)
2021

@@ -129,9 +130,29 @@ func (s Sync) ApplyDelta(ctx context.Context, delta *Delta) error {
129130
entities = delta.Create.Entities(ctx)
130131
}
131132

133+
var createdEntities chan contracts.Entity
134+
onSuccessHandlers := []OnSuccess[contracts.Entity]{OnSuccessIncrement[contracts.Entity](stat)}
135+
136+
switch delta.Subject.Entity().(type) {
137+
case *v1.Host, *v1.Service:
138+
createdEntities = make(chan contracts.Entity)
139+
onSuccessHandlers = append(onSuccessHandlers, OnSuccessSendTo[contracts.Entity](createdEntities))
140+
}
141+
132142
g.Go(func() error {
133-
return s.db.CreateStreamed(ctx, entities, OnSuccessIncrement[contracts.Entity](stat))
143+
if createdEntities != nil {
144+
defer close(createdEntities)
145+
}
146+
147+
return s.db.CreateStreamed(ctx, entities, onSuccessHandlers...)
134148
})
149+
150+
if createdEntities != nil {
151+
s.logger.Infof("Inserting %d items of type %s sla history trails of type create", len(delta.Create), utils.Key(utils.Name(delta.Subject.Entity()), ' '))
152+
g.Go(func() error {
153+
return s.db.CreateStreamed(ctx, CheckableToSlaTrailEntities(ctx, g, createdEntities, "create"))
154+
})
155+
}
135156
}
136157

137158
// Update
@@ -161,9 +182,52 @@ func (s Sync) ApplyDelta(ctx context.Context, delta *Delta) error {
161182
// Delete
162183
if len(delta.Delete) > 0 {
163184
s.logger.Infof("Deleting %d items of type %s", len(delta.Delete), utils.Key(utils.Name(delta.Subject.Entity()), ' '))
185+
entity := delta.Subject.Entity()
186+
if _, ok := entity.(*v1.Service); ok {
187+
s.logger.Infof("Inserting %d items of type service sla history trails of type delete", len(delta.Delete))
188+
g.Go(func() error {
189+
columns := &SlaServiceHistoryTrailColumns{}
190+
query := s.db.BuildSelectStmt(entity, columns)
191+
if len(delta.Delete) == 1 {
192+
query += ` WHERE id = ?`
193+
} else {
194+
var placeholders []string
195+
for i := 0; i < len(delta.Delete); i++ {
196+
placeholders = append(placeholders, "?")
197+
}
198+
199+
query += fmt.Sprintf(` WHERE id IN (%s)`, strings.Join(placeholders, `, `))
200+
}
201+
entities, err := s.db.YieldAll(ctx, delta.Subject.Factory(), query, false, delta.Delete.IDs()...)
202+
com.ErrgroupReceive(g, err)
203+
204+
return s.db.CreateStreamed(ctx, CheckableToSlaTrailEntities(ctx, g, entities, "delete"))
205+
})
206+
}
207+
208+
var hostIds chan any
209+
onSuccessHandlers := []OnSuccess[any]{OnSuccessIncrement[any](stat)}
210+
211+
_, isHost := entity.(*v1.Host)
212+
if isHost {
213+
hostIds = make(chan any, 1)
214+
onSuccessHandlers = append(onSuccessHandlers, OnSuccessSendTo[any](hostIds))
215+
}
216+
164217
g.Go(func() error {
165-
return s.db.Delete(ctx, delta.Subject.Entity(), delta.Delete.IDs(), OnSuccessIncrement[any](stat))
218+
if isHost {
219+
defer close(hostIds)
220+
}
221+
222+
return s.db.Delete(ctx, delta.Subject.Entity(), delta.Delete.IDs(), onSuccessHandlers...)
166223
})
224+
225+
if isHost {
226+
s.logger.Infof("Inserting %d items of type host sla history trails of type delete", len(delta.Delete))
227+
g.Go(func() error {
228+
return s.db.CreateStreamed(ctx, HostIdsToSlaHistoryTrail(ctx, g, hostIds, "delete"))
229+
})
230+
}
167231
}
168232

169233
return g.Wait()

schema/mysql/schema.sql

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1321,6 +1321,18 @@ CREATE TABLE sla_history_downtime (
13211321
INDEX idx_sla_history_downtime_env_downtime_end (environment_id, downtime_end) COMMENT 'Filter for sla history retention'
13221322
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin ROW_FORMAT=DYNAMIC;
13231323

1324+
-- sla_history_trail holds one row per create or delete event of a host or service.
-- service_id is NULL for events that concern a host itself.
CREATE TABLE sla_history_trail (
1325+
id bigint NOT NULL AUTO_INCREMENT,
1326+
environment_id binary(20) NOT NULL COMMENT 'environment.id',
1327+
host_id binary(20) NOT NULL COMMENT 'host.id (may reference already deleted hosts)',
1328+
service_id binary(20) DEFAULT NULL COMMENT 'service.id (may reference already deleted services)',
1329+
1330+
event_type enum('delete', 'create') NOT NULL,
1331+
-- NOTE(review): the Go field SlaHistoryTrail.EventTime is a types.UnixMilli, so this is presumably in milliseconds — confirm.
event_time bigint unsigned NOT NULL COMMENT 'unix timestamp the event occurred',
1332+
1333+
PRIMARY KEY (id)
1334+
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin ROW_FORMAT=DYNAMIC;
1335+
13241336
CREATE TABLE icingadb_schema (
13251337
id int unsigned NOT NULL AUTO_INCREMENT,
13261338
version smallint unsigned NOT NULL,

schema/pgsql/schema.sql

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ CREATE TYPE boolenum AS ENUM ( 'n', 'y' );
1717
CREATE TYPE acked AS ENUM ( 'n', 'y', 'sticky' );
1818
CREATE TYPE state_type AS ENUM ( 'hard', 'soft' );
1919
CREATE TYPE checkable_type AS ENUM ( 'host', 'service' );
20+
CREATE TYPE sla_trail_event_type AS ENUM ( 'create', 'delete' );
2021
CREATE TYPE comment_type AS ENUM ( 'comment', 'ack' );
2122
CREATE TYPE notification_type AS ENUM ( 'downtime_start', 'downtime_end', 'downtime_removed', 'custom', 'acknowledgement', 'problem', 'recovery', 'flapping_start', 'flapping_end' );
2223
CREATE TYPE history_type AS ENUM ( 'notification', 'state_change', 'downtime_start', 'downtime_end', 'comment_add', 'comment_remove', 'flapping_start', 'flapping_end', 'ack_set', 'ack_clear' );
@@ -2147,6 +2148,23 @@ COMMENT ON COLUMN sla_history_downtime.downtime_id IS 'downtime.id (may referenc
21472148
COMMENT ON COLUMN sla_history_downtime.downtime_start IS 'start time of the downtime';
21482149
COMMENT ON COLUMN sla_history_downtime.downtime_end IS 'end time of the downtime';
21492150

2151+
-- sla_history_trail holds one row per create or delete event of a host or service.
-- service_id is NULL for events that concern a host itself.
CREATE TABLE sla_history_trail (
2152+
id bigserial NOT NULL,
2153+
environment_id bytea20 NOT NULL,
2154+
host_id bytea20 NOT NULL,
2155+
service_id bytea20 DEFAULT NULL,
2156+
2157+
-- Restricted to 'create' and 'delete' by the sla_trail_event_type enum.
event_type sla_trail_event_type NOT NULL,
2158+
-- NOTE(review): the Go field SlaHistoryTrail.EventTime is a types.UnixMilli, so this is presumably in milliseconds — confirm.
event_time biguint NOT NULL,
2159+
2160+
CONSTRAINT pk_sla_history_trail PRIMARY KEY (id)
2161+
);
2162+
2163+
COMMENT ON COLUMN sla_history_trail.environment_id IS 'environment.id';
2164+
COMMENT ON COLUMN sla_history_trail.host_id IS 'host.id (may reference already deleted hosts)';
2165+
COMMENT ON COLUMN sla_history_trail.service_id IS 'service.id (may reference already deleted services)';
2166+
COMMENT ON COLUMN sla_history_trail.event_time IS 'unix timestamp the event occurred';
2167+
21502168
CREATE SEQUENCE icingadb_schema_id_seq;
21512169

21522170
CREATE TABLE icingadb_schema (

0 commit comments

Comments
 (0)