This repository was archived by the owner on May 9, 2024. It is now read-only.

Commit 2eb53bb

Use a shared broadcast Hadoop Configuration for partitioned HadoopFsRelations.

1 parent: 60336e3
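
In outline, the change wraps the driver's Hadoop Configuration in a SerializableWritable, broadcasts it once, and lets every scan RDD of the query read that single shared copy. A minimal standalone sketch of the pattern in Scala (the helper name broadcastHadoopConf is illustrative, not part of the patch):

import org.apache.hadoop.conf.Configuration
import org.apache.spark.{SerializableWritable, SparkContext}
import org.apache.spark.broadcast.Broadcast

// Hadoop's Configuration is Writable but not java.io.Serializable, so it is
// wrapped in SerializableWritable before being broadcast. The broadcast is
// created once on the driver and shared by all scan RDDs of the query.
def broadcastHadoopConf(
    sc: SparkContext,
    hadoopConf: Configuration): Broadcast[SerializableWritable[Configuration]] = {
  sc.broadcast(new SerializableWritable(hadoopConf))
}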

4 files changed, +390 additions, -49 deletions

sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala

Lines changed: 71 additions & 44 deletions
@@ -23,6 +23,7 @@ import scala.collection.JavaConversions._
 import scala.util.Try

 import com.google.common.base.Objects
+import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileStatus, Path}
 import org.apache.hadoop.io.Writable
 import org.apache.hadoop.mapreduce._
@@ -32,13 +33,14 @@ import parquet.hadoop._
 import parquet.hadoop.metadata.CompressionCodecName
 import parquet.hadoop.util.ContextUtil

+import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.rdd.RDD._
-import org.apache.spark.rdd.{NewHadoopPartition, NewHadoopRDD, RDD}
+import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types.{DataType, StructType}
 import org.apache.spark.sql.{Row, SQLConf, SQLContext}
-import org.apache.spark.{Logging, Partition => SparkPartition, SparkException}
+import org.apache.spark.{Partition => SparkPartition, SparkEnv, SerializableWritable, Logging, SparkException}

 private[sql] class DefaultSource extends HadoopFsRelationProvider {
   override def createRelation(
@@ -118,7 +120,7 @@ private[sql] class ParquetRelation2(
     private val maybeDataSchema: Option[StructType],
     private val maybePartitionSpec: Option[PartitionSpec],
     parameters: Map[String, String])(
-    val sqlContext: SQLContext)
+    @transient val sqlContext: SQLContext)
   extends HadoopFsRelation(maybePartitionSpec)
   with Logging {

@@ -233,53 +235,35 @@ private[sql] class ParquetRelation2(
   override def buildScan(
       requiredColumns: Array[String],
       filters: Array[Filter],
-      inputFiles: Array[FileStatus]): RDD[Row] = {
-
-    val job = new Job(SparkHadoopUtil.get.conf)
-    val conf = ContextUtil.getConfiguration(job)
-
-    ParquetInputFormat.setReadSupportClass(job, classOf[RowReadSupport])
-
-    if (inputFiles.nonEmpty) {
-      FileInputFormat.setInputPaths(job, inputFiles.map(_.getPath): _*)
-    }
-
-    // Try to push down filters when filter push-down is enabled.
-    if (sqlContext.conf.parquetFilterPushDown) {
-      filters
-        // Collects all converted Parquet filter predicates. Notice that not all predicates can be
-        // converted (`ParquetFilters.createFilter` returns an `Option`). That's why a `flatMap`
-        // is used here.
-        .flatMap(ParquetFilters.createFilter(dataSchema, _))
-        .reduceOption(FilterApi.and)
-        .foreach(ParquetInputFormat.setFilterPredicate(conf, _))
-    }
-
-    conf.set(RowReadSupport.SPARK_ROW_REQUESTED_SCHEMA, {
-      val requestedSchema = StructType(requiredColumns.map(dataSchema(_)))
-      ParquetTypesConverter.convertToString(requestedSchema.toAttributes)
-    })
-
-    conf.set(
-      RowWriteSupport.SPARK_ROW_SCHEMA,
-      ParquetTypesConverter.convertToString(dataSchema.toAttributes))
-
-    // Tell FilteringParquetRowInputFormat whether it's okay to cache Parquet and FS metadata
+      inputFiles: Array[FileStatus],
+      broadcastedConf: Broadcast[SerializableWritable[Configuration]]): RDD[Row] = {
     val useMetadataCache = sqlContext.getConf(SQLConf.PARQUET_CACHE_METADATA, "true").toBoolean
-    conf.set(SQLConf.PARQUET_CACHE_METADATA, useMetadataCache.toString)
+    val parquetFilterPushDown = sqlContext.conf.parquetFilterPushDown
+    // Create the function to set variable Parquet confs at both driver and executor side.
+    val initLocalJobFuncOpt =
+      ParquetRelation2.initializeLocalJobFunc(
+        requiredColumns,
+        filters,
+        dataSchema,
+        useMetadataCache,
+        parquetFilterPushDown) _
+    // Create the function to set input paths at the driver side.
+    val setInputPaths = ParquetRelation2.initializeDriverSideJobFunc(inputFiles) _

     val footers = inputFiles.map(f => metadataCache.footers(f.getPath))

     // TODO Stop using `FilteringParquetRowInputFormat` and overriding `getPartition`.
     // After upgrading to Parquet 1.6.0, we should be able to stop caching `FileStatus` objects and
     // footers. Especially when a global arbitrative schema (either from metastore or data source
     // DDL) is available.
-    new NewHadoopRDD(
-      sqlContext.sparkContext,
-      classOf[FilteringParquetRowInputFormat],
-      classOf[Void],
-      classOf[Row],
-      conf) {
+    new SqlNewHadoopRDD(
+      sc = sqlContext.sparkContext,
+      broadcastedConf = broadcastedConf,
+      initDriverSideJobFuncOpt = Some(setInputPaths),
+      initLocalJobFuncOpt = Some(initLocalJobFuncOpt),
+      inputFormatClass = classOf[FilteringParquetRowInputFormat],
+      keyClass = classOf[Void],
+      valueClass = classOf[Row]) {

       val cacheMetadata = useMetadataCache

@@ -311,11 +295,11 @@ private[sql] class ParquetRelation2(
           new FilteringParquetRowInputFormat
         }

-        val jobContext = newJobContext(getConf, jobId)
+        val jobContext = newJobContext(getConf(isDriverSide = true), jobId)
         val rawSplits = inputFormat.getSplits(jobContext)

         Array.tabulate[SparkPartition](rawSplits.size) { i =>
-          new NewHadoopPartition(id, i, rawSplits(i).asInstanceOf[InputSplit with Writable])
+          new SqlNewHadoopPartition(id, i, rawSplits(i).asInstanceOf[InputSplit with Writable])
         }
       }
     }.values
@@ -452,6 +436,49 @@ private[sql] object ParquetRelation2 extends Logging {
   // internally.
   private[sql] val METASTORE_SCHEMA = "metastoreSchema"

+  /** This closure sets various Parquet configurations at both driver side and executor side. */
+  private[parquet] def initializeLocalJobFunc(
+      requiredColumns: Array[String],
+      filters: Array[Filter],
+      dataSchema: StructType,
+      useMetadataCache: Boolean,
+      parquetFilterPushDown: Boolean)(job: Job): Unit = {
+    val conf = job.getConfiguration
+    conf.set(ParquetInputFormat.READ_SUPPORT_CLASS, classOf[RowReadSupport].getName())
+
+    // Try to push down filters when filter push-down is enabled.
+    if (parquetFilterPushDown) {
+      filters
+        // Collects all converted Parquet filter predicates. Notice that not all predicates can be
+        // converted (`ParquetFilters.createFilter` returns an `Option`). That's why a `flatMap`
+        // is used here.
+        .flatMap(ParquetFilters.createFilter(dataSchema, _))
+        .reduceOption(FilterApi.and)
+        .foreach(ParquetInputFormat.setFilterPredicate(conf, _))
+    }
+
+    conf.set(RowReadSupport.SPARK_ROW_REQUESTED_SCHEMA, {
+      val requestedSchema = StructType(requiredColumns.map(dataSchema(_)))
+      ParquetTypesConverter.convertToString(requestedSchema.toAttributes)
+    })
+
+    conf.set(
+      RowWriteSupport.SPARK_ROW_SCHEMA,
+      ParquetTypesConverter.convertToString(dataSchema.toAttributes))
+
+    // Tell FilteringParquetRowInputFormat whether it's okay to cache Parquet and FS metadata
+    conf.set(SQLConf.PARQUET_CACHE_METADATA, useMetadataCache.toString)
+  }
+
+  /** This closure sets input paths at the driver side. */
+  private[parquet] def initializeDriverSideJobFunc(
+      inputFiles: Array[FileStatus])(job: Job): Unit = {
+    // We set the input paths at the driver side.
+    if (inputFiles.nonEmpty) {
+      FileInputFormat.setInputPaths(job, inputFiles.map(_.getPath): _*)
+    }
+  }
+
   private[parquet] def readSchema(
       footers: Seq[Footer], sqlContext: SQLContext): Option[StructType] = {
     footers.map { footer =>
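
The two helpers above are curried: all driver-side arguments are bound first, leaving a Job => Unit closure that the new SqlNewHadoopRDD can apply later. A minimal sketch, under that assumption, of how an executor-side task might rebuild its Job from the broadcast Configuration (the helper newLocalJob is illustrative, not the SqlNewHadoopRDD implementation):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.SerializableWritable
import org.apache.spark.broadcast.Broadcast

// Unwrap the broadcast Configuration (Broadcast.value, then
// SerializableWritable.value), copy it so the shared instance is not mutated,
// and apply the curried init closure to set the per-query Parquet settings.
def newLocalJob(
    broadcastedConf: Broadcast[SerializableWritable[Configuration]],
    initLocalJobFuncOpt: Option[Job => Unit]): Job = {
  val conf = new Configuration(broadcastedConf.value.value)
  val job = new Job(conf)
  initLocalJobFuncOpt.foreach(f => f(job))
  job
}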

sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala

Lines changed: 16 additions & 3 deletions
@@ -17,7 +17,8 @@

 package org.apache.spark.sql.sources

-import org.apache.spark.Logging
+import org.apache.spark.{SerializableWritable, Logging}
+import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.rdd.{RDD, UnionRDD}
 import org.apache.spark.sql.catalyst.expressions
 import org.apache.spark.sql.catalyst.expressions._
@@ -84,11 +85,16 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {

     // Scanning non-partitioned HadoopFsRelation
     case PhysicalOperation(projectList, filters, l @ LogicalRelation(t: HadoopFsRelation)) =>
+      // See buildPartitionedTableScan for the reason that we need to create a shared
+      // broadcast HadoopConf.
+      val sharedHadoopConf = SparkHadoopUtil.get.conf
+      val confBroadcast =
+        t.sqlContext.sparkContext.broadcast(new SerializableWritable(sharedHadoopConf))
       pruneFilterProject(
         l,
         projectList,
         filters,
-        (a, f) => t.buildScan(a, f, t.paths)) :: Nil
+        (a, f) => t.buildScan(a, f, t.paths, confBroadcast)) :: Nil

     case l @ LogicalRelation(t: TableScan) =>
       createPhysicalRDD(l.relation, l.output, t.buildScan()) :: Nil
@@ -115,6 +121,12 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
     val output = projections.map(_.toAttribute)
     val relation = logicalRelation.relation.asInstanceOf[HadoopFsRelation]

+    // Because we are creating one RDD per partition, we need to have a shared HadoopConf.
+    // Otherwise, the cost of broadcasting HadoopConf in every RDD will be high.
+    val sharedHadoopConf = SparkHadoopUtil.get.conf
+    val confBroadcast =
+      relation.sqlContext.sparkContext.broadcast(new SerializableWritable(sharedHadoopConf))
+
     // Builds RDD[Row]s for each selected partition.
     val perPartitionRows = partitions.map { case Partition(partitionValues, dir) =>
       // The table scan operator (PhysicalRDD) which retrieves required columns from data files.
@@ -132,7 +144,8 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
       // assuming partition columns data stored in data files are always consistent with those
       // partition values encoded in partition directory paths.
       val nonPartitionColumns = requiredColumns.filterNot(partitionColNames.contains)
-      val dataRows = relation.buildScan(nonPartitionColumns, filters, Array(dir))
+      val dataRows =
+        relation.buildScan(nonPartitionColumns, filters, Array(dir), confBroadcast)

       // Merges data values with partition values.
       mergeWithPartitionValues(
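
buildPartitionedTableScan creates one RDD per partition directory and unions them, so the single confBroadcast created above is shared by every one of those RDDs instead of each RDD shipping its own Configuration. A simplified sketch of that shape, using hypothetical names (scanAllPartitions, scanOnePartition) that are not part of the patch and sc.hadoopConfiguration in place of SparkHadoopUtil.get.conf to keep it self-contained:

import scala.reflect.ClassTag

import org.apache.hadoop.conf.Configuration
import org.apache.spark.{SerializableWritable, SparkContext}
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.{RDD, UnionRDD}

// Broadcast the Hadoop Configuration once, hand the same broadcast handle to
// every per-partition scan, and union the resulting RDDs. `scanOnePartition`
// stands in for the relation.buildScan(..., confBroadcast) call in the planner.
def scanAllPartitions[T: ClassTag](
    sc: SparkContext,
    partitionDirs: Seq[String])(
    scanOnePartition: (String, Broadcast[SerializableWritable[Configuration]]) => RDD[T]): RDD[T] = {
  val confBroadcast = sc.broadcast(new SerializableWritable(sc.hadoopConfiguration))
  val perPartitionRows = partitionDirs.map(dir => scanOnePartition(dir, confBroadcast))
  new UnionRDD(sc, perPartitionRows)
}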
