Skip to content

Commit 00df755

Browse files
committed
fixup: undo previous commit. remove ST versions of Lock, Event, Semaphore. introduce new Threads.Condition
1 parent 237634a commit 00df755

File tree

8 files changed

+116
-255
lines changed

8 files changed

+116
-255
lines changed

base/event.jl

Lines changed: 8 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,7 @@ called can be woken up. For level-triggered notifications, you must keep extra s
9292
track of whether a notification has happened. The [`Channel`](@ref) and [`Event`](@ref) types do
9393
this, and can be used for level-triggered events.
9494
95-
This object is NOT thread-safe. See [`Threads.ConditionMT`](@ref) for a thread-safe version.
95+
This object is NOT thread-safe. See [`Threads.Condition`](@ref) for a thread-safe version.
9696
"""
9797
struct GenericCondition{L<:AbstractLock}
9898
waitq::Vector{Any}
@@ -183,52 +183,9 @@ Return `true` if no tasks are waiting on the condition, `false` otherwise.
183183
isempty(c::GenericCondition) = isempty(c.waitq)
184184

185185

186-
"""
187-
Event()
188-
189-
Create a level-triggered event source. Tasks that call [`wait`](@ref) on an
190-
`Event` are suspended and queued until `notify` is called on the `Event`.
191-
After `notify` is called, the `Event` remains in a signaled state and
192-
tasks will no longer block when waiting for it.
193-
194-
!!! compat "Julia 1.1"
195-
This functionality requires at least Julia 1.1.
196-
"""
197-
mutable struct GenericEvent{L<:AbstractLock}
198-
notify::GenericCondition{L}
199-
set::Bool
200-
GenericEvent{L}() where {L<:AbstractLock} = new{L}(GenericCondition{L}(), false)
201-
end
202-
203-
function wait(e::GenericEvent)
204-
e.set && return
205-
lock(e.notify)
206-
try
207-
while !e.set
208-
wait(e.notify)
209-
end
210-
finally
211-
unlock(e.notify)
212-
end
213-
nothing
214-
end
215-
216-
function notify(e::GenericEvent)
217-
lock(e.notify)
218-
try
219-
if !e.set
220-
e.set = true
221-
notify(e.notify)
222-
end
223-
finally
224-
unlock(e.notify)
225-
end
226-
nothing
227-
end
228-
229-
230-
const ConditionST = GenericCondition{AlwaysLockedST}
231-
const EventST = GenericEvent{CooperativeLock}
186+
# default (Julia v1.0) is currently single-threaded
187+
# (although it uses MT-safe versions, when possible)
188+
const Condition = GenericCondition{AlwaysLockedST}
232189

233190

234191
## scheduler and work queue
@@ -433,11 +390,11 @@ Use [`isopen`](@ref) to check whether it is still active.
433390
"""
434391
mutable struct AsyncCondition
435392
handle::Ptr{Cvoid}
436-
cond::ConditionST
393+
cond::Condition
437394
isopen::Bool
438395

439396
function AsyncCondition()
440-
this = new(Libc.malloc(_sizeof_uv_async), ConditionST(), true)
397+
this = new(Libc.malloc(_sizeof_uv_async), Condition(), true)
441398
associate_julia_struct(this.handle, this)
442399
finalizer(uvfinalize, this)
443400
err = ccall(:uv_async_init, Cint, (Ptr{Cvoid}, Ptr{Cvoid}, Ptr{Cvoid}),
@@ -491,14 +448,14 @@ to check whether a timer is still active.
491448
"""
492449
mutable struct Timer
493450
handle::Ptr{Cvoid}
494-
cond::ConditionST
451+
cond::Condition
495452
isopen::Bool
496453

497454
function Timer(timeout::Real; interval::Real = 0.0)
498455
timeout 0 || throw(ArgumentError("timer cannot have negative timeout of $timeout seconds"))
499456
interval 0 || throw(ArgumentError("timer cannot have negative repeat interval of $interval seconds"))
500457

501-
this = new(Libc.malloc(_sizeof_uv_timer), ConditionST(), true)
458+
this = new(Libc.malloc(_sizeof_uv_timer), Condition(), true)
502459
err = ccall(:uv_timer_init, Cint, (Ptr{Cvoid}, Ptr{Cvoid}), eventloop(), this)
503460
if err != 0
504461
#TODO: this codepath is currently not tested

base/lock.jl

Lines changed: 85 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -8,25 +8,22 @@ Creates a re-entrant lock for synchronizing [`Task`](@ref)s.
88
The same task can acquire the lock as many times as required.
99
Each [`lock`](@ref) must be matched with an [`unlock`](@ref).
1010
"""
11-
mutable struct GenericReentrantLock{ThreadLock<:AbstractLock} <: AbstractLock
11+
mutable struct ReentrantLock <: AbstractLock
1212
locked_by::Union{Task, Nothing}
13-
cond_wait::GenericCondition{ThreadLock}
13+
cond_wait::GenericCondition{Threads.SpinLock}
1414
reentrancy_cnt::Int
1515

16-
GenericReentrantLock{ThreadLock}() where {ThreadLock<:AbstractLock} = new(nothing, GenericCondition{ThreadLock}(), 0)
16+
ReentrantLock() = new(nothing, GenericCondition{Threads.SpinLock}(), 0)
1717
end
1818

19-
# A basic single-threaded, Julia-aware lock:
20-
const ReentrantLockST = GenericReentrantLock{CooperativeLock}
21-
2219

2320
"""
2421
islocked(lock) -> Status (Boolean)
2522
2623
Check whether the `lock` is held by any task/thread.
2724
This should not be used for synchronization (see instead [`trylock`](@ref)).
2825
"""
29-
function islocked(rl::GenericReentrantLock)
26+
function islocked(rl::ReentrantLock)
3027
return rl.reentrancy_cnt != 0
3128
end
3229

@@ -40,7 +37,7 @@ return `false`.
4037
4138
Each successful `trylock` must be matched by an [`unlock`](@ref).
4239
"""
43-
function trylock(rl::GenericReentrantLock)
40+
function trylock(rl::ReentrantLock)
4441
t = current_task()
4542
lock(rl.cond_wait)
4643
try
@@ -67,7 +64,7 @@ wait for it to become available.
6764
6865
Each `lock` must be matched by an [`unlock`](@ref).
6966
"""
70-
function lock(rl::GenericReentrantLock)
67+
function lock(rl::ReentrantLock)
7168
t = current_task()
7269
lock(rl.cond_wait)
7370
try
@@ -95,7 +92,7 @@ Releases ownership of the `lock`.
9592
If this is a recursive lock which has been acquired before, decrement an
9693
internal counter and return immediately.
9794
"""
98-
function unlock(rl::GenericReentrantLock)
95+
function unlock(rl::ReentrantLock)
9996
t = current_task()
10097
rl.reentrancy_cnt == 0 && error("unlock count must match lock count")
10198
rl.locked_by == t || error("unlock from wrong thread")
@@ -112,7 +109,7 @@ function unlock(rl::GenericReentrantLock)
112109
return
113110
end
114111

115-
function unlockall(rl::GenericReentrantLock)
112+
function unlockall(rl::ReentrantLock)
116113
t = current_task()
117114
n = rl.reentrancy_cnt
118115
rl.locked_by == t || error("unlock from wrong thread")
@@ -128,7 +125,7 @@ function unlockall(rl::GenericReentrantLock)
128125
return n
129126
end
130127

131-
function relockall(rl::GenericReentrantLock, n::Int)
128+
function relockall(rl::ReentrantLock, n::Int)
132129
t = current_task()
133130
lock(rl)
134131
n1 = rl.reentrancy_cnt
@@ -157,20 +154,33 @@ function trylock(f, l::AbstractLock)
157154
return false
158155
end
159156

157+
@eval Threads begin
158+
const Condition = Base.GenericCondition{ReentrantLock}
159+
160+
"""
161+
Special note for [`Threads.Condition`](@ref):
162+
163+
The caller must be holding the [`lock`](@ref) that owns `c` before calling this method.
164+
The calling task will be blocked until some other task wakes it,
165+
usually by calling [`notify`](@ref)` on the same Condition object.
166+
The lock will be atomically released when blocking (even if it was locked recursively),
167+
and will be reacquired before returning.
168+
"""
169+
wait(c::Threads.Condition)
170+
end
171+
160172
"""
161173
Semaphore(sem_size)
162174
163175
Create a counting semaphore that allows at most `sem_size`
164176
acquires to be in use at any time.
165177
Each acquire must be matched with a release.
166-
167-
This construct is NOT threadsafe.
168178
"""
169179
mutable struct Semaphore
170180
sem_size::Int
171181
curr_cnt::Int
172-
cond_wait::ConditionST
173-
Semaphore(sem_size) = sem_size > 0 ? new(sem_size, 0, ConditionST()) : throw(ArgumentError("Semaphore size must be > 0"))
182+
cond_wait::Threads.Condition
183+
Semaphore(sem_size) = sem_size > 0 ? new(sem_size, 0, Threads.Condition()) : throw(ArgumentError("Semaphore size must be > 0"))
174184
end
175185

176186
"""
@@ -180,14 +190,16 @@ Wait for one of the `sem_size` permits to be available,
180190
blocking until one can be acquired.
181191
"""
182192
function acquire(s::Semaphore)
183-
while true
184-
if s.curr_cnt < s.sem_size
185-
s.curr_cnt = s.curr_cnt + 1
186-
return
187-
else
193+
lock(s.cond_wait)
194+
try
195+
while s.curr_cnt >= s.sem_size
188196
wait(s.cond_wait)
189197
end
198+
s.curr_cnt = s.curr_cnt + 1
199+
finally
200+
unlock(s.cond_wait)
190201
end
202+
return
191203
end
192204

193205
"""
@@ -198,8 +210,57 @@ possibly allowing another task to acquire it
198210
and resume execution.
199211
"""
200212
function release(s::Semaphore)
201-
@assert s.curr_cnt > 0 "release count must match acquire count"
202-
s.curr_cnt -= 1
203-
notify(s.cond_wait; all=false)
213+
lock(s.cond_wait)
214+
try
215+
s.curr_cnt > 0 || error("release count must match acquire count")
216+
s.curr_cnt -= 1
217+
notify(s.cond_wait; all=false)
218+
finally
219+
unlock(s.cond_wait)
220+
end
204221
return
205222
end
223+
224+
225+
"""
226+
Event()
227+
228+
Create a level-triggered event source. Tasks that call [`wait`](@ref) on an
229+
`Event` are suspended and queued until `notify` is called on the `Event`.
230+
After `notify` is called, the `Event` remains in a signaled state and
231+
tasks will no longer block when waiting for it.
232+
233+
!!! compat "Julia 1.1"
234+
This functionality requires at least Julia 1.1.
235+
"""
236+
mutable struct Event
237+
notify::Threads.Condition
238+
set::Bool
239+
Event() = new(Threads.Condition(), false)
240+
end
241+
242+
function wait(e::Event)
243+
e.set && return
244+
lock(e.notify)
245+
try
246+
while !e.set
247+
wait(e.notify)
248+
end
249+
finally
250+
unlock(e.notify)
251+
end
252+
nothing
253+
end
254+
255+
function notify(e::Event)
256+
lock(e.notify)
257+
try
258+
if !e.set
259+
e.set = true
260+
notify(e.notify)
261+
end
262+
finally
263+
unlock(e.notify)
264+
end
265+
nothing
266+
end

0 commit comments

Comments
 (0)