Skip to content

Commit 3b7fccc

Browse files
authored
Merge pull request #352 from que-rb/revert-348-make-poller-more-easily-satisfied
Revert "Continue polling for jobs when only some of the waiting worker priorities will be fully utilised"
2 parents 31d2274 + 41b264e commit 3b7fccc

File tree

2 files changed

+15
-61
lines changed

2 files changed

+15
-61
lines changed

lib/que/poller.rb

Lines changed: 3 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -146,6 +146,8 @@ def poll(
146146

147147
return unless should_poll?
148148

149+
expected_count = priorities.inject(0){|s,(_,c)| s + c}
150+
149151
jobs =
150152
connection.execute_prepared(
151153
:poll_jobs,
@@ -157,7 +159,7 @@ def poll(
157159
)
158160

159161
@last_polled_at = Time.now
160-
@last_poll_satisfied = any_priority_satisfied?(priorities, jobs)
162+
@last_poll_satisfied = expected_count == jobs.count
161163

162164
Que.internal_log :poller_polled, self do
163165
{
@@ -263,21 +265,5 @@ def cleanup(connection)
263265
SQL
264266
end
265267
end
266-
267-
private
268-
269-
def any_priority_satisfied?(priorities, jobs)
270-
job_priorities = jobs.map { |job| job.fetch(:priority) }.sort
271-
worker_job_counts = Hash.new(0)
272-
priorities.any? do |worker_priority, waiting_workers_count|
273-
waiting_workers_count.times do
274-
return false if job_priorities.empty?
275-
break if job_priorities.first > worker_priority
276-
job_priorities.shift
277-
worker_job_counts[worker_priority] += 1
278-
end
279-
worker_job_counts[worker_priority] == waiting_workers_count
280-
end
281-
end
282268
end
283269
end

lib/que/poller.spec.rb

Lines changed: 12 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -255,61 +255,29 @@ def assert_poll(priorities:, locked:)
255255
assert poller.should_poll?
256256
end
257257

258-
it "should be true if the jobs returned from the last poll satisfied all priority requests" do
259-
job_ids_p10 = 3.times.map { Que::Job.enqueue(job_options: { priority: 10 }).que_attrs[:id] }
260-
job_ids_p20 = 2.times.map { Que::Job.enqueue(job_options: { priority: 20 }).que_attrs[:id] }
258+
it "should be true if the last poll returned a full complement of jobs" do
259+
jobs = 5.times.map { Que::Job.enqueue }
261260

262-
result = poller.poll(priorities: { 10 => 3, 20 => 2 }, held_locks: Set.new)
263-
assert_equal (job_ids_p10 + job_ids_p20), result.map(&:id)
261+
result = poller.poll(priorities: {500 => 3}, held_locks: Set.new)
262+
assert_equal 3, result.length
264263

265264
assert_equal true, poller.should_poll?
266265
end
267266

268-
it "should be true if the jobs returned from the last poll satisfied any priority request" do
269-
job_ids_p10 = 2.times.map { Que::Job.enqueue(job_options: { priority: 10 }).que_attrs[:id] }
270-
job_ids_p20 = 2.times.map { Que::Job.enqueue(job_options: { priority: 20 }).que_attrs[:id] }
267+
it "should be false if the last poll didn't return a full complement of jobs" do
268+
jobs = 5.times.map { Que::Job.enqueue }
271269

272-
result = poller.poll(priorities: { 10 => 2, 20 => 1 }, held_locks: Set.new)
273-
assert_equal (job_ids_p10 + [job_ids_p20.first]), result.map(&:id)
274-
275-
assert_equal true, poller.should_poll?
276-
end
277-
278-
it "should be true if the jobs returned from the last poll satisfied any priority request and were slightly higher priority than each priority requested" do
279-
job_ids_p10 = 2.times.map { Que::Job.enqueue(job_options: { priority: 10 }).que_attrs[:id] }
280-
job_ids_p20 = 2.times.map { Que::Job.enqueue(job_options: { priority: 20 }).que_attrs[:id] }
281-
282-
result = poller.poll(priorities: { 11 => 2, 21 => 1 }, held_locks: Set.new)
283-
assert_equal (job_ids_p10 + [job_ids_p20.first]), result.map(&:id)
284-
285-
assert_equal true, poller.should_poll?
286-
end
287-
288-
it "should be true if the jobs returned from the last poll satisfied any priority request and a lower priority request was upgraded to high priority" do
289-
job_ids_p10 = 5.times.map { Que::Job.enqueue(job_options: { priority: 10 }).que_attrs[:id] }
290-
job_ids_p20 = 2.times.map { Que::Job.enqueue(job_options: { priority: 20 }).que_attrs[:id] }
291-
292-
result = poller.poll(priorities: { 10 => 3, 20 => 2 }, held_locks: Set.new)
293-
assert_equal job_ids_p10, result.map(&:id)
294-
295-
assert_equal true, poller.should_poll?
296-
end
297-
298-
it "should be false if the jobs returned from the last poll didn't satisfy any priority request" do
299-
job_ids_p10 = 5.times.map { Que::Job.enqueue(job_options: { priority: 10 }).que_attrs[:id] }
300-
job_ids_p20 = 2.times.map { Que::Job.enqueue(job_options: { priority: 20 }).que_attrs[:id] }
301-
302-
result = poller.poll(priorities: { 10 => 6, 20 => 3 }, held_locks: Set.new)
303-
assert_equal (job_ids_p10 + job_ids_p20), result.map(&:id)
270+
result = poller.poll(priorities: {500 => 7}, held_locks: Set.new)
271+
assert_equal 5, result.length
304272

305273
assert_equal false, poller.should_poll?
306274
end
307275

308-
it "should be true if the jobs returned from the last poll didn't satisfy any priority request, but the poll_interval has elapsed" do
309-
job_ids = 5.times.map { Que::Job.enqueue.que_attrs[:id] }
276+
it "should be true if the last poll didn't return a full complement of jobs, but the poll_interval has elapsed" do
277+
jobs = 5.times.map { Que::Job.enqueue }
310278

311-
result = poller.poll(priorities: { 500 => 7 }, held_locks: Set.new)
312-
assert_equal job_ids, result.map(&:id)
279+
result = poller.poll(priorities: {500 => 7}, held_locks: Set.new)
280+
assert_equal 5, result.length
313281

314282
poller.instance_variable_set(:@last_polled_at, Time.now - 30)
315283
assert_equal true, poller.should_poll?

0 commit comments

Comments
 (0)