|
18 | 18 | package org.apache.spark.network.nio
|
19 | 19 |
|
20 | 20 | import java.io.IOException
|
| 21 | +import java.lang.ref.WeakReference |
21 | 22 | import java.net._
|
22 | 23 | import java.nio._
|
23 | 24 | import java.nio.channels._
|
@@ -140,7 +141,10 @@ private[nio] class ConnectionManager(
|
140 | 141 | new HashMap[SelectionKey, Connection] with SynchronizedMap[SelectionKey, Connection]
|
141 | 142 | private val connectionsById = new HashMap[ConnectionManagerId, SendingConnection]
|
142 | 143 | with SynchronizedMap[ConnectionManagerId, SendingConnection]
|
143 |
| - private val messageStatuses = new HashMap[Int, MessageStatus] |
| 144 | + // Tracks sent messages for which we are awaiting acknowledgements. Entries are added to this |
| 145 | + // map when messages are sent and are removed when acknowledgement messages are received or when |
| 146 | + // acknowledgement timeouts expire |
| 147 | + private val messageStatuses = new HashMap[Int, MessageStatus] // [MessageId, MessageStatus] |
144 | 148 | private val keyInterestChangeRequests = new SynchronizedQueue[(SelectionKey, Int)]
|
145 | 149 | private val registerRequests = new SynchronizedQueue[SendingConnection]
|
146 | 150 |
|
@@ -901,17 +905,30 @@ private[nio] class ConnectionManager(
|
901 | 905 | val promise = Promise[Message]()
|
902 | 906 |
|
903 | 907 | // It's important that the TimerTask doesn't capture a reference to `message`, which can cause
|
904 |
| - // memory leaks since cancelled TimerTasks won't necessarily be garbage collected until they are |
905 |
| - // scheduled to run. Therefore, extract the message id from outside of the task: |
| 908 | + // memory leaks since cancelled TimerTasks won't necessarily be garbage collected until the time |
| 909 | + // at which they would originally be scheduled to run. Therefore, extract the message id |
| 910 | + // from outside of the TimerTask closure (see SPARK-4393 for more context). |
906 | 911 | val messageId = message.id
|
| 912 | + // Keep a weak reference to the promise so that the completed promise may be garbage-collected |
| 913 | + val promiseReference = new WeakReference(promise) |
907 | 914 | val timeoutTask: TimerTask = new TimerTask {
|
908 | 915 | override def run(timeout: Timeout): Unit = {
|
909 | 916 | messageStatuses.synchronized {
|
910 | 917 | messageStatuses.remove(messageId).foreach ( s => {
|
911 | 918 | val e = new IOException("sendMessageReliably failed because ack " +
|
912 | 919 | s"was not received within $ackTimeout sec")
|
913 |
| - if (!promise.tryFailure(e)) { |
914 |
| - logWarning("Ignore error because promise is completed", e) |
| 920 | + Option(promiseReference.get) match { |
| 921 | + case Some(p) => |
| 922 | + // Attempt to fail the promise with the ack-timeout IOException created above |
| 923 | + if (!p.tryFailure(e)) { |
| 924 | + // If we reach here, then someone else has already signalled success or failure |
| 925 | + // on this promise, so log an error: |
| 926 | + logError("Ignore error because promise is completed", e) |
| 927 | + } |
| 928 | + case None => |
| 929 | + // The WeakReference was empty, which should never happen because |
| 930 | + // sendMessageReliably's caller should have a strong reference to promise.future. |
| 931 | + logError("Promise was garbage collected; this should never happen!", e) |
915 | 932 | }
|
916 | 933 | })
|
917 | 934 | }
|
|
0 commit comments