Skip to content

Commit 6b74dd8

Browse files
committed
Also lists suspicious non-leaf partition directories
1 parent a935eb8 commit 6b74dd8

File tree

1 file changed

+29
-12
lines changed

1 file changed

+29
-12
lines changed

sql/core/src/main/scala/org/apache/spark/sql/sources/PartitioningUtils.scala

Lines changed: 29 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ private[sql] object PartitioningUtils {
8484
} else {
8585
// This dataset is partitioned. We need to check whether all partitions have the same
8686
// partition columns and resolve potential type conflicts.
87-
val resolvedPartitionValues = resolvePartitions(pathsWithPartitionValues.map(_._2))
87+
val resolvedPartitionValues = resolvePartitions(pathsWithPartitionValues)
8888

8989
// Creates the StructType which represents the partition columns.
9090
val fields = {
@@ -181,24 +181,41 @@ private[sql] object PartitioningUtils {
181181
* StringType
182182
* }}}
183183
*/
184-
private[sql] def resolvePartitions(values: Seq[PartitionValues]): Seq[PartitionValues] = {
185-
// Column names of all partitions must match
186-
val distinctPartitionsColNames = values.map(_.columnNames).distinct
187-
188-
if (distinctPartitionsColNames.isEmpty) {
184+
private[sql] def resolvePartitions(
185+
pathsWithPartitionValues: Seq[(Path, PartitionValues)]): Seq[PartitionValues] = {
186+
if (pathsWithPartitionValues.isEmpty) {
189187
Seq.empty
190188
} else {
191-
assert(distinctPartitionsColNames.size == 1, {
192-
val list = distinctPartitionsColNames.map(_.mkString(", ")).zipWithIndex.map {
189+
val distinctPartColNames = pathsWithPartitionValues.map(_._2.columnNames).distinct
190+
191+
def listConflictingPartitionColumns: String = {
192+
def groupByKey[K, V](seq: Seq[(K, V)]): Map[K, Iterable[V]] =
193+
seq.groupBy { case (key, _) => key }.mapValues(_.map { case (_, value) => value })
194+
195+
val partColNamesToPaths = groupByKey(pathsWithPartitionValues.map {
196+
case (path, partValues) => partValues.columnNames -> path
197+
})
198+
199+
val distinctPartColLists = distinctPartColNames.map(_.mkString(", ")).zipWithIndex.map {
193200
case (names, index) =>
194-
s"\tPartition column name list #$index: $names"
201+
s"Partition column name list #$index: $names"
195202
}
196203

197-
s"Conflicting partition column names detected:\n${list.mkString("\n")}\n" +
198-
"For partitioned table directories, data files should only live in leaf directories."
199-
})
204+
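// Lists out those non-leaf partition directories that also contain files, i.e. the paths behind
// every distinct partition column name list except the last (longest) one after sorting by length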
205+
val suspiciousPaths =
206+
distinctPartColNames.sortBy(_.length).init.flatMap(partColNamesToPaths)
207+
208+
s"Conflicting partition column names detected:\n" +
209+
distinctPartColLists.mkString("\n\t", "\n\t", "\n\n") +
210+
"For partitioned table directories, data files should only live in leaf directories. " +
211+
"Please check the following directories for unexpected files:\n" +
212+
suspiciousPaths.mkString("\n\t", "\n\t", "\n")
213+
}
214+
215+
assert(distinctPartColNames.size == 1, listConflictingPartitionColumns)
200216

201217
// Resolves possible type conflicts for each column
218+
val values = pathsWithPartitionValues.map(_._2)
202219
val columnCount = values.head.columnNames.size
203220
val resolvedValues = (0 until columnCount).map { i =>
204221
resolveTypeConflicts(values.map(_.literals(i)))

0 commit comments

Comments
 (0)