Skip to content

Commit db01390

Browse files
ZacBlancoauden-woolfson
authored andcommitted
Fix ClusterManager failing to poll
Previously, ClusterManager relies on CompletableFuture.supplyAsync which uses the ForkJoin commonPool to execute tasks. On small machines this pool has a limited number of threads and the previous implementation used an infinite loop that would never break. This caused a starvation of resources which could cause tests to hang and due to the check never completing. The new implementation uses a single-threaded scheduled executor to prevent starvation and to properly setup/teardown resources. It also improves the test flakiness by ensuring the file watch service starts picking up changes to the config files first before moving on to later test cases.
1 parent 6f094ae commit db01390

6 files changed

Lines changed: 136 additions & 108 deletions

File tree

presto-router/src/main/java/com/facebook/presto/router/RouterUtil.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323
import java.io.IOException;
2424
import java.io.UncheckedIOException;
2525
import java.nio.file.Files;
26-
import java.nio.file.Paths;
26+
import java.nio.file.Path;
2727
import java.util.Optional;
2828

2929
import static com.fasterxml.jackson.databind.DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES;
@@ -37,11 +37,11 @@ public final class RouterUtil
3737

3838
private RouterUtil() {}
3939

40-
public static Optional<RouterSpec> parseRouterConfig(RouterConfig config)
40+
public static Optional<RouterSpec> parseRouterConfig(Path configFile)
4141
{
4242
Optional<RouterSpec> routerSpec;
4343
try {
44-
routerSpec = Optional.of(CODEC.fromJson(Files.readAllBytes(Paths.get(config.getConfigFile()))));
44+
routerSpec = Optional.of(CODEC.fromJson(Files.readAllBytes(configFile)));
4545
}
4646
catch (IOException e) {
4747
throw new UncheckedIOException(e);

presto-router/src/main/java/com/facebook/presto/router/cluster/ClusterManager.java

Lines changed: 83 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@
1313
*/
1414
package com.facebook.presto.router.cluster;
1515

16-
import com.facebook.airlift.bootstrap.LifeCycleManager;
1716
import com.facebook.airlift.log.Logger;
1817
import com.facebook.presto.router.RouterConfig;
1918
import com.facebook.presto.router.scheduler.CustomSchedulerManager;
@@ -34,25 +33,25 @@
3433
import io.airlift.units.Duration;
3534
import org.weakref.jmx.Managed;
3635

37-
import javax.annotation.PostConstruct;
36+
import javax.annotation.PreDestroy;
3837
import javax.inject.Inject;
3938

40-
import java.io.File;
4139
import java.io.IOException;
4240
import java.net.URI;
41+
import java.nio.file.ClosedWatchServiceException;
4342
import java.nio.file.FileSystems;
4443
import java.nio.file.Path;
45-
import java.nio.file.StandardWatchEventKinds;
44+
import java.nio.file.Paths;
4645
import java.nio.file.WatchEvent;
4746
import java.nio.file.WatchKey;
4847
import java.nio.file.WatchService;
4948
import java.util.HashMap;
5049
import java.util.List;
5150
import java.util.Map;
5251
import java.util.Optional;
53-
import java.util.concurrent.CompletableFuture;
5452
import java.util.concurrent.ConcurrentHashMap;
5553
import java.util.concurrent.ScheduledExecutorService;
54+
import java.util.concurrent.ScheduledFuture;
5655
import java.util.concurrent.atomic.AtomicBoolean;
5756
import java.util.concurrent.atomic.AtomicReference;
5857
import java.util.stream.Collectors;
@@ -67,15 +66,19 @@
6766
import static com.google.common.base.Preconditions.checkArgument;
6867
import static com.google.common.collect.ImmutableList.toImmutableList;
6968
import static com.google.common.collect.ImmutableMap.toImmutableMap;
69+
import static java.nio.file.StandardWatchEventKinds.ENTRY_CREATE;
70+
import static java.nio.file.StandardWatchEventKinds.ENTRY_MODIFY;
7071
import static java.util.Objects.requireNonNull;
7172
import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
7273
import static java.util.concurrent.TimeUnit.MILLISECONDS;
74+
import static java.util.concurrent.TimeUnit.SECONDS;
7375

7476
public class ClusterManager
77+
implements AutoCloseable
7578
{
7679
private final AtomicReference<ClusterManagerConfig> currentConfig = new AtomicReference<>();
7780

78-
public final RouterConfig routerConfig;
81+
private final Path configFile;
7982
private final Logger log = Logger.get(ClusterManager.class);
8083

8184
// Cluster status
@@ -84,25 +87,72 @@ public class ClusterManager
8487

8588
private final AtomicBoolean isWatchServiceStarted = new AtomicBoolean();
8689
private final RemoteInfoFactory remoteInfoFactory;
87-
private final LifeCycleManager lifeCycleManager;
8890
private final HashMap<String, HashMap<URI, Integer>> serverWeights = new HashMap<>();
8991
private final CustomSchedulerManager schedulerManager;
92+
private final ScheduledExecutorService scheduledExecutorService;
93+
private final ScheduledFuture<?> configDetection;
94+
private final WatchService watchService;
95+
private final WatchKey watchKey;
9096

9197
@Inject
92-
public ClusterManager(RouterConfig config, RemoteInfoFactory remoteInfoFactory, LifeCycleManager lifeCycleManager,
93-
CustomSchedulerManager schedulerManager)
98+
public ClusterManager(RouterConfig config, RemoteInfoFactory remoteInfoFactory, CustomSchedulerManager schedulerManager)
99+
throws IOException
94100
{
95-
this.routerConfig = requireNonNull(config, "config is null");
101+
this.configFile = Paths.get(requireNonNull(config, "config is null").getConfigFile());
96102
this.remoteInfoFactory = requireNonNull(remoteInfoFactory, "remoteInfoFactory is null");
97-
this.lifeCycleManager = requireNonNull(lifeCycleManager, "lifecycleManager is null");
98103
this.schedulerManager = schedulerManager;
99-
onConfigChangeDetection();
100-
this.initializeServerWeights();
104+
reloadConfig();
105+
initializeServerWeights();
106+
watchService = FileSystems.getDefault().newWatchService();
107+
Path parentDir = configFile.getParent();
108+
log.info("Router config watch service monitoring %s", parentDir);
109+
watchKey = parentDir.register(watchService,
110+
new WatchEvent.Kind[] {ENTRY_CREATE, ENTRY_MODIFY},
111+
SensitivityWatchEventModifier.HIGH);
112+
log.info("Successfully registered watch service for %s", parentDir);
113+
scheduledExecutorService = newSingleThreadScheduledExecutor();
114+
configDetection = scheduledExecutorService.scheduleAtFixedRate(this::monitorConfig, 0, 3, SECONDS);
101115
}
102116

103-
protected void onConfigChangeDetection()
117+
protected void monitorConfig()
104118
{
105-
RouterSpec newRouterSpec = parseRouterConfig(routerConfig)
119+
boolean reload = false;
120+
try {
121+
WatchKey key = watchService.poll(1, SECONDS);
122+
if (key == null) {
123+
return;
124+
}
125+
List<WatchEvent<?>> events = key.pollEvents();
126+
log.debug("Changes to router config directory detected");
127+
for (WatchEvent<?> event : events) {
128+
log.debug("Event detected: %s, path: %s", event.kind().name(), event.context());
129+
Path changed = (Path) event.context();
130+
if (changed.endsWith(configFile.getFileName())) {
131+
reload = true;
132+
break;
133+
}
134+
else {
135+
log.debug("Change to %s ignored by ClusterManager (config file is %s)", event.context(), configFile);
136+
}
137+
}
138+
key.reset();
139+
}
140+
catch (ClosedWatchServiceException e) {
141+
log.warn("Watch service closed. Future updates configuration changes will not be detected.");
142+
}
143+
catch (InterruptedException e) {
144+
Thread.currentThread().interrupt();
145+
log.warn("Watch service interrupted while waiting for configuration updates");
146+
}
147+
148+
if (reload) {
149+
reloadConfig();
150+
}
151+
}
152+
153+
protected void reloadConfig()
154+
{
155+
RouterSpec newRouterSpec = parseRouterConfig(configFile)
106156
.orElseThrow(() -> new PrestoException(CONFIGURATION_INVALID, "Failed to load router config"));
107157
Map<String, GroupSpec> newGroups = newRouterSpec.getGroups().stream().collect(toImmutableMap(GroupSpec::getName, group -> group));
108158
List<SelectorRuleSpec> newGroupSelectors = ImmutableList.copyOf(newRouterSpec.getSelectors());
@@ -132,54 +182,6 @@ protected void onConfigChangeDetection()
132182
currentConfig.set(new ClusterManagerConfig(newGroups, newGroupSelectors, newScheduler, newSchedulerType));
133183
}
134184

135-
@PostConstruct
136-
public void startConfigReloadTaskFileWatcher()
137-
{
138-
CompletableFuture.supplyAsync(() -> {
139-
try (WatchService watchService = FileSystems.getDefault().newWatchService()) {
140-
File routerConfigFile = new File(routerConfig.getConfigFile());
141-
log.info("Router config watch service monitoring %s", routerConfig.getConfigFile());
142-
Path parentDir = routerConfigFile.toPath().getParent();
143-
parentDir.register(
144-
watchService,
145-
new WatchEvent.Kind[] {
146-
StandardWatchEventKinds.ENTRY_MODIFY,
147-
StandardWatchEventKinds.ENTRY_CREATE,
148-
StandardWatchEventKinds.ENTRY_DELETE,
149-
StandardWatchEventKinds.OVERFLOW},
150-
SensitivityWatchEventModifier.HIGH);
151-
isWatchServiceStarted.set(true);
152-
log.info("Successfully registered watch service for %s", parentDir);
153-
154-
while (true) {
155-
WatchKey key = watchService.take();
156-
log.info("Changes to router config directory detected: %s", routerConfigFile);
157-
for (WatchEvent<?> event : key.pollEvents()) {
158-
log.debug("Event detected: %s, path: %s", event.kind().name(), event.context());
159-
Path changed = (Path) event.context();
160-
if (changed.endsWith(routerConfigFile.getName())) {
161-
try {
162-
onConfigChangeDetection();
163-
}
164-
catch (Exception e) {
165-
log.error("Exception in config reload");
166-
}
167-
}
168-
else {
169-
log.debug("Config change to %s ignored by ClusterManager (config file is %s)", event.context(), routerConfigFile.getName());
170-
}
171-
}
172-
key.reset();
173-
}
174-
}
175-
catch (IOException | InterruptedException e) {
176-
log.error("Exception in file watcher loop while monitoring %s, %s", routerConfig.getConfigFile(), e);
177-
lifeCycleManager.stop();
178-
throw new RuntimeException(e);
179-
}
180-
});
181-
}
182-
183185
public List<URI> getAllClusters()
184186
{
185187
return currentConfig.get().getGroups().values().stream()
@@ -199,9 +201,9 @@ public Optional<URI> getDestination(RequestInfo requestInfo)
199201
GroupSpec groupSpec = config.getGroups().get(target.get());
200202

201203
List<URI> healthyClusterURIs = groupSpec.getMembers().stream().filter((entry) ->
202-
Optional.ofNullable(remoteClusterInfos.get(entry))
203-
.map(RemoteClusterInfo::isHealthy)
204-
.orElse(false))
204+
Optional.ofNullable(remoteClusterInfos.get(entry))
205+
.map(RemoteClusterInfo::isHealthy)
206+
.orElse(false))
205207
.collect(Collectors.toList());
206208

207209
if (healthyClusterURIs.isEmpty()) {
@@ -285,6 +287,21 @@ public boolean getIsWatchServiceStarted()
285287
return isWatchServiceStarted.get();
286288
}
287289

290+
@PreDestroy
291+
@Override
292+
public void close()
293+
throws Exception
294+
{
295+
try {
296+
watchKey.cancel();
297+
watchService.close();
298+
}
299+
finally {
300+
configDetection.cancel(true);
301+
scheduledExecutorService.shutdownNow();
302+
}
303+
}
304+
288305
public static class ClusterStatusTracker
289306
{
290307
private final Logger log = Logger.get(ClusterStatusTracker.class);

presto-router/src/main/java/com/facebook/presto/router/cluster/RemoteState.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import javax.annotation.concurrent.ThreadSafe;
3232

3333
import java.net.URI;
34+
import java.nio.file.Paths;
3435
import java.util.Optional;
3536
import java.util.concurrent.Future;
3637
import java.util.concurrent.atomic.AtomicBoolean;
@@ -71,7 +72,7 @@ public RemoteState(HttpClient httpClient, URI remoteUri, RemoteStateConfig remot
7172
this.isHealthy = new AtomicBoolean(true);
7273
this.httpClient = requireNonNull(httpClient, "httpClient is null");
7374
this.remoteUri = requireNonNull(remoteUri, "remoteUri is null");
74-
RouterSpec routerSpec = parseRouterConfig(routerConfig)
75+
RouterSpec routerSpec = parseRouterConfig(Paths.get(routerConfig.getConfigFile()))
7576
.orElseThrow(() -> new PrestoException(CONFIGURATION_INVALID, "Failed to load router config"));
7677
this.routerUserCredentials = routerSpec.getUserCredentials();
7778
this.clusterUnhealthyTimeout = remoteStateConfig.getClusterUnhealthyTimeout();

presto-router/src/main/java/com/facebook/presto/router/predictor/PredictorManager.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import javax.inject.Inject;
2222

2323
import java.net.URI;
24+
import java.nio.file.Paths;
2425
import java.util.Optional;
2526
import java.util.concurrent.ExecutorService;
2627
import java.util.concurrent.Executors;
@@ -45,7 +46,7 @@ public class PredictorManager
4546
@Inject
4647
public PredictorManager(RemoteQueryFactory remoteQueryFactory, RouterConfig config)
4748
{
48-
RouterSpec routerSpec = parseRouterConfig(config)
49+
RouterSpec routerSpec = parseRouterConfig(Paths.get(config.getConfigFile()))
4950
.orElseThrow(() -> new PrestoException(CONFIGURATION_INVALID, "Failed to load router config"));
5051

5152
this.remoteQueryFactory = requireNonNull(remoteQueryFactory, "");

presto-router/src/test/java/com/facebook/presto/router/BarrierClusterManager.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -13,11 +13,11 @@
1313
*/
1414
package com.facebook.presto.router;
1515

16-
import com.facebook.airlift.bootstrap.LifeCycleManager;
1716
import com.facebook.presto.router.cluster.ClusterManager;
1817
import com.facebook.presto.router.cluster.RemoteInfoFactory;
1918
import com.facebook.presto.router.scheduler.CustomSchedulerManager;
2019

20+
import java.io.IOException;
2121
import java.util.concurrent.BrokenBarrierException;
2222
import java.util.concurrent.CyclicBarrier;
2323
import java.util.concurrent.TimeUnit;
@@ -28,18 +28,18 @@ public class BarrierClusterManager
2828
{
2929
private final CyclicBarrier barrier;
3030

31-
public BarrierClusterManager(RouterConfig config, RemoteInfoFactory remoteInfoFactory, CyclicBarrier barrier, LifeCycleManager lifeCycleManager, CustomSchedulerManager schedulerManager)
31+
public BarrierClusterManager(RouterConfig config, RemoteInfoFactory remoteInfoFactory, CyclicBarrier barrier, CustomSchedulerManager schedulerManager)
32+
throws IOException
3233
{
33-
super(config, remoteInfoFactory, lifeCycleManager, schedulerManager);
34+
super(config, remoteInfoFactory, schedulerManager);
3435
this.barrier = barrier;
35-
startConfigReloadTaskFileWatcher();
3636
}
3737

3838
@Override
39-
protected void onConfigChangeDetection()
39+
protected void reloadConfig()
4040
{
4141
try {
42-
super.onConfigChangeDetection();
42+
super.reloadConfig();
4343
if (barrier != null) {
4444
barrier.await(5, TimeUnit.SECONDS);
4545
}

0 commit comments

Comments
 (0)