2 files changed, 4 insertions(+), 18 deletions(-)

core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -145,24 +145,11 @@ private[spark] class PythonRDD(
           stream.readFully(update)
           accumulator += Collections.singletonList(update)
         }
-
         // Check whether the worker is ready to be re-used.
-        if (reuse_worker) {
-          // It has a high possibility that the ending mark is already available,
-          // And current task should not be blocked by checking it
-
-          if (stream.available() >= 4) {
-            val ending = stream.readInt()
-            if (ending == SpecialLengths.END_OF_STREAM) {
-              env.releasePythonWorker(pythonExec, envVars.toMap, worker)
-              released = true
-              logInfo(s"Communication with worker ended cleanly, re-use it: $worker")
-            } else {
-              logInfo(s"Communication with worker did not end cleanly " +
-                s"(ending with $ending), close it: $worker")
-            }
-          } else {
-            logInfo(s"The ending mark from worker is not available, close it: $worker")
+        if (stream.readInt() == SpecialLengths.END_OF_STREAM) {
+          if (reuse_worker) {
+            env.releasePythonWorker(pythonExec, envVars.toMap, worker)
+            released = true
           }
         }
         null
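The point of the change: the old code released the worker only when the 4-byte ending mark happened to be buffered already (stream.available() >= 4). If the mark was still in flight, a perfectly healthy worker was closed, and the mark itself was never consumed. The new code always blocks on readInt(), which keeps the stream in sync whether or not reuse_worker is set, and releases the worker whenever reuse is enabled and the mark is clean. A minimal, self-contained sketch of the difference, using plain JDK piped streams; the object name, the delay, and the demo harness are illustrative, not code from this patch (Spark defines SpecialLengths.END_OF_STREAM as -4):

import java.io.{DataInputStream, DataOutputStream, PipedInputStream, PipedOutputStream}

object EndMarkerDemo {
  val END_OF_STREAM = -4 // stand-in for SpecialLengths.END_OF_STREAM

  def main(args: Array[String]): Unit = {
    val pipeOut = new PipedOutputStream()
    val in = new DataInputStream(new PipedInputStream(pipeOut))

    // "Worker": sends the ending mark only after a short delay,
    // e.g. while it is still writing out accumulator updates.
    val worker = new Thread(() => {
      Thread.sleep(100)
      val out = new DataOutputStream(pipeOut)
      out.writeInt(END_OF_STREAM)
      out.flush()
    })
    worker.start()

    // Old approach: poll. The mark has not arrived yet, so available()
    // reports 0 and the worker would be closed despite ending cleanly.
    println(s"available() = ${in.available()}")

    // New approach: block until the mark arrives, then decide.
    val ending = in.readInt()
    println(s"readInt() = $ending, clean = ${ending == END_OF_STREAM}")
    worker.join()
  }
}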
python/pyspark/worker.py
@@ -127,7 +127,6 @@ def process():
     write_int(len(_accumulatorRegistry), outfile)
     for (aid, accum) in _accumulatorRegistry.items():
         pickleSer._write_with_length((aid, accum._value), outfile)
-    outfile.flush()
 
     # check end of stream
     if read_int(infile) == SpecialLengths.END_OF_STREAM:
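On the worker side, the patch drops the flush between the accumulator section and the end-of-stream handshake. Since the JVM now blocks until the ending mark arrives instead of polling available(), nothing has to reach the socket early; presumably the flush issued after the worker writes its own END_OF_STREAM reply delivers the buffered accumulator bytes and the mark together. A sketch of that buffering behavior; the object name, values, and piped-stream harness are mine, not the patch's:

import java.io.{BufferedOutputStream, DataInputStream, DataOutputStream,
  PipedInputStream, PipedOutputStream}

object SingleFlushDemo {
  def main(args: Array[String]): Unit = {
    val pipeOut = new PipedOutputStream()
    val in = new DataInputStream(new PipedInputStream(pipeOut))
    val out = new DataOutputStream(new BufferedOutputStream(pipeOut))

    val writer = new Thread(() => {
      out.writeInt(42) // "accumulator update" section
      // no flush here -- this is the call the patch deletes
      out.writeInt(-4) // "END_OF_STREAM" reply
      out.flush()      // one flush delivers both sections at once
    })
    writer.start()

    // The reader blocks until the flush, then sees both values in order.
    println(in.readInt()) // 42
    println(in.readInt()) // -4
    writer.join()
  }
}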