Skip to content

Conversation

elvaliuliuliu
Copy link
Contributor

@elvaliuliuliu elvaliuliuliu commented Jun 5, 2020

This PR will add support to ArrayType which will fix part of #26 as follows:

  1. Enable udf take in and return ArrayType by casting unpickled objects to the appropriate type. Support scenarios like simple array, array of arrays and array of Rows.
  2. Convert internal SQL objects to c# objects when it's ArrayType.

@laneser
Copy link
Contributor

laneser commented Jun 25, 2020

This PR is good!
Because we have lots of parquet data which contains array type, and cannot read out by dotnet spark.
And this PR did the right job.

And, I saw the same check failed error that I cannot solve:

Test run for D:\a\1\s\master\artifacts\bin\Microsoft.Spark.Extensions.Delta.E2ETest\Release\netcoreapp3.1\Microsoft.Spark.Extensions.Delta.E2ETest.dll(.NETCoreApp,Version=v3.1)
Microsoft (R) Test Execution Command Line Tool Version 16.6.0
Copyright (c) Microsoft Corporation.  All rights reserved.

Starting test execution, please wait...

A total of 1 test files matched the specified pattern.
[2020-06-05T09:17:10.1955680Z] [fv-az352] [Info] [ConfigurationService] 'DOTNETBACKEND_PORT' environment variable is not set.
[2020-06-05T09:17:10.1970219Z] [fv-az352] [Info] [ConfigurationService] Using port 5567 for connection.
[2020-06-05T09:17:10.1973451Z] [fv-az352] [Info] [JvmBridge] JvMBridge port is 5567
[2020-06-05T09:18:00.8539213Z] [fv-az352] [Error] [JvmBridge] JVM method execution failed: Static method 'forPath' failed for class 'io.delta.tables.DeltaTable' when called with 1 arguments ([Index=1, Type=String, Value=C:\Users\VssAdministrator\AppData\Local\Temp\6cd7297f-60dd-463a-a915-b0ed79c2244f\sink-delta-table], )
[2020-06-05T09:18:00.8540601Z] [fv-az352] [Error] [JvmBridge] java.lang.IllegalArgumentException: Could not find active SparkSession

Maybe the E2E test sometimes just cannot find the active sparksession?

In my case, just re-run the test and it will passed...

@suhsteve
Copy link
Member

This PR is good!
Because we have lots of parquet data which contains array type, and cannot read out by dotnet spark.
And this PR did the right job.

And, I saw the same check failed error that I cannot solve:

Test run for D:\a\1\s\master\artifacts\bin\Microsoft.Spark.Extensions.Delta.E2ETest\Release\netcoreapp3.1\Microsoft.Spark.Extensions.Delta.E2ETest.dll(.NETCoreApp,Version=v3.1)
Microsoft (R) Test Execution Command Line Tool Version 16.6.0
Copyright (c) Microsoft Corporation.  All rights reserved.

Starting test execution, please wait...

A total of 1 test files matched the specified pattern.
[2020-06-05T09:17:10.1955680Z] [fv-az352] [Info] [ConfigurationService] 'DOTNETBACKEND_PORT' environment variable is not set.
[2020-06-05T09:17:10.1970219Z] [fv-az352] [Info] [ConfigurationService] Using port 5567 for connection.
[2020-06-05T09:17:10.1973451Z] [fv-az352] [Info] [JvmBridge] JvMBridge port is 5567
[2020-06-05T09:18:00.8539213Z] [fv-az352] [Error] [JvmBridge] JVM method execution failed: Static method 'forPath' failed for class 'io.delta.tables.DeltaTable' when called with 1 arguments ([Index=1, Type=String, Value=C:\Users\VssAdministrator\AppData\Local\Temp\6cd7297f-60dd-463a-a915-b0ed79c2244f\sink-delta-table], )
[2020-06-05T09:18:00.8540601Z] [fv-az352] [Error] [JvmBridge] java.lang.IllegalArgumentException: Could not find active SparkSession

Maybe the E2E test sometimes just cannot find the active sparksession?

In my case, just re-run the test and it will passed...

@laneser this error is related to #333 . Internally Delta will call getActiveSession() and if it's lucky then it will use the thread where the active session is defined.

@laneser
Copy link
Contributor

laneser commented Jun 26, 2020

@suhsteve
Do you mean that just change the

DeltaTable deltaTable = DeltaTable.ForPath(path);

To

DeltaTable deltaTable = DeltaTable.ForPath(_spark, path);

DeltaTable deltaTable = DeltaTable.ForPath(path);

will fix the unit-test ?! which do not call ForPath (use GetActiveSession version)?

@elvaliuliuliu
Copy link
Contributor Author

I will try get this PR in soon!

@imback82
Copy link
Contributor

Thanks @elvaliuliuliu. What work is remaining to make this non-WIP?

@elvaliuliuliu
Copy link
Contributor Author

Thanks @elvaliuliuliu. What work is remaining to make this non-WIP?

Currently, udf takes in ArrayList like the example here, I am thinking to support this. Should I include it in this PR?

@imback82
Copy link
Contributor

Got it. Yes, let's include it in this PR. Thanks.

@rapoth
Copy link
Contributor

rapoth commented Aug 10, 2020

@elvaliuliuliu Were you planning on updating this PR?

@elvaliuliuliu
Copy link
Contributor Author

Sorry got side-tracked. I will update this PR!

@elvaliuliuliu elvaliuliuliu changed the title [WIP] Support ArrayType Support ArrayType Aug 18, 2020
@elvaliuliuliu
Copy link
Contributor Author

This PR should be ready for review. I think FC and BC tests failed as expected, it should work with the current code, please advise. Thank you!

@Niharikadutta
Copy link
Collaborator

Thanks for working on this @elvaliuliuliu !
@imback82 @suhsteve Do we want to exclude these tests from the FC and BC pipelines?

}
if (obj.GetType() == typeof(ArrayList))
{
return CastUnpickledItems.UnpickleArray(unpickledItems);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if you just return here, what will happen to the rest of rows? Would that be ok?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it should be okay, since UnpickleArray takes unpickledItems and within the func itself, it will deal with all the rows when it's ArrayList.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you get ArrayList, all the objects in unpickledItems will be a type of ArrayList?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, from what I have observed. It will look like below:
image

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we should change this function to IEnumerable<object> GetUnpickledObjects and use yield return.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you mean something like below?

        internal static IEnumerable<object> GetUnpickledObjects(Stream stream, int messageLength)
        {
            byte[] buffer = ArrayPool<byte>.Shared.Rent(messageLength);

            try
            { 
                ...
                // Check if unpickler returns ArrayList.
                // If so, it needs to be cast to the appropriate array type using CastToArray.
                foreach (object objArr in (object[])unpickledItems)
                {
                    if (objArr.GetType() == typeof(object[]))
                    {
                        object obj = ((object[])objArr)[0];
                        if (obj == null)
                        {
                            continue;
                        }
                        if (obj.GetType() == typeof(ArrayList))
                        {
                            yield return CastUnpickledItems.CastToArray(unpickledItems);
                        }
                        else
                        {
                            yield return unpickledItems;
                        }
                    }
                    else
                    {
                        yield return unpickledItems;
                    }
                }
            }
            finally
            {
                ArrayPool<byte>.Shared.Return(buffer);
            }
        }

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree with @imback82 . You are iterating through all the object[] entries here, and the consumer of GetUnpickledObjects will also end up iterating through the object[] again. It may be better to make this an IEnumerable

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, will make the change tonight.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have refactored this part. Please take a look, thanks!

}
if (obj.GetType() == typeof(ArrayList))
{
return CastUnpickledItems.UnpickleArray(unpickledItems);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we should change this function to IEnumerable<object> GetUnpickledObjects and use yield return.

@suhsteve
Copy link
Member

@elvaliuliuliu how's the progress going? Tests are failing.

@elvaliuliuliu
Copy link
Contributor Author

elvaliuliuliu commented Sep 16, 2020

@elvaliuliuliu how's the progress going? Tests are failing.

Working on n d arrays. Some of the tests are passing, it failed at certain spark versions with error like "System.IO.DirectoryNotFoundException : Could not find a part of the path 'D:\a\1\b\spark-2.3.4-bin-hadoop2.7\RELEASE'", looking into it.


// Array of Arrays.
{
Func<Column, Column> udf = Udf<double[][], double>(
Copy link
Member

@suhsteve suhsteve Sep 16, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you pass in multiple arrays ?

Udf<double, double[], double[][], double[][][], double> ?

Can we give the user the option to also define a udf using ArrayList? Something like
Udf<double, ArrayList, ArrayList, ArrayList, double> that will have the same behavior as the udf defined above ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added tests for multiple arrays. Working on ArrayList cases as mentioned.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we also want to support the option of return value as ArrayList?
Like Udf<double, ArrayList, ... , ArrayList>?

@imback82 ideas?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess it can be useful if you want to chain with Udf that takes in ArrayList (just for consistency).

Copy link
Contributor Author

@elvaliuliuliu elvaliuliuliu Sep 17, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Got it, thanks!

The current logic is to cast unpickledItems to typed array if it contains ArrayList here. Another way might be casting input(after unpickling) here when executing udf functions?

I am thinking either to pass commands in GetUnpickledObjects or switch to the latter method, if we want to support ArrayList like Udf<ArrayList, ArrayList, double>. Please lmk which is preferred or any other suggestions? Thanks.

@suhsteve
Copy link
Member

We are targeting a release by end of month and we would like this PR in within the next 2-3 days.

Can we get everything wrapped up by today ?

@elvaliuliuliu
Copy link
Contributor Author

elvaliuliuliu commented Sep 17, 2020

We are targeting a release by end of month and we would like this PR in within the next 2-3 days.

Can we get everything wrapped up by today ?

I am finishing up the following left parts now - Updated

Please lmk if more comments. Thanks.

foreach (object obj in (object[])unpickledItems)
{
castUnpickledItems.Add(
(obj.GetType() == typeof(RowConstructor)) ? obj : CastArray(obj));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not sure if falling back to CastArray always works.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have updated the function name accordingly. Sorry for the confusion. If not RowConstructor, it will fall into object []. CastHelper will help decide and cast object[] as needed.

/// <returns>Typed array after casting.</returns>
public static object CastArray(object obj)
{
if (obj is object[] objArr)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

does this work with Row[]?

Copy link
Contributor Author

@elvaliuliuliu elvaliuliuliu Sep 18, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From what I have observed, if udf takes Row[] like the example here. It will be RowConstructor, and FromInternal should handle such cases. It should not fall into CastArray or CastHelper.

return objArr.Select(x => CastHelper(x)).ToArray();
}

// Array of arrays.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how about array of array of array? Don't you need to handle this recursively?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have changed the function name and description accordingly to make it more clear. This should be handled recursively as covered in test. Thanks!

@suhsteve
Copy link
Member

Closing as #670 has been merged.

@suhsteve suhsteve closed this Sep 24, 2020
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

6 participants