Commit f8e5732
srowen authored and pwendell committed
SPARK-1209 [CORE] (Take 2) SparkHadoop{MapRed,MapReduce}Util should not use package org.apache.hadoop
andrewor14 Another try at SPARK-1209, to address #2814 (comment).

I successfully tested with `mvn -Dhadoop.version=1.0.4 -DskipTests clean package; mvn -Dhadoop.version=1.0.4 test`; I assume that is what failed Jenkins last time. I also tried `-Dhadoop.version=1.2.1` and `-Phadoop-2.4 -Pyarn -Phive` for more coverage.

So this is why the class was put in `org.apache.hadoop` to begin with, I assume. One option is to leave this as-is for now and move it only when Hadoop 1.0.x support goes away.

This is the other option, which adds a call to force the constructor to be public at run-time. It's probably less surprising than putting Spark code in `org.apache.hadoop`, but it does involve reflection. A `SecurityManager` might forbid this, but it would forbid a lot of stuff Spark does. This would also only affect Hadoop 1.0.x, it seems.

Author: Sean Owen <[email protected]>

Closes #3048 from srowen/SPARK-1209 and squashes the following commits:

0d48f4b [Sean Owen] For Hadoop 1.0.x, make certain constructors public, which were public in later versions
466e179 [Sean Owen] Disable MIMA warnings resulting from moving the class -- this was also part of the PairRDDFunctions type hierarchy though?
eb61820 [Sean Owen] Move SparkHadoopMapRedUtil / SparkHadoopMapReduceUtil from org.apache.hadoop to org.apache.spark
1 parent f73b56f commit f8e5732
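The run-time accessibility trick the message describes is the same pattern the SparkHadoopMapRedUtil diff below applies to Hadoop's JobContextImpl and TaskAttemptContextImpl constructors. A minimal, self-contained sketch of that pattern, using a made-up `Hidden` class as a stand-in for a class whose constructor is not public:

```scala
import java.lang.reflect.Modifier

// Stand-in for Hadoop 1.0.x's JobContextImpl, whose constructor is not public.
class Hidden private (val label: String)

object ReflectiveCtorDemo {
  def main(args: Array[String]): Unit = {
    val ctor = classOf[Hidden].getDeclaredConstructor(classOf[String])
    // The check the commit adds: only relax access when the constructor
    // is not already public, so later Hadoop versions are untouched.
    if (!Modifier.isPublic(ctor.getModifiers)) {
      ctor.setAccessible(true) // a SecurityManager could forbid this call
    }
    val instance = ctor.newInstance("built reflectively")
    println(instance.label)
  }
}
```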

File tree

8 files changed, +32 -5 lines changed


core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala

Lines changed: 1 addition & 0 deletions

@@ -26,6 +26,7 @@ import org.apache.hadoop.mapred._
 import org.apache.hadoop.fs.FileSystem
 import org.apache.hadoop.fs.Path

+import org.apache.spark.mapred.SparkHadoopMapRedUtil
 import org.apache.spark.rdd.HadoopRDD

 /**

core/src/main/scala/org/apache/hadoop/mapred/SparkHadoopMapRedUtil.scala
renamed to core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala

Lines changed: 15 additions & 2 deletions

@@ -15,22 +15,35 @@
  * limitations under the License.
  */

-package org.apache.hadoop.mapred
+package org.apache.spark.mapred

-private[apache]
+import java.lang.reflect.Modifier
+
+import org.apache.hadoop.mapred.{TaskAttemptID, JobID, JobConf, JobContext, TaskAttemptContext}
+
+private[spark]
 trait SparkHadoopMapRedUtil {
   def newJobContext(conf: JobConf, jobId: JobID): JobContext = {
     val klass = firstAvailableClass("org.apache.hadoop.mapred.JobContextImpl",
       "org.apache.hadoop.mapred.JobContext")
     val ctor = klass.getDeclaredConstructor(classOf[JobConf],
       classOf[org.apache.hadoop.mapreduce.JobID])
+    // In Hadoop 1.0.x, JobContext is an interface, and JobContextImpl is package private.
+    // Make it accessible if it's not in order to access it.
+    if (!Modifier.isPublic(ctor.getModifiers)) {
+      ctor.setAccessible(true)
+    }
     ctor.newInstance(conf, jobId).asInstanceOf[JobContext]
   }

   def newTaskAttemptContext(conf: JobConf, attemptId: TaskAttemptID): TaskAttemptContext = {
     val klass = firstAvailableClass("org.apache.hadoop.mapred.TaskAttemptContextImpl",
       "org.apache.hadoop.mapred.TaskAttemptContext")
     val ctor = klass.getDeclaredConstructor(classOf[JobConf], classOf[TaskAttemptID])
+    // See above
+    if (!Modifier.isPublic(ctor.getModifiers)) {
+      ctor.setAccessible(true)
+    }
     ctor.newInstance(conf, attemptId).asInstanceOf[TaskAttemptContext]
   }
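Both methods lean on a private `firstAvailableClass` helper that the hunk references but does not touch. As a rough sketch of what such a helper presumably looks like, assuming it simply tries the Hadoop 2.x implementation class first and falls back to the 1.x name (an illustration, not part of this diff):

```scala
trait ClassFallback {
  // Try the newer implementation class first; if it is not on the
  // classpath, fall back to the older class name.
  def firstAvailableClass(first: String, second: String): Class[_] = {
    try {
      Class.forName(first)
    } catch {
      case _: ClassNotFoundException => Class.forName(second)
    }
  }
}
```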

core/src/main/scala/org/apache/hadoop/mapreduce/SparkHadoopMapReduceUtil.scala
renamed to core/src/main/scala/org/apache/spark/mapreduce/SparkHadoopMapReduceUtil.scala

Lines changed: 3 additions & 2 deletions

@@ -15,13 +15,14 @@
  * limitations under the License.
  */

-package org.apache.hadoop.mapreduce
+package org.apache.spark.mapreduce

 import java.lang.{Boolean => JBoolean, Integer => JInteger}

 import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.mapreduce.{JobContext, JobID, TaskAttemptContext, TaskAttemptID}

-private[apache]
+private[spark]
 trait SparkHadoopMapReduceUtil {
   def newJobContext(conf: Configuration, jobId: JobID): JobContext = {
     val klass = firstAvailableClass(
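Because the trait now lives under org.apache.spark (and is private[spark]), callers inside Spark simply import it and mix it in, as the NewHadoopRDD, PairRDDFunctions, and Parquet changes below do. A hypothetical, minimal consumer might look like this (the package, class name, and "demo" job ID are made up for illustration):

```scala
package org.apache.spark.demo // must sit under org.apache.spark: the trait is private[spark]

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapreduce.{JobContext, JobID}

import org.apache.spark.mapreduce.SparkHadoopMapReduceUtil

// Mixing the trait in gives version-tolerant JobContext construction
// without hard-coding JobContext vs. JobContextImpl.
class DemoContextBuilder extends SparkHadoopMapReduceUtil {
  def contextFor(conf: Configuration): JobContext =
    newJobContext(conf, new JobID("demo", 0))
}
```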

core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala

Lines changed: 1 addition & 0 deletions

@@ -35,6 +35,7 @@ import org.apache.spark.Partition
 import org.apache.spark.SerializableWritable
 import org.apache.spark.{SparkContext, TaskContext}
 import org.apache.spark.executor.{DataReadMethod, InputMetrics}
+import org.apache.spark.mapreduce.SparkHadoopMapReduceUtil
 import org.apache.spark.rdd.NewHadoopRDD.NewHadoopMapPartitionsWithSplitRDD
 import org.apache.spark.util.Utils
 import org.apache.spark.deploy.SparkHadoopUtil

core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala

Lines changed: 2 additions & 1 deletion

@@ -33,13 +33,14 @@ import org.apache.hadoop.io.SequenceFile.CompressionType
 import org.apache.hadoop.io.compress.CompressionCodec
 import org.apache.hadoop.mapred.{FileOutputCommitter, FileOutputFormat, JobConf, OutputFormat}
 import org.apache.hadoop.mapreduce.{Job => NewAPIHadoopJob, OutputFormat => NewOutputFormat,
-  RecordWriter => NewRecordWriter, SparkHadoopMapReduceUtil}
+  RecordWriter => NewRecordWriter}

 import org.apache.spark._
 import org.apache.spark.Partitioner.defaultPartitioner
 import org.apache.spark.SparkContext._
 import org.apache.spark.annotation.Experimental
 import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.mapreduce.SparkHadoopMapReduceUtil
 import org.apache.spark.partial.{BoundedDouble, PartialResult}
 import org.apache.spark.serializer.Serializer
 import org.apache.spark.util.Utils

project/MimaExcludes.scala

Lines changed: 8 additions & 0 deletions

@@ -77,6 +77,14 @@ object MimaExcludes {
         // SPARK-3822
         ProblemFilters.exclude[IncompatibleResultTypeProblem](
           "org.apache.spark.SparkContext.org$apache$spark$SparkContext$$createTaskScheduler")
+      ) ++ Seq(
+        // SPARK-1209
+        ProblemFilters.exclude[MissingClassProblem](
+          "org.apache.hadoop.mapreduce.SparkHadoopMapReduceUtil"),
+        ProblemFilters.exclude[MissingClassProblem](
+          "org.apache.hadoop.mapred.SparkHadoopMapRedUtil"),
+        ProblemFilters.exclude[MissingTypesProblem](
+          "org.apache.spark.rdd.PairRDDFunctions")
       )

     case v if v.startsWith("1.1") =>

sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala

Lines changed: 1 addition & 0 deletions

@@ -43,6 +43,7 @@ import parquet.hadoop.util.ContextUtil
 import parquet.io.ParquetDecodingException
 import parquet.schema.MessageType

+import org.apache.spark.mapreduce.SparkHadoopMapReduceUtil
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.SQLConf

sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala

Lines changed: 1 addition & 0 deletions

@@ -31,6 +31,7 @@ import org.apache.hadoop.hive.ql.plan.{PlanUtils, TableDesc}
 import org.apache.hadoop.io.Writable
 import org.apache.hadoop.mapred._

+import org.apache.spark.mapred.SparkHadoopMapRedUtil
 import org.apache.spark.sql.Row
 import org.apache.spark.{Logging, SerializableWritable, SparkHadoopWriter}
 import org.apache.spark.sql.hive.{ShimFileSinkDesc => FileSinkDesc}
