[SPARK-4361][Doc] Add more docs for Hadoop Configuration #3225

Closed · wants to merge 1 commit

19 changes: 17 additions & 2 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -242,7 +242,11 @@ class SparkContext(config: SparkConf) extends SparkStatusAPI with Logging {
// the bound port to the cluster manager properly
ui.foreach(_.bind())

/** A default Hadoop Configuration for the Hadoop code (e.g. file systems) that we reuse. */
/** A default Hadoop Configuration for the Hadoop code (e.g. file systems) that we reuse.
Contributor:
Really small nit, but this should be javadoc style instead of scaladoc.

*
* '''Note:''' As it will be reused in all Hadoop RDDs, it's better not to modify it unless you
* plan to set some global configurations for all Hadoop RDDs.
*/
val hadoopConfiguration = SparkHadoopUtil.get.newConfiguration(conf)

val startTime = System.currentTimeMillis()
@@ -630,7 +634,10 @@ class SparkContext(config: SparkConf) extends SparkStatusAPI with Logging {
* necessary info (e.g. file name for a filesystem-based dataset, table name for HyperTable),
* using the older MapReduce API (`org.apache.hadoop.mapred`).
*
* @param conf JobConf for setting up the dataset
* @param conf JobConf for setting up the dataset. Note: This will be put into a Broadcast.
Contributor:
I don't think we reuse the conf across different RDDs, do we?

Member Author:
People may call this method directly and pass their Configuration.

   def newAPIHadoopFile[K, V, F <: NewInputFormat[K, V]](
       path: String,
       fClass: Class[F],
       kClass: Class[K],
       vClass: Class[V],
       conf: Configuration = hadoopConfiguration)

E.g., creating a configuration for accessing HBase:

import java.io.{ByteArrayOutputStream, DataOutputStream}
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.{Result, Scan}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.util.Base64

// Serialize the Scan into the Base64 string form that TableInputFormat expects.
def convertScanToString(scan: Scan): String = {
  val out = new ByteArrayOutputStream
  val dos = new DataOutputStream(out)
  scan.write(dos)
  Base64.encodeBytes(out.toByteArray)
}

// One fresh Configuration per HBase scan: set the table and the serialized Scan.
val conf = HBaseConfiguration.create()
val scan = new Scan()
scan.setCaching(500)
scan.setCacheBlocks(false)
conf.set(TableInputFormat.INPUT_TABLE, "table_name")
conf.set(TableInputFormat.SCAN, convertScanToString(scan))

val rdd = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
  classOf[ImmutableBytesWritable], classOf[Result])
rdd.count()

This is fine. However, some people may need to access two tables and union them. They may reuse the Configuration like this:

val conf = HBaseConfiguration.create()
val scan = new Scan()
scan.setCaching(500)
scan.setCacheBlocks(false)
conf.set(TableInputFormat.INPUT_TABLE, "table_name")
conf.set(TableInputFormat.SCAN, convertScanToString(scan))
val rdd = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
  classOf[ImmutableBytesWritable], classOf[Result])

// The same conf object is mutated here, before the first RDD has been computed.
conf.set(TableInputFormat.INPUT_TABLE, "another_table_name")
val rdd2 = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
  classOf[ImmutableBytesWritable], classOf[Result])

rdd.union(rdd2).count()

The result will be weird and wrong, because both RDDs end up sharing (and mutating) the same Configuration object.
I think the docs should tell people not to reuse a conf like this.

My motivation is this mail thread: http://apache-spark-user-list.1001560.n3.nabble.com/How-did-the-RDD-union-work-td18686.html
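
For reference, a minimal sketch of the safer pattern the added docs recommend, reusing the HBase helpers from the example above (convertScanToString, sc, and the table names come from that example; the hbaseConfFor helper is hypothetical, not part of this PR): build an independent Configuration per table, so a later mutation cannot leak into an RDD that was already created.

import org.apache.hadoop.conf.Configuration

// Hypothetical helper: one independent Configuration per table,
// reusing convertScanToString from the example above.
def hbaseConfFor(table: String, scan: Scan): Configuration = {
  val conf = HBaseConfiguration.create()
  conf.set(TableInputFormat.INPUT_TABLE, table)
  conf.set(TableInputFormat.SCAN, convertScanToString(scan))
  conf
}

val scan = new Scan()
scan.setCaching(500)
scan.setCacheBlocks(false)

val rdd = sc.newAPIHadoopRDD(hbaseConfFor("table_name", scan),
  classOf[TableInputFormat], classOf[ImmutableBytesWritable], classOf[Result])
val rdd2 = sc.newAPIHadoopRDD(hbaseConfFor("another_table_name", scan),
  classOf[TableInputFormat], classOf[ImmutableBytesWritable], classOf[Result])

// Each RDD holds its own conf, so the union behaves as expected.
rdd.union(rdd2).count()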

Contributor:
In fact, there are many uses of sc.hadoopConfiguration in the wild which assume that it's shared:

https://github.com/search?q=sc.hadoopConfiguration&type=Code&utf8=%E2%9C%93

Most of those are using it to configure S3 credentials.
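
For example, a rough sketch of that common pattern (the fs.s3n.* property keys and the bucket path are illustrative assumptions, not taken from this PR):

// Mutating the shared conf here is intentional: every Hadoop RDD created
// afterwards reuses sc.hadoopConfiguration and therefore sees the credentials.
sc.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", sys.env("AWS_ACCESS_KEY_ID"))
sc.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", sys.env("AWS_SECRET_ACCESS_KEY"))

val logs = sc.textFile("s3n://some-bucket/logs/*.gz")  // illustrative path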

Contributor:
Nice find. It seems perfectly reasonable from the user's perspective to just save sc.hadoopConfiguration into a val and use it for many things. That's probably what I would have done if I didn't know about the nuances here.
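
One way to keep that convenience without the sharing pitfall is to copy the shared conf first; a hedged sketch using Hadoop's Configuration copy constructor (sc is assumed to be a live SparkContext, and the property key below is only illustrative):

import org.apache.hadoop.conf.Configuration

// An independent copy: tweaks made here do not affect other Hadoop RDDs.
val myConf = new Configuration(sc.hadoopConfiguration)
myConf.set("mapreduce.input.fileinputformat.split.maxsize", "134217728")  // illustrative key
// Pass myConf to newAPIHadoopFile / newAPIHadoopRDD instead of the shared conf.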

* Therefore if you plan to reuse this conf to create multiple RDDs, you need to make
* sure you won't modify the conf. A safe approach is always creating a new conf for
* a new RDD.
* @param inputFormatClass Class of the InputFormat
* @param keyClass Class of the keys
* @param valueClass Class of the values
@@ -756,6 +763,14 @@ class SparkContext(config: SparkConf) extends SparkStatusAPI with Logging {
* Get an RDD for a given Hadoop file with an arbitrary new API InputFormat
* and extra configuration options to pass to the input format.
*
* @param conf Configuration for setting up the dataset. Note: This will be put into a Broadcast.
* Therefore if you plan to reuse this conf to create multiple RDDs, you need to make
* sure you won't modify the conf. A safe approach is always creating a new conf for
* a new RDD.
* @param fClass Class of the InputFormat
* @param kClass Class of the keys
* @param vClass Class of the values
*
* '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each
* record, directly caching the returned RDD will create many references to the same object.
* If you plan to directly cache Hadoop writable objects, you should first copy them using
core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
@@ -387,6 +387,15 @@ class JavaSparkContext(val sc: SparkContext)
* other necessary info (e.g. file name for a filesystem-based dataset, table name for HyperTable,
* etc).
*
* @param conf JobConf for setting up the dataset. Note: This will be put into a Broadcast.
* Therefore if you plan to reuse this conf to create multiple RDDs, you need to make
* sure you won't modify the conf. A safe approach is always creating a new conf for
* a new RDD.
* @param inputFormatClass Class of the InputFormat
* @param keyClass Class of the keys
* @param valueClass Class of the values
* @param minPartitions Minimum number of Hadoop Splits to generate.
*
* '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each
* record, directly caching the returned RDD will create many references to the same object.
* If you plan to directly cache Hadoop writable objects, you should first copy them using
@@ -409,6 +418,14 @@ class JavaSparkContext(val sc: SparkContext)
* Get an RDD for a Hadoop-readable dataset from a Hadoop JobConf giving its InputFormat and any
* other necessary info (e.g. file name for a filesystem-based dataset, table name for HyperTable,
*
* @param conf JobConf for setting up the dataset. Note: This will be put into a Broadcast.
* Therefore if you plan to reuse this conf to create multiple RDDs, you need to make
* sure you won't modify the conf. A safe approach is always creating a new conf for
* a new RDD.
* @param inputFormatClass Class of the InputFormat
* @param keyClass Class of the keys
* @param valueClass Class of the values
*
* '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each
* record, directly caching the returned RDD will create many references to the same object.
* If you plan to directly cache Hadoop writable objects, you should first copy them using
@@ -490,6 +507,14 @@ class JavaSparkContext(val sc: SparkContext)
* Get an RDD for a given Hadoop file with an arbitrary new API InputFormat
* and extra configuration options to pass to the input format.
*
* @param conf Configuration for setting up the dataset. Note: This will be put into a Broadcast.
* Therefore if you plan to reuse this conf to create multiple RDDs, you need to make
* sure you won't modify the conf. A safe approach is always creating a new conf for
* a new RDD.
* @param fClass Class of the InputFormat
* @param kClass Class of the keys
* @param vClass Class of the values
*
* '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each
* record, directly caching the returned RDD will create many references to the same object.
* If you plan to directly cache Hadoop writable objects, you should first copy them using
@@ -689,6 +714,9 @@ class JavaSparkContext(val sc: SparkContext)

/**
* Returns the Hadoop configuration used for the Hadoop code (e.g. file systems) we reuse.
*
* '''Note:''' As it will be reused in all Hadoop RDDs, it's better not to modify it unless you
* plan to set some global configurations for all Hadoop RDDs.
*/
def hadoopConfiguration(): Configuration = {
sc.hadoopConfiguration