
Commit adf4924 (merge commit, 2 parents: d10bf00 + c5414b6)

30 files changed: +609 −514 lines

core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala

Lines changed: 11 additions & 6 deletions
@@ -420,6 +420,8 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
   /**
    * Group the values for each key in the RDD into a single sequence. Allows controlling the
    * partitioning of the resulting key-value pair RDD by passing a Partitioner.
+   * The ordering of elements within each group is not guaranteed, and may even differ
+   * each time the resulting RDD is evaluated.
    *
    * Note: This operation may be very expensive. If you are grouping in order to perform an
    * aggregation (such as a sum or average) over each key, using [[PairRDDFunctions.aggregateByKey]]
@@ -439,7 +441,8 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])

   /**
    * Group the values for each key in the RDD into a single sequence. Hash-partitions the
-   * resulting RDD with into `numPartitions` partitions.
+   * resulting RDD with into `numPartitions` partitions. The ordering of elements within
+   * each group is not guaranteed, and may even differ each time the resulting RDD is evaluated.
    *
    * Note: This operation may be very expensive. If you are grouping in order to perform an
    * aggregation (such as a sum or average) over each key, using [[PairRDDFunctions.aggregateByKey]]
@@ -535,7 +538,9 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])

   /**
    * Group the values for each key in the RDD into a single sequence. Hash-partitions the
-   * resulting RDD with the existing partitioner/parallelism level.
+   * resulting RDD with the existing partitioner/parallelism level. The ordering of elements
+   * within each group is not guaranteed, and may even differ each time the resulting RDD is
+   * evaluated.
    *
    * Note: This operation may be very expensive. If you are grouping in order to perform an
    * aggregation (such as a sum or average) over each key, using [[PairRDDFunctions.aggregateByKey]]
@@ -951,9 +956,9 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
     val writeShard = (context: TaskContext, iter: Iterator[(K,V)]) => {
       // Hadoop wants a 32-bit task attempt ID, so if ours is bigger than Int.MaxValue, roll it
       // around by taking a mod. We expect that no task will be attempted 2 billion times.
-      val attemptNumber = (context.attemptId % Int.MaxValue).toInt
+      val attemptNumber = (context.getAttemptId % Int.MaxValue).toInt
       /* "reduce task" <split #> <attempt # = spark task #> */
-      val attemptId = newTaskAttemptID(jobtrackerID, stageId, isMap = false, context.partitionId,
+      val attemptId = newTaskAttemptID(jobtrackerID, stageId, isMap = false, context.getPartitionId,
         attemptNumber)
       val hadoopContext = newTaskAttemptContext(wrappedConf.value, attemptId)
       val format = outfmt.newInstance
@@ -1022,9 +1027,9 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
     val writeToFile = (context: TaskContext, iter: Iterator[(K, V)]) => {
       // Hadoop wants a 32-bit task attempt ID, so if ours is bigger than Int.MaxValue, roll it
       // around by taking a mod. We expect that no task will be attempted 2 billion times.
-      val attemptNumber = (context.attemptId % Int.MaxValue).toInt
+      val attemptNumber = (context.getAttemptId % Int.MaxValue).toInt

-      writer.setup(context.stageId, context.partitionId, attemptNumber)
+      writer.setup(context.getStageId, context.getPartitionId, attemptNumber)
       writer.open()
       try {
         var count = 0
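
The doc additions above describe behavior, not a code change: the values inside each group arrive in whatever order the shuffle delivers them. A minimal PySpark sketch of that behavior and of the recommended aggregation alternative (illustrative only, not part of this commit; assumes a local Spark install):

from pyspark import SparkContext

sc = SparkContext("local[4]", "groupByKey-ordering")
pairs = sc.parallelize([("a", 1), ("b", 2), ("a", 3), ("a", 5), ("b", 4)], 4)

# The order of values inside each group is whatever the shuffle produced;
# it may differ between evaluations of the same RDD.
print(pairs.groupByKey().mapValues(list).collect())
# e.g. [('a', [1, 3, 5]), ('b', [2, 4])] -- order within each list is not guaranteed

# For a per-key aggregation, reduceByKey avoids materializing the groups at all.
print(sorted(pairs.reduceByKey(lambda x, y: x + y).collect()))
# [('a', 9), ('b', 6)]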

core/src/main/scala/org/apache/spark/rdd/RDD.scala

Lines changed: 17 additions & 3 deletions
@@ -509,7 +509,8 @@ abstract class RDD[T: ClassTag](

   /**
    * Return an RDD of grouped items. Each group consists of a key and a sequence of elements
-   * mapping to that key.
+   * mapping to that key. The ordering of elements within each group is not guaranteed, and
+   * may even differ each time the resulting RDD is evaluated.
    *
    * Note: This operation may be very expensive. If you are grouping in order to perform an
    * aggregation (such as a sum or average) over each key, using [[PairRDDFunctions.aggregateByKey]]
@@ -520,7 +521,8 @@ abstract class RDD[T: ClassTag](

   /**
    * Return an RDD of grouped elements. Each group consists of a key and a sequence of elements
-   * mapping to that key.
+   * mapping to that key. The ordering of elements within each group is not guaranteed, and
+   * may even differ each time the resulting RDD is evaluated.
    *
    * Note: This operation may be very expensive. If you are grouping in order to perform an
    * aggregation (such as a sum or average) over each key, using [[PairRDDFunctions.aggregateByKey]]
@@ -531,7 +533,8 @@ abstract class RDD[T: ClassTag](

   /**
    * Return an RDD of grouped items. Each group consists of a key and a sequence of elements
-   * mapping to that key.
+   * mapping to that key. The ordering of elements within each group is not guaranteed, and
+   * may even differ each time the resulting RDD is evaluated.
    *
    * Note: This operation may be very expensive. If you are grouping in order to perform an
    * aggregation (such as a sum or average) over each key, using [[PairRDDFunctions.aggregateByKey]]
@@ -1028,15 +1031,26 @@
    * Zips this RDD with its element indices. The ordering is first based on the partition index
    * and then the ordering of items within each partition. So the first item in the first
    * partition gets index 0, and the last item in the last partition receives the largest index.
+   *
    * This is similar to Scala's zipWithIndex but it uses Long instead of Int as the index type.
    * This method needs to trigger a spark job when this RDD contains more than one partitions.
+   *
+   * Note that some RDDs, such as those returned by groupBy(), do not guarantee order of
+   * elements in a partition. The index assigned to each element is therefore not guaranteed,
+   * and may even change if the RDD is reevaluated. If a fixed ordering is required to guarantee
+   * the same index assignments, you should sort the RDD with sortByKey() or save it to a file.
    */
   def zipWithIndex(): RDD[(T, Long)] = new ZippedWithIndexRDD(this)

   /**
    * Zips this RDD with generated unique Long ids. Items in the kth partition will get ids k, n+k,
    * 2*n+k, ..., where n is the number of partitions. So there may exist gaps, but this method
    * won't trigger a spark job, which is different from [[org.apache.spark.rdd.RDD#zipWithIndex]].
+   *
+   * Note that some RDDs, such as those returned by groupBy(), do not guarantee order of
+   * elements in a partition. The unique ID assigned to each element is therefore not guaranteed,
+   * and may even change if the RDD is reevaluated. If a fixed ordering is required to guarantee
+   * the same index assignments, you should sort the RDD with sortByKey() or save it to a file.
    */
   def zipWithUniqueId(): RDD[(T, Long)] = {
     val n = this.partitions.size.toLong
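
A short PySpark sketch of the caveat added above (illustrative, not part of this commit): indices assigned by zipWithIndex are only reproducible when the parent RDD has a fixed ordering, e.g. after sortByKey().

from pyspark import SparkContext

sc = SparkContext("local[4]", "zipWithIndex-ordering")
pairs = sc.parallelize([("b", 2), ("c", 3), ("a", 1)], 3)

# Over a shuffled RDD the per-partition order is not guaranteed, so the
# assigned indices may change if the RDD is reevaluated.
unstable = pairs.groupByKey().zipWithIndex()

# Sorting first pins the ordering, making the index assignment deterministic.
stable = pairs.groupByKey().sortByKey().mapValues(list).zipWithIndex()
print(stable.collect())
# [(('a', [1]), 0), (('b', [2]), 1), (('c', [3]), 2)]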

dev/check-license

Lines changed: 8 additions & 7 deletions
@@ -20,27 +20,28 @@

 acquire_rat_jar () {

-  URL1="http://search.maven.org/remotecontent?filepath=org/apache/rat/apache-rat/${RAT_VERSION}/apache-rat-${RAT_VERSION}.jar"
-  URL2="http://repo1.maven.org/maven2/org/apache/rat/apache-rat/${RAT_VERSION}/apache-rat-${RAT_VERSION}.jar"
+  URL="http://repo1.maven.org/maven2/org/apache/rat/apache-rat/${RAT_VERSION}/apache-rat-${RAT_VERSION}.jar"

   JAR="$rat_jar"
-
+
   if [[ ! -f "$rat_jar" ]]; then
     # Download rat launch jar if it hasn't been downloaded yet
     if [ ! -f "$JAR" ]; then
       # Download
       printf "Attempting to fetch rat\n"
       JAR_DL="${JAR}.part"
       if hash curl 2>/dev/null; then
-        (curl --silent "${URL1}" > "$JAR_DL" || curl --silent "${URL2}" > "$JAR_DL") && mv "$JAR_DL" "$JAR"
+        curl --silent "${URL}" > "$JAR_DL" && mv "$JAR_DL" "$JAR"
       elif hash wget 2>/dev/null; then
-        (wget --quiet ${URL1} -O "$JAR_DL" || wget --quiet ${URL2} -O "$JAR_DL") && mv "$JAR_DL" "$JAR"
+        wget --quiet ${URL} -O "$JAR_DL" && mv "$JAR_DL" "$JAR"
       else
         printf "You do not have curl or wget installed, please install rat manually.\n"
         exit -1
       fi
     fi
-    if [ ! -f "$JAR" ]; then
+
+    unzip -tq $JAR &> /dev/null
+    if [ $? -ne 0 ]; then
       # We failed to download
       printf "Our attempt to download rat locally to ${JAR} failed. Please install rat manually.\n"
       exit -1
@@ -55,7 +56,7 @@ cd "$FWDIR"

 if test -x "$JAVA_HOME/bin/java"; then
   declare java_cmd="$JAVA_HOME/bin/java"
-else
+else
   declare java_cmd=java
 fi

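The script now verifies the downloaded jar with `unzip -tq` rather than only checking that a file exists. A rough Python equivalent of that integrity check, for readers who want the same guard elsewhere (file name below is a placeholder, not taken from the script):

import zipfile

def jar_is_valid(path):
    """Return True if `path` is a readable zip/jar whose entries pass a CRC check."""
    try:
        with zipfile.ZipFile(path) as jar:
            return jar.testzip() is None  # testzip() returns the first corrupt entry, or None
    except (IOError, zipfile.BadZipfile):
        return False

if not jar_is_valid("apache-rat.jar"):  # placeholder path
    raise SystemExit("Downloaded rat jar is missing or corrupt; please install rat manually.")
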
dev/run-tests-jenkins

Lines changed: 4 additions & 2 deletions
@@ -141,8 +141,10 @@ function post_message () {
   test_result="$?"

   if [ "$test_result" -eq "124" ]; then
-    fail_message="**[Tests timed out](${BUILD_URL}consoleFull)** after \
-    a configured wait of \`${TESTS_TIMEOUT}\`."
+    fail_message="**[Tests timed out](${BUILD_URL}consoleFull)** \
+    for PR $ghprbPullId at commit [\`${SHORT_COMMIT_HASH}\`](${COMMIT_URL}) \
+    after a configured wait of \`${TESTS_TIMEOUT}\`."
+
     post_message "$fail_message"
     exit $test_result
   else

docs/configuration.md

Lines changed: 19 additions & 0 deletions
@@ -206,6 +206,25 @@ Apart from these, the following properties are also available, and may be useful
     used during aggregation goes above this amount, it will spill the data into disks.
   </td>
 </tr>
+<tr>
+  <td><code>spark.python.profile</code></td>
+  <td>false</td>
+  <td>
+    Enable profiling in Python worker, the profile result will show up by `sc.show_profiles()`,
+    or it will be displayed before the driver exiting. It also can be dumped into disk by
+    `sc.dump_profiles(path)`. If some of the profile results had been displayed maually,
+    they will not be displayed automatically before driver exiting.
+  </td>
+</tr>
+<tr>
+  <td><code>spark.python.profile.dump</code></td>
+  <td>(none)</td>
+  <td>
+    The directory which is used to dump the profile result before driver exiting.
+    The results will be dumped as separated file for each RDD. They can be loaded
+    by ptats.Stats(). If this is specified, the profile result will not be displayed
+    automatically.
+</tr>
 <tr>
   <td><code>spark.python.worker.reuse</code></td>
   <td>true</td>
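
A usage sketch for the two properties documented above, following the behavior this commit describes (app name and dump directory are placeholders):

from pyspark import SparkConf, SparkContext

conf = (SparkConf()
        .setAppName("profile-demo")                          # placeholder
        .set("spark.python.profile", "true")                 # collect profile stats per RDD
        .set("spark.python.profile.dump", "/tmp/profile"))   # optional: dump instead of printing
sc = SparkContext(conf=conf)

rdd = sc.parallelize(range(10000)).map(lambda x: x * x)
rdd.count()

# Show the accumulated profiles on demand...
sc.show_profiles()
# ...and because spark.python.profile.dump is set, rdd_<id>.pstats files are also
# written to /tmp/profile at exit and can be loaded later with pstats.Stats().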

docs/programming-guide.md

Lines changed: 1 addition & 1 deletion
@@ -883,7 +883,7 @@ for details.
 <tr>
   <td> <b>groupByKey</b>([<i>numTasks</i>]) </td>
   <td> When called on a dataset of (K, V) pairs, returns a dataset of (K, Iterable&lt;V&gt;) pairs. <br />
-    <b>Note:</b> If you are grouping in order to perform an aggregation (such as a sum or
+    <b>Note:</b> If you are grouping in order to perform an aggregation (such as a sum or
     average) over each key, using <code>reduceByKey</code> or <code>combineByKey</code> will yield much better
     performance.
   <br />
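
For the sum-or-average case the note refers to, here is a per-key average with combineByKey (a sketch, not from this commit):

from pyspark import SparkContext

sc = SparkContext("local[2]", "combineByKey-average")
pairs = sc.parallelize([("a", 1.0), ("a", 3.0), ("b", 4.0)])

# Each combiner carries (running_sum, count), so no per-key group is ever materialized.
sum_count = pairs.combineByKey(
    lambda v: (v, 1),                          # createCombiner
    lambda acc, v: (acc[0] + v, acc[1] + 1),   # mergeValue
    lambda a, b: (a[0] + b[0], a[1] + b[1]))   # mergeCombiners
print(sorted(sum_count.mapValues(lambda s: s[0] / s[1]).collect()))
# [('a', 2.0), ('b', 4.0)]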

external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala

Lines changed: 15 additions & 10 deletions
@@ -31,7 +31,7 @@ import org.apache.flume.source.avro.{AvroFlumeEvent, AvroSourceProtocol}
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming.{TestOutputStream, StreamingContext, TestSuiteBase}
 import org.apache.spark.streaming.util.ManualClock
-import org.apache.spark.streaming.api.java.JavaReceiverInputDStream
+import org.apache.spark.util.Utils

 import org.jboss.netty.channel.ChannelPipeline
 import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory
@@ -41,35 +41,40 @@ import org.jboss.netty.handler.codec.compression._
 class FlumeStreamSuite extends TestSuiteBase {

   test("flume input stream") {
-    runFlumeStreamTest(false, 9998)
+    runFlumeStreamTest(false)
   }

   test("flume input compressed stream") {
-    runFlumeStreamTest(true, 9997)
+    runFlumeStreamTest(true)
   }

-  def runFlumeStreamTest(enableDecompression: Boolean, testPort: Int) {
+  def runFlumeStreamTest(enableDecompression: Boolean) {
     // Set up the streaming context and input streams
     val ssc = new StreamingContext(conf, batchDuration)
-    val flumeStream: JavaReceiverInputDStream[SparkFlumeEvent] =
-      FlumeUtils.createStream(ssc, "localhost", testPort, StorageLevel.MEMORY_AND_DISK, enableDecompression)
+    val (flumeStream, testPort) =
+      Utils.startServiceOnPort(9997, (trialPort: Int) => {
+        val dstream = FlumeUtils.createStream(
+          ssc, "localhost", trialPort, StorageLevel.MEMORY_AND_DISK, enableDecompression)
+        (dstream, trialPort)
+      })
+
     val outputBuffer = new ArrayBuffer[Seq[SparkFlumeEvent]]
       with SynchronizedBuffer[Seq[SparkFlumeEvent]]
-    val outputStream = new TestOutputStream(flumeStream.receiverInputDStream, outputBuffer)
+    val outputStream = new TestOutputStream(flumeStream, outputBuffer)
     outputStream.register()
     ssc.start()

     val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
     val input = Seq(1, 2, 3, 4, 5)
     Thread.sleep(1000)
     val transceiver = new NettyTransceiver(new InetSocketAddress("localhost", testPort))
-    var client: AvroSourceProtocol = null;
-
+    var client: AvroSourceProtocol = null
+
     if (enableDecompression) {
       client = SpecificRequestor.getClient(
         classOf[AvroSourceProtocol],
         new NettyTransceiver(new InetSocketAddress("localhost", testPort),
-          new CompressionChannelFactory(6)));
+          new CompressionChannelFactory(6)))
     } else {
       client = SpecificRequestor.getClient(
         classOf[AvroSourceProtocol], transceiver)
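
The test now lets Utils.startServiceOnPort find a free port instead of hard-coding 9997/9998. A rough Python sketch of the same retry-on-bind-failure idea (helper names and the retry limit are illustrative, not Spark API):

import socket

def start_service_on_port(start_port, start_service, max_retries=16):
    """Call start_service(port) on successive ports until one binds; return (service, port)."""
    for offset in range(max_retries):
        port = start_port + offset
        try:
            return start_service(port), port
        except socket.error:
            pass  # port already in use, try the next one
    raise RuntimeError("could not bind any port starting at %d" % start_port)

def bind_server(port):
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server.bind(("localhost", port))  # raises socket.error if the port is taken
    server.listen(1)
    return server

server, port = start_service_on_port(9997, bind_server)
print("listening on port %d" % port)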

mllib/src/main/scala/org/apache/spark/mllib/linalg/Matrices.scala

Lines changed: 4 additions & 4 deletions
@@ -85,7 +85,7 @@ sealed trait Matrix extends Serializable {
 }

 /**
- * Column-majored dense matrix.
+ * Column-major dense matrix.
  * The entry values are stored in a single array of doubles with columns listed in sequence.
  * For example, the following matrix
  * {{{
@@ -128,7 +128,7 @@ class DenseMatrix(val numRows: Int, val numCols: Int, val values: Array[Double])
 }

 /**
- * Column-majored sparse matrix.
+ * Column-major sparse matrix.
  * The entry values are stored in Compressed Sparse Column (CSC) format.
  * For example, the following matrix
  * {{{
@@ -207,7 +207,7 @@ class SparseMatrix(
 object Matrices {

   /**
-   * Creates a column-majored dense matrix.
+   * Creates a column-major dense matrix.
    *
    * @param numRows number of rows
    * @param numCols number of columns
@@ -218,7 +218,7 @@ object Matrices {
   }

   /**
-   * Creates a column-majored sparse matrix in Compressed Sparse Column (CSC) format.
+   * Creates a column-major sparse matrix in Compressed Sparse Column (CSC) format.
    *
    * @param numRows number of rows
    * @param numCols number of columns
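
The change above is wording only ("column-majored" to "column-major"). For reference, a small NumPy illustration of the column-major layout these docs describe (NumPy is used purely for illustration and is not part of this commit):

import numpy as np

# The 3 x 2 matrix
#   1.0  2.0
#   3.0  4.0
#   5.0  6.0
m = np.array([[1.0, 2.0], [3.0, 4.0], [5.0, 6.0]])

# Column-major ("Fortran") order lists each column in sequence,
# which is how a column-major dense matrix stores its values array.
print(m.flatten(order="F"))
# [1. 3. 5. 2. 4. 6.]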

python/pyspark/accumulators.py

Lines changed: 15 additions & 0 deletions
@@ -215,6 +215,21 @@ def addInPlace(self, value1, value2):
 COMPLEX_ACCUMULATOR_PARAM = AddingAccumulatorParam(0.0j)


+class PStatsParam(AccumulatorParam):
+    """PStatsParam is used to merge pstats.Stats"""
+
+    @staticmethod
+    def zero(value):
+        return None
+
+    @staticmethod
+    def addInPlace(value1, value2):
+        if value1 is None:
+            return value2
+        value1.add(value2)
+        return value1
+
+
 class _UpdateRequestHandler(SocketServer.StreamRequestHandler):

     """

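PStatsParam above merges per-task profiler results by folding them together with pstats.Stats.add. A standalone sketch of that merge outside Spark (the profiled functions are made up for illustration):

import cProfile
import pstats

def profile_to_stats(fn):
    """Run fn under cProfile and return its pstats.Stats."""
    profiler = cProfile.Profile()
    profiler.runcall(fn)
    return pstats.Stats(profiler)

stats1 = profile_to_stats(lambda: sum(range(100000)))
stats2 = profile_to_stats(lambda: sorted(range(100000), reverse=True))

# The same merge PStatsParam.addInPlace performs on worker-side results.
stats1.add(stats2)
stats1.sort_stats("time", "cumulative").print_stats(5)
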
python/pyspark/context.py

Lines changed: 38 additions & 1 deletion
@@ -20,6 +20,7 @@
 import sys
 from threading import Lock
 from tempfile import NamedTemporaryFile
+import atexit

 from pyspark import accumulators
 from pyspark.accumulators import Accumulator
@@ -30,7 +31,6 @@
 from pyspark.serializers import PickleSerializer, BatchedSerializer, UTF8Deserializer, \
     PairDeserializer, CompressedSerializer
 from pyspark.storagelevel import StorageLevel
-from pyspark import rdd
 from pyspark.rdd import RDD
 from pyspark.traceback_utils import CallSite, first_spark_call

@@ -192,6 +192,9 @@ def _do_init(self, master, appName, sparkHome, pyFiles, environment, batchSize,
         self._temp_dir = \
             self._jvm.org.apache.spark.util.Utils.createTempDir(local_dir).getAbsolutePath()

+        # profiling stats collected for each PythonRDD
+        self._profile_stats = []
+
     def _initialize_context(self, jconf):
         """
         Initialize SparkContext in function to allow subclass specific initialization
@@ -792,6 +795,40 @@ def runJob(self, rdd, partitionFunc, partitions=None, allowLocal=False):
         it = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, javaPartitions, allowLocal)
         return list(mappedRDD._collect_iterator_through_file(it))

+    def _add_profile(self, id, profileAcc):
+        if not self._profile_stats:
+            dump_path = self._conf.get("spark.python.profile.dump")
+            if dump_path:
+                atexit.register(self.dump_profiles, dump_path)
+            else:
+                atexit.register(self.show_profiles)
+
+        self._profile_stats.append([id, profileAcc, False])
+
+    def show_profiles(self):
+        """ Print the profile stats to stdout """
+        for i, (id, acc, showed) in enumerate(self._profile_stats):
+            stats = acc.value
+            if not showed and stats:
+                print "=" * 60
+                print "Profile of RDD<id=%d>" % id
+                print "=" * 60
+                stats.sort_stats("time", "cumulative").print_stats()
+                # mark it as showed
+                self._profile_stats[i][2] = True
+
+    def dump_profiles(self, path):
+        """ Dump the profile stats into directory `path`
+        """
+        if not os.path.exists(path):
+            os.makedirs(path)
+        for id, acc, _ in self._profile_stats:
+            stats = acc.value
+            if stats:
+                p = os.path.join(path, "rdd_%d.pstats" % id)
+                stats.dump_stats(p)
+        self._profile_stats = []
+

 def _test():
     import atexit
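
dump_profiles() above writes one rdd_<id>.pstats file per RDD into the chosen directory. They can be read back with the standard pstats module, for example (directory path is a placeholder):

import glob
import pstats

# Load every per-RDD profile written by dump_profiles("/tmp/profile") and show hot spots.
for path in sorted(glob.glob("/tmp/profile/rdd_*.pstats")):
    print("== %s ==" % path)
    pstats.Stats(path).sort_stats("cumulative").print_stats(10)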
