forked from bazel-contrib/rules_scala
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathWorker.java
More file actions
174 lines (151 loc) · 5.39 KB
/
Worker.java
File metadata and controls
174 lines (151 loc) · 5.39 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
package io.bazel.rulesscala.worker;
import com.google.devtools.build.lib.worker.WorkerProtocol;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.PrintStream;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.List;
/**
* A base for JVM workers.
*
* <p>This supports regular workers as well as persistent workers. It does not (yet) support
* multiplexed workers.
*
* <p>Worker implementations should implement the `Worker.Interface` interface and provide a main
* method that calls `Worker.workerMain`.
*/
public final class Worker {
public static interface Interface {
public void work(String[] args) throws Exception;
public abstract class WorkerException extends RuntimeException {
public WorkerException(String message) {
super(message);
}
public WorkerException(String message, Throwable cause) {
super(message, cause);
}
}
}
/**
 * The single entry point shared by every worker.
 *
 * <p>A worker's main method is expected to do nothing except delegate here. When the first
 * argument is exactly {@code --persistent_worker} the process enters the persistent request
 * loop; otherwise the arguments are handled as one ephemeral unit of work.
 *
 * @param workerArgs the command-line arguments the worker process was started with
 * @param workerInterface the worker implementation to run
 * @throws Exception if the ephemeral unit of work fails
 */
public static void workerMain(String workerArgs[], Interface workerInterface) throws Exception {
  boolean persistent = workerArgs.length > 0 && workerArgs[0].equals("--persistent_worker");

  if (!persistent) {
    ephemeralWorkerMain(workerArgs, workerInterface);
    return;
  }

  persistentWorkerMain(workerInterface);
}
/**
 * The main loop for persistent worker processes.
 *
 * <p>Repeatedly reads a length-delimited {@code WorkRequest} from the process's original stdin,
 * runs it through {@code workerInterface}, and writes a length-delimited {@code WorkResponse}
 * to the original stdout. While the loop is running, {@code System.out} and {@code System.err}
 * both point at an in-memory buffer whose contents become the response's output, and
 * {@code System.in} reads from an empty buffer. The original streams are restored before this
 * method returns.
 *
 * <p>A request whose work throws an {@link Exception} produces a response with exit code 1; the
 * loop then carries on with the next request. The loop ends when stdin is closed, or when
 * exchanging protocol messages fails with an {@link IOException}.
 *
 * @param workerInterface the worker implementation invoked once per request
 */
private static void persistentWorkerMain(Interface workerInterface) {
  InputStream stdin = System.in;
  PrintStream stdout = System.out;
  PrintStream stderr = System.err;
  ByteArrayOutputStream outStream = new SmartByteArrayOutputStream();
  PrintStream out = new PrintStream(outStream);

  // We can't support stdin, so assign it to read from an empty buffer
  System.setIn(new ByteArrayInputStream(new byte[0]));
  System.setOut(out);
  System.setErr(out);

  try {
    while (true) {
      try {
        WorkerProtocol.WorkRequest request = WorkerProtocol.WorkRequest.parseDelimitedFrom(stdin);

        // The request will be null if stdin is closed. It is unclear whether this happens in
        // real deployments, but it is useful for testing (to shut down a persistent worker
        // process).
        if (request == null) {
          break;
        }

        int code = 0;

        try {
          String[] workerArgs = stringListToArray(request.getArgumentsList());
          String[] args = expandArgsIfArgsfile(workerArgs);
          workerInterface.work(args);
        } catch (Exception e) {
          // System.err is the capture buffer here, so the report ends up in the response.
          if (e instanceof Interface.WorkerException) {
            System.err.println(e.getMessage());
          } else {
            e.printStackTrace();
          }
          code = 1;
        }

        // Make sure everything printed during the request is in the buffer before it is
        // snapshotted into the response.
        out.flush();

        WorkerProtocol.WorkResponse.newBuilder()
            .setExitCode(code)
            .setOutput(outStream.toString())
            .build()
            .writeDelimitedTo(stdout);
        stdout.flush();
      } catch (IOException e) {
        // A failure while exchanging protocol messages leaves the streams unusable or out of
        // sync, so retrying would only spin. Report on the real stderr (System.err is still the
        // capture buffer) and stop serving requests.
        stderr.println("Persistent worker stopping after protocol I/O failure:");
        e.printStackTrace(stderr);
        break;
      } finally {
        // Runs after every iteration, including the ones that break out of the loop.
        out.flush();
        outStream.reset();
        System.gc();
      }
    }
  } finally {
    System.setIn(stdin);
    System.setOut(stdout);
    System.setErr(stderr);
  }
}
/**
 * The single-pass runner for ephemeral (non-persistent) worker processes.
 *
 * <p>Expands the arguments if they name an args file, then hands them to the worker exactly once.
 *
 * @param workerArgs the command-line arguments the worker process was started with
 * @param workerInterface the worker implementation to run
 * @throws Exception if expanding the arguments or performing the work fails
 */
private static void ephemeralWorkerMain(String workerArgs[], Interface workerInterface)
    throws Exception {
  workerInterface.work(expandArgsIfArgsfile(workerArgs));
}
/**
 * Expands a lone {@code @path} argument into the lines of the file it names.
 *
 * <p>If {@code allArgs} has exactly one element and that element starts with {@code @}, the rest
 * of the element is taken as a file path; the file is read as UTF-8 and every line becomes one
 * argument. Any other input is handed back untouched (the very same array).
 *
 * @param allArgs the raw arguments
 * @return the lines of the args file, or {@code allArgs} itself when no expansion applies
 * @throws IOException if the args file cannot be read
 */
private static String[] expandArgsIfArgsfile(String[] allArgs) throws IOException {
  boolean namesArgsfile = allArgs.length == 1 && allArgs[0].startsWith("@");
  if (!namesArgsfile) {
    return allArgs;
  }

  // Drop the leading '@' to obtain the path of the args file.
  String argsfilePath = allArgs[0].substring(1);
  List<String> fileLines = Files.readAllLines(Paths.get(argsfilePath), StandardCharsets.UTF_8);
  return stringListToArray(fileLines);
}
/**
 * A ByteArrayOutputStream that sometimes shrinks its internal buffer during calls to `reset`.
 *
 * <p>In contrast, a regular ByteArrayOutputStream will only ever grow its internal buffer.
 *
 * <p>Like the methods it inherits, the methods declared here are {@code synchronized}, so the
 * buffer swap in {@code reset} cannot interleave with a write from another thread.
 *
 * <p>For an example of subclassing a ByteArrayOutputStream, see Spring's
 * ResizableByteArrayOutputStream:
 * https://github.com/spring-projects/spring-framework/blob/master/spring-core/src/main/java/org/springframework/util/ResizableByteArrayOutputStream.java
 */
static class SmartByteArrayOutputStream extends ByteArrayOutputStream {
  // ByteArrayOutputStream's default size is 32, which is extremely small
  // to capture stdout from any worker process. We choose a larger default.
  private static final int DEFAULT_SIZE = 256;

  /** Creates a stream whose internal buffer starts at {@code DEFAULT_SIZE} bytes. */
  public SmartByteArrayOutputStream() {
    super(DEFAULT_SIZE);
  }

  /**
   * Reports whether the internal buffer has grown beyond its idle size.
   *
   * @return {@code true} if the buffer's capacity is larger than {@code DEFAULT_SIZE}
   */
  public synchronized boolean isOversized() {
    return this.buf.length > DEFAULT_SIZE;
  }

  /**
   * Discards the accumulated output and, if the buffer has grown past {@code DEFAULT_SIZE},
   * replaces it with a fresh buffer of {@code DEFAULT_SIZE} bytes.
   */
  @Override
  public synchronized void reset() {
    super.reset();
    // reallocate our internal buffer if we've gone over our
    // desired idle size
    if (this.isOversized()) {
      this.buf = new byte[DEFAULT_SIZE];
    }
  }
}
/**
 * Copies a list of strings into a newly allocated array, preserving order.
 *
 * @param argList the strings to copy
 * @return an array holding the elements of {@code argList} in iteration order
 */
private static String[] stringListToArray(List<String> argList) {
  // Let the list copy itself: a single pass for any List implementation, unlike get(i) in a loop.
  return argList.toArray(new String[0]);
}
}