Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -328,14 +328,16 @@ private function releaseMutex(int $recordMutexId)
* mark a job a "finished"
* nb : after a long job, connection may be lost so we reconnect.
* But sometimes (?) a first commit fails (due to reconnect ?), while the second one is ok.
* So here we try 2 times, just in case...
* So here we try 4 times, just in case...
*
* @param int $workerRunningJobId
* @param MessagePublisher $messagePublisher
* @param $jobType
* @param null $info
*/
public function markFinished(int $workerRunningJobId, $info = null)
public function markFinished(int $workerRunningJobId, MessagePublisher $messagePublisher, $jobType, $info = null)
{
for($tryout=1; $tryout<=2; $tryout++) {
for($wait = 2, $tryout=1; $tryout<=4; $tryout++) {
try {
$this->reconnect();
$cnx = $this->getEntityManager()->getConnection()->getWrappedConnection();
Expand All @@ -356,8 +358,10 @@ public function markFinished(int $workerRunningJobId, $info = null)
throw new Exception(sprintf("updating WorkerRunningJob should return 1 row affected, got %s", $a));
}
catch (Exception $e) {
if($tryout < 2) {
sleep(1); // retry in 1 sec
if($tryout < 4) {
$messagePublisher->pushLog(sprintf("failed updating WorkerRunningJob to finished with id=%d for %s, attempt %d", $workerRunningJobId, $jobType, $tryout));
sleep($wait); // retry after more sec
$wait *= 2;
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
use Alchemy\Phrasea\Application as PhraseaApplication;
use Alchemy\Phrasea\Application\Helper\DataboxLoggerAware;
use Alchemy\Phrasea\Controller\Controller;
use Alchemy\Phrasea\Filesystem\FilesystemService;
use Alchemy\Phrasea\Model\Entities\WorkerRunningJob;
use Alchemy\Phrasea\Model\Repositories\WorkerRunningJobRepository;
use Alchemy\Phrasea\Plugin\Exception\JsonValidationException;
Expand Down Expand Up @@ -224,12 +225,34 @@ public function changeStatusAction(Request $request, $workerId)

/** @var WorkerRunningJob $workerRunningJob */
$workerRunningJob = $repoWorker->find($workerId);

$workerRunningJob->setStatus($request->request->get('status'));
$subdefOK = false;
$finishedDate = new \DateTime('now');

if($request->request->get('finished') == '1') {
$workerRunningJob->setFinished($finishedDate)->setFlock(null);
if ($workerRunningJob->getWork() == 'subdefCreation') {
try {
$databox = $this->findDataboxById($workerRunningJob->getDataboxId());
$record = $databox->get_record($workerRunningJob->getRecordId());
if ($record->has_subdef($workerRunningJob->getWorkOn()) ) {
$filePathToCheck = $record->get_subdef($workerRunningJob->getWorkOn())->getRealPath();
if ($this->getFileSystem()->exists($filePathToCheck)) {
// the subdefinition exist
// so mark as finished
$subdefOK = true;
$workerRunningJob->setStatus(WorkerRunningJob::FINISHED);
$workerRunningJob->setFinished($finishedDate)->setFlock(null);
}
}
} catch (\Exception $e) {
}
}

if (!$subdefOK || $workerRunningJob->getWork() != 'subdefCreation') {
$workerRunningJob->setStatus($request->request->get('status'));


if($request->request->get('finished') == '1') {
$workerRunningJob->setFinished($finishedDate)->setFlock(null);
}
}

$em = $repoWorker->getEntityManager();
Expand Down Expand Up @@ -259,14 +282,48 @@ public function doChangeStatusToCanceledAction(PhraseaApplication $app, Request
{
/** @var WorkerRunningJobRepository $repoWorker */
$repoWorker = $this->app['repo.worker-running-job'];
$finishedDate = new \DateTime('now');
$em = $repoWorker->getEntityManager();

$workerRunningJobs = $repoWorker->getRunningSinceCreated($request->request->get('hour'), ['subdefCreation', 'writeMetadatas']);
$workerRunningJobsForOnlySubdefcreation = $repoWorker->getRunningSinceCreated($request->request->get('hour'), ['subdefCreation']);

// treat the subdefinition case
/** @var WorkerRunningJob $ws */
foreach ($workerRunningJobsForOnlySubdefcreation as $ws) {
$subdefOK = false;
try {
$databox = $this->findDataboxById($ws->getDataboxId());
$record = $databox->get_record($ws->getRecordId());
if ($record->has_subdef($ws->getWorkOn()) ) {
$filePathToCheck = $record->get_subdef($ws->getWorkOn())->getRealPath();
if ($this->getFileSystem()->exists($filePathToCheck)) {
// the subdefinition exist
// so mark as finished
$subdefOK = true;
$ws->setStatus(WorkerRunningJob::FINISHED);
$ws->setFinished($finishedDate)->setFlock(null);
}
}

} catch (\Exception $e) {
}

if (!$subdefOK) {
$ws->setStatus(WorkerRunningJob::INTERRUPT);
$ws->setFinished($finishedDate)->setFlock(null);
}
$em->persist($ws);
}
$em->flush();

// treat all the rest case
$repoWorker->updateStatusRunningToCanceledSinceCreated($request->request->get('hour'));

$finishedDate = new \DateTime('now');
// "log docs" the subdefCreation and writeMetadatas action
/** @var WorkerRunningJob $workerRunningJob */
foreach ($workerRunningJobs as $workerRunningJob) {
$this->updateLogDocs($workerRunningJob, 'canceled', $finishedDate);
$this->updateLogDocs($workerRunningJob, $workerRunningJob->getStatus(), $finishedDate);
}

return $this->app->json(['success' => true]);
Expand Down Expand Up @@ -791,4 +848,11 @@ private function getUrlGenerator()
return $this->app['url_generator'];
}

/**
* @return FilesystemService
*/
private function getFileSystem()
{
return $this->app['phraseanet.filesystem'];
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ public function register(Application $app)
}));

$app['alchemy_worker.type_based_worker_resolver']->addFactory(MessagePublisher::SUBTITLE_TYPE, new CallableWorkerFactory(function () use ($app) {
return (new SubtitleWorker($app['repo.worker-running-job'], $app['conf'], new LazyLocator($app, 'phraseanet.appbox'), $app['alchemy_worker.logger'], $app['dispatcher']))
return (new SubtitleWorker($app['repo.worker-running-job'], $app['conf'], new LazyLocator($app, 'phraseanet.appbox'), $app['alchemy_worker.logger'], $app['dispatcher'], $app['alchemy_worker.message.publisher']))
->setFileSystemLocator(new LazyLocator($app, 'filesystem'))
->setTemporaryFileSystemLocator(new LazyLocator($app, 'temporary-filesystem'));
}));
Expand Down
23 changes: 5 additions & 18 deletions lib/Alchemy/Phrasea/WorkerManager/Worker/CreateRecordWorker.php
Original file line number Diff line number Diff line change
Expand Up @@ -137,24 +137,6 @@ public function process(array $payload)
return;
}

if ($workerRunningJob != null) {
$em->beginTransaction();
try {
$workerRunningJob
->setStatus(WorkerRunningJob::FINISHED)
->setFinished(new \DateTime('now'))
;

$em->persist($workerRunningJob);

$em->flush();
$em->commit();
} catch (\Exception $e) {
$em->rollback();
}

}

$lazaretSession = new LazaretSession();

$userRepository = $this->getUserRepository();
Expand Down Expand Up @@ -257,6 +239,11 @@ public function process(array $payload)
]
]
);

if ($workerRunningJob != null) {
$this->repoWorkerJob->markFinished($workerRunningJob->getId(), $this->messagePublisher, MessagePublisher::CREATE_RECORD_TYPE);
}

}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,14 +69,7 @@ public function process(array $payload)

// tell that the delete is finished
if ($workerRunningJob != null) {
$workerRunningJob
->setStatus(WorkerRunningJob::FINISHED)
->setFinished(new \DateTime('now'))
;

$em->persist($workerRunningJob);

$em->flush();
$this->repoWorker->markFinished($workerRunningJob->getId(), $this->messagePublisher, MessagePublisher::DELETE_RECORD_TYPE);
}
}
}
15 changes: 6 additions & 9 deletions lib/Alchemy/Phrasea/WorkerManager/Worker/DownloadAsyncWorker.php
Original file line number Diff line number Diff line change
Expand Up @@ -385,15 +385,7 @@ public function process(array $payload)
);

if ($workerRunningJob != null) {
$this->repoWorkerJob->reconnect();
$workerRunningJob
->setStatus(WorkerRunningJob::FINISHED)
->setFinished(new \DateTime('now'))
;

$em->persist($workerRunningJob);

$em->flush();
$this->repoWorkerJob->markFinished($workerRunningJob->getId(), $this->getMessagePublisher(),MessagePublisher::DOWNLOAD_ASYNC_TYPE);
}

sleep(1);
Expand Down Expand Up @@ -436,6 +428,11 @@ private function getWorkerRunningJobRepository()
return $this->app['repo.worker-running-job'];
}

private function getMessagePublisher()
{
return $this->app['alchemy_worker.message.publisher'];
}

private function cellRefFromColumnAndRow(int $col, int $row = null)
{
$r = Coordinate::stringFromColumnIndex($col);
Expand Down
12 changes: 1 addition & 11 deletions lib/Alchemy/Phrasea/WorkerManager/Worker/EditRecordWorker.php
Original file line number Diff line number Diff line change
Expand Up @@ -200,17 +200,7 @@ public function process(array $payload)
);

// tell that we have finished to work on edit
$this->repoWorker->reconnect();
$em->getConnection()->beginTransaction();
try {
$workerRunningJob->setStatus(WorkerRunningJob::FINISHED);
$workerRunningJob->setFinished(new \DateTime('now'));
$em->persist($workerRunningJob);
$em->flush();
$em->commit();
} catch (\Exception $e) {
$em->rollback();
}
$this->repoWorker->markFinished($workerRunningJob->getId(), $this->messagePublisher, MessagePublisher::EDIT_RECORD_TYPE);

$this->messagePublisher->pushLog(sprintf("record edited databoxname=%s databoxid=%d recordid=%d", $databox->get_viewname(), $payload['databoxId'], $payload['record_id']));
}
Expand Down
19 changes: 9 additions & 10 deletions lib/Alchemy/Phrasea/WorkerManager/Worker/ExportMailWorker.php
Original file line number Diff line number Diff line change
Expand Up @@ -216,16 +216,7 @@ public function process(array $payload)
}

if ($workerRunningJob != null) {
$this->repoWorkerJob->reconnect();
$workerRunningJob
->setWorkOn(implode(',', $deliverEmails))
->setStatus(WorkerRunningJob::FINISHED)
->setFinished(new \DateTime('now'))
;

$em->persist($workerRunningJob);

$em->flush();
$this->repoWorkerJob->markFinished($workerRunningJob->getId(), $this->getMessagePublisher(), MessagePublisher::EXPORT_MAIL_TYPE);
}

sleep(30);
Expand All @@ -250,4 +241,12 @@ private function getWorkerRunningJobRepository()
{
return $this->app['repo.worker-running-job'];
}

/**
* @return MessagePublisher
*/
private function getMessagePublisher()
{
return $this->app['alchemy_worker.message.publisher'];
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -373,7 +373,7 @@ public function process(array $payload)
}

// tell that the upload is finished
$this->finishedJob($workerRunningJob, $em);
$this->repoWorker->markFinished($workerRunningJob->getId(), $this->messagePublisher, MessagePublisher::EXPOSE_UPLOAD_TYPE);
}

private function getClientAnnotationProfile(Client $exposeClient, $publicationId)
Expand Down
14 changes: 1 addition & 13 deletions lib/Alchemy/Phrasea/WorkerManager/Worker/FtpWorker.php
Original file line number Diff line number Diff line change
Expand Up @@ -355,18 +355,7 @@ private function doExport(FtpExport $export, array $payload)

if (!$processError && $workerRunningJob) {
// tell that we have finished to work on this file
$this->repoWorker->reconnect();
$em->beginTransaction();
try {
$workerRunningJob->setStatus(WorkerRunningJob::FINISHED);
$workerRunningJob->setFinished(new \DateTime('now'));
$em->persist($workerRunningJob);
$em->flush();
$em->commit();
}
catch (Exception $e) {
$em->rollback();
}
$this->repoWorker->markFinished($workerRunningJob->getId(), $this->getMessagePublisher(), MessagePublisher::FTP_TYPE);
} else {
// if there is an error
$count = isset($payload['count']) ? $payload['count'] + 1 : 2 ;
Expand Down Expand Up @@ -537,5 +526,4 @@ private function getMessagePublisher()
{
return $this->app['alchemy_worker.message.publisher'];
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -145,15 +145,7 @@ public function process(array $payload)

// tell that the populate is finished
if ($workerRunningJob != null) {
$this->repoWorker->reconnect();
$workerRunningJob
->setStatus(WorkerRunningJob::FINISHED)
->setFinished(new \DateTime('now'))
;

$em->persist($workerRunningJob);

$em->flush();
$this->repoWorker->markFinished($workerRunningJob->getId(), $this->messagePublisher, MessagePublisher::POPULATE_INDEX_TYPE);
}
}

Expand Down
26 changes: 18 additions & 8 deletions lib/Alchemy/Phrasea/WorkerManager/Worker/ShareBasketWorker.php
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
use Alchemy\Phrasea\Model\Manipulator\TokenManipulator;
use Alchemy\Phrasea\Model\Repositories\BasketRepository;
use Alchemy\Phrasea\Model\Repositories\UserRepository;
use Alchemy\Phrasea\Model\Repositories\WorkerRunningJobRepository;
use Alchemy\Phrasea\Record\RecordReference;
use Alchemy\Phrasea\WorkerManager\Queue\MessagePublisher;
use DateTime;
Expand Down Expand Up @@ -405,14 +406,7 @@ public function process(array $payload)
$this->getLogger()->info("Basket with Id " . $basket->getId() . " successfully shared !");

if ($workerRunningJob != null) {
$workerRunningJob
->setStatus(WorkerRunningJob::FINISHED)
->setFinished(new \DateTime('now'))
;

$manager->persist($workerRunningJob);

$manager->flush();
$this->getRepoWorkerRunningJob()->markFinished($workerRunningJob->getId(), $this->getMessagePublisher(), MessagePublisher::SHARE_BASKET_TYPE);
}

// file_put_contents("./tmp/phraseanet-log.txt", sprintf("\n%s; ==== END (N = %d ; dT = %d ==> %0.2f / sec) ====\n\n", time(), $n_participants, time()-$_t0, $n_participants/(max(time()-$_t0, 0.001))), FILE_APPEND);
Expand Down Expand Up @@ -500,4 +494,20 @@ private function getLogger()
{
return $this->app['alchemy_worker.logger'];
}

/**
* @return WorkerRunningJobRepository
*/
private function getRepoWorkerRunningJob()
{
return $this->app['repo.worker-running-job'];
}

/**
* @return MessagePublisher
*/
private function getMessagePublisher()
{
return $this->app['alchemy_worker.message.publisher'];
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ public function process(array $payload)
$payload['subdefName'], $databox->get_viewname(), $databoxId, $recordId));

// tell that we have finished to work on this file (=unlock)
$this->repoWorker->markFinished($workerRunningJobId);
$this->repoWorker->markFinished($workerRunningJobId, $this->messagePublisher, MessagePublisher::SUBDEF_CREATION_TYPE);

$this->getDataboxLogger($databox)->initOrUpdateLogDocsFromWorker($record, $databox, $workerRunningJob, $subdefName, \Session_Logger::EVENT_SUBDEFCREATION, new \DateTime('now'), WorkerRunningJob::FINISHED);
}
Expand Down
Loading
Loading