Commit 0846e07

first version

1 parent c235b83 commit 0846e07

1 file changed: +45 −73 lines

mllib/src/main/scala/org/apache/spark/mllib/stat/correlation/SpearmanCorrelation.scala

Lines changed: 45 additions & 73 deletions
@@ -17,11 +17,13 @@
 
 package org.apache.spark.mllib.stat.correlation
 
+import org.apache.spark.storage.StorageLevel
+
 import scala.collection.mutable.ArrayBuffer
 
 import org.apache.spark.{Logging, HashPartitioner}
 import org.apache.spark.SparkContext._
-import org.apache.spark.mllib.linalg.{DenseVector, Matrix, Vector}
+import org.apache.spark.mllib.linalg.{Vectors, DenseVector, Matrix, Vector}
 import org.apache.spark.rdd.{CoGroupedRDD, RDD}
 
 /**
@@ -48,82 +50,52 @@ private[stat] object SpearmanCorrelation extends Correlation with Logging {
    * numCol RDD[Double]s, each of which is sorted, and then joined back into a single RDD[Vector].
    */
   override def computeCorrelationMatrix(X: RDD[Vector]): Matrix = {
-    val indexed = X.zipWithUniqueId()
-
-    val numCols = X.first.size
-    if (numCols > 50) {
-      logWarning("Computing the Spearman correlation matrix can be slow for large RDDs with more"
-        + " than 50 columns.")
-    }
-    val ranks = new Array[RDD[(Long, Double)]](numCols)
-
-    // Note: we use a for loop here instead of a while loop with a single index variable
-    // to avoid race condition caused by closure serialization
-    for (k <- 0 until numCols) {
-      val column = indexed.map { case (vector, index) => (vector(k), index) }
-      ranks(k) = getRanks(column)
-    }
-
-    val ranksMat: RDD[Vector] = makeRankMatrix(ranks, X)
-    PearsonCorrelation.computeCorrelationMatrix(ranksMat)
-  }
-
-  /**
-   * Compute the ranks for elements in the input RDD, using the average method for ties.
-   *
-   * With the average method, elements with the same value receive the same rank that's computed
-   * by taking the average of their positions in the sorted list.
-   * e.g. ranks([2, 1, 0, 2]) = [2.5, 1.0, 0.0, 2.5]
-   * Note that positions here are 0-indexed, instead of 1-indexed as in the standard definition
-   * of Spearman's correlation. This does not affect the final results and is slightly more
-   * performant.
-   *
-   * @param indexed RDD[(Double, Long)] containing pairs of the format (originalValue, uniqueId)
-   * @return RDD[(Long, Double)] containing pairs of the format (uniqueId, rank), where uniqueId
-   *         is copied from the input RDD.
-   */
-  private def getRanks(indexed: RDD[(Double, Long)]): RDD[(Long, Double)] = {
-    // Get elements' positions in the sorted list for computing average rank for duplicate values
-    val sorted = indexed.sortByKey().zipWithIndex()
-
-    val ranks: RDD[(Long, Double)] = sorted.mapPartitions { iter =>
-      // add an extra element to signify the end of the list so that flatMap can flush the last
-      // batch of duplicates
-      val end = -1L
-      val padded = iter ++ Iterator[((Double, Long), Long)](((Double.NaN, end), end))
-      val firstEntry = padded.next()
-      var lastVal = firstEntry._1._1
-      var firstRank = firstEntry._2.toDouble
-      val idBuffer = ArrayBuffer(firstEntry._1._2)
-      padded.flatMap { case ((v, id), rank) =>
-        if (v == lastVal && id != end) {
-          idBuffer += id
-          Iterator.empty
+    // Flatten the input into ((columnIndex, value), rowUid) entries so that all columns
+    // can be ranked with a single global sort instead of one sort per column.
+    val transposed = X.zipWithUniqueId().flatMap { case (vec, uid) =>
+      vec.toArray.view.zipWithIndex.map { case (v, j) =>
+        ((j, v), uid)
+      }
+    }.persist(StorageLevel.MEMORY_AND_DISK)
+    val sorted = transposed.sortByKey().persist(StorageLevel.MEMORY_AND_DISK)
+    // Assign ranks within each column, giving runs of tied values their average rank.
+    val ranked = sorted.zipWithIndex().mapPartitions { iter =>
+      var preCol = -1
+      var preVal = Double.NaN
+      var startRank = -1.0
+      val cachedIds = ArrayBuffer.empty[Long]
+      def flush(): Iterable[(Long, (Int, Double))] = {
+        val averageRank = startRank + (cachedIds.size - 1) / 2.0
+        val output = cachedIds.map { i =>
+          (i, (preCol, averageRank))
+        }
+        cachedIds.clear()
+        output
+      }
+      iter.flatMap { case (((j, v), uid), rank) =>
+        if (j != preCol || v != preVal) {
+          val output = flush()
+          preCol = j
+          preVal = v
+          startRank = rank
+          cachedIds += uid
+          output
         } else {
-          val entries = if (idBuffer.size == 1) {
-            Iterator((idBuffer(0), firstRank))
-          } else {
-            val averageRank = firstRank + (idBuffer.size - 1.0) / 2.0
-            idBuffer.map(id => (id, averageRank))
-          }
-          lastVal = v
-          firstRank = rank
-          idBuffer.clear()
-          idBuffer += id
-          entries
+          cachedIds += uid
+          Iterator.empty
         }
+      } ++ {
+        flush()
       }
     }
-    ranks
-  }
-
-  private def makeRankMatrix(ranks: Array[RDD[(Long, Double)]], input: RDD[Vector]): RDD[Vector] = {
-    val partitioner = new HashPartitioner(input.partitions.size)
-    val cogrouped = new CoGroupedRDD[Long](ranks, partitioner)
-    cogrouped.map {
-      case (_, values: Array[Iterable[_]]) =>
-        val doubles = values.asInstanceOf[Array[Iterable[Double]]]
-        new DenseVector(doubles.flatten.toArray)
+    // Regroup the ranks by row uid and rebuild each row vector in column order.
+    val ranks = ranked.groupByKey().map { case (uid, iter) =>
+      val values = iter.toSeq.sortBy(_._1).map(_._2).toArray
+      Vectors.dense(values)
     }
+    val corrMatrix = PearsonCorrelation.computeCorrelationMatrix(ranks)
+
+    transposed.unpersist(blocking = false)
+    sorted.unpersist(blocking = false)
+
+    corrMatrix
   }
 }
+
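For intuition, the removed getRanks scaladoc kept a useful worked example of the average method for ties: tied values all receive the mean of their 0-indexed positions in the sorted order, so ranks([2, 1, 0, 2]) = [2.5, 1.0, 0.0, 2.5]. Below is a minimal local Scala sketch of that same tie-averaging idea, without Spark; AverageRanks and its ranks helper are hypothetical names used only for illustration.

object AverageRanks {
  // Compute 0-indexed ranks with the average method for ties: sort the
  // values, then give every run of equal values the mean of its positions.
  def ranks(column: Array[Double]): Array[Double] = {
    val out = new Array[Double](column.length)
    val sorted = column.zipWithIndex.sortBy(_._1) // (value, originalIndex)
    var i = 0
    while (i < sorted.length) {
      var j = i
      while (j < sorted.length && sorted(j)._1 == sorted(i)._1) j += 1
      val averageRank = i + (j - i - 1) / 2.0 // mean of positions i until j
      (i until j).foreach(k => out(sorted(k)._2) = averageRank)
      i = j
    }
    out
  }

  def main(args: Array[String]): Unit = {
    // Matches the example from the removed scaladoc:
    println(ranks(Array(2.0, 1.0, 0.0, 2.0)).mkString("[", ", ", "]"))
    // prints [2.5, 1.0, 0.0, 2.5]
  }
}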
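End to end, callers would not touch SpearmanCorrelation directly (it is private[stat]); the public route is Statistics.corr with the "spearman" method name. A hedged usage sketch, assuming a local SparkContext and the MLlib 1.x Statistics.corr(RDD[Vector], String) entry point:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.stat.Statistics

object SpearmanExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setMaster("local[2]").setAppName("spearman"))
    // The second column is a monotone (but nonlinear) function of the first,
    // so their Spearman correlation is exactly 1.0 while Pearson's is lower.
    val X = sc.parallelize(Seq(
      Vectors.dense(1.0, 1.0),
      Vectors.dense(2.0, 4.0),
      Vectors.dense(3.0, 9.0),
      Vectors.dense(4.0, 16.0)))
    println(Statistics.corr(X, "spearman"))
    sc.stop()
  }
}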
