Skip to content

Commit 78b80ef

Browse files
boaty82artembilan
authored andcommitted
GH-2354 Injection Executor to LockLeaderInitiator
Fixes #2354 INT-2354 missed file in previous commit INT-2354 fix failing test INT-2354 changes following feedback INT-2354 feedback changes INT-2354 feedback changes * Simple code style polishing
1 parent d361eef commit 78b80ef

File tree

2 files changed

+84
-38
lines changed

2 files changed

+84
-38
lines changed

spring-integration-core/src/main/java/org/springframework/integration/support/leader/LockRegistryLeaderInitiator.java

Lines changed: 39 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@
5757
* @author Artem Bilan
5858
* @author Vedran Pavic
5959
* @author Glenn Renfro
60+
* @author Kiel Boatman
6061
*
6162
* @since 4.3.1
6263
*/
@@ -72,12 +73,6 @@ public class LockRegistryLeaderInitiator implements SmartLifecycle, DisposableBe
7273

7374
private final Object lifecycleMonitor = new Object();
7475

75-
/**
76-
* Executor service for running leadership daemon.
77-
*/
78-
private final ExecutorService executorService =
79-
Executors.newSingleThreadExecutor(new CustomizableThreadFactory("lock-leadership-"));
80-
8176
/**
8277
* A lock registry. The locks it manages should be global (whatever that means for the
8378
* system) and expiring, in case the holder dies without notifying anyone.
@@ -92,6 +87,17 @@ public class LockRegistryLeaderInitiator implements SmartLifecycle, DisposableBe
9287
*/
9388
private final Candidate candidate;
9489

90+
/**
91+
* Executor service for running leadership daemon.
92+
*/
93+
private ExecutorService executorService =
94+
Executors.newSingleThreadExecutor(new CustomizableThreadFactory("lock-leadership-"));
95+
96+
/**
97+
* Flag to denote whether the {@link ExecutorService} was provided via the setter and
98+
* thus should not be shutdown when {@link #destroy()} is called
99+
*/
100+
private boolean executorServiceExplicitlySet;
95101

96102
/**
97103
* Time in milliseconds to wait in between attempts to re-acquire the lock, once it is
@@ -124,28 +130,28 @@ public class LockRegistryLeaderInitiator implements SmartLifecycle, DisposableBe
124130
*/
125131
private LeaderEventPublisher leaderEventPublisher;
126132

127-
/**
128-
* Future returned by submitting an {@link LeaderSelector} to
129-
* {@link #executorService}. This is used to cancel leadership.
130-
*/
131-
private volatile Future<?> future;
132-
133133
/**
134134
* @see SmartLifecycle
135135
*/
136-
private volatile boolean autoStartup = true;
136+
private boolean autoStartup = true;
137137

138138
/**
139139
* @see SmartLifecycle which is an extension of org.springframework.context.Phased
140140
*/
141-
private volatile int phase;
141+
private int phase;
142142

143143
/**
144144
* Flag that indicates whether the leadership election for this {@link #candidate} is
145145
* running.
146146
*/
147147
private volatile boolean running;
148148

149+
/**
150+
* Future returned by submitting an {@link LeaderSelector} to
151+
* {@link #executorService}. This is used to cancel leadership.
152+
*/
153+
private volatile Future<?> future;
154+
149155
/**
150156
* Create a new leader initiator with the provided lock registry and a default
151157
* candidate (which just logs the leadership events).
@@ -168,9 +174,15 @@ public LockRegistryLeaderInitiator(LockRegistry locks, Candidate candidate) {
168174
this.candidate = candidate;
169175
}
170176

171-
@Override
172-
public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {
173-
this.applicationEventPublisher = applicationEventPublisher;
177+
/**
178+
* Set the {@link ExecutorService}, where is not provided then a default of
179+
* single thread Executor will be used.
180+
* @param executorService the executor service
181+
* @since 5.0.2
182+
*/
183+
public void setExecutorService(ExecutorService executorService) {
184+
this.executorService = executorService;
185+
this.executorServiceExplicitlySet = true;
174186
}
175187

176188
public void setHeartBeatMillis(long heartBeatMillis) {
@@ -182,13 +194,18 @@ public void setBusyWaitMillis(long busyWaitMillis) {
182194
}
183195

184196
/**
185-
* Sets the {@link LeaderEventPublisher}.
197+
* Set the {@link LeaderEventPublisher}.
186198
* @param leaderEventPublisher the event publisher
187199
*/
188200
public void setLeaderEventPublisher(LeaderEventPublisher leaderEventPublisher) {
189201
this.leaderEventPublisher = leaderEventPublisher;
190202
}
191203

204+
@Override
205+
public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {
206+
this.applicationEventPublisher = applicationEventPublisher;
207+
}
208+
192209
/**
193210
* @return true if leadership election for this {@link #candidate} is running.
194211
*/
@@ -272,9 +289,11 @@ public void start() {
272289
}
273290

274291
@Override
275-
public void destroy() throws Exception {
292+
public void destroy() {
276293
stop();
277-
this.executorService.shutdown();
294+
if (!this.executorServiceExplicitlySet) {
295+
this.executorService.shutdown();
296+
}
278297
}
279298

280299
@Override

spring-integration-core/src/test/java/org/springframework/integration/support/leader/LockRegistryLeaderInitiatorTests.java

Lines changed: 45 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package org.springframework.integration.support.leader;
1818

1919
import static org.hamcrest.CoreMatchers.is;
20+
import static org.junit.Assert.assertFalse;
2021
import static org.junit.Assert.assertNull;
2122
import static org.junit.Assert.assertThat;
2223
import static org.junit.Assert.assertTrue;
@@ -30,6 +31,8 @@
3031
import static org.mockito.Mockito.spy;
3132

3233
import java.util.concurrent.CountDownLatch;
34+
import java.util.concurrent.ExecutorService;
35+
import java.util.concurrent.Executors;
3336
import java.util.concurrent.TimeUnit;
3437
import java.util.concurrent.atomic.AtomicBoolean;
3538
import java.util.concurrent.atomic.AtomicReference;
@@ -48,20 +51,22 @@
4851
import org.springframework.integration.leader.event.LeaderEventPublisher;
4952
import org.springframework.integration.support.locks.DefaultLockRegistry;
5053
import org.springframework.integration.support.locks.LockRegistry;
54+
import org.springframework.integration.test.util.TestUtils;
5155

5256
/**
5357
* @author Dave Syer
5458
* @author Artem Bilan
5559
* @author Vedran Pavic
5660
* @author Glenn Renfro
61+
* @author Kiel Boatman
5762
*
5863
* @since 4.3.1
5964
*/
6065
public class LockRegistryLeaderInitiatorTests {
6166

62-
private CountDownLatch granted;
67+
private CountDownLatch granted = new CountDownLatch(1);
6368

64-
private CountDownLatch revoked;
69+
private CountDownLatch revoked = new CountDownLatch(1);
6570

6671
private final LockRegistry registry = new DefaultLockRegistry();
6772

@@ -70,8 +75,6 @@ public class LockRegistryLeaderInitiatorTests {
7075

7176
@Before
7277
public void init() {
73-
this.granted = new CountDownLatch(1);
74-
this.revoked = new CountDownLatch(1);
7578
this.initiator.setLeaderEventPublisher(new CountingPublisher(this.granted, this.revoked));
7679
}
7780

@@ -105,6 +108,7 @@ public void yield() throws Exception {
105108
public void competing() throws Exception {
106109
LockRegistryLeaderInitiator another =
107110
new LockRegistryLeaderInitiator(this.registry, new DefaultCandidate());
111+
108112
CountDownLatch other = new CountDownLatch(1);
109113
another.setLeaderEventPublisher(new CountingPublisher(other));
110114
this.initiator.start();
@@ -139,9 +143,7 @@ public void competingWithErrorPublish() throws Exception {
139143
public void testExceptionFromEvent() throws Exception {
140144
CountDownLatch onGranted = new CountDownLatch(1);
141145

142-
LockRegistryLeaderInitiator initiator = new LockRegistryLeaderInitiator(this.registry, new DefaultCandidate());
143-
144-
initiator.setLeaderEventPublisher(new DefaultLeaderEventPublisher() {
146+
this.initiator.setLeaderEventPublisher(new DefaultLeaderEventPublisher() {
145147

146148
@Override
147149
public void publishOnGranted(Object source, Context context, String role) {
@@ -155,12 +157,12 @@ public void publishOnGranted(Object source, Context context, String role) {
155157

156158
});
157159

158-
initiator.start();
160+
this.initiator.start();
159161

160162
assertTrue(onGranted.await(10, TimeUnit.SECONDS));
161163
assertTrue(initiator.getContext().isLeader());
162164

163-
initiator.stop();
165+
this.initiator.stop();
164166
}
165167

166168
@Test
@@ -177,6 +179,7 @@ public void competingWithLock() throws Exception {
177179
// set up first initiator instance using first LockRegistry
178180
LockRegistryLeaderInitiator first =
179181
new LockRegistryLeaderInitiator(firstRegistry, new DefaultCandidate());
182+
180183
CountDownLatch firstGranted = new CountDownLatch(1);
181184
CountDownLatch firstRevoked = new CountDownLatch(1);
182185
CountDownLatch firstAquireLockFailed = new CountDownLatch(1);
@@ -193,6 +196,7 @@ public void competingWithLock() throws Exception {
193196
// set up second initiator instance using second LockRegistry
194197
LockRegistryLeaderInitiator second =
195198
new LockRegistryLeaderInitiator(secondRegistry, new DefaultCandidate());
199+
196200
CountDownLatch secondGranted = new CountDownLatch(1);
197201
CountDownLatch secondRevoked = new CountDownLatch(1);
198202
CountDownLatch secondAquireLockFailed = new CountDownLatch(1);
@@ -246,20 +250,20 @@ public void testGracefulLeaderSelectorExit() throws Exception {
246250
given(registry.obtain(anyString()))
247251
.willReturn(lock);
248252

249-
LockRegistryLeaderInitiator initiator = new LockRegistryLeaderInitiator(registry);
253+
LockRegistryLeaderInitiator another = new LockRegistryLeaderInitiator(registry);
250254

251255
willAnswer(invocation -> {
252-
initiator.stop();
256+
another.stop();
253257
return false;
254258
})
255259
.given(lock)
256260
.tryLock(anyLong(), eq(TimeUnit.MILLISECONDS));
257261

258-
new DirectFieldAccessor(initiator).setPropertyValue("executorService",
262+
new DirectFieldAccessor(another).setPropertyValue("executorService",
259263
new ExecutorServiceAdapter(
260264
new SyncTaskExecutor()));
261265

262-
initiator.start();
266+
another.start();
263267

264268
Throwable throwable = throwableAtomicReference.get();
265269
assertNull(throwable);
@@ -284,17 +288,40 @@ public void testExceptionFromLock() throws Exception {
284288

285289
CountDownLatch onGranted = new CountDownLatch(1);
286290

287-
LockRegistryLeaderInitiator initiator = new LockRegistryLeaderInitiator(registry);
291+
LockRegistryLeaderInitiator another = new LockRegistryLeaderInitiator(registry);
288292

289-
initiator.setLeaderEventPublisher(new CountingPublisher(onGranted));
293+
another.setLeaderEventPublisher(new CountingPublisher(onGranted));
290294

291-
initiator.start();
295+
another.start();
292296

293297
assertTrue(onGranted.await(10, TimeUnit.SECONDS));
294-
assertTrue(initiator.getContext().isLeader());
298+
assertTrue(another.getContext().isLeader());
295299
assertTrue(exceptionThrown.get());
296300

297-
initiator.stop();
301+
another.stop();
302+
}
303+
304+
@Test
305+
public void shouldShutdownInternalExecutorService() {
306+
this.initiator.start();
307+
this.initiator.destroy();
308+
309+
ExecutorService executorService =
310+
TestUtils.getPropertyValue(this.initiator, "executorService", ExecutorService.class);
311+
312+
assertTrue(executorService.isShutdown());
313+
}
314+
315+
@Test
316+
public void doNotShutdownProvidedExecutorService() {
317+
LockRegistryLeaderInitiator another = new LockRegistryLeaderInitiator(this.registry);
318+
ExecutorService executorService = Executors.newSingleThreadExecutor();
319+
another.setExecutorService(executorService);
320+
321+
another.start();
322+
another.destroy();
323+
324+
assertFalse(executorService.isShutdown());
298325
}
299326

300327
private static class CountingPublisher implements LeaderEventPublisher {

0 commit comments

Comments
 (0)