Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,6 @@
import org.jenkinsci.plugins.workflow.steps.StepContext;
import org.jenkinsci.plugins.workflow.steps.StepExecution;
import org.jenkinsci.plugins.workflow.support.concurrent.Futures;
import org.jenkinsci.plugins.workflow.support.concurrent.Timeout;
import org.jenkinsci.plugins.workflow.support.concurrent.WithThreadName;
import org.jenkinsci.plugins.workflow.support.pickles.serialization.PickleResolver;
import org.jenkinsci.plugins.workflow.support.pickles.serialization.RiverReader;
Expand All @@ -81,6 +80,7 @@

import java.io.File;
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
Expand All @@ -89,9 +89,11 @@
import java.util.NavigableMap;
import java.util.Stack;
import java.util.TreeMap;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
Expand Down Expand Up @@ -1670,54 +1672,84 @@
@Restricted(DoNotUse.class)
@Terminator(attains = FlowExecutionList.EXECUTIONS_SUSPENDED)
public static void suspendAll() {
try (Timeout t = Timeout.limit(3, TimeUnit.MINUTES)) { // TODO some complicated sequence of calls to Futures could allow all of them to run in parallel
LOGGER.fine("starting to suspend all executions");
for (FlowExecution execution : FlowExecutionList.get()) {
if (execution instanceof CpsFlowExecution cpsExec) {
try {
var nonresumable = cpsExec.checkAndAbortNonresumableBuild();
LOGGER.fine("looking for executions to suspend");
var pool = new ForkJoinPool();
try {
var tasks = new ArrayList<Callable<Boolean>>();
for (var fe : FlowExecutionList.get()) {
if (fe instanceof CpsFlowExecution cpsExec) {
LOGGER.finer(() -> "will request suspension of " + cpsExec);
tasks.add(cpsExec::suspend);
}
}
LOGGER.fine(() -> "waiting for " + tasks.size() + " suspensions");
// TODO 2.516.x getDuration

Check warning on line 1686 in plugin/src/main/java/org/jenkinsci/plugins/workflow/cps/CpsFlowExecution.java

View check run for this annotation

ci.jenkins.io / Open Tasks Scanner

TODO

NORMAL: 2.516.x getDuration
var timeout = Duration.ofSeconds(SystemProperties.getLong(CpsFlowExecution.class.getName() + ".suspendTimeout", 10L));
var futures = pool.invokeAll(tasks, timeout.toMillis(), TimeUnit.MILLISECONDS);
int failed = tasks.size();
for (var future : futures) {
if (future.isDone() && !future.isCancelled() && future.get()) {
failed--;
}
}
if (failed == 0) {
LOGGER.fine("finished suspending all executions");
} else {
LOGGER.warning(failed + "/" + tasks.size() + " builds did not finish suspending within " + timeout.toSeconds() + " seconds");
}
} catch (Exception x) {
LOGGER.log(Level.WARNING, "Unexpected error suspending pipeline builds; some may not be in a consistent state on resume", x);
} finally {
pool.shutdown();
}
}

var programPromise = cpsExec.programPromise;
// Like waitForSuspension but with a timeout:
if (programPromise != null && programPromise.isDone()) {
LOGGER.fine(() -> "waiting to suspend " + execution);
try {
var program = programPromise.get();
var f = nonresumable ? program.scheduleRun() : program.terminating();
f.get(1, TimeUnit.MINUTES);
LOGGER.log(Level.FINER, " Pipeline went to sleep OK: "+execution);
} catch (InterruptedException | TimeoutException ex) {
LOGGER.log(Level.WARNING, "Error waiting for Pipeline to suspend: " + cpsExec, ex);
}
} else {
LOGGER.fine(() -> "not trying to suspend " + execution);
}
cpsExec.checkpoint(true);
if (programPromise != null) {
cpsExec.runInCpsVmThread(new FutureCallback<>() {
@Override public void onSuccess(CpsThreadGroup g) {
LOGGER.fine(() -> "shutting down CPS VM for " + cpsExec);
g.shutdown();
}
@Override public void onFailure(Throwable t) {
LOGGER.log(Level.WARNING, null, t);
}
});
}
cpsExec.saveOwner();
if (cpsExec.owner != null) {
cpsExec.owner.getListener().getLogger().close();
}
} catch (Exception ex) {
LOGGER.log(Level.WARNING, "Error persisting Pipeline execution at shutdown: " + cpsExec.owner, ex);
}
private boolean suspend() {
var steps = new AtomicInteger();
try {
var nonresumable = checkAndAbortNonresumableBuild();
// Like waitForSuspension but with a timeout:
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(ignore WS)

if (programPromise != null && programPromise.isDone()) {
LOGGER.fine(() -> "waiting to suspend " + this);
try {
var program = programPromise.get();
var f = nonresumable ? program.scheduleRun() : program.terminating();
f.get(1, TimeUnit.MINUTES);
LOGGER.finer(() -> " Pipeline went to sleep OK: " + this);
steps.incrementAndGet();
} catch (InterruptedException | TimeoutException ex) {
LOGGER.warning(() -> "Timed out waiting for Pipeline to suspend: " + this);
LOGGER.log(Level.FINER, null, ex);
// if helpful, could log CPS VM thread dump (though this itself could be expensive)
}
} else {
LOGGER.fine(() -> "not trying to suspend " + this);
}
LOGGER.fine("finished suspending all executions");
checkpoint(true);
if (programPromise != null) {
runInCpsVmThread(new FutureCallback<>() {
@Override public void onSuccess(CpsThreadGroup g) {
LOGGER.fine(() -> "shutting down CPS VM for " + CpsFlowExecution.this);
g.shutdown();
steps.incrementAndGet();
}
@Override public void onFailure(Throwable t) {
LOGGER.log(t instanceof RejectedExecutionException ? Level.FINE : Level.WARNING, "failed to shut down " + CpsFlowExecution.this, t);
}
});
}
saveOwner();
if (owner != null && !isComplete()) {
owner.getListener().getLogger().close();
steps.incrementAndGet();
}
} catch (Exception ex) {
LOGGER.log(Level.WARNING, "Failed to persist build at shutdown: " + this, ex);
}
return steps.get() == 3; // suspended && shut down && closed
}

// TODO: write a custom XStream Converter so that while we are writing CpsFlowExecution, it holds that lock

Check warning on line 1752 in plugin/src/main/java/org/jenkinsci/plugins/workflow/cps/CpsFlowExecution.java

View check run for this annotation

ci.jenkins.io / Open Tasks Scanner

TODO

NORMAL: write a custom XStream Converter so that while we are writing CpsFlowExecution, it holds that lock
// the execution in Groovy CPS should hold that lock (or worse, hold that lock in the runNextChunk method)
// so that the execution gets suspended while we are getting serialized

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@
import java.util.logging.Logger;

import edu.umd.cs.findbugs.annotations.CheckForNull;
import org.jenkinsci.plugins.workflow.flow.FlowExecutionOwner;
import org.jenkinsci.plugins.workflow.pickles.Pickle;
import org.jenkinsci.plugins.workflow.pickles.PickleFactory;
import org.jenkinsci.plugins.workflow.support.concurrent.WithThreadName;
Expand Down Expand Up @@ -332,7 +333,12 @@ public void run() {
if (terminating) {
if (execution != null) {
try {
execution.getOwner().getListener().getLogger().println("Pausing (shutting down)");
var feo = execution.getOwner();
if (feo.get().isComplete()) {
LOGGER.warning(() -> "too late to pause " + feo);
} else {
feo.getListener().getLogger().println("Pausing (shutting down)");
}
} catch (IOException x) {
LOGGER.log(Level.WARNING, null, x);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@
import static org.hamcrest.Matchers.containsInRelativeOrder;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;
import org.htmlunit.ElementNotFoundException;
import org.htmlunit.FailingHttpStatusCodeException;
Expand Down Expand Up @@ -969,6 +970,23 @@ public boolean takesImplicitBlockArgument() {
});
}

@Test public void slowSuspension() throws Throwable {
logger.record(CpsFlowExecution.class, Level.FINE).capture(100);
sessions.then(r -> {
var p = r.createProject(WorkflowJob.class, "p");
// Odd-numbered builds will not suspend properly (if still running when Jenkins shuts down):
p.setDefinition(new CpsFlowDefinition("echo 'sleeping now'; def n = BUILD_NUMBER as int; if (n % 2 == 0) {sleep n} else {Thread.sleep(n * 1000)}", false));
for (int i = 1; i <= 20; i++) {
var b = p.scheduleBuild2(0).waitForStart();
assertThat(b.getNumber(), is(i));
r.waitForMessage("sleeping now", b);
}
Thread.sleep(3_000); // allow earlier builds to complete
});
// normally would be 8/16 builds, but could be subject to timing conditions
assertThat(logger, LoggerRule.recorded(Level.WARNING, containsString("builds did not finish suspending")));
Comment on lines +986 to +987
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

currently

18.694 [id=57]	WARNING	o.j.p.w.cps.CpsFlowExecution#suspendAll: 8/16 builds did not finish suspending within 10 seconds

}

@Test public void buildXmlSaved() throws Throwable {
logger.record(CpsFlowExecution.class, Level.FINE);
sessions.then(r -> {
Expand Down
Loading