From 189d4fa0e4294161f7ed492ca79f17702fad9238 Mon Sep 17 00:00:00 2001 From: Jens Deppe Date: Wed, 11 Dec 2019 13:55:56 -0800 Subject: [PATCH] ProcessWrapper changes to launch locators and servers - Provide callback for process output consumption. - Provide callback to determine when a process is 'ready'. - Introduce `ClusterStarter` to launch and manage a cluster of Geode members (locator and servers). --- pom.xml | 1 + spring-data-geode/pom.xml | 14 + .../data/gemfire/process/ProcessExecutor.java | 25 +- .../process/ProcessInputStreamListener.java | 41 --- .../data/gemfire/process/ProcessWrapper.java | 15 +- .../gemfire/test/support/ClusterStarter.java | 316 ++++++++++++++++++ 6 files changed, 359 insertions(+), 53 deletions(-) delete mode 100644 spring-data-geode/src/test/java/org/springframework/data/gemfire/process/ProcessInputStreamListener.java create mode 100644 spring-data-geode/src/test/java/org/springframework/data/gemfire/test/support/ClusterStarter.java diff --git a/pom.xml b/pom.xml index 07c2449e6..00e113dab 100644 --- a/pom.xml +++ b/pom.xml @@ -54,6 +54,7 @@ 0.4 2.3.0.BUILD-SNAPSHOT 1.2.0.RELEASE + 3.1.2 diff --git a/spring-data-geode/pom.xml b/spring-data-geode/pom.xml index fdc7d56b3..e861967a9 100644 --- a/spring-data-geode/pom.xml +++ b/spring-data-geode/pom.xml @@ -242,6 +242,20 @@ test + + org.awaitility + awaitility + ${awaitility.version} + test + + + + com.sun.java + tools + ${java.version} + system + ${java.home}/../lib/tools.jar + diff --git a/spring-data-geode/src/test/java/org/springframework/data/gemfire/process/ProcessExecutor.java b/spring-data-geode/src/test/java/org/springframework/data/gemfire/process/ProcessExecutor.java index b5e5d92af..4051aa253 100644 --- a/spring-data-geode/src/test/java/org/springframework/data/gemfire/process/ProcessExecutor.java +++ b/spring-data-geode/src/test/java/org/springframework/data/gemfire/process/ProcessExecutor.java @@ -23,6 +23,7 @@ import java.util.ArrayList; import java.util.Collection; import java.util.List; +import java.util.function.Consumer; import java.util.stream.Collectors; import org.springframework.data.gemfire.test.support.FileSystemUtils; @@ -56,10 +57,21 @@ public static ProcessWrapper launch(Class type, String... args) throws IOExce } public static ProcessWrapper launch(File workingDirectory, Class type, String... args) throws IOException { - return launch(workingDirectory, JAVA_CLASSPATH, type, args); + return launch("FORK", workingDirectory, JAVA_CLASSPATH, type, args); } - public static ProcessWrapper launch(File workingDirectory, String classpath, Class type, String... args) + public static ProcessWrapper launch(File workingDirectory, String classpath, Class type, String... args) throws IOException { + return launch("FORK", workingDirectory, classpath, type, args); + } + + public static ProcessWrapper launch(String name, File workingDirectory, String classpath, + Class type, String... args) + throws IOException { + return launch(name, workingDirectory, classpath, type, x -> {}, y -> {}, args); + } + + public static ProcessWrapper launch(String name, File workingDirectory, String classpath, + Class type, Consumer logConsumer, Consumer waitFunction, String... args) throws IOException { ProcessBuilder processBuilder = new ProcessBuilder() @@ -71,7 +83,10 @@ public static ProcessWrapper launch(File workingDirectory, String classpath, Cla ProcessWrapper processWrapper = new ProcessWrapper(process, ProcessConfiguration.create(processBuilder)); - //processWrapper.register((input) -> System.err.printf("[FORK] - %s%n", input)); + processWrapper.register(logConsumer); + processWrapper.init(); + + waitFunction.accept(processWrapper); return processWrapper; } @@ -115,7 +130,9 @@ protected static Collection getSpringGemFireSystemProperties() } protected static boolean isJvmOption(String option) { - return StringUtils.hasText(option) && (option.startsWith("-D") || option.startsWith("-X")); + return StringUtils.hasText(option) && (option.startsWith("-D") + || option.startsWith("-X") + || option.startsWith("-agent")); } protected static File validateDirectory(File workingDirectory) { diff --git a/spring-data-geode/src/test/java/org/springframework/data/gemfire/process/ProcessInputStreamListener.java b/spring-data-geode/src/test/java/org/springframework/data/gemfire/process/ProcessInputStreamListener.java deleted file mode 100644 index a9476017d..000000000 --- a/spring-data-geode/src/test/java/org/springframework/data/gemfire/process/ProcessInputStreamListener.java +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Copyright 2010-2019 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.springframework.data.gemfire.process; - -import java.util.EventListener; - -/** - * The {@link ProcessInputStreamListener} is a callback interface that gets called when input arrives from either a - * {@link Process process's} standard output steam or standard error stream. - * - * @author John Blum - * @see java.util.EventListener - * @since 1.5.0 - */ -public interface ProcessInputStreamListener extends EventListener { - - /** - * Callback method that gets called when the {@link Process} sends output from either its standard out - * or standard error streams. - * - * @param input {@link String} containing output from the {@link Process} that this listener is listening to. - * @see java.lang.Process#getErrorStream() - * @see java.lang.Process#getInputStream() - */ - void onInput(String input); - -} diff --git a/spring-data-geode/src/test/java/org/springframework/data/gemfire/process/ProcessWrapper.java b/spring-data-geode/src/test/java/org/springframework/data/gemfire/process/ProcessWrapper.java index 42bf8566a..2bf4e54c0 100644 --- a/spring-data-geode/src/test/java/org/springframework/data/gemfire/process/ProcessWrapper.java +++ b/spring-data-geode/src/test/java/org/springframework/data/gemfire/process/ProcessWrapper.java @@ -32,6 +32,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Consumer; import java.util.logging.Level; import java.util.logging.Logger; @@ -60,7 +61,7 @@ public class ProcessWrapper { protected static final long DEFAULT_WAIT_TIME_MILLISECONDS = TimeUnit.SECONDS.toMillis(5); - private final List listeners = new CopyOnWriteArrayList<>(); + private final List> listeners = new CopyOnWriteArrayList<>(); protected final Logger log = Logger.getLogger(getClass().getName()); @@ -77,11 +78,9 @@ public ProcessWrapper(Process process, ProcessConfiguration processConfiguration this.process = process; this.processConfiguration = processConfiguration; - - init(); } - private void init() { + public void init() { newThread("Process OUT Stream Reader Thread", newProcessInputStreamReaderRunnable(process.getInputStream())).start(); @@ -101,8 +100,8 @@ protected Runnable newProcessInputStreamReaderRunnable(InputStream in) { try { for (String input = inputReader.readLine(); input != null; input = inputReader.readLine()) { - for (ProcessInputStreamListener listener : listeners) { - listener.onInput(input); + for (Consumer listener : listeners) { + listener.accept(input); } } } @@ -213,7 +212,7 @@ public String readLogFile(File log) throws IOException { return FileUtils.read(log); } - public boolean register(ProcessInputStreamListener listener) { + public boolean register(Consumer listener) { return (listener != null && listeners.add(listener)); } @@ -323,7 +322,7 @@ public int shutdown() { return stop(); } - public boolean unregister(ProcessInputStreamListener listener) { + public boolean unregister(Consumer listener) { return listeners.remove(listener); } diff --git a/spring-data-geode/src/test/java/org/springframework/data/gemfire/test/support/ClusterStarter.java b/spring-data-geode/src/test/java/org/springframework/data/gemfire/test/support/ClusterStarter.java new file mode 100644 index 000000000..8c9a43417 --- /dev/null +++ b/spring-data-geode/src/test/java/org/springframework/data/gemfire/test/support/ClusterStarter.java @@ -0,0 +1,316 @@ +/* + * Copyright 2017-2019 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.data.gemfire.test.support; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.TimeUnit; +import java.util.function.BiConsumer; +import java.util.function.Consumer; +import java.util.stream.Collectors; + +import javax.management.JMX; +import javax.management.MBeanServerConnection; +import javax.management.MalformedObjectNameException; +import javax.management.ObjectName; +import javax.management.remote.JMXConnector; +import javax.management.remote.JMXConnectorFactory; +import javax.management.remote.JMXServiceURL; + +import com.sun.tools.attach.AttachNotSupportedException; +import com.sun.tools.attach.VirtualMachine; +import com.sun.tools.attach.VirtualMachineDescriptor; +import org.awaitility.Awaitility; +import org.junit.rules.TemporaryFolder; +import org.springframework.data.gemfire.process.ProcessExecutor; +import org.springframework.data.gemfire.process.ProcessWrapper; + +import org.apache.geode.distributed.LocatorLauncher; +import org.apache.geode.distributed.ServerLauncher; +import org.apache.geode.management.LocatorMXBean; +import org.apache.geode.management.MemberMXBean; + +/** + * Utility class to launch and manage a static Geode cluster as a unit within tests. For example: + *
+ *   private static ClusterStarter cluster = new ClusterStarter();
+ *  {@literal @}BeforeClass
+ *   public static void setup() throws Exception {
+ *     cluster.withLocator("locator")
+ *         .withLogging()
+ *         .withDebugging(5005, false)
+ *         .withArgs("-port", "0");
+ *     cluster.withServer("server-1")
+ *         .withLogging()
+ *         .withLocatorPort();
+ *
+ *     cluster.launch();
+ *   }
+ *
+ *  {@literal @}AfterClass
+ *   public static void teardown() throws Exception {
+ *     cluster.shutdown();
+ *   }
+ * 
+ * + * The cluster assumes an optional, single locator and any number of servers. If launched, the + * locator's port can be retrieved with {@link #getLocatorPort()}. This allows for the locator to + * be launched with an ephemeral port. + * + * @author Jens Deppe + */ +public class ClusterStarter { + + private Map trackedProcesses = new LinkedHashMap<>(); + private Map memberBuilders = new LinkedHashMap<>(); + private TemporaryFolder tmpDir = new TemporaryFolder(); + private int locatorPort; + + public void launch() throws Exception { + tmpDir.create(); + for (Map.Entry builder : memberBuilders.entrySet()) { + MemberBuilder b = builder.getValue(); + File memberWorkingDir = tmpDir.newFolder(builder.getKey()); + + for (BiConsumer c : b.getDeferredConsumers()) { + c.accept(b, this); + } + + ProcessWrapper process = ProcessExecutor + .launch(builder.getKey(), memberWorkingDir, b.getClasspath(), b.getMainClass(), + b.getLogConsumer(), b.getProcessStartupWaiter(), b.args.toArray(new String[]{})); + + trackedProcesses.put(builder.getKey(), process); + } + } + + public void shutdown() { + List reversed = new ArrayList<>(trackedProcesses.values()); + Collections.reverse(reversed); + + for (ProcessWrapper process : reversed) { + process.stop(); + } + + tmpDir.delete(); + } + + public int getLocatorPort() { + return locatorPort; + } + + public MemberBuilder withLocator(String name) { + MemberBuilder builder = new MemberBuilder(name, LocatorLauncher.class); + builder.withArgs("start", name) + .withArgs("-Dgemfire.jmx-manager-start=true"); + builder.withProcessStartupWaiter(getLocatorStartupWaiter(name)); + memberBuilders.put(name, builder); + + return builder; + } + + public MemberBuilder withServer(String name) { + MemberBuilder builder = new MemberBuilder(name, ServerLauncher.class); + builder.withArgs("start", name) + .withArgs("-Dgemfire.http-service-port=0"); + builder.withProcessStartupWaiter(getServerStartupWaiter(name)); + memberBuilders.put(name, builder); + + return builder; + } + + private Consumer getLocatorStartupWaiter(String name) { + return (wrapper) -> { + try { + int pid = getMemberPid(wrapper); + MBeanServerConnection connection = getMBeanServerConnection(pid); + waitForMemberToBeOnline(name, connection); + + ObjectName locatorObjectName = new ObjectName(String.format("GemFire:service=Locator,type=Member,member=%s", name)); + LocatorMXBean member = + JMX.newMXBeanProxy(connection, locatorObjectName, LocatorMXBean.class); + locatorPort = member.getPort(); + } catch (IOException | AttachNotSupportedException | MalformedObjectNameException ex) { + throw new RuntimeException(ex); + } + }; + + } + + private Consumer getServerStartupWaiter(String name) { + return (wrapper) -> { + try { + int pid = getMemberPid(wrapper); + MBeanServerConnection connection = getMBeanServerConnection(pid); + waitForMemberToBeOnline(name, connection); + } catch (IOException | AttachNotSupportedException | MalformedObjectNameException ex) { + throw new RuntimeException(ex); + } + }; + } + + private void waitForMemberToBeOnline(String name, MBeanServerConnection connection) throws + MalformedObjectNameException { + ObjectName memberObjectName = + new ObjectName(String.format("GemFire:type=Member,member=%s", name)); + MemberMXBean member = + JMX.newMXBeanProxy(connection, memberObjectName, MemberMXBean.class); + + // Wait until the bean is available + Awaitility.await().atMost(30, TimeUnit.SECONDS).ignoreExceptions() + .until(() -> member.status() != null); + + // Wait until the status is 'online' + Awaitility.await().ignoreExceptions().atMost(30, TimeUnit.SECONDS).ignoreExceptions() + .until(() -> member.status().contains("\"status\":\"online\"")); + } + + private int getMemberPid(ProcessWrapper wrapper) { + Awaitility.await().atMost(5, TimeUnit.SECONDS).ignoreExceptions().untilAsserted(wrapper::getPid); + return wrapper.getPid(); + } + + private MBeanServerConnection getMBeanServerConnection(int pid) throws IOException, AttachNotSupportedException { + List vms = VirtualMachine.list(); + for (VirtualMachineDescriptor desc : vms) { + VirtualMachine vm = VirtualMachine.attach(desc); + + if (!vm.id().equals(Integer.toString(pid))) { + continue; + } + + Properties props = vm.getAgentProperties(); + String connectorAddress = + props.getProperty("com.sun.management.jmxremote.localConnectorAddress"); + + if (connectorAddress == null) { + throw new RuntimeException("com.sun.management.jmxremote.localConnectorAddress property not available. Process must be started with -Dcom.sun.management.jmxremote"); + } + + JMXServiceURL url = new JMXServiceURL(connectorAddress); + JMXConnector connector = JMXConnectorFactory.connect(url); + + return connector.getMBeanServerConnection(); + } + + throw new RuntimeException("Unable to create JMX connection to pid " + pid); + } + + public static class MemberBuilder { + private List args = new ArrayList<>(); + private String marker; + private Class mainClass; + private List javaClasspath; + private Consumer logConsumer; + private Consumer processStartupWaiter; + private List> deferredConsumers = new ArrayList<>(); + + public MemberBuilder(String marker, Class mainClass) { + this.marker = marker; + this.mainClass = mainClass; + this.logConsumer = x -> {}; + this.processStartupWaiter = x -> {}; + + javaClasspath = Arrays.stream(System.getProperty("java.class.path") + .split(File.pathSeparator)).collect( Collectors.toList()); + args.add("-Dcom.sun.management.jmxremote"); + } + + public MemberBuilder reduceClasspathTo(String... includes) { + List includesList = Arrays.stream(includes).collect(Collectors.toList()); + javaClasspath = javaClasspath.stream() + .filter(x -> includesList.stream().anyMatch(e -> x.contains(e))) + .collect(Collectors.toList()); + + return this; + } + + public MemberBuilder reduceClasspathBy(String... excludes) { + List excludesList = Arrays.stream(excludes).collect(Collectors.toList()); + javaClasspath = javaClasspath.stream() + .filter(x -> excludesList.stream().noneMatch(e -> x.contains(e))) + .collect(Collectors.toList()); + + return this; + } + + private Class getMainClass() { + return mainClass; + } + + private String getClasspath() { + return String.join(File.pathSeparator, javaClasspath); + } + + private Consumer getLogConsumer() { + return logConsumer; + } + + private List> getDeferredConsumers() { + return deferredConsumers; + } + + public MemberBuilder withLocatorPort() { + return withDeferredConsumer((x, y) -> { + x.withArgs(String.format("-Dgemfire.locators=localhost[%d]", y.getLocatorPort())); + }); + } + + private MemberBuilder withDeferredConsumer(BiConsumer consumer) { + deferredConsumers.add(consumer); + return this; + } + + public MemberBuilder withLogging() { + withArgs("-Dlogback.log.level=INFO"); + logConsumer = input -> System.out.printf("[%s] - %s%n", marker, input); + return this; + } + + public MemberBuilder withDebugging(int port, boolean suspended) { + args.add(String.format("-agentlib:jdwp=transport=dt_socket,server=y,suspend=%s,address=%d", + (suspended ? "y" : "n"), port)); + return this; + } + + public MemberBuilder withProcessStartupWaiter(Consumer waiter) { + processStartupWaiter = waiter; + return this; + } + + private Consumer getProcessStartupWaiter() { + return processStartupWaiter; + } + + public MemberBuilder withArgs(String... args) { + this.args.addAll(Arrays.stream(args).collect(Collectors.toList())); + return this; + } + + private List getArgs() { + return args; + } + } + +}