Skip to content

Commit 33f85d5

Browse files
committed
Simplified DiskWriterQueue with blocking concurrency
1 parent 262415a commit 33f85d5

File tree

2 files changed

+22
-53
lines changed

2 files changed

+22
-53
lines changed

LiteDB/Engine/Disk/DiskService.cs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -186,8 +186,6 @@ public int WriteAsync(IEnumerable<PageBuffer> pages)
186186
count++;
187187
}
188188

189-
_queue.Value.Run();
190-
191189
return count;
192190
}
193191

Lines changed: 22 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
using System;
22
using System.Collections.Concurrent;
33
using System.IO;
4-
using System.Linq;
54
using System.Threading;
65
using System.Threading.Tasks;
76
using static LiteDB.Constants;
@@ -18,14 +17,17 @@ internal class DiskWriterQueue : IDisposable
1817

1918
// async thread controls
2019
private Task _task;
20+
private bool _shouldClose = false;
2121

2222
private readonly ConcurrentQueue<PageBuffer> _queue = new ConcurrentQueue<PageBuffer>();
23-
24-
private int _running = 0;
23+
private readonly object _queueSync = new object();
24+
private readonly ManualResetEventSlim _queueHasItems = new ManualResetEventSlim(false);
25+
private readonly ManualResetEventSlim _queueIsEmpty = new ManualResetEventSlim(true);
2526

2627
public DiskWriterQueue(Stream stream)
2728
{
2829
_stream = stream;
30+
_task = Task.Run(ExecuteQueue);
2931
}
3032

3133
/// <summary>
@@ -40,27 +42,11 @@ public DiskWriterQueue(Stream stream)
4042
public void EnqueuePage(PageBuffer page)
4143
{
4244
ENSURE(page.Origin == FileOrigin.Log, "async writer must use only for Log file");
43-
44-
_queue.Enqueue(page);
45-
}
46-
47-
/// <summary>
48-
/// If queue contains pages and are not running, starts run queue again now
49-
/// </summary>
50-
public void Run()
51-
{
52-
lock (_queue)
45+
lock (_queueSync)
5346
{
54-
if (_queue.Count == 0) return;
55-
56-
var oldValue = Interlocked.CompareExchange(ref _running, 1, 0);
57-
58-
if (oldValue == 0)
59-
{
60-
// Schedule a new thread to process the pages in the queue.
61-
// https://blog.stephencleary.com/2013/08/startnew-is-dangerous.html
62-
_task = Task.Run(ExecuteQueue);
63-
}
47+
_queueIsEmpty.Reset();
48+
_queue.Enqueue(page);
49+
_queueHasItems.Set();
6450
}
6551
}
6652

@@ -69,16 +55,7 @@ public void Run()
6955
/// </summary>
7056
public void Wait()
7157
{
72-
lock (_queue)
73-
{
74-
if (_task != null)
75-
{
76-
_task.Wait();
77-
}
78-
79-
Run();
80-
}
81-
58+
_queueIsEmpty.Wait();
8259
ENSURE(_queue.Count == 0, "queue should be empty after wait() call");
8360
}
8461

@@ -87,35 +64,25 @@ public void Wait()
8764
/// </summary>
8865
private void ExecuteQueue()
8966
{
90-
do
67+
while (true)
9168
{
9269
if (_queue.TryDequeue(out var page))
9370
{
9471
WritePageToStream(page);
9572
}
96-
97-
while (page == null)
73+
else
9874
{
9975
_stream.FlushToDisk();
100-
Volatile.Write(ref _running, 0);
101-
102-
if (!_queue.Any()) return;
103-
104-
// Another item was added to the queue after we detected it was empty.
105-
var oldValue = Interlocked.CompareExchange(ref _running, 1, 0);
106-
107-
if (oldValue == 1)
76+
lock (_queueSync)
10877
{
109-
// A new thread was already scheduled for execution, this thread can return.
110-
return;
78+
if (_queue.Count > 0) continue;
79+
_queueIsEmpty.Set();
11180
}
11281

113-
// This thread will continue to process the queue as a new thread was not scheduled.
114-
_queue.TryDequeue(out page);
115-
WritePageToStream(page);
82+
_queueHasItems.Wait();
83+
if (_shouldClose) return;
11684
}
117-
118-
} while (true);
85+
}
11986
}
12087

12188
private void WritePageToStream(PageBuffer page)
@@ -137,8 +104,12 @@ public void Dispose()
137104
{
138105
LOG($"disposing disk writer queue (with {_queue.Count} pages in queue)", "DISK");
139106

107+
_shouldClose = true;
108+
140109
// run all items in queue before dispose
141110
this.Wait();
111+
_task?.Wait();
112+
_task = null;
142113
}
143114
}
144115
}

0 commit comments

Comments
 (0)