diff --git a/NuGet.config b/NuGet.config index 7b7b765e2..9d2866825 100644 --- a/NuGet.config +++ b/NuGet.config @@ -6,5 +6,7 @@ + + diff --git a/azure-pipelines.yml b/azure-pipelines.yml index 20215a7b2..8ba73e0c1 100644 --- a/azure-pipelines.yml +++ b/azure-pipelines.yml @@ -11,9 +11,9 @@ variables: _SignType: real _TeamName: DotNetSpark MSBUILDSINGLELOADCONTEXT: 1 - # backwardCompatibleRelease/forwardCompatibleRelease is the "oldest" releases that work with the current release - backwardCompatibleRelease: '0.9.0' + # forwardCompatibleRelease/backwardCompatibleRelease is the "oldest" releases that work with the current release forwardCompatibleRelease: '0.9.0' + backwardCompatibleRelease: '0.9.0' TestsToFilterOut: "(FullyQualifiedName!=Microsoft.Spark.E2ETest.IpcTests.DataFrameTests.TestDataFrameGroupedMapUdf)&\ (FullyQualifiedName!=Microsoft.Spark.E2ETest.IpcTests.DataFrameTests.TestDataFrameVectorUdf)&\ (FullyQualifiedName!=Microsoft.Spark.E2ETest.IpcTests.BroadcastTests.TestDestroy)&\ @@ -22,7 +22,8 @@ variables: (FullyQualifiedName!=Microsoft.Spark.E2ETest.UdfTests.UdfSimpleTypesTests.TestUdfWithReturnAsTimestampType)&\ (FullyQualifiedName!=Microsoft.Spark.E2ETest.UdfTests.UdfSimpleTypesTests.TestUdfWithTimestampType)&\ (FullyQualifiedName!=Microsoft.Spark.E2ETest.IpcTests.SparkSessionTests.TestCreateDataFrameWithTimestamp)" - LatestDotnetWorkerDir: '$(Build.ArtifactStagingDirectory)\Microsoft.Spark.Worker\netcoreapp3.1\win-x64' + ArtifactPath: '$(Build.ArtifactStagingDirectory)\Microsoft.Spark.Binaries' + CurrentDotnetWorkerDir: '$(ArtifactPath)\Microsoft.Spark.Worker\netcoreapp3.1\win-x64' BackwardCompatibleDotnetWorkerDir: $(Build.BinariesDirectory)\Microsoft.Spark.Worker-$(backwardCompatibleRelease) # Azure DevOps variables are transformed into environment variables, with these variables we @@ -38,404 +39,131 @@ resources: name: dotnet/spark ref: refs/tags/v$(forwardCompatibleRelease) -jobs: -- job: Build - displayName: Build and Test Sources - pool: Hosted VS2017 - - variables: - ${{ if and(ne(variables['System.TeamProject'], 'public'), notin(variables['Build.Reason'], 'PullRequest')) }}: - _OfficialBuildIdArgs: /p:OfficialBuildId=$(BUILD.BUILDNUMBER) - HADOOP_HOME: $(Build.BinariesDirectory)\hadoop - - steps: - - checkout: self - path: s\master - - checkout: forwardCompatibleRelease - path: s\$(forwardCompatibleRelease) - - - task: Maven@3 - displayName: 'Maven build src' - inputs: - mavenPomFile: master/src/scala/pom.xml - - - task: Maven@3 - displayName: 'Maven build benchmark' - inputs: - mavenPomFile: master/benchmark/scala/pom.xml - - - task: BatchScript@1 - displayName: Download Spark Distros & Winutils.exe - inputs: - filename: master\script\download-spark-distros.cmd - arguments: $(Build.BinariesDirectory) - - - task: BatchScript@1 - displayName: Download backward compatible worker v$(backwardCompatibleRelease) - inputs: - filename: master\script\download-worker-release.cmd - arguments: '$(Build.BinariesDirectory) $(backwardCompatibleRelease)' - - - script: master\build.cmd -pack - -c $(buildConfiguration) - -ci - $(_OfficialBuildIdArgs) - /p:PublishSparkWorker=true - /p:SparkWorkerPublishDir=$(Build.ArtifactStagingDirectory)\Microsoft.Spark.Worker - displayName: '.NET build' - - - task: DotNetCoreCLI@2 - displayName: '.NET unit tests' - inputs: - command: test - projects: 'master/**/*UnitTest/*.csproj' - arguments: '--configuration $(buildConfiguration)' - - - task: DotNetCoreCLI@2 - displayName: 'E2E tests for Spark 2.3.0' - inputs: - command: test - projects: 
'master/**/Microsoft.Spark.E2ETest/*.csproj' - arguments: '--configuration $(buildConfiguration)' - env: - SPARK_HOME: $(Build.BinariesDirectory)\spark-2.3.0-bin-hadoop2.7 - DOTNET_WORKER_DIR: $(LatestDotnetWorkerDir) - - - task: DotNetCoreCLI@2 - displayName: 'E2E tests for Spark 2.3.1' - inputs: - command: test - projects: 'master/**/Microsoft.Spark.E2ETest/*.csproj' - arguments: '--configuration $(buildConfiguration)' - env: - SPARK_HOME: $(Build.BinariesDirectory)\spark-2.3.1-bin-hadoop2.7 - DOTNET_WORKER_DIR: $(LatestDotnetWorkerDir) - - - task: DotNetCoreCLI@2 - displayName: 'E2E tests for Spark 2.3.2' - inputs: - command: test - projects: 'master/**/Microsoft.Spark.E2ETest/*.csproj' - arguments: '--configuration $(buildConfiguration)' - env: - SPARK_HOME: $(Build.BinariesDirectory)\spark-2.3.2-bin-hadoop2.7 - DOTNET_WORKER_DIR: $(LatestDotnetWorkerDir) - - - task: DotNetCoreCLI@2 - displayName: 'E2E tests for Spark 2.3.3' - inputs: - command: test - projects: 'master/**/Microsoft.Spark.E2ETest/*.csproj' - arguments: '--configuration $(buildConfiguration)' - env: - SPARK_HOME: $(Build.BinariesDirectory)\spark-2.3.3-bin-hadoop2.7 - DOTNET_WORKER_DIR: $(LatestDotnetWorkerDir) - - - task: DotNetCoreCLI@2 - displayName: 'E2E tests for Spark 2.3.4' - inputs: - command: test - projects: 'master/**/Microsoft.Spark.E2ETest/*.csproj' - arguments: '--configuration $(buildConfiguration)' - env: - SPARK_HOME: $(Build.BinariesDirectory)\spark-2.3.4-bin-hadoop2.7 - DOTNET_WORKER_DIR: $(LatestDotnetWorkerDir) - - - task: DotNetCoreCLI@2 - displayName: 'E2E tests for Spark 2.4.0' - inputs: - command: test - projects: 'master/**/Microsoft.Spark.E2ETest/*.csproj' - arguments: '--configuration $(buildConfiguration)' - env: - SPARK_HOME: $(Build.BinariesDirectory)\spark-2.4.0-bin-hadoop2.7 - DOTNET_WORKER_DIR: $(LatestDotnetWorkerDir) - - - task: DotNetCoreCLI@2 - displayName: 'E2E tests for Spark 2.4.1' - inputs: - command: test - projects: 'master/**/Microsoft.Spark.E2ETest/*.csproj' - arguments: '--configuration $(buildConfiguration)' - env: - SPARK_HOME: $(Build.BinariesDirectory)\spark-2.4.1-bin-hadoop2.7 - DOTNET_WORKER_DIR: $(LatestDotnetWorkerDir) - - - task: DotNetCoreCLI@2 - displayName: 'E2E tests for Spark 2.4.3' - inputs: - command: test - projects: 'master/**/Microsoft.Spark*.E2ETest/*.csproj' - arguments: '--configuration $(buildConfiguration)' - env: - SPARK_HOME: $(Build.BinariesDirectory)\spark-2.4.3-bin-hadoop2.7 - DOTNET_WORKER_DIR: $(LatestDotnetWorkerDir) - - - task: DotNetCoreCLI@2 - displayName: 'E2E tests for Spark 2.4.4' - inputs: - command: test - projects: 'master/**/Microsoft.Spark*.E2ETest/*.csproj' - arguments: '--configuration $(buildConfiguration)' - env: - SPARK_HOME: $(Build.BinariesDirectory)\spark-2.4.4-bin-hadoop2.7 - DOTNET_WORKER_DIR: $(LatestDotnetWorkerDir) - - - task: DotNetCoreCLI@2 - displayName: 'E2E tests for Spark 2.4.5' - inputs: - command: test - projects: 'master/**/Microsoft.Spark*.E2ETest/*.csproj' - arguments: '--configuration $(buildConfiguration)' - env: - SPARK_HOME: $(Build.BinariesDirectory)\spark-2.4.5-bin-hadoop2.7 - DOTNET_WORKER_DIR: $(LatestDotnetWorkerDir) - - - task: DotNetCoreCLI@2 - displayName: 'E2E tests for Spark 2.3.0 with backward compatible worker v$(backwardCompatibleRelease)' - inputs: - command: test - projects: 'master/**/Microsoft.Spark.E2ETest/*.csproj' - arguments: '--configuration $(buildConfiguration) --filter $(TestsToFilterOut)' - env: - SPARK_HOME: $(Build.BinariesDirectory)\spark-2.3.0-bin-hadoop2.7 - 
DOTNET_WORKER_DIR: $(BackwardCompatibleDotnetWorkerDir) - - - task: DotNetCoreCLI@2 - displayName: 'E2E tests for Spark 2.3.1 with backward compatible worker v$(backwardCompatibleRelease)' - inputs: - command: test - projects: 'master/**/Microsoft.Spark.E2ETest/*.csproj' - arguments: '--configuration $(buildConfiguration) --filter $(TestsToFilterOut)' - env: - SPARK_HOME: $(Build.BinariesDirectory)\spark-2.3.1-bin-hadoop2.7 - DOTNET_WORKER_DIR: $(BackwardCompatibleDotnetWorkerDir) - - - task: DotNetCoreCLI@2 - displayName: 'E2E tests for Spark 2.3.2 with backward compatible worker v$(backwardCompatibleRelease)' - inputs: - command: test - projects: 'master/**/Microsoft.Spark.E2ETest/*.csproj' - arguments: '--configuration $(buildConfiguration) --filter $(TestsToFilterOut)' - env: - SPARK_HOME: $(Build.BinariesDirectory)\spark-2.3.2-bin-hadoop2.7 - DOTNET_WORKER_DIR: $(BackwardCompatibleDotnetWorkerDir) - - - task: DotNetCoreCLI@2 - displayName: 'E2E tests for Spark 2.3.3 with backward compatible worker v$(backwardCompatibleRelease)' - inputs: - command: test - projects: 'master/**/Microsoft.Spark.E2ETest/*.csproj' - arguments: '--configuration $(buildConfiguration) --filter $(TestsToFilterOut)' - env: - SPARK_HOME: $(Build.BinariesDirectory)\spark-2.3.3-bin-hadoop2.7 - DOTNET_WORKER_DIR: $(BackwardCompatibleDotnetWorkerDir) - - - task: DotNetCoreCLI@2 - displayName: 'E2E tests for Spark 2.3.4 with backward compatible worker v$(backwardCompatibleRelease)' - inputs: - command: test - projects: 'master/**/Microsoft.Spark.E2ETest/*.csproj' - arguments: '--configuration $(buildConfiguration) --filter $(TestsToFilterOut)' - env: - SPARK_HOME: $(Build.BinariesDirectory)\spark-2.3.4-bin-hadoop2.7 - DOTNET_WORKER_DIR: $(BackwardCompatibleDotnetWorkerDir) - - - task: DotNetCoreCLI@2 - displayName: 'E2E tests for Spark 2.4.0 with backward compatible worker v$(backwardCompatibleRelease)' - inputs: - command: test - projects: 'master/**/Microsoft.Spark.E2ETest/*.csproj' - arguments: '--configuration $(buildConfiguration) --filter $(TestsToFilterOut)' - env: - SPARK_HOME: $(Build.BinariesDirectory)\spark-2.4.0-bin-hadoop2.7 - DOTNET_WORKER_DIR: $(BackwardCompatibleDotnetWorkerDir) - - - task: DotNetCoreCLI@2 - displayName: 'E2E tests for Spark 2.4.1 with backward compatible worker v$(backwardCompatibleRelease)' - inputs: - command: test - projects: 'master/**/Microsoft.Spark.E2ETest/*.csproj' - arguments: '--configuration $(buildConfiguration) --filter $(TestsToFilterOut)' - env: - SPARK_HOME: $(Build.BinariesDirectory)\spark-2.4.1-bin-hadoop2.7 - DOTNET_WORKER_DIR: $(BackwardCompatibleDotnetWorkerDir) - - - task: DotNetCoreCLI@2 - displayName: 'E2E tests for Spark 2.4.3 with backward compatible worker v$(backwardCompatibleRelease)' - inputs: - command: test - projects: 'master/**/Microsoft.Spark*.E2ETest/*.csproj' - arguments: '--configuration $(buildConfiguration) --filter $(TestsToFilterOut)' - env: - SPARK_HOME: $(Build.BinariesDirectory)\spark-2.4.3-bin-hadoop2.7 - DOTNET_WORKER_DIR: $(BackwardCompatibleDotnetWorkerDir) - - - task: DotNetCoreCLI@2 - displayName: 'E2E tests for Spark 2.4.4 with backward compatible worker v$(backwardCompatibleRelease)' - inputs: - command: test - projects: 'master/**/Microsoft.Spark*.E2ETest/*.csproj' - arguments: '--configuration $(buildConfiguration) --filter $(TestsToFilterOut)' - env: - SPARK_HOME: $(Build.BinariesDirectory)\spark-2.4.4-bin-hadoop2.7 - DOTNET_WORKER_DIR: $(BackwardCompatibleDotnetWorkerDir) - - - task: DotNetCoreCLI@2 - displayName: 'E2E tests 
for Spark 2.4.5 with backward compatible worker v$(backwardCompatibleRelease)' - inputs: - command: test - projects: 'master/**/Microsoft.Spark*.E2ETest/*.csproj' - arguments: '--configuration $(buildConfiguration) --filter $(TestsToFilterOut)' - env: - SPARK_HOME: $(Build.BinariesDirectory)\spark-2.4.5-bin-hadoop2.7 - DOTNET_WORKER_DIR: $(BackwardCompatibleDotnetWorkerDir) - - - task: Maven@3 - displayName: 'Maven build src for forward compatible release v$(forwardCompatibleRelease)' - inputs: - mavenPomFile: $(forwardCompatibleRelease)/src/scala/pom.xml - - - script: $(forwardCompatibleRelease)\build.cmd - -c $(buildConfiguration) - -ci - $(_OfficialBuildIdArgs) - /p:PublishSparkWorker=false - displayName: '.NET build for forward compatible release v$(forwardCompatibleRelease)' - - - task: DotNetCoreCLI@2 - displayName: 'E2E tests for Spark 2.3.0 from forward compatible release v$(forwardCompatibleRelease)' - inputs: - command: test - projects: '$(forwardCompatibleRelease)/**/Microsoft.Spark.E2ETest/*.csproj' - arguments: '--configuration $(buildConfiguration)' - env: - SPARK_HOME: $(Build.BinariesDirectory)\spark-2.3.0-bin-hadoop2.7 - DOTNET_WORKER_DIR: $(LatestDotnetWorkerDir) - - - task: DotNetCoreCLI@2 - displayName: 'E2E tests for Spark 2.3.1 from forward compatible release v$(forwardCompatibleRelease)' - inputs: - command: test - projects: '$(forwardCompatibleRelease)/**/Microsoft.Spark.E2ETest/*.csproj' - arguments: '--configuration $(buildConfiguration)' - env: - SPARK_HOME: $(Build.BinariesDirectory)\spark-2.3.1-bin-hadoop2.7 - DOTNET_WORKER_DIR: $(LatestDotnetWorkerDir) - - - task: DotNetCoreCLI@2 - displayName: 'E2E tests for Spark 2.3.2 from forward compatible release v$(forwardCompatibleRelease)' - inputs: - command: test - projects: '$(forwardCompatibleRelease)/**/Microsoft.Spark.E2ETest/*.csproj' - arguments: '--configuration $(buildConfiguration)' - env: - SPARK_HOME: $(Build.BinariesDirectory)\spark-2.3.2-bin-hadoop2.7 - DOTNET_WORKER_DIR: $(LatestDotnetWorkerDir) - - - task: DotNetCoreCLI@2 - displayName: 'E2E tests for Spark 2.3.3 from forward compatible release v$(forwardCompatibleRelease)' - inputs: - command: test - projects: '$(forwardCompatibleRelease)/**/Microsoft.Spark.E2ETest/*.csproj' - arguments: '--configuration $(buildConfiguration)' - env: - SPARK_HOME: $(Build.BinariesDirectory)\spark-2.3.3-bin-hadoop2.7 - DOTNET_WORKER_DIR: $(LatestDotnetWorkerDir) - - - task: DotNetCoreCLI@2 - displayName: 'E2E tests for Spark 2.3.4 from forward compatible release v$(forwardCompatibleRelease)' - inputs: - command: test - projects: '$(forwardCompatibleRelease)/**/Microsoft.Spark.E2ETest/*.csproj' - arguments: '--configuration $(buildConfiguration)' - env: - SPARK_HOME: $(Build.BinariesDirectory)\spark-2.3.4-bin-hadoop2.7 - DOTNET_WORKER_DIR: $(LatestDotnetWorkerDir) - - - task: DotNetCoreCLI@2 - displayName: 'E2E tests for Spark 2.4.0 from forward compatible release v$(forwardCompatibleRelease)' - inputs: - command: test - projects: '$(forwardCompatibleRelease)/**/Microsoft.Spark.E2ETest/*.csproj' - arguments: '--configuration $(buildConfiguration)' - env: - SPARK_HOME: $(Build.BinariesDirectory)\spark-2.4.0-bin-hadoop2.7 - DOTNET_WORKER_DIR: $(LatestDotnetWorkerDir) - - - task: DotNetCoreCLI@2 - displayName: 'E2E tests for Spark 2.4.1 from forward compatible release v$(forwardCompatibleRelease)' - inputs: - command: test - projects: '$(forwardCompatibleRelease)/**/Microsoft.Spark.E2ETest/*.csproj' - arguments: '--configuration $(buildConfiguration)' - env: - SPARK_HOME: 
$(Build.BinariesDirectory)\spark-2.4.1-bin-hadoop2.7 - DOTNET_WORKER_DIR: $(LatestDotnetWorkerDir) - - - task: DotNetCoreCLI@2 - displayName: 'E2E tests for Spark 2.4.3 from forward compatible release v$(forwardCompatibleRelease)' - inputs: - command: test - projects: '$(forwardCompatibleRelease)/**/Microsoft.Spark*.E2ETest/*.csproj' - arguments: '--configuration $(buildConfiguration)' - env: - SPARK_HOME: $(Build.BinariesDirectory)\spark-2.4.3-bin-hadoop2.7 - DOTNET_WORKER_DIR: $(LatestDotnetWorkerDir) - - - task: DotNetCoreCLI@2 - displayName: 'E2E tests for Spark 2.4.4 from forward compatible release v$(forwardCompatibleRelease)' - inputs: - command: test - projects: '$(forwardCompatibleRelease)/**/Microsoft.Spark*.E2ETest/*.csproj' - arguments: '--configuration $(buildConfiguration)' - env: - SPARK_HOME: $(Build.BinariesDirectory)\spark-2.4.4-bin-hadoop2.7 - DOTNET_WORKER_DIR: $(LatestDotnetWorkerDir) - - - task: DotNetCoreCLI@2 - displayName: 'E2E tests for Spark 2.4.5 from forward compatible release v$(forwardCompatibleRelease)' - inputs: - command: test - projects: '$(forwardCompatibleRelease)/**/Microsoft.Spark*.E2ETest/*.csproj' - arguments: '--configuration $(buildConfiguration)' - env: - SPARK_HOME: $(Build.BinariesDirectory)\spark-2.4.5-bin-hadoop2.7 - DOTNET_WORKER_DIR: $(LatestDotnetWorkerDir) +stages: +- stage: Build + displayName: Build Sources + jobs: + - job: Build + pool: Hosted VS2017 - - ${{ if and(ne(variables['System.TeamProject'], 'public'), notin(variables['Build.Reason'], 'PullRequest')) }}: - - task: CopyFiles@2 - displayName: Stage .NET artifacts + variables: + ${{ if and(ne(variables['System.TeamProject'], 'public'), notin(variables['Build.Reason'], 'PullRequest')) }}: + _OfficialBuildIdArgs: /p:OfficialBuildId=$(BUILD.BUILDNUMBER) + + steps: + - task: Maven@3 + displayName: 'Maven build src' + inputs: + mavenPomFile: src/scala/pom.xml + + - task: Maven@3 + displayName: 'Maven build benchmark' + inputs: + mavenPomFile: benchmark/scala/pom.xml + + - script: build.cmd -pack + -c $(buildConfiguration) + -ci + $(_OfficialBuildIdArgs) + /p:PublishSparkWorker=true + /p:SparkWorkerPublishDir=$(Build.ArtifactStagingDirectory)\Microsoft.Spark.Worker + displayName: '.NET build' + + - task: DotNetCoreCLI@2 + displayName: '.NET unit tests' inputs: - sourceFolder: $(Build.SourcesDirectory)/master/artifacts/packages/$(buildConfiguration)/Shipping - contents: | - **/*.nupkg - **/*.snupkg - targetFolder: $(Build.ArtifactStagingDirectory)/BuildArtifacts/artifacts/packages/$(buildConfiguration)/Shipping + command: test + projects: '**/*UnitTest/*.csproj' + arguments: '--configuration $(buildConfiguration)' - task: CopyFiles@2 - displayName: Stage build logs + displayName: Stage Maven build jars inputs: - sourceFolder: $(Build.SourcesDirectory)/master/artifacts/log - targetFolder: $(Build.ArtifactStagingDirectory)/BuildArtifacts/artifacts/log + sourceFolder: $(Build.SourcesDirectory)/src/scala + contents: '**/*.jar' + targetFolder: $(Build.ArtifactStagingDirectory)/Jars + + - ${{ if and(ne(variables['System.TeamProject'], 'public'), notin(variables['Build.Reason'], 'PullRequest')) }}: + - task: CopyFiles@2 + displayName: Stage .NET artifacts + inputs: + sourceFolder: $(Build.SourcesDirectory)/artifacts/packages/$(buildConfiguration)/Shipping + contents: | + **/*.nupkg + **/*.snupkg + targetFolder: $(Build.ArtifactStagingDirectory)/BuildArtifacts/artifacts/packages/$(buildConfiguration)/Shipping + + - task: CopyFiles@2 + displayName: Stage build logs + inputs: + sourceFolder: 
$(Build.SourcesDirectory)/artifacts/log + targetFolder: $(Build.ArtifactStagingDirectory)/BuildArtifacts/artifacts/log - task: PublishBuildArtifacts@1 inputs: pathtoPublish: '$(Build.ArtifactStagingDirectory)' artifactName: Microsoft.Spark.Binaries -- ${{ if and(ne(variables['System.TeamProject'], 'public'), notin(variables['Build.Reason'], 'PullRequest')) }}: - - job: SignPublish - dependsOn: - - Build - displayName: Sign and Publish Artifacts - pool: - name: NetCoreInternal-Pool - queue: buildpool.windows.10.amd64.vs2017 + - ${{ if and(ne(variables['System.TeamProject'], 'public'), notin(variables['Build.Reason'], 'PullRequest')) }}: + - job: SignPublish + dependsOn: + - Build + displayName: Sign and Publish Artifacts + pool: + name: NetCoreInternal-Pool + queue: buildpool.windows.10.amd64.vs2017 + + variables: + ${{ if and(ne(variables['System.TeamProject'], 'public'), notin(variables['Build.Reason'], 'PullRequest')) }}: + _OfficialBuildIdArgs: /p:OfficialBuildId=$(BUILD.BUILDNUMBER) + + steps: + - task: DownloadBuildArtifacts@0 + displayName: Download Build Artifacts + inputs: + artifactName: Microsoft.Spark.Binaries + downloadPath: $(Build.ArtifactStagingDirectory) + + - task: MicroBuildSigningPlugin@2 + displayName: Install MicroBuild plugin + inputs: + signType: $(_SignType) + zipSources: false + feedSource: https://dnceng.pkgs.visualstudio.com/_packaging/MicroBuildToolset/nuget/v3/index.json + env: + TeamName: $(_TeamName) + condition: and(succeeded(), in(variables['_SignType'], 'real', 'test'), eq(variables['Agent.Os'], 'Windows_NT')) + + - task: PowerShell@2 + displayName: Sign artifacts and Package Microsoft.Spark.Worker + inputs: + filePath: eng\common\build.ps1 + arguments: -restore -sign -publish + -c $(buildConfiguration) + -ci + $(_OfficialBuildIdArgs) + /p:DotNetSignType=$(_SignType) + /p:SparkPackagesDir=$(ArtifactPath)\BuildArtifacts\artifacts\packages + /p:SparkWorkerPublishDir=$(ArtifactPath)\Microsoft.Spark.Worker + /p:SparkWorkerPackageOutputDir=$(ArtifactPath) + + - task: PublishBuildArtifacts@1 + inputs: + pathtoPublish: '$(ArtifactPath)' + artifactName: Microsoft.Spark.Binaries + +- stage: Test + displayName: E2E Tests + dependsOn: Build + jobs: + - job: Run + pool: Hosted VS2017 variables: ${{ if and(ne(variables['System.TeamProject'], 'public'), notin(variables['Build.Reason'], 'PullRequest')) }}: _OfficialBuildIdArgs: /p:OfficialBuildId=$(BUILD.BUILDNUMBER) + HADOOP_HOME: $(Build.BinariesDirectory)\hadoop + DOTNET_WORKER_DIR: $(CurrentDotnetWorkerDir) steps: - task: DownloadBuildArtifacts@0 @@ -443,31 +171,378 @@ jobs: inputs: artifactName: Microsoft.Spark.Binaries downloadPath: $(Build.ArtifactStagingDirectory) - - - task: MicroBuildSigningPlugin@2 - displayName: Install MicroBuild plugin + + - task: CopyFiles@2 + displayName: Copy jars + inputs: + sourceFolder: $(ArtifactPath)/Jars + contents: '**/*.jar' + targetFolder: $(Build.SourcesDirectory)/src/scala + + - task: BatchScript@1 + displayName: Download Spark Distros & Winutils.exe + inputs: + filename: script\download-spark-distros.cmd + arguments: $(Build.BinariesDirectory) + + - task: DotNetCoreCLI@2 + displayName: 'E2E tests for Spark 2.3.0' inputs: - signType: $(_SignType) - zipSources: false - feedSource: https://dnceng.pkgs.visualstudio.com/_packaging/MicroBuildToolset/nuget/v3/index.json + command: test + projects: '**/Microsoft.Spark.E2ETest/*.csproj' + arguments: '--configuration $(buildConfiguration)' env: - TeamName: $(_TeamName) - condition: and(succeeded(), in(variables['_SignType'], 'real', 
'test'), eq(variables['Agent.Os'], 'Windows_NT')) - - - task: PowerShell@2 - displayName: Sign artifacts and Package Microsoft.Spark.Worker - inputs: - filePath: eng\common\build.ps1 - arguments: -restore -sign -publish - -c $(buildConfiguration) - -ci - $(_OfficialBuildIdArgs) - /p:DotNetSignType=$(_SignType) - /p:SparkPackagesDir=$(Build.ArtifactStagingDirectory)\Microsoft.Spark.Binaries\BuildArtifacts\artifacts\packages - /p:SparkWorkerPublishDir=$(Build.ArtifactStagingDirectory)\Microsoft.Spark.Binaries\Microsoft.Spark.Worker - /p:SparkWorkerPackageOutputDir=$(Build.ArtifactStagingDirectory)\Microsoft.Spark.Binaries + SPARK_HOME: $(Build.BinariesDirectory)\spark-2.3.0-bin-hadoop2.7 - - task: PublishBuildArtifacts@1 + - task: DotNetCoreCLI@2 + displayName: 'E2E tests for Spark 2.3.1' inputs: - pathtoPublish: '$(Build.ArtifactStagingDirectory)/Microsoft.Spark.Binaries' - artifactName: Microsoft.Spark.Binaries + command: test + projects: '**/Microsoft.Spark.E2ETest/*.csproj' + arguments: '--configuration $(buildConfiguration)' + env: + SPARK_HOME: $(Build.BinariesDirectory)\spark-2.3.1-bin-hadoop2.7 + + - task: DotNetCoreCLI@2 + displayName: 'E2E tests for Spark 2.3.2' + inputs: + command: test + projects: '**/Microsoft.Spark.E2ETest/*.csproj' + arguments: '--configuration $(buildConfiguration)' + env: + SPARK_HOME: $(Build.BinariesDirectory)\spark-2.3.2-bin-hadoop2.7 + + - task: DotNetCoreCLI@2 + displayName: 'E2E tests for Spark 2.3.3' + inputs: + command: test + projects: '**/Microsoft.Spark.E2ETest/*.csproj' + arguments: '--configuration $(buildConfiguration)' + env: + SPARK_HOME: $(Build.BinariesDirectory)\spark-2.3.3-bin-hadoop2.7 + + - task: DotNetCoreCLI@2 + displayName: 'E2E tests for Spark 2.3.4' + inputs: + command: test + projects: '**/Microsoft.Spark.E2ETest/*.csproj' + arguments: '--configuration $(buildConfiguration)' + env: + SPARK_HOME: $(Build.BinariesDirectory)\spark-2.3.4-bin-hadoop2.7 + + - task: DotNetCoreCLI@2 + displayName: 'E2E tests for Spark 2.4.0' + inputs: + command: test + projects: '**/Microsoft.Spark.E2ETest/*.csproj' + arguments: '--configuration $(buildConfiguration)' + env: + SPARK_HOME: $(Build.BinariesDirectory)\spark-2.4.0-bin-hadoop2.7 + + - task: DotNetCoreCLI@2 + displayName: 'E2E tests for Spark 2.4.1' + inputs: + command: test + projects: '**/Microsoft.Spark.E2ETest/*.csproj' + arguments: '--configuration $(buildConfiguration)' + env: + SPARK_HOME: $(Build.BinariesDirectory)\spark-2.4.1-bin-hadoop2.7 + + - task: DotNetCoreCLI@2 + displayName: 'E2E tests for Spark 2.4.3' + inputs: + command: test + projects: '**/Microsoft.Spark*.E2ETest/*.csproj' + arguments: '--configuration $(buildConfiguration)' + env: + SPARK_HOME: $(Build.BinariesDirectory)\spark-2.4.3-bin-hadoop2.7 + + - task: DotNetCoreCLI@2 + displayName: 'E2E tests for Spark 2.4.4' + inputs: + command: test + projects: '**/Microsoft.Spark*.E2ETest/*.csproj' + arguments: '--configuration $(buildConfiguration)' + env: + SPARK_HOME: $(Build.BinariesDirectory)\spark-2.4.4-bin-hadoop2.7 + + - task: DotNetCoreCLI@2 + displayName: 'E2E tests for Spark 2.4.5' + inputs: + command: test + projects: '**/Microsoft.Spark*.E2ETest/*.csproj' + arguments: '--configuration $(buildConfiguration)' + env: + SPARK_HOME: $(Build.BinariesDirectory)\spark-2.4.5-bin-hadoop2.7 + + - task: DotNetCoreCLI@2 + displayName: 'E2E tests for Spark 2.4.6' + inputs: + command: test + projects: '**/Microsoft.Spark*.E2ETest/*.csproj' + arguments: '--configuration $(buildConfiguration)' + env: + SPARK_HOME: 
$(Build.BinariesDirectory)\spark-2.4.6-bin-hadoop2.7 + +- stage: ForwardCompatibility + displayName: E2E Forward Compatibility Tests + dependsOn: Build + jobs: + - job: Run + pool: Hosted VS2017 + + variables: + ${{ if and(ne(variables['System.TeamProject'], 'public'), notin(variables['Build.Reason'], 'PullRequest')) }}: + _OfficialBuildIdArgs: /p:OfficialBuildId=$(BUILD.BUILDNUMBER) + HADOOP_HOME: $(Build.BinariesDirectory)\hadoop + DOTNET_WORKER_DIR: $(CurrentDotnetWorkerDir) + + steps: + - checkout: forwardCompatibleRelease + path: s\$(forwardCompatibleRelease) + + - task: Maven@3 + displayName: 'Maven build src for forward compatible release v$(forwardCompatibleRelease)' + inputs: + mavenPomFile: src/scala/pom.xml + + - task: DownloadBuildArtifacts@0 + displayName: Download Build Artifacts + inputs: + artifactName: Microsoft.Spark.Binaries + downloadPath: $(Build.ArtifactStagingDirectory) + + - task: BatchScript@1 + displayName: Download Spark Distros & Winutils.exe + inputs: + filename: script\download-spark-distros.cmd + arguments: $(Build.BinariesDirectory) + + - task: DotNetCoreCLI@2 + displayName: 'E2E tests for Spark 2.3.0' + inputs: + command: test + projects: '**/Microsoft.Spark.E2ETest/*.csproj' + arguments: '--configuration $(buildConfiguration)' + env: + SPARK_HOME: $(Build.BinariesDirectory)\spark-2.3.0-bin-hadoop2.7 + + - task: DotNetCoreCLI@2 + displayName: 'E2E tests for Spark 2.3.1' + inputs: + command: test + projects: '**/Microsoft.Spark.E2ETest/*.csproj' + arguments: '--configuration $(buildConfiguration)' + env: + SPARK_HOME: $(Build.BinariesDirectory)\spark-2.3.1-bin-hadoop2.7 + + - task: DotNetCoreCLI@2 + displayName: 'E2E tests for Spark 2.3.2' + inputs: + command: test + projects: '**/Microsoft.Spark.E2ETest/*.csproj' + arguments: '--configuration $(buildConfiguration)' + env: + SPARK_HOME: $(Build.BinariesDirectory)\spark-2.3.2-bin-hadoop2.7 + + - task: DotNetCoreCLI@2 + displayName: 'E2E tests for Spark 2.3.3' + inputs: + command: test + projects: '**/Microsoft.Spark.E2ETest/*.csproj' + arguments: '--configuration $(buildConfiguration)' + env: + SPARK_HOME: $(Build.BinariesDirectory)\spark-2.3.3-bin-hadoop2.7 + + - task: DotNetCoreCLI@2 + displayName: 'E2E tests for Spark 2.3.4' + inputs: + command: test + projects: '**/Microsoft.Spark.E2ETest/*.csproj' + arguments: '--configuration $(buildConfiguration)' + env: + SPARK_HOME: $(Build.BinariesDirectory)\spark-2.3.4-bin-hadoop2.7 + + - task: DotNetCoreCLI@2 + displayName: 'E2E tests for Spark 2.4.0' + inputs: + command: test + projects: '**/Microsoft.Spark.E2ETest/*.csproj' + arguments: '--configuration $(buildConfiguration)' + env: + SPARK_HOME: $(Build.BinariesDirectory)\spark-2.4.0-bin-hadoop2.7 + + - task: DotNetCoreCLI@2 + displayName: 'E2E tests for Spark 2.4.1' + inputs: + command: test + projects: '**/Microsoft.Spark.E2ETest/*.csproj' + arguments: '--configuration $(buildConfiguration)' + env: + SPARK_HOME: $(Build.BinariesDirectory)\spark-2.4.1-bin-hadoop2.7 + + - task: DotNetCoreCLI@2 + displayName: 'E2E tests for Spark 2.4.3' + inputs: + command: test + projects: '**/Microsoft.Spark*.E2ETest/*.csproj' + arguments: '--configuration $(buildConfiguration)' + env: + SPARK_HOME: $(Build.BinariesDirectory)\spark-2.4.3-bin-hadoop2.7 + + - task: DotNetCoreCLI@2 + displayName: 'E2E tests for Spark 2.4.4' + inputs: + command: test + projects: '**/Microsoft.Spark*.E2ETest/*.csproj' + arguments: '--configuration $(buildConfiguration)' + env: + SPARK_HOME: $(Build.BinariesDirectory)\spark-2.4.4-bin-hadoop2.7 + + - 
task: DotNetCoreCLI@2 + displayName: 'E2E tests for Spark 2.4.5' + inputs: + command: test + projects: '**/Microsoft.Spark*.E2ETest/*.csproj' + arguments: '--configuration $(buildConfiguration)' + env: + SPARK_HOME: $(Build.BinariesDirectory)\spark-2.4.5-bin-hadoop2.7 + +- stage: BackwardCompatibility + displayName: E2E Backward Compatibility Tests + dependsOn: Build + jobs: + - job: Run + pool: Hosted VS2017 + + variables: + ${{ if and(ne(variables['System.TeamProject'], 'public'), notin(variables['Build.Reason'], 'PullRequest')) }}: + _OfficialBuildIdArgs: /p:OfficialBuildId=$(BUILD.BUILDNUMBER) + HADOOP_HOME: $(Build.BinariesDirectory)\hadoop + DOTNET_WORKER_DIR: $(BackwardCompatibleDotnetWorkerDir) + + steps: + - task: DownloadBuildArtifacts@0 + displayName: Download Build Artifacts + inputs: + artifactName: Microsoft.Spark.Binaries + downloadPath: $(Build.ArtifactStagingDirectory) + + - task: CopyFiles@2 + displayName: Copy jars + inputs: + sourceFolder: $(ArtifactPath)/Jars + contents: '**/*.jar' + targetFolder: $(Build.SourcesDirectory)/src/scala + + - task: BatchScript@1 + displayName: Download Spark Distros & Winutils.exe + inputs: + filename: script\download-spark-distros.cmd + arguments: $(Build.BinariesDirectory) + + - task: BatchScript@1 + displayName: Download backward compatible worker v$(backwardCompatibleRelease) + inputs: + filename: script\download-worker-release.cmd + arguments: '$(Build.BinariesDirectory) $(backwardCompatibleRelease)' + + - task: DotNetCoreCLI@2 + displayName: 'E2E tests for Spark 2.3.0' + inputs: + command: test + projects: '**/Microsoft.Spark.E2ETest/*.csproj' + arguments: '--configuration $(buildConfiguration) --filter $(TestsToFilterOut)' + env: + SPARK_HOME: $(Build.BinariesDirectory)\spark-2.3.0-bin-hadoop2.7 + + - task: DotNetCoreCLI@2 + displayName: 'E2E tests for Spark 2.3.1' + inputs: + command: test + projects: '**/Microsoft.Spark.E2ETest/*.csproj' + arguments: '--configuration $(buildConfiguration) --filter $(TestsToFilterOut)' + env: + SPARK_HOME: $(Build.BinariesDirectory)\spark-2.3.1-bin-hadoop2.7 + + - task: DotNetCoreCLI@2 + displayName: 'E2E tests for Spark 2.3.2' + inputs: + command: test + projects: '**/Microsoft.Spark.E2ETest/*.csproj' + arguments: '--configuration $(buildConfiguration) --filter $(TestsToFilterOut)' + env: + SPARK_HOME: $(Build.BinariesDirectory)\spark-2.3.2-bin-hadoop2.7 + + - task: DotNetCoreCLI@2 + displayName: 'E2E tests for Spark 2.3.3' + inputs: + command: test + projects: '**/Microsoft.Spark.E2ETest/*.csproj' + arguments: '--configuration $(buildConfiguration) --filter $(TestsToFilterOut)' + env: + SPARK_HOME: $(Build.BinariesDirectory)\spark-2.3.3-bin-hadoop2.7 + + - task: DotNetCoreCLI@2 + displayName: 'E2E tests for Spark 2.3.4' + inputs: + command: test + projects: '**/Microsoft.Spark.E2ETest/*.csproj' + arguments: '--configuration $(buildConfiguration) --filter $(TestsToFilterOut)' + env: + SPARK_HOME: $(Build.BinariesDirectory)\spark-2.3.4-bin-hadoop2.7 + + - task: DotNetCoreCLI@2 + displayName: 'E2E tests for Spark 2.4.0' + inputs: + command: test + projects: '**/Microsoft.Spark.E2ETest/*.csproj' + arguments: '--configuration $(buildConfiguration) --filter $(TestsToFilterOut)' + env: + SPARK_HOME: $(Build.BinariesDirectory)\spark-2.4.0-bin-hadoop2.7 + + - task: DotNetCoreCLI@2 + displayName: 'E2E tests for Spark 2.4.1' + inputs: + command: test + projects: '**/Microsoft.Spark.E2ETest/*.csproj' + arguments: '--configuration $(buildConfiguration) --filter $(TestsToFilterOut)' + env: + SPARK_HOME: 
$(Build.BinariesDirectory)\spark-2.4.1-bin-hadoop2.7 + + - task: DotNetCoreCLI@2 + displayName: 'E2E tests for Spark 2.4.3' + inputs: + command: test + projects: '**/Microsoft.Spark*.E2ETest/*.csproj' + arguments: '--configuration $(buildConfiguration) --filter $(TestsToFilterOut)' + env: + SPARK_HOME: $(Build.BinariesDirectory)\spark-2.4.3-bin-hadoop2.7 + + - task: DotNetCoreCLI@2 + displayName: 'E2E tests for Spark 2.4.4' + inputs: + command: test + projects: '**/Microsoft.Spark*.E2ETest/*.csproj' + arguments: '--configuration $(buildConfiguration) --filter $(TestsToFilterOut)' + env: + SPARK_HOME: $(Build.BinariesDirectory)\spark-2.4.4-bin-hadoop2.7 + + - task: DotNetCoreCLI@2 + displayName: 'E2E tests for Spark 2.4.5' + inputs: + command: test + projects: '**/Microsoft.Spark*.E2ETest/*.csproj' + arguments: '--configuration $(buildConfiguration) --filter $(TestsToFilterOut)' + env: + SPARK_HOME: $(Build.BinariesDirectory)\spark-2.4.5-bin-hadoop2.7 + + - task: DotNetCoreCLI@2 + displayName: 'E2E tests for Spark 2.4.6' + inputs: + command: test + projects: '**/Microsoft.Spark*.E2ETest/*.csproj' + arguments: '--configuration $(buildConfiguration) --filter $(TestsToFilterOut)' + env: + SPARK_HOME: $(Build.BinariesDirectory)\spark-2.4.6-bin-hadoop2.7 + diff --git a/docs/broadcast-guide.md b/docs/broadcast-guide.md new file mode 100644 index 000000000..c3026516b --- /dev/null +++ b/docs/broadcast-guide.md @@ -0,0 +1,92 @@ +# Guide to using Broadcast Variables + +This is a guide to show how to use broadcast variables in .NET for Apache Spark. + +## What are Broadcast Variables + +[Broadcast variables in Apache Spark](https://spark.apache.org/docs/2.2.0/rdd-programming-guide.html#broadcast-variables) are a mechanism for sharing variables across executors that are meant to be read-only. They allow the programmer to keep a read-only variable cached on each machine rather than shipping a copy of it with tasks. They can be used, for example, to give every node a copy of a large input dataset in an efficient manner. + +### How to use broadcast variables in .NET for Apache Spark + +Broadcast variables are created from a variable `v` by calling `SparkContext.Broadcast(v)`. The broadcast variable is a wrapper around `v`, and its value can be accessed by calling the `Value()` method. + +Example: + +```csharp +string v = "Variable to be broadcasted"; +Broadcast bv = SparkContext.Broadcast(v); + +// Using the broadcast variable in a UDF: +Func udf = Udf( + str => $"{str}: {bv.Value()}"); +``` + +The type parameter for `Broadcast` should be the type of the variable being broadcasted. + +### Deleting broadcast variables + +The broadcast variable can be deleted from all executors by calling the `Destroy()` method on it. + +```csharp +// Destroying the broadcast variable bv: +bv.Destroy(); +``` + +> Note: `Destroy()` deletes all data and metadata related to the broadcast variable. Use this with caution - once a broadcast variable has been destroyed, it cannot be used again. + +#### Caveat of using Destroy + +One important thing to keep in mind while using broadcast variables in UDFs is to limit the scope of the variable to only the UDF that is referencing it. The [guide to using UDFs](udf-guide.md) describes this phenomenon in detail. This is especially crucial when calling `Destroy` on the broadcast variable. If the broadcast variable that has been destroyed is visible to or accessible from other UDFs, it gets picked up for serialization by all those UDFs, even if it is not being referenced by them. 
This will throw an error as .NET for Apache Spark is not able to serialize the destroyed broadcast variable. + +Example to demonstrate: + +```csharp +string v = "Variable to be broadcasted"; +Broadcast bv = SparkContext.Broadcast(v); + +// Using the broadcast variable in a UDF: +Func udf1 = Udf( + str => $"{str}: {bv.Value()}"); + +// Destroying bv +bv.Destroy(); + +// Calling udf1 after destroying bv throws the following expected exception: +// org.apache.spark.SparkException: Attempted to use Broadcast(0) after it was destroyed +df.Select(udf1(df["_1"])).Show(); + +// Different UDF udf2 that is not referencing bv +Func udf2 = Udf( + str => $"{str}: not referencing broadcast variable"); + +// Calling udf2 throws the following (unexpected) exception: +// [Error] [JvmBridge] org.apache.spark.SparkException: Task not serializable +df.Select(udf2(df["_1"])).Show(); +``` + +The recommended way of implementing above desired behavior: + +```csharp +string v = "Variable to be broadcasted"; +// Restricting the visibility of bv to only the UDF referencing it +{ + Broadcast bv = SparkContext.Broadcast(v); + + // Using the broadcast variable in a UDF: + Func udf1 = Udf( + str => $"{str}: {bv.Value()}"); + + // Destroying bv + bv.Destroy(); +} + +// Different UDF udf2 that is not referencing bv +Func udf2 = Udf( + str => $"{str}: not referencing broadcast variable"); + +// Calling udf2 works fine as expected +df.Select(udf2(df["_1"])).Show(); +``` + This ensures that destroying `bv` doesn't affect calling `udf2` because of unexpected serialization behavior. + + Broadcast variables are useful for transmitting read-only data to all executors, as the data is sent only once and this can give performance benefits when compared with using local variables that get shipped to the executors with each task. Please refer to the [official documentation](https://spark.apache.org/docs/2.2.0/rdd-programming-guide.html#broadcast-variables) to get a deeper understanding of broadcast variables and why they are used. \ No newline at end of file diff --git a/docs/building/ubuntu-instructions.md b/docs/building/ubuntu-instructions.md index 8bb11b163..0e3dbdf40 100644 --- a/docs/building/ubuntu-instructions.md +++ b/docs/building/ubuntu-instructions.md @@ -35,14 +35,14 @@ If you already have all the pre-requisites, skip to the [build](ubuntu-instructi ```bash sudo update-alternatives --config java ``` - 3. Install **[Apache Maven 3.6.0+](https://maven.apache.org/download.cgi)** + 3. 
Install **[Apache Maven 3.6.3+](https://maven.apache.org/download.cgi)** - Run the following command: ```bash mkdir -p ~/bin/maven cd ~/bin/maven - wget https://www-us.apache.org/dist/maven/maven-3/3.6.0/binaries/apache-maven-3.6.0-bin.tar.gz - tar -xvzf apache-maven-3.6.0-bin.tar.gz - ln -s apache-maven-3.6.0 current + wget https://www-us.apache.org/dist/maven/maven-3/3.6.3/binaries/apache-maven-3.6.3-bin.tar.gz + tar -xvzf apache-maven-3.6.3-bin.tar.gz + ln -s apache-maven-3.6.3 current export M2_HOME=~/bin/maven/current export PATH=${M2_HOME}/bin:${PATH} source ~/.bashrc @@ -54,11 +54,11 @@ If you already have all the pre-requisites, skip to the [build](ubuntu-instructi 📙 Click to see sample mvn -version output ``` - Apache Maven 3.6.0 (97c98ec64a1fdfee7767ce5ffb20918da4f719f3; 2018-10-24T18:41:47Z) - Maven home: ~/bin/apache-maven-3.6.0 - Java version: 1.8.0_191, vendor: Oracle Corporation, runtime: /usr/lib/jvm/java-8-openjdk-amd64/jre - Default locale: en, platform encoding: UTF-8 - OS name: "linux", version: "4.4.0-17763-microsoft", arch: "amd64", family: "unix" + Apache Maven 3.6.3 (cecedd343002696d0abb50b32b541b8a6ba2883f) + Maven home: ~/bin/apache-maven-3.6.3 + Java version: 1.8.0_242, vendor: Oracle Corporation, runtime: /usr/lib/jvm/java-8-openjdk-amd64/jre + Default locale: en_US, platform encoding: ANSI_X3.4-1968 + OS name: "linux", version: "4.4.0-142-generic", arch: "amd64", family: "unix" ``` 4. Install **[Apache Spark 2.3+](https://spark.apache.org/downloads.html)** - Download [Apache Spark 2.3+](https://spark.apache.org/downloads.html) and extract it into a local folder (e.g., `~/bin/spark-2.3.2-bin-hadoop2.7`) diff --git a/docs/udf-guide.md b/docs/udf-guide.md new file mode 100644 index 000000000..6a2905bf4 --- /dev/null +++ b/docs/udf-guide.md @@ -0,0 +1,171 @@ +# Guide to User-Defined Functions (UDFs) + +This is a guide to show how to use UDFs in .NET for Apache Spark. + +## What are UDFs + +[User-Defined Functions (UDFs)](https://spark.apache.org/docs/latest/api/java/org/apache/spark/sql/expressions/UserDefinedFunction.html) are a feature of Spark that allow developers to use custom functions to extend the system's built-in functionality. They transform values from a single row within a table to produce a single corresponding output value per row based on the logic defined in the UDF. + +Let's take the following as an example for a UDF definition: + +```csharp +string s1 = "hello"; +Func udf = Udf( + str => $"{s1} {str}"); + +``` +The above defined UDF takes a `string` as an input (in the form of a [Column](https://github.com/dotnet/spark/blob/master/src/csharp/Microsoft.Spark/Sql/Column.cs#L14) of a [Dataframe](https://github.com/dotnet/spark/blob/master/src/csharp/Microsoft.Spark/Sql/DataFrame.cs#L24)), and returns a `string` with `hello` appended in front of the input. 
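For reference, here is a minimal, self-contained sketch of the same definition with the generic type arguments spelled out; it uses the `Udf` helper from `Microsoft.Spark.Sql.Functions`, which wraps a .NET lambda and returns a delegate operating on `Column` so it can be applied to DataFrame columns:

```csharp
using System;
using Microsoft.Spark.Sql;
using static Microsoft.Spark.Sql.Functions;

string s1 = "hello";

// Udf<string, string>: the wrapped lambda takes a string and returns a string.
// The resulting delegate takes and returns Column, so it can be used inside
// DataFrame expressions such as df.Select(udf(df["name"])).
Func<Column, Column> udf = Udf<string, string>(str => $"{s1} {str}");
```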
+ +For a sample Dataframe, let's take the following Dataframe `df`: + +```text ++-------+ +| name| ++-------+ +|Michael| +| Andy| +| Justin| ++-------+ +``` + +Now let's apply the above defined `udf` to the dataframe `df`: + +```csharp +DataFrame udfResult = df.Select(udf(df["name"])); +``` + +This would return the below as the Dataframe `udfResult`: + +```text ++-------------+ +| name| ++-------------+ +|hello Michael| +| hello Andy| +| hello Justin| ++-------------+ +``` +To get a better understanding of how to implement UDFs, please take a look at the [UDF helper functions](https://github.com/dotnet/spark/blob/master/src/csharp/Microsoft.Spark/Sql/Functions.cs#L3616) and some [test examples](https://github.com/dotnet/spark/blob/master/src/csharp/Microsoft.Spark.E2ETest/UdfTests/UdfSimpleTypesTests.cs#L49). + +## UDF serialization + +Since UDFs are functions that need to be executed on the workers, they have to be serialized and sent to the workers as part of the payload from the driver. This involves serializing the [delegate](https://docs.microsoft.com/en-us/dotnet/csharp/programming-guide/delegates/) which is a reference to the method, along with its [target](https://docs.microsoft.com/en-us/dotnet/api/system.delegate.target?view=netframework-4.8) which is the class instance on which the current delegate invokes the instance method. Please take a look at this [code](https://github.com/dotnet/spark/blob/master/src/csharp/Microsoft.Spark/Utils/CommandSerDe.cs#L149) to get a better understanding of how UDF serialization is being done. + +## Good to know while implementing UDFs + +One behavior to be aware of while implementing UDFs in .NET for Apache Spark is how the target of the UDF gets serialized. .NET for Apache Spark uses .NET Core, which does not support serializing delegates, so it is instead done by using reflection to serialize the target where the delegate is defined. When multiple delegates are defined in a common scope, they have a shared closure that becomes the target of reflection for serialization. Let's take an example to illustrate what that means. + +The following code snippet defines two string variables that are being referenced in two function delegates that return the respective strings as result: + +```csharp +using System; + +public class C { + public void M() { + string s1 = "s1"; + string s2 = "s2"; + Func a = str => s1; + Func b = str => s2; + } +} +``` + +The above C# code generates the following C# disassembly (credit source: [sharplab.io](https://sharplab.io)) code from the compiler: + +```csharp +public class C +{ + [CompilerGenerated] + private sealed class <>c__DisplayClass0_0 + { + public string s1; + + public string s2; + + internal string b__0(string str) + { + return s1; + } + + internal string b__1(string str) + { + return s2; + } + } + + public void M() + { + <>c__DisplayClass0_0 <>c__DisplayClass0_ = new <>c__DisplayClass0_0(); + <>c__DisplayClass0_.s1 = "s1"; + <>c__DisplayClass0_.s2 = "s2"; + Func func = new Func(<>c__DisplayClass0_.b__0); + Func func2 = new Func(<>c__DisplayClass0_.b__1); + } +} +``` +As can be seen in the above decompiled code, both `func` and `func2` share the same closure `<>c__DisplayClass0_0`, which is the target that is serialized when serializing the delegates `func` and `func2`. Hence, even though `Func a` is only referencing `s1`, `s2` also gets serialized when sending over the bytes to the workers. 
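The same sharing shows up with Spark UDFs declared in one scope. The following is a hedged sketch (the variable and UDF names are illustrative, and a DataFrame `df` with a string column `name` is assumed): both lambdas live in the same method scope, so the compiler emits a single closure class holding `greeting` and `bigLookup`, and that closure is the serialization target for both delegates; `bigLookup` is therefore shipped to the workers even for `greet`, which never reads it.

```csharp
using System;
using System.Collections.Generic;
using Microsoft.Spark.Sql;
using static Microsoft.Spark.Sql.Functions;

string greeting = "hello";
// Imagine this is a large object that only the second UDF needs.
var bigLookup = new Dictionary<string, int> { ["Michael"] = 1 };

// Declared in the same scope, so both captured variables end up in one
// shared compiler-generated closure; serializing either UDF's target
// serializes both greeting and bigLookup.
Func<Column, Column> greet = Udf<string, string>(name => $"{greeting} {name}");
Func<Column, Column> score = Udf<string, int>(
    name => bigLookup.TryGetValue(name, out int v) ? v : 0);
```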
+ +This can lead to some unexpected behaviors at runtime (like in the case of using [broadcast variables](broadcast-guide.md)), which is why we recommend restricting the visibility of the variables used in a function to that function's scope. + +Going back to the above example, the following is the recommended way to implement the desired behavior of previous code snippet: + +```csharp +using System; + +public class C { + public void M() { + { + string s1 = "s1"; + Func a = str => s1; + } + { + string s2 = "s2"; + Func b = str => s2; + } + } +} +``` + +The above C# code generates the following C# disassembly (credit source: [sharplab.io](https://sharplab.io)) code from the compiler: + +```csharp +public class C +{ + [CompilerGenerated] + private sealed class <>c__DisplayClass0_0 + { + public string s1; + + internal string b__0(string str) + { + return s1; + } + } + + [CompilerGenerated] + private sealed class <>c__DisplayClass0_1 + { + public string s2; + + internal string b__1(string str) + { + return s2; + } + } + + public void M() + { + <>c__DisplayClass0_0 <>c__DisplayClass0_ = new <>c__DisplayClass0_0(); + <>c__DisplayClass0_.s1 = "s1"; + Func func = new Func(<>c__DisplayClass0_.b__0); + <>c__DisplayClass0_1 <>c__DisplayClass0_2 = new <>c__DisplayClass0_1(); + <>c__DisplayClass0_2.s2 = "s2"; + Func func2 = new Func(<>c__DisplayClass0_2.b__1); + } +} +``` + +Here we see that `func` and `func2` no longer share a closure and have their own separate closures `<>c__DisplayClass0_0` and `<>c__DisplayClass0_1` respectively. When used as the target for serialization, nothing other than the referenced variables will get serialized for the delegate. + +This behavior is important to keep in mind while implementing multiple UDFs in a common scope. +To learn more about UDFs in general, please review the following articles that explain UDFs and how to use them: [UDFs in databricks(scala)](https://docs.databricks.com/spark/latest/spark-sql/udf-scala.html), [Spark UDFs and some gotchas](https://medium.com/@achilleus/spark-udfs-we-can-use-them-but-should-we-use-them-2c5a561fde6d). 
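As a closing illustration, the same scoping recommendation applied to Spark UDFs looks like the hedged sketch below (names are illustrative; a DataFrame `df` with a string column `name` is assumed). Each lambda now captures its own closure, so only the state it actually references is serialized with it:

```csharp
using System;
using System.Collections.Generic;
using Microsoft.Spark.Sql;
using static Microsoft.Spark.Sql.Functions;

Func<Column, Column> greet;
{
    // greeting is only visible in this block, so it is the only field
    // in the closure serialized along with greet.
    string greeting = "hello";
    greet = Udf<string, string>(name => $"{greeting} {name}");
}

Func<Column, Column> score;
{
    // bigLookup lives in its own closure, independent of greet.
    var bigLookup = new Dictionary<string, int> { ["Michael"] = 1 };
    score = Udf<string, int>(
        name => bigLookup.TryGetValue(name, out int v) ? v : 0);
}

// Hypothetical usage on a DataFrame df with a string column "name":
// df.Select(greet(df["name"]), score(df["name"])).Show();
```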
\ No newline at end of file diff --git a/examples/Microsoft.Spark.CSharp.Examples/MachineLearning/Sentiment/Program.cs b/examples/Microsoft.Spark.CSharp.Examples/MachineLearning/Sentiment/Program.cs index efb85e468..51f63078d 100644 --- a/examples/Microsoft.Spark.CSharp.Examples/MachineLearning/Sentiment/Program.cs +++ b/examples/Microsoft.Spark.CSharp.Examples/MachineLearning/Sentiment/Program.cs @@ -27,7 +27,7 @@ public void Run(string[] args) SparkSession spark = SparkSession .Builder() - .AppName(".NET for Apache Spark Sentiment Analysis") + .AppName("Sentiment Analysis using .NET for Apache Spark") .GetOrCreate(); // Read in and display Yelp reviews diff --git a/examples/Microsoft.Spark.CSharp.Examples/Sql/Batch/Basic.cs b/examples/Microsoft.Spark.CSharp.Examples/Sql/Batch/Basic.cs index 6ef95eefa..fe57f7d1b 100644 --- a/examples/Microsoft.Spark.CSharp.Examples/Sql/Batch/Basic.cs +++ b/examples/Microsoft.Spark.CSharp.Examples/Sql/Batch/Basic.cs @@ -26,7 +26,7 @@ public void Run(string[] args) SparkSession spark = SparkSession .Builder() - .AppName(".NET Spark SQL basic example") + .AppName("SQL basic example using .NET for Apache Spark") .Config("spark.some.config.option", "some-value") .GetOrCreate(); diff --git a/examples/Microsoft.Spark.CSharp.Examples/Sql/Batch/Datasource.cs b/examples/Microsoft.Spark.CSharp.Examples/Sql/Batch/Datasource.cs index cf41eeceb..0945df791 100644 --- a/examples/Microsoft.Spark.CSharp.Examples/Sql/Batch/Datasource.cs +++ b/examples/Microsoft.Spark.CSharp.Examples/Sql/Batch/Datasource.cs @@ -32,7 +32,7 @@ public void Run(string[] args) SparkSession spark = SparkSession .Builder() - .AppName(".NET Spark SQL Datasource example") + .AppName("SQL Datasource example using .NET for Apache Spark") .Config("spark.some.config.option", "some-value") .GetOrCreate(); diff --git a/examples/Microsoft.Spark.CSharp.Examples/Sql/Batch/VectorDataFrameUdfs.cs b/examples/Microsoft.Spark.CSharp.Examples/Sql/Batch/VectorDataFrameUdfs.cs index 697301733..aafea7256 100644 --- a/examples/Microsoft.Spark.CSharp.Examples/Sql/Batch/VectorDataFrameUdfs.cs +++ b/examples/Microsoft.Spark.CSharp.Examples/Sql/Batch/VectorDataFrameUdfs.cs @@ -31,7 +31,7 @@ public void Run(string[] args) .Builder() // Lower the shuffle partitions to speed up groupBy() operations. .Config("spark.sql.shuffle.partitions", "3") - .AppName(".NET Spark SQL VectorUdfs example") + .AppName("SQL VectorUdfs example using .NET for Apache Spark") .GetOrCreate(); DataFrame df = spark.Read().Schema("age INT, name STRING").Json(args[0]); diff --git a/examples/Microsoft.Spark.CSharp.Examples/Sql/Batch/VectorUdfs.cs b/examples/Microsoft.Spark.CSharp.Examples/Sql/Batch/VectorUdfs.cs index 369cc3aff..2497d5ef3 100644 --- a/examples/Microsoft.Spark.CSharp.Examples/Sql/Batch/VectorUdfs.cs +++ b/examples/Microsoft.Spark.CSharp.Examples/Sql/Batch/VectorUdfs.cs @@ -29,7 +29,7 @@ public void Run(string[] args) .Builder() // Lower the shuffle partitions to speed up groupBy() operations. 
.Config("spark.sql.shuffle.partitions", "3") - .AppName(".NET Spark SQL VectorUdfs example") + .AppName("SQL VectorUdfs example using .NET for Apache Spark") .GetOrCreate(); DataFrame df = spark.Read().Schema("age INT, name STRING").Json(args[0]); diff --git a/script/download-spark-distros.cmd b/script/download-spark-distros.cmd index d02bb49a7..0d2435a00 100644 --- a/script/download-spark-distros.cmd +++ b/script/download-spark-distros.cmd @@ -23,5 +23,7 @@ curl -k -L -o spark-2.4.1.tgz https://archive.apache.org/dist/spark/spark-2.4.1/ curl -k -L -o spark-2.4.3.tgz https://archive.apache.org/dist/spark/spark-2.4.3/spark-2.4.3-bin-hadoop2.7.tgz && tar xzvf spark-2.4.3.tgz curl -k -L -o spark-2.4.4.tgz https://archive.apache.org/dist/spark/spark-2.4.4/spark-2.4.4-bin-hadoop2.7.tgz && tar xzvf spark-2.4.4.tgz curl -k -L -o spark-2.4.5.tgz https://archive.apache.org/dist/spark/spark-2.4.5/spark-2.4.5-bin-hadoop2.7.tgz && tar xzvf spark-2.4.5.tgz +curl -k -L -o spark-2.4.6.tgz https://archive.apache.org/dist/spark/spark-2.4.6/spark-2.4.6-bin-hadoop2.7.tgz && tar xzvf spark-2.4.6.tgz + +endlocal -endlocal \ No newline at end of file diff --git a/src/csharp/Extensions/Microsoft.Spark.Extensions.Delta.E2ETest/DeltaTableTests.cs b/src/csharp/Extensions/Microsoft.Spark.Extensions.Delta.E2ETest/DeltaTableTests.cs index 69249d8c5..fab7c74dc 100644 --- a/src/csharp/Extensions/Microsoft.Spark.Extensions.Delta.E2ETest/DeltaTableTests.cs +++ b/src/csharp/Extensions/Microsoft.Spark.Extensions.Delta.E2ETest/DeltaTableTests.cs @@ -11,6 +11,7 @@ using Microsoft.Spark.Sql; using Microsoft.Spark.Sql.Streaming; using Microsoft.Spark.Sql.Types; +using Microsoft.Spark.UnitTest.TestUtils; using Xunit; namespace Microsoft.Spark.Extensions.Delta.E2ETest diff --git a/src/csharp/Extensions/Microsoft.Spark.Extensions.DotNet.Interactive.UnitTest/Microsoft.Spark.Extensions.DotNet.Interactive.UnitTest.csproj b/src/csharp/Extensions/Microsoft.Spark.Extensions.DotNet.Interactive.UnitTest/Microsoft.Spark.Extensions.DotNet.Interactive.UnitTest.csproj new file mode 100644 index 000000000..391582751 --- /dev/null +++ b/src/csharp/Extensions/Microsoft.Spark.Extensions.DotNet.Interactive.UnitTest/Microsoft.Spark.Extensions.DotNet.Interactive.UnitTest.csproj @@ -0,0 +1,23 @@ + + + + netcoreapp3.1 + Microsoft.Spark.Extensions.DotNet.Interactive.UnitTest + false + + + + + + + + + + + + + + + + + diff --git a/src/csharp/Extensions/Microsoft.Spark.Extensions.DotNet.Interactive.UnitTest/PackageResolverTests.cs b/src/csharp/Extensions/Microsoft.Spark.Extensions.DotNet.Interactive.UnitTest/PackageResolverTests.cs new file mode 100644 index 000000000..219c533ff --- /dev/null +++ b/src/csharp/Extensions/Microsoft.Spark.Extensions.DotNet.Interactive.UnitTest/PackageResolverTests.cs @@ -0,0 +1,95 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. +// See the LICENSE file in the project root for more information. 
+ +using System.Collections.Generic; +using System.IO; +using System.Linq; +using Microsoft.DotNet.Interactive.Utility; +using Microsoft.Spark.UnitTest.TestUtils; +using Microsoft.Spark.Utils; +using Moq; +using Xunit; + +namespace Microsoft.Spark.Extensions.DotNet.Interactive.UnitTest +{ + public class PackageResolverTests + { + [Fact] + public void TestPackageResolver() + { + using var tempDir = new TemporaryDirectory(); + + string packageName = "package.name"; + string packageVersion = "0.1.0"; + string packageRootPath = + Path.Combine(tempDir.Path, "path", "to", "packages", packageName, packageVersion); + string packageFrameworkPath = Path.Combine(packageRootPath, "lib", "framework"); + + Directory.CreateDirectory(packageRootPath); + var nugetFile = new FileInfo( + Path.Combine(packageRootPath, $"{packageName}.{packageVersion}.nupkg")); + using (File.Create(nugetFile.FullName)) + { + } + + var assemblyPaths = new List + { + new FileInfo(Path.Combine(packageFrameworkPath, "1.dll")), + new FileInfo(Path.Combine(packageFrameworkPath, "2.dll")) + }; + var probingPaths = new List { new DirectoryInfo(packageRootPath) }; + + var mockSupportNugetWrapper = new Mock(); + mockSupportNugetWrapper + .SetupGet(m => m.ResolvedPackageReferences) + .Returns(new ResolvedPackageReference[] + { + new ResolvedPackageReference( + packageName, + packageVersion, + assemblyPaths, + new DirectoryInfo(packageRootPath), + probingPaths) + }); + + var packageResolver = new PackageResolver(mockSupportNugetWrapper.Object); + IEnumerable actualFiles = packageResolver.GetFiles(tempDir.Path); + + string metadataFilePath = + Path.Combine(tempDir.Path, DependencyProviderUtils.CreateFileName(1)); + var expectedFiles = new string[] + { + nugetFile.FullName, + metadataFilePath + }; + Assert.True(expectedFiles.SequenceEqual(actualFiles)); + Assert.True(File.Exists(metadataFilePath)); + + DependencyProviderUtils.Metadata actualMetadata = + DependencyProviderUtils.Metadata.Deserialize(metadataFilePath); + var expectedMetadata = new DependencyProviderUtils.Metadata + { + AssemblyProbingPaths = new string[] + { + Path.Combine(packageName, packageVersion, "lib", "framework", "1.dll"), + Path.Combine(packageName, packageVersion, "lib", "framework", "2.dll") + }, + NativeProbingPaths = new string[] + { + Path.Combine(packageName, packageVersion) + }, + NuGets = new DependencyProviderUtils.NuGetMetadata[] + { + new DependencyProviderUtils.NuGetMetadata + { + FileName = $"{packageName}.{packageVersion}.nupkg", + PackageName = packageName, + PackageVersion = packageVersion + } + } + }; + Assert.True(expectedMetadata.Equals(actualMetadata)); + } + } +} diff --git a/src/csharp/Extensions/Microsoft.Spark.Extensions.DotNet.Interactive/AssemblyKernelExtension.cs b/src/csharp/Extensions/Microsoft.Spark.Extensions.DotNet.Interactive/AssemblyKernelExtension.cs new file mode 100644 index 000000000..2deff5869 --- /dev/null +++ b/src/csharp/Extensions/Microsoft.Spark.Extensions.DotNet.Interactive/AssemblyKernelExtension.cs @@ -0,0 +1,156 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. +// See the LICENSE file in the project root for more information. 
+ +using System; +using System.Collections.Generic; +using System.IO; +using System.Threading.Tasks; +using Microsoft.CodeAnalysis; +using Microsoft.DotNet.Interactive; +using Microsoft.DotNet.Interactive.Commands; +using Microsoft.DotNet.Interactive.CSharp; +using Microsoft.DotNet.Interactive.Utility; +using Microsoft.Spark.Interop; +using Microsoft.Spark.Sql; +using Microsoft.Spark.Utils; + +namespace Microsoft.Spark.Extensions.DotNet.Interactive +{ + /// + /// A kernel extension when using .NET for Apache Spark with Microsoft.DotNet.Interactive + /// Adds nuget and assembly dependencies to the default + /// using . + /// + public class AssemblyKernelExtension : IKernelExtension + { + private const string TempDirEnvVar = "DOTNET_SPARK_EXTENSION_INTERACTIVE_TMPDIR"; + + private readonly PackageResolver _packageResolver = + new PackageResolver(new SupportNugetWrapper()); + + /// + /// Called by the Microsoft.DotNet.Interactive Assembly Extension Loader. + /// + /// The kernel calling this method. + /// when extension is loaded. + public Task OnLoadAsync(IKernel kernel) + { + if (kernel is CompositeKernel kernelBase) + { + Environment.SetEnvironmentVariable(Constants.RunningREPLEnvVar, "true"); + + DirectoryInfo tempDir = CreateTempDirectory(); + kernelBase.RegisterForDisposal(new DisposableDirectory(tempDir)); + + kernelBase.AddMiddleware(async (command, context, next) => + { + if ((context.HandlingKernel is CSharpKernel kernel) && + (command is SubmitCode) && + TryGetSparkSession(out SparkSession sparkSession) && + TryEmitAssembly(kernel, tempDir.FullName, out string assemblyPath)) + { + sparkSession.SparkContext.AddFile(assemblyPath); + + foreach (string filePath in GetPackageFiles(tempDir.FullName)) + { + sparkSession.SparkContext.AddFile(filePath); + } + } + + await next(command, context); + }); + } + + return Task.CompletedTask; + } + + private DirectoryInfo CreateTempDirectory() + { + string envTempDir = Environment.GetEnvironmentVariable(TempDirEnvVar); + string tempDirBasePath = string.IsNullOrEmpty(envTempDir) ? + Directory.GetCurrentDirectory() : + envTempDir; + + if (!IsPathValid(tempDirBasePath)) + { + throw new Exception($"[{GetType().Name}] Spaces in " + + $"'{tempDirBasePath}' is unsupported. Set the {TempDirEnvVar} " + + "environment variable to control the base path. Please see " + + "https://issues.apache.org/jira/browse/SPARK-30126 and " + + "https://github.com/apache/spark/pull/26773 for more details."); + } + + return Directory.CreateDirectory( + Path.Combine(tempDirBasePath, Path.GetRandomFileName())); + } + + private bool TryEmitAssembly(CSharpKernel kernel, string dstPath, out string assemblyPath) + { + Compilation compilation = kernel.ScriptState.Script.GetCompilation(); + string assemblyName = + AssemblyLoader.NormalizeAssemblyName(compilation.AssemblyName); + assemblyPath = Path.Combine(dstPath, $"{assemblyName}.dll"); + if (!File.Exists(assemblyPath)) + { + FileSystemExtensions.Emit(compilation, assemblyPath); + return true; + } + + throw new Exception( + $"TryEmitAssembly() unexpected duplicate assembly: ${assemblyPath}"); + } + + private bool TryGetSparkSession(out SparkSession sparkSession) + { + sparkSession = SparkSession.GetDefaultSession(); + return sparkSession != null; + } + + private IEnumerable GetPackageFiles(string path) + { + foreach (string filePath in _packageResolver.GetFiles(path)) + { + if (IsPathValid(filePath)) + { + yield return filePath; + } + else + { + // Copy file to a path without spaces. 
+ string fileDestPath = Path.Combine( + path, + Path.GetFileName(filePath).Replace(" ", string.Empty)); + File.Copy(filePath, fileDestPath); + yield return fileDestPath; + } + } + } + + /// + /// In some versions of Spark, spaces is unsupported when using + /// . + /// + /// For more details please see: + /// - https://issues.apache.org/jira/browse/SPARK-30126 + /// - https://github.com/apache/spark/pull/26773 + /// + /// The path to validate. + /// true if the path is supported by Spark, false otherwise. + private bool IsPathValid(string path) + { + if (!path.Contains(" ")) + { + return true; + } + + Version version = SparkEnvironment.SparkVersion; + return (version.Major, version.Minor, version.Build) switch + { + (2, _, _) => false, + (3, 0, _) => true, + _ => throw new NotSupportedException($"Spark {version} not supported.") + }; + } + } +} diff --git a/src/csharp/Extensions/Microsoft.Spark.Extensions.DotNet.Interactive/Microsoft.Spark.Extensions.DotNet.Interactive.csproj b/src/csharp/Extensions/Microsoft.Spark.Extensions.DotNet.Interactive/Microsoft.Spark.Extensions.DotNet.Interactive.csproj new file mode 100644 index 000000000..da330c762 --- /dev/null +++ b/src/csharp/Extensions/Microsoft.Spark.Extensions.DotNet.Interactive/Microsoft.Spark.Extensions.DotNet.Interactive.csproj @@ -0,0 +1,38 @@ + + + + Library + netcoreapp3.1 + Microsoft.Spark.Extensions.DotNet.Interactive + true + true + + NU5100;$(NoWarn) + + DotNet Interactive Extension for .NET for Apache Spark + https://github.com/dotnet/spark/tree/master/docs/release-notes + spark;dotnet;csharp;interactive;dotnet-interactive + + + + + + + + + + all + + + + + + + + + + + + diff --git a/src/csharp/Extensions/Microsoft.Spark.Extensions.DotNet.Interactive/PackageResolver.cs b/src/csharp/Extensions/Microsoft.Spark.Extensions.DotNet.Interactive/PackageResolver.cs new file mode 100644 index 000000000..f9a76e43f --- /dev/null +++ b/src/csharp/Extensions/Microsoft.Spark.Extensions.DotNet.Interactive/PackageResolver.cs @@ -0,0 +1,165 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. +// See the LICENSE file in the project root for more information. + +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.IO; +using System.Threading; +using Microsoft.DotNet.Interactive.Utility; +using Microsoft.Spark.Utils; + +namespace Microsoft.Spark.Extensions.DotNet.Interactive +{ + internal class PackageResolver + { + private readonly SupportNugetWrapper _supportNugetWrapper; + private readonly ConcurrentDictionary _filesCopied; + private long _metadataCounter; + + internal PackageResolver(SupportNugetWrapper supportNugetWrapper) + { + _supportNugetWrapper = supportNugetWrapper; + _filesCopied = new ConcurrentDictionary(); + _metadataCounter = 0; + } + + /// + /// Generates and serializes a to + /// . Returns a list of file paths which include the + /// the serialized and nuget file + /// dependencies. + /// + /// Path to write metadata. + /// + /// List of file paths of the serialized + /// and nuget file dependencies. 
+ /// + internal IEnumerable GetFiles(string writePath) + { + IEnumerable nugetPackagesToCopy = GetNewPackages(); + + var assemblyProbingPaths = new List(); + var nativeProbingPaths = new List(); + var nugetMetadata = new List(); + + foreach (ResolvedNuGetPackage package in nugetPackagesToCopy) + { + ResolvedPackageReference resolvedPackage = package.ResolvedPackage; + + foreach (FileInfo asmPath in resolvedPackage.AssemblyPaths) + { + // asmPath.FullName + // /path/to/packages/package.name/package.version/lib/framework/1.dll + // resolvedPackage.PackageRoot + // /path/to/packages/package.name/package.version/ + // GetRelativeToPackages(..) + // package.name/package.version/lib/framework/1.dll + assemblyProbingPaths.Add( + GetPathRelativeToPackages( + asmPath.FullName, + resolvedPackage.PackageRoot)); + } + + foreach (DirectoryInfo probePath in resolvedPackage.ProbingPaths) + { + // probePath.FullName + // /path/to/packages/package.name/package.version/ + // resolvedPackage.PackageRoot + // /path/to/packages/package.name/package.version/ + // GetRelativeToPackages(..) + // package.name/package.version + nativeProbingPaths.Add( + GetPathRelativeToPackages( + probePath.FullName, + resolvedPackage.PackageRoot)); + } + + nugetMetadata.Add( + new DependencyProviderUtils.NuGetMetadata + { + FileName = package.NuGetFile.Name, + PackageName = resolvedPackage.PackageName, + PackageVersion = resolvedPackage.PackageVersion + }); + + yield return package.NuGetFile.FullName; + } + + if (nugetMetadata.Count > 0) + { + var metadataPath = + Path.Combine( + writePath, + DependencyProviderUtils.CreateFileName( + Interlocked.Increment(ref _metadataCounter))); + new DependencyProviderUtils.Metadata + { + AssemblyProbingPaths = assemblyProbingPaths.ToArray(), + NativeProbingPaths = nativeProbingPaths.ToArray(), + NuGets = nugetMetadata.ToArray() + }.Serialize(metadataPath); + + yield return metadataPath; + } + } + + /// + /// Return the delta of the list of packages that have been introduced + /// since the last call. + /// + /// The delta of the list of packages. + private IEnumerable GetNewPackages() + { + IEnumerable packages = + _supportNugetWrapper.ResolvedPackageReferences; + foreach (ResolvedPackageReference package in packages) + { + IEnumerable files = + package.PackageRoot.EnumerateFiles("*.nupkg", SearchOption.AllDirectories); + + foreach (FileInfo file in files) + { + if (_filesCopied.TryAdd(file.Name, 1)) + { + yield return new ResolvedNuGetPackage + { + ResolvedPackage = package, + NuGetFile = file + }; + } + } + } + } + + /// + /// Given a , get the relative path to the packages directory. + /// The package is a subfolder within the packages directory. + /// + /// Examples: + /// path: + /// /path/to/packages/package.name/package.version/lib/framework/1.dll + /// directory: + /// /path/to/packages/package.name/package.version/ + /// relative path: + /// package.name/package.version/lib/framework/1.dll + /// + /// path: + /// /path/to/packages/package.name/package.version/ + /// directory: + /// /path/to/packages/package.name/package.version/ + /// relative path: + /// package.name/package.version + /// + /// The full path used to determine the relative path. + /// The package directory. + /// The relative path to the packages directory. 
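+        // Only the trailing "package.name/package.version/..." segment is returned here so
+        // that the worker side can re-root it under its own unpacked package cache (see the
+        // DependencyProvider added to Microsoft.Spark.Worker later in this change).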
+ private string GetPathRelativeToPackages(string path, DirectoryInfo directory) + { + string strippedRoot = path + .Substring(directory.FullName.Length) + .Trim(Path.DirectorySeparatorChar, Path.AltDirectorySeparatorChar); + return Path.Combine(directory.Parent.Name, directory.Name, strippedRoot); + } + } +} diff --git a/src/csharp/Extensions/Microsoft.Spark.Extensions.DotNet.Interactive/ResolvedNugetPackage.cs b/src/csharp/Extensions/Microsoft.Spark.Extensions.DotNet.Interactive/ResolvedNugetPackage.cs new file mode 100644 index 000000000..57106c16a --- /dev/null +++ b/src/csharp/Extensions/Microsoft.Spark.Extensions.DotNet.Interactive/ResolvedNugetPackage.cs @@ -0,0 +1,15 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. +// See the LICENSE file in the project root for more information. + +using System.IO; +using Microsoft.DotNet.Interactive.Utility; + +namespace Microsoft.Spark.Extensions.DotNet.Interactive +{ + internal class ResolvedNuGetPackage + { + public ResolvedPackageReference ResolvedPackage { get; set; } + public FileInfo NuGetFile { get; set; } + } +} diff --git a/src/csharp/Extensions/Microsoft.Spark.Extensions.DotNet.Interactive/SupportNugetWrapper.cs b/src/csharp/Extensions/Microsoft.Spark.Extensions.DotNet.Interactive/SupportNugetWrapper.cs new file mode 100644 index 000000000..489e39e94 --- /dev/null +++ b/src/csharp/Extensions/Microsoft.Spark.Extensions.DotNet.Interactive/SupportNugetWrapper.cs @@ -0,0 +1,13 @@ +using System.Collections.Generic; +using Microsoft.DotNet.Interactive; +using Microsoft.DotNet.Interactive.Utility; + +namespace Microsoft.Spark.Extensions.DotNet.Interactive +{ + internal class SupportNugetWrapper + { + internal virtual IEnumerable ResolvedPackageReferences => + ((ISupportNuget)KernelInvocationContext.Current.HandlingKernel) + .ResolvedPackageReferences; + } +} diff --git a/src/csharp/Microsoft.Spark.E2ETest/IpcTests/BroadcastTests.cs b/src/csharp/Microsoft.Spark.E2ETest/IpcTests/BroadcastTests.cs index 000c8f27e..511f5a122 100644 --- a/src/csharp/Microsoft.Spark.E2ETest/IpcTests/BroadcastTests.cs +++ b/src/csharp/Microsoft.Spark.E2ETest/IpcTests/BroadcastTests.cs @@ -1,10 +1,8 @@ using System; -using System.Collections.Generic; using System.Linq; -using Microsoft.Spark.E2ETest.Utils; using Microsoft.Spark.Sql; -using static Microsoft.Spark.Sql.Functions; using Xunit; +using static Microsoft.Spark.Sql.Functions; namespace Microsoft.Spark.E2ETest.IpcTests { diff --git a/src/csharp/Microsoft.Spark.E2ETest/IpcTests/ML/Feature/BucketizerTests.cs b/src/csharp/Microsoft.Spark.E2ETest/IpcTests/ML/Feature/BucketizerTests.cs index 11037bc6d..a075334de 100644 --- a/src/csharp/Microsoft.Spark.E2ETest/IpcTests/ML/Feature/BucketizerTests.cs +++ b/src/csharp/Microsoft.Spark.E2ETest/IpcTests/ML/Feature/BucketizerTests.cs @@ -4,9 +4,9 @@ using System.Collections.Generic; using System.IO; -using Microsoft.Spark.E2ETest.Utils; using Microsoft.Spark.ML.Feature; using Microsoft.Spark.Sql; +using Microsoft.Spark.UnitTest.TestUtils; using Xunit; namespace Microsoft.Spark.E2ETest.IpcTests.ML.Feature diff --git a/src/csharp/Microsoft.Spark.E2ETest/IpcTests/ML/Feature/HashingTFTests.cs b/src/csharp/Microsoft.Spark.E2ETest/IpcTests/ML/Feature/HashingTFTests.cs index 7b6882bea..df459ed7a 100644 --- a/src/csharp/Microsoft.Spark.E2ETest/IpcTests/ML/Feature/HashingTFTests.cs +++ b/src/csharp/Microsoft.Spark.E2ETest/IpcTests/ML/Feature/HashingTFTests.cs @@ -2,13 +2,10 @@ // The .NET 
Foundation licenses this file to you under the MIT license. // See the LICENSE file in the project root for more information. -using System; -using System.Collections.Generic; using System.IO; -using System.Linq; -using Microsoft.Spark.E2ETest.Utils; using Microsoft.Spark.ML.Feature; using Microsoft.Spark.Sql; +using Microsoft.Spark.UnitTest.TestUtils; using Xunit; namespace Microsoft.Spark.E2ETest.IpcTests.ML.Feature diff --git a/src/csharp/Microsoft.Spark.E2ETest/IpcTests/ML/Feature/IDFModelTests.cs b/src/csharp/Microsoft.Spark.E2ETest/IpcTests/ML/Feature/IDFModelTests.cs index 623b7322c..202187809 100644 --- a/src/csharp/Microsoft.Spark.E2ETest/IpcTests/ML/Feature/IDFModelTests.cs +++ b/src/csharp/Microsoft.Spark.E2ETest/IpcTests/ML/Feature/IDFModelTests.cs @@ -3,9 +3,9 @@ // See the LICENSE file in the project root for more information. using System.IO; -using Microsoft.Spark.E2ETest.Utils; using Microsoft.Spark.ML.Feature; using Microsoft.Spark.Sql; +using Microsoft.Spark.UnitTest.TestUtils; using Xunit; namespace Microsoft.Spark.E2ETest.IpcTests.ML.Feature diff --git a/src/csharp/Microsoft.Spark.E2ETest/IpcTests/ML/Feature/IDFTests.cs b/src/csharp/Microsoft.Spark.E2ETest/IpcTests/ML/Feature/IDFTests.cs index 3dea63de7..72da97887 100644 --- a/src/csharp/Microsoft.Spark.E2ETest/IpcTests/ML/Feature/IDFTests.cs +++ b/src/csharp/Microsoft.Spark.E2ETest/IpcTests/ML/Feature/IDFTests.cs @@ -3,9 +3,9 @@ // See the LICENSE file in the project root for more information. using System.IO; -using Microsoft.Spark.E2ETest.Utils; using Microsoft.Spark.ML.Feature; using Microsoft.Spark.Sql; +using Microsoft.Spark.UnitTest.TestUtils; using Xunit; namespace Microsoft.Spark.E2ETest.IpcTests.ML.Feature diff --git a/src/csharp/Microsoft.Spark.E2ETest/IpcTests/ML/Feature/TokenizerTests.cs b/src/csharp/Microsoft.Spark.E2ETest/IpcTests/ML/Feature/TokenizerTests.cs index 8cdb4e03a..4b1998f50 100644 --- a/src/csharp/Microsoft.Spark.E2ETest/IpcTests/ML/Feature/TokenizerTests.cs +++ b/src/csharp/Microsoft.Spark.E2ETest/IpcTests/ML/Feature/TokenizerTests.cs @@ -3,9 +3,9 @@ // See the LICENSE file in the project root for more information. using System.IO; -using Microsoft.Spark.E2ETest.Utils; using Microsoft.Spark.ML.Feature; using Microsoft.Spark.Sql; +using Microsoft.Spark.UnitTest.TestUtils; using Xunit; namespace Microsoft.Spark.E2ETest.IpcTests.ML.Feature diff --git a/src/csharp/Microsoft.Spark.E2ETest/IpcTests/ML/Feature/Word2VecModelTests.cs b/src/csharp/Microsoft.Spark.E2ETest/IpcTests/ML/Feature/Word2VecModelTests.cs index 4845e011a..a5227149b 100644 --- a/src/csharp/Microsoft.Spark.E2ETest/IpcTests/ML/Feature/Word2VecModelTests.cs +++ b/src/csharp/Microsoft.Spark.E2ETest/IpcTests/ML/Feature/Word2VecModelTests.cs @@ -2,11 +2,10 @@ // The .NET Foundation licenses this file to you under the MIT license. // See the LICENSE file in the project root for more information. 
-using System; using System.IO; -using Microsoft.Spark.E2ETest.Utils; using Microsoft.Spark.ML.Feature; using Microsoft.Spark.Sql; +using Microsoft.Spark.UnitTest.TestUtils; using Xunit; namespace Microsoft.Spark.E2ETest.IpcTests.ML.Feature diff --git a/src/csharp/Microsoft.Spark.E2ETest/IpcTests/ML/Feature/Word2VecTests.cs b/src/csharp/Microsoft.Spark.E2ETest/IpcTests/ML/Feature/Word2VecTests.cs index 30e14ed28..1d5da5335 100644 --- a/src/csharp/Microsoft.Spark.E2ETest/IpcTests/ML/Feature/Word2VecTests.cs +++ b/src/csharp/Microsoft.Spark.E2ETest/IpcTests/ML/Feature/Word2VecTests.cs @@ -3,9 +3,9 @@ // See the LICENSE file in the project root for more information. using System.IO; -using Microsoft.Spark.E2ETest.Utils; using Microsoft.Spark.ML.Feature; using Microsoft.Spark.Sql; +using Microsoft.Spark.UnitTest.TestUtils; using Xunit; namespace Microsoft.Spark.E2ETest.IpcTests.ML.Feature diff --git a/src/csharp/Microsoft.Spark.E2ETest/IpcTests/SparkContextTests.cs b/src/csharp/Microsoft.Spark.E2ETest/IpcTests/SparkContextTests.cs index 07fbf2372..ca752570a 100644 --- a/src/csharp/Microsoft.Spark.E2ETest/IpcTests/SparkContextTests.cs +++ b/src/csharp/Microsoft.Spark.E2ETest/IpcTests/SparkContextTests.cs @@ -3,7 +3,7 @@ // See the LICENSE file in the project root for more information. using System; -using Microsoft.Spark.E2ETest.Utils; +using Microsoft.Spark.UnitTest.TestUtils; using Xunit; namespace Microsoft.Spark.E2ETest.IpcTests diff --git a/src/csharp/Microsoft.Spark.E2ETest/IpcTests/Sql/DataFrameTests.cs b/src/csharp/Microsoft.Spark.E2ETest/IpcTests/Sql/DataFrameTests.cs index 7359bdb6b..46e899a87 100644 --- a/src/csharp/Microsoft.Spark.E2ETest/IpcTests/Sql/DataFrameTests.cs +++ b/src/csharp/Microsoft.Spark.E2ETest/IpcTests/Sql/DataFrameTests.cs @@ -3,13 +3,13 @@ // See the LICENSE file in the project root for more information. using System; -using System.Collections.Generic; using System.Linq; using Apache.Arrow; using Microsoft.Data.Analysis; using Microsoft.Spark.E2ETest.Utils; using Microsoft.Spark.Sql; using Microsoft.Spark.Sql.Types; +using Microsoft.Spark.UnitTest.TestUtils; using Xunit; using static Microsoft.Spark.Sql.Functions; using static Microsoft.Spark.UnitTest.TestUtils.ArrowTestUtils; diff --git a/src/csharp/Microsoft.Spark.E2ETest/IpcTests/Sql/DataFrameWriterTests.cs b/src/csharp/Microsoft.Spark.E2ETest/IpcTests/Sql/DataFrameWriterTests.cs index a7e214160..4f0d06742 100644 --- a/src/csharp/Microsoft.Spark.E2ETest/IpcTests/Sql/DataFrameWriterTests.cs +++ b/src/csharp/Microsoft.Spark.E2ETest/IpcTests/Sql/DataFrameWriterTests.cs @@ -3,8 +3,8 @@ // See the LICENSE file in the project root for more information. 
using System.Collections.Generic; -using Microsoft.Spark.E2ETest.Utils; using Microsoft.Spark.Sql; +using Microsoft.Spark.UnitTest.TestUtils; using Xunit; namespace Microsoft.Spark.E2ETest.IpcTests diff --git a/src/csharp/Microsoft.Spark.E2ETest/IpcTests/Sql/Streaming/DataStreamWriterTests.cs b/src/csharp/Microsoft.Spark.E2ETest/IpcTests/Sql/Streaming/DataStreamWriterTests.cs index 4e87dc6c6..15c2a22a7 100644 --- a/src/csharp/Microsoft.Spark.E2ETest/IpcTests/Sql/Streaming/DataStreamWriterTests.cs +++ b/src/csharp/Microsoft.Spark.E2ETest/IpcTests/Sql/Streaming/DataStreamWriterTests.cs @@ -10,6 +10,7 @@ using Microsoft.Spark.Sql; using Microsoft.Spark.Sql.Streaming; using Microsoft.Spark.Sql.Types; +using Microsoft.Spark.UnitTest.TestUtils; using Xunit; using static Microsoft.Spark.Sql.Functions; diff --git a/src/csharp/Microsoft.Spark.E2ETest/Microsoft.Spark.E2ETest.csproj b/src/csharp/Microsoft.Spark.E2ETest/Microsoft.Spark.E2ETest.csproj index abe436ec9..e03519853 100644 --- a/src/csharp/Microsoft.Spark.E2ETest/Microsoft.Spark.E2ETest.csproj +++ b/src/csharp/Microsoft.Spark.E2ETest/Microsoft.Spark.E2ETest.csproj @@ -23,6 +23,7 @@ + diff --git a/src/csharp/Microsoft.Spark.E2ETest/SparkFixture.cs b/src/csharp/Microsoft.Spark.E2ETest/SparkFixture.cs index fc8272c5b..6d8dadbac 100644 --- a/src/csharp/Microsoft.Spark.E2ETest/SparkFixture.cs +++ b/src/csharp/Microsoft.Spark.E2ETest/SparkFixture.cs @@ -7,9 +7,9 @@ using System.IO; using System.Reflection; using System.Runtime.InteropServices; -using Microsoft.Spark.E2ETest.Utils; using Microsoft.Spark.Interop.Ipc; using Microsoft.Spark.Sql; +using Microsoft.Spark.UnitTest.TestUtils; using Xunit; namespace Microsoft.Spark.E2ETest diff --git a/src/csharp/Microsoft.Spark.UnitTest/AssemblyLoaderTests.cs b/src/csharp/Microsoft.Spark.UnitTest/AssemblyLoaderTests.cs index da7d05197..f2f0dd30e 100644 --- a/src/csharp/Microsoft.Spark.UnitTest/AssemblyLoaderTests.cs +++ b/src/csharp/Microsoft.Spark.UnitTest/AssemblyLoaderTests.cs @@ -9,17 +9,19 @@ namespace Microsoft.Spark.UnitTest { + [Collection("Spark Unit Tests")] public class AssemblyLoaderTests { [Fact] public void TestAssemblySearchPathResolver() { + string sparkFilesDir = SparkFiles.GetRootDirectory(); string curDir = Directory.GetCurrentDirectory(); string appDir = AppDomain.CurrentDomain.BaseDirectory; // Test the default scenario. string[] searchPaths = AssemblySearchPathResolver.GetAssemblySearchPaths(); - Assert.Equal(new[] { curDir, appDir }, searchPaths); + Assert.Equal(new[] { sparkFilesDir, curDir, appDir }, searchPaths); // Test the case where DOTNET_ASSEMBLY_SEARCH_PATHS is defined. char sep = Path.PathSeparator; @@ -34,6 +36,7 @@ public void TestAssemblySearchPathResolver() "mydir2", Path.Combine(curDir, $".{sep}mydir3"), Path.Combine(curDir, $".{sep}mydir4"), + sparkFilesDir, curDir, appDir }, searchPaths); diff --git a/src/csharp/Microsoft.Spark.UnitTest/CollectionUtilsTests.cs b/src/csharp/Microsoft.Spark.UnitTest/CollectionUtilsTests.cs new file mode 100644 index 000000000..9a723b2b5 --- /dev/null +++ b/src/csharp/Microsoft.Spark.UnitTest/CollectionUtilsTests.cs @@ -0,0 +1,26 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. +// See the LICENSE file in the project root for more information. 
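The AssemblyLoaderTests update above reflects the new search-path precedence: entries from DOTNET_ASSEMBLY_SEARCH_PATHS first, then the SparkFiles root, then the working directory, then the application base directory. The resolver itself asks SparkFiles for its root, which needs a live or mocked JVM bridge, so as a JVM-free illustration here is a rough sketch of only the environment-variable handling it documents (comma-separated entries, "."-prefixed entries rooted at the working directory); this is an approximation, not the shipped resolver:

using System;
using System.Collections.Generic;
using System.IO;

// Sketch only: mirrors the documented handling of DOTNET_ASSEMBLY_SEARCH_PATHS.
// The real resolver additionally appends the SparkFiles root, the working
// directory, and the application base directory.
internal static class SearchPathSketch
{
    internal static IEnumerable<string> ExpandEnvEntries(string envValue)
    {
        foreach (string entry in envValue.Split(',', StringSplitOptions.RemoveEmptyEntries))
        {
            string path = entry.Trim();

            // Relative entries such as "./mydir" are resolved against the working directory.
            yield return path.StartsWith(".")
                ? Path.Combine(Directory.GetCurrentDirectory(), path)
                : path;
        }
    }
}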
+ +using Microsoft.Spark.Utils; +using Xunit; + +namespace Microsoft.Spark.UnitTest +{ + public class CollectionUtilsTests + { + [Fact] + public void TestArrayEquals() + { + Assert.False(CollectionUtils.ArrayEquals(new int[] { 1 }, null)); + Assert.False(CollectionUtils.ArrayEquals(null, new int[] { 1 })); + Assert.False(CollectionUtils.ArrayEquals(new int[] { }, new int[] { 1 })); + Assert.False(CollectionUtils.ArrayEquals(new int[] { 1 }, new int[] { })); + Assert.False(CollectionUtils.ArrayEquals(new int[] { 1 }, new int[] { 1, 2 })); + Assert.False(CollectionUtils.ArrayEquals(new int[] { 1 }, new int[] { 2 })); + + Assert.True(CollectionUtils.ArrayEquals(null, null)); + Assert.True(CollectionUtils.ArrayEquals(new int[] { 1 }, new int[] { 1 })); + } + } +} diff --git a/src/csharp/Microsoft.Spark.UnitTest/CommandSerDeTests.cs b/src/csharp/Microsoft.Spark.UnitTest/CommandSerDeTests.cs index 557bdcc72..820d7dea0 100644 --- a/src/csharp/Microsoft.Spark.UnitTest/CommandSerDeTests.cs +++ b/src/csharp/Microsoft.Spark.UnitTest/CommandSerDeTests.cs @@ -14,6 +14,7 @@ namespace Microsoft.Spark.UnitTest { + [Collection("Spark Unit Tests")] public class CommandSerDeTests { [Fact] diff --git a/src/csharp/Microsoft.Spark.UnitTest/DependencyProviderUtilsTests.cs b/src/csharp/Microsoft.Spark.UnitTest/DependencyProviderUtilsTests.cs new file mode 100644 index 000000000..ad01e3724 --- /dev/null +++ b/src/csharp/Microsoft.Spark.UnitTest/DependencyProviderUtilsTests.cs @@ -0,0 +1,154 @@ +using System.IO; +using System.Linq; +using Microsoft.Spark.UnitTest.TestUtils; +using Microsoft.Spark.Utils; +using Xunit; + +namespace Microsoft.Spark.UnitTest +{ + public class DependencyProviderUtilsTests + { + [Fact] + public void TestNuGetMetadataEquals() + { + string expectedFileName = "package.name.1.0.0.nupkg"; + string expectedPackageName = "package.name"; + string expectedPackageVersion = "1.0.0"; + + var nugetMetadata = new DependencyProviderUtils.NuGetMetadata + { + FileName = expectedFileName, + PackageName = expectedPackageName, + PackageVersion = expectedPackageVersion + }; + + Assert.False(nugetMetadata.Equals(null)); + Assert.False(nugetMetadata.Equals(new DependencyProviderUtils.NuGetMetadata())); + Assert.False(nugetMetadata.Equals(new DependencyProviderUtils.NuGetMetadata + { + FileName = "", + PackageName = expectedPackageName, + PackageVersion = expectedPackageVersion + })); + Assert.False(nugetMetadata.Equals(new DependencyProviderUtils.NuGetMetadata + { + FileName = expectedFileName, + PackageName = "", + PackageVersion = expectedPackageVersion + })); + Assert.False(nugetMetadata.Equals(new DependencyProviderUtils.NuGetMetadata + { + FileName = expectedFileName, + PackageName = expectedPackageName, + PackageVersion = "" + })); + + Assert.True(nugetMetadata.Equals(new DependencyProviderUtils.NuGetMetadata + { + FileName = expectedFileName, + PackageName = expectedPackageName, + PackageVersion = expectedPackageVersion + })); + } + + [Fact] + public void TestMetadataEquals() + { + string expectedAssemblyProbingPath = "/assembly/probe/path"; + string expectedNativeProbingPath = "/native/probe/path"; + var expectedNugetMetadata = new DependencyProviderUtils.NuGetMetadata + { + FileName = "package.name.1.0.0.nupkg", + PackageName = "package.name", + PackageVersion = "1.0.0" + }; + + var metadata = new DependencyProviderUtils.Metadata + { + AssemblyProbingPaths = new string[] { expectedAssemblyProbingPath }, + NativeProbingPaths = new string[] { expectedNativeProbingPath }, + NuGets = new 
DependencyProviderUtils.NuGetMetadata[] { expectedNugetMetadata } + }; + + Assert.False(metadata.Equals(null)); + Assert.False(metadata.Equals(new DependencyProviderUtils.Metadata())); + Assert.False(metadata.Equals(new DependencyProviderUtils.Metadata + { + AssemblyProbingPaths = new string[] { expectedAssemblyProbingPath }, + NativeProbingPaths = new string[] { expectedNativeProbingPath, "" }, + NuGets = new DependencyProviderUtils.NuGetMetadata[] { expectedNugetMetadata } + })); + Assert.False(metadata.Equals(new DependencyProviderUtils.Metadata + { + AssemblyProbingPaths = new string[] { expectedAssemblyProbingPath }, + NativeProbingPaths = new string[] { expectedNativeProbingPath }, + NuGets = new DependencyProviderUtils.NuGetMetadata[] { expectedNugetMetadata, null } + })); + Assert.False(metadata.Equals(new DependencyProviderUtils.Metadata + { + AssemblyProbingPaths = new string[] { expectedAssemblyProbingPath, "" }, + NativeProbingPaths = new string[] { expectedNativeProbingPath }, + NuGets = new DependencyProviderUtils.NuGetMetadata[] { expectedNugetMetadata } + })); + + Assert.True(metadata.Equals(new DependencyProviderUtils.Metadata + { + AssemblyProbingPaths = new string[] { expectedAssemblyProbingPath }, + NativeProbingPaths = new string[] { expectedNativeProbingPath }, + NuGets = new DependencyProviderUtils.NuGetMetadata[] { expectedNugetMetadata } + })); + } + + [Fact] + public void TestMetadataSerDe() + { + using var tempDir = new TemporaryDirectory(); + var metadata = new DependencyProviderUtils.Metadata + { + AssemblyProbingPaths = new string[] { "/assembly/probe/path" }, + NativeProbingPaths = new string[] { "/native/probe/path" }, + NuGets = new DependencyProviderUtils.NuGetMetadata[] + { + new DependencyProviderUtils.NuGetMetadata + { + FileName = "package.name.1.0.0.nupkg", + PackageName = "package.name", + PackageVersion = "1.0.0" + } + } + }; + + string serializedFilePath = Path.Combine(tempDir.Path, "serializedMetadata"); + metadata.Serialize(serializedFilePath); + + DependencyProviderUtils.Metadata deserializedMetadata = + DependencyProviderUtils.Metadata.Deserialize(serializedFilePath); + + Assert.True(metadata.Equals(deserializedMetadata)); + } + + [Fact] + public void TestFileNames() + { + using var tempDir = new TemporaryDirectory(); + foreach (long num in Enumerable.Range(0, 3).Select(x => System.Math.Pow(10, x))) + { + string filePath = + Path.Combine(tempDir.Path, DependencyProviderUtils.CreateFileName(num)); + File.Create(filePath).Dispose(); + } + + var expectedFiles = new string[] + { + "dependencyProviderMetadata_0000000000000000001", + "dependencyProviderMetadata_0000000000000000010", + "dependencyProviderMetadata_0000000000000000100", + }; + IOrderedEnumerable actualFiles = DependencyProviderUtils + .GetMetadataFiles(tempDir.Path) + .Select(f => Path.GetFileName(f)) + .OrderBy(s => s); + Assert.True(expectedFiles.SequenceEqual(actualFiles)); + } + } +} diff --git a/src/csharp/Microsoft.Spark.UnitTest/SparkFixture.cs b/src/csharp/Microsoft.Spark.UnitTest/SparkFixture.cs new file mode 100644 index 000000000..02f2c8b3b --- /dev/null +++ b/src/csharp/Microsoft.Spark.UnitTest/SparkFixture.cs @@ -0,0 +1,109 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. +// See the LICENSE file in the project root for more information. 
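TestFileNames above pins the metadata file naming scheme down to string ordering. A minimal sketch of the convention it relies on, assuming the same "dependencyProviderMetadata_" prefix used throughout this change: the counter is zero-padded to 19 digits so that sorting the file names as strings also sorts them numerically.

using System;

internal static class MetadataFileNameSketch
{
    // Mirrors the behavior of DependencyProviderUtils.CreateFileName: a fixed-width,
    // zero-padded suffix keeps Directory.GetFiles("dependencyProviderMetadata_*")
    // results in creation order when sorted lexicographically.
    internal static string CreateFileName(long number) =>
        $"dependencyProviderMetadata_{number:D19}";

    internal static void Main()
    {
        Console.WriteLine(CreateFileName(1));   // ..._0000000000000000001
        Console.WriteLine(CreateFileName(10));  // ..._0000000000000000010
        Console.WriteLine(CreateFileName(100)); // ..._0000000000000000100
    }
}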
+ +using System; +using System.IO; +using Microsoft.Spark.Interop; +using Microsoft.Spark.Interop.Ipc; +using Moq; +using Xunit; + +namespace Microsoft.Spark.UnitTest +{ + public sealed class SparkFixture : IDisposable + { + internal Mock MockJvm { get; private set; } + + public SparkFixture() + { + SetupBasicMockJvm(); + + // Unit tests may contain calls that hit the AssemblyLoader. + // One of the AssemblyLoader assembly search paths is populated + // using SparkFiles. Unless we are running in an E2E scenario and + // on the Worker, SparkFiles will attempt to call the JVM. Because + // this is a (non E2E) Unit test, it is necessary to mock this call. + SetupSparkFiles(); + + var mockJvmBridgeFactory = new Mock(); + mockJvmBridgeFactory + .Setup(m => m.Create(It.IsAny())) + .Returns(MockJvm.Object); + + SparkEnvironment.JvmBridgeFactory = mockJvmBridgeFactory.Object; + } + + public void Dispose() + { + } + + private void SetupBasicMockJvm() + { + MockJvm = new Mock(); + + MockJvm + .Setup(m => m.CallStaticJavaMethod( + It.IsAny(), + It.IsAny(), + It.IsAny())) + .Returns( + new JvmObjectReference("result", MockJvm.Object)); + MockJvm + .Setup(m => m.CallStaticJavaMethod( + It.IsAny(), + It.IsAny(), + It.IsAny(), + It.IsAny())) + .Returns( + new JvmObjectReference("result", MockJvm.Object)); + MockJvm + .Setup(m => m.CallStaticJavaMethod( + It.IsAny(), + It.IsAny(), + It.IsAny())) + .Returns( + new JvmObjectReference("result", MockJvm.Object)); + + MockJvm + .Setup(m => m.CallNonStaticJavaMethod( + It.IsAny(), + It.IsAny(), + It.IsAny())) + .Returns( + new JvmObjectReference("result", MockJvm.Object)); + MockJvm + .Setup(m => m.CallNonStaticJavaMethod( + It.IsAny(), + It.IsAny(), + It.IsAny(), + It.IsAny())) + .Returns( + new JvmObjectReference("result", MockJvm.Object)); + MockJvm + .Setup(m => m.CallNonStaticJavaMethod( + It.IsAny(), + It.IsAny(), + It.IsAny())) + .Returns( + new JvmObjectReference("result", MockJvm.Object)); + } + + private void SetupSparkFiles() + { + MockJvm + .Setup(m => m.CallStaticJavaMethod( + "org.apache.spark.SparkFiles", + "getRootDirectory")) + .Returns("SparkFilesRootDirectory"); + } + } + + [CollectionDefinition("Spark Unit Tests")] + public class SparkCollection : ICollectionFixture + { + // This class has no code, and is never created. Its purpose is simply + // to be the place to apply [CollectionDefinition] and all the + // ICollectionFixture<> interfaces. 
+ } +} diff --git a/src/csharp/Microsoft.Spark.UnitTest/Sql/ColumnTests.cs b/src/csharp/Microsoft.Spark.UnitTest/Sql/ColumnTests.cs index f88d53800..adffd9312 100644 --- a/src/csharp/Microsoft.Spark.UnitTest/Sql/ColumnTests.cs +++ b/src/csharp/Microsoft.Spark.UnitTest/Sql/ColumnTests.cs @@ -12,71 +12,12 @@ namespace Microsoft.Spark.UnitTest { - public class ColumnTestsFixture : IDisposable - { - internal Mock MockJvm { get; } - - public ColumnTestsFixture() - { - MockJvm = new Mock(); - - MockJvm - .Setup(m => m.CallStaticJavaMethod( - It.IsAny(), - It.IsAny(), - It.IsAny())) - .Returns( - new JvmObjectReference("result", MockJvm.Object)); - MockJvm - .Setup(m => m.CallStaticJavaMethod( - It.IsAny(), - It.IsAny(), - It.IsAny(), - It.IsAny())) - .Returns( - new JvmObjectReference("result", MockJvm.Object)); - MockJvm - .Setup(m => m.CallStaticJavaMethod( - It.IsAny(), - It.IsAny(), - It.IsAny())) - .Returns( - new JvmObjectReference("result", MockJvm.Object)); - - MockJvm - .Setup(m => m.CallNonStaticJavaMethod( - It.IsAny(), - It.IsAny(), - It.IsAny())) - .Returns( - new JvmObjectReference("result", MockJvm.Object)); - MockJvm - .Setup(m => m.CallNonStaticJavaMethod( - It.IsAny(), - It.IsAny(), - It.IsAny(), - It.IsAny())) - .Returns( - new JvmObjectReference("result", MockJvm.Object)); - MockJvm - .Setup(m => m.CallNonStaticJavaMethod( - It.IsAny(), - It.IsAny(), - It.IsAny())) - .Returns( - new JvmObjectReference("result", MockJvm.Object)); - } - - public void Dispose() - { - } - } - - public class ColumnTests : IClassFixture + [Collection("Spark Unit Tests")] + public class ColumnTests { private readonly Mock _mockJvm; - public ColumnTests(ColumnTestsFixture fixture) + public ColumnTests(SparkFixture fixture) { _mockJvm = fixture.MockJvm; } diff --git a/src/csharp/Microsoft.Spark.E2ETest/Utils/TemporaryDirectory.cs b/src/csharp/Microsoft.Spark.UnitTest/TestUtils/TemporaryDirectory.cs similarity index 87% rename from src/csharp/Microsoft.Spark.E2ETest/Utils/TemporaryDirectory.cs rename to src/csharp/Microsoft.Spark.UnitTest/TestUtils/TemporaryDirectory.cs index 556b78f99..98d3c18f3 100644 --- a/src/csharp/Microsoft.Spark.E2ETest/Utils/TemporaryDirectory.cs +++ b/src/csharp/Microsoft.Spark.UnitTest/TestUtils/TemporaryDirectory.cs @@ -1,63 +1,63 @@ -// Licensed to the .NET Foundation under one or more agreements. -// The .NET Foundation licenses this file to you under the MIT license. -// See the LICENSE file in the project root for more information. - -using System; -using System.IO; - -namespace Microsoft.Spark.E2ETest.Utils -{ - /// - /// Creates a temporary folder that is automatically cleaned up when disposed. - /// - internal sealed class TemporaryDirectory : IDisposable - { - private bool disposed = false; - - /// - /// Path to temporary folder. - /// - public string Path { get; } - - public TemporaryDirectory() - { - Path = System.IO.Path.Combine(System.IO.Path.GetTempPath(), Guid.NewGuid().ToString()); - Cleanup(); - Directory.CreateDirectory(Path); - Path = $"{Path}{System.IO.Path.DirectorySeparatorChar}"; - } - - public void Dispose() - { - Dispose(true); - GC.SuppressFinalize(this); - } - - private void Cleanup() - { - if (File.Exists(Path)) - { - File.Delete(Path); - } - else if (Directory.Exists(Path)) - { - Directory.Delete(Path, true); - } - } - - private void Dispose(bool disposing) - { - if (disposed) - { - return; - } - - if (disposing) - { - Cleanup(); - } - - disposed = true; - } - } -} +// Licensed to the .NET Foundation under one or more agreements. 
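ColumnTests above now receives its mocked JVM from the shared SparkFixture through the "Spark Unit Tests" collection instead of a per-class fixture. Any other unit-test class that ends up touching the JVM bridge can opt in the same way; a hypothetical example (the class name and assertion are illustrative only, and the internal types are visible because the sketch sits in the test assembly):

using Microsoft.Spark.Interop.Ipc;
using Moq;
using Xunit;

namespace Microsoft.Spark.UnitTest
{
    // The collection attribute makes xUnit construct a single SparkFixture and
    // hand it to every test class in the collection via the constructor.
    [Collection("Spark Unit Tests")]
    public class MyJvmBackedTests
    {
        private readonly Mock<IJvmBridge> _mockJvm;

        public MyJvmBackedTests(SparkFixture fixture)
        {
            _mockJvm = fixture.MockJvm;
        }

        [Fact]
        public void CallsGoThroughTheMockedBridge()
        {
            // Anything that resolves SparkEnvironment.JvmBridge now hits the mock.
            Assert.NotNull(_mockJvm.Object);
        }
    }
}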
+// The .NET Foundation licenses this file to you under the MIT license. +// See the LICENSE file in the project root for more information. + +using System; +using System.IO; + +namespace Microsoft.Spark.UnitTest.TestUtils +{ + /// + /// Creates a temporary folder that is automatically cleaned up when disposed. + /// + internal sealed class TemporaryDirectory : IDisposable + { + private bool _disposed = false; + + /// + /// Path to temporary folder. + /// + public string Path { get; } + + public TemporaryDirectory() + { + Path = System.IO.Path.Combine(System.IO.Path.GetTempPath(), Guid.NewGuid().ToString()); + Cleanup(); + Directory.CreateDirectory(Path); + Path = $"{Path}{System.IO.Path.DirectorySeparatorChar}"; + } + + public void Dispose() + { + Dispose(true); + GC.SuppressFinalize(this); + } + + private void Cleanup() + { + if (File.Exists(Path)) + { + File.Delete(Path); + } + else if (Directory.Exists(Path)) + { + Directory.Delete(Path, true); + } + } + + private void Dispose(bool disposing) + { + if (_disposed) + { + return; + } + + if (disposing) + { + Cleanup(); + } + + _disposed = true; + } + } +} diff --git a/src/csharp/Microsoft.Spark.UnitTest/UdfSerDeTests.cs b/src/csharp/Microsoft.Spark.UnitTest/UdfSerDeTests.cs index 6928150d0..bf4ef29f4 100644 --- a/src/csharp/Microsoft.Spark.UnitTest/UdfSerDeTests.cs +++ b/src/csharp/Microsoft.Spark.UnitTest/UdfSerDeTests.cs @@ -11,6 +11,7 @@ namespace Microsoft.Spark.UnitTest { + [Collection("Spark Unit Tests")] public class UdfSerDeTests { [Serializable] diff --git a/src/csharp/Microsoft.Spark.Worker.UnitTest/DaemonWorkerTests.cs b/src/csharp/Microsoft.Spark.Worker.UnitTest/DaemonWorkerTests.cs index 0490660e3..5fac38035 100644 --- a/src/csharp/Microsoft.Spark.Worker.UnitTest/DaemonWorkerTests.cs +++ b/src/csharp/Microsoft.Spark.Worker.UnitTest/DaemonWorkerTests.cs @@ -15,6 +15,7 @@ namespace Microsoft.Spark.Worker.UnitTest { + [Collection("Spark Unit Tests")] public class DaemonWorkerTests { [Fact] diff --git a/src/csharp/Microsoft.Spark.Worker.UnitTest/DependencyProviderTests.cs b/src/csharp/Microsoft.Spark.Worker.UnitTest/DependencyProviderTests.cs new file mode 100644 index 000000000..6643ba2ab --- /dev/null +++ b/src/csharp/Microsoft.Spark.Worker.UnitTest/DependencyProviderTests.cs @@ -0,0 +1,64 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. +// See the LICENSE file in the project root for more information. 
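The TemporaryDirectory helper, now under Microsoft.Spark.UnitTest.TestUtils, is what these unit and worker tests use for scratch space. A minimal usage sketch, assuming it is called from a project that can see the internal type:

using System.IO;
using Microsoft.Spark.UnitTest.TestUtils;

// The helper creates a fresh folder under the system temp path and deletes it
// recursively when disposed, so tests never leak files between runs.
internal static class TemporaryDirectoryUsage
{
    internal static void Example()
    {
        using var tempDir = new TemporaryDirectory();
        string filePath = Path.Combine(tempDir.Path, "example.txt");
        File.WriteAllText(filePath, "scratch data");
        // tempDir.Path (and example.txt with it) is removed when tempDir is disposed.
    }
}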
+ +using System.IO; +using System.IO.Compression; +using Microsoft.Spark.UnitTest.TestUtils; +using Microsoft.Spark.Utils; +using Microsoft.Spark.Worker.Utils; +using Xunit; + +namespace Microsoft.Spark.Worker.UnitTest +{ + [Collection("Spark Unit Tests")] + public class DependencyProviderTests + { + [Fact] + public void TestsUnpackPackages() + { + string packageFileName = "package.name.1.0.0.nupkg"; + string packageName = "package.name"; + string packageVersion = "1.0.0"; + + using var emptyFileDir = new TemporaryDirectory(); + string emptyFileName = "emptyfile"; + File.Create(Path.Combine(emptyFileDir.Path, emptyFileName)).Dispose(); + + using var nupkgDir = new TemporaryDirectory(); + ZipFile.CreateFromDirectory( + emptyFileDir.Path, + Path.Combine(nupkgDir.Path, packageFileName)); + + var metadata = new DependencyProviderUtils.Metadata + { + AssemblyProbingPaths = new string[] { "/assembly/probe/path" }, + NativeProbingPaths = new string[] { "/native/probe/path" }, + NuGets = new DependencyProviderUtils.NuGetMetadata[] + { + new DependencyProviderUtils.NuGetMetadata + { + FileName = packageFileName, + PackageName = packageName, + PackageVersion = packageVersion + } + } + }; + + using var unpackDir = new TemporaryDirectory(); + string metadataFilePath = + Path.Combine(nupkgDir.Path, DependencyProviderUtils.CreateFileName(1)); + metadata.Serialize(metadataFilePath); + + // Files located in nupkgDir + // nuget: package.name.1.0.0.nupkg + // metadata file: dependencyProviderMetadata_00000000000000000001 + var dependencyProvider = + new DependencyProvider(metadataFilePath, nupkgDir.Path, unpackDir.Path); + string expectedPackagePath = + Path.Combine(unpackDir.Path, ".nuget", "packages", packageName, packageVersion); + string expectedFilePath = Path.Combine(expectedPackagePath, emptyFileName); + Assert.True(File.Exists(expectedFilePath)); + } + } +} diff --git a/src/csharp/Microsoft.Spark.Worker.UnitTest/Microsoft.Spark.Worker.UnitTest.csproj b/src/csharp/Microsoft.Spark.Worker.UnitTest/Microsoft.Spark.Worker.UnitTest.csproj index 1b68d2e45..1371d5d1b 100644 --- a/src/csharp/Microsoft.Spark.Worker.UnitTest/Microsoft.Spark.Worker.UnitTest.csproj +++ b/src/csharp/Microsoft.Spark.Worker.UnitTest/Microsoft.Spark.Worker.UnitTest.csproj @@ -4,13 +4,19 @@ netcoreapp3.1 + + + + + + diff --git a/src/csharp/Microsoft.Spark.Worker.UnitTest/PayloadProcessorTests.cs b/src/csharp/Microsoft.Spark.Worker.UnitTest/PayloadProcessorTests.cs index c586e9dc2..24370abcb 100644 --- a/src/csharp/Microsoft.Spark.Worker.UnitTest/PayloadProcessorTests.cs +++ b/src/csharp/Microsoft.Spark.Worker.UnitTest/PayloadProcessorTests.cs @@ -14,6 +14,7 @@ namespace Microsoft.Spark.Worker.UnitTest { + [Collection("Spark Unit Tests")] public class PayloadProcessorTests { [Theory] diff --git a/src/csharp/Microsoft.Spark.Worker/Microsoft.Spark.Worker.csproj b/src/csharp/Microsoft.Spark.Worker/Microsoft.Spark.Worker.csproj index cff20b084..f18f41963 100644 --- a/src/csharp/Microsoft.Spark.Worker/Microsoft.Spark.Worker.csproj +++ b/src/csharp/Microsoft.Spark.Worker/Microsoft.Spark.Worker.csproj @@ -13,6 +13,7 @@ + diff --git a/src/csharp/Microsoft.Spark.Worker/Processor/PayloadProcessor.cs b/src/csharp/Microsoft.Spark.Worker/Processor/PayloadProcessor.cs index 2acc89933..58dd588aa 100644 --- a/src/csharp/Microsoft.Spark.Worker/Processor/PayloadProcessor.cs +++ b/src/csharp/Microsoft.Spark.Worker/Processor/PayloadProcessor.cs @@ -7,12 +7,7 @@ using System.Collections.Generic; using System.IO; using Microsoft.Spark.Interop.Ipc; -using 
Microsoft.Spark.Utils; - -#if NETCOREAPP -using System.Reflection; -using System.Runtime.Loader; -#endif +using Microsoft.Spark.Worker.Utils; namespace Microsoft.Spark.Worker.Processor { @@ -28,20 +23,6 @@ internal PayloadProcessor(Version version) _version = version; } - static PayloadProcessor() - { -#if NETCOREAPP - AssemblyLoader.LoadFromFile = AssemblyLoadContext.Default.LoadFromAssemblyPath; - AssemblyLoader.LoadFromName = (asmName) => - AssemblyLoadContext.Default.LoadFromAssemblyName(new AssemblyName(asmName)); - AssemblyLoadContext.Default.Resolving += (assemblyLoadContext, assemblyName) => - AssemblyLoader.ResolveAssembly(assemblyName.FullName); -#else - AppDomain.CurrentDomain.AssemblyResolve += (object sender, ResolveEventArgs args) => - AssemblyLoader.ResolveAssembly(args.Name); -#endif - } - /// /// Processes the given stream to construct a Payload object. /// @@ -79,8 +60,15 @@ internal Payload Process(Stream stream) TaskContextHolder.Set(payload.TaskContext); payload.SparkFilesDir = SerDe.ReadString(stream); + SparkFiles.SetRootDirectory(payload.SparkFilesDir); + + // Register additional assembly handlers after SparkFilesDir has been set + // and before any deserialization occurs. BroadcastVariableProcessor may + // deserialize objects from assemblies that are not currently loaded within + // our current context. + AssemblyLoaderHelper.RegisterAssemblyHandler(); - if (Utils.SettingUtils.IsDatabricks) + if (SettingUtils.IsDatabricks) { SerDe.ReadString(stream); SerDe.ReadString(stream); diff --git a/src/csharp/Microsoft.Spark.Worker/Utils/AssemblyLoaderHelper.cs b/src/csharp/Microsoft.Spark.Worker/Utils/AssemblyLoaderHelper.cs new file mode 100644 index 000000000..1443165bc --- /dev/null +++ b/src/csharp/Microsoft.Spark.Worker/Utils/AssemblyLoaderHelper.cs @@ -0,0 +1,93 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. +// See the LICENSE file in the project root for more information. + +using System; +using System.Collections.Concurrent; +using System.IO; +using Microsoft.Spark.Services; +using Microsoft.Spark.Utils; + +#if NETCOREAPP +using System.Runtime.Loader; +#endif + +namespace Microsoft.Spark.Worker.Utils +{ + internal static class AssemblyLoaderHelper + { + private static readonly ILoggerService s_logger = + LoggerServiceFactory.GetLogger(typeof(AssemblyLoaderHelper)); + + // A mapping between a metadata file's path to its respective DependencyProvider. + private static readonly ConcurrentDictionary> + s_dependencyProviders = new ConcurrentDictionary>(); + + private static readonly bool s_runningREPL = + EnvironmentUtils.GetEnvironmentVariableAsBool(Constants.RunningREPLEnvVar); + + /// + /// Register the AssemblyLoader.ResolveAssembly handler to handle the + /// event when assemblies fail to load in the current assembly load context. + /// + static AssemblyLoaderHelper() + { +#if NETCOREAPP + AssemblyLoader.LoadFromFile = AssemblyLoadContext.Default.LoadFromAssemblyPath; + AssemblyLoadContext.Default.Resolving += (assemblyLoadContext, assemblyName) => + AssemblyLoader.ResolveAssembly(assemblyName.FullName); +#else + AppDomain.CurrentDomain.AssemblyResolve += (object sender, ResolveEventArgs args) => + AssemblyLoader.ResolveAssembly(args.Name); +#endif + } + + /// + /// In a dotnet-interactive REPL session (driver), nuget dependencies will be + /// systematically added using . 
+ /// + /// These files include: + /// - "{packagename}.{version}.nupkg" + /// The nuget packages + /// - + /// Serialized object. + /// + /// On the Worker, in order to resolve the nuget dependencies referenced by + /// the dotnet-interactive session, we instantiate a + /// . + /// This provider will register an event handler to the Assembly Load Resolving event. + /// By using , we can access the + /// required files added to the . + /// + internal static void RegisterAssemblyHandler() + { + if (!s_runningREPL) + { + return; + } + + string sparkFilesPath = SparkFiles.GetRootDirectory(); + string[] metadataFiles = + DependencyProviderUtils.GetMetadataFiles(sparkFilesPath); + foreach (string metdatafile in metadataFiles) + { + // The execution of the delegate passed to GetOrAdd is not guaranteed to run once. + // Multiple Lazy objects may be created, but only one of them will be added to the + // ConcurrentDictionary. The Lazy value is retrieved to materialize the + // DependencyProvider object if it hasn't already been created. + Lazy dependecyProvider = s_dependencyProviders.GetOrAdd( + metdatafile, + mdf => new Lazy( + () => + { + s_logger.LogInfo($"Creating {nameof(DependencyProvider)} using {mdf}"); + return new DependencyProvider( + mdf, + sparkFilesPath, + Directory.GetCurrentDirectory()); + })); + _ = dependecyProvider.Value; + } + } + } +} diff --git a/src/csharp/Microsoft.Spark.Worker/Utils/DependencyProvider.cs b/src/csharp/Microsoft.Spark.Worker/Utils/DependencyProvider.cs new file mode 100644 index 000000000..d15bda3a1 --- /dev/null +++ b/src/csharp/Microsoft.Spark.Worker/Utils/DependencyProvider.cs @@ -0,0 +1,87 @@ +using System; +using System.Collections.Generic; +using System.IO; +using System.IO.Compression; +using Microsoft.Spark.Utils; +using DepManager = Microsoft.DotNet.DependencyManager; + +namespace Microsoft.Spark.Worker.Utils +{ + /// + /// sets up and creates a new + /// . + /// + /// The following steps outline the process: + /// - Deserializes a . + /// - Uses to unpack required + /// nugets. + /// - Uses and + /// to construct + /// a . 
+ /// + internal class DependencyProvider : IDisposable + { + private readonly DepManager.DependencyProvider _dependencyProvider; + + internal DependencyProvider(string metadataFilePath, string srcPath, string dstPath) + { + DependencyProviderUtils.Metadata metadata = + DependencyProviderUtils.Metadata.Deserialize(metadataFilePath); + + string unpackPath = Path.Combine(dstPath, ".nuget", "packages"); + Directory.CreateDirectory(unpackPath); + + UnpackPackages(srcPath, unpackPath, metadata.NuGets); + + _dependencyProvider = CreateDependencyProvider(unpackPath, metadata); + } + + public void Dispose() + { + (_dependencyProvider as IDisposable)?.Dispose(); + } + + private DepManager.DependencyProvider CreateDependencyProvider( + string basePath, + DependencyProviderUtils.Metadata metadata) + { + IEnumerable AssemblyProbingPaths() + { + foreach (string dependency in metadata.AssemblyProbingPaths) + { + yield return Path.Combine(basePath, dependency); + } + } + + IEnumerable NativeProbingRoots() + { + foreach (string dependency in metadata.NativeProbingPaths) + { + yield return Path.Combine(basePath, dependency); + } + } + + return new DepManager.DependencyProvider( + AssemblyProbingPaths, + NativeProbingRoots); + } + + private void UnpackPackages( + string src, + string dst, + DependencyProviderUtils.NuGetMetadata[] nugetMetadata) + { + foreach (DependencyProviderUtils.NuGetMetadata metadata in nugetMetadata) + { + var packageDirectory = new DirectoryInfo( + Path.Combine(dst, metadata.PackageName.ToLower(), metadata.PackageVersion)); + if (!packageDirectory.Exists) + { + ZipFile.ExtractToDirectory( + Path.Combine(src, metadata.FileName), + packageDirectory.FullName); + } + } + } + } +} diff --git a/src/csharp/Microsoft.Spark.sln b/src/csharp/Microsoft.Spark.sln index 49eac3fc7..73047bff3 100644 --- a/src/csharp/Microsoft.Spark.sln +++ b/src/csharp/Microsoft.Spark.sln @@ -35,6 +35,10 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Microsoft.Spark.Extensions. 
EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Microsoft.Spark.Extensions.Azure.Synapse.Analytics", "Extensions\Microsoft.Spark.Extensions.Azure.Synapse.Analytics\Microsoft.Spark.Extensions.Azure.Synapse.Analytics.csproj", "{47652C7D-B076-4FD9-98AC-959E38BE18E3}" EndProject +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Microsoft.Spark.Extensions.DotNet.Interactive", "Extensions\Microsoft.Spark.Extensions.DotNet.Interactive\Microsoft.Spark.Extensions.DotNet.Interactive.csproj", "{9C32014D-8C0C-40F1-9ABA-C3BF19687E5C}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Microsoft.Spark.Extensions.DotNet.Interactive.UnitTest", "Extensions\Microsoft.Spark.Extensions.DotNet.Interactive.UnitTest\Microsoft.Spark.Extensions.DotNet.Interactive.UnitTest.csproj", "{7BDE09ED-04B3-41B2-A466-3D6F7225291E}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -89,6 +93,14 @@ Global {47652C7D-B076-4FD9-98AC-959E38BE18E3}.Debug|Any CPU.Build.0 = Debug|Any CPU {47652C7D-B076-4FD9-98AC-959E38BE18E3}.Release|Any CPU.ActiveCfg = Release|Any CPU {47652C7D-B076-4FD9-98AC-959E38BE18E3}.Release|Any CPU.Build.0 = Release|Any CPU + {9C32014D-8C0C-40F1-9ABA-C3BF19687E5C}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {9C32014D-8C0C-40F1-9ABA-C3BF19687E5C}.Debug|Any CPU.Build.0 = Debug|Any CPU + {9C32014D-8C0C-40F1-9ABA-C3BF19687E5C}.Release|Any CPU.ActiveCfg = Release|Any CPU + {9C32014D-8C0C-40F1-9ABA-C3BF19687E5C}.Release|Any CPU.Build.0 = Release|Any CPU + {7BDE09ED-04B3-41B2-A466-3D6F7225291E}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {7BDE09ED-04B3-41B2-A466-3D6F7225291E}.Debug|Any CPU.Build.0 = Debug|Any CPU + {7BDE09ED-04B3-41B2-A466-3D6F7225291E}.Release|Any CPU.ActiveCfg = Release|Any CPU + {7BDE09ED-04B3-41B2-A466-3D6F7225291E}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -99,6 +111,8 @@ Global {2048446B-45AB-4304-B230-50EDF6E8E6A4} = {71A19F75-8279-40AB-BEA0-7D4B153FC416} {206E16CA-ED59-4F5E-8EA1-9BB7BEEACB63} = {71A19F75-8279-40AB-BEA0-7D4B153FC416} {47652C7D-B076-4FD9-98AC-959E38BE18E3} = {71A19F75-8279-40AB-BEA0-7D4B153FC416} + {9C32014D-8C0C-40F1-9ABA-C3BF19687E5C} = {71A19F75-8279-40AB-BEA0-7D4B153FC416} + {7BDE09ED-04B3-41B2-A466-3D6F7225291E} = {71A19F75-8279-40AB-BEA0-7D4B153FC416} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution SolutionGuid = {FD15FFDB-EA1B-436F-841D-3386DDF94538} diff --git a/src/csharp/Microsoft.Spark/Constants.cs b/src/csharp/Microsoft.Spark/Constants.cs new file mode 100644 index 000000000..c346aadd3 --- /dev/null +++ b/src/csharp/Microsoft.Spark/Constants.cs @@ -0,0 +1,11 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. +// See the LICENSE file in the project root for more information. + +namespace Microsoft.Spark +{ + internal class Constants + { + internal const string RunningREPLEnvVar = "DOTNET_SPARK_RUNNING_REPL"; + } +} diff --git a/src/csharp/Microsoft.Spark/Interop/Ipc/IJvmBridgeFactory.cs b/src/csharp/Microsoft.Spark/Interop/Ipc/IJvmBridgeFactory.cs new file mode 100644 index 000000000..428565527 --- /dev/null +++ b/src/csharp/Microsoft.Spark/Interop/Ipc/IJvmBridgeFactory.cs @@ -0,0 +1,11 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. 
+// See the LICENSE file in the project root for more information. + +namespace Microsoft.Spark.Interop.Ipc +{ + internal interface IJvmBridgeFactory + { + IJvmBridge Create(int portNumber); + } +} diff --git a/src/csharp/Microsoft.Spark/Interop/Ipc/JvmBridgeFactory.cs b/src/csharp/Microsoft.Spark/Interop/Ipc/JvmBridgeFactory.cs new file mode 100644 index 000000000..9c9f4ca43 --- /dev/null +++ b/src/csharp/Microsoft.Spark/Interop/Ipc/JvmBridgeFactory.cs @@ -0,0 +1,14 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. +// See the LICENSE file in the project root for more information. + +namespace Microsoft.Spark.Interop.Ipc +{ + internal class JvmBridgeFactory : IJvmBridgeFactory + { + public IJvmBridge Create(int portNumber) + { + return new JvmBridge(portNumber); + } + } +} diff --git a/src/csharp/Microsoft.Spark/Interop/SparkEnvironment.cs b/src/csharp/Microsoft.Spark/Interop/SparkEnvironment.cs index 2d19fd185..f2523d065 100644 --- a/src/csharp/Microsoft.Spark/Interop/SparkEnvironment.cs +++ b/src/csharp/Microsoft.Spark/Interop/SparkEnvironment.cs @@ -3,7 +3,6 @@ // See the LICENSE file in the project root for more information. using System; -using System.Dynamic; using Microsoft.Spark.Interop.Ipc; using Microsoft.Spark.Services; @@ -46,17 +45,26 @@ internal static Version SparkVersion } } + private static IJvmBridgeFactory s_jvmBridgeFactory; + internal static IJvmBridgeFactory JvmBridgeFactory + { + get + { + return s_jvmBridgeFactory ??= new JvmBridgeFactory(); + } + set + { + s_jvmBridgeFactory = value; + } + } + private static IJvmBridge s_jvmBridge; internal static IJvmBridge JvmBridge { get { - if (s_jvmBridge == null) - { - s_jvmBridge = new JvmBridge(ConfigurationService.GetBackendPortNumber()); - } - - return s_jvmBridge; + return s_jvmBridge ??= + JvmBridgeFactory.Create(ConfigurationService.GetBackendPortNumber()); } set { @@ -69,8 +77,7 @@ internal static IConfigurationService ConfigurationService { get { - return s_configurationService ?? - (s_configurationService = new ConfigurationService()); + return s_configurationService ??= new ConfigurationService(); } set { diff --git a/src/csharp/Microsoft.Spark/ML/Feature/Bucketizer.cs b/src/csharp/Microsoft.Spark/ML/Feature/Bucketizer.cs index 823f13c1a..924c8b362 100644 --- a/src/csharp/Microsoft.Spark/ML/Feature/Bucketizer.cs +++ b/src/csharp/Microsoft.Spark/ML/Feature/Bucketizer.cs @@ -151,7 +151,7 @@ public Bucketizer SetInputCols(IEnumerable value) /// Gets the name of the column the output data will be written to. This is set by /// SetInputCol /// - // string, the output column + /// string, the output column public string GetOutputCol() { return (string)_jvmObject.Invoke("getOutputCol"); diff --git a/src/csharp/Microsoft.Spark/Microsoft.Spark.csproj b/src/csharp/Microsoft.Spark/Microsoft.Spark.csproj index 643e1130c..050a43493 100644 --- a/src/csharp/Microsoft.Spark/Microsoft.Spark.csproj +++ b/src/csharp/Microsoft.Spark/Microsoft.Spark.csproj @@ -17,6 +17,8 @@ + + @@ -27,7 +29,7 @@ - + diff --git a/src/csharp/Microsoft.Spark/SparkFiles.cs b/src/csharp/Microsoft.Spark/SparkFiles.cs index 8b09933a7..8c6f6af4b 100644 --- a/src/csharp/Microsoft.Spark/SparkFiles.cs +++ b/src/csharp/Microsoft.Spark/SparkFiles.cs @@ -2,33 +2,62 @@ // The .NET Foundation licenses this file to you under the MIT license. // See the LICENSE file in the project root for more information. 
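The IJvmBridgeFactory seam introduced above is what lets SparkEnvironment hand out a mocked bridge in unit tests instead of opening a socket to the JVM backend; SparkFixture wires it up exactly this way. A sketch of that substitution (internal APIs, so it only compiles from an assembly with InternalsVisibleTo access such as the unit tests):

using Microsoft.Spark.Interop;
using Microsoft.Spark.Interop.Ipc;
using Moq;

internal static class JvmBridgeFactorySketch
{
    internal static void InstallMockBridge(Mock<IJvmBridge> mockJvm)
    {
        // Any port number passed to Create(..) yields the supplied mock bridge.
        var mockFactory = new Mock<IJvmBridgeFactory>();
        mockFactory
            .Setup(m => m.Create(It.IsAny<int>()))
            .Returns(mockJvm.Object);

        // From now on, the first access to SparkEnvironment.JvmBridge uses the mock
        // instead of connecting to a real backend.
        SparkEnvironment.JvmBridgeFactory = mockFactory.Object;
    }
}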
+using System; +using System.IO; using Microsoft.Spark.Interop; using Microsoft.Spark.Interop.Ipc; namespace Microsoft.Spark { /// - /// Resolves paths to files added through `SparkContext.addFile()`. + /// Resolves paths to files added through . /// public static class SparkFiles { private static IJvmBridge Jvm { get; } = SparkEnvironment.JvmBridge; private static readonly string s_sparkFilesClassName = "org.apache.spark.SparkFiles"; + [ThreadStatic] + private static string s_rootDirectory; + + [ThreadStatic] + private static bool s_isRunningOnWorker; + /// - /// Get the absolute path of a file added through `SparkContext.addFile()`. + /// Get the absolute path of a file added through + /// . /// - /// The name of the file added through `SparkContext.addFile()` + /// The name of the file added through + /// . /// /// The absolute path of the file. public static string Get(string fileName) => - (string)Jvm.CallStaticJavaMethod(s_sparkFilesClassName, "get", fileName); + Path.GetFullPath(Path.Combine(GetRootDirectory(), fileName)); /// - /// Get the root directory that contains files added through `SparkContext.addFile()`. + /// Get the root directory that contains files added through + /// . /// /// The root directory that contains the files. public static string GetRootDirectory() => + s_isRunningOnWorker ? + s_rootDirectory : (string)Jvm.CallStaticJavaMethod(s_sparkFilesClassName, "getRootDirectory"); + + /// + /// Set the root directory that contains files added through + /// . + /// + /// This should only be called from the Microsoft.Spark.Worker. + /// + /// + /// Root directory that contains files added + /// through . + /// + internal static void SetRootDirectory(string path) + { + s_isRunningOnWorker = true; + s_rootDirectory = path; + } } } diff --git a/src/csharp/Microsoft.Spark/Utils/AssemblyLoader.cs b/src/csharp/Microsoft.Spark/Utils/AssemblyLoader.cs index 621a81881..3b9b34f5e 100644 --- a/src/csharp/Microsoft.Spark/Utils/AssemblyLoader.cs +++ b/src/csharp/Microsoft.Spark/Utils/AssemblyLoader.cs @@ -8,6 +8,7 @@ using System.Reflection; using System.Runtime.InteropServices; using System.Text.RegularExpressions; +using Microsoft.Spark.Services; namespace Microsoft.Spark.Utils { @@ -20,8 +21,10 @@ internal static class AssemblySearchPathResolver /// precedence: /// 1) Comma-separated paths specified in DOTNET_ASSEMBLY_SEARCH_PATHS environment /// variable. Note that if a path starts with ".", the working directory will be prepended. - /// 2) The working directory. - /// 3) The directory of the application. + /// 2) The path of the files added through + /// . + /// 3) The working directory. + /// 4) The directory of the application. 
/// /// /// The reason that the working directory has higher precedence than the directory @@ -54,6 +57,12 @@ internal static string[] GetAssemblySearchPaths() } } + string sparkFilesPath = SparkFiles.GetRootDirectory(); + if (!string.IsNullOrWhiteSpace(sparkFilesPath)) + { + searchPaths.Add(sparkFilesPath); + } + searchPaths.Add(Directory.GetCurrentDirectory()); searchPaths.Add(AppDomain.CurrentDomain.BaseDirectory); @@ -65,13 +74,15 @@ internal static class AssemblyLoader { internal static Func LoadFromFile { get; set; } = Assembly.LoadFrom; - internal static Func LoadFromName { get; set; } = Assembly.Load; + private static readonly ILoggerService s_logger = + LoggerServiceFactory.GetLogger(typeof(AssemblyLoader)); private static readonly Dictionary s_assemblyCache = new Dictionary(); - private static readonly string[] s_searchPaths = - AssemblySearchPathResolver.GetAssemblySearchPaths(); + // Lazily evaluate the assembly search paths because it has a dependency on SparkFiles. + private static readonly Lazy s_searchPaths = + new Lazy(() => AssemblySearchPathResolver.GetAssemblySearchPaths()); private static readonly string[] s_extensions = RuntimeInformation.IsOSPlatform(OSPlatform.Windows) ? @@ -95,9 +106,7 @@ internal static class AssemblyLoader /// /// The full name of the assembly /// Name of the file that contains the assembly - /// Cached or Loaded Assembly - /// Thrown if the assembly is not - /// found. + /// Cached or Loaded Assembly or null if not found internal static Assembly LoadAssembly(string assemblyName, string assemblyFileName) { // assemblyFileName is empty when serializing a UDF from within the REPL. @@ -119,7 +128,14 @@ internal static Assembly LoadAssembly(string assemblyName, string assemblyFileNa return assembly; } - throw new FileNotFoundException($"Assembly '{assemblyName}' file not found '{assemblyFileName}' in '{string.Join(",", s_searchPaths)}'"); + s_logger.LogWarn( + string.Format( + "Assembly '{0}' file not found '{1}' in '{2}'", + assemblyName, + assemblyFileName, + string.Join(",", s_searchPaths.Value))); + + return null; } } @@ -129,9 +145,7 @@ internal static Assembly LoadAssembly(string assemblyName, string assemblyFileNa /// s_extension combination. /// /// The fullname of the assembly to load - /// The loaded assembly - /// Thrown if the assembly is not - /// found. + /// The loaded assembly or null if not found internal static Assembly ResolveAssembly(string assemblyName) { lock (s_cacheLock) @@ -153,7 +167,15 @@ internal static Assembly ResolveAssembly(string assemblyName) } } - throw new FileNotFoundException($"Assembly '{assemblyName}' file not found '{simpleAsmName}[{string.Join(",", s_extensions)}]' in '{string.Join(",", s_searchPaths)}'"); + s_logger.LogWarn( + string.Format( + "Assembly '{0}' file not found '{1}[{2}]' in '{3}'", + assemblyName, + simpleAsmName, + string.Join(",", s_extensions), + string.Join(",", s_searchPaths.Value))); + + return null; } } @@ -165,7 +187,7 @@ internal static Assembly ResolveAssembly(string assemblyName) /// True if assembly is loaded, false otherwise. 
         private static bool TryLoadAssembly(string assemblyFileName, ref Assembly assembly)
         {
-            foreach (string searchPath in s_searchPaths)
+            foreach (string searchPath in s_searchPaths.Value)
             {
                 string assemblyPath = Path.Combine(searchPath, assemblyFileName);
                 if (File.Exists(assemblyPath))
@@ -195,7 +217,7 @@ ex is FileLoadException ||
         /// </summary>
         /// <param name="assemblyName">Assembly name</param>
         /// <returns>Normalized assembly name</returns>
-        private static string NormalizeAssemblyName(string assemblyName)
+        internal static string NormalizeAssemblyName(string assemblyName)
         {
             // Check if the assembly name follows the Roslyn naming convention.
             // Roslyn assembly name: "\u211B*4b31b71b-d4bd-4642-9f63-eef5f5d99197#1-14"
diff --git a/src/csharp/Microsoft.Spark/Utils/CollectionUtils.cs b/src/csharp/Microsoft.Spark/Utils/CollectionUtils.cs
new file mode 100644
index 000000000..774e20835
--- /dev/null
+++ b/src/csharp/Microsoft.Spark/Utils/CollectionUtils.cs
@@ -0,0 +1,18 @@
+// Licensed to the .NET Foundation under one or more agreements.
+// The .NET Foundation licenses this file to you under the MIT license.
+// See the LICENSE file in the project root for more information.
+
+using System;
+using System.Linq;
+
+namespace Microsoft.Spark.Utils
+{
+    internal static class CollectionUtils
+    {
+        internal static bool ArrayEquals<T>(T[] array1, T[] array2)
+        {
+            return (array1?.Length == array2?.Length) &&
+                ((array1 == null) || array1.SequenceEqual(array2));
+        }
+    }
+}
diff --git a/src/csharp/Microsoft.Spark/Utils/DependencyProviderUtils.cs b/src/csharp/Microsoft.Spark/Utils/DependencyProviderUtils.cs
new file mode 100644
index 000000000..3954151d1
--- /dev/null
+++ b/src/csharp/Microsoft.Spark/Utils/DependencyProviderUtils.cs
@@ -0,0 +1,99 @@
+// Licensed to the .NET Foundation under one or more agreements.
+// The .NET Foundation licenses this file to you under the MIT license.
+// See the LICENSE file in the project root for more information.
+
+using System;
+using System.IO;
+using System.Runtime.Serialization.Formatters.Binary;
+
+namespace Microsoft.Spark.Utils
+{
+    internal class DependencyProviderUtils
+    {
+        private static readonly string s_filePattern = "dependencyProviderMetadata_*";
+
+        internal static string[] GetMetadataFiles(string path) =>
+            Directory.GetFiles(path, s_filePattern);
+
+        // Create the dependency provider metadata filename based on the number passed into the
+        // function.
+        //
+        // number => filename
+        // 0      => dependencyProviderMetadata_0000000000000000000
+        // 1      => dependencyProviderMetadata_0000000000000000001
+        // ...
+        // 20     => dependencyProviderMetadata_0000000000000000020
+        internal static string CreateFileName(long number) =>
+            s_filePattern.Replace("*", $"{number:D19}");
+
+        [Serializable]
+        internal class NuGetMetadata
+        {
+            public string FileName { get; set; }
+            public string PackageName { get; set; }
+            public string PackageVersion { get; set; }
+
+            public override int GetHashCode()
+            {
+                return base.GetHashCode();
+            }
+
+            public override bool Equals(object obj)
+            {
+                return (obj is NuGetMetadata nugetMetadata) &&
+                    Equals(nugetMetadata);
+            }
+
+            private bool Equals(NuGetMetadata other)
+            {
+                return (other != null) &&
+                    (FileName == other.FileName) &&
+                    (PackageName == other.PackageName) &&
+                    (PackageVersion == other.PackageVersion);
+            }
+        }
+
+        [Serializable]
+        internal class Metadata
+        {
+            public string[] AssemblyProbingPaths { get; set; }
+            public string[] NativeProbingPaths { get; set; }
+            public NuGetMetadata[] NuGets { get; set; }
+
+            public override int GetHashCode()
+            {
+                return base.GetHashCode();
+            }
+
+            public override bool Equals(object obj)
+            {
+                return (obj is Metadata metadata) &&
+                    Equals(metadata);
+            }
+
+            internal static Metadata Deserialize(string path)
+            {
+                using FileStream fileStream = File.OpenRead(path);
+                var formatter = new BinaryFormatter();
+                return (Metadata)formatter.Deserialize(fileStream);
+            }
+
+            internal void Serialize(string path)
+            {
+                using FileStream fileStream = File.OpenWrite(path);
+                var formatter = new BinaryFormatter();
+                formatter.Serialize(fileStream, this);
+            }
+
+            private bool Equals(Metadata other)
+            {
+                return (other != null) &&
+                    CollectionUtils.ArrayEquals(
+                        AssemblyProbingPaths,
+                        other.AssemblyProbingPaths) &&
+                    CollectionUtils.ArrayEquals(NativeProbingPaths, other.NativeProbingPaths) &&
+                    CollectionUtils.ArrayEquals(NuGets, other.NuGets);
+            }
+        }
+    }
+}
diff --git a/src/csharp/Microsoft.Spark/Utils/UdfSerDe.cs b/src/csharp/Microsoft.Spark/Utils/UdfSerDe.cs
index 638838b9f..d338ddbdb 100644
--- a/src/csharp/Microsoft.Spark/Utils/UdfSerDe.cs
+++ b/src/csharp/Microsoft.Spark/Utils/UdfSerDe.cs
@@ -257,8 +257,21 @@ private static TypeData SerializeType(Type type)
         private static Type DeserializeType(TypeData typeData) =>
             s_typeCache.GetOrAdd(
                 typeData,
-                td => AssemblyLoader.LoadAssembly(
-                    td.AssemblyName,
-                    td.AssemblyFileName).GetType(td.Name));
+                td =>
+                {
+                    Type type = AssemblyLoader.LoadAssembly(
+                        td.AssemblyName,
+                        td.AssemblyFileName).GetType(td.Name);
+                    if (type == null)
+                    {
+                        throw new FileNotFoundException(
+                            string.Format(
+                                "Assembly '{0}' file not found '{1}'",
+                                td.AssemblyName,
+                                td.AssemblyFileName));
+                    }
+
+                    return type;
+                });
     }
 }
diff --git a/src/csharp/Microsoft.Spark/Utils/UdfUtils.cs b/src/csharp/Microsoft.Spark/Utils/UdfUtils.cs
index b012794ba..ccb5e5209 100644
--- a/src/csharp/Microsoft.Spark/Utils/UdfUtils.cs
+++ b/src/csharp/Microsoft.Spark/Utils/UdfUtils.cs
@@ -5,7 +5,6 @@
 using System;
 using System.Collections.Generic;
 using System.Diagnostics;
-using System.Linq;
 using Apache.Arrow;
 using Microsoft.Data.Analysis;
 using Microsoft.Spark.Interop;
@@ -183,22 +182,24 @@ internal static JvmObjectReference CreatePythonFunction(IJvmBridge jvm, byte[] c
         private static IJvmObjectReferenceProvider CreateEnvVarsForPythonFunction(IJvmBridge jvm)
         {
             var environmentVars = new Hashtable(jvm);
-            string assemblySearchPath = string.Join(",",
-                new[]
-                {
-                    Environment.GetEnvironmentVariable(
-                        AssemblySearchPathResolver.AssemblySearchPathsEnvVarName),
-                    SparkFiles.GetRootDirectory()
-                }.Where(s => !string.IsNullOrWhiteSpace(s)));
-
+            string assemblySearchPath = Environment.GetEnvironmentVariable(
+                AssemblySearchPathResolver.AssemblySearchPathsEnvVarName);
             if (!string.IsNullOrEmpty(assemblySearchPath))
             {
                 environmentVars.Put(
                     AssemblySearchPathResolver.AssemblySearchPathsEnvVarName,
                     assemblySearchPath);
             }

-            // DOTNET_WORKER_SPARK_VERSION is used to handle different versions of Spark on the worker.
-            environmentVars.Put("DOTNET_WORKER_SPARK_VERSION", SparkEnvironment.SparkVersion.ToString());
+            // DOTNET_WORKER_SPARK_VERSION is used to handle different versions
+            // of Spark on the worker.
+            environmentVars.Put(
+                "DOTNET_WORKER_SPARK_VERSION",
+                SparkEnvironment.SparkVersion.ToString());
+
+            if (EnvironmentUtils.GetEnvironmentVariableAsBool(Constants.RunningREPLEnvVar))
+            {
+                environmentVars.Put(Constants.RunningREPLEnvVar, "true");
+            }

             return environmentVars;
         }
diff --git a/src/scala/microsoft-spark-2.4.x/src/main/scala/org/apache/spark/deploy/dotnet/DotnetRunner.scala b/src/scala/microsoft-spark-2.4.x/src/main/scala/org/apache/spark/deploy/dotnet/DotnetRunner.scala
index 65a56e3e8..5925dcca9 100644
--- a/src/scala/microsoft-spark-2.4.x/src/main/scala/org/apache/spark/deploy/dotnet/DotnetRunner.scala
+++ b/src/scala/microsoft-spark-2.4.x/src/main/scala/org/apache/spark/deploy/dotnet/DotnetRunner.scala
@@ -34,7 +34,8 @@ import scala.util.Try
  */
 object DotnetRunner extends Logging {
   private val DEBUG_PORT = 5567
-  private val supportedSparkVersions = Set[String]("2.4.0", "2.4.1", "2.4.3", "2.4.4", "2.4.5")
+  private val supportedSparkVersions =
+    Set[String]("2.4.0", "2.4.1", "2.4.3", "2.4.4", "2.4.5", "2.4.6")

   val SPARK_VERSION = DotnetUtils.normalizeSparkVersion(spark.SPARK_VERSION)
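Reviewer note, not part of the patch: a minimal driver-side sketch of how the pieces above are expected to fit together. A file shipped with SparkContext.AddFile is written on each node under the directory reported by SparkFiles.GetRootDirectory(), which AssemblySearchPathResolver now probes ahead of the working directory, so UDF assemblies shipped this way can be resolved on the worker. The assembly name "MyUdfs.dll" and the local path are illustrative assumptions only.

// Sketch only; assumes a local file "MyUdfs.dll" exists next to the driver app.
using Microsoft.Spark;
using Microsoft.Spark.Sql;

internal class SparkFilesExample
{
    private static void Main()
    {
        SparkSession spark = SparkSession.Builder().GetOrCreate();

        // Ship the assembly to every node; workers see it under the
        // directory returned by SparkFiles.GetRootDirectory().
        spark.SparkContext.AddFile("MyUdfs.dll");

        // Resolve the shipped file: on a worker this uses the root directory
        // set via SparkFiles.SetRootDirectory, on the driver it asks the JVM.
        string root = SparkFiles.GetRootDirectory();
        string path = SparkFiles.Get("MyUdfs.dll");

        spark.Stop();
    }
}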