@@ -2,19 +2,19 @@ package org.vestifeed.sync
22
33import android.util.Log
44import kotlinx.coroutines.Dispatchers
5- import org.vestifeed.entries.EntriesRepo
5+ import kotlinx.coroutines.flow.Flow
66import kotlinx.coroutines.flow.MutableStateFlow
77import kotlinx.coroutines.flow.asStateFlow
88import kotlinx.coroutines.flow.update
99import kotlinx.coroutines.withContext
1010import org.vestifeed.api.Api
1111import org.vestifeed.db.Database
1212import java.time.Instant
13+ import java.time.OffsetDateTime
1314
1415class Sync (
1516 private val api : Api ,
1617 private val db : Database ,
17- private val entriesRepo : EntriesRepo ,
1818) {
1919 sealed class State {
2020 object Idle : State()
@@ -60,7 +60,7 @@ class Sync(
6060 Log .d(" sync" , " syncing entries" )
6161
6262 runCatching {
63- entriesRepo.syncAll ().collect { progress ->
63+ syncAllEntries ().collect { progress ->
6464 var message = " Fetching news"
6565
6666 if (progress.itemsSynced > 0 ) {
@@ -93,7 +93,7 @@ class Sync(
9393
9494 if (args.syncFlags) {
9595 runCatching {
96- entriesRepo. syncReadEntries()
96+ syncReadEntries()
9797 }.onFailure {
9898 _state .update { State .Idle }
9999 return SyncResult .Failure (
@@ -105,7 +105,7 @@ class Sync(
105105 }
106106
107107 runCatching {
108- entriesRepo. syncBookmarkedEntries()
108+ syncBookmarkedEntries()
109109 }.onFailure {
110110 _state .update { State .Idle }
111111 return SyncResult .Failure (
@@ -136,7 +136,7 @@ class Sync(
136136
137137 return if (args.syncEntries) {
138138 runCatching {
139- val newAndUpdatedEntries = entriesRepo.syncNewAndUpdated (
139+ val newAndUpdatedEntries = syncNewAndUpdatedEntries (
140140 lastEntriesSyncDateTime = db.conf.select().lastEntriesSyncDatetime,
141141 )
142142
@@ -191,4 +191,127 @@ class Sync(
191191 }
192192 Log .d(" sync" , " returning" )
193193 }
194+
195+ fun syncAllEntries (): Flow <SyncProgress > = kotlinx.coroutines.flow.flow {
196+ emit(SyncProgress (0L ))
197+
198+ var entriesLoaded = 0L
199+ emit(SyncProgress (entriesLoaded))
200+
201+ api.getEntries(false ).collect { batch ->
202+ entriesLoaded + = batch.getOrThrow().size
203+ emit(SyncProgress (entriesLoaded))
204+ db.transaction {
205+ db.entry.insertOrReplace(batch.getOrThrow())
206+ }
207+ }
208+ }
209+
210+ suspend fun syncReadEntries () {
211+ withContext(Dispatchers .IO ) {
212+ val unsyncedEntries = db.entry.selectByReadSynced(false )
213+
214+ if (unsyncedEntries.isEmpty()) {
215+ return @withContext
216+ }
217+
218+ val unsyncedReadEntries = unsyncedEntries.filter { it.extRead }
219+
220+ if (unsyncedReadEntries.isNotEmpty()) {
221+ api.markEntriesAsRead(
222+ entriesIds = unsyncedReadEntries.map { it.id },
223+ read = true ,
224+ )
225+
226+ db.transaction {
227+ unsyncedReadEntries.forEach {
228+ db.entry.updateReadSynced(true , it.id)
229+ }
230+ }
231+ }
232+
233+ val unsyncedUnreadEntries = unsyncedEntries.filter { ! it.extRead }
234+
235+ if (unsyncedUnreadEntries.isNotEmpty()) {
236+ api.markEntriesAsRead(
237+ entriesIds = unsyncedUnreadEntries.map { it.id },
238+ read = false ,
239+ )
240+
241+ db.transaction {
242+ unsyncedUnreadEntries.forEach {
243+ db.entry.updateReadSynced(true , it.id)
244+ }
245+ }
246+ }
247+ }
248+ }
249+
250+ suspend fun syncBookmarkedEntries () {
251+ withContext(Dispatchers .IO ) {
252+ val notSyncedEntries = db.entry.selectByBookmarkedSynced(false )
253+
254+ if (notSyncedEntries.isEmpty()) {
255+ return @withContext
256+ }
257+
258+ val notSyncedBookmarkedEntries = notSyncedEntries.filter { it.extBookmarked }
259+
260+ if (notSyncedBookmarkedEntries.isNotEmpty()) {
261+ api.markEntriesAsBookmarked(notSyncedBookmarkedEntries, true )
262+
263+ db.transaction {
264+ notSyncedBookmarkedEntries.forEach {
265+ db.entry.updateBookmarkedSynced(true , it.id)
266+ }
267+ }
268+ }
269+
270+ val notSyncedNotBookmarkedEntries = notSyncedEntries.filterNot { it.extBookmarked }
271+
272+ if (notSyncedNotBookmarkedEntries.isNotEmpty()) {
273+ api.markEntriesAsBookmarked(notSyncedNotBookmarkedEntries, false )
274+
275+ db.transaction {
276+ notSyncedNotBookmarkedEntries.forEach {
277+ db.entry.updateBookmarkedSynced(true , it.id)
278+ }
279+ }
280+ }
281+ }
282+ }
283+
284+ suspend fun syncNewAndUpdatedEntries (
285+ lastEntriesSyncDateTime : String ,
286+ ): Int {
287+ return withContext(Dispatchers .IO ) {
288+ val lastSyncInstant = if (lastEntriesSyncDateTime.isNotBlank()) {
289+ OffsetDateTime .parse(lastEntriesSyncDateTime)
290+ } else {
291+ null
292+ }
293+
294+ val maxUpdated = db.entry.selectMaxUpdated()
295+
296+ val maxUpdatedInstant = if (maxUpdated != null ) {
297+ OffsetDateTime .parse(maxUpdated)
298+ } else {
299+ null
300+ }
301+
302+ val entries = api.getNewAndUpdatedEntries(
303+ lastSync = lastSyncInstant,
304+ maxEntryId = db.entry.selectMaxId(),
305+ maxEntryUpdated = maxUpdatedInstant,
306+ ).getOrThrow()
307+
308+ db.transaction {
309+ db.entry.insertOrReplace(entries)
310+ }
311+
312+ entries.size
313+ }
314+ }
315+
316+ data class SyncProgress (val itemsSynced : Long )
194317}
0 commit comments