Skip to content

Commit 95fe7db

Browse files
committed
PR comments.
1 parent 2f1fa2c commit 95fe7db

File tree

5 files changed

+30
-40
lines changed

5 files changed

+30
-40
lines changed

src/csharp/Microsoft.Spark/Interop/Ipc/CallbackServer.cs

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -91,15 +91,6 @@ internal int RegisterCallback(ICallbackHandler callbackHandler)
9191
return callbackId;
9292
}
9393

94-
/// <summary>
95-
/// Runs the callback server.
96-
/// </summary>
97-
internal void Run()
98-
{
99-
_listener = SocketFactory.CreateSocket();
100-
Run(_listener);
101-
}
102-
10394
/// <summary>
10495
/// Runs the callback server.
10596
/// </summary>
@@ -117,10 +108,11 @@ internal void Run(ISocketWrapper listener)
117108

118109
try
119110
{
120-
listener.Listen();
111+
_listener = listener;
112+
_listener.Listen();
121113

122114
// Communicate with the JVM the callback server's address and port.
123-
var localEndPoint = (IPEndPoint)listener.LocalEndPoint;
115+
var localEndPoint = (IPEndPoint)_listener.LocalEndPoint;
124116
_jvm.CallStaticJavaMethod(
125117
"DotnetHandler",
126118
"connectCallback",
@@ -130,7 +122,7 @@ internal void Run(ISocketWrapper listener)
130122
s_logger.LogInfo($"Started CallbackServer on {localEndPoint}");
131123

132124
// Start accepting connections from JVM.
133-
new Thread(() => StartServer(listener))
125+
new Thread(() => StartServer(_listener))
134126
{
135127
IsBackground = true
136128
}.Start();
@@ -142,6 +134,14 @@ internal void Run(ISocketWrapper listener)
142134
}
143135
}
144136

137+
/// <summary>
138+
/// Runs the callback server.
139+
/// </summary>
140+
private void Run()
141+
{
142+
Run(SocketFactory.CreateSocket());
143+
}
144+
145145
/// <summary>
146146
/// Starts listening to any connection from JVM.
147147
/// </summary>

src/scala/microsoft-spark-2.4.x/src/main/scala/org/apache/spark/api/dotnet/CallbackClient.scala

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -31,12 +31,7 @@ class CallbackClient(address: String, port: Int) extends Logging {
3131
getOrCreateConnection() match {
3232
case Some(connection) =>
3333
try {
34-
connection.send(callbackId, writeBody) match {
35-
case ConnectionStatus.SUCCESS =>
36-
addConnection(connection)
37-
case status =>
38-
throw new Exception(s"Error encountered with connection: '$status'")
39-
}
34+
connection.send(callbackId, writeBody)
4035
} catch {
4136
case e: Exception =>
4237
logError(s"Error calling callback [callback id = $callbackId].", e)

src/scala/microsoft-spark-2.4.x/src/main/scala/org/apache/spark/api/dotnet/CallbackConnection.scala

Lines changed: 5 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ class CallbackConnection(address: String, port: Int) extends Logging {
2525

2626
def send(
2727
callbackId: Int,
28-
writeBody: DataOutputStream => Unit): ConnectionStatus.ConnectionStatus = {
28+
writeBody: DataOutputStream => Unit): Unit = {
2929
logInfo(s"Calling callback [callback id = $callbackId] ...")
3030

3131
try {
@@ -38,8 +38,7 @@ class CallbackConnection(address: String, port: Int) extends Logging {
3838
byteArrayOutputStream.writeTo(outputStream);
3939
} catch {
4040
case e: Exception => {
41-
logError("Error writing to stream.", e)
42-
return ConnectionStatus.ERROR_WRITE
41+
throw new Exception("Error writing to stream.", e)
4342
}
4443
}
4544

@@ -52,19 +51,16 @@ class CallbackConnection(address: String, port: Int) extends Logging {
5251
endOfStreamResponse match {
5352
case CallbackFlags.END_OF_STREAM =>
5453
logInfo(s"Received END_OF_STREAM signal. Calling callback [callback id = $callbackId] successful.")
55-
return ConnectionStatus.SUCCESS
5654
case _ => {
57-
logError(s"Error verifying end of stream. Expected: ${CallbackFlags.END_OF_STREAM}, " +
55+
throw new Exception(s"Error verifying end of stream. Expected: ${CallbackFlags.END_OF_STREAM}, " +
5856
s"Received: $endOfStreamResponse")
5957
}
6058
}
6159
} catch {
6260
case e: Exception => {
63-
logError("Error while verifying end of stream.", e)
61+
throw new Exception("Error while verifying end of stream.", e)
6462
}
6563
}
66-
67-
ConnectionStatus.ERROR_END_OF_STREAM
6864
}
6965

7066
def close(): Unit = {
@@ -113,9 +109,4 @@ class CallbackConnection(address: String, port: Int) extends Logging {
113109
val DOTNET_EXCEPTION_THROWN: Int = -3
114110
val END_OF_STREAM: Int = -4
115111
}
116-
}
117-
118-
object ConnectionStatus extends Enumeration {
119-
type ConnectionStatus = Value
120-
val SUCCESS, ERROR_WRITE, ERROR_END_OF_STREAM = Value
121-
}
112+
}

src/scala/microsoft-spark-2.4.x/src/main/scala/org/apache/spark/api/dotnet/DotnetBackend.scala

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -83,10 +83,19 @@ class DotnetBackend extends Logging {
8383
}
8484
}
8585

86-
object DotnetBackend {
86+
object DotnetBackend extends Logging {
8787
@volatile private[spark] var callbackClient: CallbackClient = null
8888

89-
private[spark] def shutdownCallbackClient(): Unit = {
89+
private[spark] def setCallbackClient(address: String, port: Int) = synchronized {
90+
if (DotnetBackend.callbackClient == null) {
91+
logInfo(s"Connecting to a callback server at $address:$port")
92+
DotnetBackend.callbackClient = new CallbackClient(address, port)
93+
} else {
94+
throw new Exception("Callback client already set.")
95+
}
96+
}
97+
98+
private[spark] def shutdownCallbackClient(): Unit = synchronized {
9099
if (callbackClient != null) {
91100
callbackClient.shutdown()
92101
callbackClient = null

src/scala/microsoft-spark-2.4.x/src/main/scala/org/apache/spark/api/dotnet/DotnetBackendHandler.scala

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -70,12 +70,7 @@ class DotnetBackendHandler(server: DotnetBackend)
7070
val address = readString(dis)
7171
assert(readObjectType(dis) == 'i')
7272
val port = readInt(dis)
73-
if (DotnetBackend.callbackClient == null) {
74-
logInfo(s"Connecting to a callback server at $address:$port")
75-
DotnetBackend.callbackClient = new CallbackClient(address, port)
76-
} else {
77-
throw new Exception("Callback client already set.")
78-
}
73+
DotnetBackend.setCallbackClient(address, port);
7974
writeInt(dos, 0)
8075
writeType(dos, "void")
8176
case "closeCallback" =>

0 commit comments

Comments
 (0)