Skip to content

Conversation

suhsteve
Copy link
Member

@suhsteve suhsteve commented Jun 17, 2020

This PR exposes the DataStreamWriter.ForeachBatch API

#208

Users can call this API by using an Action<DataFrame, long> delegate. For example

spark
    .ReadStream()
    .Schema("id INT")
    .Csv("/path/to/csv")
    .WriteStream()
    .ForeachBatch((df, id) => 
    {
        df.Write().Csv("/unique/write/path"));
    }

Logs produced on the dotnet side:

[2020-06-21T20:55:22.0427094Z] [Info] [CallbackServer] Starting CallbackServer.
[2020-06-21T20:55:22.0581917Z] [Info] [CallbackServer] Started CallbackServer on 127.0.0.1:61344
[2020-06-21T20:55:26.1706082Z] [Info] [CallbackConnection] [1] Connected with RemoteEndPoint: 127.0.0.1:61350
[2020-06-21T20:55:26.1754979Z] [Info] [CallbackServer] Pool snapshot: [NumThreads:1], [NumConnections:1]
[2020-06-21T20:55:26.1779739Z] [Info] [CallbackConnection] [1] Received request for callback id: 1, callback handler: Microsoft.Spark.Interop.Ipc.ForeachBatchCallbackHandler
[2020-06-21T20:55:28.7991929Z] [Debug] [CallbackConnection] [1] Received END_OF_STREAM signal.
[2020-06-21T20:55:29.9014111Z] [Info] [CallbackConnection] [1] Received request for callback id: 1, callback handler: Microsoft.Spark.Interop.Ipc.ForeachBatchCallbackHandler
[2020-06-21T20:55:30.6256825Z] [Debug] [CallbackConnection] [1] Received END_OF_STREAM signal.

Logs produced on the JVM side:

20/06/21 13:55:22 INFO DotnetBackendHandler: Connecting to a callback server at 127.0.0.1:61344
20/06/21 13:55:26 INFO CallbackConnection: Calling callback [callback id = 1] ...
20/06/21 13:55:26 INFO CallbackConnection: Signaling END_OF_STREAM.
20/06/21 13:55:28 INFO CallbackConnection: Received END_OF_STREAM signal. Calling callback [callback id = 1] successful.
20/06/21 13:55:29 INFO CallbackConnection: Calling callback [callback id = 1] ...
20/06/21 13:55:29 INFO CallbackConnection: Signaling END_OF_STREAM.
20/06/21 13:55:30 INFO CallbackConnection: Received END_OF_STREAM signal. Calling callback [callback id = 1] successful.
20/06/21 13:55:32 INFO DotnetBackendHandler: Requesting to close callback client
20/06/21 13:55:32 INFO CallbackClient: Shutting down.

@suhsteve
Copy link
Member Author

microsoft-spark-2.3.x and 3.0.x files will be added/updated after we finalize the review for 2.4.x

@@ -67,32 +66,19 @@ class DotnetBackendHandler(server: DotnetBackend)
writeInt(dos, -1)
}
case "connectCallback" =>
val t = readObjectType(dis)
var t = readObjectType(dis)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's make it simpler and allow only one call to "connectCallback"

@@ -86,13 +88,17 @@ public void TestForeachBatch()
.WriteStream()
.ForeachBatch((df, id) =>
{
df.Write().Csv(Path.Combine(dstTempDirectory.Path, id.ToString()));
Func<Column, Column> innerUdf = Udf<int, int>(i => i + 200);
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This won't work until the bug re broadcast variables is fixed.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you explain why this is related to the broadcast variable bug?

Copy link
Member Author

@suhsteve suhsteve Jun 18, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

s_jvmBroadcastVariables is [ThreadStatic] and is null when the ForeachBatchCallbackHandler is called (since it runs in a separate worker task.)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh ok. it's in a critical path (CreatePythonFunction). :)

Copy link
Contributor

@imback82 imback82 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Early feedback (4 C# files remaining)

// Tests can subclass this to get Console output to display when using
// xUnit testing framework.
// Workaround found at https://github.com/microsoft/vstest/issues/799
public class MakeConsoleWork : IDisposable
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you rename this class please?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

renamed.

Copy link
Contributor

@imback82 imback82 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewed all files now. Looking really good.

imback82
imback82 previously approved these changes Jun 23, 2020
Copy link
Contributor

@imback82 imback82 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM, thanks @suhsteve for working so hard on this!

@suhsteve
Copy link
Member Author

@imback82 thanks for reviewing! Let me add the relevant files to microsoft-spark-2.3.x and 3.0.x

@imback82 imback82 merged commit 29ad2cb into dotnet:master Jun 23, 2020
@suhsteve suhsteve deleted the foreachbatch branch September 6, 2020 05:52
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants