Skip to content

Commit d4c5f43

Browse files
committed
Merge pull request #6 from apache/master
merge upstream changes
2 parents dc1ba9e + ba5bcad commit d4c5f43

File tree

23 files changed

+236
-74
lines changed

23 files changed

+236
-74
lines changed

README.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,8 @@ Spark is a fast and general cluster computing system for Big Data. It provides
44
high-level APIs in Scala, Java, and Python, and an optimized engine that
55
supports general computation graphs for data analysis. It also supports a
66
rich set of higher-level tools including Spark SQL for SQL and structured
7-
data processing, MLLib for machine learning, GraphX for graph processing,
8-
and Spark Streaming.
7+
data processing, MLlib for machine learning, GraphX for graph processing,
8+
and Spark Streaming for stream processing.
99

1010
<http://spark.apache.org/>
1111

bin/pyspark

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,8 @@ export PYSPARK_SUBMIT_ARGS
8585

8686
# For pyspark tests
8787
if [[ -n "$SPARK_TESTING" ]]; then
88+
unset YARN_CONF_DIR
89+
unset HADOOP_CONF_DIR
8890
if [[ -n "$PYSPARK_DOC_TEST" ]]; then
8991
exec "$PYSPARK_PYTHON" -m doctest $1
9092
else

core/src/main/scala/org/apache/spark/rdd/RDD.scala

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1064,11 +1064,10 @@ abstract class RDD[T: ClassTag](
10641064
// greater than totalParts because we actually cap it at totalParts in runJob.
10651065
var numPartsToTry = 1
10661066
if (partsScanned > 0) {
1067-
// If we didn't find any rows after the first iteration, just try all partitions next.
1068-
// Otherwise, interpolate the number of partitions we need to try, but overestimate it
1069-
// by 50%.
1067+
// If we didn't find any rows after the previous iteration, quadruple and retry. Otherwise,
1068+
// interpolate the number of partitions we need to try, but overestimate it by 50%.
10701069
if (buf.size == 0) {
1071-
numPartsToTry = totalParts - 1
1070+
numPartsToTry = partsScanned * 4
10721071
} else {
10731072
numPartsToTry = (1.5 * num * partsScanned / buf.size).toInt
10741073
}

ec2/spark_ec2.py

Lines changed: 25 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -102,9 +102,17 @@ def parse_args():
102102
"(for debugging)")
103103
parser.add_option(
104104
"--ebs-vol-size", metavar="SIZE", type="int", default=0,
105-
help="Attach a new EBS volume of size SIZE (in GB) to each node as " +
106-
"/vol. The volumes will be deleted when the instances terminate. " +
107-
"Only possible on EBS-backed AMIs.")
105+
help="Size (in GB) of each EBS volume.")
106+
parser.add_option(
107+
"--ebs-vol-type", default="standard",
108+
help="EBS volume type (e.g. 'gp2', 'standard').")
109+
parser.add_option(
110+
"--ebs-vol-num", type="int", default=1,
111+
help="Number of EBS volumes to attach to each node as /vol[x]. " +
112+
"The volumes will be deleted when the instances terminate. " +
113+
"Only possible on EBS-backed AMIs. " +
114+
"EBS volumes are only attached if --ebs-vol-size > 0. " +
115+
"Only support up to 8 EBS volumes.")
108116
parser.add_option(
109117
"--swap", metavar="SWAP", type="int", default=1024,
110118
help="Swap space to set up per node, in MB (default: 1024)")
@@ -348,13 +356,16 @@ def launch_cluster(conn, opts, cluster_name):
348356
print >> stderr, "Could not find AMI " + opts.ami
349357
sys.exit(1)
350358

351-
# Create block device mapping so that we can add an EBS volume if asked to
359+
# Create block device mapping so that we can add EBS volumes if asked to.
360+
# The first drive is attached as /dev/sds, 2nd as /dev/sdt, ... /dev/sdz
352361
block_map = BlockDeviceMapping()
353362
if opts.ebs_vol_size > 0:
354-
device = EBSBlockDeviceType()
355-
device.size = opts.ebs_vol_size
356-
device.delete_on_termination = True
357-
block_map["/dev/sdv"] = device
363+
for i in range(opts.ebs_vol_num):
364+
device = EBSBlockDeviceType()
365+
device.size = opts.ebs_vol_size
366+
device.volume_type=opts.ebs_vol_type
367+
device.delete_on_termination = True
368+
block_map["/dev/sd" + chr(ord('s') + i)] = device
358369

359370
# AWS ignores the AMI-specified block device mapping for M3 (see SPARK-3342).
360371
if opts.instance_type.startswith('m3.'):
@@ -828,6 +839,12 @@ def get_partition(total, num_partitions, current_partitions):
828839

829840
def real_main():
830841
(opts, action, cluster_name) = parse_args()
842+
843+
# Input parameter validation
844+
if opts.ebs_vol_num > 8:
845+
print >> stderr, "ebs-vol-num cannot be greater than 8"
846+
sys.exit(1)
847+
831848
try:
832849
conn = ec2.connect_to_region(opts.region)
833850
except Exception as e:

python/pyspark/rdd.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1089,11 +1089,11 @@ def take(self, num):
10891089
# we actually cap it at totalParts in runJob.
10901090
numPartsToTry = 1
10911091
if partsScanned > 0:
1092-
# If we didn't find any rows after the first iteration, just
1093-
# try all partitions next. Otherwise, interpolate the number
1094-
# of partitions we need to try, but overestimate it by 50%.
1092+
# If we didn't find any rows after the previous iteration,
1093+
# quadruple and retry. Otherwise, interpolate the number of
1094+
# partitions we need to try, but overestimate it by 50%.
10951095
if len(items) == 0:
1096-
numPartsToTry = totalParts - 1
1096+
numPartsToTry = partsScanned * 4
10971097
else:
10981098
numPartsToTry = int(1.5 * num * partsScanned / len(items))
10991099

sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -272,7 +272,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
272272
val currentTable = table(tableName).queryExecution.analyzed
273273
val asInMemoryRelation = currentTable match {
274274
case _: InMemoryRelation =>
275-
currentTable.logicalPlan
275+
currentTable
276276

277277
case _ =>
278278
InMemoryRelation(useCompression, columnBatchSize, executePlan(currentTable).executedPlan)

sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -60,10 +60,10 @@ case class SetCommand(
6060
logWarning(s"Property ${SQLConf.Deprecated.MAPRED_REDUCE_TASKS} is deprecated, " +
6161
s"automatically converted to ${SQLConf.SHUFFLE_PARTITIONS} instead.")
6262
context.setConf(SQLConf.SHUFFLE_PARTITIONS, v)
63-
Array(Row(s"${SQLConf.SHUFFLE_PARTITIONS}=$v"))
63+
Seq(Row(s"${SQLConf.SHUFFLE_PARTITIONS}=$v"))
6464
} else {
6565
context.setConf(k, v)
66-
Array(Row(s"$k=$v"))
66+
Seq(Row(s"$k=$v"))
6767
}
6868

6969
// Query the value bound to key k.
@@ -78,11 +78,19 @@ case class SetCommand(
7878
"hive-hwi-0.12.0.jar",
7979
"hive-0.12.0.jar").mkString(":")
8080

81-
Array(
81+
context.getAllConfs.map { case (k, v) =>
82+
Row(s"$k=$v")
83+
}.toSeq ++ Seq(
8284
Row("system:java.class.path=" + hiveJars),
8385
Row("system:sun.java.command=shark.SharkServer2"))
8486
} else {
85-
Array(Row(s"$k=${context.getConf(k, "<undefined>")}"))
87+
if (k == SQLConf.Deprecated.MAPRED_REDUCE_TASKS) {
88+
logWarning(s"Property ${SQLConf.Deprecated.MAPRED_REDUCE_TASKS} is deprecated, " +
89+
s"showing ${SQLConf.SHUFFLE_PARTITIONS} instead.")
90+
Seq(Row(s"${SQLConf.SHUFFLE_PARTITIONS}=${context.numShufflePartitions}"))
91+
} else {
92+
Seq(Row(s"$k=${context.getConf(k, "<undefined>")}"))
93+
}
8694
}
8795

8896
// Query all key-value pairs that are set in the SQLConf of the context.

sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,8 @@ private[hive] case class SourceCommand(filePath: String) extends Command
4444

4545
private[hive] case class AddFile(filePath: String) extends Command
4646

47+
private[hive] case class AddJar(path: String) extends Command
48+
4749
private[hive] case class DropTable(tableName: String, ifExists: Boolean) extends Command
4850

4951
private[hive] case class AnalyzeTable(tableName: String) extends Command
@@ -231,7 +233,7 @@ private[hive] object HiveQl {
231233
} else if (sql.trim.toLowerCase.startsWith("uncache table")) {
232234
CacheCommand(sql.trim.drop(14).trim, false)
233235
} else if (sql.trim.toLowerCase.startsWith("add jar")) {
234-
NativeCommand(sql)
236+
AddJar(sql.trim.drop(8).trim)
235237
} else if (sql.trim.toLowerCase.startsWith("add file")) {
236238
AddFile(sql.trim.drop(9))
237239
} else if (sql.trim.toLowerCase.startsWith("dfs")) {
@@ -1018,9 +1020,9 @@ private[hive] object HiveQl {
10181020

10191021
/* Other functions */
10201022
case Token("TOK_FUNCTION", Token(RAND(), Nil) :: Nil) => Rand
1021-
case Token("TOK_FUNCTION", Token(SUBSTR(), Nil) :: string :: pos :: Nil) =>
1023+
case Token("TOK_FUNCTION", Token(SUBSTR(), Nil) :: string :: pos :: Nil) =>
10221024
Substring(nodeToExpr(string), nodeToExpr(pos), Literal(Integer.MAX_VALUE, IntegerType))
1023-
case Token("TOK_FUNCTION", Token(SUBSTR(), Nil) :: string :: pos :: length :: Nil) =>
1025+
case Token("TOK_FUNCTION", Token(SUBSTR(), Nil) :: string :: pos :: length :: Nil) =>
10241026
Substring(nodeToExpr(string), nodeToExpr(pos), nodeToExpr(length))
10251027

10261028
/* UDFs - Must be last otherwise will preempt built in functions */

sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -195,11 +195,12 @@ private[hive] trait HiveStrategies {
195195

196196
case class HiveCommandStrategy(context: HiveContext) extends Strategy {
197197
def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
198-
case logical.NativeCommand(sql) =>
199-
NativeCommand(sql, plan.output)(context) :: Nil
198+
case logical.NativeCommand(sql) => NativeCommand(sql, plan.output)(context) :: Nil
200199

201200
case hive.DropTable(tableName, ifExists) => execution.DropTable(tableName, ifExists) :: Nil
202201

202+
case hive.AddJar(path) => execution.AddJar(path) :: Nil
203+
203204
case hive.AnalyzeTable(tableName) => execution.AnalyzeTable(tableName) :: Nil
204205

205206
case describe: logical.DescribeCommand =>

sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,3 +60,19 @@ case class DropTable(tableName: String, ifExists: Boolean) extends LeafNode with
6060
Seq.empty[Row]
6161
}
6262
}
63+
64+
/**
65+
* :: DeveloperApi ::
66+
*/
67+
@DeveloperApi
68+
case class AddJar(path: String) extends LeafNode with Command {
69+
def hiveContext = sqlContext.asInstanceOf[HiveContext]
70+
71+
override def output = Seq.empty
72+
73+
override protected[sql] lazy val sideEffectResult: Seq[Row] = {
74+
hiveContext.runSqlHive(s"ADD JAR $path")
75+
hiveContext.sparkContext.addJar(path)
76+
Seq.empty[Row]
77+
}
78+
}

0 commit comments

Comments
 (0)