Skip to content

feat: user sync stream #16862

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 27 commits into from
Apr 17, 2025
Merged

feat: user sync stream #16862

merged 27 commits into from
Apr 17, 2025

Conversation

shenlong-tanwen
Copy link
Member

Description

  • Handles user updates and deletions through the sync stream flow
  • Handles partner updates and deletions through the sync stream flow

@shenlong-tanwen shenlong-tanwen changed the title refactor: user entity feat: user sync stream Mar 14, 2025
@shenlong-tanwen shenlong-tanwen marked this pull request as ready for review March 15, 2025 14:17
@shenlong-tanwen shenlong-tanwen force-pushed the feat/user-sync-stream branch 3 times, most recently from 4b0957c to 4d4436f Compare March 18, 2025 14:18
@shenlong-tanwen shenlong-tanwen force-pushed the feat/user-sync-stream branch 2 times, most recently from 8f62087 to 0de88c0 Compare March 20, 2025 01:39
Base automatically changed from feat/sqlite to main April 2, 2025 13:58
@shenlong-tanwen shenlong-tanwen force-pushed the feat/user-sync-stream branch 3 times, most recently from 6d0ba1a to b14d61a Compare April 3, 2025 20:28
@shenlong-tanwen shenlong-tanwen force-pushed the feat/user-sync-stream branch from b14d61a to afd664d Compare April 9, 2025 21:03
@alextran1502
Copy link
Contributor

alextran1502 commented Apr 13, 2025

Hi @shenlong-tanwen, I added a button to trigger the syncUser event to see how it works. I notice that the isolate isn't always triggered; I expect that if you log out and log back in, the sync mechanism will trigger again when you press the button. But it only works again after I hot restart the app. I think it might have to be something with the .fetch method in _userSyncCache.fetch. Can you check this?

@shenlong-tanwen
Copy link
Member Author

I think it might have to be something with the .fetch method in _userSyncCache.fetch

Yes, the only thing that could prevent us from creating another isolate is the AsyncCache. It basically ensures that we start a new isolate only after the completion of the previous one. In your case, I suspect the previous sync was in progress while you logged out and logged back in. However, it might be a good idea to have an option to stop the previous isolate when the user logs out and start a new once after they log in. I'll handle this soon. For now, I've added some additional logs to be printed during the sync process. This should help you confirm whether the previous one has stopped before you invoke the next sync

Comment on lines +61 to +62
// Wait for the logs to flush
await Future.delayed(const Duration(seconds: 2));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there no .flush() you could call or something?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is one but that can be used only when the logs are buffered. We do not buffer logs on isolates so it is not much useful here. Also, there is an issue with the flush logic currently with how Isar works and so it is disabled for now as well - #16599

DartPluginRegistrant.ensureInitialized();

final db = await Bootstrap.initIsar();
await Bootstrap.initDomain(db, shouldBufferLogs: false);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why disable buffering?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

On main isolate, we buffer logs and flush them every 5 seconds. However, an isolate does not need to necessarily run for 5 seconds. There is also a chance that an isolate might be killed before the logs are updated, so we disable buffering all together within the isolates

Comment on lines +5 to +6
// ignore: avoid-dynamic
final dynamic data;
Copy link
Contributor

@mertalev mertalev Apr 16, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should avoid dynamic to the extent possible. Ideally, only the JSON parsing would involve dynamic, and parse directly into strongly typed objects that share a type (either through an interface, by inheritance or maybe through generics). As it is, dynamic leaks into the events, the response converter map, the main sync handler, etc.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

True, we went with having strongly typed DTOs initially. But it resulted in us creating a lot of duplicate domain classes that were already available from the OpenAPI generated code. The generated codes are not strongly typed so we do not have a common base class that we could use here.

continue;
}

if (await _handleSyncData(type, data.map((e) => e.data))) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Between the grouping by type, the data mapping here and only the last ack being called, the shape of the sync events seems wrong. Shouldn't the event be a type, an array of data and a single ack to begin with?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Each event has a data, it's type and it's ack. If we handle each event separately, we would store and send only the ack of the last successful operation for a particular type, but since we group similar events and handle them in batch, we had to do them this way.

@shenlong-tanwen shenlong-tanwen force-pushed the feat/user-sync-stream branch 2 times, most recently from 100a1e7 to 6fa44ac Compare April 16, 2025 17:25
@alextran1502 alextran1502 merged commit 81ed54a into main Apr 17, 2025
44 checks passed
@alextran1502 alextran1502 deleted the feat/user-sync-stream branch April 17, 2025 15:25
savely-krasovsky pushed a commit to savely-krasovsky/immich that referenced this pull request Jun 8, 2025
* refactor: user entity

* chore: rebase fixes

* refactor: remove int user Id

* refactor: migrate store userId from int to string

* refactor: rename uid to id

* feat: drift

* pr feedback

* refactor: move common overrides to mixin

* refactor: remove int user Id

* refactor: migrate store userId from int to string

* refactor: rename uid to id

* feat: user & partner sync stream

* pr changes

* refactor: sync service and add tests

* chore: remove generated change

* chore: move sync model

* rebase: convert string ids to byte uuids

* rebase

* add processing logs

* batch db calls

* rewrite isolate manager

* rewrite with worker_manager

* misc fixes

* add sync order test

---------

Co-authored-by: shenlong-tanwen <[email protected]>
Co-authored-by: Alex <[email protected]>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants