Skip to content

Commit f10b04a

Browse files
toepkerdtoepkerd-zz
andcommitted
PPL Alerting: Delete Monitor, More V1/V2 Separation (opensearch-project#1968)
* PPL Alerting: Delete Monitor, More V1/V2 Separation Signed-off-by: Dennis Toepker <[email protected]> * making v1 v2 separation error messaging more actionable Signed-off-by: Dennis Toepker <[email protected]> --------- Signed-off-by: Dennis Toepker <[email protected]> Co-authored-by: Dennis Toepker <[email protected]>
1 parent 852eeee commit f10b04a

File tree

14 files changed

+479
-14
lines changed

14 files changed

+479
-14
lines changed

alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import org.opensearch.alerting.action.GetEmailGroupAction
1414
import org.opensearch.alerting.action.GetRemoteIndexesAction
1515
import org.opensearch.alerting.action.SearchEmailAccountAction
1616
import org.opensearch.alerting.action.SearchEmailGroupAction
17+
import org.opensearch.alerting.actionv2.DeleteMonitorV2Action
1718
import org.opensearch.alerting.actionv2.GetMonitorV2Action
1819
import org.opensearch.alerting.actionv2.IndexMonitorV2Action
1920
import org.opensearch.alerting.actionv2.SearchMonitorV2Action
@@ -55,6 +56,7 @@ import org.opensearch.alerting.resthandler.RestSearchAlertingCommentAction
5556
import org.opensearch.alerting.resthandler.RestSearchEmailAccountAction
5657
import org.opensearch.alerting.resthandler.RestSearchEmailGroupAction
5758
import org.opensearch.alerting.resthandler.RestSearchMonitorAction
59+
import org.opensearch.alerting.resthandlerv2.RestDeleteMonitorV2Action
5860
import org.opensearch.alerting.resthandlerv2.RestGetMonitorV2Action
5961
import org.opensearch.alerting.resthandlerv2.RestIndexMonitorV2Action
6062
import org.opensearch.alerting.resthandlerv2.RestSearchMonitorV2Action
@@ -90,6 +92,7 @@ import org.opensearch.alerting.transport.TransportSearchAlertingCommentAction
9092
import org.opensearch.alerting.transport.TransportSearchEmailAccountAction
9193
import org.opensearch.alerting.transport.TransportSearchEmailGroupAction
9294
import org.opensearch.alerting.transport.TransportSearchMonitorAction
95+
import org.opensearch.alerting.transportv2.TransportDeleteMonitorV2Action
9396
import org.opensearch.alerting.transportv2.TransportGetMonitorV2Action
9497
import org.opensearch.alerting.transportv2.TransportIndexMonitorV2Action
9598
import org.opensearch.alerting.transportv2.TransportSearchMonitorV2Action
@@ -242,6 +245,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
242245

243246
// Alerting V2
244247
RestIndexMonitorV2Action(),
248+
RestDeleteMonitorV2Action(),
245249
RestGetMonitorV2Action(),
246250
RestSearchMonitorV2Action(settings, clusterService),
247251
)
@@ -282,6 +286,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
282286
ActionPlugin.ActionHandler(IndexMonitorV2Action.INSTANCE, TransportIndexMonitorV2Action::class.java),
283287
ActionPlugin.ActionHandler(GetMonitorV2Action.INSTANCE, TransportGetMonitorV2Action::class.java),
284288
ActionPlugin.ActionHandler(SearchMonitorV2Action.INSTANCE, TransportSearchMonitorV2Action::class.java),
289+
ActionPlugin.ActionHandler(DeleteMonitorV2Action.INSTANCE, TransportDeleteMonitorV2Action::class.java),
285290
)
286291
}
287292

alerting/src/main/kotlin/org/opensearch/alerting/AlertingV2Utils.kt

Lines changed: 20 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@ import org.apache.lucene.search.TotalHits
99
import org.apache.lucene.search.TotalHits.Relation
1010
import org.opensearch.action.search.SearchResponse
1111
import org.opensearch.action.search.ShardSearchFailure
12+
import org.opensearch.alerting.AlertingPlugin.Companion.MONITOR_BASE_URI
13+
import org.opensearch.alerting.AlertingPlugin.Companion.MONITOR_V2_BASE_URI
1214
import org.opensearch.alerting.modelv2.MonitorV2
1315
import org.opensearch.commons.alerting.model.Monitor
1416
import org.opensearch.commons.alerting.model.ScheduledJob
@@ -27,9 +29,16 @@ object AlertingV2Utils {
2729
// returns the exception to pass into actionListener.onFailure if not.
2830
fun validateMonitorV1(scheduledJob: ScheduledJob): Exception? {
2931
if (scheduledJob is MonitorV2) {
30-
return IllegalStateException("The ID given corresponds to a V2 Monitor, but a V1 Monitor was expected")
32+
return IllegalStateException(
33+
"The ID given corresponds to an Alerting V2 Monitor, but a V1 Monitor was expected. " +
34+
"If you wish to operate on a V1 Monitor (e.g. Per Query, Per Document, etc), please use " +
35+
"the Alerting V1 APIs with endpoint prefix: $MONITOR_BASE_URI."
36+
)
3137
} else if (scheduledJob !is Monitor && scheduledJob !is Workflow) {
32-
return IllegalStateException("The ID given corresponds to a scheduled job of unknown type: ${scheduledJob.javaClass.name}")
38+
return IllegalStateException(
39+
"The ID given corresponds to a scheduled job of unknown type: ${scheduledJob.javaClass.name}. " +
40+
"Please validate the ID and ensure it corresponds to a valid Monitor."
41+
)
3342
}
3443
return null
3544
}
@@ -38,9 +47,16 @@ object AlertingV2Utils {
3847
// returns the exception to pass into actionListener.onFailure if not.
3948
fun validateMonitorV2(scheduledJob: ScheduledJob): Exception? {
4049
if (scheduledJob is Monitor || scheduledJob is Workflow) {
41-
return IllegalStateException("The ID given corresponds to a V1 Monitor, but a V2 Monitor was expected")
50+
return IllegalStateException(
51+
"The ID given corresponds to an Alerting V1 Monitor, but a V2 Monitor was expected. " +
52+
"If you wish to operate on a V2 Monitor (e.g. PPL Monitor), please use " +
53+
"the Alerting V2 APIs with endpoint prefix: $MONITOR_V2_BASE_URI."
54+
)
4255
} else if (scheduledJob !is MonitorV2) {
43-
return IllegalStateException("The ID given corresponds to a scheduled job of unknown type: ${scheduledJob.javaClass.name}")
56+
return IllegalStateException(
57+
"The ID given corresponds to a scheduled job of unknown type: ${scheduledJob.javaClass.name}. " +
58+
"Please validate the ID and ensure it corresponds to a valid Monitor."
59+
)
4460
}
4561
return null
4662
}

alerting/src/main/kotlin/org/opensearch/alerting/WorkflowService.kt

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import org.opensearch.action.admin.indices.exists.indices.IndicesExistsRequest
1111
import org.opensearch.action.admin.indices.exists.indices.IndicesExistsResponse
1212
import org.opensearch.action.search.SearchRequest
1313
import org.opensearch.action.search.SearchResponse
14+
import org.opensearch.alerting.AlertingV2Utils.validateMonitorV1
1415
import org.opensearch.alerting.opensearchapi.suspendUntil
1516
import org.opensearch.client.Client
1617
import org.opensearch.common.xcontent.LoggingDeprecationHandler
@@ -132,7 +133,12 @@ class WorkflowService(
132133
xContentRegistry,
133134
LoggingDeprecationHandler.INSTANCE, hit.sourceAsString
134135
).use { hitsParser ->
135-
val monitor = ScheduledJob.parse(hitsParser, hit.id, hit.version) as Monitor
136+
val scheduledJob = ScheduledJob.parse(hitsParser, hit.id, hit.version)
137+
validateMonitorV1(scheduledJob)?.let {
138+
throw OpenSearchException(it)
139+
}
140+
141+
val monitor = scheduledJob as Monitor
136142
monitors.add(monitor)
137143
}
138144
}
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.alerting.actionv2
7+
8+
import org.opensearch.action.ActionType
9+
10+
class DeleteMonitorV2Action private constructor() : ActionType<DeleteMonitorV2Response>(NAME, ::DeleteMonitorV2Response) {
11+
companion object {
12+
val INSTANCE = DeleteMonitorV2Action()
13+
const val NAME = "cluster:admin/opensearch/alerting/v2/monitor/delete"
14+
}
15+
}
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.alerting.actionv2
7+
8+
import org.opensearch.action.ActionRequest
9+
import org.opensearch.action.ActionRequestValidationException
10+
import org.opensearch.action.support.WriteRequest
11+
import org.opensearch.core.common.io.stream.StreamInput
12+
import org.opensearch.core.common.io.stream.StreamOutput
13+
import java.io.IOException
14+
15+
class DeleteMonitorV2Request : ActionRequest {
16+
val monitorV2Id: String
17+
val refreshPolicy: WriteRequest.RefreshPolicy
18+
19+
constructor(monitorV2Id: String, refreshPolicy: WriteRequest.RefreshPolicy) : super() {
20+
this.monitorV2Id = monitorV2Id
21+
this.refreshPolicy = refreshPolicy
22+
}
23+
24+
@Throws(IOException::class)
25+
constructor(sin: StreamInput) : this(
26+
monitorV2Id = sin.readString(),
27+
refreshPolicy = WriteRequest.RefreshPolicy.readFrom(sin)
28+
)
29+
30+
override fun validate(): ActionRequestValidationException? {
31+
return null
32+
}
33+
34+
@Throws(IOException::class)
35+
override fun writeTo(out: StreamOutput) {
36+
out.writeString(monitorV2Id)
37+
refreshPolicy.writeTo(out)
38+
}
39+
}
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.alerting.actionv2
7+
8+
import org.opensearch.commons.alerting.util.IndexUtils
9+
import org.opensearch.commons.notifications.action.BaseResponse
10+
import org.opensearch.core.common.io.stream.StreamInput
11+
import org.opensearch.core.common.io.stream.StreamOutput
12+
import org.opensearch.core.xcontent.ToXContent
13+
import org.opensearch.core.xcontent.XContentBuilder
14+
15+
class DeleteMonitorV2Response : BaseResponse {
16+
var id: String
17+
var version: Long
18+
19+
constructor(
20+
id: String,
21+
version: Long
22+
) : super() {
23+
this.id = id
24+
this.version = version
25+
}
26+
27+
constructor(sin: StreamInput) : this(
28+
sin.readString(), // id
29+
sin.readLong() // version
30+
)
31+
32+
override fun writeTo(out: StreamOutput) {
33+
out.writeString(id)
34+
out.writeLong(version)
35+
}
36+
37+
override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder {
38+
return builder.startObject()
39+
.field(IndexUtils._ID, id)
40+
.field(IndexUtils._VERSION, version)
41+
.endObject()
42+
}
43+
}
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.alerting.resthandlerv2
7+
8+
import org.apache.logging.log4j.LogManager
9+
import org.apache.logging.log4j.Logger
10+
import org.opensearch.action.support.WriteRequest.RefreshPolicy
11+
import org.opensearch.alerting.AlertingPlugin
12+
import org.opensearch.alerting.actionv2.DeleteMonitorV2Action
13+
import org.opensearch.alerting.actionv2.DeleteMonitorV2Request
14+
import org.opensearch.alerting.util.REFRESH
15+
import org.opensearch.client.node.NodeClient
16+
import org.opensearch.rest.BaseRestHandler
17+
import org.opensearch.rest.RestHandler.Route
18+
import org.opensearch.rest.RestRequest
19+
import org.opensearch.rest.RestRequest.Method.DELETE
20+
import org.opensearch.rest.action.RestToXContentListener
21+
import java.io.IOException
22+
23+
private val log: Logger = LogManager.getLogger(RestDeleteMonitorV2Action::class.java)
24+
25+
class RestDeleteMonitorV2Action : BaseRestHandler() {
26+
27+
override fun getName(): String {
28+
return "delete_monitor_v2_action"
29+
}
30+
31+
override fun routes(): List<Route> {
32+
return mutableListOf(
33+
Route(
34+
DELETE,
35+
"${AlertingPlugin.MONITOR_V2_BASE_URI}/{monitorV2Id}"
36+
)
37+
)
38+
}
39+
40+
@Throws(IOException::class)
41+
override fun prepareRequest(request: RestRequest, client: NodeClient): RestChannelConsumer {
42+
val monitorV2Id = request.param("monitorV2Id")
43+
log.info("${request.method()} ${AlertingPlugin.MONITOR_V2_BASE_URI}/$monitorV2Id")
44+
45+
val refreshPolicy = RefreshPolicy.parse(request.param(REFRESH, RefreshPolicy.IMMEDIATE.value))
46+
val deleteMonitorV2Request = DeleteMonitorV2Request(monitorV2Id, refreshPolicy)
47+
48+
return RestChannelConsumer { channel ->
49+
client.execute(DeleteMonitorV2Action.INSTANCE, deleteMonitorV2Request, RestToXContentListener(channel))
50+
}
51+
}
52+
}

alerting/src/main/kotlin/org/opensearch/alerting/service/DeleteMonitorService.kt

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import org.opensearch.action.support.IndicesOptions
2222
import org.opensearch.action.support.WriteRequest.RefreshPolicy
2323
import org.opensearch.action.support.master.AcknowledgedResponse
2424
import org.opensearch.alerting.MonitorMetadataService
25+
import org.opensearch.alerting.actionv2.DeleteMonitorV2Response
2526
import org.opensearch.alerting.core.lock.LockModel
2627
import org.opensearch.alerting.core.lock.LockService
2728
import org.opensearch.alerting.opensearchapi.suspendUntil
@@ -73,6 +74,19 @@ object DeleteMonitorService :
7374
return DeleteMonitorResponse(deleteResponse.id, deleteResponse.version)
7475
}
7576

77+
/**
78+
* Deletes the monitorV2, which does not come with other metadata and queries
79+
* like doc level monitors
80+
* @param monitorV2Id monitorV2 ID to be deleted
81+
* @param refreshPolicy
82+
*/
83+
suspend fun deleteMonitorV2(monitorV2Id: String, refreshPolicy: RefreshPolicy): DeleteMonitorV2Response {
84+
val deleteResponse = deleteMonitor(monitorV2Id, refreshPolicy)
85+
deleteLock(monitorV2Id)
86+
return DeleteMonitorV2Response(deleteResponse.id, deleteResponse.version)
87+
}
88+
89+
// both Alerting v1 and v2 workflows flow through this function
7690
private suspend fun deleteMonitor(monitorId: String, refreshPolicy: RefreshPolicy): DeleteResponse {
7791
val deleteMonitorRequest = DeleteRequest(ScheduledJob.SCHEDULED_JOBS_INDEX, monitorId)
7892
.setRefreshPolicy(refreshPolicy)
@@ -166,7 +180,12 @@ object DeleteMonitorService :
166180
}
167181

168182
private suspend fun deleteLock(monitor: Monitor) {
169-
client.suspendUntil<Client, Boolean> { lockService.deleteLock(LockModel.generateLockId(monitor.id), it) }
183+
deleteLock(monitor.id)
184+
}
185+
186+
// both Alerting v1 and v2 workflows flow through this function
187+
private suspend fun deleteLock(monitorId: String) {
188+
client.suspendUntil<Client, Boolean> { lockService.deleteLock(LockModel.generateLockId(monitorId), it) }
170189
}
171190

172191
/**

alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportDeleteMonitorAction.kt

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import org.opensearch.action.get.GetResponse
1616
import org.opensearch.action.support.ActionFilters
1717
import org.opensearch.action.support.HandledTransportAction
1818
import org.opensearch.action.support.WriteRequest.RefreshPolicy
19+
import org.opensearch.alerting.AlertingV2Utils.validateMonitorV1
1920
import org.opensearch.alerting.opensearchapi.suspendUntil
2021
import org.opensearch.alerting.service.DeleteMonitorService
2122
import org.opensearch.alerting.settings.AlertingSettings
@@ -90,7 +91,7 @@ class TransportDeleteMonitorAction @Inject constructor(
9091
) {
9192
suspend fun resolveUserAndStart(refreshPolicy: RefreshPolicy) {
9293
try {
93-
val monitor = getMonitor()
94+
val monitor = getMonitor() ?: return // null means there was an issue retrieving the Monitor
9495

9596
val canDelete = user == null || !doFilterForUser(user) ||
9697
checkUserPermissionsWithResource(user, monitor.user, actionListener, "monitor", monitorId)
@@ -118,11 +119,11 @@ class TransportDeleteMonitorAction @Inject constructor(
118119
}
119120
}
120121

121-
private suspend fun getMonitor(): Monitor {
122+
private suspend fun getMonitor(): Monitor? {
122123
val getRequest = GetRequest(ScheduledJob.SCHEDULED_JOBS_INDEX, monitorId)
123124

124125
val getResponse: GetResponse = client.suspendUntil { get(getRequest, it) }
125-
if (getResponse.isExists == false) {
126+
if (!getResponse.isExists) {
126127
actionListener.onFailure(
127128
AlertingException.wrap(
128129
OpenSearchStatusException("Monitor with $monitorId is not found", RestStatus.NOT_FOUND)
@@ -135,7 +136,16 @@ class TransportDeleteMonitorAction @Inject constructor(
135136
getResponse.sourceAsBytesRef,
136137
XContentType.JSON
137138
)
138-
return ScheduledJob.parse(xcp, getResponse.id, getResponse.version) as Monitor
139+
val scheduledJob = ScheduledJob.parse(xcp, getResponse.id, getResponse.version)
140+
141+
validateMonitorV1(scheduledJob)?.let {
142+
actionListener.onFailure(AlertingException.wrap(it))
143+
return null
144+
}
145+
146+
val monitor = scheduledJob as Monitor
147+
148+
return monitor
139149
}
140150
}
141151
}

alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportDeleteWorkflowAction.kt

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import org.opensearch.action.search.SearchResponse
2323
import org.opensearch.action.support.ActionFilters
2424
import org.opensearch.action.support.HandledTransportAction
2525
import org.opensearch.action.support.WriteRequest.RefreshPolicy
26+
import org.opensearch.alerting.AlertingV2Utils.validateMonitorV1
2627
import org.opensearch.alerting.core.lock.LockModel
2728
import org.opensearch.alerting.core.lock.LockService
2829
import org.opensearch.alerting.opensearchapi.addFilter
@@ -297,7 +298,13 @@ class TransportDeleteWorkflowAction @Inject constructor(
297298
xContentRegistry,
298299
LoggingDeprecationHandler.INSTANCE, hit.sourceAsString
299300
).use { hitsParser ->
300-
val monitor = ScheduledJob.parse(hitsParser, hit.id, hit.version) as Monitor
301+
val scheduledJob = ScheduledJob.parse(hitsParser, hit.id, hit.version)
302+
303+
validateMonitorV1(scheduledJob)?.let {
304+
throw OpenSearchException(it)
305+
}
306+
307+
val monitor = scheduledJob as Monitor
301308
deletableMonitors.add(monitor)
302309
}
303310
}
@@ -325,12 +332,17 @@ class TransportDeleteWorkflowAction @Inject constructor(
325332
)
326333
}
327334

328-
private fun parseWorkflow(getResponse: GetResponse): Workflow {
335+
private fun parseWorkflow(getResponse: GetResponse): Workflow? {
329336
val xcp = XContentHelper.createParser(
330337
xContentRegistry, LoggingDeprecationHandler.INSTANCE,
331338
getResponse.sourceAsBytesRef, XContentType.JSON
332339
)
333-
return ScheduledJob.parse(xcp, getResponse.id, getResponse.version) as Workflow
340+
val scheduledJob = ScheduledJob.parse(xcp, getResponse.id, getResponse.version)
341+
validateMonitorV1(scheduledJob)?.let {
342+
actionListener.onFailure(AlertingException.wrap(it))
343+
return null
344+
}
345+
return scheduledJob as Workflow
334346
}
335347

336348
private suspend fun deleteWorkflow(deleteRequest: DeleteRequest): DeleteResponse {

0 commit comments

Comments
 (0)