
Commit aa78c23

jerryshao authored and tdas committed
[SPARK-4671][Streaming]Do not replicate streaming block when WAL is enabled
Currently the streaming block is replicated whenever a replicated storage level is set. Since the WAL is already fault tolerant, this replication is needless and hurts the throughput of the streaming application.

Hi tdas, as discussed about this issue, I fixed it with this implementation. I'm not sure whether this is the way you want it; would you mind taking a look at it? Thanks a lot.

Author: jerryshao <[email protected]>

Closes #3534 from jerryshao/SPARK-4671 and squashes the following commits:

500b456 [jerryshao] Do not replicate streaming block when WAL is enabled

(cherry picked from commit 3f5f4cc)
Signed-off-by: Tathagata Das <[email protected]>
1 parent 01adf45 commit aa78c23
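For context, here is a minimal sketch of the situation the commit message describes: a receiver-based streaming job that enables the write ahead log but still requests a replicated storage level. This application is an assumed illustration, not part of the commit; the app name, master, checkpoint path, and socket source are placeholders, and it assumes the Spark 1.2-era config key spark.streaming.receiver.writeAheadLog.enable. With this change, the WAL-based block handler downgrades the requested level to a single serialized replica instead of replicating blocks.

// Hedged sketch of a user application, not taken from this commit.
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}

object WalReceiverExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("WalReceiverExample")
      .setMaster("local[2]") // at least 2 cores: one for the receiver, one for processing
      // Enabling the WAL makes received blocks durable, so replication adds no safety.
      .set("spark.streaming.receiver.writeAheadLog.enable", "true")

    val ssc = new StreamingContext(conf, Seconds(1))
    ssc.checkpoint("/tmp/wal-checkpoint") // WAL files are written under the checkpoint dir

    // The user asks for a replicated level; with this patch the WAL-based handler
    // stores the block with replication 1 (serialized) and logs a warning instead.
    val lines = ssc.socketTextStream("localhost", 9999, StorageLevel.MEMORY_AND_DISK_SER_2)
    lines.count().print()

    ssc.start()
    ssc.awaitTermination()
  }
}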

File tree

1 file changed: +19 -1 lines changed

streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala

Lines changed: 19 additions & 1 deletion
@@ -121,6 +121,24 @@ private[streaming] class WriteAheadLogBasedBlockHandler(
   private val maxFailures = conf.getInt(
     "spark.streaming.receiver.writeAheadLog.maxFailures", 3)
 
+  private val effectiveStorageLevel = {
+    if (storageLevel.deserialized) {
+      logWarning(s"Storage level serialization ${storageLevel.deserialized} is not supported when" +
+        s" write ahead log is enabled, change to serialization false")
+    }
+    if (storageLevel.replication > 1) {
+      logWarning(s"Storage level replication ${storageLevel.replication} is unnecessary when " +
+        s"write ahead log is enabled, change to replication 1")
+    }
+
+    StorageLevel(storageLevel.useDisk, storageLevel.useMemory, storageLevel.useOffHeap, false, 1)
+  }
+
+  if (storageLevel != effectiveStorageLevel) {
+    logWarning(s"User defined storage level $storageLevel is changed to effective storage level " +
+      s"$effectiveStorageLevel when write ahead log is enabled")
+  }
+
   // Manages rolling log files
   private val logManager = new WriteAheadLogManager(
     checkpointDirToLogDir(checkpointDir, streamId),
@@ -156,7 +174,7 @@ private[streaming] class WriteAheadLogBasedBlockHandler(
     // Store the block in block manager
     val storeInBlockManagerFuture = Future {
       val putResult =
-        blockManager.putBytes(blockId, serializedBlock, storageLevel, tellMaster = true)
+        blockManager.putBytes(blockId, serializedBlock, effectiveStorageLevel, tellMaster = true)
       if (!putResult.map { _._1 }.contains(blockId)) {
         throw new SparkException(
           s"Could not store $blockId to block manager with storage level $storageLevel")

0 commit comments
