Skip to content

Commit a185e61

Browse files
toepkerdtoepkerd-zz
andcommitted
PPL Alerting: Get Alerts and Alert Lifecycle (#1972)
* PPL Alerting: Get Alerts and Alert Lifecycle Signed-off-by: Dennis Toepker <[email protected]> * updating scheduled jobs schema version in alert indices IT Signed-off-by: Dennis Toepker <[email protected]> * enabling alerting v2 by default Signed-off-by: Dennis Toepker <[email protected]> * making get alerts response fields snake cased Signed-off-by: Dennis Toepker <[email protected]> * removing unused get alerts params and adding serde tests for alertsv2 request and response Signed-off-by: Dennis Toepker <[email protected]> * removing misleading comments Signed-off-by: Dennis Toepker <[email protected]> * minor changes Signed-off-by: Dennis Toepker <[email protected]> * changed some setting names to go under alerting prefix, and optimizing alert expire logic to search for expired alerts in OS query instead of scanning Alerts for expiration in memory Signed-off-by: Dennis Toepker <[email protected]> --------- Signed-off-by: Dennis Toepker <[email protected]> Co-authored-by: Dennis Toepker <[email protected]>
1 parent 154fcb2 commit a185e61

File tree

23 files changed

+1900
-38
lines changed

23 files changed

+1900
-38
lines changed

alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,15 @@ import org.opensearch.alerting.action.GetRemoteIndexesAction
1515
import org.opensearch.alerting.action.SearchEmailAccountAction
1616
import org.opensearch.alerting.action.SearchEmailGroupAction
1717
import org.opensearch.alerting.actionv2.DeleteMonitorV2Action
18+
import org.opensearch.alerting.actionv2.GetAlertsV2Action
1819
import org.opensearch.alerting.actionv2.GetMonitorV2Action
1920
import org.opensearch.alerting.actionv2.IndexMonitorV2Action
2021
import org.opensearch.alerting.actionv2.SearchMonitorV2Action
2122
import org.opensearch.alerting.alerts.AlertIndices
2223
import org.opensearch.alerting.alerts.AlertIndices.Companion.ALL_ALERT_INDEX_PATTERN
24+
import org.opensearch.alerting.alertsv2.AlertV2Indices
25+
import org.opensearch.alerting.alertsv2.AlertV2Indices.Companion.ALL_ALERT_V2_INDEX_PATTERN
26+
import org.opensearch.alerting.alertsv2.AlertV2Mover
2327
import org.opensearch.alerting.comments.CommentsIndices
2428
import org.opensearch.alerting.comments.CommentsIndices.Companion.ALL_COMMENTS_INDEX_PATTERN
2529
import org.opensearch.alerting.core.JobSweeper
@@ -29,6 +33,7 @@ import org.opensearch.alerting.core.action.node.ScheduledJobsStatsTransportActio
2933
import org.opensearch.alerting.core.lock.LockService
3034
import org.opensearch.alerting.core.resthandler.RestScheduledJobStatsHandler
3135
import org.opensearch.alerting.core.schedule.JobScheduler
36+
import org.opensearch.alerting.core.settings.AlertingV2Settings
3237
import org.opensearch.alerting.core.settings.LegacyOpenDistroScheduledJobSettings
3338
import org.opensearch.alerting.core.settings.ScheduledJobSettings
3439
import org.opensearch.alerting.modelv2.MonitorV2
@@ -57,6 +62,7 @@ import org.opensearch.alerting.resthandler.RestSearchEmailAccountAction
5762
import org.opensearch.alerting.resthandler.RestSearchEmailGroupAction
5863
import org.opensearch.alerting.resthandler.RestSearchMonitorAction
5964
import org.opensearch.alerting.resthandlerv2.RestDeleteMonitorV2Action
65+
import org.opensearch.alerting.resthandlerv2.RestGetAlertsV2Action
6066
import org.opensearch.alerting.resthandlerv2.RestGetMonitorV2Action
6167
import org.opensearch.alerting.resthandlerv2.RestIndexMonitorV2Action
6268
import org.opensearch.alerting.resthandlerv2.RestSearchMonitorV2Action
@@ -93,6 +99,7 @@ import org.opensearch.alerting.transport.TransportSearchEmailAccountAction
9399
import org.opensearch.alerting.transport.TransportSearchEmailGroupAction
94100
import org.opensearch.alerting.transport.TransportSearchMonitorAction
95101
import org.opensearch.alerting.transportv2.TransportDeleteMonitorV2Action
102+
import org.opensearch.alerting.transportv2.TransportGetAlertsV2Action
96103
import org.opensearch.alerting.transportv2.TransportGetMonitorV2Action
97104
import org.opensearch.alerting.transportv2.TransportIndexMonitorV2Action
98105
import org.opensearch.alerting.transportv2.TransportSearchMonitorV2Action
@@ -203,8 +210,10 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
203210
lateinit var docLevelMonitorQueries: DocLevelMonitorQueries
204211
lateinit var threadPool: ThreadPool
205212
lateinit var alertIndices: AlertIndices
213+
lateinit var alertV2Indices: AlertV2Indices
206214
lateinit var clusterService: ClusterService
207215
lateinit var destinationMigrationCoordinator: DestinationMigrationCoordinator
216+
lateinit var alertV2Mover: AlertV2Mover
208217
var monitorTypeToMonitorRunners: MutableMap<String, RemoteMonitorRegistry> = mutableMapOf()
209218

210219
override fun getRestHandlers(
@@ -248,6 +257,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
248257
RestDeleteMonitorV2Action(),
249258
RestGetMonitorV2Action(),
250259
RestSearchMonitorV2Action(settings, clusterService),
260+
RestGetAlertsV2Action(),
251261
)
252262
}
253263

@@ -287,6 +297,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
287297
ActionPlugin.ActionHandler(GetMonitorV2Action.INSTANCE, TransportGetMonitorV2Action::class.java),
288298
ActionPlugin.ActionHandler(SearchMonitorV2Action.INSTANCE, TransportSearchMonitorV2Action::class.java),
289299
ActionPlugin.ActionHandler(DeleteMonitorV2Action.INSTANCE, TransportDeleteMonitorV2Action::class.java),
300+
ActionPlugin.ActionHandler(GetAlertsV2Action.INSTANCE, TransportGetAlertsV2Action::class.java)
290301
)
291302
}
292303

@@ -323,6 +334,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
323334
val settings = environment.settings()
324335
val lockService = LockService(client, clusterService)
325336
alertIndices = AlertIndices(settings, client, threadPool, clusterService)
337+
alertV2Indices = AlertV2Indices(settings, client, threadPool, clusterService)
326338
val alertService = AlertService(client, xContentRegistry, alertIndices)
327339
val triggerService = TriggerService(scriptService)
328340
runner = MonitorRunnerService
@@ -334,6 +346,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
334346
.registerSettings(settings)
335347
.registerThreadPool(threadPool)
336348
.registerAlertIndices(alertIndices)
349+
.registerAlertV2Indices(alertV2Indices)
337350
.registerInputService(
338351
InputService(
339352
client,
@@ -360,6 +373,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
360373
scheduler = JobScheduler(threadPool, runner)
361374
sweeper = JobSweeper(environment.settings(), client, clusterService, threadPool, xContentRegistry, scheduler, ALERTING_JOB_TYPES)
362375
destinationMigrationCoordinator = DestinationMigrationCoordinator(client, clusterService, threadPool, scheduledJobIndices)
376+
alertV2Mover = AlertV2Mover(environment.settings(), client, threadPool, clusterService, xContentRegistry)
363377
this.threadPool = threadPool
364378
this.clusterService = clusterService
365379

@@ -387,6 +401,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
387401
commentsIndices,
388402
docLevelMonitorQueries,
389403
destinationMigrationCoordinator,
404+
alertV2Mover,
390405
lockService,
391406
alertService,
392407
triggerService
@@ -484,7 +499,8 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
484499
AlertingSettings.ALERT_V2_QUERY_RESULTS_MAX_SIZE,
485500
AlertingSettings.ALERT_V2_PER_RESULT_TRIGGER_MAX_ALERTS,
486501
AlertingSettings.NOTIFICATION_SUBJECT_SOURCE_MAX_LENGTH,
487-
AlertingSettings.NOTIFICATION_MESSAGE_SOURCE_MAX_LENGTH
502+
AlertingSettings.NOTIFICATION_MESSAGE_SOURCE_MAX_LENGTH,
503+
AlertingV2Settings.ALERTING_V2_ENABLED
488504
)
489505
}
490506

@@ -503,6 +519,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
503519
SystemIndexDescriptor(ALL_ALERT_INDEX_PATTERN, "Alerting Plugin system index pattern"),
504520
SystemIndexDescriptor(SCHEDULED_JOBS_INDEX, "Alerting Plugin Configuration index"),
505521
SystemIndexDescriptor(ALL_COMMENTS_INDEX_PATTERN, "Alerting Comments system index pattern"),
522+
SystemIndexDescriptor(ALL_ALERT_V2_INDEX_PATTERN, "Alerting V2 Alerts index pattern")
506523
)
507524
}
508525

alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerExecutionContext.kt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ package org.opensearch.alerting
77

88
import org.opensearch.action.bulk.BackoffPolicy
99
import org.opensearch.alerting.alerts.AlertIndices
10+
import org.opensearch.alerting.alertsv2.AlertV2Indices
1011
import org.opensearch.alerting.core.lock.LockService
1112
import org.opensearch.alerting.model.destination.DestinationContextFactory
1213
import org.opensearch.alerting.remote.monitors.RemoteMonitorRegistry
@@ -35,6 +36,7 @@ data class MonitorRunnerExecutionContext(
3536
var settings: Settings? = null,
3637
var threadPool: ThreadPool? = null,
3738
var alertIndices: AlertIndices? = null,
39+
var alertV2Indices: AlertV2Indices? = null,
3840
var inputService: InputService? = null,
3941
var triggerService: TriggerService? = null,
4042
var alertService: AlertService? = null,

alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerService.kt

Lines changed: 47 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -23,11 +23,14 @@ import org.opensearch.alerting.action.ExecuteWorkflowRequest
2323
import org.opensearch.alerting.action.ExecuteWorkflowResponse
2424
import org.opensearch.alerting.alerts.AlertIndices
2525
import org.opensearch.alerting.alerts.AlertMover.Companion.moveAlerts
26+
import org.opensearch.alerting.alertsv2.AlertV2Indices
27+
import org.opensearch.alerting.alertsv2.AlertV2Mover.Companion.moveAlertV2s
2628
import org.opensearch.alerting.core.JobRunner
2729
import org.opensearch.alerting.core.ScheduledJobIndices
2830
import org.opensearch.alerting.core.lock.LockModel
2931
import org.opensearch.alerting.core.lock.LockService
3032
import org.opensearch.alerting.model.destination.DestinationContextFactory
33+
import org.opensearch.alerting.modelv2.MonitorV2
3134
import org.opensearch.alerting.opensearchapi.retry
3235
import org.opensearch.alerting.opensearchapi.suspendUntil
3336
import org.opensearch.alerting.remote.monitors.RemoteDocumentLevelMonitorRunner
@@ -137,6 +140,11 @@ object MonitorRunnerService : JobRunner, CoroutineScope, AbstractLifecycleCompon
137140
return this
138141
}
139142

143+
fun registerAlertV2Indices(alertV2Indices: AlertV2Indices): MonitorRunnerService {
144+
this.monitorCtx.alertV2Indices = alertV2Indices
145+
return this
146+
}
147+
140148
fun registerInputService(inputService: InputService): MonitorRunnerService {
141149
this.monitorCtx.inputService = inputService
142150
return this
@@ -316,6 +324,18 @@ object MonitorRunnerService : JobRunner, CoroutineScope, AbstractLifecycleCompon
316324
logger.error("Failed to move active alerts for monitor [${job.id}].", e)
317325
}
318326
}
327+
} else if (job is MonitorV2) {
328+
launch {
329+
try {
330+
monitorCtx.moveAlertsRetryPolicy!!.retry(logger) {
331+
if (monitorCtx.alertV2Indices!!.isAlertV2Initialized()) {
332+
moveAlertV2s(job.id, job, monitorCtx)
333+
}
334+
}
335+
} catch (e: Exception) {
336+
logger.error("Failed to move active alertV2s for monitorV2 [${job.id}].", e)
337+
}
338+
}
319339
} else {
320340
throw IllegalArgumentException("Invalid job type")
321341
}
@@ -339,6 +359,15 @@ object MonitorRunnerService : JobRunner, CoroutineScope, AbstractLifecycleCompon
339359
} catch (e: Exception) {
340360
logger.error("Failed to move active alerts for monitor [$jobId].", e)
341361
}
362+
try {
363+
monitorCtx.moveAlertsRetryPolicy!!.retry(logger) {
364+
if (monitorCtx.alertV2Indices!!.isAlertV2Initialized()) {
365+
moveAlertV2s(jobId, null, monitorCtx)
366+
}
367+
}
368+
} catch (e: Exception) {
369+
logger.error("Failed to move active alertV2s for monitorV2 [$jobId].", e)
370+
}
342371
}
343372
}
344373

@@ -433,20 +462,7 @@ object MonitorRunnerService : JobRunner, CoroutineScope, AbstractLifecycleCompon
433462
): MonitorRunResult<*> {
434463
// Updating the scheduled job index at the start of monitor execution runs for when there is an upgrade the the schema mapping
435464
// has not been updated.
436-
if (!IndexUtils.scheduledJobIndexUpdated && monitorCtx.clusterService != null && monitorCtx.client != null) {
437-
IndexUtils.updateIndexMapping(
438-
ScheduledJob.SCHEDULED_JOBS_INDEX,
439-
ScheduledJobIndices.scheduledJobMappings(), monitorCtx.clusterService!!.state(), monitorCtx.client!!.admin().indices(),
440-
object : ActionListener<AcknowledgedResponse> {
441-
override fun onResponse(response: AcknowledgedResponse) {
442-
}
443-
444-
override fun onFailure(t: Exception) {
445-
logger.error("Failed to update config index schema", t)
446-
}
447-
}
448-
)
449-
}
465+
updateAlertingConfigIndexSchema()
450466

451467
if (job is Workflow) {
452468
logger.info("Executing scheduled workflow - id: ${job.id}, periodStart: $periodStart, periodEnd: $periodEnd, dryrun: $dryrun")
@@ -582,4 +598,21 @@ object MonitorRunnerService : JobRunner, CoroutineScope, AbstractLifecycleCompon
582598
.newInstance(template.params + mapOf("ctx" to ctx.asTemplateArg()))
583599
.execute()
584600
}
601+
602+
private fun updateAlertingConfigIndexSchema() {
603+
if (!IndexUtils.scheduledJobIndexUpdated && monitorCtx.clusterService != null && monitorCtx.client != null) {
604+
IndexUtils.updateIndexMapping(
605+
ScheduledJob.SCHEDULED_JOBS_INDEX,
606+
ScheduledJobIndices.scheduledJobMappings(), monitorCtx.clusterService!!.state(), monitorCtx.client!!.admin().indices(),
607+
object : ActionListener<AcknowledgedResponse> {
608+
override fun onResponse(response: AcknowledgedResponse) {
609+
}
610+
611+
override fun onFailure(t: Exception) {
612+
logger.error("Failed to update config index schema", t)
613+
}
614+
}
615+
)
616+
}
617+
}
585618
}
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.alerting.actionv2
7+
8+
import org.opensearch.action.ActionType
9+
10+
class GetAlertsV2Action private constructor() : ActionType<GetAlertsV2Response>(NAME, ::GetAlertsV2Response) {
11+
companion object {
12+
val INSTANCE = GetAlertsV2Action()
13+
const val NAME = "cluster:admin/opensearch/alerting/v2/alerts/get"
14+
}
15+
}
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.alerting.actionv2
7+
8+
import org.opensearch.action.ActionRequest
9+
import org.opensearch.action.ActionRequestValidationException
10+
import org.opensearch.commons.alerting.model.Table
11+
import org.opensearch.core.common.io.stream.StreamInput
12+
import org.opensearch.core.common.io.stream.StreamOutput
13+
import java.io.IOException
14+
15+
class GetAlertsV2Request : ActionRequest {
16+
val table: Table
17+
val severityLevel: String
18+
val monitorV2Ids: List<String>?
19+
20+
constructor(
21+
table: Table,
22+
severityLevel: String,
23+
monitorV2Ids: List<String>? = null,
24+
) : super() {
25+
this.table = table
26+
this.severityLevel = severityLevel
27+
this.monitorV2Ids = monitorV2Ids
28+
}
29+
30+
@Throws(IOException::class)
31+
constructor(sin: StreamInput) : this(
32+
table = Table.readFrom(sin),
33+
severityLevel = sin.readString(),
34+
monitorV2Ids = sin.readOptionalStringList(),
35+
)
36+
37+
override fun validate(): ActionRequestValidationException? {
38+
return null
39+
}
40+
41+
@Throws(IOException::class)
42+
override fun writeTo(out: StreamOutput) {
43+
table.writeTo(out)
44+
out.writeString(severityLevel)
45+
out.writeOptionalStringCollection(monitorV2Ids)
46+
}
47+
}
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.alerting.actionv2
7+
8+
import org.opensearch.alerting.modelv2.AlertV2
9+
import org.opensearch.commons.notifications.action.BaseResponse
10+
import org.opensearch.core.common.io.stream.StreamInput
11+
import org.opensearch.core.common.io.stream.StreamOutput
12+
import org.opensearch.core.xcontent.ToXContent
13+
import org.opensearch.core.xcontent.XContentBuilder
14+
import java.io.IOException
15+
import java.util.Collections
16+
17+
class GetAlertsV2Response : BaseResponse {
18+
val alertV2s: List<AlertV2>
19+
20+
// totalAlertV2s is not the same as the size of alertV2s because there can be 30 alerts from the request, but
21+
// the request only asked for 5 alerts, so totalAlertV2s will be 30, but alertV2s will only contain 5 alerts
22+
val totalAlertV2s: Int?
23+
24+
constructor(
25+
alertV2s: List<AlertV2>,
26+
totalAlertV2s: Int?
27+
) : super() {
28+
this.alertV2s = alertV2s
29+
this.totalAlertV2s = totalAlertV2s
30+
}
31+
32+
@Throws(IOException::class)
33+
constructor(sin: StreamInput) : this(
34+
alertV2s = Collections.unmodifiableList(sin.readList(::AlertV2)),
35+
totalAlertV2s = sin.readOptionalInt()
36+
)
37+
38+
@Throws(IOException::class)
39+
override fun writeTo(out: StreamOutput) {
40+
out.writeCollection(alertV2s)
41+
out.writeOptionalInt(totalAlertV2s)
42+
}
43+
44+
@Throws(IOException::class)
45+
override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder {
46+
builder.startObject()
47+
.field("alerts_v2", alertV2s)
48+
.field("total_alerts_v2", totalAlertV2s)
49+
50+
return builder.endObject()
51+
}
52+
}

0 commit comments

Comments
 (0)