Skip to content

Commit 4d4436f

Browse files
refactor: sync service and add tests
1 parent 6d5d232 commit 4d4436f

File tree

11 files changed

+222
-98
lines changed

11 files changed

+222
-98
lines changed

mobile/lib/domain/services/sync_stream.service.dart

Lines changed: 36 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ import 'dart:async';
44

55
import 'package:immich_mobile/domain/interfaces/sync_api.interface.dart';
66
import 'package:immich_mobile/domain/interfaces/sync_stream.interface.dart';
7-
import 'package:immich_mobile/domain/models/sync/sync_event.model.dart';
87
import 'package:logging/logging.dart';
98
import 'package:openapi/api.dart';
109

@@ -14,12 +13,6 @@ class SyncStreamService {
1413
final ISyncApiRepository _syncApiRepository;
1514
final ISyncStreamRepository _syncStreamRepository;
1615

17-
StreamSubscription? _userSyncSubscription;
18-
Completer<void> _userSyncCompleter = Completer<void>();
19-
20-
StreamSubscription? _partnerSyncSubscription;
21-
Completer<void> _partnerSyncCompleter = Completer<void>();
22-
2316
SyncStreamService({
2417
required ISyncApiRepository syncApiRepository,
2518
required ISyncStreamRepository syncStreamRepository,
@@ -51,47 +44,46 @@ class SyncStreamService {
5144
return false;
5245
}
5346

54-
Future<void> _handleSyncEvents(List<SyncEvent> events) async {
55-
Map<SyncEntityType, String> acks = {};
56-
for (final event in events) {
57-
if (await _handleSyncData(event.data)) {
58-
// Only retain the latest ack from each type
59-
acks[event.type] = event.ack;
60-
}
61-
}
62-
await _syncApiRepository.ack(acks.values.toList());
63-
}
64-
65-
Future<void> syncUsers() async {
66-
_logger.info("Syncing User Changes");
67-
_userSyncSubscription =
68-
_syncApiRepository.getSyncEvents([SyncRequestType.usersV1]).listen(
69-
_handleSyncEvents,
70-
onDone: () {
71-
_userSyncCompleter.complete();
72-
_userSyncCompleter = Completer<void>();
47+
Future<void> _syncEvent(List<SyncRequestType> types) async {
48+
_logger.info("Syncing Events: $types");
49+
final streamCompleter = Completer();
50+
bool shouldSkipOnDone = false;
51+
final subscription = _syncApiRepository.getSyncEvents(types).listen(
52+
(events) async {
53+
try {
54+
Map<SyncEntityType, String> acks = {};
55+
for (final event in events) {
56+
// the onDone callback might fire before the events are processed
57+
// the following flag ensures that the onDone callback is not called
58+
// before the events are processed
59+
shouldSkipOnDone = true;
60+
if (await _handleSyncData(event.data)) {
61+
// Only retain the latest ack from each type
62+
acks[event.type] = event.ack;
63+
}
64+
}
65+
await _syncApiRepository.ack(acks.values.toList());
66+
} catch (error, stack) {
67+
_logger.warning("Error handling sync events", error, stack);
68+
}
69+
streamCompleter.completeOnce();
7370
},
71+
onError: (_) => streamCompleter.completeOnce(),
72+
// onDone is required to be called in cases where the stream is empty
73+
onDone: () => shouldSkipOnDone ? null : streamCompleter.completeOnce,
7474
);
75-
return await _userSyncCompleter.future;
75+
streamCompleter.future.whenComplete(subscription.cancel);
76+
return await streamCompleter.future;
7677
}
7778

78-
Future<void> syncPartners() async {
79-
_logger.info("Syncing Partner Changes");
80-
_partnerSyncSubscription =
81-
_syncApiRepository.getSyncEvents([SyncRequestType.partnersV1]).listen(
82-
_handleSyncEvents,
83-
onDone: () {
84-
_partnerSyncCompleter.complete();
85-
_partnerSyncCompleter = Completer<void>();
86-
},
87-
);
88-
return await _partnerSyncCompleter.future;
89-
}
79+
Future<void> syncUsers() => _syncEvent([SyncRequestType.usersV1]);
80+
Future<void> syncPartners() => _syncEvent([SyncRequestType.partnersV1]);
81+
}
9082

91-
Future<void> dispose() async {
92-
await _userSyncSubscription?.cancel();
93-
_userSyncCompleter.complete();
94-
await _partnerSyncSubscription?.cancel();
95-
_partnerSyncCompleter.complete();
83+
extension on Completer {
84+
void completeOnce() {
85+
if (!isCompleted) {
86+
complete();
87+
}
9688
}
9789
}

mobile/lib/providers/app_life_cycle.provider.dart

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@ import 'package:immich_mobile/models/backup/backup_state.model.dart';
77
import 'package:immich_mobile/providers/album/album.provider.dart';
88
import 'package:immich_mobile/providers/asset.provider.dart';
99
import 'package:immich_mobile/providers/auth.provider.dart';
10-
import 'package:immich_mobile/providers/background_sync.provider.dart';
1110
import 'package:immich_mobile/providers/backup/backup.provider.dart';
1211
import 'package:immich_mobile/providers/backup/ios_background_settings.provider.dart';
1312
import 'package:immich_mobile/providers/backup/manual_upload.provider.dart';
@@ -114,7 +113,6 @@ class AppLifeCycleNotifier extends StateNotifier<AppLifeCycleEnum> {
114113
_ref.read(backupProvider.notifier).cancelBackup();
115114
}
116115
_ref.read(websocketProvider.notifier).disconnect();
117-
_ref.read(backgroundSyncProvider).stop();
118116
}
119117

120118
LogService.I.flush();
Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,7 @@
11
import 'package:hooks_riverpod/hooks_riverpod.dart';
22
import 'package:immich_mobile/constants/constants.dart';
33
import 'package:immich_mobile/utils/background_sync.dart';
4-
import 'package:riverpod_annotation/riverpod_annotation.dart';
54

6-
part 'background_sync.provider.g.dart';
7-
8-
@Riverpod(keepAlive: true)
9-
BackgroundSyncManager backgroundSync(Ref _) =>
10-
BackgroundSyncManager(duration: kBackgroundSyncDuration);
5+
final backgroundSyncProvider = Provider<BackgroundSyncManager>(
6+
(ref) => BackgroundSyncManager(duration: kBackgroundSyncDuration),
7+
);

mobile/lib/providers/background_sync.provider.g.dart

Lines changed: 0 additions & 27 deletions
This file was deleted.

mobile/lib/providers/db.provider.dart

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,5 @@
11
import 'package:hooks_riverpod/hooks_riverpod.dart';
2-
import 'package:immich_mobile/infrastructure/repositories/db.repository.dart';
32
import 'package:isar/isar.dart';
4-
import 'package:riverpod_annotation/riverpod_annotation.dart';
53

64
// overwritten in main.dart due to async loading
75
final dbProvider = Provider<Isar>((_) => throw UnimplementedError());
8-
9-
@Riverpod(keepAlive: true)
10-
Drift drift(Ref _) => Drift();

mobile/lib/providers/infrastructure/db.provider.dart

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
import 'dart:async';
2+
13
import 'package:hooks_riverpod/hooks_riverpod.dart';
24
import 'package:immich_mobile/infrastructure/repositories/db.repository.dart';
35
import 'package:isar/isar.dart';
@@ -8,5 +10,8 @@ part 'db.provider.g.dart';
810
@Riverpod(keepAlive: true)
911
Isar isar(Ref ref) => throw UnimplementedError('isar');
1012

11-
@Riverpod(keepAlive: true)
12-
Drift drift(Ref _) => Drift();
13+
final driftProvider = Provider<Drift>((ref) {
14+
final drift = Drift();
15+
ref.onDispose(() => unawaited(drift.close()));
16+
return drift;
17+
});

mobile/lib/providers/infrastructure/sync_stream.provider.dart

Lines changed: 4 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,16 @@
1-
import 'dart:async';
2-
31
import 'package:hooks_riverpod/hooks_riverpod.dart';
42
import 'package:immich_mobile/domain/services/sync_stream.service.dart';
53
import 'package:immich_mobile/infrastructure/repositories/sync_api.repository.dart';
64
import 'package:immich_mobile/infrastructure/repositories/sync_stream.repository.dart';
75
import 'package:immich_mobile/providers/api.provider.dart';
86
import 'package:immich_mobile/providers/infrastructure/db.provider.dart';
97

10-
final syncStreamServiceProvider = Provider((ref) {
11-
final instance = SyncStreamService(
8+
final syncStreamServiceProvider = Provider(
9+
(ref) => SyncStreamService(
1210
syncApiRepository: ref.watch(syncApiRepositoryProvider),
1311
syncStreamRepository: ref.watch(syncStreamRepositoryProvider),
14-
);
15-
16-
ref.onDispose(() => unawaited(instance.dispose()));
17-
18-
return instance;
19-
});
12+
),
13+
);
2014

2115
final syncApiRepositoryProvider = Provider(
2216
(ref) => SyncApiRepository(ref.watch(apiServiceProvider)),

mobile/lib/utils/background_sync.dart

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,12 +27,12 @@ class BackgroundSyncManager {
2727
}
2828

2929
void start() {
30-
_logger.info('Background sync enabled');
30+
_logger.info('Starting Background sync');
3131
_timer ??= _createTimer();
3232
}
3333

3434
void stop() {
35-
_logger.info('Background sync disabled');
35+
_logger.info('Stopping Background sync');
3636
_timer?.cancel();
3737
_timer = null;
3838
}
Lines changed: 119 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,119 @@
1+
import 'dart:async';
2+
3+
import 'package:flutter_test/flutter_test.dart';
4+
import 'package:immich_mobile/domain/interfaces/sync_api.interface.dart';
5+
import 'package:immich_mobile/domain/interfaces/sync_stream.interface.dart';
6+
import 'package:immich_mobile/domain/services/sync_stream.service.dart';
7+
import 'package:mocktail/mocktail.dart';
8+
import 'package:openapi/api.dart';
9+
10+
import '../../fixtures/sync_stream.stub.dart';
11+
import '../../infrastructure/repository.mock.dart';
12+
13+
void main() {
14+
late SyncStreamService sut;
15+
late ISyncStreamRepository mockSyncStreamRepo;
16+
late ISyncApiRepository mockSyncApiRepo;
17+
18+
setUp(() {
19+
mockSyncStreamRepo = MockSyncStreamRepository();
20+
mockSyncApiRepo = MockSyncApiRepository();
21+
sut = SyncStreamService(
22+
syncApiRepository: mockSyncApiRepo,
23+
syncStreamRepository: mockSyncStreamRepo,
24+
);
25+
26+
when(() => mockSyncApiRepo.getSyncEvents([SyncRequestType.usersV1]))
27+
.thenAnswer((_) => Stream.value(SyncStreamStub.userEvents));
28+
when(() => mockSyncApiRepo.getSyncEvents([SyncRequestType.partnersV1]))
29+
.thenAnswer((_) => Stream.value(SyncStreamStub.partnerEvents));
30+
when(() => mockSyncApiRepo.ack(any())).thenAnswer((_) => Future.value());
31+
32+
registerFallbackValue(SyncStreamStub.userV1Admin);
33+
when(() => mockSyncStreamRepo.updateUsersV1(any()))
34+
.thenAnswer((_) => Future.value(true));
35+
registerFallbackValue(SyncStreamStub.partnerV1);
36+
when(() => mockSyncStreamRepo.updatePartnerV1(any()))
37+
.thenAnswer((_) => Future.value(false));
38+
registerFallbackValue(SyncStreamStub.userDeleteV1);
39+
when(() => mockSyncStreamRepo.deleteUsersV1(any()))
40+
.thenAnswer((_) => Future.value(false));
41+
registerFallbackValue(SyncStreamStub.partnerDeleteV1);
42+
when(() => mockSyncStreamRepo.deletePartnerV1(any()))
43+
.thenAnswer((_) => Future.value(true));
44+
});
45+
46+
group("_syncEvent", () {
47+
test("future completed on success", () async {
48+
await expectLater(sut.syncUsers(), completes);
49+
});
50+
51+
test("future completes on error from stream", () async {
52+
when(() => mockSyncApiRepo.getSyncEvents([SyncRequestType.usersV1]))
53+
.thenAnswer((_) => Stream.error(Exception("Error")));
54+
await expectLater(sut.syncUsers(), completes);
55+
});
56+
57+
test("future throws on api exception", () {
58+
when(() => mockSyncApiRepo.getSyncEvents([SyncRequestType.usersV1]))
59+
.thenThrow(Exception("Error"));
60+
expect(sut.syncUsers(), throwsA(isA<Exception>()));
61+
});
62+
63+
test("future completes on repository exception", () {
64+
when(() => mockSyncStreamRepo.updateUsersV1(any()))
65+
.thenThrow(Exception("Error"));
66+
expect(sut.syncUsers(), completes);
67+
});
68+
69+
test("sends ack for successful events", () async {
70+
when(() => mockSyncStreamRepo.updateUsersV1(any()))
71+
.thenAnswer((_) => Future.value(false));
72+
when(() => mockSyncStreamRepo.deleteUsersV1(any()))
73+
.thenAnswer((_) => Future.value(true));
74+
await sut.syncUsers();
75+
verify(() => mockSyncApiRepo.ack(["2"])).called(1);
76+
});
77+
78+
test("only sends the latest ack for events of same type", () async {
79+
await sut.syncUsers();
80+
verify(() => mockSyncApiRepo.ack(["5"])).called(1);
81+
});
82+
});
83+
84+
group("syncUsers", () {
85+
test("calls _syncEvent with usersV1", () async {
86+
await sut.syncUsers();
87+
verify(() => mockSyncApiRepo.getSyncEvents([SyncRequestType.usersV1]))
88+
.called(1);
89+
});
90+
91+
test("calls _handleSyncData for each event", () async {
92+
await sut.syncUsers();
93+
verify(() => mockSyncStreamRepo.updateUsersV1(SyncStreamStub.userV1Admin))
94+
.called(1);
95+
verify(
96+
() => mockSyncStreamRepo.deleteUsersV1(SyncStreamStub.userDeleteV1),
97+
).called(1);
98+
});
99+
});
100+
101+
group("syncPartners", () {
102+
test("calls _syncEvent with partnersV1", () async {
103+
await sut.syncPartners();
104+
verify(() => mockSyncApiRepo.getSyncEvents([SyncRequestType.partnersV1]))
105+
.called(1);
106+
});
107+
108+
test("calls _handleSyncData for each event", () async {
109+
await sut.syncPartners();
110+
verify(
111+
() => mockSyncStreamRepo.updatePartnerV1(SyncStreamStub.partnerV1),
112+
).called(1);
113+
verify(
114+
() =>
115+
mockSyncStreamRepo.deletePartnerV1(SyncStreamStub.partnerDeleteV1),
116+
).called(1);
117+
});
118+
});
119+
}
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
import 'package:immich_mobile/domain/models/sync/sync_event.model.dart';
2+
import 'package:openapi/api.dart';
3+
4+
abstract final class SyncStreamStub {
5+
static final userV1Admin = SyncUserV1(
6+
deletedAt: DateTime(2020),
7+
email: "admin@admin",
8+
id: "1",
9+
name: "Admin",
10+
);
11+
static final userV1User = SyncUserV1(
12+
deletedAt: DateTime(2021),
13+
email: "user@user",
14+
id: "2",
15+
name: "User",
16+
);
17+
static final userDeleteV1 = SyncUserDeleteV1(userId: "2");
18+
static final userEvents = [
19+
SyncEvent(type: SyncEntityType.userV1, data: userV1Admin, ack: "1"),
20+
SyncEvent(
21+
type: SyncEntityType.userDeleteV1,
22+
data: userDeleteV1,
23+
ack: "2",
24+
),
25+
SyncEvent(type: SyncEntityType.userV1, data: userV1User, ack: "5"),
26+
];
27+
28+
static final partnerV1 = SyncPartnerV1(
29+
inTimeline: true,
30+
sharedById: "1",
31+
sharedWithId: "2",
32+
);
33+
static final partnerDeleteV1 = SyncPartnerDeleteV1(
34+
sharedById: "3",
35+
sharedWithId: "4",
36+
);
37+
static final partnerEvents = [
38+
SyncEvent(type: SyncEntityType.partnerV1, data: partnerV1, ack: "3"),
39+
SyncEvent(
40+
type: SyncEntityType.partnerDeleteV1,
41+
data: partnerDeleteV1,
42+
ack: "4",
43+
),
44+
];
45+
}

0 commit comments

Comments
 (0)