Skip to content

Commit dd5a0a2

Browse files
authored
[7.14] Properly apply system flag on data streams when restoring a snapshot (#75854)
* Properly apply `system` flag on data streams when restoring a snapshot (#75819) This commit modifies the restore process to ensure that the `system` flag is properly applied to restored data streams. Otherwise, this flag is lost when restoring system data streams, which causes errors and/or assertion failures as the backing indices are properly marked as system indices, but the restored data stream is no longer a system data stream. Also adds a test to ensure this flag survives a round trip through the snapshot/restore process. * Fix compilation for backport
1 parent be276a7 commit dd5a0a2

File tree

2 files changed

+160
-1
lines changed

2 files changed

+160
-1
lines changed

server/src/main/java/org/elasticsearch/snapshots/RestoreService.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -623,7 +623,8 @@ static DataStream updateDataStream(DataStream dataStream, Metadata.Builder metad
623623
dataStream.getGeneration(),
624624
dataStream.getMetadata(),
625625
dataStream.isHidden(),
626-
dataStream.isReplicated()
626+
dataStream.isReplicated(),
627+
dataStream.isSystem()
627628
);
628629
}
629630

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,158 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.datastreams;
9+
10+
import org.elasticsearch.action.DocWriteRequest;
11+
import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse;
12+
import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse;
13+
import org.elasticsearch.action.admin.indices.get.GetIndexResponse;
14+
import org.elasticsearch.action.index.IndexResponse;
15+
import org.elasticsearch.action.support.master.AcknowledgedResponse;
16+
import org.elasticsearch.cluster.metadata.ComposableIndexTemplate;
17+
import org.elasticsearch.common.xcontent.XContentType;
18+
import org.elasticsearch.indices.ExecutorNames;
19+
import org.elasticsearch.indices.SystemDataStreamDescriptor;
20+
import org.elasticsearch.plugins.Plugin;
21+
import org.elasticsearch.plugins.SystemIndexPlugin;
22+
import org.elasticsearch.snapshots.AbstractSnapshotIntegTestCase;
23+
import org.elasticsearch.snapshots.mockstore.MockRepository;
24+
import org.elasticsearch.test.ESIntegTestCase;
25+
import org.elasticsearch.threadpool.ThreadPool;
26+
import org.elasticsearch.xpack.core.action.CreateDataStreamAction;
27+
import org.elasticsearch.xpack.core.action.DeleteDataStreamAction;
28+
import org.elasticsearch.xpack.core.action.GetDataStreamAction;
29+
import org.elasticsearch.xpack.datastreams.DataStreamsPlugin;
30+
import org.junit.After;
31+
32+
import java.nio.file.Path;
33+
import java.util.Collection;
34+
import java.util.Collections;
35+
36+
import static org.elasticsearch.datastreams.SystemDataStreamSnapshotIT.SystemDataStreamTestPlugin.SYSTEM_DATA_STREAM_NAME;
37+
import static org.hamcrest.Matchers.arrayWithSize;
38+
import static org.hamcrest.Matchers.hasSize;
39+
import static org.hamcrest.Matchers.oneOf;
40+
41+
@ESIntegTestCase.ClusterScope(transportClientRatio = 0)
42+
public class SystemDataStreamSnapshotIT extends AbstractSnapshotIntegTestCase {
43+
44+
public static final String REPO = "repo";
45+
public static final String SNAPSHOT = "snap";
46+
47+
@Override
48+
protected Collection<Class<? extends Plugin>> nodePlugins() {
49+
return org.elasticsearch.core.List.of(MockRepository.Plugin.class, DataStreamsPlugin.class, SystemDataStreamTestPlugin.class);
50+
}
51+
52+
@After
53+
public void cleanUp() throws Exception {
54+
DeleteDataStreamAction.Request request = new DeleteDataStreamAction.Request(new String[] { SYSTEM_DATA_STREAM_NAME });
55+
AcknowledgedResponse response = client().execute(DeleteDataStreamAction.INSTANCE, request).get();
56+
assertTrue(response.isAcknowledged());
57+
}
58+
59+
public void testSystemDataStreamSnapshotIT() throws Exception {
60+
Path location = randomRepoPath();
61+
createRepository(REPO, "fs", location);
62+
63+
{
64+
CreateDataStreamAction.Request request = new CreateDataStreamAction.Request(SYSTEM_DATA_STREAM_NAME);
65+
final AcknowledgedResponse response = client().execute(CreateDataStreamAction.INSTANCE, request).get();
66+
assertTrue(response.isAcknowledged());
67+
}
68+
69+
// Index a doc so that a concrete backing index will be created
70+
IndexResponse indexRepsonse = client().prepareIndex(SYSTEM_DATA_STREAM_NAME, "_doc")
71+
.setId("42")
72+
.setSource("{ \"@timestamp\": \"2099-03-08T11:06:07.000Z\", \"name\": \"my-name\" }", XContentType.JSON)
73+
.setOpType(DocWriteRequest.OpType.CREATE)
74+
.execute()
75+
.actionGet();
76+
assertThat(indexRepsonse.status().getStatus(), oneOf(200, 201));
77+
78+
{
79+
GetDataStreamAction.Request request = new GetDataStreamAction.Request(new String[] { SYSTEM_DATA_STREAM_NAME });
80+
GetDataStreamAction.Response response = client().execute(GetDataStreamAction.INSTANCE, request).get();
81+
assertThat(response.getDataStreams(), hasSize(1));
82+
assertTrue(response.getDataStreams().get(0).getDataStream().isSystem());
83+
}
84+
85+
CreateSnapshotResponse createSnapshotResponse = client().admin()
86+
.cluster()
87+
.prepareCreateSnapshot(REPO, SNAPSHOT)
88+
.setWaitForCompletion(true)
89+
.setIncludeGlobalState(false)
90+
.get();
91+
92+
// We have to delete the data stream directly, as the feature reset API doesn't clean up system data streams yet
93+
// See https://github.com/elastic/elasticsearch/issues/75818
94+
{
95+
DeleteDataStreamAction.Request request = new DeleteDataStreamAction.Request(new String[] { SYSTEM_DATA_STREAM_NAME });
96+
AcknowledgedResponse response = client().execute(DeleteDataStreamAction.INSTANCE, request).get();
97+
assertTrue(response.isAcknowledged());
98+
}
99+
100+
{
101+
GetIndexResponse indicesRemaining = client().admin().indices().prepareGetIndex().addIndices("_all").get();
102+
assertThat(indicesRemaining.indices(), arrayWithSize(0));
103+
}
104+
105+
RestoreSnapshotResponse restoreSnapshotResponse = client().admin()
106+
.cluster()
107+
.prepareRestoreSnapshot(REPO, SNAPSHOT)
108+
.setWaitForCompletion(true)
109+
.setRestoreGlobalState(false)
110+
.get();
111+
assertEquals(restoreSnapshotResponse.getRestoreInfo().totalShards(), restoreSnapshotResponse.getRestoreInfo().successfulShards());
112+
113+
{
114+
GetDataStreamAction.Request request = new GetDataStreamAction.Request(new String[] { SYSTEM_DATA_STREAM_NAME });
115+
GetDataStreamAction.Response response = client().execute(GetDataStreamAction.INSTANCE, request).get();
116+
assertThat(response.getDataStreams(), hasSize(1));
117+
assertTrue(response.getDataStreams().get(0).getDataStream().isSystem());
118+
}
119+
}
120+
121+
public static class SystemDataStreamTestPlugin extends Plugin implements SystemIndexPlugin {
122+
123+
static final String SYSTEM_DATA_STREAM_NAME = ".test-data-stream";
124+
125+
@Override
126+
public Collection<SystemDataStreamDescriptor> getSystemDataStreamDescriptors() {
127+
return org.elasticsearch.core.List.of(
128+
new SystemDataStreamDescriptor(
129+
SYSTEM_DATA_STREAM_NAME,
130+
"a system data stream for testing",
131+
SystemDataStreamDescriptor.Type.EXTERNAL,
132+
new ComposableIndexTemplate(
133+
org.elasticsearch.core.List.of(".system-data-stream"),
134+
null,
135+
null,
136+
null,
137+
null,
138+
null,
139+
new ComposableIndexTemplate.DataStreamTemplate()
140+
),
141+
org.elasticsearch.core.Map.of(),
142+
Collections.singletonList("test"),
143+
new ExecutorNames(ThreadPool.Names.SYSTEM_CRITICAL_READ, ThreadPool.Names.SYSTEM_READ, ThreadPool.Names.SYSTEM_WRITE)
144+
)
145+
);
146+
}
147+
148+
@Override
149+
public String getFeatureName() {
150+
return SystemDataStreamTestPlugin.class.getSimpleName();
151+
}
152+
153+
@Override
154+
public String getFeatureDescription() {
155+
return "A plugin for testing snapshots of system data streams";
156+
}
157+
}
158+
}

0 commit comments

Comments
 (0)