@@ -142,39 +142,45 @@ class HadoopTableReader(
      partitionToDeserializer: Map[HivePartition,
        Class[_ <: Deserializer]],
      filterOpt: Option[PathFilter]): RDD[Row] = {
-    // SPARK-5068:get FileStatus and do the filtering locally when the path is not exists
-
-    var existPathSet = collection.mutable.Set[String]()
-    var pathPatternSet = collection.mutable.Set[String]()
-
-    val hivePartitionRDDs = partitionToDeserializer.filter {
-      case (partition, partDeserializer) =>
-
-        def updateExistPathSetByPathPattern(pathPatternStr: String){
+
+    // SPARK-5068: get FileStatus and do the filtering locally when the path does not exist
+    def verifyPartitionPath(
+        partitionToDeserializer: Map[HivePartition, Class[_ <: Deserializer]]):
+        Map[HivePartition, Class[_ <: Deserializer]] = {
+      if (!sc.getConf("spark.sql.hive.verifyPartitionPath", "true").toBoolean) {
+        partitionToDeserializer
+      } else {
+        var existPathSet = collection.mutable.Set[String]()
+        var pathPatternSet = collection.mutable.Set[String]()
+        partitionToDeserializer.filter {
+          case (partition, partDeserializer) =>
+            def updateExistPathSetByPathPattern(pathPatternStr: String) {
              val pathPattern = new Path(pathPatternStr)
              val fs = pathPattern.getFileSystem(sc.hiveconf)
-          val matchs = fs.globStatus(pathPattern);
-          matchs.map( fileStatus => ( existPathSet+= fileStatus.getPath.toString) )
+              val matches = fs.globStatus(pathPattern)
+              matches.map(fileStatus => existPathSet += fileStatus.getPath.toString)
            }
            // convert /demo/data/year/month/day to /demo/data/*/*/*/
-        def getPathPatternByPath(parNum: Int,tpath: Path): String = {
-          var path = tpath
-          for (i <- (1 to parNum)) { path = path.getParent }
-          val tails = (1 to parNum).map(_ => "*").mkString("/","/","/")
+            def getPathPatternByPath(parNum: Int, tempPath: Path): String = {
+              var path = tempPath
+              for (i <- (1 to parNum)) path = path.getParent
+              val tails = (1 to parNum).map(_ => "*").mkString("/", "/", "/")
              path.toString + tails
            }

            val partPath = HiveShim.getDataLocationPath(partition)
            val partNum = Utilities.getPartitionDesc(partition).getPartSpec.size();
-        var pathPatternStr = getPathPatternByPath(partNum,partPath)
-        if (!pathPatternSet.contains(pathPatternStr)){
-          pathPatternSet+= pathPatternStr
+            var pathPatternStr = getPathPatternByPath(partNum, partPath)
+            if (!pathPatternSet.contains(pathPatternStr)) {
+              pathPatternSet += pathPatternStr
              updateExistPathSetByPathPattern(pathPatternStr)
            }
-        existPathSet.contains(partPath.toString)
+            existPathSet.contains(partPath.toString)
+        }
+      }
+    }

-      }
-      .map { case (partition, partDeserializer) =>
+    val hivePartitionRDDs = verifyPartitionPath(partitionToDeserializer)
+      .map { case (partition, partDeserializer) =>
        val partDesc = Utilities.getPartitionDesc(partition)
        val partPath = HiveShim.getDataLocationPath(partition)
        val inputPathStr = applyFilterIfNeeded(partPath, filterOpt)
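
The kernel of the new `verifyPartitionPath` helper is easy to lose in the hunk: each partition path is rewritten into a glob pattern (`/demo/data/year/month/day` becomes `/demo/data/*/*/*/`), one `globStatus` call per distinct pattern collects every path that actually exists on the filesystem, and partitions whose directories are missing are filtered out before any RDD is constructed. Below is a minimal standalone sketch of that conversion-and-check logic; the object name, sample path, and bare Hadoop `Configuration` are illustrative assumptions, not part of the patch.

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, Path}

// Hypothetical standalone demo of the glob-based existence check performed by
// verifyPartitionPath; names and the sample path are made up for illustration.
object PartitionGlobCheck {
  // Replace the last `parNum` components of `partitionPath` with "*",
  // e.g. parNum = 2 turns /demo/data/year=2015/month=01 into /demo/data/*/*/
  def toGlob(parNum: Int, partitionPath: Path): String = {
    var path = partitionPath
    for (_ <- 1 to parNum) path = path.getParent
    path.toString + (1 to parNum).map(_ => "*").mkString("/", "/", "/")
  }

  def main(args: Array[String]): Unit = {
    val partPath = new Path("/demo/data/year=2015/month=01") // sample partition
    val glob = new Path(toGlob(2, partPath))
    val fs = glob.getFileSystem(new Configuration())

    // One globStatus call enumerates every existing sibling partition.
    // globStatus can return null for a glob-free path, so guard against it.
    val existing = Option(fs.globStatus(glob))
      .getOrElse(Array.empty[FileStatus])
      .map(_.getPath.toString)
      .toSet

    println(s"partition exists: ${existing.contains(partPath.toString)}")
  }
}
```

Batching the existence checks this way is what keeps the verification cheap: all partitions sharing a pattern are resolved by a single `globStatus` round trip, memoized in `pathPatternSet`/`existPathSet`, instead of one `exists` RPC per partition.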