Skip to content

Commit d577c7e

Browse files
vsleejjxtra
authored andcommitted
Fixed ClientWebSocket disposal (#455)
- cancel CTS and set 'disposed' bool after attempting to close connection - dispose of the webSocket - mark messageQueue as complete - close MemoryStream in ReadTask()
1 parent f8b0f8e commit d577c7e

File tree

1 file changed

+55
-46
lines changed

1 file changed

+55
-46
lines changed

ExchangeSharp/Utility/ClientWebSocket.cs

Lines changed: 55 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -237,10 +237,10 @@ public void Start()
237237
{
238238
CreateWebSocket();
239239

240-
// kick off message parser and message listener
241-
Task.Run(MessageTask);
242-
Task.Run(ReadTask);
243-
}
240+
// kick off message parser and message listener
241+
Task.Run(MessageTask);
242+
Task.Run(ReadTask);
243+
}
244244

245245
/// <summary>
246246
/// Close and dispose of all resources, stops the web socket and shuts it down.
@@ -249,34 +249,38 @@ public void Dispose()
249249
{
250250
if (!disposed)
251251
{
252-
disposed = true;
253-
cancellationTokenSource.Cancel();
254-
Task.Run(async () =>
255-
{
256-
try
257-
{
258-
if (webSocket.State == WebSocketState.Open)
259-
{
260-
if (CloseCleanly)
261-
{
262-
await webSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, "Dispose", cancellationToken);
263-
}
264-
else
265-
{
266-
await webSocket.CloseOutputAsync(WebSocketCloseStatus.NormalClosure, "Dispose", cancellationToken);
267-
}
268-
}
269-
}
270-
catch (OperationCanceledException)
271-
{
272-
// dont care
273-
}
274-
catch (Exception ex)
275-
{
276-
Logger.Info(ex.ToString());
277-
}
278-
});
279-
}
252+
Task.Run(async () =>
253+
{
254+
try
255+
{
256+
if (webSocket.State == WebSocketState.Open)
257+
{
258+
if (CloseCleanly)
259+
{
260+
await webSocket.CloseAsync(WebSocketCloseStatus.NormalClosure,
261+
"Dispose", cancellationToken);
262+
}
263+
else
264+
{
265+
await webSocket.CloseOutputAsync(WebSocketCloseStatus.NormalClosure,
266+
"Dispose", cancellationToken);
267+
}
268+
}
269+
disposed = true;
270+
}
271+
catch (OperationCanceledException)
272+
{
273+
// dont care
274+
}
275+
catch (Exception ex)
276+
{
277+
Logger.Info(ex.ToString());
278+
}
279+
cancellationTokenSource.Cancel();
280+
webSocket.Dispose();
281+
messageQueue.CompleteAdding();
282+
});
283+
}
280284
}
281285

282286
/// <summary>
@@ -317,7 +321,7 @@ public Task<bool> SendMessageAsync(object message)
317321

318322
private void QueueActions(params Func<IWebSocket, Task>[] actions)
319323
{
320-
if (actions != null && actions.Length != 0)
324+
if (actions != null && actions.Length != 0 && !messageQueue.IsAddingCompleted)
321325
{
322326
Func<IWebSocket, Task>[] actionsCopy = actions;
323327
messageQueue.Add((Func<Task>)(async () =>
@@ -339,8 +343,8 @@ private void QueueActions(params Func<IWebSocket, Task>[] actions)
339343

340344
private void QueueActionsWithNoExceptions(params Func<IWebSocket, Task>[] actions)
341345
{
342-
if (actions != null && actions.Length != 0)
343-
{
346+
if (actions != null && actions.Length != 0 && !messageQueue.IsAddingCompleted)
347+
{
344348
Func<IWebSocket, Task>[] actionsCopy = actions;
345349
messageQueue.Add((Func<Task>)(async () =>
346350
{
@@ -384,7 +388,6 @@ private async Task InvokeDisconnected(IWebSocket socket)
384388
private async Task ReadTask()
385389
{
386390
ArraySegment<byte> receiveBuffer = new ArraySegment<byte>(new byte[receiveChunkSize]);
387-
TimeSpan keepAlive = webSocket.KeepAliveInterval;
388391
MemoryStream stream = new MemoryStream();
389392
WebSocketReceiveResult result;
390393
bool wasConnected = false;
@@ -411,7 +414,7 @@ private async Task ReadTask()
411414
// for lists, etc.
412415
QueueActionsWithNoExceptions(InvokeConnected);
413416

414-
while (webSocket.State == WebSocketState.Open)
417+
while (webSocket.State == WebSocketState.Open && !disposed)
415418
{
416419
do
417420
{
@@ -420,7 +423,8 @@ private async Task ReadTask()
420423
{
421424
if (result.MessageType == WebSocketMessageType.Close)
422425
{
423-
await webSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, string.Empty, cancellationToken);
426+
await webSocket.CloseAsync(WebSocketCloseStatus.NormalClosure,
427+
string.Empty, new CancellationToken()); // if it's closing, then let it complete
424428
QueueActions(InvokeDisconnected);
425429
}
426430
else
@@ -429,8 +433,8 @@ private async Task ReadTask()
429433
}
430434
}
431435
}
432-
while (result != null && !result.EndOfMessage);
433-
if (stream.Length != 0)
436+
while (result != null && !result.EndOfMessage && !disposed);
437+
if (stream.Length != 0 && !disposed)
434438
{
435439
// if text message and we are handling text messages
436440
if (result.MessageType == WebSocketMessageType.Text && OnTextMessage != null)
@@ -450,11 +454,15 @@ private async Task ReadTask()
450454
}
451455
}
452456
}
453-
catch (OperationCanceledException)
454-
{
455-
// dont care
456-
}
457-
catch (Exception ex)
457+
catch (OperationCanceledException) // includes TaskCanceledException
458+
{
459+
// dont care
460+
}
461+
catch (IOException ex)
462+
{
463+
Logger.Info(ex.ToString());
464+
}
465+
catch (Exception ex)
458466
{
459467
// eat exceptions, most likely a result of a disconnect, either way we will re-create the web socket
460468
Logger.Info(ex.ToString());
@@ -466,7 +474,8 @@ private async Task ReadTask()
466474
}
467475
try
468476
{
469-
webSocket.Dispose();
477+
stream.Close();
478+
webSocket.Dispose();
470479
}
471480
catch (Exception ex)
472481
{

0 commit comments

Comments
 (0)