Skip to content

Commit 987e5ab

Browse files
authored
fix(server): start job workers after DB (#17806)
Job workers are currently started on app init, which means they are started before the DB is initialised. This can be problematic if jobs which need to use the DB start running before it's ready. It also means that swapping out the queue implementation for something which uses the DB won't work.
1 parent 1b5e981 commit 987e5ab

File tree

4 files changed

+22
-7
lines changed

4 files changed

+22
-7
lines changed

server/src/app.module.ts

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,12 @@ import { LoggingInterceptor } from 'src/middleware/logging.interceptor';
1717
import { repositories } from 'src/repositories';
1818
import { ConfigRepository } from 'src/repositories/config.repository';
1919
import { EventRepository } from 'src/repositories/event.repository';
20-
import { JobRepository } from 'src/repositories/job.repository';
2120
import { LoggingRepository } from 'src/repositories/logging.repository';
2221
import { teardownTelemetry, TelemetryRepository } from 'src/repositories/telemetry.repository';
2322
import { services } from 'src/services';
2423
import { AuthService } from 'src/services/auth.service';
2524
import { CliService } from 'src/services/cli.service';
25+
import { JobService } from 'src/services/job.service';
2626
import { getKyselyConfig } from 'src/utils/database';
2727

2828
const common = [...repositories, ...services, GlobalExceptionFilter];
@@ -52,7 +52,7 @@ class BaseModule implements OnModuleInit, OnModuleDestroy {
5252
@Inject(IWorker) private worker: ImmichWorker,
5353
logger: LoggingRepository,
5454
private eventRepository: EventRepository,
55-
private jobRepository: JobRepository,
55+
private jobService: JobService,
5656
private telemetryRepository: TelemetryRepository,
5757
private authService: AuthService,
5858
) {
@@ -62,10 +62,7 @@ class BaseModule implements OnModuleInit, OnModuleDestroy {
6262
async onModuleInit() {
6363
this.telemetryRepository.setup({ repositories });
6464

65-
this.jobRepository.setup({ services });
66-
if (this.worker === ImmichWorker.MICROSERVICES) {
67-
this.jobRepository.startWorkers();
68-
}
65+
this.jobService.setServices(services);
6966

7067
this.eventRepository.setAuthFn(async (client) =>
7168
this.authService.authenticate({

server/src/enum.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -407,6 +407,8 @@ export enum DatabaseExtension {
407407
export enum BootstrapEventPriority {
408408
// Database service should be initialized before anything else, most other services need database access
409409
DatabaseService = -200,
410+
// Other services may need to queue jobs on bootstrap.
411+
JobService = -190,
410412
// Initialise config after other bootstrap services, stop other services from using config on bootstrap
411413
SystemConfig = 100,
412414
}

server/src/repositories/job.repository.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ export class JobRepository {
3333
this.logger.setContext(JobRepository.name);
3434
}
3535

36-
setup({ services }: { services: ClassConstructor<unknown>[] }) {
36+
setup(services: ClassConstructor<unknown>[]) {
3737
const reflector = this.moduleRef.get(Reflector, { strict: false });
3838

3939
// discovery

server/src/services/job.service.ts

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,12 @@
11
import { BadRequestException, Injectable } from '@nestjs/common';
2+
import { ClassConstructor } from 'class-transformer';
23
import { snakeCase } from 'lodash';
34
import { OnEvent } from 'src/decorators';
45
import { mapAsset } from 'src/dtos/asset-response.dto';
56
import { AllJobStatusResponseDto, JobCommandDto, JobCreateDto, JobStatusDto } from 'src/dtos/job.dto';
67
import {
78
AssetType,
9+
BootstrapEventPriority,
810
ImmichWorker,
911
JobCommand,
1012
JobName,
@@ -51,6 +53,8 @@ const asJobItem = (dto: JobCreateDto): JobItem => {
5153

5254
@Injectable()
5355
export class JobService extends BaseService {
56+
private services: ClassConstructor<unknown>[] = [];
57+
5458
@OnEvent({ name: 'config.init', workers: [ImmichWorker.MICROSERVICES] })
5559
onConfigInit({ newConfig: config }: ArgOf<'config.init'>) {
5660
this.logger.debug(`Updating queue concurrency settings`);
@@ -69,6 +73,18 @@ export class JobService extends BaseService {
6973
this.onConfigInit({ newConfig: config });
7074
}
7175

76+
@OnEvent({ name: 'app.bootstrap', priority: BootstrapEventPriority.JobService })
77+
onBootstrap() {
78+
this.jobRepository.setup(this.services);
79+
if (this.worker === ImmichWorker.MICROSERVICES) {
80+
this.jobRepository.startWorkers();
81+
}
82+
}
83+
84+
setServices(services: ClassConstructor<unknown>[]) {
85+
this.services = services;
86+
}
87+
7288
async create(dto: JobCreateDto): Promise<void> {
7389
await this.jobRepository.queue(asJobItem(dto));
7490
}

0 commit comments

Comments
 (0)