Skip to content

feat: user sync stream #16862

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 27 commits into from
Apr 17, 2025
Merged
Show file tree
Hide file tree
Changes from 23 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
49eb5ee
refactor: user entity
shenlong-tanwen Mar 6, 2025
fea9a16
chore: rebase fixes
shenlong-tanwen Mar 10, 2025
155a2e8
refactor: remove int user Id
shenlong-tanwen Mar 11, 2025
3f7fa90
refactor: migrate store userId from int to string
shenlong-tanwen Mar 11, 2025
c9309d6
refactor: rename uid to id
shenlong-tanwen Mar 12, 2025
5fa42c7
feat: drift
shenlong-tanwen Mar 13, 2025
aa32bed
pr feedback
shenlong-tanwen Mar 28, 2025
df65f70
refactor: move common overrides to mixin
shenlong-tanwen Mar 29, 2025
24d9dac
refactor: remove int user Id
shenlong-tanwen Mar 11, 2025
c964067
refactor: migrate store userId from int to string
shenlong-tanwen Mar 11, 2025
734566d
refactor: rename uid to id
shenlong-tanwen Mar 12, 2025
3739e86
feat: user & partner sync stream
shenlong-tanwen Mar 14, 2025
7246bcf
pr changes
shenlong-tanwen Mar 17, 2025
6baff23
refactor: sync service and add tests
shenlong-tanwen Mar 18, 2025
bc53f5c
chore: remove generated change
shenlong-tanwen Mar 18, 2025
3dc2540
chore: move sync model
shenlong-tanwen Mar 19, 2025
1fdb78b
rebase: convert string ids to byte uuids
shenlong-tanwen Mar 29, 2025
afd664d
rebase
shenlong-tanwen Apr 3, 2025
84d665e
Merge branch 'main' of github.com:immich-app/immich into feat/user-sy…
alextran1502 Apr 12, 2025
70263c5
add processing logs
shenlong-tanwen Apr 14, 2025
52cca7a
batch db calls
shenlong-tanwen Apr 14, 2025
deca45a
rewrite isolate manager
shenlong-tanwen Apr 15, 2025
92210b3
Merge branch 'main' into feat/user-sync-stream
alextran1502 Apr 15, 2025
c427f52
rewrite with worker_manager
shenlong-tanwen Apr 16, 2025
b72a91a
Merge branch 'main' of github.com:immich-app/immich into feat/user-sy…
alextran1502 Apr 16, 2025
04e6e56
misc fixes
shenlong-tanwen Apr 16, 2025
13ef491
add sync order test
shenlong-tanwen Apr 16, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion mobile/analysis_options.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ linter:
analyzer:
exclude:
- openapi/**
- build/**
- lib/generated_plugin_registrant.dart
- lib/**/*.g.dart
- lib/**/*.drift.dart
Expand Down Expand Up @@ -92,6 +93,9 @@ custom_lint:
allowed:
# required / wanted
- lib/repositories/*_api.repository.dart
- lib/domain/models/sync_event.model.dart
- lib/{domain,infrastructure}/**/sync_stream.*
- lib/{domain,infrastructure}/**/sync_api.*
- lib/infrastructure/repositories/*_api.repository.dart
- lib/infrastructure/utils/*.converter.dart
# acceptable exceptions for the time being
Expand Down Expand Up @@ -144,7 +148,9 @@ dart_code_metrics:
- avoid-global-state
- avoid-inverted-boolean-checks
- avoid-late-final-reassignment
- avoid-local-functions
- avoid-local-functions:
exclude:
- test/**.dart
- avoid-negated-conditions
- avoid-nested-streams-and-futures
- avoid-referencing-subclasses
Expand Down
1 change: 1 addition & 0 deletions mobile/devtools_options.yaml
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
description: This file stores settings for Dart & Flutter DevTools.
documentation: https://docs.flutter.dev/tools/devtools/extensions#configure-extension-enablement-states
extensions:
- drift: true
4 changes: 4 additions & 0 deletions mobile/lib/constants/constants.dart
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,9 @@ const double downloadFailed = -2;
// Number of log entries to retain on app start
const int kLogTruncateLimit = 250;

// Sync
const int kSyncEventBatchSize = 5000;

// Hash batch limits
const int kBatchHashFileLimit = 128;
const int kBatchHashSizeLimit = 1024 * 1024 * 1024; // 1GB
7 changes: 4 additions & 3 deletions mobile/lib/domain/interfaces/sync_api.interface.dart
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import 'package:immich_mobile/domain/models/sync/sync_event.model.dart';
import 'package:immich_mobile/domain/models/sync_event.model.dart';
import 'package:openapi/api.dart';

abstract interface class ISyncApiRepository {
Future<void> ack(String data);
Future<void> ack(List<String> data);

Stream<List<SyncEvent>> watchUserSyncEvent();
Stream<List<SyncEvent>> getSyncEvents(List<SyncRequestType> type);
}
10 changes: 10 additions & 0 deletions mobile/lib/domain/interfaces/sync_stream.interface.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
import 'package:immich_mobile/domain/interfaces/db.interface.dart';
import 'package:openapi/api.dart';

abstract interface class ISyncStreamRepository implements IDatabaseRepository {
Future<bool> updateUsersV1(Iterable<SyncUserV1> data);
Future<bool> deleteUsersV1(Iterable<SyncUserDeleteV1> data);

Future<bool> updatePartnerV1(Iterable<SyncPartnerV1> data);
Future<bool> deletePartnerV1(Iterable<SyncPartnerDeleteV1> data);
}
14 changes: 0 additions & 14 deletions mobile/lib/domain/models/sync/sync_event.model.dart

This file was deleted.

13 changes: 13 additions & 0 deletions mobile/lib/domain/models/sync_event.model.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
import 'package:openapi/api.dart';

class SyncEvent {
final SyncEntityType type;
// ignore: avoid-dynamic
final dynamic data;
Comment on lines +5 to +6
Copy link
Member

@mertalev mertalev Apr 16, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should avoid dynamic to the extent possible. Ideally, only the JSON parsing would involve dynamic, and parse directly into strongly typed objects that share a type (either through an interface, by inheritance or maybe through generics). As it is, dynamic leaks into the events, the response converter map, the main sync handler, etc.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

True, we went with having strongly typed DTOs initially. But it resulted in us creating a lot of duplicate domain classes that were already available from the OpenAPI generated code. The generated codes are not strongly typed so we do not have a common base class that we could use here.

final String ack;

const SyncEvent({required this.type, required this.data, required this.ack});

@override
String toString() => 'SyncEvent(type: $type, data: $data, ack: $ack)';
}
180 changes: 153 additions & 27 deletions mobile/lib/domain/services/sync_stream.service.dart
Original file line number Diff line number Diff line change
@@ -1,49 +1,175 @@
// ignore_for_file: avoid-passing-async-when-sync-expected

import 'dart:async';

import 'package:flutter/foundation.dart';
import 'package:collection/collection.dart';
import 'package:immich_mobile/domain/interfaces/sync_api.interface.dart';
import 'package:immich_mobile/domain/interfaces/sync_stream.interface.dart';
import 'package:immich_mobile/domain/utils/cancel.exception.dart';
import 'package:logging/logging.dart';
import 'package:openapi/api.dart';

class SyncStreamService {
final Logger _logger = Logger('SyncStreamService');

final ISyncApiRepository _syncApiRepository;
final ISyncStreamRepository _syncStreamRepository;
final Completer<bool>? _cancelCompleter;

SyncStreamService({
required ISyncApiRepository syncApiRepository,
required ISyncStreamRepository syncStreamRepository,
Completer<bool>? cancelCompleter,
}) : _syncApiRepository = syncApiRepository,
_syncStreamRepository = syncStreamRepository,
_cancelCompleter = cancelCompleter;

SyncStreamService(this._syncApiRepository);
Future<bool> _handleSyncData(
SyncEntityType type,
// ignore: avoid-dynamic
Iterable<dynamic> data,
) async {
if (data.isEmpty) {
_logger.warning("Received empty sync data for $type");
return false;
}

StreamSubscription? _userSyncSubscription;
_logger.fine("Processing sync data for $type of length ${data.length}");

void syncUsers() {
_userSyncSubscription =
_syncApiRepository.watchUserSyncEvent().listen((events) async {
for (final event in events) {
if (event.data is SyncUserV1) {
final data = event.data as SyncUserV1;
debugPrint("User Update: $data");
try {
if (type == SyncEntityType.partnerV1) {
return await _syncStreamRepository.updatePartnerV1(data.cast());
}

if (type == SyncEntityType.partnerDeleteV1) {
return await _syncStreamRepository.deletePartnerV1(data.cast());
}

// final user = await _userRepository.get(data.id);
if (type == SyncEntityType.userV1) {
return await _syncStreamRepository.updateUsersV1(data.cast());
}

// if (user == null) {
// continue;
// }
if (type == SyncEntityType.userDeleteV1) {
return await _syncStreamRepository.deleteUsersV1(data.cast());
}
} catch (error, stack) {
_logger.severe("Error processing sync data for $type", error, stack);
return false;
}

// user.name = data.name;
// user.email = data.email;
// user.updatedAt = DateTime.now();
_logger.warning("Unknown sync data type: $type");
return false;
}

// await _userRepository.update(user);
// await _syncApiRepository.ack(event.ack);
Future<void> _syncEvent(List<SyncRequestType> types) async {
_logger.info("Syncing Events: $types");
final streamCompleter = Completer();
bool shouldComplete = false;
// the onDone callback might fire before the events are processed
// the following flag ensures that the onDone callback is not called
// before the events are processed and also that events are processed sequentially
Completer? mutex;
StreamSubscription? subscription;
subscription = _syncApiRepository.getSyncEvents(types).listen(
(events) async {
if (events.isEmpty) {
_logger.warning("Received empty sync events");
return;
}

if (event.data is SyncUserDeleteV1) {
final data = event.data as SyncUserDeleteV1;
// If previous events are still being processed, wait for them to finish
if (mutex != null) {
await mutex!.future;
}

debugPrint("User delete: $data");
// await _syncApiRepository.ack(event.ack);
if (_cancelCompleter?.isCompleted ?? false) {
_logger.info("Sync cancelled, stopping stream");
subscription?.cancel();
if (!streamCompleter.isCompleted) {
streamCompleter.completeError(
const CancelException(),
StackTrace.current,
);
}
return;
}
}

// Take control of the mutex and process the events
mutex = Completer();

try {
final eventsMap = events.groupListsBy((event) => event.type);
final Map<SyncEntityType, String> acks = {};

for (final entry in eventsMap.entries) {
if (_cancelCompleter?.isCompleted ?? false) {
_logger.info("Sync cancelled, stopping stream");
mutex?.complete();
mutex = null;
if (!streamCompleter.isCompleted) {
streamCompleter.completeError(
const CancelException(),
StackTrace.current,
);
}

return;
}

final type = entry.key;
final data = entry.value;

if (data.isEmpty) {
_logger.warning("Received empty sync events for $type");
continue;
}

if (await _handleSyncData(type, data.map((e) => e.data))) {
// ignore: avoid-unsafe-collection-methods
acks[type] = data.last.ack;
} else {
_logger.warning("Failed to handle sync events for $type");
}
}

if (acks.isNotEmpty) {
await _syncApiRepository.ack(acks.values.toList());
}
_logger.info("$types events processed");
} catch (error, stack) {
_logger.warning("Error handling sync events", error, stack);
} finally {
mutex?.complete();
mutex = null;
}

if (shouldComplete) {
_logger.info("Sync done, completing stream");
if (!streamCompleter.isCompleted) streamCompleter.complete();
}
},
onError: (error, stack) {
_logger.warning("Error in sync stream for $types", error, stack);
// Do not proceed if the stream errors
if (!streamCompleter.isCompleted) streamCompleter.complete();
},
onDone: () {
_logger.info("$types stream done");
if (mutex == null && !streamCompleter.isCompleted) {
streamCompleter.complete();
} else {
// Marks the stream as done but does not complete the completer
// until the events are processed
shouldComplete = true;
}
},
);
return await streamCompleter.future.whenComplete(() {
_logger.info("Sync stream completed");
return subscription?.cancel();
});
}

Future<void> dispose() async {
await _userSyncSubscription?.cancel();
}
Future<void> syncUsers() =>
_syncEvent([SyncRequestType.usersV1, SyncRequestType.partnersV1]);
}
42 changes: 42 additions & 0 deletions mobile/lib/domain/utils/background_sync.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
// ignore_for_file: avoid-passing-async-when-sync-expected

import 'dart:async';

import 'package:async/async.dart';
import 'package:immich_mobile/providers/infrastructure/cancel.provider.dart';
import 'package:immich_mobile/providers/infrastructure/sync_stream.provider.dart';
import 'package:immich_mobile/utils/isolate.dart';

class BackgroundSyncManager {
CancelableOperation<void>? _userSyncFuture;

BackgroundSyncManager();

Future<void> cancel() async {
await _userSyncFuture?.cancel();
_userSyncFuture = null;
}

Future<void> syncUsers() async {
if (_userSyncFuture != null) {
return _userSyncFuture!.valueOrCancellation();
}

if (_userSyncFuture == null) {
final isolate = await IsolateManager.spawn(
computation: (ref) => ref.read(syncStreamServiceProvider).syncUsers(),
onCancel: (ref) => ref.read(cancellationProvider).complete(true),
);
_userSyncFuture = CancelableOperation.fromFuture(
isolate.run(),
onCancel: () {
isolate.cancel();
_userSyncFuture = null;
},
);
return _userSyncFuture!
.valueOrCancellation()
.then((_) => _userSyncFuture = null);
}
}
}
8 changes: 8 additions & 0 deletions mobile/lib/domain/utils/cancel.exception.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
class CancelException implements Exception {
final String message;

const CancelException([this.message = "Operation was cancelled."]);

@override
String toString() => "CancelException: $message";
}
9 changes: 9 additions & 0 deletions mobile/lib/extensions/string_extensions.dart
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
import 'dart:typed_data';

import 'package:uuid/parsing.dart';

extension StringExtension on String {
String capitalize() {
return split(" ")
Expand Down Expand Up @@ -29,3 +33,8 @@ extension DurationExtension on String {
return int.parse(this);
}
}

extension UUIDExtension on String {
Uint8List toUuidByte({bool shouldValidate = false}) =>
UuidParsing.parseAsByteList(this, validate: shouldValidate);
}
Loading