Skip to content

Commit 562be00

Browse files
committed
feat(server): add queued job metrics to telemetry
Adds a new observable gauge `immich.queues.<queue_name>.queued` to track the number of jobs in the waiting, paused, or delayed states.
- Polls queue statistics every 5 seconds.
- Fetches queue counts in parallel.
- Only runs the polling loop if job telemetry is enabled.
- Includes debug logging for failed metric updates.
1 parent 997aec2 commit 562be00

File tree

4 files changed

+113
-1
lines changed

4 files changed

+113
-1
lines changed

server/src/repositories/telemetry.repository.ts

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ type MetricGroupOptions = { enabled: boolean };
2424

2525
export class MetricGroupRepository {
2626
private enabled = false;
27+
private observableGauges = new Map<string, () => number>();
2728

2829
constructor(private metricService: MetricService) {}
2930

@@ -45,6 +46,15 @@ export class MetricGroupRepository {
4546
}
4647
}
4748

49+
/**
 * Registers an observable gauge whose reported value is produced by `valueCallback`.
 *
 * The call is a no-op when this metric group is disabled, or when a gauge with the
 * same `name` has already been registered through this method, so repeated calls do
 * not attach a second callback for the same name.
 *
 * @param name - Metric name; also the key used to detect repeat registrations.
 * @param valueCallback - Invoked inside the gauge callback to obtain the value to observe.
 * @param options - Optional metric options, passed through to `getObservableGauge`.
 */
setObservableGauge(name: string, valueCallback: () => number, options?: MetricOptions): void {
50+
if (this.enabled && !this.observableGauges.has(name)) {
51+
// Recorded so the `has(name)` guard above rejects a later registration of the same name.
// NOTE(review): within this method only the key is consulted; the stored callback is not read back.
this.observableGauges.set(name, valueCallback);
52+
this.metricService.getObservableGauge(name, options).addCallback((observableResult) => {
53+
// `valueCallback` is called each time this gauge callback runs, not once at registration.
observableResult.observe(valueCallback());
54+
});
55+
}
56+
}
57+
4858
configure(options: MetricGroupOptions): this {
4959
this.enabled = options.enabled;
5060
return this;
Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
import { ImmichTelemetry, QueueName } from 'src/enum';
2+
import { TelemetryService } from 'src/services/telemetry.service';
3+
import { newTestService, ServiceMocks } from 'test/utils';
4+
5+
// Unit tests for TelemetryService. A fresh service and mock set is built before every test.
describe(TelemetryService.name, () => {
6+
let sut: TelemetryService;
7+
let mocks: ServiceMocks;
8+
9+
beforeEach(() => {
10+
({ sut, mocks } = newTestService(TelemetryService));
11+
});
12+
13+
// Smoke test: the service can be constructed.
it('should work', () => {
14+
expect(sut).toBeDefined();
15+
});
16+
17+
describe('onBootstrap', () => {
18+
// With ImmichTelemetry.Job present in telemetry.metrics, onBootstrap should register
// one gauge per queue and fetch job counts once per queue.
// NOTE(review): on this path onBootstrap also starts a setInterval poller; nothing in
// this test calls onShutdown or installs fake timers, so the timer appears to outlive
// the test — confirm whether cleanup is needed.
it('should register queued metrics if enabled', async () => {
19+
// Partial env stub (hence `as any`): only telemetry.metrics is supplied.
mocks.config.getEnv.mockReturnValue({
20+
telemetry: {
21+
metrics: new Set([ImmichTelemetry.Job]),
22+
},
23+
} as any);
24+
25+
// Every queue reports the same counts.
mocks.job.getJobCounts.mockResolvedValue({
26+
waiting: 1,
27+
paused: 2,
28+
delayed: 3,
29+
active: 0,
30+
completed: 0,
31+
failed: 0,
32+
});
33+
34+
await sut.onBootstrap();
35+
36+
// The service iterates Object.values(QueueName) while these expectations count
// Object.keys(QueueName); the two lengths match only if every key maps to exactly
// one value — presumably QueueName is a string enum, verify.
expect(mocks.telemetry.jobs.setObservableGauge).toHaveBeenCalledTimes(Object.keys(QueueName).length);
37+
expect(mocks.job.getJobCounts).toHaveBeenCalledTimes(Object.keys(QueueName).length);
38+
});
39+
40+
// With an empty metrics set, neither gauge registration nor queue polling should happen.
it('should not register queued metrics if disabled', async () => {
41+
mocks.config.getEnv.mockReturnValue({
42+
telemetry: {
43+
metrics: new Set([]),
44+
},
45+
} as any);
46+
47+
await sut.onBootstrap();
48+
49+
expect(mocks.telemetry.jobs.setObservableGauge).not.toHaveBeenCalled();
50+
expect(mocks.job.getJobCounts).not.toHaveBeenCalled();
51+
});
52+
});
53+
});

server/src/services/telemetry.service.ts

Lines changed: 49 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,62 @@
11
import { snakeCase } from 'lodash';
22
import { OnEvent } from 'src/decorators';
3-
import { ImmichWorker, JobStatus } from 'src/enum';
3+
import { ImmichTelemetry, ImmichWorker, JobStatus, QueueName } from 'src/enum';
44
import { ArgOf, ArgsOf } from 'src/repositories/event.repository';
55
import { BaseService } from 'src/services/base.service';
66

7+
const QUEUE_METRICS_POLLING_INTERVAL = 5000;
8+
79
export class TelemetryService extends BaseService {
10+
private queueCounts = new Map<string, number>();
11+
private pollingInterval?: NodeJS.Timeout;
12+
813
/**
 * Handler for the `AppBootstrap` event, registered with `workers: [ImmichWorker.Api]`.
 *
 * Always adds the current user count to the `immich.users.total` gauge. When the
 * environment's telemetry metrics include `ImmichTelemetry.Job`, it additionally
 * registers the per-queue "queued" gauges, performs one awaited refresh of the queue
 * counts, and then starts a timer that refreshes them every
 * `QUEUE_METRICS_POLLING_INTERVAL` milliseconds.
 *
 * NOTE(review): if this handler ran more than once, `pollingInterval` would be
 * overwritten without clearing the earlier timer — confirm the event fires only once.
 */
@OnEvent({ name: 'AppBootstrap', workers: [ImmichWorker.Api] })
914
async onBootstrap(): Promise<void> {
1015
const userCount = await this.userRepository.getCount();
1116
this.telemetryRepository.api.addToGauge('immich.users.total', userCount);
17+
18+
const { telemetry } = this.configRepository.getEnv();
19+
if (telemetry.metrics.has(ImmichTelemetry.Job)) {
20+
// Register observable gauges for queued metrics
21+
this.registerQueuedMetrics();
22+
23+
// Start polling queue statistics
// The first refresh is awaited so the counts map is populated before the timer starts.
24+
await this.updateQueuedMetrics();
25+
this.pollingInterval = setInterval(() => {
26+
// Fire-and-forget: the returned promise is deliberately not awaited.
// updateQueuedMetrics catches failures per queue.
void this.updateQueuedMetrics();
27+
}, QUEUE_METRICS_POLLING_INTERVAL);
28+
}
29+
}
30+
31+
/**
 * Handler for the `AppShutdown` event: stops the queue-metrics polling timer if one
 * was started. Safe to call when polling was never enabled, because the handle is
 * checked before clearing.
 */
@OnEvent({ name: 'AppShutdown' })
32+
onShutdown(): void {
33+
if (this.pollingInterval) {
34+
// The handle itself is left assigned after clearing.
clearInterval(this.pollingInterval);
35+
}
36+
}
37+
38+
/**
 * Registers one observable gauge per `QueueName` value on the `jobs` metric group.
 *
 * Each gauge reads its value from the `queueCounts` map rather than querying the
 * queue directly, and reports 0 while no count has been stored for that metric.
 */
private registerQueuedMetrics(): void {
39+
for (const queueName of Object.values(QueueName)) {
40+
// This name is also the key written by updateQueuedMetrics; the two templates must stay identical.
const metricName = `immich.queues.${snakeCase(queueName)}.queued`;
41+
this.telemetryRepository.jobs.setObservableGauge(metricName, () => this.queueCounts.get(metricName) ?? 0, {
42+
description: `Number of queued jobs in ${queueName} queue (waiting + paused + delayed)`,
43+
});
44+
}
45+
}
46+
47+
/**
 * Refreshes the `queueCounts` map with the current "queued" total for every queue.
 *
 * Counts for all `QueueName` values are requested concurrently. Each queue has its
 * own try/catch, so a failure for one queue is logged at debug level and does not
 * reject the overall promise; that queue's previously stored value, if any, is left
 * unchanged.
 *
 * NOTE(review): interval ticks call this without awaiting the previous run, so runs
 * could overlap if fetching counts takes longer than the polling interval.
 */
private async updateQueuedMetrics(): Promise<void> {
48+
await Promise.all(
49+
Object.values(QueueName).map(async (queueName) => {
50+
try {
51+
const stats = await this.jobRepository.getJobCounts(queueName);
52+
// "Queued" is the sum of the waiting, paused and delayed counts.
const queuedCount = stats.waiting + stats.paused + stats.delayed;
53+
// Same name template as in registerQueuedMetrics, so the gauge callback finds this entry.
const metricName = `immich.queues.${snakeCase(queueName)}.queued`;
54+
this.queueCounts.set(metricName, queuedCount);
55+
} catch (error) {
56+
this.logger.debug(`Failed to update queued metrics for ${queueName}: ${error}`);
57+
}
58+
}),
59+
);
1260
}
1361

1462
@OnEvent({ name: 'UserCreate' })

server/test/repositories/telemetry.repository.mock.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ const newMetricGroupMock = () => {
77
addToCounter: vitest.fn(),
88
addToGauge: vitest.fn(),
99
addToHistogram: vitest.fn(),
10+
setObservableGauge: vitest.fn(),
1011
configure: vitest.fn(),
1112
};
1213
};

0 commit comments

Comments
 (0)