Skip to content

Commit 0a30742

Browse files
Keith Masseysourcegraph-bot
authored andcommitted
elastic: Avoiding BulkProcessor deadlock in ILMHistoryStore (#91238)
Commit: 0e5844fb871763960742e416ad0391d000819408
1 parent ce606a6 commit 0a30742

File tree

10 files changed

+2120
-86
lines changed

10 files changed

+2120
-86
lines changed

elastic/docs/changelog/91238.yaml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
pr: 91238
2+
summary: Avoiding `BulkProcessor` deadlock in ILMHistoryStore
3+
area: ILM+SLM
4+
type: bug
5+
issues:
6+
- 68468
7+
- 50440
Lines changed: 302 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,302 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0 and the Server Side Public License, v 1; you may not use this file except
5+
* in compliance with, at your election, the Elastic License 2.0 or the Server
6+
* Side Public License, v 1.
7+
*/
8+
9+
package org.elasticsearch.action.bulk;
10+
11+
import com.carrotsearch.randomizedtesting.generators.RandomPicks;
12+
13+
import org.elasticsearch.action.get.MultiGetItemResponse;
14+
import org.elasticsearch.action.get.MultiGetRequestBuilder;
15+
import org.elasticsearch.action.get.MultiGetResponse;
16+
import org.elasticsearch.action.index.IndexRequest;
17+
import org.elasticsearch.client.internal.Client;
18+
import org.elasticsearch.client.internal.Requests;
19+
import org.elasticsearch.cluster.metadata.IndexMetadata;
20+
import org.elasticsearch.common.settings.Settings;
21+
import org.elasticsearch.common.unit.ByteSizeUnit;
22+
import org.elasticsearch.common.unit.ByteSizeValue;
23+
import org.elasticsearch.core.TimeValue;
24+
import org.elasticsearch.test.ESIntegTestCase;
25+
26+
import java.util.Arrays;
27+
import java.util.Comparator;
28+
import java.util.HashSet;
29+
import java.util.List;
30+
import java.util.Set;
31+
import java.util.concurrent.CopyOnWriteArrayList;
32+
import java.util.concurrent.CountDownLatch;
33+
import java.util.concurrent.TimeUnit;
34+
import java.util.concurrent.atomic.AtomicInteger;
35+
36+
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
37+
import static org.hamcrest.Matchers.both;
38+
import static org.hamcrest.Matchers.either;
39+
import static org.hamcrest.Matchers.equalTo;
40+
import static org.hamcrest.Matchers.greaterThan;
41+
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
42+
import static org.hamcrest.Matchers.is;
43+
import static org.hamcrest.Matchers.lessThanOrEqualTo;
44+
45+
public class BulkProcessor2IT extends ESIntegTestCase {
46+
47+
public void testThatBulkProcessor2CountIsCorrect() throws Exception {
48+
final CountDownLatch latch = new CountDownLatch(1);
49+
BulkProcessor2TestListener listener = new BulkProcessor2TestListener(latch);
50+
51+
int numDocs = randomIntBetween(10, 100);
52+
BulkProcessor2 processor = BulkProcessor2.builder(client()::bulk, listener, client().threadPool())
53+
// let's make sure that the bulk action limit trips, one single execution will index all the documents
54+
.setBulkActions(numDocs)
55+
.setFlushInterval(TimeValue.timeValueHours(24))
56+
.setBulkSize(new ByteSizeValue(1, ByteSizeUnit.GB))
57+
.build();
58+
try {
59+
60+
MultiGetRequestBuilder multiGetRequestBuilder = indexDocs(client(), processor, numDocs);
61+
62+
latch.await();
63+
64+
assertThat(listener.beforeCounts.get(), equalTo(1));
65+
assertThat(listener.afterCounts.get(), equalTo(1));
66+
assertThat(listener.bulkFailures.size(), equalTo(0));
67+
assertResponseItems(listener.bulkItems, numDocs);
68+
assertMultiGetResponse(multiGetRequestBuilder.get(), numDocs);
69+
assertThat(processor.getTotalBytesInFlight(), equalTo(0L));
70+
} finally {
71+
processor.awaitClose(1, TimeUnit.SECONDS);
72+
}
73+
}
74+
75+
public void testBulkProcessor2ConcurrentRequests() throws Exception {
76+
int bulkActions = randomIntBetween(10, 100);
77+
int numDocs = randomIntBetween(bulkActions, bulkActions + 100);
78+
79+
int expectedBulkActions = numDocs / bulkActions;
80+
81+
final CountDownLatch latch = new CountDownLatch(expectedBulkActions);
82+
int totalExpectedBulkActions = numDocs % bulkActions == 0 ? expectedBulkActions : expectedBulkActions + 1;
83+
final CountDownLatch closeLatch = new CountDownLatch(totalExpectedBulkActions);
84+
85+
BulkProcessor2TestListener listener = new BulkProcessor2TestListener(latch, closeLatch);
86+
87+
MultiGetRequestBuilder multiGetRequestBuilder;
88+
BulkProcessor2 processor = BulkProcessor2.builder(client()::bulk, listener, client().threadPool())
89+
.setBulkActions(bulkActions)
90+
// set interval and size to high values
91+
.setFlushInterval(TimeValue.timeValueHours(24))
92+
.setBulkSize(new ByteSizeValue(1, ByteSizeUnit.GB))
93+
.build();
94+
try {
95+
96+
multiGetRequestBuilder = indexDocs(client(), processor, numDocs);
97+
98+
latch.await();
99+
100+
assertThat(listener.beforeCounts.get(), equalTo(expectedBulkActions));
101+
assertThat(listener.afterCounts.get(), equalTo(expectedBulkActions));
102+
assertThat(listener.bulkFailures.size(), equalTo(0));
103+
assertThat(listener.bulkItems.size(), equalTo(numDocs - numDocs % bulkActions));
104+
} finally {
105+
processor.awaitClose(1, TimeUnit.SECONDS);
106+
}
107+
108+
closeLatch.await();
109+
110+
assertThat(listener.beforeCounts.get(), equalTo(totalExpectedBulkActions));
111+
assertThat(listener.afterCounts.get(), equalTo(totalExpectedBulkActions));
112+
assertThat(listener.bulkFailures.size(), equalTo(0));
113+
assertThat(listener.bulkItems.size(), equalTo(numDocs));
114+
115+
Set<String> ids = new HashSet<>();
116+
for (BulkItemResponse bulkItemResponse : listener.bulkItems) {
117+
assertThat(bulkItemResponse.getFailureMessage(), bulkItemResponse.isFailed(), equalTo(false));
118+
assertThat(bulkItemResponse.getIndex(), equalTo("test"));
119+
// with concurrent requests > 1 we can't rely on the order of the bulk requests
120+
assertThat(Integer.valueOf(bulkItemResponse.getId()), both(greaterThan(0)).and(lessThanOrEqualTo(numDocs)));
121+
// we do want to check that we don't get duplicate ids back
122+
assertThat(ids.add(bulkItemResponse.getId()), equalTo(true));
123+
}
124+
125+
assertMultiGetResponse(multiGetRequestBuilder.get(), numDocs);
126+
assertThat(processor.getTotalBytesInFlight(), equalTo(0L));
127+
}
128+
129+
public void testBulkProcessor2WaitOnClose() throws Exception {
130+
BulkProcessor2TestListener listener = new BulkProcessor2TestListener();
131+
132+
int numDocs = randomIntBetween(10, 100);
133+
BulkProcessor2 processor = BulkProcessor2.builder(client()::bulk, listener, client().threadPool())
134+
// let's make sure that the bulk action limit trips, one single execution will index all the documents
135+
.setBulkActions(numDocs)
136+
.setFlushInterval(TimeValue.timeValueHours(24))
137+
.setBulkSize(new ByteSizeValue(randomIntBetween(1, 10), RandomPicks.randomFrom(random(), ByteSizeUnit.values())))
138+
.build();
139+
140+
MultiGetRequestBuilder multiGetRequestBuilder = indexDocs(client(), processor, numDocs);
141+
processor.awaitClose(1, TimeUnit.MINUTES);
142+
assertThat(listener.beforeCounts.get(), greaterThanOrEqualTo(1));
143+
assertThat(listener.afterCounts.get(), greaterThanOrEqualTo(1));
144+
assertThat(listener.bulkFailures.size(), equalTo(0));
145+
assertResponseItems(listener.bulkItems, numDocs);
146+
assertMultiGetResponse(multiGetRequestBuilder.get(), numDocs);
147+
}
148+
149+
public void testBulkProcessor2ConcurrentRequestsReadOnlyIndex() throws Exception {
150+
createIndex("test-ro");
151+
assertAcked(
152+
client().admin()
153+
.indices()
154+
.prepareUpdateSettings("test-ro")
155+
.setSettings(Settings.builder().put(IndexMetadata.SETTING_BLOCKS_WRITE, true))
156+
);
157+
ensureGreen();
158+
159+
int bulkActions = randomIntBetween(10, 100);
160+
int numDocs = randomIntBetween(bulkActions, bulkActions + 100);
161+
162+
int expectedBulkActions = numDocs / bulkActions;
163+
164+
final CountDownLatch latch = new CountDownLatch(expectedBulkActions);
165+
int totalExpectedBulkActions = numDocs % bulkActions == 0 ? expectedBulkActions : expectedBulkActions + 1;
166+
final CountDownLatch closeLatch = new CountDownLatch(totalExpectedBulkActions);
167+
168+
int testDocs = 0;
169+
int testReadOnlyDocs = 0;
170+
MultiGetRequestBuilder multiGetRequestBuilder = client().prepareMultiGet();
171+
BulkProcessor2TestListener listener = new BulkProcessor2TestListener(latch, closeLatch);
172+
173+
BulkProcessor2 processor = BulkProcessor2.builder(client()::bulk, listener, client().threadPool())
174+
.setBulkActions(bulkActions)
175+
// set interval and size to high values
176+
.setFlushInterval(TimeValue.timeValueHours(24))
177+
.setBulkSize(new ByteSizeValue(1, ByteSizeUnit.GB))
178+
.build();
179+
try {
180+
181+
for (int i = 1; i <= numDocs; i++) {
182+
if (randomBoolean()) {
183+
testDocs++;
184+
processor.add(
185+
new IndexRequest("test").id(Integer.toString(testDocs)).source(Requests.INDEX_CONTENT_TYPE, "field", "value")
186+
);
187+
multiGetRequestBuilder.add("test", Integer.toString(testDocs));
188+
} else {
189+
testReadOnlyDocs++;
190+
processor.add(
191+
new IndexRequest("test-ro").id(Integer.toString(testReadOnlyDocs))
192+
.source(Requests.INDEX_CONTENT_TYPE, "field", "value")
193+
);
194+
}
195+
}
196+
} finally {
197+
processor.awaitClose(1, TimeUnit.SECONDS);
198+
}
199+
200+
closeLatch.await();
201+
202+
assertThat(listener.beforeCounts.get(), equalTo(totalExpectedBulkActions));
203+
assertThat(listener.afterCounts.get(), equalTo(totalExpectedBulkActions));
204+
assertThat(listener.bulkFailures.size(), equalTo(0));
205+
assertThat(listener.bulkItems.size(), equalTo(testDocs + testReadOnlyDocs));
206+
assertThat(processor.getTotalBytesInFlight(), equalTo(0L));
207+
Set<String> ids = new HashSet<>();
208+
Set<String> readOnlyIds = new HashSet<>();
209+
for (BulkItemResponse bulkItemResponse : listener.bulkItems) {
210+
assertThat(bulkItemResponse.getIndex(), either(equalTo("test")).or(equalTo("test-ro")));
211+
if (bulkItemResponse.getIndex().equals("test")) {
212+
assertThat(bulkItemResponse.isFailed(), equalTo(false));
213+
// with concurrent requests > 1 we can't rely on the order of the bulk requests
214+
assertThat(Integer.valueOf(bulkItemResponse.getId()), both(greaterThan(0)).and(lessThanOrEqualTo(testDocs)));
215+
// we do want to check that we don't get duplicate ids back
216+
assertThat(ids.add(bulkItemResponse.getId()), equalTo(true));
217+
} else {
218+
assertThat(bulkItemResponse.isFailed(), equalTo(true));
219+
// with concurrent requests > 1 we can't rely on the order of the bulk requests
220+
assertThat(Integer.valueOf(bulkItemResponse.getId()), both(greaterThan(0)).and(lessThanOrEqualTo(testReadOnlyDocs)));
221+
// we do want to check that we don't get duplicate ids back
222+
assertThat(readOnlyIds.add(bulkItemResponse.getId()), equalTo(true));
223+
}
224+
}
225+
226+
assertMultiGetResponse(multiGetRequestBuilder.get(), testDocs);
227+
}
228+
229+
private static MultiGetRequestBuilder indexDocs(Client client, BulkProcessor2 processor, int numDocs) throws Exception {
230+
MultiGetRequestBuilder multiGetRequestBuilder = client.prepareMultiGet();
231+
for (int i = 1; i <= numDocs; i++) {
232+
processor.add(
233+
new IndexRequest("test").id(Integer.toString(i))
234+
.source(Requests.INDEX_CONTENT_TYPE, "field", randomRealisticUnicodeOfLengthBetween(1, 30))
235+
);
236+
multiGetRequestBuilder.add("test", Integer.toString(i));
237+
}
238+
return multiGetRequestBuilder;
239+
}
240+
241+
private static void assertResponseItems(List<BulkItemResponse> bulkItemResponses, int numDocs) {
242+
assertThat(bulkItemResponses.size(), is(numDocs));
243+
int i = 1;
244+
List<BulkItemResponse> sortedResponses = bulkItemResponses.stream()
245+
.sorted(Comparator.comparing(o -> Integer.valueOf(o.getId())))
246+
.toList();
247+
for (BulkItemResponse bulkItemResponse : sortedResponses) {
248+
assertThat(bulkItemResponse.getIndex(), equalTo("test"));
249+
assertThat(bulkItemResponse.getId(), equalTo(Integer.toString(i++)));
250+
assertThat(
251+
"item " + i + " failed with cause: " + bulkItemResponse.getFailureMessage(),
252+
bulkItemResponse.isFailed(),
253+
equalTo(false)
254+
);
255+
}
256+
}
257+
258+
private static void assertMultiGetResponse(MultiGetResponse multiGetResponse, int numDocs) {
259+
assertThat(multiGetResponse.getResponses().length, equalTo(numDocs));
260+
int i = 1;
261+
for (MultiGetItemResponse multiGetItemResponse : multiGetResponse) {
262+
assertThat(multiGetItemResponse.getIndex(), equalTo("test"));
263+
assertThat(multiGetItemResponse.getId(), equalTo(Integer.toString(i++)));
264+
}
265+
}
266+
267+
private static class BulkProcessor2TestListener implements BulkProcessor2.Listener {
268+
269+
private final CountDownLatch[] latches;
270+
private final AtomicInteger beforeCounts = new AtomicInteger();
271+
private final AtomicInteger afterCounts = new AtomicInteger();
272+
private final List<BulkItemResponse> bulkItems = new CopyOnWriteArrayList<>();
273+
private final List<Throwable> bulkFailures = new CopyOnWriteArrayList<>();
274+
275+
private BulkProcessor2TestListener(CountDownLatch... latches) {
276+
this.latches = latches;
277+
}
278+
279+
@Override
280+
public void beforeBulk(long executionId, BulkRequest request) {
281+
beforeCounts.incrementAndGet();
282+
}
283+
284+
@Override
285+
public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
286+
bulkItems.addAll(Arrays.asList(response.getItems()));
287+
afterCounts.incrementAndGet();
288+
for (CountDownLatch latch : latches) {
289+
latch.countDown();
290+
}
291+
}
292+
293+
@Override
294+
public void afterBulk(long executionId, BulkRequest request, Exception failure) {
295+
bulkFailures.add(failure);
296+
afterCounts.incrementAndGet();
297+
for (CountDownLatch latch : latches) {
298+
latch.countDown();
299+
}
300+
}
301+
}
302+
}

0 commit comments

Comments
 (0)