
Commit f4fd12e

Merge remote-tracking branch 'oss/master' into SPARK-33399-outputpartitioning
2 parents: 4d5f688 + 1baf0d5

37 files changed (+1190, -442 lines)

core/src/main/scala/org/apache/spark/internal/config/package.scala

Lines changed: 2 additions & 2 deletions
@@ -302,8 +302,8 @@ package object config {
     .createWithDefaultString("1g")

   private[spark] val EXECUTOR_MEMORY_OVERHEAD = ConfigBuilder("spark.executor.memoryOverhead")
-    .doc("The amount of non-heap memory to be allocated per executor in cluster mode, " +
-      "in MiB unless otherwise specified.")
+    .doc("The amount of non-heap memory to be allocated per executor, in MiB unless otherwise" +
+      " specified.")
     .version("2.3.0")
     .bytesConf(ByteUnit.MiB)
     .createOptional

docs/configuration.md

Lines changed: 3 additions & 4 deletions
@@ -274,10 +274,9 @@ of the most common options to set are:
   <td><code>spark.executor.memoryOverhead</code></td>
   <td>executorMemory * 0.10, with minimum of 384 </td>
   <td>
-    Amount of additional memory to be allocated per executor process in cluster mode, in MiB unless
-    otherwise specified. This is memory that accounts for things like VM overheads, interned strings,
-    other native overheads, etc. This tends to grow with the executor size (typically 6-10%).
-    This option is currently supported on YARN and Kubernetes.
+    Amount of additional memory to be allocated per executor process, in MiB unless otherwise specified.
+    This is memory that accounts for things like VM overheads, interned strings, other native overheads, etc.
+    This tends to grow with the executor size (typically 6-10%). This option is currently supported on YARN and Kubernetes.
     <br/>
     <em>Note:</em> Additional memory includes PySpark executor memory
     (when <code>spark.executor.pyspark.memory</code> is not configured) and memory used by other
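For reference, a minimal sketch (not part of this commit; the values are illustrative assumptions) of supplying the setting documented above when building a session:

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder()
      .appName("memory-overhead-example")
      .config("spark.executor.memory", "4g")
      // Non-heap overhead per executor; when unset it defaults to
      // max(executorMemory * 0.10, 384 MiB).
      .config("spark.executor.memoryOverhead", "512m")
      .getOrCreate()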

docs/sql-ref-ansi-compliance.md

Lines changed: 8 additions & 1 deletion
@@ -110,7 +110,14 @@ SELECT * FROM t;
 ### SQL Functions

 The behavior of some SQL functions can be different under ANSI mode (`spark.sql.ansi.enabled=true`).
-  - `size`: This function returns null for null input under ANSI mode.
+  - `size`: This function returns null for null input.
+  - `element_at`: This function throws `ArrayIndexOutOfBoundsException` if using invalid indices.
+  - `elt`: This function throws `ArrayIndexOutOfBoundsException` if using invalid indices.
+
+### SQL Operators
+
+The behavior of some SQL operators can be different under ANSI mode (`spark.sql.ansi.enabled=true`).
+  - `array_col[index]`: This operator throws `ArrayIndexOutOfBoundsException` if using invalid indices.

 ### SQL Keywords
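A minimal sketch of the `element_at` behavior described above (not part of the diff; a local SparkSession is assumed):

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().master("local[*]").appName("ansi-demo").getOrCreate()

    spark.conf.set("spark.sql.ansi.enabled", "true")
    // Under ANSI mode an out-of-range index fails fast; the line below is
    // expected to throw ArrayIndexOutOfBoundsException:
    // spark.sql("SELECT element_at(array(1, 2, 3), 5)").show()

    spark.conf.set("spark.sql.ansi.enabled", "false")
    // With ANSI mode off, the same query returns NULL for the invalid index.
    spark.sql("SELECT element_at(array(1, 2, 3), 5)").show()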

mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala

Lines changed: 27 additions & 66 deletions
@@ -42,7 +42,7 @@ import org.apache.spark.storage.StorageLevel
 /** Params for linear SVM Classifier. */
 private[classification] trait LinearSVCParams extends ClassifierParams with HasRegParam
   with HasMaxIter with HasFitIntercept with HasTol with HasStandardization with HasWeightCol
-  with HasAggregationDepth with HasThreshold with HasBlockSize {
+  with HasAggregationDepth with HasThreshold with HasMaxBlockSizeInMB {

   /**
    * Param for threshold in binary classification prediction.
@@ -57,7 +57,7 @@ private[classification] trait LinearSVCParams extends ClassifierParams with HasR
     "threshold in binary classification prediction applied to rawPrediction")

   setDefault(regParam -> 0.0, maxIter -> 100, fitIntercept -> true, tol -> 1E-6,
-    standardization -> true, threshold -> 0.0, aggregationDepth -> 2, blockSize -> 1)
+    standardization -> true, threshold -> 0.0, aggregationDepth -> 2, maxBlockSizeInMB -> 0.0)
 }

 /**
@@ -153,22 +153,13 @@ class LinearSVC @Since("2.2.0") (
   def setAggregationDepth(value: Int): this.type = set(aggregationDepth, value)

   /**
-   * Set block size for stacking input data in matrices.
-   * If blockSize == 1, then stacking will be skipped, and each vector is treated individually;
-   * If blockSize &gt; 1, then vectors will be stacked to blocks, and high-level BLAS routines
-   * will be used if possible (for example, GEMV instead of DOT, GEMM instead of GEMV).
-   * Recommended size is between 10 and 1000. An appropriate choice of the block size depends
-   * on the sparsity and dim of input datasets, the underlying BLAS implementation (for example,
-   * f2jBLAS, OpenBLAS, intel MKL) and its configuration (for example, number of threads).
-   * Note that existing BLAS implementations are mainly optimized for dense matrices, if the
-   * input dataset is sparse, stacking may bring no performance gain, the worse is possible
-   * performance regression.
-   * Default is 1.
+   * Sets the value of param [[maxBlockSizeInMB]].
+   * Default is 0.0.
    *
    * @group expertSetParam
    */
   @Since("3.1.0")
-  def setBlockSize(value: Int): this.type = set(blockSize, value)
+  def setMaxBlockSizeInMB(value: Double): this.type = set(maxBlockSizeInMB, value)

   @Since("2.2.0")
   override def copy(extra: ParamMap): LinearSVC = defaultCopy(extra)
@@ -177,19 +168,19 @@ class LinearSVC @Since("2.2.0") (
     instr.logPipelineStage(this)
     instr.logDataset(dataset)
     instr.logParams(this, labelCol, weightCol, featuresCol, predictionCol, rawPredictionCol,
-      regParam, maxIter, fitIntercept, tol, standardization, threshold, aggregationDepth, blockSize)
+      regParam, maxIter, fitIntercept, tol, standardization, threshold, aggregationDepth,
+      maxBlockSizeInMB)
+
+    if (dataset.storageLevel != StorageLevel.NONE) {
+      instr.logWarning(s"Input instances will be standardized, blockified to blocks, and " +
+        s"then cached during training. Be careful of double caching!")
+    }

     val instances = extractInstances(dataset)
       .setName("training instances")

-    if (dataset.storageLevel == StorageLevel.NONE && $(blockSize) == 1) {
-      instances.persist(StorageLevel.MEMORY_AND_DISK)
-    }
-
-    var requestedMetrics = Seq("mean", "std", "count")
-    if ($(blockSize) != 1) requestedMetrics +:= "numNonZeros"
     val (summarizer, labelSummarizer) = Summarizer
-      .getClassificationSummarizers(instances, $(aggregationDepth), requestedMetrics)
+      .getClassificationSummarizers(instances, $(aggregationDepth), Seq("mean", "std", "count"))

     val histogram = labelSummarizer.histogram
     val numInvalid = labelSummarizer.countInvalid
@@ -199,14 +190,12 @@ class LinearSVC @Since("2.2.0") (
     instr.logNamedValue("lowestLabelWeight", labelSummarizer.histogram.min.toString)
     instr.logNamedValue("highestLabelWeight", labelSummarizer.histogram.max.toString)
     instr.logSumOfWeights(summarizer.weightSum)
-    if ($(blockSize) > 1) {
-      val scale = 1.0 / summarizer.count / numFeatures
-      val sparsity = 1 - summarizer.numNonzeros.toArray.map(_ * scale).sum
-      instr.logNamedValue("sparsity", sparsity.toString)
-      if (sparsity > 0.5) {
-        instr.logWarning(s"sparsity of input dataset is $sparsity, " +
-          s"which may hurt performance in high-level BLAS.")
-      }
+
+    var actualBlockSizeInMB = $(maxBlockSizeInMB)
+    if (actualBlockSizeInMB == 0) {
+      actualBlockSizeInMB = InstanceBlock.DefaultBlockSizeInMB
+      require(actualBlockSizeInMB > 0, "inferred actual BlockSizeInMB must > 0")
+      instr.logNamedValue("actualBlockSizeInMB", actualBlockSizeInMB.toString)
     }

     val numClasses = MetadataUtils.getNumClasses(dataset.schema($(labelCol))) match {
@@ -245,12 +234,8 @@ class LinearSVC @Since("2.2.0") (
       Note that the intercept in scaled space and original space is the same;
       as a result, no scaling is needed.
      */
-    val (rawCoefficients, objectiveHistory) = if ($(blockSize) == 1) {
-      trainOnRows(instances, featuresStd, regularization, optimizer)
-    } else {
-      trainOnBlocks(instances, featuresStd, regularization, optimizer)
-    }
-    if (instances.getStorageLevel != StorageLevel.NONE) instances.unpersist()
+    val (rawCoefficients, objectiveHistory) =
+      trainImpl(instances, actualBlockSizeInMB, featuresStd, regularization, optimizer)

     if (rawCoefficients == null) {
       val msg = s"${optimizer.getClass.getName} failed."
@@ -284,35 +269,9 @@ class LinearSVC @Since("2.2.0") (
     model.setSummary(Some(summary))
   }

-  private def trainOnRows(
-      instances: RDD[Instance],
-      featuresStd: Array[Double],
-      regularization: Option[L2Regularization],
-      optimizer: BreezeOWLQN[Int, BDV[Double]]): (Array[Double], Array[Double]) = {
-    val numFeatures = featuresStd.length
-    val numFeaturesPlusIntercept = if ($(fitIntercept)) numFeatures + 1 else numFeatures
-
-    val bcFeaturesStd = instances.context.broadcast(featuresStd)
-    val getAggregatorFunc = new HingeAggregator(bcFeaturesStd, $(fitIntercept))(_)
-    val costFun = new RDDLossFunction(instances, getAggregatorFunc,
-      regularization, $(aggregationDepth))
-
-    val states = optimizer.iterations(new CachedDiffFunction(costFun),
-      Vectors.zeros(numFeaturesPlusIntercept).asBreeze.toDenseVector)
-
-    val arrayBuilder = mutable.ArrayBuilder.make[Double]
-    var state: optimizer.State = null
-    while (states.hasNext) {
-      state = states.next()
-      arrayBuilder += state.adjustedValue
-    }
-    bcFeaturesStd.destroy()
-
-    (if (state != null) state.x.toArray else null, arrayBuilder.result)
-  }
-
-  private def trainOnBlocks(
+  private def trainImpl(
       instances: RDD[Instance],
+      actualBlockSizeInMB: Double,
       featuresStd: Array[Double],
       regularization: Option[L2Regularization],
       optimizer: BreezeOWLQN[Int, BDV[Double]]): (Array[Double], Array[Double]) = {
@@ -326,9 +285,11 @@ class LinearSVC @Since("2.2.0") (
       val func = StandardScalerModel.getTransformFunc(Array.empty, inverseStd, false, true)
       iter.map { case Instance(label, weight, vec) => Instance(label, weight, func(vec)) }
     }
-    val blocks = InstanceBlock.blokify(standardized, $(blockSize))
+
+    val maxMemUsage = (actualBlockSizeInMB * 1024L * 1024L).ceil.toLong
+    val blocks = InstanceBlock.blokifyWithMaxMemUsage(standardized, maxMemUsage)
       .persist(StorageLevel.MEMORY_AND_DISK)
-      .setName(s"training blocks (blockSize=${$(blockSize)})")
+      .setName(s"training blocks (blockSizeInMB=$actualBlockSizeInMB)")

     val getAggregatorFunc = new BlockHingeAggregator($(fitIntercept))(_)
     val costFun = new RDDLossFunction(blocks, getAggregatorFunc,
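For context, a minimal usage sketch of the new expert param (not part of the diff; assumes an existing SparkSession `spark`, and the tiny dataset below is an illustrative stand-in):

    import org.apache.spark.ml.classification.LinearSVC
    import org.apache.spark.ml.linalg.Vectors

    // Assumed toy dataset of (label, features) rows, just to make the sketch runnable.
    val training = spark.createDataFrame(Seq(
      (0.0, Vectors.dense(0.0, 1.1, 0.1)),
      (1.0, Vectors.dense(2.0, 1.0, -1.0)),
      (1.0, Vectors.dense(2.0, 1.3, 1.0)),
      (0.0, Vectors.dense(-1.0, 1.2, -0.5))
    )).toDF("label", "features")

    // 0.0 (the default) lets the estimator infer a block size; a positive value
    // caps each stacked block at roughly that many MB.
    val svc = new LinearSVC()
      .setMaxIter(10)
      .setRegParam(0.1)
      .setMaxBlockSizeInMB(1.0)

    val model = svc.fit(training)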

mllib/src/main/scala/org/apache/spark/ml/feature/Instance.scala

Lines changed: 71 additions & 0 deletions
@@ -17,6 +17,8 @@

 package org.apache.spark.ml.feature

+import scala.collection.mutable
+
 import org.apache.spark.ml.linalg._
 import org.apache.spark.rdd.RDD

@@ -100,6 +102,32 @@ private[spark] case class InstanceBlock(

 private[spark] object InstanceBlock {

+  /**
+   * Suggested value for BlockSizeInMB in Level-2 routine cases.
+   * According to performance tests of BLAS routine (see SPARK-31714) and
+   * LinearSVC (see SPARK-32907), 1.0 MB should be an acceptable value for
+   * linear models using Level-2 routine (GEMV) to perform prediction and
+   * gradient computation.
+   */
+  val DefaultBlockSizeInMB = 1.0
+
+  private def getBlockMemUsage(
+      numCols: Long,
+      numRows: Long,
+      nnz: Long,
+      allUnitWeight: Boolean): Long = {
+    val doubleBytes = java.lang.Double.BYTES
+    val arrayHeader = 12L
+    val denseSize = Matrices.getDenseSize(numCols, numRows)
+    val sparseSize = Matrices.getSparseSize(nnz, numRows + 1)
+    val matrixSize = math.min(denseSize, sparseSize)
+    if (allUnitWeight) {
+      matrixSize + doubleBytes * numRows + arrayHeader * 2
+    } else {
+      matrixSize + doubleBytes * numRows * 2 + arrayHeader * 2
+    }
+  }
+
   def fromInstances(instances: Seq[Instance]): InstanceBlock = {
     val labels = instances.map(_.label).toArray
     val weights = if (instances.exists(_.weight != 1)) {
@@ -114,6 +142,49 @@ private[spark] object InstanceBlock {
   def blokify(instances: RDD[Instance], blockSize: Int): RDD[InstanceBlock] = {
     instances.mapPartitions(_.grouped(blockSize).map(InstanceBlock.fromInstances))
   }
+
+  def blokifyWithMaxMemUsage(
+      instanceIterator: Iterator[Instance],
+      maxMemUsage: Long): Iterator[InstanceBlock] = {
+    require(maxMemUsage > 0)
+
+    new Iterator[InstanceBlock]() {
+      private var numCols = -1L
+
+      override def hasNext: Boolean = instanceIterator.hasNext
+
+      override def next(): InstanceBlock = {
+        val buff = mutable.ArrayBuilder.make[Instance]
+        var buffCnt = 0L
+        var buffNnz = 0L
+        var buffUnitWeight = true
+        var blockMemUsage = 0L
+
+        while (instanceIterator.hasNext && blockMemUsage < maxMemUsage) {
+          val instance = instanceIterator.next()
+          if (numCols < 0L) numCols = instance.features.size
+          require(numCols == instance.features.size)
+
+          buff += instance
+          buffCnt += 1L
+          buffNnz += instance.features.numNonzeros
+          buffUnitWeight &&= (instance.weight == 1)
+          blockMemUsage = getBlockMemUsage(numCols, buffCnt, buffNnz, buffUnitWeight)
+        }
+
+        // the block memory usage may slightly exceed threshold, not a big issue.
+        // and this ensure even if one row exceed block limit, each block has one row.
+        InstanceBlock.fromInstances(buff.result())
+      }
+    }
+  }
+
+  def blokifyWithMaxMemUsage(
+      instances: RDD[Instance],
+      maxMemUsage: Long): RDD[InstanceBlock] = {
+    require(maxMemUsage > 0)
+    instances.mapPartitions(iter => blokifyWithMaxMemUsage(iter, maxMemUsage))
+  }
 }
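As a rough back-of-envelope (an illustration, not part of the diff) of what the default 1.0 MB block means for dense data, mirroring the per-row cost estimated by getBlockMemUsage above:

    // Sketch: approximate rows per block for dense features with all-unit weights.
    val numCols = 100                                     // assumed feature dimension
    // Dense matrix storage is ~8 bytes per value, plus 8 bytes per row for the label.
    val bytesPerRow = 8L * numCols + 8L                   // ~808 bytes
    val maxMemUsage = (1.0 * 1024 * 1024).ceil.toLong     // DefaultBlockSizeInMB = 1.0
    val rowsPerBlock = maxMemUsage / bytesPerRow          // ~1297 rows per block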

mllib/src/main/scala/org/apache/spark/ml/param/shared/SharedParamsCodeGen.scala

Lines changed: 6 additions & 1 deletion
@@ -108,7 +108,12 @@ private[shared] object SharedParamsCodeGen {
     ParamDesc[Int]("blockSize", "block size for stacking input data in matrices. Data is " +
       "stacked within partitions. If block size is more than remaining data in a partition " +
       "then it is adjusted to the size of this data.",
-      isValid = "ParamValidators.gt(0)", isExpertParam = true)
+      isValid = "ParamValidators.gt(0)", isExpertParam = true),
+    ParamDesc[Double]("maxBlockSizeInMB", "Maximum memory in MB for stacking input data " +
+      "into blocks. Data is stacked within partitions. If more than remaining data size in a " +
+      "partition then it is adjusted to the data size. If 0, try to infer an appropriate " +
+      "value. Must be >= 0.",
+      Some("0.0"), isValid = "ParamValidators.gtEq(0.0)", isExpertParam = true)
   )

   val code = genSharedParams(params)

mllib/src/main/scala/org/apache/spark/ml/param/shared/sharedParams.scala

Lines changed: 18 additions & 0 deletions
@@ -562,4 +562,22 @@ trait HasBlockSize extends Params {
   /** @group expertGetParam */
   final def getBlockSize: Int = $(blockSize)
 }
+
+/**
+ * Trait for shared param maxBlockSizeInMB (default: 0.0). This trait may be changed or
+ * removed between minor versions.
+ */
+trait HasMaxBlockSizeInMB extends Params {
+
+  /**
+   * Param for Maximum memory in MB for stacking input data into blocks. Data is stacked within partitions. If more than remaining data size in a partition then it is adjusted to the data size. If 0, try to infer an appropriate value. Must be &gt;= 0..
+   * @group expertParam
+   */
+  final val maxBlockSizeInMB: DoubleParam = new DoubleParam(this, "maxBlockSizeInMB", "Maximum memory in MB for stacking input data into blocks. Data is stacked within partitions. If more than remaining data size in a partition then it is adjusted to the data size. If 0, try to infer an appropriate value. Must be >= 0.", ParamValidators.gtEq(0.0))
+
+  setDefault(maxBlockSizeInMB, 0.0)
+
+  /** @group expertGetParam */
+  final def getMaxBlockSizeInMB: Double = $(maxBlockSizeInMB)
+}
 // scalastyle:on

mllib/src/test/scala/org/apache/spark/ml/classification/LinearSVCSuite.scala

Lines changed: 2 additions & 2 deletions
@@ -214,8 +214,8 @@ class LinearSVCSuite extends MLTest with DefaultReadWriteTest {
       .setFitIntercept(fitIntercept)
       .setMaxIter(5)
     val model = lsvc.fit(dataset)
-    Seq(4, 16, 64).foreach { blockSize =>
-      val model2 = lsvc.setBlockSize(blockSize).fit(dataset)
+    Seq(0, 0.01, 0.1, 1, 2, 4).foreach { s =>
+      val model2 = lsvc.setMaxBlockSizeInMB(s).fit(dataset)
       assert(model.intercept ~== model2.intercept relTol 1e-9)
       assert(model.coefficients ~== model2.coefficients relTol 1e-9)
     }

mllib/src/test/scala/org/apache/spark/ml/feature/InstanceSuite.scala

Lines changed: 54 additions & 0 deletions
@@ -74,4 +74,58 @@ class InstanceSuite extends SparkFunSuite{
     }
   }

+  test("InstanceBlock: blokify with max memory usage") {
+    val instance1 = Instance(19.0, 2.0, Vectors.dense(1.0, 7.0))
+    val instance2 = Instance(17.0, 1.0, Vectors.dense(0.0, 5.0).toSparse)
+    val instances = Seq(instance1, instance2)
+
+    val blocks = InstanceBlock
+      .blokifyWithMaxMemUsage(Iterator.apply(instance1, instance2), 128).toArray
+    require(blocks.length == 1)
+    val block = blocks.head
+    assert(block.size === 2)
+    assert(block.numFeatures === 2)
+    block.instanceIterator.zipWithIndex.foreach {
+      case (instance, i) =>
+        assert(instance.label === instances(i).label)
+        assert(instance.weight === instances(i).weight)
+        assert(instance.features.toArray === instances(i).features.toArray)
+    }
+    Seq(0, 1).foreach { i =>
+      val nzIter = block.getNonZeroIter(i)
+      val vec = Vectors.sparse(2, nzIter.toSeq)
+      assert(vec.toArray === instances(i).features.toArray)
+    }
+
+    // instances larger than maxMemUsage
+    val denseInstance = Instance(-1.0, 2.0, Vectors.dense(Array.fill(1000)(1.0)))
+    InstanceBlock.blokifyWithMaxMemUsage(Iterator.single(denseInstance), 64).size
+    InstanceBlock.blokifyWithMaxMemUsage(Iterator.fill(10)(denseInstance), 64).size
+
+    // different numFeatures
+    intercept[IllegalArgumentException] {
+      InstanceBlock.blokifyWithMaxMemUsage(Iterator.apply(instance1, denseInstance), 64).size
+    }
+
+    // nnz = 10
+    val sparseInstance = Instance(-2.0, 3.0,
+      Vectors.sparse(1000, Array.range(0, 1000, 100), Array.fill(10)(0.1)))
+
+    // normally, memory usage of a block does not exceed maxMemUsage too much
+    val maxMemUsage = 1 << 18
+    val mixedIter = Iterator.fill(100)(denseInstance) ++
+      Iterator.fill(1000)(sparseInstance) ++
+      Iterator.fill(10)(denseInstance) ++
+      Iterator.fill(10)(sparseInstance) ++
+      Iterator.fill(100)(denseInstance) ++
+      Iterator.fill(100)(sparseInstance)
+    InstanceBlock.blokifyWithMaxMemUsage(mixedIter, maxMemUsage)
+      .foreach { block =>
+        val doubleBytes = java.lang.Double.BYTES
+        val arrayHeader = 12L
+        val blockMemUsage = block.matrix.getSizeInBytes +
+          (block.labels.length + block.weights.length) * doubleBytes + arrayHeader * 2
+        require(blockMemUsage < maxMemUsage * 1.05)
+      }
+  }
 }
