Skip to content

Commit 6eacc93

Browse files
committed
feat(migrations): Add migration for inputs.tail from_beginning option
The 'from_beginning' option in inputs.tail was deprecated in v1.34.0 for removal in v1.40.0, replaced by 'initial_read_offset'.
1 parent 0ab75b1 commit 6eacc93

9 files changed

Lines changed: 256 additions & 0 deletions

File tree

migrations/all/inputs_tail.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
//go:build !custom || (migrations && (inputs || inputs.tail))
2+
3+
package all
4+
5+
import _ "github.com/influxdata/telegraf/migrations/inputs_tail" // register migration
Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
package inputs_tail
2+
3+
import (
4+
"fmt"
5+
6+
"github.com/influxdata/toml"
7+
"github.com/influxdata/toml/ast"
8+
9+
"github.com/influxdata/telegraf/migrations"
10+
)
11+
12+
// Migration function to migrate the deprecated 'from_beginning' option to its
13+
// replacement 'initial_read_offset'.
14+
func migrate(tbl *ast.Table) ([]byte, string, error) {
15+
// Decode the old data structure
16+
var plugin map[string]interface{}
17+
if err := toml.UnmarshalTable(tbl, &plugin); err != nil {
18+
return nil, "", err
19+
}
20+
21+
// Check for the deprecated option and migrate it
22+
var applied bool
23+
var message string
24+
if rawOldValue, found := plugin["from_beginning"]; found {
25+
applied = true
26+
27+
// Convert to the actual type
28+
oldValue, ok := rawOldValue.(bool)
29+
if !ok {
30+
return nil, "", fmt.Errorf("unexpected type %T for 'from_beginning'", rawOldValue)
31+
}
32+
33+
// A 'from_beginning' value of 'true' corresponds to reading from the
34+
// 'beginning' while 'false' corresponds to the default 'saved-or-end'
35+
// behavior.
36+
expected := "saved-or-end"
37+
if oldValue {
38+
expected = "beginning"
39+
}
40+
41+
// The 'initial_read_offset' option supersedes 'from_beginning' and takes
42+
// precedence at runtime. If it is already set, keep it and just drop the
43+
// deprecated option, but warn the user if the two settings disagree.
44+
if rawNewValue, found := plugin["initial_read_offset"]; found {
45+
newValue, ok := rawNewValue.(string)
46+
if !ok {
47+
return nil, "", fmt.Errorf("unexpected type %T for 'initial_read_offset'", rawNewValue)
48+
}
49+
if newValue != expected {
50+
message = fmt.Sprintf("ignoring 'from_beginning = %v' as it conflicts with 'initial_read_offset = %q'", oldValue, newValue)
51+
}
52+
} else {
53+
plugin["initial_read_offset"] = expected
54+
}
55+
56+
// Remove the deprecated setting
57+
delete(plugin, "from_beginning")
58+
}
59+
60+
// No options migrated so we can exit early
61+
if !applied {
62+
return nil, "", migrations.ErrNotApplicable
63+
}
64+
65+
// Create the corresponding plugin configuration
66+
cfg := migrations.CreateTOMLStruct("inputs", "tail")
67+
cfg.Add("inputs", "tail", plugin)
68+
69+
output, err := toml.Marshal(cfg)
70+
return output, message, err
71+
}
72+
73+
// Register the migration function for the plugin type
74+
func init() {
75+
migrations.AddPluginOptionMigration("inputs.tail", migrate)
76+
}
Lines changed: 126 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,126 @@
1+
package inputs_tail_test
2+
3+
import (
4+
"bytes"
5+
"log"
6+
"os"
7+
"path/filepath"
8+
"testing"
9+
10+
"github.com/stretchr/testify/require"
11+
12+
"github.com/influxdata/telegraf/config"
13+
_ "github.com/influxdata/telegraf/migrations/inputs_tail" // register migration
14+
"github.com/influxdata/telegraf/plugins/inputs/tail" // register plugin
15+
_ "github.com/influxdata/telegraf/plugins/parsers/influx" // register parser
16+
)
17+
18+
func TestNoMigration(t *testing.T) {
19+
plugin := &tail.Tail{}
20+
defaultCfg := plugin.SampleConfig()
21+
22+
// Migrate and check that nothing changed
23+
output, n, err := config.ApplyMigrations([]byte(defaultCfg))
24+
require.NoError(t, err)
25+
require.NotEmpty(t, output)
26+
require.Zero(t, n)
27+
require.Equal(t, defaultCfg, string(output))
28+
}
29+
30+
func TestConflictWarning(t *testing.T) {
31+
tests := []struct {
32+
name string
33+
conf string
34+
warn bool
35+
}{
36+
{
37+
name: "conflicting values warn",
38+
conf: `
39+
[[inputs.tail]]
40+
files = ["/var/mymetrics.out"]
41+
from_beginning = true
42+
initial_read_offset = "end"
43+
data_format = "influx"
44+
`,
45+
warn: true,
46+
},
47+
{
48+
name: "agreeing values do not warn",
49+
conf: `
50+
[[inputs.tail]]
51+
files = ["/var/mymetrics.out"]
52+
from_beginning = true
53+
initial_read_offset = "beginning"
54+
data_format = "influx"
55+
`,
56+
warn: false,
57+
},
58+
}
59+
60+
for _, tt := range tests {
61+
t.Run(tt.name, func(t *testing.T) {
62+
var buf bytes.Buffer
63+
log.SetOutput(&buf)
64+
t.Cleanup(func() { log.SetOutput(os.Stderr) })
65+
66+
output, n, err := config.ApplyMigrations([]byte(tt.conf))
67+
require.NoError(t, err)
68+
require.NotEmpty(t, output)
69+
require.Equal(t, uint64(1), n)
70+
71+
if tt.warn {
72+
require.Contains(t, buf.String(), `conflicts with 'initial_read_offset = "end"'`)
73+
} else {
74+
require.NotContains(t, buf.String(), "conflicts with")
75+
}
76+
})
77+
}
78+
}
79+
80+
func TestCases(t *testing.T) {
81+
// Get all directories in testcases
82+
folders, err := os.ReadDir("testcases")
83+
require.NoError(t, err)
84+
85+
for _, f := range folders {
86+
// Only handle folders
87+
if !f.IsDir() {
88+
continue
89+
}
90+
91+
t.Run(f.Name(), func(t *testing.T) {
92+
testcasePath := filepath.Join("testcases", f.Name())
93+
inputFile := filepath.Join(testcasePath, "telegraf.conf")
94+
expectedFile := filepath.Join(testcasePath, "expected.conf")
95+
96+
// Read the expected output
97+
expected := config.NewConfig()
98+
require.NoError(t, expected.LoadConfig(expectedFile))
99+
require.NotEmpty(t, expected.Inputs)
100+
101+
// Read the input data
102+
input, remote, err := config.LoadConfigFile(inputFile)
103+
require.NoError(t, err)
104+
require.False(t, remote)
105+
require.NotEmpty(t, input)
106+
107+
// Migrate
108+
output, n, err := config.ApplyMigrations(input)
109+
require.NoError(t, err)
110+
require.NotEmpty(t, output)
111+
require.GreaterOrEqual(t, n, uint64(1))
112+
actual := config.NewConfig()
113+
require.NoError(t, actual.LoadConfigData(output, config.EmptySourcePath))
114+
115+
// Test the output
116+
require.Len(t, actual.Inputs, len(expected.Inputs))
117+
actualIDs := make([]string, 0, len(expected.Inputs))
118+
expectedIDs := make([]string, 0, len(expected.Inputs))
119+
for i := range actual.Inputs {
120+
actualIDs = append(actualIDs, actual.Inputs[i].ID())
121+
expectedIDs = append(expectedIDs, expected.Inputs[i].ID())
122+
}
123+
require.ElementsMatch(t, expectedIDs, actualIDs, string(output))
124+
})
125+
}
126+
}
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
# Parse the new lines appended to a file
2+
[[inputs.tail]]
3+
files = ["/var/mymetrics.out"]
4+
5+
## The new option takes precedence over the deprecated one
6+
initial_read_offset = "end"
7+
8+
data_format = "influx"
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
# Parse the new lines appended to a file
2+
[[inputs.tail]]
3+
files = ["/var/mymetrics.out"]
4+
5+
## The new option takes precedence over the deprecated one
6+
from_beginning = true
7+
initial_read_offset = "end"
8+
9+
data_format = "influx"
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
# Parse the new lines appended to a file
2+
[[inputs.tail]]
3+
files = ["/var/mymetrics.out"]
4+
5+
## Keep the default behavior using the deprecated option
6+
initial_read_offset = "saved-or-end"
7+
8+
data_format = "influx"
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
# Parse the new lines appended to a file
2+
[[inputs.tail]]
3+
files = ["/var/mymetrics.out"]
4+
5+
## Keep the default behavior using the deprecated option
6+
from_beginning = false
7+
8+
data_format = "influx"
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
# Parse the new lines appended to a file
2+
[[inputs.tail]]
3+
files = ["/var/mymetrics.out"]
4+
5+
## Read from the beginning using the deprecated option
6+
initial_read_offset = "beginning"
7+
8+
data_format = "influx"
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
# Parse the new lines appended to a file
2+
[[inputs.tail]]
3+
files = ["/var/mymetrics.out"]
4+
5+
## Read from the beginning using the deprecated option
6+
from_beginning = true
7+
8+
data_format = "influx"

0 commit comments

Comments
 (0)