Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -130,8 +130,10 @@ class BackgroundWorker(context: Context, params: WorkerParameters) :
* - Parameter success: Indicates whether the background task completed successfully
*/
private fun complete(success: Result) {
Log.d(TAG, "About to complete BackupWorker with result: $success")
isComplete = true
engine?.destroy()
engine = null
flutterApi = null
completionHandler.set(success)
}
Expand Down
10 changes: 1 addition & 9 deletions mobile/ios/Runner.xcodeproj/project.pbxproj
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
archiveVersion = 1;
classes = {
};
objectVersion = 77;
objectVersion = 54;
objects = {

/* Begin PBXBuildFile section */
Expand Down Expand Up @@ -507,14 +507,10 @@
inputFileListPaths = (
"${PODS_ROOT}/Target Support Files/Pods-Runner/Pods-Runner-resources-${CONFIGURATION}-input-files.xcfilelist",
);
inputPaths = (
);
name = "[CP] Copy Pods Resources";
outputFileListPaths = (
"${PODS_ROOT}/Target Support Files/Pods-Runner/Pods-Runner-resources-${CONFIGURATION}-output-files.xcfilelist",
);
outputPaths = (
);
runOnlyForDeploymentPostprocessing = 0;
shellPath = /bin/sh;
shellScript = "\"${PODS_ROOT}/Target Support Files/Pods-Runner/Pods-Runner-resources.sh\"\n";
Expand Down Expand Up @@ -543,14 +539,10 @@
inputFileListPaths = (
"${PODS_ROOT}/Target Support Files/Pods-Runner/Pods-Runner-frameworks-${CONFIGURATION}-input-files.xcfilelist",
);
inputPaths = (
);
name = "[CP] Embed Pods Frameworks";
outputFileListPaths = (
"${PODS_ROOT}/Target Support Files/Pods-Runner/Pods-Runner-frameworks-${CONFIGURATION}-output-files.xcfilelist",
);
outputPaths = (
);
runOnlyForDeploymentPostprocessing = 0;
shellPath = /bin/sh;
shellScript = "\"${PODS_ROOT}/Target Support Files/Pods-Runner/Pods-Runner-frameworks.sh\"\n";
Expand Down
3 changes: 2 additions & 1 deletion mobile/ios/Runner/Background/BackgroundWorker.swift
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ class BackgroundWorker: BackgroundWorkerBgHostApi {
self.handleHostResult(result: result)
})
}

/**
* Cancels the currently running background task, either due to timeout or external request.
* Sends a cancel signal to the Flutter side and sets up a fallback timer to ensure
Expand All @@ -140,6 +140,7 @@ class BackgroundWorker: BackgroundWorkerBgHostApi {
self.complete(success: false)
}
}


/**
* Handles the result from Flutter API calls and determines the success/failure status.
Expand Down
28 changes: 23 additions & 5 deletions mobile/lib/domain/services/background_worker.service.dart
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import 'package:background_downloader/background_downloader.dart';
import 'package:flutter/material.dart';
import 'package:hooks_riverpod/hooks_riverpod.dart';
import 'package:immich_mobile/constants/constants.dart';
import 'package:immich_mobile/domain/utils/isolate_lock_manager.dart';
import 'package:immich_mobile/infrastructure/repositories/db.repository.dart';
import 'package:immich_mobile/infrastructure/repositories/logger_db.repository.dart';
import 'package:immich_mobile/platform/background_worker_api.g.dart';
Expand Down Expand Up @@ -41,7 +42,8 @@ class BackgroundWorkerBgService extends BackgroundWorkerFlutterApi {
final Drift _drift;
final DriftLogger _driftLogger;
final BackgroundWorkerBgHostApi _backgroundHostApi;
final Logger _logger = Logger('BackgroundWorkerBgService');
final Logger _logger = Logger('BackgroundUploadBgService');
late final IsolateLockManager _lockManager;

bool _isCleanedUp = false;

Expand All @@ -57,6 +59,7 @@ class BackgroundWorkerBgService extends BackgroundWorkerFlutterApi {
driftProvider.overrideWith(driftOverride(drift)),
],
);
_lockManager = IsolateLockManager(onCloseRequest: _cleanup);
BackgroundWorkerFlutterApi.setUp(this);
}

Expand All @@ -80,11 +83,25 @@ class BackgroundWorkerBgService extends BackgroundWorkerFlutterApi {
await FileDownloader().trackTasksInGroup(kDownloadGroupLivePhoto, markDownloadedComplete: false);
await FileDownloader().trackTasks();
configureFileDownloaderNotifications();

await _ref.read(fileMediaRepositoryProvider).enableBackgroundAccess();

// Notify the host that the background worker service has been initialized and is ready to use
_backgroundHostApi.onInitialized();
// Notify the host that the background upload service has been initialized and is ready to use
debugPrint("Acquiring background worker lock");
if (await _lockManager.acquireLock().timeout(
const Duration(seconds: 5),
onTimeout: () {
_lockManager.cancel();
return false;
},
)) {
_logger.info("Acquired background worker lock");
await _backgroundHostApi.onInitialized();
return;
}

_logger.warning("Failed to acquire background worker lock");
await _cleanup();
await _backgroundHostApi.close();
} catch (error, stack) {
_logger.severe("Failed to initialize background worker", error, stack);
_backgroundHostApi.close();
Expand Down Expand Up @@ -160,7 +177,8 @@ class BackgroundWorkerBgService extends BackgroundWorkerFlutterApi {
await _drift.close();
await _driftLogger.close();
_ref.dispose();
debugPrint("Background worker cleaned up");
_lockManager.releaseLock();
_logger.info("Background worker resources cleaned up");
} catch (error, stack) {
debugPrint('Failed to cleanup background worker: $error with stack: $stack');
}
Expand Down
235 changes: 235 additions & 0 deletions mobile/lib/domain/utils/isolate_lock_manager.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,235 @@
import 'dart:isolate';
import 'dart:ui';

import 'package:flutter/foundation.dart';
import 'package:logging/logging.dart';

const String kIsolateLockManagerPort = "immich://isolate_mutex";

enum _LockStatus { active, released }

class _IsolateRequest {
const _IsolateRequest();
}

class _HeartbeatRequest extends _IsolateRequest {
// Port for the receiver to send replies back
final SendPort sendPort;

const _HeartbeatRequest(this.sendPort);

Map<String, dynamic> toJson() {
return {'type': 'heartbeat', 'sendPort': sendPort};
}
}

class _CloseRequest extends _IsolateRequest {
const _CloseRequest();

Map<String, dynamic> toJson() {
return {'type': 'close'};
}
}

class _IsolateResponse {
const _IsolateResponse();
}

class _HeartbeatResponse extends _IsolateResponse {
final _LockStatus status;

const _HeartbeatResponse(this.status);

Map<String, dynamic> toJson() {
return {'type': 'heartbeat', 'status': status.index};
}
}

typedef OnCloseLockHolderRequest = void Function();

class IsolateLockManager {
final String _portName;
bool _hasLock = false;
ReceivePort? _receivePort;
final OnCloseLockHolderRequest? _onCloseRequest;
final Set<SendPort> _waitingIsolates = {};
// Token object - a new one is created for each acquisition attempt
Object? _currentAcquisitionToken;

IsolateLockManager({String? portName, OnCloseLockHolderRequest? onCloseRequest})
: _portName = portName ?? kIsolateLockManagerPort,
_onCloseRequest = onCloseRequest;

Future<bool> acquireLock() async {
if (_hasLock) {
Logger('BackgroundWorkerLockManager').warning("WARNING: [acquireLock] called more than once");
return true;
}

// Create a new token - this invalidates any previous attempt
final token = _currentAcquisitionToken = Object();

final ReceivePort rp = _receivePort = ReceivePort(_portName);
final SendPort sp = rp.sendPort;

while (!IsolateNameServer.registerPortWithName(sp, _portName)) {
// This attempt was superseded by a newer one in the same isolate
if (_currentAcquisitionToken != token) {
return false;
}

await _lockReleasedByHolder(token);
}

_hasLock = true;
rp.listen(_onRequest);
return true;
}

Future<void> _lockReleasedByHolder(Object token) async {
SendPort? holder = IsolateNameServer.lookupPortByName(_portName);
debugPrint("Found lock holder: $holder");
if (holder == null) {
// No holder, try and acquire lock
return;
}

final ReceivePort tempRp = ReceivePort();
final SendPort tempSp = tempRp.sendPort;
final bs = tempRp.asBroadcastStream();

try {
while (true) {
// Send a heartbeat request with the send port to receive reply from the holder

debugPrint("Sending heartbeat request to lock holder");
holder.send(_HeartbeatRequest(tempSp).toJson());
dynamic answer = await bs.first.timeout(const Duration(seconds: 3), onTimeout: () => null);

debugPrint("Received heartbeat response from lock holder: $answer");
// This attempt was superseded by a newer one in the same isolate
if (_currentAcquisitionToken != token) {
break;
}

if (answer == null) {
// Holder failed, most likely killed without calling releaseLock
// Check if a different waiting isolate took the lock
if (holder == IsolateNameServer.lookupPortByName(_portName)) {
// No, remove the stale lock
IsolateNameServer.removePortNameMapping(_portName);
}
break;
}

// Unknown message type received for heartbeat request. Try again
_IsolateResponse? response = _parseResponse(answer);
if (response == null || response is! _HeartbeatResponse) {
break;
}

if (response.status == _LockStatus.released) {
// Holder has released the lock
break;
}

// If the _LockStatus is active, we check again if the task completed
// by sending a released messaged again, if not, send a new heartbeat again

// Check if the holder completed its task after the heartbeat
answer = await bs.first.timeout(
const Duration(seconds: 3),
onTimeout: () => const _HeartbeatResponse(_LockStatus.active).toJson(),
);

response = _parseResponse(answer);
if (response is _HeartbeatResponse && response.status == _LockStatus.released) {
break;
}
}
} catch (e) {
// Timeout or error
} finally {
tempRp.close();
}
return;
}

_IsolateRequest? _parseRequest(dynamic msg) {
if (msg is! Map<String, dynamic>) {
return null;
}

return switch (msg['type']) {
'heartbeat' => _HeartbeatRequest(msg['sendPort']),
'close' => const _CloseRequest(),
_ => null,
};
}

_IsolateResponse? _parseResponse(dynamic msg) {
if (msg is! Map<String, dynamic>) {
return null;
}

return switch (msg['type']) {
'heartbeat' => _HeartbeatResponse(_LockStatus.values[msg['status']]),
_ => null,
};
}

// Executed in the isolate with the lock
void _onRequest(dynamic msg) {
final request = _parseRequest(msg);
if (request == null) {
return;
}

if (request is _HeartbeatRequest) {
// Add the send port to the list of waiting isolates
_waitingIsolates.add(request.sendPort);
request.sendPort.send(const _HeartbeatResponse(_LockStatus.active).toJson());
return;
}

if (request is _CloseRequest) {
_onCloseRequest?.call();
return;
}
}

void releaseLock() {
if (_hasLock) {
IsolateNameServer.removePortNameMapping(_portName);

// Notify waiting isolates
for (final port in _waitingIsolates) {
port.send(const _HeartbeatResponse(_LockStatus.released).toJson());
}
_waitingIsolates.clear();

_hasLock = false;
}

_receivePort?.close();
_receivePort = null;
}

void cancel() {
if (_hasLock) {
return;
}

debugPrint("Cancelling ongoing acquire lock attempts");
// Create a new token to invalidate ongoing acquire lock attempts
_currentAcquisitionToken = Object();
}

void requestHolderToClose() {
if (_hasLock) {
return;
}

IsolateNameServer.lookupPortByName(_portName)?.send(const _CloseRequest().toJson());
}
}
Loading