diff --git a/demos/django-todolist/.metadata b/demos/django-todolist/.metadata new file mode 100644 index 00000000..391c336b --- /dev/null +++ b/demos/django-todolist/.metadata @@ -0,0 +1,45 @@ +# This file tracks properties of this Flutter project. +# Used by Flutter tool to assess capabilities and perform upgrades etc. +# +# This file should be version controlled and should not be manually edited. + +version: + revision: "5874a72aa4c779a02553007c47dacbefba2374dc" + channel: "stable" + +project_type: app + +# Tracks metadata for the flutter migrate command +migration: + platforms: + - platform: root + create_revision: 5874a72aa4c779a02553007c47dacbefba2374dc + base_revision: 5874a72aa4c779a02553007c47dacbefba2374dc + - platform: android + create_revision: 5874a72aa4c779a02553007c47dacbefba2374dc + base_revision: 5874a72aa4c779a02553007c47dacbefba2374dc + - platform: ios + create_revision: 5874a72aa4c779a02553007c47dacbefba2374dc + base_revision: 5874a72aa4c779a02553007c47dacbefba2374dc + - platform: linux + create_revision: 5874a72aa4c779a02553007c47dacbefba2374dc + base_revision: 5874a72aa4c779a02553007c47dacbefba2374dc + - platform: macos + create_revision: 5874a72aa4c779a02553007c47dacbefba2374dc + base_revision: 5874a72aa4c779a02553007c47dacbefba2374dc + - platform: web + create_revision: 5874a72aa4c779a02553007c47dacbefba2374dc + base_revision: 5874a72aa4c779a02553007c47dacbefba2374dc + - platform: windows + create_revision: 5874a72aa4c779a02553007c47dacbefba2374dc + base_revision: 5874a72aa4c779a02553007c47dacbefba2374dc + + # User provided section + + # List of Local paths (relative to this file) that should be + # ignored by the migrate tool. + # + # Files that are not part of the templates will be ignored by default. + unmanaged_files: + - 'lib/main.dart' + - 'ios/Runner.xcodeproj/project.pbxproj' diff --git a/demos/django-todolist/android/app/src/main/kotlin/co/powersync/django_todolist/MainActivity.kt b/demos/django-todolist/android/app/src/main/kotlin/co/powersync/django_todolist/MainActivity.kt new file mode 100644 index 00000000..f5cbc4a3 --- /dev/null +++ b/demos/django-todolist/android/app/src/main/kotlin/co/powersync/django_todolist/MainActivity.kt @@ -0,0 +1,5 @@ +package co.powersync.django_todolist + +import io.flutter.embedding.android.FlutterActivity + +class MainActivity: FlutterActivity() diff --git a/demos/django-todolist/ios/RunnerTests/RunnerTests.swift b/demos/django-todolist/ios/RunnerTests/RunnerTests.swift new file mode 100644 index 00000000..86a7c3b1 --- /dev/null +++ b/demos/django-todolist/ios/RunnerTests/RunnerTests.swift @@ -0,0 +1,12 @@ +import Flutter +import UIKit +import XCTest + +class RunnerTests: XCTestCase { + + func testExample() { + // If you add code to the Runner application, consider adding tests here. + // See https://developer.apple.com/documentation/xctest for more information about using XCTest. + } + +} diff --git a/demos/django-todolist/lib/powersync.dart b/demos/django-todolist/lib/powersync.dart index 4a40b4a2..0b4b9277 100644 --- a/demos/django-todolist/lib/powersync.dart +++ b/demos/django-todolist/lib/powersync.dart @@ -1,4 +1,5 @@ // This file performs setup of the PowerSync database +import 'package:flutter/foundation.dart'; import 'package:logging/logging.dart'; import 'package:path/path.dart'; import 'package:path_provider/path_provider.dart'; @@ -99,8 +100,14 @@ Future isLoggedIn() async { } Future getDatabasePath() async { + const dbFilename = 'powersync-demo.db'; + // getApplicationSupportDirectory is not supported on Web + if (kIsWeb) { + return dbFilename; + } + final dir = await getApplicationSupportDirectory(); - return join(dir.path, 'powersync-demo.db'); + return join(dir.path, dbFilename); } // opens the database and connects if logged in diff --git a/demos/django-todolist/pubspec.lock b/demos/django-todolist/pubspec.lock index 06cdf7ca..0930169d 100644 --- a/demos/django-todolist/pubspec.lock +++ b/demos/django-todolist/pubspec.lock @@ -164,18 +164,18 @@ packages: dependency: transitive description: name: leak_tracker - sha256: "7f0df31977cb2c0b88585095d168e689669a2cc9b97c309665e3386f3e9d341a" + sha256: "3f87a60e8c63aecc975dda1ceedbc8f24de75f09e4856ea27daf8958f2f0ce05" url: "https://pub.dev" source: hosted - version: "10.0.4" + version: "10.0.5" leak_tracker_flutter_testing: dependency: transitive description: name: leak_tracker_flutter_testing - sha256: "06e98f569d004c1315b991ded39924b21af84cf14cc94791b8aea337d25b57f8" + sha256: "932549fb305594d82d7183ecd9fa93463e9914e1b67cacc34bc40906594a1806" url: "https://pub.dev" source: hosted - version: "3.0.3" + version: "3.0.5" leak_tracker_testing: dependency: transitive description: @@ -212,18 +212,18 @@ packages: dependency: transitive description: name: material_color_utilities - sha256: "0e0a020085b65b6083975e499759762399b4475f766c21668c4ecca34ea74e5a" + sha256: f7142bb1154231d7ea5f96bc7bde4bda2a0945d2806bb11670e30b850d56bdec url: "https://pub.dev" source: hosted - version: "0.8.0" + version: "0.11.1" meta: dependency: transitive description: name: meta - sha256: "7687075e408b093f36e6bbf6c91878cc0d4cd10f409506f7bc996f68220b9136" + sha256: bdb68674043280c3428e9ec998512fb681678676b3c54e773629ffe74419f8c7 url: "https://pub.dev" source: hosted - version: "1.12.0" + version: "1.15.0" mutex: dependency: transitive description: @@ -479,10 +479,10 @@ packages: dependency: transitive description: name: test_api - sha256: "9955ae474176f7ac8ee4e989dadfb411a58c30415bcfb648fa04b2b8a03afa7f" + sha256: "5b8a98dafc4d5c4c9c72d8b31ab2b23fc13422348d2997120294d3bac86b4ddb" url: "https://pub.dev" source: hosted - version: "0.7.0" + version: "0.7.2" typed_data: dependency: transitive description: @@ -519,10 +519,10 @@ packages: dependency: transitive description: name: vm_service - sha256: "3923c89304b715fb1eb6423f017651664a03bf5f4b29983627c4da791f74a4ec" + sha256: "5c5f338a667b4c644744b661f309fb8080bb94b18a7e91ef1dbd343bed00ed6d" url: "https://pub.dev" source: hosted - version: "14.2.1" + version: "14.2.5" web: dependency: transitive description: diff --git a/demos/django-todolist/pubspec.yaml b/demos/django-todolist/pubspec.yaml index 4a9bd9af..7158d730 100644 --- a/demos/django-todolist/pubspec.yaml +++ b/demos/django-todolist/pubspec.yaml @@ -24,5 +24,9 @@ dev_dependencies: flutter_lints: ^3.0.1 +dependency_overrides: + sqlite_async: + path: /home/simon/src/sqlite_async.dart/packages/sqlite_async + flutter: uses-material-design: true diff --git a/demos/django-todolist/web/favicon.png b/demos/django-todolist/web/favicon.png new file mode 100644 index 00000000..8aaa46ac Binary files /dev/null and b/demos/django-todolist/web/favicon.png differ diff --git a/demos/django-todolist/web/icons/Icon-192.png b/demos/django-todolist/web/icons/Icon-192.png new file mode 100644 index 00000000..b749bfef Binary files /dev/null and b/demos/django-todolist/web/icons/Icon-192.png differ diff --git a/demos/django-todolist/web/icons/Icon-512.png b/demos/django-todolist/web/icons/Icon-512.png new file mode 100644 index 00000000..88cfd48d Binary files /dev/null and b/demos/django-todolist/web/icons/Icon-512.png differ diff --git a/demos/django-todolist/web/icons/Icon-maskable-192.png b/demos/django-todolist/web/icons/Icon-maskable-192.png new file mode 100644 index 00000000..eb9b4d76 Binary files /dev/null and b/demos/django-todolist/web/icons/Icon-maskable-192.png differ diff --git a/demos/django-todolist/web/icons/Icon-maskable-512.png b/demos/django-todolist/web/icons/Icon-maskable-512.png new file mode 100644 index 00000000..d69c5669 Binary files /dev/null and b/demos/django-todolist/web/icons/Icon-maskable-512.png differ diff --git a/demos/django-todolist/web/index.html b/demos/django-todolist/web/index.html new file mode 100644 index 00000000..79c42864 --- /dev/null +++ b/demos/django-todolist/web/index.html @@ -0,0 +1,38 @@ + + + + + + + + + + + + + + + + + + + + django_todolist + + + + + + diff --git a/demos/django-todolist/web/manifest.json b/demos/django-todolist/web/manifest.json new file mode 100644 index 00000000..aa135b47 --- /dev/null +++ b/demos/django-todolist/web/manifest.json @@ -0,0 +1,35 @@ +{ + "name": "django_todolist", + "short_name": "django_todolist", + "start_url": ".", + "display": "standalone", + "background_color": "#0175C2", + "theme_color": "#0175C2", + "description": "A new Flutter project.", + "orientation": "portrait-primary", + "prefer_related_applications": false, + "icons": [ + { + "src": "icons/Icon-192.png", + "sizes": "192x192", + "type": "image/png" + }, + { + "src": "icons/Icon-512.png", + "sizes": "512x512", + "type": "image/png" + }, + { + "src": "icons/Icon-maskable-192.png", + "sizes": "192x192", + "type": "image/png", + "purpose": "maskable" + }, + { + "src": "icons/Icon-maskable-512.png", + "sizes": "512x512", + "type": "image/png", + "purpose": "maskable" + } + ] +} diff --git a/demos/supabase-anonymous-auth/pubspec.yaml b/demos/supabase-anonymous-auth/pubspec.yaml index 982f10ee..ec6b70d6 100644 --- a/demos/supabase-anonymous-auth/pubspec.yaml +++ b/demos/supabase-anonymous-auth/pubspec.yaml @@ -25,5 +25,9 @@ dev_dependencies: flutter_lints: ^3.0.1 +dependency_overrides: + sqlite_async: + path: /home/simon/src/sqlite_async.dart/packages/sqlite_async + flutter: uses-material-design: true diff --git a/demos/supabase-edge-function-auth/pubspec.yaml b/demos/supabase-edge-function-auth/pubspec.yaml index 6bd0a91b..c10c047c 100644 --- a/demos/supabase-edge-function-auth/pubspec.yaml +++ b/demos/supabase-edge-function-auth/pubspec.yaml @@ -25,5 +25,9 @@ dev_dependencies: flutter_lints: ^3.0.1 +dependency_overrides: + sqlite_async: + path: /home/simon/src/sqlite_async.dart/packages/sqlite_async + flutter: uses-material-design: true diff --git a/demos/supabase-simple-chat/pubspec.yaml b/demos/supabase-simple-chat/pubspec.yaml index 9968ad8b..a0dc5712 100644 --- a/demos/supabase-simple-chat/pubspec.yaml +++ b/demos/supabase-simple-chat/pubspec.yaml @@ -54,6 +54,10 @@ dev_dependencies: # rules and activating additional ones. flutter_lints: ^3.0.1 +dependency_overrides: + sqlite_async: + path: /home/simon/src/sqlite_async.dart/packages/sqlite_async + # For information on the generic Dart part of this file, see the # following page: https://dart.dev/tools/pub/pubspec diff --git a/demos/supabase-todolist/pubspec.yaml b/demos/supabase-todolist/pubspec.yaml index d4d64314..ed8627f4 100644 --- a/demos/supabase-todolist/pubspec.yaml +++ b/demos/supabase-todolist/pubspec.yaml @@ -27,5 +27,9 @@ dev_dependencies: flutter_lints: ^3.0.1 +dependency_overrides: + sqlite_async: + path: /home/simon/src/sqlite_async.dart/packages/sqlite_async + flutter: uses-material-design: true diff --git a/packages/powersync/lib/src/database/powersync_db_mixin.dart b/packages/powersync/lib/src/database/powersync_db_mixin.dart index 05e5bae0..af8156e1 100644 --- a/packages/powersync/lib/src/database/powersync_db_mixin.dart +++ b/packages/powersync/lib/src/database/powersync_db_mixin.dart @@ -71,11 +71,7 @@ mixin PowerSyncDatabaseMixin implements SqliteConnection { @protected Future baseInit() async { statusStream = statusStreamController.stream; - updates = database.updates - .map((update) => - PowerSyncUpdateNotification.fromUpdateNotification(update)) - .where((update) => update.isNotEmpty) - .cast(); + updates = powerSyncUpdateNotifications(database.updates); await database.initialize(); await _checkVersion(); @@ -466,3 +462,12 @@ mixin PowerSyncDatabaseMixin implements SqliteConnection { await database.refreshSchema(); } } + +Stream powerSyncUpdateNotifications( + Stream inner) { + return inner + .map((update) => + PowerSyncUpdateNotification.fromUpdateNotification(update)) + .where((update) => update.isNotEmpty) + .cast(); +} diff --git a/packages/powersync/lib/src/database/web/web_powersync_database.dart b/packages/powersync/lib/src/database/web/web_powersync_database.dart index abf3305e..cfd52d52 100644 --- a/packages/powersync/lib/src/database/web/web_powersync_database.dart +++ b/packages/powersync/lib/src/database/web/web_powersync_database.dart @@ -15,6 +15,8 @@ import 'package:powersync/src/streaming_sync.dart'; import 'package:sqlite_async/sqlite_async.dart'; import 'package:powersync/src/schema_logic.dart' as schema_logic; +import '../../web/sync_controller.dart'; + /// A PowerSync managed database. /// /// Web implementation for [PowerSyncDatabase] @@ -93,7 +95,8 @@ class PowerSyncDatabaseImpl Logger? logger}) { final db = SqliteDatabase.withFactory(openFactory, maxReaders: 1); return PowerSyncDatabaseImpl.withDatabase( - schema: schema, logger: logger, database: db); + schema: schema, logger: logger, database: db) + ..openFactory = openFactory; } /// Open a PowerSyncDatabase on an existing [SqliteDatabase]. @@ -119,14 +122,15 @@ class PowerSyncDatabaseImpl /// The connection is automatically re-opened if it fails for any reason. /// /// Status changes are reported on [statusStream]. - baseConnect( - {required PowerSyncBackendConnector connector, - - /// Throttle time between CRUD operations - /// Defaults to 10 milliseconds. - required Duration crudThrottleTime, - required Future Function() reconnect, - Map? params}) async { + baseConnect({ + required PowerSyncBackendConnector connector, + + /// Throttle time between CRUD operations + /// Defaults to 10 milliseconds. + required Duration crudThrottleTime, + required Future Function() reconnect, + Map? params, + }) async { await initialize(); // Disconnect if connected @@ -135,9 +139,23 @@ class PowerSyncDatabaseImpl await isInitialized; - // TODO better multitab support final storage = BucketStorage(database); - final sync = StreamingSyncImplementation( + StreamingSync sync; + // Try using a shared worker for the synchronization implementation to avoid + // duplicating work across tabs. + try { + sync = await SyncWorkerHandle.start( + this, + connector, + Uri.base.resolve('/powersync_sync.worker.js'), + ); + } catch (e) { + logger.warning( + 'Could not use shared worker for synchronization, falling back to locks.', + e, + ); + + sync = StreamingSyncImplementation( adapter: storage, credentialsCallback: connector.getCredentialsCached, invalidCredentialsCallback: connector.fetchCredentials, @@ -148,7 +166,10 @@ class PowerSyncDatabaseImpl syncParameters: params, // Only allows 1 sync implementation to run at a time per database // This should be global (across tabs) when using Navigator locks. - identifier: database.openFactory.path); + identifier: database.openFactory.path, + ); + } + sync.statusStream.listen((event) { setStatus(event); }); diff --git a/packages/powersync/lib/src/streaming_sync.dart b/packages/powersync/lib/src/streaming_sync.dart index 70760878..563e401a 100644 --- a/packages/powersync/lib/src/streaming_sync.dart +++ b/packages/powersync/lib/src/streaming_sync.dart @@ -19,7 +19,16 @@ import 'sync_types.dart'; /// a different value to indicate "no error". const _noError = Object(); -class StreamingSyncImplementation { +abstract interface class StreamingSync { + Stream get statusStream; + + Future streamingSync(); + + /// Close any active streams. + Future abort(); +} + +class StreamingSyncImplementation implements StreamingSync { BucketStorage adapter; final Future Function() credentialsCallback; @@ -31,6 +40,8 @@ class StreamingSyncImplementation { final StreamController _statusStreamController = StreamController.broadcast(); + + @override late final Stream statusStream; late final http.Client _client; @@ -74,7 +85,7 @@ class StreamingSyncImplementation { statusStream = _statusStreamController.stream; } - /// Close any active streams. + @override Future abort() async { // If streamingSync() hasn't been called yet, _abort will be null. var future = _abort?.abort(); @@ -107,6 +118,7 @@ class StreamingSyncImplementation { return lastStatus.connected; } + @override Future streamingSync() async { try { _abort = AbortController(); diff --git a/packages/powersync/lib/src/web/sync_controller.dart b/packages/powersync/lib/src/web/sync_controller.dart new file mode 100644 index 00000000..60bf504e --- /dev/null +++ b/packages/powersync/lib/src/web/sync_controller.dart @@ -0,0 +1,100 @@ +import 'dart:async'; +import 'dart:js_interop'; + +import 'package:powersync/powersync.dart'; +import 'package:sqlite_async/web.dart'; +import 'package:web/web.dart'; + +import '../database/web/web_powersync_database.dart'; +import '../streaming_sync.dart'; +import 'sync_worker_protocol.dart'; + +class SyncWorkerHandle implements StreamingSync { + final PowerSyncDatabaseImpl _database; + final PowerSyncBackendConnector _connector; + + late final WorkerCommunicationChannel _channel; + + final StreamController _status = StreamController.broadcast(); + + SyncWorkerHandle._(this._database, this._connector, MessagePort sendToWorker, + SharedWorker worker) { + _channel = WorkerCommunicationChannel( + port: sendToWorker, + errors: EventStreamProviders.errorEvent.forTarget(worker), + requestHandler: (type, payload) async { + switch (type) { + case SyncWorkerMessageType.requestEndpoint: + final endpoint = await (_database.database as WebSqliteConnection) + .exposeEndpoint(); + + return ( + WebEndpoint( + databaseName: endpoint.connectName, + databasePort: endpoint.connectPort, + lockName: endpoint.lockName, + ), + [endpoint.connectPort].toJS + ); + case SyncWorkerMessageType.uploadCrud: + await _connector.uploadData(_database); + return (JSObject(), null); + case SyncWorkerMessageType.invalidCredentialsCallback: + final credentials = await _connector.fetchCredentials(); + return ( + credentials != null + ? SerializedCredentials.from(credentials) + : null, + null + ); + case SyncWorkerMessageType.credentialsCallback: + final credentials = await _connector.getCredentialsCached(); + return ( + credentials != null + ? SerializedCredentials.from(credentials) + : null, + null + ); + default: + throw StateError('Unexpected message type $type'); + } + }, + ); + + _channel.events.listen((data) { + final (type, payload) = data; + if (type == SyncWorkerMessageType.notifySyncStatus) { + _status.add((payload as SerializedSyncStatus).asSyncStatus()); + } + }); + } + + static Future start(PowerSyncDatabaseImpl database, + PowerSyncBackendConnector connector, Uri workerUri) async { + final worker = SharedWorker(workerUri.toString().toJS); + final handle = SyncWorkerHandle._(database, connector, worker.port, worker); + + // Make sure that the worker is working, or throw immediately. + await handle._channel.ping(); + + return handle; + } + + Future close() async { + await abort(); + await _channel.close(); + } + + @override + Future abort() async { + await _channel.abortSynchronization(); + } + + @override + Stream get statusStream => _status.stream; + + @override + Future streamingSync() async { + await _channel.startSynchronization(_database.openFactory.path); + } +} diff --git a/packages/powersync/lib/src/web/sync_worker.dart b/packages/powersync/lib/src/web/sync_worker.dart new file mode 100644 index 00000000..ecec0804 --- /dev/null +++ b/packages/powersync/lib/src/web/sync_worker.dart @@ -0,0 +1,258 @@ +/// This file needs to be compiled to JavaScript with the command +/// dart compile js -O4 packages/powersync/lib/src/web/sync_worker.worker.dart -o assets/db_worker.js +/// The output should then be included in each project's `web` directory +library; + +import 'dart:async'; +import 'dart:js_interop'; + +import 'package:async/async.dart'; +import 'package:fetch_client/fetch_client.dart'; +import 'package:powersync/powersync.dart'; +import 'package:powersync/src/streaming_sync.dart'; +import 'package:sqlite_async/web.dart'; +import 'package:web/web.dart' hide RequestMode; + +import '../bucket_storage.dart'; +import '../database/powersync_db_mixin.dart'; +import 'sync_worker_protocol.dart'; + +final _logger = autoLogger; + +void main() { + _SyncWorker().start(); +} + +class _SyncWorker { + final SharedWorkerGlobalScope _self; + final Map _requestedSyncTasks = {}; + + _SyncWorker() : _self = globalContext as SharedWorkerGlobalScope; + + void start() async { + // Start listening for connect events, each signifies a client connecting + // to this worker. + EventStreamProviders.connectEvent.forTarget(_self).listen((e) { + final ports = (e as MessageEvent).ports.toDart; + for (final port in ports) { + _ConnectedClient(port, this); + } + }); + } + + _SyncRunner referenceSyncTask( + String databaseIdentifier, _ConnectedClient client) { + return _requestedSyncTasks.putIfAbsent(databaseIdentifier, () { + return _SyncRunner(databaseIdentifier); + }) + ..registerClient(client); + } +} + +class _ConnectedClient { + late WorkerCommunicationChannel channel; + final _SyncWorker _worker; + + _SyncRunner? _runner; + StreamSubscription? _logSubscription; + + _ConnectedClient(MessagePort port, this._worker) { + channel = WorkerCommunicationChannel( + port: port, + requestHandler: (type, payload) async { + switch (type) { + case SyncWorkerMessageType.startSynchronization: + final request = payload as StartSynchronization; + _runner = _worker.referenceSyncTask(request.databaseName, this); + return (JSObject(), null); + case SyncWorkerMessageType.abortSynchronization: + _runner?.unregisterClient(this); + _runner = null; + return (JSObject(), null); + default: + throw StateError('Unexpected message type $type'); + } + }, + ); + + _logSubscription = _logger.onRecord.listen((record) { + final msg = StringBuffer( + '[${record.loggerName}] ${record.level.name}: ${record.time}: ${record.message}'); + + if (record.error != null) { + msg + ..writeln() + ..write(record.error); + } + if (record.stackTrace != null) { + msg + ..writeln() + ..write(record.stackTrace); + } + + channel.notify(SyncWorkerMessageType.logEvent, msg.toString().toJS); + }); + } + + void markClosed() { + _logSubscription?.cancel(); + _runner?.unregisterClient(this); + _runner = null; + } +} + +class _SyncRunner { + final String identifier; + + final StreamGroup<_RunnerEvent> _group = StreamGroup(); + final StreamController<_RunnerEvent> _mainEvents = StreamController(); + + StreamingSync? sync; + _ConnectedClient? databaseHost; + final connections = <_ConnectedClient>[]; + + _SyncRunner(this.identifier) { + _group.add(_mainEvents.stream); + + Future(() async { + await for (final event in _group.stream) { + try { + switch (event) { + case _AddConnection(:final client): + connections.add(client); + if (sync == null) { + await _requestDatabase(client); + } + case _RemoveConnection(:final client): + connections.remove(client); + if (connections.isEmpty) { + await sync?.abort(); + sync = null; + } + case _ActiveDatabaseClosed(): + _logger.info('Remote database closed, finding a new client'); + sync?.abort(); + sync = null; + + // The only reliable notification we get for a client closing is + // when that client is currently hosting the database. Use the + // opportunity to check whether secondary clients have also closed + // in the meantime. + final newHost = await _collectActiveClients(); + if (newHost == null) { + _logger.info('No client remains'); + } else { + await _requestDatabase(newHost); + } + } + } catch (e, s) { + _logger.warning('Error handling $event', e, s); + } + } + }); + } + + /// Pings all current [connections], removing those that don't answer in 5s + /// (as they are likely closed tabs as well). + /// + /// Returns the first client that responds (without waiting for others). + Future<_ConnectedClient?> _collectActiveClients() async { + final candidates = connections.toList(); + if (candidates.isEmpty) { + return null; + } + + final firstResponder = Completer<_ConnectedClient?>(); + var pendingRequests = candidates.length; + + for (final candidate in candidates) { + candidate.channel.ping().then((_) { + pendingRequests--; + if (!firstResponder.isCompleted) { + firstResponder.complete(candidate); + } + }).timeout(const Duration(seconds: 5), onTimeout: () { + pendingRequests--; + candidate.markClosed(); + if (pendingRequests == 0 && !firstResponder.isCompleted) { + // All requests have timed out, no connection remains + firstResponder.complete(null); + } + }); + } + + return firstResponder.future; + } + + Future _requestDatabase(_ConnectedClient client) async { + _logger.info('Sync setup: Requesting database'); + + // This is the first client, ask for a database connection + final connection = await client.channel.requestDatabase(); + _logger.info('Sync setup: Connecting to endpoint'); + final database = await WebSqliteConnection.connectToEndpoint(( + connectPort: connection.databasePort, + connectName: connection.databaseName, + lockName: connection.lockName, + )); + _logger.info('Sync setup: Has database, starting sync!'); + databaseHost = client; + + database.closedFuture.then((_) { + _logger.fine('Detected closed client'); + client.markClosed(); + + if (client == databaseHost) { + _logger + .info('Tab providing sync database has gone down, reconnecting...'); + _mainEvents.add(const _ActiveDatabaseClosed()); + } + }); + + sync = StreamingSyncImplementation( + adapter: BucketStorage(database), + credentialsCallback: client.channel.credentialsCallback, + invalidCredentialsCallback: client.channel.invalidCredentialsCallback, + uploadCrud: client.channel.uploadCrud, + updateStream: powerSyncUpdateNotifications( + database.updates ?? const Stream.empty()), + retryDelay: Duration(seconds: 3), + client: FetchClient(mode: RequestMode.cors), + identifier: identifier, + ); + sync!.statusStream.listen((event) { + _logger.fine('Broadcasting sync event: $event'); + for (final client in connections) { + client.channel.notify(SyncWorkerMessageType.notifySyncStatus, + SerializedSyncStatus.from(event)); + } + }); + sync!.streamingSync(); + } + + void registerClient(_ConnectedClient client) { + _mainEvents.add(_AddConnection(client)); + } + + void unregisterClient(_ConnectedClient client) { + _mainEvents.add(_RemoveConnection(client)); + } +} + +sealed class _RunnerEvent {} + +final class _AddConnection implements _RunnerEvent { + final _ConnectedClient client; + + _AddConnection(this.client); +} + +final class _RemoveConnection implements _RunnerEvent { + final _ConnectedClient client; + + _RemoveConnection(this.client); +} + +final class _ActiveDatabaseClosed implements _RunnerEvent { + const _ActiveDatabaseClosed(); +} diff --git a/packages/powersync/lib/src/web/sync_worker_protocol.dart b/packages/powersync/lib/src/web/sync_worker_protocol.dart new file mode 100644 index 00000000..2dc138f9 --- /dev/null +++ b/packages/powersync/lib/src/web/sync_worker_protocol.dart @@ -0,0 +1,359 @@ +import 'dart:async'; +import 'dart:js_interop'; + +import 'package:web/web.dart'; + +import '../connector.dart'; +import '../log.dart'; +import '../sync_status.dart'; + +/// Names used in [SyncWorkerMessage] +enum SyncWorkerMessageType { + ping, + + /// Sent from client to the sync worker to request the synchronization + /// starting. + startSynchronization, + + /// Te [SyncWorkerMessage.payload] for the request is a numeric id, the + /// response can be anything (void). + abortSynchronization, + + /// Sent from the sync worker to the client when it needs an endpoint to + /// connect to to start the synchronization. + /// + /// [SyncWorkerMessage.payload] is a numeric request id, the response sends + /// a [WebEndpoint]. + requestEndpoint, + + /// Invoke the `uploadCrud`, `fetchCredentials` and `getCredentialsCached` + /// methods on the client. + /// + /// For requests, the [SyncWorkerMessage.payload] is a numeric request id. + /// The response sends either a [SerializedCredentials] object or an empty + /// object for uploads. + uploadCrud, + invalidCredentialsCallback, + credentialsCallback, + + /// Notifies clients that the sync status has changed - the payload consists + /// of [SerializedSyncStatus]. + notifySyncStatus, + + /// Notifies clients about a log event emitted by the worker (typically only + /// used when workers were compiled in debug mode). + /// The payload is a [JSString]. + logEvent, + + okResponse, + errorResponse, +} + +@anonymous +extension type SyncWorkerMessage._(JSObject _) implements JSObject { + external factory SyncWorkerMessage( + {required String type, required JSAny payload}); + + external String get type; + external JSAny get payload; +} + +@anonymous +extension type StartSynchronization._(JSObject _) implements JSObject { + external factory StartSynchronization({ + required String databaseName, + required int requestId, + }); + + external String get databaseName; + external int get requestId; +} + +@anonymous +extension type WebEndpoint._(JSObject _) implements JSObject { + external factory WebEndpoint({ + required String databaseName, + required MessagePort databasePort, + required String? lockName, + }); + + external String get databaseName; + external String? get lockName; + external MessagePort get databasePort; +} + +@anonymous +extension type OkResponse._(JSObject _) implements JSObject { + external factory OkResponse({ + required int requestId, + required JSAny? payload, + }); + + external int get requestId; + external JSAny? get payload; +} + +@anonymous +extension type ErrorResponse._(JSObject _) implements JSObject { + external factory ErrorResponse({ + required int requestId, + required JSString errorMessage, + }); + + external int get requestId; + external JSString get errorMessage; +} + +@anonymous +extension type SerializedCredentials._(JSObject _) implements JSObject { + external factory SerializedCredentials({ + required JSString endpoint, + required JSString token, + required JSString? userId, + required JSNumber? expiresAt, + }); + + factory SerializedCredentials.from(PowerSyncCredentials credentials) { + return SerializedCredentials( + endpoint: credentials.endpoint.toJS, + token: credentials.token.toJS, + userId: credentials.userId?.toJS, + expiresAt: credentials.expiresAt?.microsecondsSinceEpoch.toJS, + ); + } + + external JSString get endpoint; + external JSString get token; + external JSString? get userId; + external JSNumber? get expiresAt; + + PowerSyncCredentials asCredentials() { + return PowerSyncCredentials( + endpoint: endpoint.toDart, + token: token.toDart, + userId: userId?.toDart, + expiresAt: expiresAt.isUndefinedOrNull + ? null + : DateTime.fromMicrosecondsSinceEpoch(expiresAt!.toDartInt), + ); + } +} + +@anonymous +extension type SerializedSyncStatus._(JSObject _) implements JSObject { + external factory SerializedSyncStatus({ + required bool connected, + required bool connecting, + required bool downloading, + required bool uploading, + required int? lastSyncedAt, + required bool? hasSyned, + required String? uploadError, + required String? downloadError, + }); + + factory SerializedSyncStatus.from(SyncStatus status) { + return SerializedSyncStatus( + connected: status.connected, + connecting: status.connecting, + downloading: status.downloading, + uploading: status.uploading, + lastSyncedAt: status.lastSyncedAt?.microsecondsSinceEpoch, + hasSyned: status.hasSynced, + uploadError: status.uploadError?.toString(), + downloadError: status.downloadError?.toString(), + ); + } + + external bool get connected; + external bool get connecting; + external bool get downloading; + external bool get uploading; + external int? lastSyncedAt; + external bool? hasSynced; + external String? uploadError; + external String? downloadError; + + SyncStatus asSyncStatus() { + return SyncStatus( + connected: connected, + connecting: connecting, + downloading: downloading, + uploading: uploading, + lastSyncedAt: lastSyncedAt == null + ? null + : DateTime.fromMicrosecondsSinceEpoch(lastSyncedAt!), + hasSynced: hasSynced, + uploadError: uploadError, + downloadError: downloadError, + ); + } +} + +final class WorkerCommunicationChannel { + static final _logger = autoLogger; + + final Map> _pendingRequests = {}; + int _nextRequestId = 0; + bool _hasError = false; + StreamSubscription? _incomingMessages; + StreamSubscription? _incomingErrors; + + final MessagePort port; + final FutureOr<(JSAny?, JSArray?)> Function(SyncWorkerMessageType, JSAny) + requestHandler; + final StreamController<(SyncWorkerMessageType, JSAny)> _events = + StreamController(); + + Stream<(SyncWorkerMessageType, JSAny)> get events => _events.stream; + + WorkerCommunicationChannel({ + required this.port, + required this.requestHandler, + Stream? errors, + }) { + port.start(); + _incomingErrors = errors?.listen((event) { + _hasError = true; + + _pendingRequests.forEach((_, value) { + value.completeError('Worker error: $event'); + }); + _pendingRequests.clear(); + }); + + _incomingMessages = + EventStreamProviders.messageEvent.forTarget(port).listen((event) async { + final message = event.data as SyncWorkerMessage; + final type = SyncWorkerMessageType.values.byName(message.type); + _logger.fine('[in] $type'); + + int requestId; + + switch (type) { + case SyncWorkerMessageType.ping: + requestId = (message.payload as JSNumber).toDartInt; + port.postMessage(SyncWorkerMessage( + type: SyncWorkerMessageType.okResponse.name, + payload: OkResponse(requestId: requestId, payload: null), + )); + return; + case SyncWorkerMessageType.startSynchronization: + requestId = (message.payload as StartSynchronization).requestId; + case SyncWorkerMessageType.requestEndpoint: + case SyncWorkerMessageType.abortSynchronization: + case SyncWorkerMessageType.credentialsCallback: + case SyncWorkerMessageType.invalidCredentialsCallback: + case SyncWorkerMessageType.uploadCrud: + requestId = (message.payload as JSNumber).toDartInt; + case SyncWorkerMessageType.okResponse: + final payload = message.payload as OkResponse; + _pendingRequests.remove(payload.requestId)!.complete(payload.payload); + return; + case SyncWorkerMessageType.errorResponse: + final payload = message.payload as ErrorResponse; + _pendingRequests + .remove(payload.requestId)! + .completeError(payload.errorMessage.toDart); + return; + case SyncWorkerMessageType.notifySyncStatus: + _events.add((type, message.payload)); + return; + case SyncWorkerMessageType.logEvent: + final msg = (message.payload as JSString).toDart; + _logger.info('[Sync Worker]: $msg'); + return; + } + + try { + final (response, transfer) = + await requestHandler(type, message.payload); + final responseMessage = SyncWorkerMessage( + type: SyncWorkerMessageType.okResponse.name, + payload: OkResponse(requestId: requestId, payload: response), + ); + + if (transfer != null) { + port.postMessage(responseMessage, transfer); + } else { + port.postMessage(responseMessage); + } + } catch (e) { + port.postMessage(SyncWorkerMessage( + type: SyncWorkerMessageType.errorResponse.name, + payload: ErrorResponse( + requestId: requestId, errorMessage: e.toString().toJS), + )); + } + }); + } + + (int, Future) _newRequest() { + if (_hasError) { + throw StateError('Channel has error, cannot send new requests'); + } + + final id = _nextRequestId++; + final completer = _pendingRequests[id] = Completer.sync(); + return (id, completer.future); + } + + Future _numericRequest(SyncWorkerMessageType type) { + final (id, future) = _newRequest(); + port.postMessage(SyncWorkerMessage(type: type.name, payload: id.toJS)); + return future; + } + + void notify(SyncWorkerMessageType notificationType, JSAny payload) { + port.postMessage( + SyncWorkerMessage(type: notificationType.name, payload: payload)); + } + + Future ping() async { + await _numericRequest(SyncWorkerMessageType.ping); + } + + Future startSynchronization(String databaseName) async { + final (id, completion) = _newRequest(); + port.postMessage(SyncWorkerMessage( + type: SyncWorkerMessageType.startSynchronization.name, + payload: StartSynchronization(databaseName: databaseName, requestId: id), + )); + await completion; + } + + Future abortSynchronization() async { + await _numericRequest(SyncWorkerMessageType.abortSynchronization); + } + + // Called by the sync worker to request a [WebEndpoint] for the database + // managed by the client. + Future requestDatabase() async { + return await _numericRequest(SyncWorkerMessageType.requestEndpoint) + as WebEndpoint; + } + + Future credentialsCallback() async { + final serialized = + await _numericRequest(SyncWorkerMessageType.credentialsCallback) + as SerializedCredentials?; + return serialized?.asCredentials(); + } + + Future invalidCredentialsCallback() async { + final serialized = + await _numericRequest(SyncWorkerMessageType.invalidCredentialsCallback) + as SerializedCredentials?; + return serialized?.asCredentials(); + } + + Future uploadCrud() async { + await _numericRequest(SyncWorkerMessageType.uploadCrud); + } + + Future close() async { + _incomingMessages?.cancel(); + _incomingErrors?.cancel(); + port.close(); + } +} diff --git a/packages/powersync/pubspec.yaml b/packages/powersync/pubspec.yaml index f4d7de04..2ebf0e38 100644 --- a/packages/powersync/pubspec.yaml +++ b/packages/powersync/pubspec.yaml @@ -27,6 +27,8 @@ dependencies: pubspec_parse: ^1.3.0 args: ^2.5.0 pub_semver: ^2.1.4 + js: ^0.7.0 + web: ^1.0.0 dev_dependencies: dcli: ^4.0.0 lints: ^3.0.0 @@ -38,7 +40,6 @@ dev_dependencies: shelf_static: ^1.1.2 stream_channel: ^2.1.2 path: ^1.8.3 - js: ">=0.6.7 <0.8.0" platforms: android: diff --git a/packages/powersync_attachments_helper/pubspec.yaml b/packages/powersync_attachments_helper/pubspec.yaml index f4bafc1a..d98ddc55 100644 --- a/packages/powersync_attachments_helper/pubspec.yaml +++ b/packages/powersync_attachments_helper/pubspec.yaml @@ -19,6 +19,10 @@ dev_dependencies: lints: ^3.0.0 test: ^1.25.2 +dependency_overrides: + sqlite_async: + path: /home/simon/src/sqlite_async.dart/packages/sqlite_async + platforms: android: ios: