Skip to content

Commit 8b8ed45

Browse files
authored
Merge pull request #801 from soutaro/exit
Improve worker process handling
2 parents 96c7077 + 98c69c5 commit 8b8ed45

File tree

16 files changed

+169
-51
lines changed

16 files changed

+169
-51
lines changed

lib/steep.rb

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
require "steep/shims/filter_map"
2727
require "steep/shims/symbol_start_with"
2828

29+
require "steep/thread_waiter"
2930
require "steep/equatable"
3031
require "steep/method_name"
3132
require "steep/node_helper"

lib/steep/drivers/check.rb

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,9 +55,9 @@ def run
5555
master.commandline_args.push(*command_line_patterns)
5656

5757
main_thread = Thread.start do
58+
Thread.current.abort_on_exception = true
5859
master.start()
5960
end
60-
main_thread.abort_on_exception = true
6161

6262
Steep.logger.info { "Initializing server" }
6363
initialize_id = request_id()
@@ -130,6 +130,9 @@ def run
130130
stdout.puts Rainbow("Unexpected error reported. 🚨").red.bold
131131
1
132132
end
133+
rescue Errno::EPIPE => error
134+
stdout.puts Rainbow("Steep shutdown with an error: #{error.inspect}").red.bold
135+
return 1
133136
end
134137

135138
def print_expectations(project:, all_files:, expectations_path:, notifications:)

lib/steep/drivers/checkfile.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -119,9 +119,9 @@ def run
119119
master.typecheck_automatically = false
120120

121121
main_thread = Thread.start do
122+
Thread.current.abort_on_exception = true
122123
master.start()
123124
end
124-
main_thread.abort_on_exception = true
125125

126126
Steep.logger.info { "Initializing server" }
127127
initialize_id = request_id()

lib/steep/drivers/stats.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -145,9 +145,9 @@ def run
145145
master.commandline_args.push(*command_line_patterns)
146146

147147
main_thread = Thread.start do
148+
Thread.current.abort_on_exception = true
148149
master.start()
149150
end
150-
main_thread.abort_on_exception = true
151151

152152
initialize_id = request_id()
153153
client_writer.write({ method: :initialize, id: initialize_id })

lib/steep/drivers/utils/driver_helper.rb

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,16 @@ def load_config(path: steepfile || Pathname("Steepfile"))
1010
steep_file_path = path.absolute? ? path : Pathname.pwd + path
1111
Project.new(steepfile_path: steep_file_path).tap do |project|
1212
Project::DSL.parse(project, path.read, filename: path.to_s)
13+
14+
project.targets.each do |target|
15+
if collection_lock = target.options.collection_lock
16+
begin
17+
collection_lock.check_rbs_availability!
18+
rescue RBS::Collection::Config::CollectionNotAvailable
19+
raise "Run `rbs collection install` to install type definitions"
20+
end
21+
end
22+
end
1323
end
1424
end
1525

lib/steep/drivers/watch.rb

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,6 @@ def run()
5757
main_thread = Thread.start do
5858
master.start()
5959
end
60-
main_thread.abort_on_exception = true
6160

6261
initialize_id = request_id()
6362
client_writer.write(method: "initialize", id: initialize_id)
@@ -157,7 +156,13 @@ def run()
157156
main_thread.join
158157
rescue Interrupt
159158
master.kill
160-
main_thread.join
159+
begin
160+
main_thread.join
161+
rescue
162+
master.each_worker do |worker|
163+
worker.kill(force: true)
164+
end
165+
end
161166
end
162167

163168
0

lib/steep/server/base_worker.rb

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,8 @@ def handle_job(job)
4040
def run
4141
tags = Steep.logger.formatter.current_tags.dup
4242
thread = Thread.new do
43+
Thread.current.abort_on_exception = true
44+
4345
Steep.logger.formatter.push_tags(*tags)
4446
Steep.logger.tagged "background" do
4547
while job = queue.pop
@@ -69,7 +71,6 @@ def run
6971
end
7072
end
7173
end
72-
thread.abort_on_exception = true
7374

7475
Steep.logger.tagged "frontend" do
7576
begin

lib/steep/server/master.rb

Lines changed: 49 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -450,50 +450,65 @@ def start
450450
end
451451
end
452452

453-
Steep.logger.tagged "main" do
454-
while job = job_queue.deq
455-
case job
456-
when ReceiveMessageJob
457-
src = case job.source
453+
loop_thread = Thread.new do
454+
Steep.logger.formatter.push_tags(*tags)
455+
Steep.logger.tagged "main" do
456+
while job = job_queue.deq
457+
case job
458+
when ReceiveMessageJob
459+
src = case job.source
460+
when :client
461+
:client
462+
else
463+
job.source.name
464+
end
465+
Steep.logger.tagged("ReceiveMessageJob(#{src}/#{job.message[:method]}/#{job.message[:id]})") do
466+
if job.response? && result_controller.process_response(job.message)
467+
# nop
468+
Steep.logger.info { "Processed by ResultController" }
469+
else
470+
case job.source
458471
when :client
459-
:client
460-
else
461-
job.source.name
462-
end
463-
Steep.logger.tagged("ReceiveMessageJob(#{src}/#{job.message[:method]}/#{job.message[:id]})") do
464-
if job.response? && result_controller.process_response(job.message)
465-
# nop
466-
Steep.logger.info { "Processed by ResultController" }
467-
else
468-
case job.source
469-
when :client
470-
process_message_from_client(job.message)
471-
472-
if job.message[:method] == "exit"
473-
job_queue.close()
472+
process_message_from_client(job.message)
473+
474+
if job.message[:method] == "exit"
475+
job_queue.close()
476+
end
477+
when WorkerProcess
478+
process_message_from_worker(job.message, worker: job.source)
474479
end
475-
when WorkerProcess
476-
process_message_from_worker(job.message, worker: job.source)
477480
end
478481
end
479-
end
480-
when SendMessageJob
481-
case job.dest
482-
when :client
483-
Steep.logger.info { "Processing SendMessageJob: dest=client, method=#{job.message[:method] || "-"}, id=#{job.message[:id] || "-"}" }
484-
writer.write job.message
485-
when WorkerProcess
486-
Steep.logger.info { "Processing SendMessageJob: dest=#{job.dest.name}, method=#{job.message[:method] || "-"}, id=#{job.message[:id] || "-"}" }
487-
job.dest << job.message
482+
when SendMessageJob
483+
case job.dest
484+
when :client
485+
Steep.logger.info { "Processing SendMessageJob: dest=client, method=#{job.message[:method] || "-"}, id=#{job.message[:id] || "-"}" }
486+
writer.write job.message
487+
when WorkerProcess
488+
Steep.logger.info { "Processing SendMessageJob: dest=#{job.dest.name}, method=#{job.message[:method] || "-"}, id=#{job.message[:id] || "-"}" }
489+
job.dest << job.message
490+
end
488491
end
489492
end
490493
end
494+
end
495+
496+
waiter = ThreadWaiter.new(each_worker.to_a) {|worker| worker.wait_thread }
497+
waiter.wait_one()
491498

492-
read_client_thread.join()
493-
worker_threads.each do |thread|
494-
thread.join
499+
unless job_queue.closed?
500+
# Exit by error
501+
each_worker do |worker|
502+
worker.kill(force: true)
495503
end
504+
raise "Unexpected worker process exit"
505+
end
506+
507+
read_client_thread.join()
508+
worker_threads.each do |thread|
509+
thread.join
496510
end
511+
loop_thread.join
497512
end
498513
end
499514

lib/steep/server/worker_process.rb

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,9 @@ def self.fork_worker(type, name:, steepfile:, index:, delay_shutdown:, patterns:
6363
worker.commandline_args = patterns
6464

6565
pid = fork do
66+
Process.setpgid(0, 0)
67+
stdin_out.close
68+
stdout_in.close
6669
worker.run()
6770
end
6871

@@ -149,8 +152,22 @@ def read(&block)
149152
reader.read(&block)
150153
end
151154

152-
def kill
153-
Process.kill(:TERM, @wait_thread.pid)
155+
def kill(force: false)
156+
Steep.logger.tagged("WorkerProcess#kill@#{name}(#{wait_thread.pid})") do
157+
begin
158+
signal = force ? :KILL : :TERM
159+
Steep.logger.debug("Sending signal SIG#{signal}...")
160+
Process.kill(signal, wait_thread.pid)
161+
Steep.logger.debug("Successfully sent the signal.")
162+
rescue Errno::ESRCH => error
163+
Steep.logger.debug("Failed #{error.inspect}")
164+
end
165+
unless force
166+
Steep.logger.debug("Waiting for process exit...")
167+
wait_thread.join()
168+
Steep.logger.debug("Confirmed process exit.")
169+
end
170+
end
154171
end
155172
end
156173
end

lib/steep/thread_waiter.rb

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
module Steep
2+
class ThreadWaiter
3+
attr_reader :objects, :queue, :waiter_threads
4+
5+
def initialize(objects)
6+
@objects = objects
7+
@queue = Thread::Queue.new()
8+
@waiter_threads = Set[].compare_by_identity
9+
10+
objects.each do |object|
11+
thread = yield(object)
12+
13+
waiter_thread = Thread.new(thread) do |thread|
14+
Thread.current.report_on_exception = false
15+
16+
begin
17+
thread.join
18+
ensure
19+
queue << [object, thread]
20+
end
21+
end
22+
23+
waiter_threads << waiter_thread
24+
end
25+
end
26+
27+
def wait_one
28+
unless waiter_threads.empty?
29+
obj, th = queue.pop()
30+
waiter_threads.delete(th)
31+
obj
32+
end
33+
end
34+
end
35+
end

0 commit comments

Comments
 (0)