forked from rommapp/romm
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathsync_watcher.py
More file actions
334 lines (286 loc) · 10.9 KB
/
sync_watcher.py
File metadata and controls
334 lines (286 loc) · 10.9 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
"""Sync folder watcher for File Transfer mode.
This module is invoked by watchfiles when changes are detected in the sync
folder. It processes incoming save files from devices that use file_transfer
sync mode.
The watcher is configured to run as a separate watchfiles process monitoring
the sync base path. When files appear in a device's incoming/ directory, they
are matched to ROMs and processed as save uploads.
"""
import asyncio
import json
import os
import shutil
from collections.abc import Sequence
from datetime import datetime, timezone
from typing import cast
import sentry_sdk
from config import ENABLE_SYNC_FOLDER_WATCHER, SENTRY_DSN
from handler.database import (
db_device_handler,
db_device_save_sync_handler,
db_platform_handler,
db_save_handler,
db_sync_session_handler,
)
from handler.filesystem import fs_asset_handler, fs_sync_handler
from handler.sync.comparison import compare_save_state
from logger.formatter import highlight as hl
from logger.logger import log
from models.device import SyncMode
from models.sync_session import SyncSessionStatus
from utils import get_version
# Configure Sentry as soon as the module is loaded; the release is reported
# as "romm@<version>".
sentry_sdk.init(dsn=SENTRY_DSN, release=f"romm@{get_version()}")

# One watcher change entry: an (event_type, path) pair.
Change = tuple[str, str]
def _extract_device_and_platform(path: str) -> tuple[str, str, str] | None:
"""Extract device_id, platform_slug, and filename from a sync incoming path.
Expected path format: {SYNC_BASE_PATH}/{device_id}/incoming/{platform_slug}/filename.ext
"""
try:
rel_path = os.path.relpath(path, start=str(fs_sync_handler.base_path))
parts = rel_path.split(os.sep)
if len(parts) < 4 or parts[1] != "incoming":
return None
device_id = parts[0]
platform_slug = parts[2]
filename = parts[-1]
return (device_id, platform_slug, filename)
except (ValueError, IndexError):
return None
def _ensure_conflicts_dir(device_id: str, platform_slug: str) -> str:
    """Make sure the conflicts directory for a device/platform pair exists.

    The directory is the sync base path joined with the path returned by
    ``fs_sync_handler.build_conflicts_path``. It is created (with parents) when
    missing, and the resulting path is returned as a string.
    """
    sync_root = fs_sync_handler.base_path
    relative_dir = fs_sync_handler.build_conflicts_path(device_id, platform_slug)
    target_dir = str(sync_root / relative_dir)
    os.makedirs(target_dir, exist_ok=True)
    return target_dir
def process_sync_changes(changes: Sequence[Change]) -> None:
    """Process file changes detected in the sync folder.

    Does nothing when ``ENABLE_SYNC_FOLDER_WATCHER`` is falsy. Otherwise every
    changed path that is an existing regular file inside a device's
    ``incoming/`` directory is collected, grouped by device, and handed to
    ``_process_device_incoming`` one device at a time.

    Args:
        changes: ``(event_type, path)`` pairs. The event type is ignored;
            only the current state of the path on disk matters.
    """
    if not ENABLE_SYNC_FOLDER_WATCHER:
        return

    # device_id -> [(platform_slug, filename, full_path), ...]
    by_device: dict[str, list[tuple[str, str, str]]] = {}
    seen_paths: set[str] = set()

    for _event_type, change_path in changes:
        src_path = os.fsdecode(change_path)

        # The same path can be reported more than once in a batch (e.g. added
        # and then modified). Handling it twice would make the second attempt
        # fail, because handled files are removed from incoming/.
        if src_path in seen_paths:
            continue
        seen_paths.add(src_path)

        # Only process files (not directories). This also skips paths that no
        # longer exist, such as deletions.
        if not os.path.isfile(src_path):
            continue

        parsed = _extract_device_and_platform(src_path)
        if not parsed:
            continue

        device_id, platform_slug, filename = parsed
        by_device.setdefault(device_id, []).append(
            (platform_slug, filename, src_path)
        )

    for device_id, files in by_device.items():
        _process_device_incoming(device_id, files)
def _process_device_incoming(
    device_id: str,
    files: list[tuple[str, str, str]],  # (platform_slug, filename, full_path)
) -> None:
    """Run one sync session over the incoming files of a single device.

    The device is skipped (with a log message) when it is unknown, is not in
    file-transfer sync mode, or has sync disabled. Otherwise a sync session is
    created and marked in progress, a "started" event is emitted, each file is
    processed in order, and the session is completed with the success and
    failure counts. An "error" event is emitted when at least one file failed;
    a "completed" event is always emitted at the end.

    Args:
        device_id: Identifier taken from the sync folder path.
        files: ``(platform_slug, filename, full_path)`` entries to process.
    """
    # Imported at call time rather than at module import time.
    from endpoints.sockets.sync import (
        emit_sync_completed,
        emit_sync_error,
        emit_sync_started,
    )

    # Guard clauses: bail out early for devices that must not be synced.
    device = db_device_handler.get_device_by_id(device_id)
    if not device:
        log.warning(f"Sync watcher: unknown device {device_id}, skipping")
        return
    if device.sync_mode != SyncMode.FILE_TRANSFER:
        log.warning(
            f"Sync watcher: device {device_id} is not in file_transfer mode, skipping"
        )
        return
    if not device.sync_enabled:
        log.info(f"Sync watcher: device {device_id} sync is disabled, skipping")
        return

    # Open the session and announce it.
    session = db_sync_session_handler.create_session(
        device_id=device.id, user_id=device.user_id
    )
    planned_update = {
        "status": SyncSessionStatus.IN_PROGRESS,
        "operations_planned": len(files),
    }
    db_sync_session_handler.update_session(
        session_id=session.id,
        data=planned_update,
    )
    asyncio.run(
        emit_sync_started(
            user_id=device.user_id,
            device_id=device.id,
            session_id=session.id,
            sync_mode="file_transfer",
        )
    )

    # Process every file; one failing file must not abort the rest.
    succeeded = 0
    errored = 0
    for slug, name, path in files:
        try:
            _process_incoming_file(device, session.id, slug, name, path)
        except Exception:
            log.error(
                f"Sync watcher: failed to process {name} for device {device_id}",
                exc_info=True,
            )
            errored += 1
        else:
            succeeded += 1

    # Close the session and report the outcome.
    db_sync_session_handler.complete_session(
        session_id=session.id,
        operations_completed=succeeded,
        operations_failed=errored,
    )

    if errored > 0:
        asyncio.run(
            emit_sync_error(
                user_id=device.user_id,
                device_id=device.id,
                session_id=session.id,
                error_message=f"{errored} file(s) failed to process",
            )
        )

    asyncio.run(
        emit_sync_completed(
            user_id=device.user_id,
            device_id=device.id,
            session_id=session.id,
            operations_completed=succeeded,
            operations_failed=errored,
        )
    )

    log.info(
        f"Sync watcher: device {device_id} processed {succeeded} files, {errored} failures"
    )
def _process_incoming_file(
    device, session_id: int, platform_slug: str, filename: str, full_path: str
) -> None:
    """Reconcile one incoming file with the server-side save of the same name.

    The file is matched by filename against the device owner's saves on the
    platform. When no platform or no save matches, the file is left in place.
    Otherwise the result of ``compare_save_state`` decides what happens:

    * ``no_op``    - the incoming file is removed.
    * ``upload``   - the server save is overwritten with the incoming file,
      its size/hash and the device sync record are updated, and the incoming
      file is removed.
    * ``conflict`` - the incoming file is moved to the device's conflicts
      directory and a conflict event is emitted.
    * ``download`` - the server save is written to the device's outgoing
      directory, the device sync record is updated, and the incoming file is
      removed.

    Any other action leaves the file untouched.

    Args:
        device: Device record; ``id`` and ``user_id`` are read from it.
        session_id: Sync session the conflict event is reported against.
        platform_slug: Filesystem slug of the platform directory.
        filename: Name of the incoming file.
        full_path: Path of the incoming file on disk.
    """
    # Imported at call time rather than at module import time.
    from endpoints.sockets.sync import emit_sync_conflict

    platform = db_platform_handler.get_platform_by_fs_slug(platform_slug)
    if not platform:
        log.warning(f"Sync watcher: unknown platform slug {platform_slug}")
        return

    # Fingerprint the incoming file.
    incoming_hash = fs_sync_handler.compute_file_hash(full_path)
    incoming_size = os.path.getsize(full_path)
    incoming_mtime = datetime.fromtimestamp(
        os.path.getmtime(full_path), tz=timezone.utc
    )

    # The first save with the same file name on this platform is the match.
    candidates = db_save_handler.get_saves(
        user_id=device.user_id,
        platform_id=platform.id,
    )
    existing = next(
        (candidate for candidate in candidates if candidate.file_name == filename),
        None,
    )
    if not existing:
        log.info(
            f"Sync watcher: new file {hl(filename)} from device {device.id} "
            f"on platform {platform_slug} - no matching save found, skipping"
        )
        return

    # Decide what to do based on hashes, timestamps and the last sync time.
    sync_record = db_device_save_sync_handler.get_sync(
        device_id=device.id, save_id=existing.id
    )
    last_synced_at = sync_record.last_synced_at if sync_record else None
    comparison = compare_save_state(
        client_hash=incoming_hash,
        client_updated_at=incoming_mtime,
        server_hash=existing.content_hash,
        server_updated_at=existing.updated_at,
        device_last_synced_at=last_synced_at,
    )
    action = comparison.action

    if action == "no_op":
        log.debug(f"Sync watcher: {filename} is already in sync, skipping")
        fs_sync_handler.remove_incoming_file(full_path)
        return

    if action == "upload":
        # The incoming file wins: overwrite the server copy.
        log.info(f"Sync watcher: updating save {hl(filename)} from device {device.id}")
        with open(full_path, "rb") as incoming:
            payload = incoming.read()

        asyncio.run(
            fs_asset_handler.write_file(
                file=payload,
                path=existing.file_path,
                filename=existing.file_name,
            )
        )
        db_save_handler.update_save(
            existing.id,
            {
                "file_size_bytes": incoming_size,
                "content_hash": incoming_hash,
            },
        )
        db_device_save_sync_handler.upsert_sync(
            device_id=device.id,
            save_id=existing.id,
            synced_at=datetime.now(timezone.utc),
        )
        fs_sync_handler.remove_incoming_file(full_path)
        return

    if action == "conflict":
        log.warning(
            f"Sync watcher: conflict detected for {filename} "
            f"on device {device.id}: {comparison.reason}"
        )
        # Park the incoming file in the conflicts directory.
        conflict_path = os.path.join(
            _ensure_conflicts_dir(device.id, platform_slug), filename
        )
        shutil.move(full_path, conflict_path)
        log.info(f"Sync watcher: moved conflicting file to {conflict_path}")

        # Notify over the socket that a conflict occurred.
        asyncio.run(
            emit_sync_conflict(
                user_id=device.user_id,
                device_id=device.id,
                session_id=session_id,
                file_name=filename,
                rom_id=existing.rom_id,
                reason=comparison.reason,
            )
        )
        return

    if action == "download":
        # The server copy wins: stage it in the device's outgoing directory.
        log.info(
            f"Sync watcher: server save is newer for {filename}, "
            f"writing to outgoing"
        )
        server_relative = f"{existing.file_path}/{existing.file_name}"
        server_absolute = fs_asset_handler.validate_path(server_relative)
        with open(str(server_absolute), "rb") as server_file:
            server_bytes = server_file.read()

        fs_sync_handler.write_outgoing_file(
            device_id=device.id,
            platform_slug=platform_slug,
            file_name=filename,
            data=server_bytes,
        )
        db_device_save_sync_handler.upsert_sync(
            device_id=device.id,
            save_id=existing.id,
            synced_at=datetime.now(timezone.utc),
        )
        fs_sync_handler.remove_incoming_file(full_path)
if __name__ == "__main__":
    # The change list is read from the WATCHFILES_CHANGES environment variable
    # as JSON; a missing variable or an empty list means nothing to do.
    if changes := cast(
        list[Change], json.loads(os.getenv("WATCHFILES_CHANGES", "[]"))
    ):
        process_sync_changes(changes)