Skip to content

Commit e5b2362

Browse files
committed
feat(server): add queued job metrics to telemetry
Adds observable gauges `immich.queues.<queue_name>.waiting`, `.paused`, `.delayed`, and `.active` to track the number of jobs in each of those states.
- Polls queue statistics every 5 seconds.
- Fetches queue counts in parallel.
- Only runs the polling loop if job telemetry is enabled.
- Includes debug logging for failed metric updates.
1 parent 997aec2 commit e5b2362

File tree

4 files changed

+140
-14
lines changed

4 files changed

+140
-14
lines changed

server/src/repositories/telemetry.repository.ts

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ type MetricGroupOptions = { enabled: boolean };
2424

2525
export class MetricGroupRepository {
2626
private enabled = false;
27+
private observableGauges = new Map<string, () => number>();
2728

2829
constructor(private metricService: MetricService) {}
2930

@@ -45,6 +46,15 @@ export class MetricGroupRepository {
4546
}
4647
}
4748

49+
/**
 * Registers an observable gauge whose value is supplied by `valueCallback`.
 *
 * This is a no-op when the group is not enabled at the time of the call, or when
 * a gauge with the same name was already registered through this method, so each
 * name gets at most one callback attached here.
 *
 * @param name - Metric name passed to `metricService.getObservableGauge`.
 * @param valueCallback - Called every time the gauge callback runs; its return value is observed.
 * @param options - Optional metric options forwarded to `metricService.getObservableGauge`.
 */
setObservableGauge(name: string, valueCallback: () => number, options?: MetricOptions): void {
50+
// `enabled` is checked once, at registration time, not on every observation.
if (this.enabled && !this.observableGauges.has(name)) {
51+
// Record the name first so a repeated registration does not attach a second callback.
// NOTE(review): within this method the stored callback is only used for the `has()` check above.
this.observableGauges.set(name, valueCallback);
52+
this.metricService.getObservableGauge(name, options).addCallback((observableResult) => {
53+
observableResult.observe(valueCallback());
54+
});
55+
}
56+
}
57+
4858
configure(options: MetricGroupOptions): this {
4959
this.enabled = options.enabled;
5060
return this;
Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
import { ImmichTelemetry, QueueName } from 'src/enum';
2+
import { TelemetryService } from 'src/services/telemetry.service';
3+
import { newTestService, ServiceMocks } from 'test/utils';
4+
5+
// Unit tests for TelemetryService: verifies that onBootstrap registers the per-queue
// gauges and polls job counts only when job telemetry is enabled.
describe(TelemetryService.name, () => {
6+
let sut: TelemetryService;
7+
let mocks: ServiceMocks;
8+
9+
// Build a fresh service and fresh mocks for every test.
beforeEach(() => {
10+
({ sut, mocks } = newTestService(TelemetryService));
11+
});
12+
13+
it('should work', () => {
14+
expect(sut).toBeDefined();
15+
});
16+
17+
describe('onBootstrap', () => {
18+
// NOTE(review): with job telemetry enabled, onBootstrap starts a setInterval timer.
// Nothing in this test calls sut.onShutdown() or uses fake timers, so the timer
// stays active after the test finishes — consider clearing it in an afterEach.
it('should register queued metrics if enabled', async () => {
19+
// Only the `telemetry.metrics` part of the env is stubbed, hence the `as any` cast.
mocks.config.getEnv.mockReturnValue({
20+
telemetry: {
21+
metrics: new Set([ImmichTelemetry.Job]),
22+
},
23+
} as any);
24+
25+
mocks.job.getJobCounts.mockResolvedValue({
26+
waiting: 1,
27+
paused: 2,
28+
delayed: 3,
29+
active: 0,
30+
completed: 0,
31+
failed: 0,
32+
});
33+
34+
await sut.onBootstrap();
35+
36+
// The service registers four gauges per queue: waiting, paused, delayed and active.
expect(mocks.telemetry.jobs.setObservableGauge).toHaveBeenCalledTimes(Object.keys(QueueName).length * 4);
37+
// The initial (awaited) update fetches the counts once for each queue.
expect(mocks.job.getJobCounts).toHaveBeenCalledTimes(Object.keys(QueueName).length);
38+
});
39+
40+
it('should not register queued metrics if disabled', async () => {
41+
// An empty metrics set means ImmichTelemetry.Job is not enabled.
mocks.config.getEnv.mockReturnValue({
42+
telemetry: {
43+
metrics: new Set(),
44+
},
45+
} as any);
46+
47+
await sut.onBootstrap();
48+
49+
expect(mocks.telemetry.jobs.setObservableGauge).not.toHaveBeenCalled();
50+
expect(mocks.job.getJobCounts).not.toHaveBeenCalled();
51+
});
52+
});
53+
});

server/src/services/telemetry.service.ts

Lines changed: 76 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,88 @@
11
import { snakeCase } from 'lodash';
22
import { OnEvent } from 'src/decorators';
3-
import { ImmichWorker, JobStatus } from 'src/enum';
4-
import { ArgOf, ArgsOf } from 'src/repositories/event.repository';
3+
import { ImmichTelemetry, ImmichWorker, JobStatus, QueueName } from 'src/enum';
4+
import { ArgOf } from 'src/repositories/event.repository';
55
import { BaseService } from 'src/services/base.service';
66

7+
// Delay between queue statistics polls, in milliseconds (used as the setInterval delay in onBootstrap).
const QUEUE_METRICS_POLLING_INTERVAL = 5000;
8+
79
export class TelemetryService extends BaseService {
10+
private queueWaitingCounts = new Map<string, number>();
11+
private queuePausedCounts = new Map<string, number>();
12+
private queueDelayedCounts = new Map<string, number>();
13+
private queueActiveCounts = new Map<string, number>();
14+
private pollingInterval?: NodeJS.Timeout;
15+
816
/**
 * Handles the `AppBootstrap` event (registered for the Api worker).
 *
 * Seeds the `immich.users.total` gauge with the current user count and, when job
 * telemetry is enabled in the environment config, registers the per-queue gauges,
 * performs one awaited refresh of the cached queue counts, and starts a timer that
 * refreshes them every QUEUE_METRICS_POLLING_INTERVAL milliseconds.
 */
@OnEvent({ name: 'AppBootstrap', workers: [ImmichWorker.Api] })
917
async onBootstrap(): Promise<void> {
1018
const userCount = await this.userRepository.getCount();
1119
this.telemetryRepository.api.addToGauge('immich.users.total', userCount);
20+
21+
const { telemetry } = this.configRepository.getEnv();
22+
// Queue polling is skipped entirely unless job telemetry is enabled.
if (telemetry.metrics.has(ImmichTelemetry.Job)) {
23+
// Register observable gauges for queued metrics
24+
this.registerQueuedMetrics();
25+
26+
// Start polling queue statistics
27+
// Refresh once up front so the cached counts are populated before the first interval tick.
await this.updateQueuedMetrics();
28+
this.pollingInterval = setInterval(() => {
29+
// `void` marks the promise as intentionally not awaited; updateQueuedMetrics
// catches per-queue failures itself.
void this.updateQueuedMetrics();
30+
}, QUEUE_METRICS_POLLING_INTERVAL);
31+
}
32+
}
33+
34+
/**
 * Handles the `AppShutdown` event by stopping the queue-metrics polling timer.
 *
 * The timer only exists if onBootstrap started it, so the handle is checked first.
 */
@OnEvent({ name: 'AppShutdown' })
35+
onShutdown(): void {
36+
if (this.pollingInterval) {
37+
clearInterval(this.pollingInterval);
38+
}
39+
}
40+
41+
/**
 * Registers four observable gauges for every value of `QueueName`:
 * `immich.queues.<queue_key>.waiting`, `.paused`, `.delayed` and `.active`,
 * where `<queue_key>` is the snake_case form of the queue name.
 *
 * Each gauge callback reads the count cached by updateQueuedMetrics and reports 0
 * while no count has been stored for that queue yet.
 */
private registerQueuedMetrics(): void {
42+
for (const queueName of Object.values(QueueName)) {
43+
// Same key derivation as updateQueuedMetrics, so reads and writes hit the same map entry.
const queueKey = snakeCase(queueName);
44+
45+
this.telemetryRepository.jobs.setObservableGauge(
46+
`immich.queues.${queueKey}.waiting`,
47+
() => this.queueWaitingCounts.get(queueKey) ?? 0,
48+
{ description: `Number of waiting jobs in ${queueName} queue` },
49+
);
50+
51+
this.telemetryRepository.jobs.setObservableGauge(
52+
`immich.queues.${queueKey}.paused`,
53+
() => this.queuePausedCounts.get(queueKey) ?? 0,
54+
{ description: `Number of paused jobs in ${queueName} queue` },
55+
);
56+
57+
this.telemetryRepository.jobs.setObservableGauge(
58+
`immich.queues.${queueKey}.delayed`,
59+
() => this.queueDelayedCounts.get(queueKey) ?? 0,
60+
{ description: `Number of delayed jobs in ${queueName} queue` },
61+
);
62+
63+
this.telemetryRepository.jobs.setObservableGauge(
64+
`immich.queues.${queueKey}.active`,
65+
() => this.queueActiveCounts.get(queueKey) ?? 0,
66+
{ description: `Number of active jobs in ${queueName} queue` },
67+
);
68+
}
69+
}
70+
71+
/**
 * Refreshes the cached waiting/paused/delayed/active counts for every queue.
 *
 * Counts for all queues are requested concurrently. A failure for one queue is
 * caught inside that queue's own task, so it does not reject the returned promise
 * or affect the other queues; the failing queue keeps whatever values were cached before.
 */
private async updateQueuedMetrics(): Promise<void> {
72+
await Promise.all(
73+
Object.values(QueueName).map(async (queueName) => {
74+
try {
75+
const stats = await this.jobRepository.getJobCounts(queueName);
76+
// Keyed by the snake_case name, matching the lookup in the gauge callbacks.
const queueKey = snakeCase(queueName);
77+
this.queueWaitingCounts.set(queueKey, stats.waiting);
78+
this.queuePausedCounts.set(queueKey, stats.paused);
79+
this.queueDelayedCounts.set(queueKey, stats.delayed);
80+
this.queueActiveCounts.set(queueKey, stats.active);
81+
} catch (error) {
82+
// NOTE(review): failures are only logged at debug level, so stale gauge values
// may go unnoticed at default log levels — confirm this is intended.
this.logger.debug(`Failed to update queued metrics for ${queueName}: ${error}`);
83+
}
84+
}),
85+
);
1286
}
1387

1488
@OnEvent({ name: 'UserCreate' })
@@ -26,12 +100,6 @@ export class TelemetryService extends BaseService {
26100
this.telemetryRepository.api.addToGauge(`immich.users.total`, 1);
27101
}
28102

29-
@OnEvent({ name: 'JobStart' })
30-
onJobStart(...[queueName]: ArgsOf<'JobStart'>) {
31-
const queueMetric = `immich.queues.${snakeCase(queueName)}.active`;
32-
this.telemetryRepository.jobs.addToGauge(queueMetric, 1);
33-
}
34-
35103
@OnEvent({ name: 'JobSuccess' })
36104
onJobSuccess({ job, response }: ArgOf<'JobSuccess'>) {
37105
if (response && Object.values(JobStatus).includes(response as JobStatus)) {
@@ -46,12 +114,6 @@ export class TelemetryService extends BaseService {
46114
this.telemetryRepository.jobs.addToCounter(jobMetric, 1);
47115
}
48116

49-
@OnEvent({ name: 'JobComplete' })
50-
onJobComplete(...[queueName]: ArgsOf<'JobComplete'>) {
51-
const queueMetric = `immich.queues.${snakeCase(queueName)}.active`;
52-
this.telemetryRepository.jobs.addToGauge(queueMetric, -1);
53-
}
54-
55117
@OnEvent({ name: 'QueueStart' })
56118
onQueueStart({ name }: ArgOf<'QueueStart'>) {
57119
this.telemetryRepository.jobs.addToCounter(`immich.queues.${snakeCase(name)}.started`, 1);

server/test/repositories/telemetry.repository.mock.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ const newMetricGroupMock = () => {
77
addToCounter: vitest.fn(),
88
addToGauge: vitest.fn(),
99
addToHistogram: vitest.fn(),
10+
setObservableGauge: vitest.fn(),
1011
configure: vitest.fn(),
1112
};
1213
};

0 commit comments

Comments
 (0)