576 lines
18 KiB
Dart
576 lines
18 KiB
Dart
import 'dart:async';
|
||
|
||
import 'package:flutter/foundation.dart';
|
||
import 'package:flutter_secure_storage/flutter_secure_storage.dart';
|
||
|
||
import 'package:aves/model/entry/entry.dart';
|
||
import 'package:aves/model/source/collection_source.dart';
|
||
import 'package:aves/model/source/events.dart';
|
||
|
||
import 'package:aves/remote/remote_settings.dart';
|
||
import 'package:aves/remote/remote_sync_bus.dart';
|
||
import 'package:aves/remote/remote_repository.dart';
|
||
import 'package:aves/services/common/services.dart';
|
||
import 'package:aves/remote/remote_client.dart';
|
||
import 'package:aves/remote/auth_client.dart';
|
||
import 'package:aves/remote/collection_source_remote_ext.dart';
|
||
|
||
import 'remote_origin.dart';
|
||
|
||
import 'remote_http_api.dart';
|
||
import 'remote_sync_engine.dart';
|
||
import 'remote_state_store.dart';
|
||
import 'remote_ws_client.dart';
|
||
|
||
// ✅ keeps the image (thumbnail) token aligned with the remote auth
|
||
import 'remote_http.dart';
|
||
|
||
class RemoteController {
|
||
/// Private constructor: the controller is only reachable through [instance].
RemoteController._();

/// The single shared controller.
static final RemoteController instance = RemoteController._();

/// Secure-storage key; [bootstrapDone] reports true when its value is `'1'`.
static const String _kBootstrapDone = 'remote_bootstrap_done';

/// Whether a [fullSync] call is currently running (overlapping calls return early).
bool _syncInFlight = false;

/// When the current retry window was opened; compared against a 5-minute
/// budget in [_scheduleRetry]. Null when no window is open.
DateTime? _retryStartTime;

/// Pending retry timer created by [_scheduleRetry].
Timer? _retryTimer;

// Realtime stack, (re)built by [_ensureRealtimeStackImpl] and cleared by [_stopWs].
RemoteAuth? _auth;
RemoteHttpApi? _api;
RemoteSyncEngine? _engine;
RemoteWsClient? _ws;

// Created lazily on the first stack build and kept afterwards.
RemoteStateStore? _stateStore;
EventRingDeduper? _deduper;

/// Stack build currently in progress, shared by concurrent
/// [_ensureRealtimeStack] callers. Null when none is running.
Future<void>? _ensureStackFuture;

/// Whether [_startWsAlreadyEnsured] is currently running.
bool _wsConnectInFlight = false;

// Progressive-sync coalescing state, see [_requestProgressiveSync].

/// Timer that will run the next coalesced progressive sync.
Timer? _progressiveTimer;

/// Whether a progressive sync is already scheduled and has not fired yet.
bool _progressiveScheduled = false;

/// Earliest "since" timestamp requested while a progressive sync was pending.
String? _pendingSinceIso;
|
||
|
||
// ------------------------------------------------------------
|
||
// helper: forces a lens refresh when remote visibility changes
|
||
// ------------------------------------------------------------
|
||
/// Fires a [FilterVisibilityChangedEvent] on the event bus of [source].
///
/// Best effort: anything thrown while building or firing the event is
/// swallowed so that callers never fail because of a notification.
void _notifyRemoteVisibilityChanged(CollectionSource source) {
  try {
    final bus = source.eventBus;
    bus.fire(FilterVisibilityChangedEvent());
  } catch (_) {
    // Intentionally ignored: the notification is optional.
  }
}
|
||
|
||
// "strong" refresh (the lens listens for EntryRefreshedEvent and calls refresh())
|
||
/// Fires an [EntryRefreshedEvent] carrying an empty entry set on the event
/// bus of [source].
///
/// Best effort: anything thrown while firing is swallowed.
void _forceCollectionRefresh(CollectionSource source) {
  try {
    final bus = source.eventBus;
    bus.fire(EntryRefreshedEvent(const <AvesEntry>{}));
  } catch (_) {
    // Intentionally ignored: the refresh request is optional.
  }
}
|
||
|
||
// ------------------------------------------------------------
|
||
// bootstrap flag
|
||
// ------------------------------------------------------------
|
||
/// Whether the bootstrap flag is set, i.e. secure storage holds `'1'` under
/// [_kBootstrapDone].
Future<bool> bootstrapDone() async {
  final flag = await FlutterSecureStorage().read(key: _kBootstrapDone);
  return flag == '1';
}
|
||
|
||
/// Persists the bootstrap flag by writing `'1'` under [_kBootstrapDone] in
/// secure storage.
Future<void> _setBootstrapDone() async {
  await FlutterSecureStorage().write(key: _kBootstrapDone, value: '1');
}
|
||
|
||
// ------------------------------------------------------------
|
||
// init bus (used by HomePage init)
|
||
// ------------------------------------------------------------
|
||
/// Seeds [RemoteSyncBus] from the persisted [RemoteSettings].
///
/// When remote is disabled the bus is marked disabled; otherwise the state
/// becomes [RemoteSyncState.syncing] with an empty, overlay-less progress.
Future<void> initBusFromSettings() async {
  final settings = await RemoteSettings.load();
  final bus = RemoteSyncBus.instance;

  if (settings.enabled) {
    bus.stateNotifier.value = RemoteSyncState.syncing;
    bus.progressNotifier.value = const RemoteSyncProgress(
      done: 0,
      total: 0,
      showOverlay: false,
    );
  } else {
    bus.setDisabled();
  }
}
|
||
|
||
// ------------------------------------------------------------
|
||
// ws url
|
||
// ------------------------------------------------------------
|
||
/// Derives a WebSocket URL from [baseUrl] by appending `-ws` to the first
/// label of the host.
///
/// The scheme is `ws` when [baseUrl] uses `http`, and `wss` for anything
/// else. Only the scheme and host of [baseUrl] are used: port, path and query
/// are not carried over.
String _deriveWsUrl(String baseUrl) {
  final parsed = Uri.parse(baseUrl);
  final wsScheme = parsed.scheme == 'http' ? 'ws' : 'wss';

  // `split` always yields at least one element (an empty host gives `['']`),
  // so the first label can be rewritten unconditionally.
  final labels = parsed.host.split('.');
  final wsHost = ['${labels.first}-ws', ...labels.skip(1)].join('.');

  return '$wsScheme://$wsHost';
}
|
||
|
||
/// Returns the WebSocket URL to use for [s]: the trimmed explicit `wsUrl`
/// when it is not blank, otherwise one derived from `baseUrl`.
String _resolveWsUrl(RemoteSettings s) {
  final explicit = s.wsUrl.trim();
  return explicit.isEmpty ? _deriveWsUrl(s.baseUrl) : explicit;
}
|
||
|
||
// ------------------------------------------------------------
|
||
// ensure stack single-flight
|
||
// ------------------------------------------------------------
|
||
/// Runs [_ensureRealtimeStackImpl] so that overlapping calls share one run.
///
/// While a build is in progress, later callers receive the stored future
/// instead of starting a second build. The slot is cleared when the build
/// completes, but only if it still holds that same future.
Future<void> _ensureRealtimeStack(CollectionSource source) {
  final inFlight = _ensureStackFuture;
  if (inFlight != null) return inFlight;

  final pending = _ensureRealtimeStackImpl(source);
  _ensureStackFuture = pending;

  return pending.whenComplete(() {
    final stillOurs = identical(_ensureStackFuture, pending);
    if (stillOurs) _ensureStackFuture = null;
  });
}
|
||
|
||
/// Builds, or reuses, the auth / HTTP API / sync engine / WebSocket client
/// set that matches the persisted [RemoteSettings].
///
/// Everything is rebuilt when there is no auth yet, when the credentials or
/// base URL differ from the current auth, or when the WebSocket URL changed.
/// Otherwise the existing objects are kept and only the deduper is
/// initialised.
Future<void> _ensureRealtimeStackImpl(CollectionSource source) async {
  final settings = await RemoteSettings.load();

  // The state store and the deduper are created once and then kept.
  final store = _stateStore ??= RemoteStateStore();
  final deduper = _deduper ??= EventRingDeduper(store);

  final wsUrl = _resolveWsUrl(settings);

  // The auth's base is compared against the base URL with a trailing slash.
  final baseWithSlash = settings.baseUrl.endsWith('/') ? settings.baseUrl : '${settings.baseUrl}/';

  final currentAuth = _auth;
  final authChanged = currentAuth == null ||
      currentAuth.email != settings.email ||
      currentAuth.password != settings.password ||
      currentAuth.base.toString() != baseWithSlash;

  final currentWs = _ws;
  final wsChanged = currentWs == null || currentWs.wsUrl != wsUrl;

  if (!authChanged && !wsChanged) {
    // Nothing to rebuild.
    // NOTE(review): init() is only called on this reuse path; presumably the
    // WebSocket client initialises the deduper itself after a rebuild - confirm.
    await deduper.init();
    return;
  }

  _auth = RemoteAuth(
    baseUrl: settings.baseUrl,
    email: settings.email,
    password: settings.password,
  );

  // Point RemoteHttp (thumbnails) at the same auth/token.
  RemoteHttp.attach(baseUrl: settings.baseUrl, auth: _auth!);
  await RemoteHttp.warmUp();

  // Fields are re-read after the await on purpose, exactly as before.
  _api = RemoteHttpApi(baseUrl: settings.baseUrl, auth: _auth!);

  _engine = RemoteSyncEngine(
    api: _api!,
    repo: RemoteRepository(localMediaDb.rawDb),
    source: source,
    state: _stateStore!,
  );

  // The previous client is closed without waiting for it.
  _ws?.close(); // ignore: unawaited_futures
  _ws = RemoteWsClient(
    wsUrl: wsUrl,
    store: _stateStore!,
    deduper: _deduper!,
    onNeedRecoverySync: () {
      _requestProgressiveSync(source: source);
    },
    onEvent: (msg) => _handleWsEvent(source, msg),
  );
}
|
||
|
||
// ------------------------------------------------------------
|
||
// progressive coalescing
|
||
// ------------------------------------------------------------
|
||
/// Schedules a progressive sync after [delay], coalescing bursts of requests.
///
/// A non-blank [sinceIso] is merged into the pending lower bound via
/// [_minIso]. If a sync is already scheduled, nothing else happens: the
/// existing timer, and therefore its original delay, is kept. Errors raised
/// by the sync are dropped here.
void _requestProgressiveSync({
  required CollectionSource source,
  String? sinceIso,
  Duration delay = const Duration(milliseconds: 600),
}) {
  final requestedSince = sinceIso?.trim() ?? '';
  if (requestedSince.isNotEmpty) {
    _pendingSinceIso = _minIso(_pendingSinceIso, requestedSince);
  }

  if (_progressiveScheduled) return;
  _progressiveScheduled = true;

  Future<void> runCoalesced() async {
    // Take and reset the pending state first, so that requests arriving
    // while the sync runs schedule a new one.
    _progressiveScheduled = false;
    final lowerBound = _pendingSinceIso;
    _pendingSinceIso = null;

    try {
      await _runProgressiveSync(source, since: lowerBound);
    } catch (_) {
      // Best effort: failures are not propagated out of the timer.
    }
  }

  _progressiveTimer?.cancel();
  _progressiveTimer = Timer(delay, runCoalesced);
}
|
||
|
||
/// Returns whichever of [a] and [b] is the earlier timestamp.
///
/// [b] is returned when [a] is null, empty or unparsable; [a] is returned
/// when only [b] is unparsable. On a tie [a] is kept.
String? _minIso(String? a, String b) {
  if (a == null || a.isEmpty) return b;

  final current = DateTime.tryParse(a);
  if (current == null) return b;

  final candidate = DateTime.tryParse(b);
  if (candidate == null) return a;

  return candidate.isBefore(current) ? b : a;
}
|
||
|
||
// ------------------------------------------------------------
|
||
// PATCH 1: a progressive sync must NOT flip the indicator back to orange when it was already green
|
||
// ------------------------------------------------------------
|
||
/// Runs one progressive sync through the engine and publishes the outcome on
/// [RemoteSyncBus].
///
/// A non-blank [since] (trimmed) selects `progressiveSyncFrom`; otherwise the
/// plain `progressiveSync` is used. When the bus was already
/// [RemoteSyncState.upToDate], no operation is started on the bus, so the
/// state is not switched to "syncing" for the duration of the run.
///
/// Does nothing if no engine is available after ensuring the stack. On
/// failure the bus is set to server-down and the error is rethrown.
Future<void> _runProgressiveSync(CollectionSource source, {String? since}) async {
  await _ensureRealtimeStack(source);
  final engine = _engine;
  if (engine == null) return;

  final bus = RemoteSyncBus.instance;
  final wasUpToDate = bus.stateNotifier.value == RemoteSyncState.upToDate;

  // Only open a bus operation when we were not already up to date.
  final int? opId = wasUpToDate ? null : bus.start(total: 0, showOverlay: false);
  if (!wasUpToDate) _notifyRemoteVisibilityChanged(source);

  try {
    final lowerBound = since?.trim() ?? '';
    if (lowerBound.isEmpty) {
      await engine.progressiveSync();
    } else {
      await engine.progressiveSyncFrom(lowerBound);
    }

    if (opId == null) {
      // Stay up to date without going through an operation.
      bus.stateNotifier.value = RemoteSyncState.upToDate;
      bus.progressNotifier.value = null;
    } else {
      bus.finishUpToDate(opId: opId);
      _notifyRemoteVisibilityChanged(source);
    }

    _forceCollectionRefresh(source);
  } catch (e, st) {
    debugPrint('[remote] progressive sync failed: $e\n$st');

    if (opId == null) {
      bus.stateNotifier.value = RemoteSyncState.serverDown;
      bus.progressNotifier.value = null;
    } else {
      bus.failServerDown(opId: opId);
    }
    _notifyRemoteVisibilityChanged(source);

    _forceCollectionRefresh(source);
    rethrow;
  }
}
|
||
|
||
// ------------------------------------------------------------
|
||
// ws start/stop
|
||
// ------------------------------------------------------------
|
||
/// Connects the WebSocket client, assuming the stack has already been built.
///
/// Returns without connecting when another connect is in progress, when
/// remote is disabled, when the client or auth is missing, or when the
/// client is already connected. The auth's cached token is used when
/// present; otherwise a login is performed first.
Future<void> _startWsAlreadyEnsured() async {
  if (_wsConnectInFlight) return;
  _wsConnectInFlight = true;

  try {
    final settings = await RemoteSettings.load();
    if (!settings.enabled) return;

    final stackMissing = _ws == null || _auth == null;
    if (stackMissing) return;
    if (_ws!.isConnected) return;

    final auth = _auth!;
    final token = auth.token ?? await auth.login();

    // `_ws` is re-read after the login await on purpose, exactly as before.
    await _ws!.connect(token: token);
  } finally {
    _wsConnectInFlight = false;
  }
}
|
||
|
||
/// Tears down the realtime stack and every timer owned by the controller,
/// then asks the collection to refresh.
///
/// All callers in this class invoke it only after remote has been disabled.
/// It cancels the pending progressive sync and the pending full-sync retry,
/// closes the WebSocket client (a close failure is logged, not thrown), and
/// clears auth, API, engine and client.
///
/// The retry timer and retry window are reset here as well. Otherwise a
/// start time left over from before the teardown would make the first retry
/// after re-enabling exceed the 5-minute budget in [_scheduleRetry] and
/// disable remote again without a single ping.
Future<void> _stopWs(CollectionSource source) async {
  // Drop any coalesced progressive sync that has not run yet.
  _progressiveTimer?.cancel();
  _progressiveTimer = null;
  _progressiveScheduled = false;
  _pendingSinceIso = null;

  // Drop any pending full-sync retry together with its time budget.
  _retryTimer?.cancel();
  _retryTimer = null;
  _retryStartTime = null;

  try {
    await _ws?.close();
  } catch (e) {
    // Closing is best effort, but leave a trace instead of hiding the error.
    debugPrint('[remote] ws close failed: $e');
  }

  _ws = null;
  _engine = null;
  _api = null;
  _auth = null;
  _ensureStackFuture = null;
  _wsConnectInFlight = false;

  _notifyRemoteVisibilityChanged(source);
  _forceCollectionRefresh(source);
}
|
||
|
||
// ------------------------------------------------------------
|
||
// ws events
|
||
// ------------------------------------------------------------
|
||
/// Reacts to one decoded WebSocket message.
///
/// * `add_dir_done` / `del_dir_done` with `mode == 'bulk'`: runs a
///   progressive sync immediately, passing the message's `since` value.
/// * `added`, `del`, `removed`, `updated`: requests a coalesced progressive
///   sync.
/// * anything else, including a missing `type`: ignored.
///
/// Returns false only when the immediate sync of a bulk "done" message
/// failed, and true in every other case.
Future<bool> _handleWsEvent(CollectionSource source, Map<String, dynamic> msg) async {
  const bulkDoneTypes = {'add_dir_done', 'del_dir_done'};
  const itemTypes = {'added', 'del', 'removed', 'updated'};

  final type = msg['type']?.toString();
  if (type == null) return true;

  final isBulk = msg['mode'] == 'bulk';

  if (isBulk && bulkDoneTypes.contains(type)) {
    final since = msg['since']?.toString();
    try {
      await _runProgressiveSync(source, since: since);
      return true;
    } catch (_) {
      return false;
    }
  }

  if (itemTypes.contains(type)) {
    _requestProgressiveSync(source: source);
  }

  // Bulk `add_dir` / `del_dir` messages and unknown types need no action.
  return true;
}
|
||
|
||
// ------------------------------------------------------------
|
||
// lifecycle
|
||
// ------------------------------------------------------------
|
||
/// Brings the remote feature into the state required by the persisted
/// settings.
///
/// * Disabled: marks the bus disabled, removes remote entries from memory
///   and tears the stack down.
/// * Enabled, bootstrap not done: runs a [fullSync] that sets the bootstrap
///   flag on success; [resumeBootstrapIfEnabled] decides whether the overlay
///   is shown.
/// * Enabled, bootstrap done: loads remote entries from the local database,
///   ensures the realtime stack, starts the WebSocket in the background and
///   requests a progressive sync.
Future<void> onAppStart({
  required CollectionSource source,
  bool resumeBootstrapIfEnabled = true,
}) async {
  final settings = await RemoteSettings.load();
  final bus = RemoteSyncBus.instance;

  if (!settings.enabled) {
    bus.setDisabled();
    _notifyRemoteVisibilityChanged(source);
    _forceCollectionRefresh(source);

    final remoteEntries = {
      for (final entry in source.allEntries)
        if (entry.origin == RemoteOrigin.value) entry,
    };
    if (remoteEntries.isNotEmpty) source.removeEntriesFromMemory(remoteEntries);

    await _stopWs(source);
    return;
  }

  final alreadyBootstrapped = await bootstrapDone();
  if (!alreadyBootstrapped) {
    // First start: full sync, with overlay when requested.
    await fullSync(
      source: source,
      showOverlay: resumeBootstrapIfEnabled,
      markBootstrapDoneOnSuccess: true,
    );
    return;
  }

  // Show "syncing" while the cached remote entries are appended.
  bus.stateNotifier.value = RemoteSyncState.syncing;
  bus.progressNotifier.value = const RemoteSyncProgress(
    done: 0,
    total: 0,
    showOverlay: false,
  );
  _notifyRemoteVisibilityChanged(source);

  await source.appendRemoteEntriesFromDb();

  // Make the collection pick up the appended entries.
  _notifyRemoteVisibilityChanged(source);
  _forceCollectionRefresh(source);

  await _ensureRealtimeStack(source);
  unawaited(_startWsAlreadyEnsured());
  _requestProgressiveSync(
    source: source,
    delay: const Duration(milliseconds: 200),
  );
}
|
||
|
||
/// Flips the persisted `enabled` setting and applies the new state.
///
/// All other settings are saved unchanged. When the result is disabled, the
/// bus is marked disabled, remote entries are removed from memory and the
/// stack is torn down. When the result is enabled, [onAppStart] takes over,
/// with the overlay requested only if the bootstrap has not completed yet.
Future<void> toggleRemote({required CollectionSource source}) async {
  final current = await RemoteSettings.load();

  final updated = RemoteSettings(
    enabled: !current.enabled,
    baseUrl: current.baseUrl,
    indexPath: current.indexPath,
    email: current.email,
    password: current.password,
    wsUrl: current.wsUrl,
  );
  await updated.save();

  if (updated.enabled) {
    // PATCH 2: on the very first activation, show the progress overlay.
    final firstRun = !(await bootstrapDone());
    await onAppStart(source: source, resumeBootstrapIfEnabled: firstRun);
    return;
  }

  RemoteSyncBus.instance.setDisabled();
  _notifyRemoteVisibilityChanged(source);
  _forceCollectionRefresh(source);

  final remoteEntries = source.allEntries
      .where((entry) => entry.origin == RemoteOrigin.value)
      .toSet();
  if (remoteEntries.isNotEmpty) source.removeEntriesFromMemory(remoteEntries);

  await _stopWs(source);
}
|
||
|
||
// ------------------------------------------------------------
|
||
// full sync legacy (progress + overlay)
|
||
// ------------------------------------------------------------
|
||
/// Replaces the locally stored remote items with a fresh copy fetched from
/// the server, reporting progress on [RemoteSyncBus].
///
/// Steps: ping the server (3 s timeout), fetch every item (30 s timeout),
/// delete the stored remotes, upsert the fetched items in chunks of 200,
/// prune anything not in the fetched id set, append the stored entries to
/// [source], then ensure the realtime stack, start the WebSocket in the
/// background and request a progressive sync.
///
/// * [showOverlay] is forwarded to the bus operation.
/// * [markBootstrapDoneOnSuccess] persists the bootstrap flag once the data
///   has been stored.
///
/// Returns immediately if another full sync is running. If remote is
/// disabled, the bus is marked disabled and nothing is fetched. If the base
/// URL is blank, the bus is set to server-down. Any failure during the sync
/// is logged, sets the bus to server-down and schedules a retry; it is not
/// rethrown. A failure while loading the settings propagates to the caller.
Future<void> fullSync({
  required CollectionSource source,
  required bool showOverlay,
  bool markBootstrapDoneOnSuccess = false,
}) async {
  if (_syncInFlight) return;
  _syncInFlight = true;

  // The outer try/finally owns the in-flight flag. It now also covers the
  // settings load: previously a throwing load left the flag set forever and
  // every later full sync returned immediately.
  try {
    final s = await RemoteSettings.load();
    if (!s.enabled) {
      RemoteSyncBus.instance.setDisabled();
      _notifyRemoteVisibilityChanged(source);
      _forceCollectionRefresh(source);
      return;
    }

    try {
      if (s.baseUrl.trim().isEmpty) {
        // Nothing to talk to: report server-down without scheduling a retry.
        RemoteSyncBus.instance.stateNotifier.value = RemoteSyncState.serverDown;
        RemoteSyncBus.instance.progressNotifier.value = null;
        _notifyRemoteVisibilityChanged(source);
        _forceCollectionRefresh(source);
        return;
      }

      // Credentials are optional; without both, the client runs without auth.
      final auth = (s.email.isNotEmpty && s.password.isNotEmpty)
          ? RemoteAuth(baseUrl: s.baseUrl, email: s.email, password: s.password)
          : null;

      final client = RemoteJsonClient(s.baseUrl, s.indexPath, auth: auth);
      await client.ping().timeout(const Duration(seconds: 3));

      final items = await client.fetchAll().timeout(const Duration(seconds: 30));
      final total = items.length;

      final opId = RemoteSyncBus.instance.start(total: total, showOverlay: showOverlay);
      _notifyRemoteVisibilityChanged(source);

      final repo = RemoteRepository(localMediaDb.rawDb);
      await repo.deleteAllRemotes();

      const chunk = 200;
      final serverIds = items.map((e) => e.id).where((v) => v.isNotEmpty).toSet();

      // Store in chunks so that progress can be reported along the way.
      for (var i = 0; i < total; i += chunk) {
        final end = (i + chunk < total) ? i + chunk : total;
        await repo.upsertAll(items.sublist(i, end), chunkSize: chunk);
        RemoteSyncBus.instance.update(opId: opId, done: end, total: total);
      }

      await repo.pruneMissingRemotes(serverIds);
      await source.appendRemoteEntriesFromDb();

      _notifyRemoteVisibilityChanged(source);
      _forceCollectionRefresh(source);

      if (markBootstrapDoneOnSuccess) await _setBootstrapDone();

      RemoteSyncBus.instance.finishUpToDate(opId: opId);
      _notifyRemoteVisibilityChanged(source);
      _forceCollectionRefresh(source);

      // The server answered and the data is stored: a retry left over from
      // an earlier failure is no longer needed, and its time budget must not
      // leak into a future failure.
      _retryTimer?.cancel();
      _retryTimer = null;
      _retryStartTime = null;

      await _ensureRealtimeStack(source);
      unawaited(_startWsAlreadyEnsured());
      _requestProgressiveSync(source: source, delay: const Duration(milliseconds: 300));
    } catch (e, st) {
      debugPrint('[remote] fullSync error: $e\n$st');
      RemoteSyncBus.instance.stateNotifier.value = RemoteSyncState.serverDown;
      RemoteSyncBus.instance.progressNotifier.value = null;
      _notifyRemoteVisibilityChanged(source);
      _forceCollectionRefresh(source);

      // Open the retry window on the first failure only, then keep retrying.
      _retryStartTime ??= DateTime.now();
      _scheduleRetry(source);
    }
  } finally {
    _syncInFlight = false;
  }
}
|
||
|
||
// ------------------------------------------------------------
|
||
// retry
|
||
// ------------------------------------------------------------
|
||
/// Schedules a single retry attempt 30 seconds from now, replacing any
/// retry that is already pending.
///
/// When the timer fires:
/// * if remote has been disabled, the retry window is closed and nothing
///   else happens;
/// * if the retry window has been open for more than 5 minutes, remote is
///   switched off: remote entries are removed from memory, the settings are
///   saved with `enabled: false`, the bus is marked disabled and the stack
///   is torn down;
/// * otherwise the server is pinged (3 s timeout). On success the retry
///   state is cleared and a [fullSync] without overlay is started in the
///   background; on failure another retry is scheduled.
///
/// The retry window is now closed whenever retrying stops. Previously the
/// start time survived a disable or a timeout, so the first retry after
/// re-enabling was already over budget and remote was switched off again
/// without a ping.
void _scheduleRetry(CollectionSource source) {
  _retryTimer?.cancel();

  // Open the retry window here if the caller has not done so, so that the
  // 5-minute budget always applies.
  _retryStartTime ??= DateTime.now();

  _retryTimer = Timer(const Duration(seconds: 30), () async {
    final s = await RemoteSettings.load();
    if (!s.enabled) {
      // Remote was turned off while waiting: stop and forget the window.
      _retryStartTime = null;
      return;
    }

    final startedAt = _retryStartTime;
    if (startedAt != null &&
        DateTime.now().difference(startedAt) > const Duration(minutes: 5)) {
      debugPrint('[remote] retry timeout -> disattivo remote');

      // Budget exhausted: close the window before giving up.
      _retryStartTime = null;

      final remotes = source.allEntries.where((e) => e.origin == RemoteOrigin.value).toSet();
      if (remotes.isNotEmpty) source.removeEntriesFromMemory(remotes);

      final upd = RemoteSettings(
        enabled: false,
        baseUrl: s.baseUrl,
        indexPath: s.indexPath,
        email: s.email,
        password: s.password,
        wsUrl: s.wsUrl,
      );
      await upd.save();

      RemoteSyncBus.instance.setDisabled();
      _notifyRemoteVisibilityChanged(source);
      _forceCollectionRefresh(source);

      await _stopWs(source);
      return;
    }

    debugPrint('[remote] retry ping…');

    final auth = (s.email.isNotEmpty && s.password.isNotEmpty)
        ? RemoteAuth(baseUrl: s.baseUrl, email: s.email, password: s.password)
        : null;
    final retryClient = RemoteJsonClient(s.baseUrl, s.indexPath, auth: auth);

    try {
      await retryClient.ping().timeout(const Duration(seconds: 3));
      debugPrint('[remote] retry OK -> riprendo sync');

      _retryTimer = null;
      _retryStartTime = null;

      unawaited(fullSync(source: source, showOverlay: false));
    } catch (_) {
      debugPrint('[remote] retry fallito');
      _scheduleRetry(source);
    }
  });
}
|
||
|
||
/// Re-establishes realtime sync when the app resumes.
///
/// Does nothing when remote is disabled. Otherwise ensures the realtime
/// stack, starts the WebSocket in the background and requests a progressive
/// sync after 200 ms.
Future<void> onResume(CollectionSource source) async {
  final settings = await RemoteSettings.load();
  if (settings.enabled) {
    await _ensureRealtimeStack(source);
    unawaited(_startWsAlreadyEnsured());
    _requestProgressiveSync(
      source: source,
      delay: const Duration(milliseconds: 200),
    );
  }
}
|
||
}
|