Skip to content

Commit 7977c2f

Browse files
author
Davies Liu
committed
do retry on client side
1 parent b838f35 commit 7977c2f

File tree

2 files changed

+15
-12
lines changed

2 files changed

+15
-12
lines changed

core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala

Lines changed: 1 addition & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -613,16 +613,7 @@ private[spark] object PythonRDD extends Logging {
613613
setDaemon(true)
614614
override def run() {
615615
try {
616-
var sock: Socket = null
617-
try {
618-
sock = serverSocket.accept()
619-
} catch {
620-
case e: SocketTimeoutException =>
621-
// there is a small chance that the client had connected, so retry
622-
logWarning("Timed out after 4 seconds, retry once")
623-
serverSocket.setSoTimeout(10)
624-
sock = serverSocket.accept()
625-
}
616+
val sock = serverSocket.accept()
626617
val out = new DataOutputStream(new BufferedOutputStream(sock.getOutputStream))
627618
try {
628619
writeIteratorToStream(items, out)

python/pyspark/rdd.py

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -113,11 +113,23 @@ def _parse_memory(s):
113113

114114
def _load_from_socket(port, serializer):
115115
sock = socket.socket()
116-
sock.settimeout(5)
116+
sock.settimeout(1)
117117
try:
118118
sock.connect(("localhost", port))
119119
rf = sock.makefile("rb", 65536)
120-
for item in serializer.load_stream(rf):
120+
iter = serializer.load_stream(rf)
121+
try:
122+
yield next(iter)
123+
except socket.timeout as e:
124+
# the connection is not acknowledged by JVM, retry
125+
# server will be closed after 3 seconds, then it will be refused
126+
for v in _load_from_socket(port, serializer):
127+
yield v
128+
return
129+
130+
# increase the timeout, because the server side may be slowed down by GC
131+
sock.settimeout(10)
132+
for item in iter:
121133
yield item
122134
finally:
123135
sock.close()

0 commit comments

Comments
 (0)