Skip to content

Commit 230a03a

Browse files
okg-cxftishun
andauthored
fix: prevent blocking event loop thread by replacing ArrayDeque with HashIndexedQueue (#2953)
* fix: prevent long stall during reconnection/disconnection by providing LinkedHashSetQueue instead of ArrayDeque for CommandHandler#stack * Polishing * Missed out a file --------- Co-authored-by: Tihomir Mateev <[email protected]>
1 parent f9d4509 commit 230a03a

File tree

4 files changed

+491
-9
lines changed

4 files changed

+491
-9
lines changed

src/main/java/io/lettuce/core/ClientOptions.java

Lines changed: 34 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,8 @@ public class ClientOptions implements Serializable {
8585

8686
public static final TimeoutOptions DEFAULT_TIMEOUT_OPTIONS = TimeoutOptions.enabled();
8787

88+
public static final boolean DEFAULT_USE_HASH_INDEX_QUEUE = true;
89+
8890
private final boolean autoReconnect;
8991

9092
private final boolean cancelCommandsOnReconnectFailure;
@@ -115,6 +117,8 @@ public class ClientOptions implements Serializable {
115117

116118
private final TimeoutOptions timeoutOptions;
117119

120+
private final boolean useHashIndexedQueue;
121+
118122
protected ClientOptions(Builder builder) {
119123
this.autoReconnect = builder.autoReconnect;
120124
this.cancelCommandsOnReconnectFailure = builder.cancelCommandsOnReconnectFailure;
@@ -131,6 +135,7 @@ protected ClientOptions(Builder builder) {
131135
this.sslOptions = builder.sslOptions;
132136
this.suspendReconnectOnProtocolFailure = builder.suspendReconnectOnProtocolFailure;
133137
this.timeoutOptions = builder.timeoutOptions;
138+
this.useHashIndexedQueue = builder.useHashIndexedQueue;
134139
}
135140

136141
protected ClientOptions(ClientOptions original) {
@@ -149,6 +154,7 @@ protected ClientOptions(ClientOptions original) {
149154
this.sslOptions = original.getSslOptions();
150155
this.suspendReconnectOnProtocolFailure = original.isSuspendReconnectOnProtocolFailure();
151156
this.timeoutOptions = original.getTimeoutOptions();
157+
this.useHashIndexedQueue = original.isUseHashIndexedQueue();
152158
}
153159

154160
/**
@@ -214,6 +220,8 @@ public static class Builder {
214220

215221
private TimeoutOptions timeoutOptions = DEFAULT_TIMEOUT_OPTIONS;
216222

223+
private boolean useHashIndexedQueue = DEFAULT_USE_HASH_INDEX_QUEUE;
224+
217225
protected Builder() {
218226
}
219227

@@ -269,8 +277,8 @@ public Builder bufferUsageRatio(int bufferUsageRatio) {
269277
*
270278
* @param policy the policy to use in {@link io.lettuce.core.protocol.CommandHandler}
271279
* @return {@code this}
272-
* @since 6.0
273280
* @see DecodeBufferPolicies
281+
* @since 6.0
274282
*/
275283
public Builder decodeBufferPolicy(DecodeBufferPolicy policy) {
276284

@@ -317,8 +325,8 @@ public Builder pingBeforeActivateConnection(boolean pingBeforeActivateConnection
317325
*
318326
* @param protocolVersion version to use.
319327
* @return {@code this}
320-
* @since 6.0
321328
* @see ProtocolVersion#newestSupported()
329+
* @since 6.0
322330
*/
323331
public Builder protocolVersion(ProtocolVersion protocolVersion) {
324332

@@ -337,9 +345,9 @@ public Builder protocolVersion(ProtocolVersion protocolVersion) {
337345
*
338346
* @param publishOnScheduler true/false
339347
* @return {@code this}
340-
* @since 5.2
341348
* @see org.reactivestreams.Subscriber#onNext(Object)
342349
* @see ClientResources#eventExecutorGroup()
350+
* @since 5.2
343351
*/
344352
public Builder publishOnScheduler(boolean publishOnScheduler) {
345353
this.publishOnScheduler = publishOnScheduler;
@@ -459,6 +467,20 @@ public Builder timeoutOptions(TimeoutOptions timeoutOptions) {
459467
return this;
460468
}
461469

470+
/**
471+
* Use hash indexed queue, which provides O(1) remove(Object) thus won't cause blocking issues.
472+
*
473+
* @param useHashIndexedQueue true/false
474+
* @return {@code this}
475+
* @see io.lettuce.core.protocol.CommandHandler.AddToStack
476+
* @since 6.6
477+
*/
478+
@SuppressWarnings("JavadocReference")
479+
public Builder useHashIndexQueue(boolean useHashIndexedQueue) {
480+
this.useHashIndexedQueue = useHashIndexedQueue;
481+
return this;
482+
}
483+
462484
/**
463485
* Create a new instance of {@link ClientOptions}.
464486
*
@@ -476,7 +498,6 @@ public ClientOptions build() {
476498
*
477499
* @return a {@link ClientOptions.Builder} to create new {@link ClientOptions} whose settings are replicated from the
478500
* current {@link ClientOptions}.
479-
*
480501
* @since 5.1
481502
*/
482503
public ClientOptions.Builder mutate() {
@@ -535,7 +556,6 @@ public DecodeBufferPolicy getDecodeBufferPolicy() {
535556
*
536557
* @return zero.
537558
* @since 5.2
538-
*
539559
* @deprecated since 6.0 in favor of {@link DecodeBufferPolicy}.
540560
*/
541561
@Deprecated
@@ -684,6 +704,15 @@ public TimeoutOptions getTimeoutOptions() {
684704
return timeoutOptions;
685705
}
686706

707+
/**
708+
* Whether we should use hash indexed queue, which provides O(1) remove(Object)
709+
*
710+
* @return if hash indexed queue should be used
711+
*/
712+
public boolean isUseHashIndexedQueue() {
713+
return useHashIndexedQueue;
714+
}
715+
687716
/**
688717
* Behavior of connections in disconnected state.
689718
*/
Lines changed: 266 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,266 @@
1+
/*
2+
* Copyright 2011-Present, Redis Ltd. and Contributors
3+
* All rights reserved.
4+
*
5+
* Licensed under the MIT License.
6+
*/
7+
package io.lettuce.core.datastructure.queue;
8+
9+
import java.util.AbstractQueue;
10+
import java.util.ArrayList;
11+
import java.util.Collection;
12+
import java.util.HashMap;
13+
import java.util.List;
14+
import java.util.Map;
15+
import java.util.NoSuchElementException;
16+
17+
import io.lettuce.core.internal.LettuceAssert;
18+
import org.jetbrains.annotations.NotNull;
19+
20+
/**
21+
* A queue implementation that supports O(1) removal of elements. The queue is backed by a hash map and a doubly linked list.
22+
*
23+
* @author chenxiaofan
24+
*/
25+
@SuppressWarnings("unchecked")
26+
public class HashIndexedQueue<E> extends AbstractQueue<E> {
27+
28+
private final Map<E, Object> map; // Object can be Node<E> or List<Node<E>>
29+
30+
private Node<E> head;
31+
32+
private Node<E> tail;
33+
34+
private int size;
35+
36+
private static class Node<E> {
37+
38+
E value;
39+
40+
Node<E> next;
41+
42+
Node<E> prev;
43+
44+
Node(E value) {
45+
this.value = value;
46+
}
47+
48+
}
49+
50+
/**
51+
* Create a new instance of the {@link HashIndexedQueue}.
52+
*/
53+
public HashIndexedQueue() {
54+
map = new HashMap<>();
55+
size = 0;
56+
}
57+
58+
@Override
59+
public boolean add(E e) {
60+
return offer(e);
61+
}
62+
63+
@Override
64+
public boolean offer(E e) {
65+
final Node<E> newNode = new Node<>(e);
66+
if (tail == null) {
67+
head = tail = newNode;
68+
} else {
69+
tail.next = newNode;
70+
newNode.prev = tail;
71+
tail = newNode;
72+
}
73+
74+
if (!map.containsKey(e)) {
75+
map.put(e, newNode);
76+
} else {
77+
Object current = map.get(e);
78+
if (current instanceof Node) {
79+
List<Node<E>> nodes = new ArrayList<>();
80+
nodes.add((Node<E>) current);
81+
nodes.add(newNode);
82+
map.put(e, nodes);
83+
} else {
84+
((List<Node<E>>) current).add(newNode);
85+
}
86+
}
87+
size++;
88+
return true;
89+
}
90+
91+
@Override
92+
public E poll() {
93+
if (head == null) {
94+
return null;
95+
}
96+
E value = head.value;
97+
removeNodeFromMap(head);
98+
head = head.next;
99+
if (head == null) {
100+
tail = null;
101+
} else {
102+
head.prev = null;
103+
}
104+
size--;
105+
return value;
106+
}
107+
108+
@Override
109+
public E peek() {
110+
if (head == null) {
111+
return null;
112+
}
113+
return head.value;
114+
}
115+
116+
@Override
117+
public boolean remove(Object o) {
118+
return removeFirstOccurrence(o);
119+
}
120+
121+
@Override
122+
public int size() {
123+
return size;
124+
}
125+
126+
@Override
127+
public boolean contains(Object o) {
128+
return map.containsKey(o);
129+
}
130+
131+
public class Iterator implements java.util.Iterator<E> {
132+
133+
private Node<E> current;
134+
135+
private Node<E> prev;
136+
137+
private Iterator() {
138+
current = HashIndexedQueue.this.head;
139+
prev = null;
140+
}
141+
142+
@Override
143+
public boolean hasNext() {
144+
return current != null;
145+
}
146+
147+
@Override
148+
public E next() {
149+
if (!hasNext()) {
150+
throw new NoSuchElementException();
151+
}
152+
E value = current.value;
153+
prev = current;
154+
current = current.next;
155+
return value;
156+
}
157+
158+
@Override
159+
public void remove() {
160+
if (prev != null) {
161+
removeNodeFromMap(prev);
162+
removeNode(prev);
163+
size--;
164+
// remove once
165+
prev = null;
166+
}
167+
}
168+
169+
}
170+
171+
@NotNull
172+
@Override
173+
public Iterator iterator() {
174+
return new Iterator();
175+
}
176+
177+
@Override
178+
public boolean removeAll(Collection<?> c) {
179+
boolean modified = false;
180+
for (Object e : c) {
181+
if (removeAllOccurrences(e)) {
182+
modified = true;
183+
}
184+
}
185+
return modified;
186+
}
187+
188+
@Override
189+
public void clear() {
190+
head = null;
191+
tail = null;
192+
map.clear();
193+
size = 0;
194+
}
195+
196+
private boolean removeFirstOccurrence(Object element) {
197+
Object current = map.get(element);
198+
if (current == null) {
199+
return false;
200+
}
201+
if (current instanceof Node) {
202+
Node<E> node = (Node<E>) current;
203+
removeNode(node);
204+
map.remove(element);
205+
} else {
206+
List<Node<E>> nodes = (List<Node<E>>) current;
207+
Node<E> node = nodes.remove(0);
208+
if (nodes.isEmpty()) {
209+
map.remove(element);
210+
}
211+
removeNode(node);
212+
}
213+
size--;
214+
return true;
215+
}
216+
217+
private boolean removeAllOccurrences(Object element) {
218+
Object current = map.get(element);
219+
if (current == null) {
220+
return false;
221+
}
222+
if (current instanceof Node) {
223+
final Node<E> node = (Node<E>) current;
224+
removeNode(node);
225+
size--;
226+
} else {
227+
final List<Node<E>> nodes = (List<Node<E>>) current;
228+
for (Node<E> node : nodes) {
229+
removeNode(node);
230+
size--;
231+
}
232+
}
233+
map.remove(element);
234+
return true;
235+
}
236+
237+
private void removeNode(Node<E> node) {
238+
if (node.prev != null) {
239+
node.prev.next = node.next;
240+
} else {
241+
head = node.next;
242+
}
243+
if (node.next != null) {
244+
node.next.prev = node.prev;
245+
} else {
246+
tail = node.prev;
247+
}
248+
}
249+
250+
private void removeNodeFromMap(Node<E> node) {
251+
E value = node.value;
252+
Object current = map.get(value);
253+
if (current instanceof Node) {
254+
LettuceAssert.assertState(current == node, "current != node");
255+
map.remove(value);
256+
} else {
257+
List<Node<E>> nodes = (List<Node<E>>) current;
258+
final boolean removed = nodes.remove(node);
259+
LettuceAssert.assertState(removed, "!nodes.remove(node)");
260+
if (nodes.isEmpty()) {
261+
map.remove(value);
262+
}
263+
}
264+
}
265+
266+
}

0 commit comments

Comments
 (0)