
Commit 1edc806

SPARK-1729. Update logging in Spark Sink.
1 parent 8c00289

File tree: 6 files changed, +163 -23 lines changed

external/flume-sink/src/main/scala/org/apache/spark/flume/sink/Logging.scala (new file)

Lines changed: 135 additions & 0 deletions
@@ -0,0 +1,135 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.spark.flume.sink

import org.apache.log4j.{LogManager, PropertyConfigurator}
import org.slf4j.{Logger, LoggerFactory}
import org.slf4j.impl.StaticLoggerBinder

trait Logging {
  // Make the log field transient so that objects with Logging can
  // be serialized and used on another machine
  @transient private var log_ : Logger = null

  // Method to get or create the logger for this object
  protected def log: Logger = {
    if (log_ == null) {
      initializeIfNecessary()
      var className = this.getClass.getName
      // Ignore trailing $'s in the class names for Scala objects
      if (className.endsWith("$")) {
        className = className.substring(0, className.length - 1)
      }
      log_ = LoggerFactory.getLogger(className)
    }
    log_
  }

  // Log methods that take only a String
  protected def logInfo(msg: => String) {
    if (log.isInfoEnabled) log.info(msg)
  }

  protected def logDebug(msg: => String) {
    if (log.isDebugEnabled) log.debug(msg)
  }

  protected def logTrace(msg: => String) {
    if (log.isTraceEnabled) log.trace(msg)
  }

  protected def logWarning(msg: => String) {
    if (log.isWarnEnabled) log.warn(msg)
  }

  protected def logError(msg: => String) {
    if (log.isErrorEnabled) log.error(msg)
  }

  // Log methods that take Throwables (Exceptions/Errors) too
  protected def logInfo(msg: => String, throwable: Throwable) {
    if (log.isInfoEnabled) log.info(msg, throwable)
  }

  protected def logDebug(msg: => String, throwable: Throwable) {
    if (log.isDebugEnabled) log.debug(msg, throwable)
  }

  protected def logTrace(msg: => String, throwable: Throwable) {
    if (log.isTraceEnabled) log.trace(msg, throwable)
  }

  protected def logWarning(msg: => String, throwable: Throwable) {
    if (log.isWarnEnabled) log.warn(msg, throwable)
  }

  protected def logError(msg: => String, throwable: Throwable) {
    if (log.isErrorEnabled) log.error(msg, throwable)
  }

  protected def isTraceEnabled(): Boolean = {
    log.isTraceEnabled
  }

  private def initializeIfNecessary() {
    if (!Logging.initialized) {
      Logging.initLock.synchronized {
        if (!Logging.initialized) {
          initializeLogging()
        }
      }
    }
  }

  private def initializeLogging() {
    // If Log4j is being used, but is not initialized, load a default properties file
    val binder = StaticLoggerBinder.getSingleton
    val usingLog4j = binder.getLoggerFactoryClassStr.endsWith("Log4jLoggerFactory")
    val log4jInitialized = LogManager.getRootLogger.getAllAppenders.hasMoreElements
    if (!log4jInitialized && usingLog4j) {
      val defaultLogProps = "org/apache/spark/log4j-defaults.properties"
      Option(getClass.getClassLoader.getResource(defaultLogProps)) match {
        case Some(url) =>
          PropertyConfigurator.configure(url)
          log.info(s"Using Spark's default log4j profile: $defaultLogProps")
        case None =>
          System.err.println(s"Spark was unable to load $defaultLogProps")
      }
    }
    Logging.initialized = true

    // Force a call into slf4j to initialize it. Avoids this happening from multiple threads
    // and triggering this: http://mailman.qos.ch/pipermail/slf4j-dev/2010-April/002956.html
    log
  }
}

private object Logging {
  @volatile private var initialized = false
  val initLock = new Object()
  try {
    // We use reflection here to handle the case where users remove the
    // slf4j-to-jul bridge in order to route their logs to JUL.
    val bridgeClass = Class.forName("org.slf4j.bridge.SLF4JBridgeHandler")
    bridgeClass.getMethod("removeHandlersForRootLogger").invoke(null)
    val installed = bridgeClass.getMethod("isInstalled").invoke(null).asInstanceOf[Boolean]
    if (!installed) {
      bridgeClass.getMethod("install").invoke(null)
    }
  } catch {
    case e: ClassNotFoundException => // can't log anything yet so just fail silently
  }
}
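
For context, a minimal hypothetical sketch (not part of this commit) of how a sink component consumes the new trait: a class simply mixes in Logging and calls the level-specific helpers, whose by-name msg: => String parameters mean the message string is only built when that level is enabled.

package org.apache.spark.flume.sink

// Illustrative example only: this class does not exist in the commit.
class ExampleSinkComponent(name: String) extends Logging {

  def start(): Unit = {
    // The concatenation below only happens if INFO is enabled, because msg is by-name.
    logInfo("Starting component: " + name)
  }

  def handleFailure(e: Throwable): Unit = {
    // The Throwable-taking overloads also log the stack trace.
    logWarning("Component " + name + " failed, will retry.", e)
  }
}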

external/flume-sink/src/main/scala/org/apache/spark/flume/sink/SparkAvroCallbackHandler.scala

Lines changed: 6 additions & 3 deletions
@@ -35,8 +35,7 @@ import org.slf4j.LoggerFactory
  * is rolled back.
  */
 private class SparkAvroCallbackHandler(val threads: Int, val channel: Channel,
-  val transactionTimeout: Int, val backOffInterval: Int) extends SparkFlumeProtocol {
-  private val LOG = LoggerFactory.getLogger(classOf[SparkAvroCallbackHandler])
+  val transactionTimeout: Int, val backOffInterval: Int) extends SparkFlumeProtocol with Logging {
   val transactionExecutorOpt = Option(Executors.newFixedThreadPool(threads,
     new ThreadFactoryBuilder().setDaemon(true)
       .setNameFormat("Spark Sink Processor Thread - %d").build()))
@@ -56,6 +55,7 @@ private class SparkAvroCallbackHandler(val threads: Int, val channel: Channel,
    * @return [[EventBatch]] instance that has a sequence number and an array of at most n events
    */
   override def getEventBatch(n: Int): EventBatch = {
+    logDebug("Got getEventBatch call from Spark.")
     val sequenceNumber = seqBase + seqCounter.incrementAndGet()
     val processor = new TransactionProcessor(channel, sequenceNumber,
       n, transactionTimeout, backOffInterval, this)
@@ -66,6 +66,7 @@ private class SparkAvroCallbackHandler(val threads: Int, val channel: Channel,
     val batch = processor.getEventBatch
     if (!SparkSinkUtils.isErrorBatch(batch)) {
       processorMap.put(sequenceNumber.toString, processor)
+      logDebug("Sending event batch with sequence number: " + sequenceNumber)
     }
     batch
   }
@@ -75,6 +76,7 @@ private class SparkAvroCallbackHandler(val threads: Int, val channel: Channel,
    * @param sequenceNumber The sequence number of the event batch that was successful
    */
   override def ack(sequenceNumber: CharSequence): Void = {
+    logDebug("Received Ack for batch with sequence number: " + sequenceNumber)
     completeTransaction(sequenceNumber, success = true)
     null
   }
@@ -86,7 +88,7 @@ private class SparkAvroCallbackHandler(val threads: Int, val channel: Channel,
    */
   override def nack(sequenceNumber: CharSequence): Void = {
     completeTransaction(sequenceNumber, success = false)
-    LOG.info("Spark failed to commit transaction. Will reattempt events.")
+    logInfo("Spark failed to commit transaction. Will reattempt events.")
     null
   }

@@ -115,6 +117,7 @@ private class SparkAvroCallbackHandler(val threads: Int, val channel: Channel,
    * Shuts down the executor used to process transactions.
    */
   def shutdown() {
+    logInfo("Shutting down Spark Avro Callback Handler")
     transactionExecutorOpt.foreach(executor => {
       executor.shutdownNow()
     })
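
The new debug and info lines above trace the full request cycle. A hypothetical receiver-side sketch (names and wiring are illustrative, not part of this commit; getSequenceNumber is assumed to be the Avro-generated getter on EventBatch) shows when each message would fire:

package org.apache.spark.flume.sink

import org.apache.spark.flume.SparkFlumeProtocol

// Illustrative only: drives the callback handler the way a polling receiver would.
object ExamplePoller {
  def pollOnce(handler: SparkFlumeProtocol, maxEvents: Int): Unit = {
    val batch = handler.getEventBatch(maxEvents) // sink logs "Got getEventBatch call from Spark."
    if (!SparkSinkUtils.isErrorBatch(batch)) {   // error batches carry a non-empty error message
      try {
        // ... store the events on the Spark side ...
        handler.ack(batch.getSequenceNumber)     // sink logs the Ack and commits the transaction
      } catch {
        case _: Exception =>
          handler.nack(batch.getSequenceNumber)  // sink logs "Spark failed to commit transaction. ..."
      }
    }
  }
}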

external/flume-sink/src/main/scala/org/apache/spark/flume/sink/SparkSink.scala

Lines changed: 9 additions & 6 deletions
@@ -48,9 +48,8 @@ import org.apache.spark.flume.SparkFlumeProtocol
 // until an ACK or NACK comes back or the transaction times out (after the specified timeout).
 // When the response comes, the TransactionProcessor is retrieved and then unblocked,
 // at which point the transaction is committed or rolled back.
-class SparkSink extends AbstractSink with Configurable {
+class SparkSink extends AbstractSink with Logging with Configurable {

-  private val LOG = LoggerFactory.getLogger(classOf[SparkSink])
   // Size of the pool to use for holding transaction processors.
   private var poolSize: Integer = SparkSinkConfig.DEFAULT_THREADS

@@ -74,7 +73,7 @@ class SparkSink extends AbstractSink with Configurable {
   private val blockingLatch = new CountDownLatch(1)

   override def start() {
-    LOG.info("Starting Spark Sink: " + getName + " on port: " + port + " and interface: " +
+    logInfo("Starting Spark Sink: " + getName + " on port: " + port + " and interface: " +
       hostname + " with " + "pool size: " + poolSize + " and transaction timeout: " +
       transactionTimeout + ".")
     handler = Option(new SparkAvroCallbackHandler(poolSize, getChannel, transactionTimeout,
@@ -85,19 +84,19 @@ class SparkSink extends AbstractSink with Configurable {
     // Netty dependencies are already available on the JVM as Flume would have pulled them in.
     serverOpt = Option(new NettyServer(responder, new InetSocketAddress(hostname, port)))
     serverOpt.foreach(server => {
-      LOG.info("Starting Avro server for sink: " + getName)
+      logInfo("Starting Avro server for sink: " + getName)
       server.start()
     })
     super.start()
   }

   override def stop() {
-    LOG.info("Stopping Spark Sink: " + getName)
+    logInfo("Stopping Spark Sink: " + getName)
     handler.foreach(callbackHandler => {
       callbackHandler.shutdown()
     })
     serverOpt.foreach(server => {
-      LOG.info("Stopping Avro Server for sink: " + getName)
+      logInfo("Stopping Avro Server for sink: " + getName)
       server.close()
       server.join()
     })
@@ -113,12 +112,16 @@ class SparkSink extends AbstractSink with Configurable {
     poolSize = ctx.getInteger(THREADS, DEFAULT_THREADS)
     transactionTimeout = ctx.getInteger(CONF_TRANSACTION_TIMEOUT, DEFAULT_TRANSACTION_TIMEOUT)
     backOffInterval = ctx.getInteger(CONF_BACKOFF_INTERVAL, DEFAULT_BACKOFF_INTERVAL)
+    logInfo("Configured Spark Sink with hostname: " + hostname + ", port: " + port + ", " +
+      "poolSize: " + poolSize + ", transactionTimeout: " + transactionTimeout + ", " +
+      "backoffInterval: " + backOffInterval)
   }

   override def process(): Status = {
     // This method is called in a loop by the Flume framework - block it until the sink is
     // stopped to save CPU resources. The sink runner will interrupt this thread when the sink is
     // being shut down.
+    logInfo("Blocking Sink Runner, sink will continue to run..")
     blockingLatch.await()
     Status.BACKOFF
   }

external/flume-sink/src/main/scala/org/apache/spark/flume/sink/SparkSinkUtils.scala

Lines changed: 1 addition & 1 deletion
@@ -25,6 +25,6 @@ object SparkSinkUtils {
    * @return - true if the batch represents an error
    */
   def isErrorBatch(batch: EventBatch): Boolean = {
-    !batch.getErrorMsg.toString.equals("") //If there is an error message, it is an error batch.
+    !batch.getErrorMsg.toString.equals("") // If there is an error message, it is an error batch.
   }
 }

external/flume-sink/src/main/scala/org/apache/spark/flume/sink/TransactionProcessor.scala

Lines changed: 11 additions & 12 deletions
@@ -45,9 +45,7 @@ import org.slf4j.LoggerFactory
  */
 private class TransactionProcessor(val channel: Channel, val seqNum: String,
   var maxBatchSize: Int, val transactionTimeout: Int, val backOffInterval: Int,
-  val parent: SparkAvroCallbackHandler) extends Callable[Void] {
-
-  private val LOG = LoggerFactory.getLogger(classOf[TransactionProcessor])
+  val parent: SparkAvroCallbackHandler) extends Callable[Void] with Logging {

   // If a real batch is not returned, we always have to return an error batch.
   @volatile private var eventBatch: EventBatch = new EventBatch("Unknown Error", "",
@@ -88,9 +86,7 @@ private class TransactionProcessor(val channel: Channel, val seqNum: String,
    * @param success True if an ACK was received and the transaction should be committed, else false.
    */
   def batchProcessed(success: Boolean) {
-    if (LOG.isDebugEnabled) {
-      LOG.debug("Batch processed for sequence number: " + seqNum)
-    }
+    logDebug("Batch processed for sequence number: " + seqNum)
     batchSuccess = success
     batchAckLatch.countDown()
   }
@@ -123,6 +119,8 @@ private class TransactionProcessor(val channel: Channel, val seqNum: String,
             gotEventsInThisTxn = true
           case None =>
             if (!gotEventsInThisTxn) {
+              logDebug("Sleeping for " + backOffInterval + " millis as no events were read in" +
+                " the current transaction")
               TimeUnit.MILLISECONDS.sleep(backOffInterval)
             } else {
               loop.break()
@@ -133,7 +131,7 @@ private class TransactionProcessor(val channel: Channel, val seqNum: String,
         if (!gotEventsInThisTxn) {
           val msg = "Tried several times, " +
             "but did not get any events from the channel!"
-          LOG.warn(msg)
+          logWarning(msg)
           eventBatch.setErrorMsg(msg)
         } else {
           // At this point, the events are available, so fill them into the event batch
@@ -142,7 +140,7 @@ private class TransactionProcessor(val channel: Channel, val seqNum: String,
       })
     } catch {
       case e: Exception =>
-        LOG.error("Error while processing transaction.", e)
+        logWarning("Error while processing transaction.", e)
         eventBatch.setErrorMsg(e.getMessage)
         try {
           txOpt.foreach(tx => {
@@ -166,17 +164,18 @@ private class TransactionProcessor(val channel: Channel, val seqNum: String,
     txOpt.foreach(tx => {
       if (batchSuccess) {
         try {
+          logDebug("Committing transaction")
           tx.commit()
         } catch {
           case e: Exception =>
-            LOG.warn("Error while attempting to commit transaction. Transaction will be rolled " +
+            logWarning("Error while attempting to commit transaction. Transaction will be rolled " +
               "back", e)
             rollbackAndClose(tx, close = false) // tx will be closed later anyway
         } finally {
           tx.close()
         }
       } else {
-        LOG.warn("Spark could not commit transaction, NACK received. Rolling back transaction.")
+        logWarning("Spark could not commit transaction, NACK received. Rolling back transaction.")
         rollbackAndClose(tx, close = true)
         // This might have been due to timeout or a NACK. Either way the following call does not
         // cause issues. This is required to ensure the TransactionProcessor instance is not leaked
@@ -192,12 +191,12 @@ private class TransactionProcessor(val channel: Channel, val seqNum: String,
    */
   private def rollbackAndClose(tx: Transaction, close: Boolean) {
     try {
-      LOG.warn("Spark was unable to successfully process the events. Transaction is being " +
+      logWarning("Spark was unable to successfully process the events. Transaction is being " +
         "rolled back.")
       tx.rollback()
     } catch {
       case e: Exception =>
-        LOG.error("Error rolling back transaction. Rollback may have failed!", e)
+        logError("Error rolling back transaction. Rollback may have failed!", e)
     } finally {
       if (close) {
         tx.close()

external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala

Lines changed: 1 addition & 1 deletion
@@ -116,7 +116,7 @@ private[streaming] class FlumePollingReceiver(
           logDebug("Stored events with seq:" + seq)
           j += 1
         }
-        logDebug("Sending ack for sequence number: " +seq)
+        logDebug("Sending ack for sequence number: " + seq)
         // Send an ack to Flume so that Flume discards the events from its channels.
         client.ack(seq)
         logDebug("Ack sent for sequence number: " + seq)
