This repository was archived by the owner on May 9, 2024. It is now read-only.

Commit 6fa73df

Address comments of Josh and Andrew.
1 parent 807fbf9 commit 6fa73df

2 files changed: 16 additions & 18 deletions


sql/core/src/main/scala/org/apache/spark/sql/sources/SqlNewHadoopRDD.scala

Lines changed: 14 additions & 15 deletions
@@ -50,7 +50,6 @@ private[spark] class SqlNewHadoopPartition(
 }
 
 /**
- * :: DeveloperApi ::
  * An RDD that provides core functionality for reading data stored in Hadoop (e.g., files in HDFS,
  * sources in HBase, or S3), using the new MapReduce API (`org.apache.hadoop.mapreduce`).
  * It is based on [[org.apache.spark.rdd.NewHadoopRDD]]. It has three additions.
@@ -60,13 +59,10 @@ private[spark] class SqlNewHadoopPartition(
  * 3. An optional closure `initLocalJobFuncOpt` that set configurations at both the driver side
  *    and the executor side to the shared Hadoop Configuration.
  *
- * @param sc The SparkContext to associate the RDD with.
- * @param inputFormatClass Storage format of the data to be read.
- * @param keyClass Class of the key associated with the inputFormatClass.
- * @param valueClass Class of the value associated with the inputFormatClass.
- * @param conf The Hadoop configuration.
+ * Note: This is RDD is basically a cloned version of [[org.apache.spark.rdd.NewHadoopRDD]] with
+ * changes based on [[org.apache.spark.rdd.HadoopRDD]]. In future, this functionality will be
+ * folded into core.
  */
-@DeveloperApi
 private[sql] class SqlNewHadoopRDD[K, V](
     @transient sc : SparkContext,
     broadcastedConf: Broadcast[SerializableWritable[Configuration]],
@@ -85,11 +81,22 @@ private[sql] class SqlNewHadoopRDD[K, V](
 
   protected def getJob(): Job = {
     val conf: Configuration = broadcastedConf.value.value
+    // "new Job" will make a copy of the conf. Then, it is
+    // safe to mutate conf properties with initLocalJobFuncOpt
+    // and initDriverSideJobFuncOpt.
     val newJob = new Job(conf)
     initLocalJobFuncOpt.map(f => f(newJob))
     newJob
   }
 
+  def getConf(isDriverSide: Boolean): Configuration = {
+    val job = getJob()
+    if (isDriverSide) {
+      initDriverSideJobFuncOpt.map(f => f(job))
+    }
+    job.getConfiguration
+  }
+
   private val jobTrackerId: String = {
     val formatter = new SimpleDateFormat("yyyyMMddHHmm")
     formatter.format(new Date())
@@ -235,14 +242,6 @@ private[sql] class SqlNewHadoopRDD[K, V](
     }
     super.persist(storageLevel)
   }
-
-  def getConf(isDriverSide: Boolean): Configuration = {
-    val job = getJob()
-    if (isDriverSide) {
-      initDriverSideJobFuncOpt.map(f => f(job))
-    }
-    job.getConfiguration
-  }
 }
 
 private[spark] object SqlNewHadoopRDD {
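
The inline comment added to getJob is the key detail of this change: `new Job(conf)` makes a copy of the broadcast Hadoop Configuration, so `initLocalJobFuncOpt` and `initDriverSideJobFuncOpt` can mutate job properties without leaking changes back into the shared conf. Below is a minimal, self-contained sketch of that copy-then-mutate pattern; the closure bodies, the property keys, and the `GetConfSketch` wrapper are hypothetical stand-ins rather than code from this commit (which applies the local closure inside getJob and the driver-side closure inside getConf, in the same order).

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapreduce.Job

object GetConfSketch {
  // Hypothetical stand-ins for the RDD's optional initialization closures;
  // the "example.*" keys are made up for the demo.
  val initLocalJobFuncOpt: Option[Job => Unit] =
    Some(job => job.getConfiguration.set("example.local.key", "executor-side"))
  val initDriverSideJobFuncOpt: Option[Job => Unit] =
    Some(job => job.getConfiguration.set("example.driver.key", "driver-side"))

  def getConf(sharedConf: Configuration, isDriverSide: Boolean): Configuration = {
    // "new Job" copies sharedConf, so the closures below mutate the copy,
    // never the shared (broadcast) configuration.
    val job = new Job(sharedConf)
    initLocalJobFuncOpt.foreach(f => f(job))
    if (isDriverSide) {
      initDriverSideJobFuncOpt.foreach(f => f(job))
    }
    job.getConfiguration
  }

  def main(args: Array[String]): Unit = {
    val shared = new Configuration()
    val driverConf = getConf(shared, isDriverSide = true)
    val executorConf = getConf(shared, isDriverSide = false)

    assert(shared.get("example.driver.key") == null)              // shared conf untouched
    assert(driverConf.get("example.driver.key") == "driver-side") // driver-side copy mutated
    assert(executorConf.get("example.driver.key") == null)        // driver-side closure skipped
    assert(executorConf.get("example.local.key") == "executor-side")
  }
}

The same reasoning explains why the commit can expose getConf(isDriverSide) publicly and drop the @DeveloperApi annotation: each call hands back a private copy, so callers on either side can tweak it freely.
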

sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala

Lines changed: 2 additions & 3 deletions
@@ -17,9 +17,6 @@
 
 package org.apache.spark.sql.sources
 
-import org.apache.spark.SerializableWritable
-import org.apache.spark.broadcast.Broadcast
-
 import scala.collection.mutable
 import scala.util.Try
 
@@ -28,7 +25,9 @@ import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
 import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext}
 
 import org.apache.spark.annotation.{DeveloperApi, Experimental}
+import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.rdd.RDD
+import org.apache.spark.SerializableWritable
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.codegen.GenerateMutableProjection
