Skip to content

Commit 665869f

Browse files
committed
Channel: drop explicit API change, make always threadsafe internally
1 parent 0f12c19 commit 665869f

File tree

2 files changed

+16
-22
lines changed

2 files changed

+16
-22
lines changed

base/channels.jl

Lines changed: 16 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
abstract type AbstractChannel{T} end
44

55
"""
6-
Channel{T}(sz::Int, threadsafe::Bool)
6+
Channel{T}(sz::Int)
77
88
Constructs a `Channel` with an internal buffer that can hold a maximum of `sz` objects
99
of type `T`.
@@ -12,9 +12,6 @@ of type `T`.
1212
`Channel(0)` constructs an unbuffered channel. `put!` blocks until a matching `take!` is called.
1313
And vice-versa.
1414
15-
If `threadsafe` is true, some API operations (specifically `wait`) require first acquiring
16-
the lock on the Channel object.
17-
1815
Other constructors:
1916
2017
* `Channel(Inf)`: equivalent to `Channel{Any}(typemax(Int))`
@@ -30,22 +27,22 @@ mutable struct Channel{T} <: AbstractChannel{T}
3027
data::Vector{T}
3128
sz_max::Int # maximum size of channel
3229

33-
function Channel{T}(sz::Integer, threadsafe::Bool=false) where T
30+
function Channel{T}(sz::Integer) where T
3431
if sz < 0
3532
throw(ArgumentError("Channel size must be either 0, a positive integer or Inf"))
3633
end
37-
lock = threadsafe ? ReentrantLock() : NotALock()
34+
lock = ReentrantLock()
3835
cond_put, cond_take = Condition(lock), Condition(lock)
3936
cond_wait = (sz == 0 ? Condition(lock) : cond_take) # wait is distinct from take iff unbuffered
4037
return new(cond_take, cond_wait, cond_put, :open, nothing, Vector{T}(), sz)
4138
end
4239
end
4340

44-
function Channel{T}(sz::Float64, threadsafe::Bool=false) where T
41+
function Channel{T}(sz::Float64) where T
4542
sz = (sz == Inf ? typemax(Int) : convert(Int, sz))
4643
return Channel{T}(sz)
4744
end
48-
Channel(sz, threadsafe::Bool=false) = Channel{Any}(sz, threadsafe)
45+
Channel(sz) = Channel{Any}(sz)
4946

5047
# special constructors
5148
"""
@@ -94,8 +91,8 @@ julia> istaskdone(taskref[])
9491
true
9592
```
9693
"""
97-
function Channel(func::Function, threadsafe::Bool=false; ctype=Any, csize=0, taskref=nothing)
98-
chnl = Channel{ctype}(csize, threadsafe)
94+
function Channel(func::Function; ctype=Any, csize=0, taskref=nothing)
95+
chnl = Channel{ctype}(csize)
9996
task = Task(() -> func(chnl))
10097
bind(chnl, task)
10198
yield(task) # immediately start it
@@ -377,9 +374,15 @@ unlock(c::Channel) = unlock(c.cond_take)
377374
trylock(c::Channel) = trylock(c.cond_take)
378375

379376
function wait(c::Channel)
380-
while !isready(c)
381-
check_channel_state(c)
382-
wait(c.cond_wait)
377+
isready(c) && return
378+
lock(c)
379+
try
380+
while !isready(c)
381+
check_channel_state(c)
382+
wait(c.cond_wait)
383+
end
384+
finally
385+
unlock(c)
383386
end
384387
nothing
385388
end

test/channels.jl

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -56,12 +56,10 @@ end
5656

5757
@testset "type conversion in put!" begin
5858
c = Channel{Int64}(0)
59-
lock(c)
6059
@async put!(c, Int32(1))
6160
wait(c)
6261
@test isa(take!(c), Int64)
6362
@test_throws MethodError put!(c, "")
64-
unlock(c)
6563
@assert !islocked(c.cond_take)
6664
end
6765

@@ -122,7 +120,6 @@ using Distributed
122120
while isopen(cs[i])
123121
yield()
124122
end
125-
i < 3 && foreach(lock, cs)
126123
@test_throws ErrorException wait(cs[i])
127124
@test_throws ErrorException take!(cs[i])
128125
@test_throws ErrorException put!(cs[i], 1)
@@ -145,11 +142,9 @@ using Distributed
145142
c = Channel(N)
146143
foreach(t -> bind(c, t), tasks)
147144
foreach(schedule, tasks)
148-
lock(c)
149145
@test_throws InvalidStateException wait(c)
150146
@test !isopen(c)
151147
@test ref[] == nth
152-
unlock(c)
153148
@assert !islocked(c.cond_take)
154149

155150
# channeled_tasks
@@ -158,10 +153,8 @@ using Distributed
158153
chnls, tasks = Base.channeled_tasks(2, tf_chnls1; ctypes=[T,T], csizes=[N,N])
159154
put!(chnls[1], 1)
160155
@test take!(chnls[2]) === 2
161-
foreach(lock, chnls)
162156
@test_throws InvalidStateException wait(chnls[1])
163157
@test_throws InvalidStateException wait(chnls[2])
164-
foreach(unlock, chnls)
165158
@test istaskdone(tasks[1])
166159
@test !isopen(chnls[1])
167160
@test !isopen(chnls[2])
@@ -183,10 +176,8 @@ using Distributed
183176
yield()
184177
put!(f, 1) # allow tf4 and tf5 to exit after now, eventually closing the channel
185178

186-
foreach(lock, chnls)
187179
@test_throws InvalidStateException wait(chnls[1])
188180
@test_throws InvalidStateException wait(chnls[2])
189-
foreach(unlock, chnls)
190181
@test istaskdone(tasks[1])
191182
@test istaskdone(tasks[2])
192183
@test !isopen(chnls[1])

0 commit comments

Comments
 (0)