Skip to content

Commit 843e308

Browse files
author
Jan Petr
authored
Fix jobs locking (#1118)
1 parent 50405da commit 843e308

File tree

1 file changed

+25
-24
lines changed
  • app/code/community/Algolia/Algoliasearch/Model

1 file changed

+25
-24
lines changed

app/code/community/Algolia/Algoliasearch/Model/Queue.php

Lines changed: 25 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -150,8 +150,8 @@ public function run($maxJobs)
150150
$model->{$method}(new Varien_Object($job['data']));
151151

152152
// Delete one by one
153-
$where = $this->db->quoteInto('job_id IN (?)', $job['merged_ids']);
154-
$this->db->delete($this->table, $where);
153+
$this->db->delete($this->table, array('job_id IN (?)' => $job['merged_ids']));
154+
155155

156156
$this->logRecord['processed_jobs'] += count($job['merged_ids']);
157157
} catch (\Exception $e) {
@@ -230,11 +230,6 @@ private function getJobs($maxJobs, $pid)
230230
break;
231231
}
232232

233-
// If $jobs is empty, it's the first run
234-
if (empty($jobs)) {
235-
$firstJobId = $rawJobs[0]['job_id'];
236-
}
237-
238233
$rawJobs = $this->prepareJobs($rawJobs);
239234
$rawJobs = array_merge($jobs, $rawJobs);
240235
$rawJobs = $this->mergeJobs($rawJobs);
@@ -265,14 +260,7 @@ private function getJobs($maxJobs, $pid)
265260
}
266261
}
267262

268-
if (isset($firstJobId)) {
269-
$lastJobId = $this->maxValueInArray($jobs, 'job_id');
270-
271-
// Reserve all new jobs since last run
272-
$this->db->query("UPDATE {$this->db->quoteIdentifier($this->table, true)}
273-
SET pid = " . $pid . ", locked_at = '" . date('Y-m-d H:i:s') . "'
274-
WHERE job_id >= " . $firstJobId . " AND job_id <= " . $lastJobId);
275-
}
263+
$this->lockJobs($jobs);
276264

277265
$this->db->commit();
278266
} catch (\Exception $e) {
@@ -442,19 +430,32 @@ private function arrayMultisort()
442430
return array_pop($args);
443431
}
444432

445-
private function maxValueInArray($array, $keyToSearch)
433+
/**
434+
* @param array $jobs
435+
*/
436+
private function lockJobs($jobs)
446437
{
447-
$currentMax = null;
438+
$jobsIds = $this->getJobsIdsFromMergedJobs($jobs);
448439

449-
foreach ($array as $arr) {
450-
foreach ($arr as $key => $value) {
451-
if ($key == $keyToSearch && ($value >= $currentMax)) {
452-
$currentMax = $value;
453-
}
454-
}
440+
if ($jobsIds !== array()) {
441+
$pid = getmypid();
442+
$this->db->update($this->table, array('pid' => $pid), array('job_id IN (?)' => $jobsIds));
443+
}
444+
}
445+
446+
/**
447+
* @param array $mergedJobs
448+
*
449+
* @return string[]
450+
*/
451+
private function getJobsIdsFromMergedJobs($mergedJobs)
452+
{
453+
$jobsIds = array();
454+
foreach ($mergedJobs as $job) {
455+
$jobsIds = array_merge($jobsIds, $job['merged_ids']);
455456
}
456457

457-
return $currentMax;
458+
return $jobsIds;
458459
}
459460

460461
private function clearOldLogRecords()

0 commit comments

Comments
 (0)