
Commit 979bb90

beliefer authored and srowen committed
[SPARK-26936][SQL] Fix bug of insert overwrite local dir can not create temporary path in local staging directory
[SPARK-26936][SQL] Fix bug of insert overwrite local dir can not create temporary path in local staging directory

## What changes were proposed in this pull request?

The environment of my cluster is as follows:

```
OS: Linux version 2.6.32-220.7.1.el6.x86_64 (mockbuildc6b18n3.bsys.dev.centos.org) (gcc version 4.4.6 20110731 (Red Hat 4.4.6-3) (GCC)) #1 SMP Wed Mar 7 00:52:02 GMT 2012
Hadoop: 2.7.2
Spark: 2.3.0 or 3.0.0 (master branch)
Hive: 1.2.1
```

My Spark jobs run in yarn-client deploy mode. If I execute the SQL `insert overwrite local directory '/home/test/call_center/' select * from call_center`, a HiveException appears:

```
Caused by: org.apache.hadoop.hive.ql.metadata.HiveException: java.io.IOException: Mkdirs failed to create file:/home/xitong/hive/stagingdir_hive_2019-02-19_17-31-00_678_1816816774691551856-1/-ext-10000/_temporary/0/_temporary/attempt_20190219173233_0002_m_000000_3 (exists=false, cwd=file:/data10/yarn/nm-local-dir/usercache/xitong/appcache/application_1543893582405_6126857/container_e124_1543893582405_6126857_01_000011)
at org.apache.hadoop.hive.ql.io.HiveFileFormatUtils.getHiveRecordWriter(HiveFileFormatUtils.java:249)
```

Currently, Spark SQL generates the temporary path inside the local staging directory. The scheme of that path starts with `file`, so the HiveException is thrown. This PR changes the temporary path to an HDFS path, and uses a DistributedFileSystem instance to copy the data from the HDFS temporary path to the local directory. If Spark runs in local deploy mode, `insert overwrite local directory` already works fine.

## How was this patch tested?

A unit test cannot cover yarn-client mode. The change was tested in my production environment.

Closes apache#23841 from beliefer/fix-bug-of-insert-overwrite-local-dir.

Authored-by: gengjiaan <[email protected]>
Signed-off-by: Sean Owen <[email protected]>
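The core of the fix can be sketched as follows. This is a simplified illustration, not the exact Spark code: the method name `stageAndDeliver` and its parameter list are hypothetical, while the Hadoop `FileSystem` calls (`getFileSystem`, `listStatus`, `copyToLocalFile`, `rename`) are the real API used in the patch.

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// Sketch of the fixed delivery step (illustrative; requires a Hadoop
// cluster and is not the actual Spark method).
def stageAndDeliver(
    hadoopConf: Configuration,
    tmpPath: Path,       // staging directory on the default (HDFS) filesystem
    writeToPath: Path,   // final destination; may be a file:/ path
    isLocal: Boolean): Unit = {
  // Resolve the filesystem of the staging path, not of the destination,
  // so staged output is always written through HDFS first.
  val dfs = tmpPath.getFileSystem(hadoopConf)
  dfs.listStatus(tmpPath).foreach { tmpFile =>
    if (isLocal) {
      // Destination is on local disk: copy across filesystems.
      dfs.copyToLocalFile(tmpFile.getPath, writeToPath)
    } else {
      // Destination is on the same filesystem: a rename is a cheap
      // metadata-only operation.
      dfs.rename(tmpFile.getPath, writeToPath)
    }
  }
}
```

The design choice here is that the staging directory always lives on HDFS, so the final step decides between a cross-filesystem copy and an in-place rename based on where the destination resides.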
1 parent 39f75b4 commit 979bb90

File tree

1 file changed

+18
-12
lines changed


sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveDirCommand.scala

Lines changed: 18 additions & 12 deletions

```diff
@@ -86,20 +86,21 @@ case class InsertIntoHiveDirCommand(
     val jobConf = new JobConf(hadoopConf)

     val targetPath = new Path(storage.locationUri.get)
-    val writeToPath =
+    val qualifiedPath = FileUtils.makeQualified(targetPath, hadoopConf)
+    val (writeToPath: Path, fs: FileSystem) =
       if (isLocal) {
         val localFileSystem = FileSystem.getLocal(jobConf)
-        localFileSystem.makeQualified(targetPath)
+        (localFileSystem.makeQualified(targetPath), localFileSystem)
       } else {
-        val qualifiedPath = FileUtils.makeQualified(targetPath, hadoopConf)
-        val dfs = qualifiedPath.getFileSystem(jobConf)
-        if (!dfs.exists(qualifiedPath)) {
-          dfs.mkdirs(qualifiedPath.getParent)
-        }
-        qualifiedPath
+        val dfs = qualifiedPath.getFileSystem(hadoopConf)
+        (qualifiedPath, dfs)
       }
+    if (!fs.exists(writeToPath)) {
+      fs.mkdirs(writeToPath)
+    }

-    val tmpPath = getExternalTmpPath(sparkSession, hadoopConf, writeToPath)
+    // The temporary path must be a HDFS path, not a local path.
+    val tmpPath = getExternalTmpPath(sparkSession, hadoopConf, qualifiedPath)
     val fileSinkConf = new org.apache.spark.sql.hive.HiveShim.ShimFileSinkDesc(
       tmpPath.toString, tableDesc, false)

@@ -111,15 +112,20 @@ case class InsertIntoHiveDirCommand(
       fileSinkConf = fileSinkConf,
       outputLocation = tmpPath.toString)

-    val fs = writeToPath.getFileSystem(hadoopConf)
     if (overwrite && fs.exists(writeToPath)) {
       fs.listStatus(writeToPath).foreach { existFile =>
         if (Option(existFile.getPath) != createdTempDir) fs.delete(existFile.getPath, true)
       }
     }

-    fs.listStatus(tmpPath).foreach {
-      tmpFile => fs.rename(tmpFile.getPath, writeToPath)
+    val dfs = tmpPath.getFileSystem(hadoopConf)
+    dfs.listStatus(tmpPath).foreach {
+      tmpFile =>
+        if (isLocal) {
+          dfs.copyToLocalFile(tmpFile.getPath, writeToPath)
+        } else {
+          dfs.rename(tmpFile.getPath, writeToPath)
+        }
     }
   } catch {
     case e: Throwable =>
```
