Skip to content

Commit 2fc3663

Browse files
committed
wip
1 parent efec262 commit 2fc3663

File tree

6 files changed

+209
-6
lines changed

6 files changed

+209
-6
lines changed

pkg/icingadb/db.go

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -506,7 +506,9 @@ func (db *DB) BatchSizeByPlaceholders(n int) int {
506506
// YieldAll executes the query with the supplied scope,
507507
// scans each resulting row into an entity returned by the factory function,
508508
// and streams them into a returned channel.
509-
func (db *DB) YieldAll(ctx context.Context, factoryFunc contracts.EntityFactoryFunc, query string, scope interface{}) (<-chan contracts.Entity, <-chan error) {
509+
func (db *DB) YieldAll(
510+
ctx context.Context, factoryFunc contracts.EntityFactoryFunc, query string, namedQueryParams bool, scope ...interface{},
511+
) (<-chan contracts.Entity, <-chan error) {
510512
entities := make(chan contracts.Entity, 1)
511513
g, ctx := errgroup.WithContext(ctx)
512514

@@ -515,7 +517,14 @@ func (db *DB) YieldAll(ctx context.Context, factoryFunc contracts.EntityFactoryF
515517
defer db.log(ctx, query, &counter).Stop()
516518
defer close(entities)
517519

518-
rows, err := db.NamedQueryContext(ctx, query, scope)
520+
var rows *sqlx.Rows
521+
var err error
522+
if namedQueryParams {
523+
rows, err = db.NamedQueryContext(ctx, query, scope)
524+
} else {
525+
rows, err = db.QueryxContext(ctx, query, scope...)
526+
}
527+
519528
if err != nil {
520529
return internal.CantPerformQuery(err, query)
521530
}

pkg/icingadb/sla.go

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
package icingadb
2+
3+
import (
4+
"context"
5+
"crypto/rand"
6+
"fmt"
7+
"github.com/icinga/icingadb/pkg/contracts"
8+
v1 "github.com/icinga/icingadb/pkg/icingadb/v1"
9+
"github.com/icinga/icingadb/pkg/types"
10+
"github.com/pkg/errors"
11+
"golang.org/x/sync/errgroup"
12+
"time"
13+
)
14+
15+
type SlaHistoryTrail struct {
16+
v1.EntityWithoutChecksum `json:",inline" db:"-"`
17+
EnvironmentId types.Binary `json:"environment_id"`
18+
HostId types.Binary `json:"host_id"`
19+
ServiceId types.Binary `json:"service_id"`
20+
EventTime types.UnixMilli `json:"event_time"`
21+
EventType string `json:"event_type"`
22+
}
23+
24+
type SlaHostHistoryTrailColumns struct {
25+
v1.EntityWithoutChecksum `json:",inline"`
26+
EnvironmentId `json:",inline"`
27+
}
28+
29+
type SlaServiceHistoryTrailColumns struct {
30+
v1.EntityWithoutChecksum `json:",inline"`
31+
EnvironmentId `json:",inline"`
32+
HostId types.Binary `json:"host_id"`
33+
}
34+
35+
type EnvironmentId struct {
36+
EnvironmentId types.Binary `json:"environment_id"`
37+
}
38+
39+
func CheckableToSlaTrailEntities(ctx context.Context, g *errgroup.Group, checkables <-chan contracts.Entity, eventType string) <-chan contracts.Entity {
40+
entities := make(chan contracts.Entity, 1)
41+
42+
g.Go(func() error {
43+
defer close(entities)
44+
45+
for {
46+
select {
47+
case checkable, ok := <-checkables:
48+
if !ok {
49+
return nil
50+
}
51+
52+
id := make([]byte, 20)
53+
_, err := rand.Read(id)
54+
if err != nil {
55+
return errors.Wrap(err, "can't generate sla history trail ID")
56+
}
57+
58+
entity := &SlaHistoryTrail{
59+
EventTime: types.UnixMilli(time.Now()),
60+
EventType: eventType,
61+
}
62+
63+
switch ptr := checkable.(type) {
64+
case *v1.Host:
65+
entity.HostId = ptr.Id
66+
entity.EnvironmentId = ptr.EnvironmentId
67+
case *v1.Service:
68+
entity.HostId = ptr.HostId
69+
entity.ServiceId = ptr.Id
70+
entity.EnvironmentId = ptr.EnvironmentId
71+
}
72+
73+
if eventType == "delete" {
74+
fmt.Printf("Deleted Entity: %#v", entity)
75+
}
76+
entities <- entity
77+
case <-ctx.Done():
78+
return ctx.Err()
79+
}
80+
}
81+
})
82+
83+
return entities
84+
}

pkg/icingadb/sync.go

Lines changed: 66 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import (
1515
"go.uber.org/zap"
1616
"golang.org/x/sync/errgroup"
1717
"runtime"
18+
"strings"
1819
"time"
1920
)
2021

@@ -85,7 +86,8 @@ func (s Sync) Sync(ctx context.Context, subject *common.SyncSubject) error {
8586

8687
actual, dbErrs := s.db.YieldAll(
8788
ctx, subject.FactoryForDelta(),
88-
s.db.BuildSelectStmt(NewScopedEntity(subject.Entity(), e.Meta()), subject.Entity().Fingerprint()), e.Meta(),
89+
s.db.BuildSelectStmt(NewScopedEntity(subject.Entity(), e.Meta()), subject.Entity().Fingerprint()),
90+
true, e.Meta(),
8991
)
9092
// Let errors from DB cancel our group.
9193
com.ErrgroupReceive(g, dbErrs)
@@ -106,6 +108,8 @@ func (s Sync) ApplyDelta(ctx context.Context, delta *Delta) error {
106108
g, ctx := errgroup.WithContext(ctx)
107109
stat := getCounterForEntity(delta.Subject.Entity())
108110

111+
var subjectType = delta.Subject.Entity()
112+
109113
// Create
110114
if len(delta.Create) > 0 {
111115
s.logger.Infof("Inserting %d items of type %s", len(delta.Create), utils.Key(utils.Name(delta.Subject.Entity()), ' '))
@@ -128,9 +132,31 @@ func (s Sync) ApplyDelta(ctx context.Context, delta *Delta) error {
128132
entities = delta.Create.Entities(ctx)
129133
}
130134

135+
var slaTrailEntities chan contracts.Entity
136+
onSuccessHandlers := []OnSuccess[contracts.Entity]{
137+
OnSuccessIncrement[contracts.Entity](stat),
138+
}
139+
140+
switch subjectType.(type) {
141+
case *v1.Host, *v1.Service:
142+
slaTrailEntities = make(chan contracts.Entity)
143+
onSuccessHandlers = append(onSuccessHandlers, OnSuccessSendTo[contracts.Entity](slaTrailEntities))
144+
}
145+
131146
g.Go(func() error {
132-
return s.db.CreateStreamed(ctx, entities, OnSuccessIncrement[contracts.Entity](stat))
147+
if slaTrailEntities != nil {
148+
defer close(slaTrailEntities)
149+
}
150+
151+
return s.db.CreateStreamed(ctx, entities, onSuccessHandlers...)
133152
})
153+
154+
if slaTrailEntities != nil {
155+
s.logger.Infof("Inserting %d items of type %s sla history trails of type create", len(delta.Create), utils.Key(utils.Name(delta.Subject.Entity()), ' '))
156+
g.Go(func() error {
157+
return s.db.CreateStreamed(ctx, CheckableToSlaTrailEntities(ctx, g, slaTrailEntities, "create"))
158+
})
159+
}
134160
}
135161

136162
// Update
@@ -160,6 +186,40 @@ func (s Sync) ApplyDelta(ctx context.Context, delta *Delta) error {
160186
// Delete
161187
if len(delta.Delete) > 0 {
162188
s.logger.Infof("Deleting %d items of type %s", len(delta.Delete), utils.Key(utils.Name(delta.Subject.Entity()), ' '))
189+
switch subjectType.(type) {
190+
case *v1.Host, *v1.Service:
191+
g.Go(func() error {
192+
var entities <-chan contracts.Entity
193+
var columns interface{}
194+
195+
if _, ok := subjectType.(*v1.Host); ok {
196+
columns = &SlaHostHistoryTrailColumns{}
197+
} else {
198+
columns = &SlaServiceHistoryTrailColumns{}
199+
}
200+
201+
var placeholders []string
202+
for i := 0; i < len(delta.Delete); i++ {
203+
// scopes = append(scopes, id.(types.Binary))
204+
placeholders = append(placeholders, "?")
205+
}
206+
207+
query := s.db.BuildSelectStmt(delta.Subject.Entity(), columns)
208+
if placeholders != nil {
209+
query += fmt.Sprintf(` WHERE id IN (%s)`, strings.Join(placeholders, `, `))
210+
}
211+
212+
s.logger.Infof("Halloo QUERY: %s", query)
213+
214+
var err <-chan error
215+
entities, err = s.db.YieldAll(ctx, delta.Subject.Factory(), query, false, delta.Delete.IDs()...)
216+
com.ErrgroupReceive(g, err)
217+
218+
s.logger.Infof("Inserting %d items of type %s sla history trails of type delete", len(delta.Delete), utils.Key(utils.Name(delta.Subject.Entity()), ' '))
219+
return s.db.CreateStreamed(ctx, CheckableToSlaTrailEntities(ctx, g, entities, "delete"))
220+
})
221+
}
222+
163223
g.Go(func() error {
164224
return s.db.Delete(ctx, delta.Subject.Entity(), delta.Delete.IDs(), OnSuccessIncrement[any](stat))
165225
})
@@ -187,7 +247,8 @@ func (s Sync) SyncCustomvars(ctx context.Context) error {
187247

188248
actualCvs, errs := s.db.YieldAll(
189249
ctx, cv.FactoryForDelta(),
190-
s.db.BuildSelectStmt(NewScopedEntity(cv.Entity(), e.Meta()), cv.Entity().Fingerprint()), e.Meta(),
250+
s.db.BuildSelectStmt(NewScopedEntity(cv.Entity(), e.Meta()), cv.Entity().Fingerprint()),
251+
true, e.Meta(),
191252
)
192253
com.ErrgroupReceive(g, errs)
193254

@@ -199,7 +260,8 @@ func (s Sync) SyncCustomvars(ctx context.Context) error {
199260

200261
actualFlatCvs, errs := s.db.YieldAll(
201262
ctx, flatCv.FactoryForDelta(),
202-
s.db.BuildSelectStmt(NewScopedEntity(flatCv.Entity(), e.Meta()), flatCv.Entity().Fingerprint()), e.Meta(),
263+
s.db.BuildSelectStmt(NewScopedEntity(flatCv.Entity(), e.Meta()), flatCv.Entity().Fingerprint()),
264+
true, e.Meta(),
203265
)
204266
com.ErrgroupReceive(g, errs)
205267

schema/mysql/schema.sql

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1321,6 +1321,18 @@ CREATE TABLE sla_history_downtime (
13211321
INDEX idx_sla_history_downtime_env_downtime_end (environment_id, downtime_end) COMMENT 'Filter for sla history retention'
13221322
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin ROW_FORMAT=DYNAMIC;
13231323

1324+
CREATE TABLE sla_history_trail (
1325+
id int unsigned NOT NULL AUTO_INCREMENT,
1326+
environment_id binary(20) NOT NULL COMMENT 'environment.id',
1327+
host_id binary(20) NOT NULL COMMENT 'host.id (may reference already deleted hosts)',
1328+
service_id binary(20) DEFAULT NULL COMMENT 'service.id (may reference already deleted services)',
1329+
1330+
PRIMARY KEY (id),
1331+
1332+
event_type enum('delete', 'create') NOT NULL,
1333+
event_time bigint unsigned NOT NULL COMMENT 'unix timestamp the event occurred'
1334+
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin ROW_FORMAT=DYNAMIC;
1335+
13241336
CREATE TABLE icingadb_schema (
13251337
id int unsigned NOT NULL AUTO_INCREMENT,
13261338
version smallint unsigned NOT NULL,

schema/pgsql/schema.sql

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ CREATE TYPE boolenum AS ENUM ( 'n', 'y' );
1717
CREATE TYPE acked AS ENUM ( 'n', 'y', 'sticky' );
1818
CREATE TYPE state_type AS ENUM ( 'hard', 'soft' );
1919
CREATE TYPE checkable_type AS ENUM ( 'host', 'service' );
20+
CREATE TYPE sla_trail_event_type AS ENUM ( 'create', 'delete' );
2021
CREATE TYPE comment_type AS ENUM ( 'comment', 'ack' );
2122
CREATE TYPE notification_type AS ENUM ( 'downtime_start', 'downtime_end', 'downtime_removed', 'custom', 'acknowledgement', 'problem', 'recovery', 'flapping_start', 'flapping_end' );
2223
CREATE TYPE history_type AS ENUM ( 'notification', 'state_change', 'downtime_start', 'downtime_end', 'comment_add', 'comment_remove', 'flapping_start', 'flapping_end', 'ack_set', 'ack_clear' );
@@ -2147,6 +2148,23 @@ COMMENT ON COLUMN sla_history_downtime.downtime_id IS 'downtime.id (may referenc
21472148
COMMENT ON COLUMN sla_history_downtime.downtime_start IS 'start time of the downtime';
21482149
COMMENT ON COLUMN sla_history_downtime.downtime_end IS 'end time of the downtime';
21492150

2151+
CREATE TABLE sla_history_trail (
2152+
id serial NOT NULL,
2153+
environment_id bytea20 NOT NULL,
2154+
host_id bytea20 NOT NULL,
2155+
service_id bytea20 DEFAULT NULL,
2156+
2157+
event_type sla_trail_event_type NOT NULL,
2158+
event_time biguint NOT NULL,
2159+
2160+
CONSTRAINT pk_sla_history_trail PRIMARY KEY (id)
2161+
);
2162+
2163+
COMMENT ON COLUMN sla_history_trail.environment_id IS 'environment.id';
2164+
COMMENT ON COLUMN sla_history_trail.host_id IS 'host.id (may reference already deleted hosts)';
2165+
COMMENT ON COLUMN sla_history_trail.service_id IS 'service.id (may reference already deleted services)';
2166+
COMMENT ON COLUMN sla_history_trail.event_time IS 'unix timestamp the event occurred';
2167+
21502168
CREATE SEQUENCE icingadb_schema_id_seq;
21512169

21522170
CREATE TABLE icingadb_schema (

tests/object_sync_test.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -310,6 +310,24 @@ func TestObjectSync(t *testing.T) {
310310
t.Skip()
311311
})
312312

313+
t.Run("Sla History Trail", func(t *testing.T) {
314+
t.Parallel()
315+
316+
assert.Eventuallyf(t, func() bool {
317+
var count int
318+
err := db.Get(&count, db.Rebind("SELECT COUNT(*) FROM sla_history_trail WHERE service_id IS NULL"))
319+
require.NoError(t, err, "querying hosts sla history trail should not fail")
320+
return count == len(data.Hosts)
321+
}, 20*time.Second, 200*time.Millisecond, "Newly created hosts should exists in database")
322+
323+
assert.Eventuallyf(t, func() bool {
324+
var count int
325+
err := db.Get(&count, db.Rebind("SELECT COUNT(*) FROM sla_history_trail WHERE service_id IS NOT NULL"))
326+
require.NoError(t, err, "querying services sla history trail should not fail")
327+
return count == len(data.Services)
328+
}, 20*time.Second, 200*time.Millisecond, "Newly created services should exists in database")
329+
})
330+
313331
t.Run("RuntimeUpdates", func(t *testing.T) {
314332
t.Parallel()
315333

0 commit comments

Comments
 (0)