Skip to content

Commit 235ebb3

Browse files
committed
Try verbose simulate API to detect issues in pipelines
1 parent b2c2c7a commit 235ebb3

File tree

1 file changed

+35
-10
lines changed

1 file changed

+35
-10
lines changed

internal/elasticsearch/ingest/pipeline.go

Lines changed: 35 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ package ingest
77
import (
88
"bytes"
99
"encoding/json"
10+
"errors"
1011
"fmt"
1112
"io"
1213
"net/http"
@@ -22,18 +23,22 @@ type simulatePipelineRequest struct {
2223
}
2324

2425
type simulatePipelineResponse struct {
25-
Docs []pipelineIngestedDocument `json:"docs"`
26+
Docs []struct {
27+
ProcessorResults []verboseProcessorResult `json:"processor_results"`
28+
}
29+
}
30+
31+
type verboseProcessorResult struct {
32+
Processor string `json:"processor_type"`
33+
Status string `json:"status"`
34+
Doc pipelineDocument `json:"doc"`
2635
}
2736

2837
type pipelineDocument struct {
2938
Index string `json:"_index"`
3039
Source json.RawMessage `json:"_source"`
3140
}
3241

33-
type pipelineIngestedDocument struct {
34-
Doc pipelineDocument `json:"doc"`
35-
}
36-
3742
// Pipeline represents a pipeline resource loaded from a file
3843
type Pipeline struct {
3944
Path string // Path of the file with the pipeline definition.
@@ -86,9 +91,10 @@ func SimulatePipeline(api *elasticsearch.API, pipelineName string, events []json
8691
return nil, fmt.Errorf("marshalling simulate request failed: %w", err)
8792
}
8893

89-
r, err := api.Ingest.Simulate(bytes.NewReader(requestBody), func(request *elasticsearch.IngestSimulateRequest) {
90-
request.PipelineID = pipelineName
91-
})
94+
r, err := api.Ingest.Simulate(bytes.NewReader(requestBody),
95+
api.Ingest.Simulate.WithPipelineID(pipelineName),
96+
api.Ingest.Simulate.WithVerbose(true),
97+
)
9298
if err != nil {
9399
return nil, fmt.Errorf("simulate API call failed (pipelineName: %s): %w", pipelineName, err)
94100
}
@@ -110,10 +116,29 @@ func SimulatePipeline(api *elasticsearch.API, pipelineName string, events []json
110116
}
111117

112118
processedEvents := make([]json.RawMessage, len(response.Docs))
119+
var errs []error
113120
for i, doc := range response.Docs {
114-
processedEvents[i] = doc.Doc.Source
121+
var source json.RawMessage
122+
failed := false
123+
for _, result := range doc.ProcessorResults {
124+
switch result.Status {
125+
case "success":
126+
// Keep last successful document.
127+
source = result.Doc.Source
128+
case "skipped":
129+
continue
130+
case "failed":
131+
failed = true
132+
errs = append(errs, fmt.Errorf("%q processor failed (status: %s)", result.Processor, result.Status))
133+
}
134+
}
135+
136+
if !failed {
137+
processedEvents[i] = source
138+
}
115139
}
116-
return processedEvents, nil
140+
141+
return processedEvents, errors.Join(errs...)
117142
}
118143

119144
func UninstallPipelines(api *elasticsearch.API, pipelines []Pipeline) error {

0 commit comments

Comments
 (0)