Skip to content

[Feature] Web Navigator Locks #54

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 18 commits into from
Jul 10, 2024
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,4 @@ This monorepo uses [melos](https://melos.invertase.dev/) to handle command and p

To configure the monorepo for development run `melos prepare` after cloning.

For detailed usage, check out the inner [sqlite_async](https://github.com/powersync-ja/sqlite_async.dart/tree/main/packages/sqlite_async) and [drift_sqlite_async](https://github.com/powersync-ja/sqlite_async.dart/tree/main/packages/drift_sqlite_async) packages.
For detailed usage, check out the inner [sqlite_async](https://github.com/powersync-ja/sqlite_async.dart/tree/main/packages/sqlite_async) and [drift_sqlite_async](https://github.com/powersync-ja/sqlite_async.dart/tree/main/packages/drift_sqlite_async) packages.
8 changes: 6 additions & 2 deletions packages/sqlite_async/lib/src/common/mutex.dart
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
import 'package:sqlite_async/src/impl/mutex_impl.dart';

abstract class Mutex {
factory Mutex() {
return MutexImpl();
factory Mutex(
{
/// An optional identifier for this Mutex instance.
/// This could be used for platform specific logic or debugging purposes.
String? identifier}) {
return MutexImpl(identifier: identifier);
}

/// timeout is a timeout for acquiring the lock, not for the callback
Expand Down
4 changes: 4 additions & 0 deletions packages/sqlite_async/lib/src/impl/stub_mutex.dart
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
import 'package:sqlite_async/src/common/mutex.dart';

class MutexImpl implements Mutex {
String? identifier;

MutexImpl({this.identifier});

@override
Future<void> close() {
throw UnimplementedError();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import 'package:sqlite_async/src/common/mutex.dart';
import 'package:sqlite_async/src/common/port_channel.dart';

abstract class MutexImpl implements Mutex {
factory MutexImpl() {
factory MutexImpl({String? identifier}) {
return SimpleMutex();
}
}
Expand All @@ -19,12 +19,13 @@ class SimpleMutex implements MutexImpl {
// Adapted from https://github.com/tekartik/synchronized.dart/blob/master/synchronized/lib/src/basic_lock.dart

Future<dynamic>? last;
String? identifier;

// Hack to make sure the Mutex is not copied to another isolate.
// ignore: unused_field
final Finalizer _f = Finalizer((_) {});

SimpleMutex();
SimpleMutex({this.identifier});

bool get locked => last != null;

Expand Down
141 changes: 135 additions & 6 deletions packages/sqlite_async/lib/src/web/web_mutex.dart
Original file line number Diff line number Diff line change
@@ -1,13 +1,37 @@
import 'dart:async';
import 'dart:math';

import 'package:meta/meta.dart';
import 'package:mutex/mutex.dart' as mutex;
import 'dart:js_interop';
import 'dart:js_util' as js_util;
// This allows for checking things like hasProperty without the need for depending on the `js` package
import 'dart:js_interop_unsafe';
import 'package:web/web.dart';

import 'package:sqlite_async/src/common/mutex.dart';

@JS('navigator')
external Navigator get _navigator;

/// Web implementation of [Mutex]
/// This should use `navigator.locks` in future
class MutexImpl implements Mutex {
late final mutex.Mutex m;
late final mutex.Mutex fallback;
String? identifier;
final String _resolvedIdentifier;

MutexImpl() {
m = mutex.Mutex();
MutexImpl({this.identifier})

/// On web a lock name is required for Navigator locks.
/// Having exclusive Mutex instances requires a somewhat unique lock name.
/// This provides a best effort unique identifier, if no identifier is provided.
/// This should be fine for most use cases:
/// - The uuid package could be added for better uniqueness if required.
/// This would add another package dependency to `sqlite_async` which is potentially unnecessary at this point.
/// An identifier should be supplied for better exclusion.
: _resolvedIdentifier = identifier ??
"${DateTime.now().microsecondsSinceEpoch}-${Random().nextDouble()}" {
fallback = mutex.Mutex();
}

@override
Expand All @@ -17,12 +41,117 @@ class MutexImpl implements Mutex {

@override
Future<T> lock<T>(Future<T> Function() callback, {Duration? timeout}) {
// Note this lock is only valid in a single web tab
return m.protect(callback);
if ((_navigator as JSObject).hasProperty('locks'.toJS).toDart) {
return _webLock(callback, timeout: timeout);
} else {
return _fallbackLock(callback, timeout: timeout);
}
}

/// Locks the callback with a standard Mutex from the `mutex` package
Future<T> _fallbackLock<T>(Future<T> Function() callback,
{Duration? timeout}) {
final completer = Completer<T>();
// Need to implement timeout manually for this
bool isTimedOut = false;
bool lockObtained = false;
if (timeout != null) {
Future.delayed(timeout, () {
if (lockObtained == false) {
isTimedOut = true;
completer.completeError(
TimeoutException('Failed to acquire lock', timeout));
}
});
}

fallback.protect(() async {
try {
if (isTimedOut) {
// Don't actually run logic
return;
}
lockObtained = true;
final result = await callback();
completer.complete(result);
} catch (ex) {
completer.completeError(ex);
}
});

return completer.future;
}

/// Locks the callback with web Navigator locks
Future<T> _webLock<T>(Future<T> Function() callback,
{Duration? timeout}) async {
final lock = await _getWebLock(timeout);
try {
final result = await callback();
return result;
} finally {
lock.release();
}
}

/// Passing the Dart callback directly to the JS Navigator can cause some weird
/// context related bugs. Instead the JS lock callback will return a hold on the lock
/// which is represented as a [HeldLock]. This hold can be used when wrapping the Dart
/// callback to manage the JS lock.
/// This is inspired and adapted from https://github.com/simolus3/sqlite3.dart/blob/7bdca77afd7be7159dbef70fd1ac5aa4996211a9/sqlite3_web/lib/src/locks.dart#L6
Future<HeldLock> _getWebLock(Duration? timeout) {
final gotLock = Completer<HeldLock>.sync();
// Navigator locks can be timed out by using an AbortSignal
final controller = AbortController();

bool lockAcquired = false;
if (timeout != null) {
// Can't really abort the `delayed` call easily :(
Future.delayed(timeout, () {
if (lockAcquired == true) {
return;
}
gotLock
.completeError(TimeoutException('Failed to acquire lock', timeout));
controller.abort('Timeout'.toJS);
});
}

// If timeout occurred before the lock is available, then this callback should not be called.
JSPromise jsCallback(JSAny lock) {
// Mark that if the timeout occurs after this point then nothing should be done
lockAcquired = true;

// Give the Held lock something to mark this Navigator lock as completed
final jsCompleter = Completer.sync();
gotLock.complete(HeldLock._(jsCompleter));
return jsCompleter.future.toJS;
}

final lockOptions = JSObject();
lockOptions['signal'] = controller.signal;
final promise = _navigator.locks
.request(_resolvedIdentifier, lockOptions, jsCallback.toJS);
// A timeout abort will throw an exception which needs to be handled.
// There should not be any other unhandled lock errors.
js_util.promiseToFuture(promise).catchError((error) {});
return gotLock.future;
}

@override
Mutex open() {
return this;
}
}

/// This represents a hold on an active Navigator lock.
/// This is created inside the Navigator lock callback function and is used to release the lock
/// from an external source.
@internal
class HeldLock {
final Completer<void> _completer;

HeldLock._(this._completer);

void release() => _completer.complete();
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ class DefaultSqliteOpenFactory
// cases, we need to implement a mutex locally.
final mutex = connection.access == AccessMode.throughSharedWorker
? null
: MutexImpl();
: MutexImpl(identifier: path); // Use the DB path as a mutex identifier

return WebDatabase(connection.database, options.mutex ?? mutex);
}
Expand Down
1 change: 1 addition & 0 deletions packages/sqlite_async/pubspec.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ dependencies:
collection: ^1.17.0
mutex: ^3.1.0
meta: ^1.10.0
web: ^0.5.1

dev_dependencies:
dcli: ^4.0.0
Expand Down
116 changes: 66 additions & 50 deletions packages/sqlite_async/test/mutex_test.dart
Original file line number Diff line number Diff line change
@@ -1,67 +1,83 @@
@TestOn('!browser')
import 'dart:isolate';
import 'dart:async';
import 'dart:math';

import 'package:sqlite_async/src/native/native_isolate_mutex.dart';
import 'package:sqlite_async/sqlite_async.dart';
import 'package:test/test.dart';

import 'utils/test_utils_impl.dart';

final testUtils = TestUtils();

void main() {
group('Mutex Tests', () {
test('Closing', () async {
// Test that locks are properly released when calling SharedMutex.close()
// in in Isolate.
// A timeout in this test indicates a likely error.
for (var i = 0; i < 50; i++) {
final mutex = SimpleMutex();
final serialized = mutex.shared;

final result = await Isolate.run(() async {
return _lockInIsolate(serialized);
group('Shared Mutex Tests', () {
test('Queue exclusive operations', () async {
final m = Mutex();
final collection = List.generate(10, (index) => index);
final results = <int>[];

final futures = collection.map((element) async {
return m.lock(() async {
// Simulate some asynchronous work
await Future.delayed(Duration(milliseconds: Random().nextInt(100)));
results.add(element);
return element;
});
}).toList();

await mutex.lock(() async {});
// Await all the promises
await Future.wait(futures);

expect(result, equals(5));
}
// Check if the results are in ascending order
expect(results, equals(collection));
});
});

test('Re-use after closing', () async {
// Test that shared locks can be opened and closed multiple times.
final mutex = SimpleMutex();
final serialized = mutex.shared;
test('Timeout should throw a TimeoutException', () async {
final m = Mutex();
m.lock(() async {
await Future.delayed(Duration(milliseconds: 300));
});

final result = await Isolate.run(() async {
return _lockInIsolate(serialized);
});
await expectLater(
m.lock(() async {
print('This should not get executed');
}, timeout: Duration(milliseconds: 200)),
throwsA((e) =>
e is TimeoutException &&
e.message!.contains('Failed to acquire lock')));
});

final result2 = await Isolate.run(() async {
return _lockInIsolate(serialized);
});
test('In-time timeout should function normally', () async {
final m = Mutex();
final results = [];
m.lock(() async {
await Future.delayed(Duration(milliseconds: 100));
results.add(1);
});

await mutex.lock(() async {});
await m.lock(() async {
results.add(2);
}, timeout: Duration(milliseconds: 200));

expect(result, equals(5));
expect(result2, equals(5));
});
}, timeout: const Timeout(Duration(milliseconds: 5000)));
}
expect(results, equals([1, 2]));
});

Future<Object> _lockInIsolate(
SerializedMutex smutex,
) async {
final mutex = smutex.open();
// Start a "thread" that repeatedly takes a lock
_infiniteLock(mutex).ignore();
await Future.delayed(const Duration(milliseconds: 10));
// Then close the mutex while the above loop is running.
await mutex.close();

return 5;
}
test('Different Mutex instances should cause separate locking', () async {
final m1 = Mutex();
final m2 = Mutex();

Future<void> _infiniteLock(SharedMutex mutex) async {
while (true) {
await mutex.lock(() async {
await Future.delayed(const Duration(milliseconds: 1));
final results = [];
final p1 = m1.lock(() async {
await Future.delayed(Duration(milliseconds: 300));
results.add(1);
});
}

final p2 = m2.lock(() async {
results.add(2);
});

await p1;
await p2;
expect(results, equals([2, 1]));
});
}
Loading
Loading