Skip to content

Commit f21cd84

Browse files
committed
Async DiskWriterQueue implementation
1 parent 33f85d5 commit f21cd84

File tree

2 files changed

+47
-6
lines changed

2 files changed

+47
-6
lines changed

LiteDB/Engine/Disk/DiskWriterQueue.cs

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -21,13 +21,12 @@ internal class DiskWriterQueue : IDisposable
2121

2222
private readonly ConcurrentQueue<PageBuffer> _queue = new ConcurrentQueue<PageBuffer>();
2323
private readonly object _queueSync = new object();
24-
private readonly ManualResetEventSlim _queueHasItems = new ManualResetEventSlim(false);
24+
private readonly AsyncManualResetEvent _queueHasItems = new AsyncManualResetEvent();
2525
private readonly ManualResetEventSlim _queueIsEmpty = new ManualResetEventSlim(true);
2626

2727
public DiskWriterQueue(Stream stream)
2828
{
2929
_stream = stream;
30-
_task = Task.Run(ExecuteQueue);
3130
}
3231

3332
/// <summary>
@@ -47,6 +46,11 @@ public void EnqueuePage(PageBuffer page)
4746
_queueIsEmpty.Reset();
4847
_queue.Enqueue(page);
4948
_queueHasItems.Set();
49+
50+
if (_task == null)
51+
{
52+
_task = Task.Factory.StartNew(ExecuteQueue, TaskCreationOptions.LongRunning);
53+
}
5054
}
5155
}
5256

@@ -62,7 +66,7 @@ public void Wait()
6266
/// <summary>
6367
/// Execute all items in queue sync
6468
/// </summary>
65-
private void ExecuteQueue()
69+
private async Task ExecuteQueue()
6670
{
6771
while (true)
6872
{
@@ -72,15 +76,16 @@ private void ExecuteQueue()
7276
}
7377
else
7478
{
75-
_stream.FlushToDisk();
7679
lock (_queueSync)
7780
{
7881
if (_queue.Count > 0) continue;
7982
_queueIsEmpty.Set();
83+
_queueHasItems.Reset();
84+
if (_shouldClose) return;
8085
}
86+
_stream.FlushToDisk();
8187

82-
_queueHasItems.Wait();
83-
if (_shouldClose) return;
88+
await _queueHasItems.WaitAsync();
8489
}
8590
}
8691
}
@@ -105,6 +110,7 @@ public void Dispose()
105110
LOG($"disposing disk writer queue (with {_queue.Count} pages in queue)", "DISK");
106111

107112
_shouldClose = true;
113+
_queueHasItems.Set(); // unblock the running loop in case there are no items
108114

109115
// run all items in queue before dispose
110116
this.Wait();
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
using System.Threading;
2+
using System.Threading.Tasks;
3+
4+
namespace LiteDB
5+
{
6+
/// <summary>
7+
/// Async implementation of ManualResetEvent
8+
/// https://devblogs.microsoft.com/pfxteam/building-async-coordination-primitives-part-1-asyncmanualresetevent/
9+
/// </summary>
10+
internal class AsyncManualResetEvent
11+
{
12+
private volatile TaskCompletionSource<bool> _tcs = new TaskCompletionSource<bool>();
13+
14+
public Task WaitAsync()
15+
{
16+
return _tcs.Task;
17+
}
18+
19+
public void Set()
20+
{
21+
_tcs.TrySetResult(true);
22+
}
23+
24+
public void Reset()
25+
{
26+
while (true)
27+
{
28+
var tcs = _tcs;
29+
if (!tcs.Task.IsCompleted ||
30+
Interlocked.CompareExchange(ref _tcs, new TaskCompletionSource<bool>(), tcs) == tcs)
31+
return;
32+
}
33+
}
34+
}
35+
}

0 commit comments

Comments
 (0)