Skip to content

Commit 5fe954b

Browse files
fix: use lock to synchronise foreground and background backup (#21522)
* fix: use lock to synchronise foreground and background backup # Conflicts: # mobile/lib/domain/services/background_worker.service.dart # mobile/lib/platform/background_worker_api.g.dart # mobile/pigeon/background_worker_api.dart * add timeout to the splash-screen acquire lock * fix: null check on created date --------- Co-authored-by: shenlong-tanwen <[email protected]> Co-authored-by: Alex <[email protected]>
1 parent 7f81a5b commit 5fe954b

File tree

9 files changed

+300
-21
lines changed

9 files changed

+300
-21
lines changed

mobile/android/app/src/main/kotlin/app/alextran/immich/background/BackgroundWorker.kt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -130,8 +130,10 @@ class BackgroundWorker(context: Context, params: WorkerParameters) :
130130
* - Parameter success: Indicates whether the background task completed successfully
131131
*/
132132
private fun complete(success: Result) {
133+
Log.d(TAG, "About to complete BackupWorker with result: $success")
133134
isComplete = true
134135
engine?.destroy()
136+
engine = null
135137
flutterApi = null
136138
completionHandler.set(success)
137139
}

mobile/ios/Runner.xcodeproj/project.pbxproj

Lines changed: 1 addition & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
archiveVersion = 1;
44
classes = {
55
};
6-
objectVersion = 77;
6+
objectVersion = 54;
77
objects = {
88

99
/* Begin PBXBuildFile section */
@@ -507,14 +507,10 @@
507507
inputFileListPaths = (
508508
"${PODS_ROOT}/Target Support Files/Pods-Runner/Pods-Runner-resources-${CONFIGURATION}-input-files.xcfilelist",
509509
);
510-
inputPaths = (
511-
);
512510
name = "[CP] Copy Pods Resources";
513511
outputFileListPaths = (
514512
"${PODS_ROOT}/Target Support Files/Pods-Runner/Pods-Runner-resources-${CONFIGURATION}-output-files.xcfilelist",
515513
);
516-
outputPaths = (
517-
);
518514
runOnlyForDeploymentPostprocessing = 0;
519515
shellPath = /bin/sh;
520516
shellScript = "\"${PODS_ROOT}/Target Support Files/Pods-Runner/Pods-Runner-resources.sh\"\n";
@@ -543,14 +539,10 @@
543539
inputFileListPaths = (
544540
"${PODS_ROOT}/Target Support Files/Pods-Runner/Pods-Runner-frameworks-${CONFIGURATION}-input-files.xcfilelist",
545541
);
546-
inputPaths = (
547-
);
548542
name = "[CP] Embed Pods Frameworks";
549543
outputFileListPaths = (
550544
"${PODS_ROOT}/Target Support Files/Pods-Runner/Pods-Runner-frameworks-${CONFIGURATION}-output-files.xcfilelist",
551545
);
552-
outputPaths = (
553-
);
554546
runOnlyForDeploymentPostprocessing = 0;
555547
shellPath = /bin/sh;
556548
shellScript = "\"${PODS_ROOT}/Target Support Files/Pods-Runner/Pods-Runner-frameworks.sh\"\n";

mobile/ios/Runner/Background/BackgroundWorker.swift

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,7 @@ class BackgroundWorker: BackgroundWorkerBgHostApi {
118118
self.handleHostResult(result: result)
119119
})
120120
}
121-
121+
122122
/**
123123
* Cancels the currently running background task, either due to timeout or external request.
124124
* Sends a cancel signal to the Flutter side and sets up a fallback timer to ensure
@@ -140,6 +140,7 @@ class BackgroundWorker: BackgroundWorkerBgHostApi {
140140
self.complete(success: false)
141141
}
142142
}
143+
143144

144145
/**
145146
* Handles the result from Flutter API calls and determines the success/failure status.

mobile/lib/domain/services/background_worker.service.dart

Lines changed: 23 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import 'package:background_downloader/background_downloader.dart';
55
import 'package:flutter/material.dart';
66
import 'package:hooks_riverpod/hooks_riverpod.dart';
77
import 'package:immich_mobile/constants/constants.dart';
8+
import 'package:immich_mobile/domain/utils/isolate_lock_manager.dart';
89
import 'package:immich_mobile/infrastructure/repositories/db.repository.dart';
910
import 'package:immich_mobile/infrastructure/repositories/logger_db.repository.dart';
1011
import 'package:immich_mobile/platform/background_worker_api.g.dart';
@@ -41,7 +42,8 @@ class BackgroundWorkerBgService extends BackgroundWorkerFlutterApi {
4142
final Drift _drift;
4243
final DriftLogger _driftLogger;
4344
final BackgroundWorkerBgHostApi _backgroundHostApi;
44-
final Logger _logger = Logger('BackgroundWorkerBgService');
45+
final Logger _logger = Logger('BackgroundUploadBgService');
46+
late final IsolateLockManager _lockManager;
4547

4648
bool _isCleanedUp = false;
4749

@@ -57,6 +59,7 @@ class BackgroundWorkerBgService extends BackgroundWorkerFlutterApi {
5759
driftProvider.overrideWith(driftOverride(drift)),
5860
],
5961
);
62+
_lockManager = IsolateLockManager(onCloseRequest: _cleanup);
6063
BackgroundWorkerFlutterApi.setUp(this);
6164
}
6265

@@ -80,11 +83,25 @@ class BackgroundWorkerBgService extends BackgroundWorkerFlutterApi {
8083
await FileDownloader().trackTasksInGroup(kDownloadGroupLivePhoto, markDownloadedComplete: false);
8184
await FileDownloader().trackTasks();
8285
configureFileDownloaderNotifications();
83-
8486
await _ref.read(fileMediaRepositoryProvider).enableBackgroundAccess();
8587

86-
// Notify the host that the background worker service has been initialized and is ready to use
87-
_backgroundHostApi.onInitialized();
88+
// Notify the host that the background upload service has been initialized and is ready to use
89+
debugPrint("Acquiring background worker lock");
90+
if (await _lockManager.acquireLock().timeout(
91+
const Duration(seconds: 5),
92+
onTimeout: () {
93+
_lockManager.cancel();
94+
return false;
95+
},
96+
)) {
97+
_logger.info("Acquired background worker lock");
98+
await _backgroundHostApi.onInitialized();
99+
return;
100+
}
101+
102+
_logger.warning("Failed to acquire background worker lock");
103+
await _cleanup();
104+
await _backgroundHostApi.close();
88105
} catch (error, stack) {
89106
_logger.severe("Failed to initialize background worker", error, stack);
90107
_backgroundHostApi.close();
@@ -160,7 +177,8 @@ class BackgroundWorkerBgService extends BackgroundWorkerFlutterApi {
160177
await _drift.close();
161178
await _driftLogger.close();
162179
_ref.dispose();
163-
debugPrint("Background worker cleaned up");
180+
_lockManager.releaseLock();
181+
_logger.info("Background worker resources cleaned up");
164182
} catch (error, stack) {
165183
debugPrint('Failed to cleanup background worker: $error with stack: $stack');
166184
}
Lines changed: 235 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,235 @@
1+
import 'dart:isolate';
2+
import 'dart:ui';
3+
4+
import 'package:flutter/foundation.dart';
5+
import 'package:logging/logging.dart';
6+
7+
const String kIsolateLockManagerPort = "immich://isolate_mutex";
8+
9+
enum _LockStatus { active, released }
10+
11+
class _IsolateRequest {
12+
const _IsolateRequest();
13+
}
14+
15+
class _HeartbeatRequest extends _IsolateRequest {
16+
// Port for the receiver to send replies back
17+
final SendPort sendPort;
18+
19+
const _HeartbeatRequest(this.sendPort);
20+
21+
Map<String, dynamic> toJson() {
22+
return {'type': 'heartbeat', 'sendPort': sendPort};
23+
}
24+
}
25+
26+
class _CloseRequest extends _IsolateRequest {
27+
const _CloseRequest();
28+
29+
Map<String, dynamic> toJson() {
30+
return {'type': 'close'};
31+
}
32+
}
33+
34+
class _IsolateResponse {
35+
const _IsolateResponse();
36+
}
37+
38+
class _HeartbeatResponse extends _IsolateResponse {
39+
final _LockStatus status;
40+
41+
const _HeartbeatResponse(this.status);
42+
43+
Map<String, dynamic> toJson() {
44+
return {'type': 'heartbeat', 'status': status.index};
45+
}
46+
}
47+
48+
typedef OnCloseLockHolderRequest = void Function();
49+
50+
class IsolateLockManager {
51+
final String _portName;
52+
bool _hasLock = false;
53+
ReceivePort? _receivePort;
54+
final OnCloseLockHolderRequest? _onCloseRequest;
55+
final Set<SendPort> _waitingIsolates = {};
56+
// Token object - a new one is created for each acquisition attempt
57+
Object? _currentAcquisitionToken;
58+
59+
IsolateLockManager({String? portName, OnCloseLockHolderRequest? onCloseRequest})
60+
: _portName = portName ?? kIsolateLockManagerPort,
61+
_onCloseRequest = onCloseRequest;
62+
63+
Future<bool> acquireLock() async {
64+
if (_hasLock) {
65+
Logger('BackgroundWorkerLockManager').warning("WARNING: [acquireLock] called more than once");
66+
return true;
67+
}
68+
69+
// Create a new token - this invalidates any previous attempt
70+
final token = _currentAcquisitionToken = Object();
71+
72+
final ReceivePort rp = _receivePort = ReceivePort(_portName);
73+
final SendPort sp = rp.sendPort;
74+
75+
while (!IsolateNameServer.registerPortWithName(sp, _portName)) {
76+
// This attempt was superseded by a newer one in the same isolate
77+
if (_currentAcquisitionToken != token) {
78+
return false;
79+
}
80+
81+
await _lockReleasedByHolder(token);
82+
}
83+
84+
_hasLock = true;
85+
rp.listen(_onRequest);
86+
return true;
87+
}
88+
89+
Future<void> _lockReleasedByHolder(Object token) async {
90+
SendPort? holder = IsolateNameServer.lookupPortByName(_portName);
91+
debugPrint("Found lock holder: $holder");
92+
if (holder == null) {
93+
// No holder, try and acquire lock
94+
return;
95+
}
96+
97+
final ReceivePort tempRp = ReceivePort();
98+
final SendPort tempSp = tempRp.sendPort;
99+
final bs = tempRp.asBroadcastStream();
100+
101+
try {
102+
while (true) {
103+
// Send a heartbeat request with the send port to receive reply from the holder
104+
105+
debugPrint("Sending heartbeat request to lock holder");
106+
holder.send(_HeartbeatRequest(tempSp).toJson());
107+
dynamic answer = await bs.first.timeout(const Duration(seconds: 3), onTimeout: () => null);
108+
109+
debugPrint("Received heartbeat response from lock holder: $answer");
110+
// This attempt was superseded by a newer one in the same isolate
111+
if (_currentAcquisitionToken != token) {
112+
break;
113+
}
114+
115+
if (answer == null) {
116+
// Holder failed, most likely killed without calling releaseLock
117+
// Check if a different waiting isolate took the lock
118+
if (holder == IsolateNameServer.lookupPortByName(_portName)) {
119+
// No, remove the stale lock
120+
IsolateNameServer.removePortNameMapping(_portName);
121+
}
122+
break;
123+
}
124+
125+
// Unknown message type received for heartbeat request. Try again
126+
_IsolateResponse? response = _parseResponse(answer);
127+
if (response == null || response is! _HeartbeatResponse) {
128+
break;
129+
}
130+
131+
if (response.status == _LockStatus.released) {
132+
// Holder has released the lock
133+
break;
134+
}
135+
136+
// If the _LockStatus is active, we check again if the task completed
137+
// by sending a released messaged again, if not, send a new heartbeat again
138+
139+
// Check if the holder completed its task after the heartbeat
140+
answer = await bs.first.timeout(
141+
const Duration(seconds: 3),
142+
onTimeout: () => const _HeartbeatResponse(_LockStatus.active).toJson(),
143+
);
144+
145+
response = _parseResponse(answer);
146+
if (response is _HeartbeatResponse && response.status == _LockStatus.released) {
147+
break;
148+
}
149+
}
150+
} catch (e) {
151+
// Timeout or error
152+
} finally {
153+
tempRp.close();
154+
}
155+
return;
156+
}
157+
158+
_IsolateRequest? _parseRequest(dynamic msg) {
159+
if (msg is! Map<String, dynamic>) {
160+
return null;
161+
}
162+
163+
return switch (msg['type']) {
164+
'heartbeat' => _HeartbeatRequest(msg['sendPort']),
165+
'close' => const _CloseRequest(),
166+
_ => null,
167+
};
168+
}
169+
170+
_IsolateResponse? _parseResponse(dynamic msg) {
171+
if (msg is! Map<String, dynamic>) {
172+
return null;
173+
}
174+
175+
return switch (msg['type']) {
176+
'heartbeat' => _HeartbeatResponse(_LockStatus.values[msg['status']]),
177+
_ => null,
178+
};
179+
}
180+
181+
// Executed in the isolate with the lock
182+
void _onRequest(dynamic msg) {
183+
final request = _parseRequest(msg);
184+
if (request == null) {
185+
return;
186+
}
187+
188+
if (request is _HeartbeatRequest) {
189+
// Add the send port to the list of waiting isolates
190+
_waitingIsolates.add(request.sendPort);
191+
request.sendPort.send(const _HeartbeatResponse(_LockStatus.active).toJson());
192+
return;
193+
}
194+
195+
if (request is _CloseRequest) {
196+
_onCloseRequest?.call();
197+
return;
198+
}
199+
}
200+
201+
void releaseLock() {
202+
if (_hasLock) {
203+
IsolateNameServer.removePortNameMapping(_portName);
204+
205+
// Notify waiting isolates
206+
for (final port in _waitingIsolates) {
207+
port.send(const _HeartbeatResponse(_LockStatus.released).toJson());
208+
}
209+
_waitingIsolates.clear();
210+
211+
_hasLock = false;
212+
}
213+
214+
_receivePort?.close();
215+
_receivePort = null;
216+
}
217+
218+
void cancel() {
219+
if (_hasLock) {
220+
return;
221+
}
222+
223+
debugPrint("Cancelling ongoing acquire lock attempts");
224+
// Create a new token to invalidate ongoing acquire lock attempts
225+
_currentAcquisitionToken = Object();
226+
}
227+
228+
void requestHolderToClose() {
229+
if (_hasLock) {
230+
return;
231+
}
232+
233+
IsolateNameServer.lookupPortByName(_portName)?.send(const _CloseRequest().toJson());
234+
}
235+
}

0 commit comments

Comments
 (0)