Skip to content

SKIPME sync with csd-1.5 #119

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Nov 5, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,17 @@ object Exchange {
* input partition ordering requirements are met.
*/
private[sql] case class EnsureRequirements(sqlContext: SQLContext) extends Rule[SparkPlan] {
private def defaultNumPreShufflePartitions: Int = sqlContext.conf.numShufflePartitions
private def defaultNumPreShufflePartitions: Int =
Option(sqlContext.sparkContext.getLocalProperty("spark.sql.shuffle.partitions")).map { str =>
try {
logDebug(s"Use spark.sql.shuffle.partitions = $str from local property")
str.toInt
} catch {
case _: NumberFormatException =>
logError(s"spark.sql.shuffle.partitions from local property value $str, expect number")
sqlContext.conf.numShufflePartitions
}
}.getOrElse(sqlContext.conf.numShufflePartitions)

private def targetPostShuffleInputSize: Long = sqlContext.conf.targetPostShuffleInputSize

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -409,12 +409,12 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
val databaseName = tableIdent.lift(tableIdent.size - 2).getOrElse(
client.currentDatabase)
val rawTableName = tableIdent.last
val tblName = tableNamePreprocessor(rawTableName)
val table = client.getTable(databaseName, tblName).withTableName(rawTableName)
val tblNameInMetastore = tableNamePreprocessor(rawTableName)
val table = client.getTable(databaseName, tblNameInMetastore).withTableName(rawTableName)

if (table.properties.get("spark.sql.sources.provider").isDefined) {
val dataSourceTable =
cachedDataSourceTables(QualifiedTableName(databaseName, tblName).toLowerCase)
cachedDataSourceTables(QualifiedTableName(databaseName, tblNameInMetastore).toLowerCase)
// Then, if alias is specified, wrap the table with a Subquery using the alias.
// Otherwise, wrap the table with a Subquery using the table name.
val withAlias =
Expand All @@ -431,7 +431,7 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
case Some(aliasText) => Subquery(aliasText, HiveQl.createPlan(viewText))
}
} else {
MetastoreRelation(databaseName, tblName, alias)(table)(hive)
MetastoreRelation(databaseName, rawTableName, alias)(table)(hive)
}
}

Expand Down Expand Up @@ -793,6 +793,7 @@ private[hive] case class MetastoreRelation

@transient override lazy val statistics: Statistics = Statistics(
sizeInBytes = {
val hiveContext = sqlContext.asInstanceOf[HiveContext]
val totalSize = hiveQlTable.getParameters.get(StatsSetupConst.TOTAL_SIZE)
val rawDataSize = hiveQlTable.getParameters.get(StatsSetupConst.RAW_DATA_SIZE)
// TODO: check if this estimate is valid for tables after partition pruning.
Expand All @@ -801,13 +802,20 @@ private[hive] case class MetastoreRelation
// alternative would be going through Hadoop's FileSystem API, which can be expensive if a lot
// of RPCs are involved. Besides `totalSize`, there are also `numFiles`, `numRows`,
// `rawDataSize` keys (see StatsSetupConst in Hive) that we can look at in the future.
BigInt(
// When table is external,`totalSize` is always zero, which will influence join strategy
// so when `totalSize` is zero, use `rawDataSize` instead
// if the size is still less than zero, we use default size
Option(totalSize).map(_.toLong).filter(_ > 0)
.getOrElse(Option(rawDataSize).map(_.toLong).filter(_ > 0)
.getOrElse(sqlContext.conf.defaultSizeInBytes)))

// When table is external,`totalSize` is always zero, which will influence join strategy
// so when `totalSize` is zero, use `rawDataSize` instead
// if the size is still less than zero, we use default size
val sizeEst = Option(totalSize).map(_.toLong).filter(_ > 0)
.getOrElse(Option(rawDataSize).map(_.toLong).filter(_ > 0)
.getOrElse(hiveContext.hadoopFileSelector.flatMap(
_.getFilesSizeInBytes(
hiveQlTable.getTableName,
hiveQlTable.getPath.getFileSystem(hiveContext.hiveconf),
hiveQlTable.getPath)).filter(_ > 0)
.getOrElse(sqlContext.conf.defaultSizeInBytes)))
logDebug(s"Size estimation for table ${hiveQlTable.getTableName}: $sizeEst bytes")
BigInt(sizeEst)
}
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -440,4 +440,17 @@ abstract class HadoopFileSelector {
* to this table.
*/
def selectFiles(tableName: String, fs: FileSystem, basePath: Path): Option[Seq[Path]]

/**
* Get size in bytes of files constituting a table from the given base path according to the
* client's custom algorithm. This is only applied to non-partitioned tables.
* @param tableName table name to select files for. This is the exact table name specified
* in the query, not a "preprocessed" file name returned by the user-defined
* function registered via [[HiveContext.setTableNamePreprocessor]].
* @param fs the filesystem containing the table
* @param basePath base path of the table in the filesystem
* @return the total sum of file size in bytes, or [[None]] if the custom file selection
* algorithm does not apply to this table.
*/
def getFilesSizeInBytes(tableName: String, fs: FileSystem, basePath: Path): Option[Long]
}