Skip to content

Commit 3035ec5

Browse files
committed
Reset Async queue on fork
1 parent 7dc6eb0 commit 3035ec5

File tree

2 files changed

+22
-0
lines changed

2 files changed

+22
-0
lines changed

lib/concurrent-ruby/concurrent/async.rb

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -309,6 +309,7 @@ def initialize(delegate)
309309
@delegate = delegate
310310
@queue = []
311311
@executor = Concurrent.global_io_executor
312+
@ruby_pid = $$
312313
end
313314

314315
# Delegates method calls to the wrapped object.
@@ -326,6 +327,7 @@ def method_missing(method, *args, &block)
326327

327328
ivar = Concurrent::IVar.new
328329
synchronize do
330+
reset_if_forked
329331
@queue.push [ivar, method, args, block]
330332
@executor.post { perform } if @queue.length == 1
331333
end
@@ -361,6 +363,13 @@ def perform
361363
end
362364
end
363365
end
366+
367+
def reset_if_forked
368+
if $$ != @ruby_pid
369+
@queue.clear
370+
@ruby_pid = $$
371+
end
372+
end
364373
end
365374
private_constant :AsyncDelegator
366375

spec/concurrent/async_spec.rb

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -296,5 +296,18 @@ def gather(seconds, first, *rest)
296296
expect(object.bucket).to eq [:a, :b, :c, :d]
297297
end
298298
end
299+
300+
context 'fork safety' do
301+
it 'does not hang when forked' do
302+
skip "Platform does not support fork" unless Process.respond_to?(:fork)
303+
object = Class.new {
304+
include Concurrent::Async
305+
def foo; end
306+
}.new
307+
object.async.foo
308+
_, status = Process.waitpid2(fork {object.await.foo})
309+
expect(status.exitstatus).to eq 0
310+
end
311+
end
299312
end
300313
end

0 commit comments

Comments
 (0)