aves_mio22/lib/remote/remote_controller.dart
2026-04-18 20:05:02 +02:00

576 lines
18 KiB
Dart
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import 'dart:async';
import 'package:flutter/foundation.dart';
import 'package:flutter_secure_storage/flutter_secure_storage.dart';
import 'package:aves/model/entry/entry.dart';
import 'package:aves/model/source/collection_source.dart';
import 'package:aves/model/source/events.dart';
import 'package:aves/remote/remote_settings.dart';
import 'package:aves/remote/remote_sync_bus.dart';
import 'package:aves/remote/remote_repository.dart';
import 'package:aves/services/common/services.dart';
import 'package:aves/remote/remote_client.dart';
import 'package:aves/remote/auth_client.dart';
import 'package:aves/remote/collection_source_remote_ext.dart';
import 'remote_origin.dart';
import 'remote_http_api.dart';
import 'remote_sync_engine.dart';
import 'remote_state_store.dart';
import 'remote_ws_client.dart';
// RemoteHttp: keeps the token used for image requests aligned with the shared auth.
import 'remote_http.dart';
class RemoteController {
RemoteController._();
static final RemoteController instance = RemoteController._();
static const _kBootstrapDone = 'remote_bootstrap_done';
bool _syncInFlight = false;
DateTime? _retryStartTime;
Timer? _retryTimer;
RemoteAuth? _auth;
RemoteHttpApi? _api;
RemoteSyncEngine? _engine;
RemoteWsClient? _ws;
RemoteStateStore? _stateStore;
EventRingDeduper? _deduper;
Future<void>? _ensureStackFuture;
bool _wsConnectInFlight = false;
// State used to coalesce progressive sync requests into a single timer.
Timer? _progressiveTimer;
bool _progressiveScheduled = false;
String? _pendingSinceIso;
// ------------------------------------------------------------
// helper: forces a lens refresh when remote visibility changes
// ------------------------------------------------------------
/// Fires a [FilterVisibilityChangedEvent] on the event bus of [source].
///
/// Per the original author's note this is meant to make the collection lens
/// refresh when the visibility of remote entries changes.
///
/// Best effort: any failure while firing is logged and never propagated, so
/// callers can invoke this unconditionally.
void _notifyRemoteVisibilityChanged(CollectionSource source) {
  try {
    source.eventBus.fire(FilterVisibilityChangedEvent());
  } catch (e, st) {
    // Deliberately non-fatal, but leave a trace instead of hiding the failure.
    debugPrint('[remote] visibility notification failed: $e\n$st');
  }
}
// "Strong" refresh (the lens listens for EntryRefreshedEvent and calls refresh()).
/// Fires an [EntryRefreshedEvent] carrying an empty entry set on the event
/// bus of [source].
///
/// The empty set means no specific entry is reported as changed; the event is
/// used purely as a "refresh now" signal.
///
/// Best effort: any failure while firing is logged and never propagated.
void _forceCollectionRefresh(CollectionSource source) {
  try {
    source.eventBus.fire(EntryRefreshedEvent(const <AvesEntry>{}));
  } catch (e, st) {
    // Deliberately non-fatal, but leave a trace instead of hiding the failure.
    debugPrint('[remote] forced collection refresh failed: $e\n$st');
  }
}
// ------------------------------------------------------------
// bootstrap flag
// ------------------------------------------------------------
/// Reports whether the initial remote bootstrap has been recorded as done.
///
/// Reads the `remote_bootstrap_done` key from secure storage; only the exact
/// value `'1'` counts as done, any other value (including none) yields
/// `false`.
Future<bool> bootstrapDone() async {
  final flag = await FlutterSecureStorage().read(key: _kBootstrapDone);
  return flag == '1';
}
/// Persists the "bootstrap done" marker by writing `'1'` under the
/// `remote_bootstrap_done` key in secure storage.
Future<void> _setBootstrapDone() async {
  await FlutterSecureStorage().write(key: _kBootstrapDone, value: '1');
}
// ------------------------------------------------------------
// init bus (used by HomePage init)
// ------------------------------------------------------------
/// Initializes the [RemoteSyncBus] from the persisted [RemoteSettings].
///
/// When remote is disabled the bus is marked disabled. Otherwise the state is
/// set to [RemoteSyncState.syncing] with a zeroed progress and no overlay.
Future<void> initBusFromSettings() async {
  final settings = await RemoteSettings.load();
  final bus = RemoteSyncBus.instance;
  if (settings.enabled) {
    bus.stateNotifier.value = RemoteSyncState.syncing;
    bus.progressNotifier.value = const RemoteSyncProgress(
      done: 0,
      total: 0,
      showOverlay: false,
    );
  } else {
    bus.setDisabled();
  }
}
// ------------------------------------------------------------
// ws url
// ------------------------------------------------------------
/// Derives a WebSocket URL from the HTTP [baseUrl].
///
/// The scheme becomes `ws` for `http` and `wss` for anything else, and the
/// first dot-separated label of the host gets a `-ws` suffix
/// (`photos.example.com` -> `photos-ws.example.com`). Only scheme and host are
/// kept: port, path and query of [baseUrl] are dropped.
///
/// NOTE(review): IP-literal hosts are suffixed the same way
/// (`192.168.1.10` -> `192-ws.168.1.10`) — confirm such setups always provide
/// an explicit WS URL instead.
///
/// Throws a [FormatException] if [baseUrl] cannot be parsed.
String _deriveWsUrl(String baseUrl) {
  final parsed = Uri.parse(baseUrl);
  final wsScheme = parsed.scheme == 'http' ? 'ws' : 'wss';
  final host = parsed.host;
  // Insert the suffix right before the first dot, or at the end of a
  // single-label host.
  final firstDot = host.indexOf('.');
  final wsHost = firstDot < 0
      ? '$host-ws'
      : '${host.substring(0, firstDot)}-ws${host.substring(firstDot)}';
  return '$wsScheme://$wsHost';
}
/// Returns the WebSocket URL to use for settings [s].
///
/// An explicitly configured, non-blank `wsUrl` wins (trimmed); otherwise the
/// URL is derived from `baseUrl` via [_deriveWsUrl].
String _resolveWsUrl(RemoteSettings s) {
  final explicit = s.wsUrl.trim();
  if (explicit.isNotEmpty) {
    return explicit;
  }
  return _deriveWsUrl(s.baseUrl);
}
// ------------------------------------------------------------
// ensure stack single-flight
// ------------------------------------------------------------
/// Single-flight wrapper around [_ensureRealtimeStackImpl].
///
/// While a build is in progress every caller receives that same future
/// instead of starting another one. Once the build completes (successfully or
/// not) the slot is cleared, unless it has been replaced or reset meanwhile.
Future<void> _ensureRealtimeStack(CollectionSource source) {
  final pending = _ensureStackFuture;
  if (pending != null) {
    return pending;
  }

  final build = _ensureRealtimeStackImpl(source);
  _ensureStackFuture = build;

  void releaseSlot() {
    // Only clear the slot if it still refers to this very build.
    if (identical(_ensureStackFuture, build)) {
      _ensureStackFuture = null;
    }
  }

  return build.whenComplete(releaseSlot);
}
/// Builds, or reuses, the auth / HTTP API / sync engine / WebSocket client
/// set for the currently persisted settings.
///
/// A rebuild happens when there is no auth yet, when email, password or base
/// URL differ from the current auth, when there is no WebSocket client, or
/// when the resolved WebSocket URL changed. When nothing changed, only the
/// event deduper's `init()` is awaited.
///
/// The new objects are assigned to the fields together, and only after every
/// step (including the awaited `RemoteHttp.warmUp()`) succeeded. If a step
/// throws, the previous fields are left untouched so the next call detects
/// the mismatch again and retries the rebuild, instead of pairing a new
/// `_auth` with a stale `_api`/`_engine`/`_ws`.
Future<void> _ensureRealtimeStackImpl(CollectionSource source) async {
  final s = await RemoteSettings.load();
  final store = _stateStore ??= RemoteStateStore();
  final deduper = _deduper ??= EventRingDeduper(store);
  final wsUrl = _resolveWsUrl(s);

  // The auth's base URL is compared against the settings value with a
  // trailing slash enforced.
  final expectedBase = s.baseUrl.endsWith('/') ? s.baseUrl : '${s.baseUrl}/';
  final currentAuth = _auth;
  final rebuildAuth = currentAuth == null ||
      currentAuth.email != s.email ||
      currentAuth.password != s.password ||
      currentAuth.base.toString() != expectedBase;
  final currentWs = _ws;
  final rebuildWs = currentWs == null || currentWs.wsUrl != wsUrl;

  if (!rebuildAuth && !rebuildWs) {
    await deduper.init();
    return;
  }

  final auth = RemoteAuth(baseUrl: s.baseUrl, email: s.email, password: s.password);
  // Keep RemoteHttp (thumbnails) on the same auth/token as the rest.
  RemoteHttp.attach(baseUrl: s.baseUrl, auth: auth);
  await RemoteHttp.warmUp();

  final api = RemoteHttpApi(baseUrl: s.baseUrl, auth: auth);
  final engine = RemoteSyncEngine(
    api: api,
    repo: RemoteRepository(localMediaDb.rawDb),
    source: source,
    state: store,
  );

  // Fire-and-forget close of the previous client, if any.
  _ws?.close(); // ignore: unawaited_futures
  final ws = RemoteWsClient(
    wsUrl: wsUrl,
    store: store,
    deduper: deduper,
    onNeedRecoverySync: () {
      _requestProgressiveSync(source: source);
    },
    onEvent: (msg) => _handleWsEvent(source, msg),
  );

  // Commit everything at once so the fields always describe one coherent
  // stack.
  _auth = auth;
  _api = api;
  _engine = engine;
  _ws = ws;
}
// ------------------------------------------------------------
// progressive coalescing
// ------------------------------------------------------------
/// Requests a progressive sync, coalescing bursts of requests into one run.
///
/// A non-blank [sinceIso] is merged into the pending lower bound via
/// [_minIso]. If a run is already scheduled the call only contributes its
/// [sinceIso]; otherwise a timer is armed that fires after [delay] and runs
/// [_runProgressiveSync] with the accumulated bound.
void _requestProgressiveSync({
  required CollectionSource source,
  String? sinceIso,
  Duration delay = const Duration(milliseconds: 600),
}) {
  final requestedSince = sinceIso?.trim() ?? '';
  if (requestedSince.isNotEmpty) {
    _pendingSinceIso = _minIso(_pendingSinceIso, requestedSince);
  }

  if (_progressiveScheduled) {
    return;
  }
  _progressiveScheduled = true;
  _progressiveTimer?.cancel();

  Future<void> runScheduled() async {
    // Release the slot first so requests arriving during the sync arm a new
    // timer.
    _progressiveScheduled = false;
    final since = _pendingSinceIso;
    _pendingSinceIso = null;
    try {
      await _runProgressiveSync(source, since: since);
    } catch (_) {
      // Swallowed on purpose: a timer callback has no caller to report to.
    }
  }

  _progressiveTimer = Timer(delay, runScheduled);
}
/// Returns whichever of [a] and [b] denotes the earlier instant.
///
/// [b] is returned when [a] is null, empty or not parseable by
/// [DateTime.tryParse]; [a] is returned when [b] is not parseable or when
/// both denote the same instant. The original string is returned, not a
/// re-formatted one.
String? _minIso(String? a, String b) {
  final current = (a == null || a.isEmpty) ? null : DateTime.tryParse(a);
  if (current == null) {
    return b;
  }
  final candidate = DateTime.tryParse(b);
  final candidateIsEarlier = candidate != null && candidate.isBefore(current);
  return candidateIsEarlier ? b : a;
}
// ------------------------------------------------------------
// PATCH 1: a progressive sync must NOT turn the indicator orange again if it was already green
// ------------------------------------------------------------
/// Runs one progressive sync through the sync engine.
///
/// Ensures the realtime stack first and does nothing if no engine is
/// available afterwards. With a non-blank [since] the engine syncs from that
/// (trimmed) value, otherwise it performs its default progressive sync.
///
/// If the bus was already [RemoteSyncState.upToDate] no bus operation is
/// started, so the indicator does not flash back to "syncing"; the state is
/// simply re-asserted on success. Otherwise an operation without overlay is
/// started and finished on the bus.
///
/// On failure the bus goes to server-down, listeners are notified, and the
/// error is rethrown to the caller.
Future<void> _runProgressiveSync(CollectionSource source, {String? since}) async {
  await _ensureRealtimeStack(source);
  final engine = _engine;
  if (engine == null) {
    return;
  }

  final bus = RemoteSyncBus.instance;
  // Already "green": keep the icon as is instead of flashing "syncing".
  final wasUpToDate = bus.stateNotifier.value == RemoteSyncState.upToDate;

  int? opId;
  if (!wasUpToDate) {
    opId = bus.start(total: 0, showOverlay: false);
    _notifyRemoteVisibilityChanged(source);
  }

  final sinceIso = since?.trim() ?? '';
  try {
    if (sinceIso.isEmpty) {
      await engine.progressiveSync();
    } else {
      await engine.progressiveSyncFrom(sinceIso);
    }

    if (opId == null) {
      // Stays green.
      bus.stateNotifier.value = RemoteSyncState.upToDate;
      bus.progressNotifier.value = null;
    } else {
      bus.finishUpToDate(opId: opId);
      _notifyRemoteVisibilityChanged(source);
    }
    _forceCollectionRefresh(source);
  } catch (e, st) {
    debugPrint('[remote] progressive sync failed: $e\n$st');
    if (opId == null) {
      bus.stateNotifier.value = RemoteSyncState.serverDown;
      bus.progressNotifier.value = null;
    } else {
      bus.failServerDown(opId: opId);
    }
    _notifyRemoteVisibilityChanged(source);
    _forceCollectionRefresh(source);
    rethrow;
  }
}
// ------------------------------------------------------------
// ws start/stop
// ------------------------------------------------------------
/// Connects the WebSocket client, assuming the realtime stack has already
/// been ensured by the caller.
///
/// No-op when a connect attempt is already in flight, when remote is
/// disabled, when there is no client or auth, or when the client is already
/// connected. The token is the auth's cached one, or the result of a fresh
/// login.
///
/// The login is awaited, so the client may have been torn down by [_stopWs]
/// in the meantime. The field is therefore re-read after the await and the
/// method returns quietly if it is gone, instead of failing on a null
/// assertion inside an unawaited future.
///
/// Errors from login or connect propagate to the returned future.
Future<void> _startWsAlreadyEnsured() async {
  if (_wsConnectInFlight) return;
  _wsConnectInFlight = true;
  try {
    final s = await RemoteSettings.load();
    if (!s.enabled) return;

    final auth = _auth;
    final ws = _ws;
    if (ws == null || auth == null) return;
    if (ws.isConnected) return;

    final token = auth.token ?? await auth.login();

    // Re-read after the await: a concurrent stop may have cleared the client.
    final current = _ws;
    if (current == null) return;
    await current.connect(token: token);
  } finally {
    _wsConnectInFlight = false;
  }
}
/// Tears down the realtime stack and all pending timers, then notifies
/// listeners on [source].
///
/// Cancels the coalesced progressive-sync timer and clears its pending state,
/// closes the WebSocket client (best effort), and drops the client, engine,
/// API and auth so the next ensure call rebuilds them.
///
/// The retry timer and the retry window start are cleared as well. Every
/// call site of this method is a "remote disabled" path, and a start time
/// left over from an earlier outage would otherwise make the first retry
/// after re-enabling exceed the retry time budget at once.
Future<void> _stopWs(CollectionSource source) async {
  _progressiveTimer?.cancel();
  _progressiveTimer = null;
  _progressiveScheduled = false;
  _pendingSinceIso = null;

  // A disabled remote has nothing to retry; start from a clean slate next
  // time.
  _retryTimer?.cancel();
  _retryTimer = null;
  _retryStartTime = null;

  try {
    await _ws?.close();
  } catch (_) {
    // Best effort: the client is discarded right below either way.
  }
  _ws = null;
  _engine = null;
  _api = null;
  _auth = null;
  _ensureStackFuture = null;
  _wsConnectInFlight = false;
  _notifyRemoteVisibilityChanged(source);
  _forceCollectionRefresh(source);
}
// ------------------------------------------------------------
// ws events
// ------------------------------------------------------------
/// Reacts to one decoded WebSocket message [msg].
///
/// - A bulk `add_dir_done` / `del_dir_done` message triggers an immediate
///   progressive sync from the message's `since` value; the result is `false`
///   if that sync throws.
/// - `added`, `del`, `removed` and `updated` request a coalesced progressive
///   sync.
/// - Everything else, including messages without a `type` and the
///   intermediate bulk `add_dir` / `del_dir` messages, is accepted without
///   action.
///
/// Returns `true` in every case except the failed bulk sync.
/// NOTE(review): presumably the client uses the result to decide whether the
/// event counts as processed — confirm against `RemoteWsClient`.
Future<bool> _handleWsEvent(CollectionSource source, Map<String, dynamic> msg) async {
  final type = msg['type']?.toString();
  if (type == null) {
    return true;
  }

  final isBulk = msg['mode'] == 'bulk';
  final isBulkDone = isBulk && (type == 'add_dir_done' || type == 'del_dir_done');
  if (isBulkDone) {
    final since = msg['since']?.toString();
    try {
      await _runProgressiveSync(source, since: since);
    } catch (_) {
      return false;
    }
    return true;
  }

  const entryChangeTypes = <String>{'added', 'del', 'removed', 'updated'};
  // Intermediate bulk add_dir/del_dir messages must not trigger a sync; they
  // are not in the set, so they fall through untouched.
  if (entryChangeTypes.contains(type)) {
    _requestProgressiveSync(source: source);
  }
  return true;
}
// ------------------------------------------------------------
// lifecycle
// ------------------------------------------------------------
/// Brings the remote integration into the state matching the persisted
/// settings.
///
/// - Remote disabled: marks the bus disabled, notifies listeners, removes the
///   in-memory entries whose origin is [RemoteOrigin.value] and stops the
///   realtime stack.
/// - Remote enabled, bootstrap not done: runs [fullSync] and marks the
///   bootstrap done on success; [resumeBootstrapIfEnabled] controls whether
///   that sync shows the overlay.
/// - Remote enabled, bootstrap done: sets the bus to syncing, appends the
///   remote entries stored in the database, ensures the realtime stack,
///   starts the WebSocket connection without waiting for it and requests a
///   progressive sync.
Future<void> onAppStart({
  required CollectionSource source,
  bool resumeBootstrapIfEnabled = true,
}) async {
  final settings = await RemoteSettings.load();
  final bus = RemoteSyncBus.instance;

  if (!settings.enabled) {
    bus.setDisabled();
    _notifyRemoteVisibilityChanged(source);
    _forceCollectionRefresh(source);
    final remoteEntries = {
      for (final entry in source.allEntries)
        if (entry.origin == RemoteOrigin.value) entry,
    };
    if (remoteEntries.isNotEmpty) {
      source.removeEntriesFromMemory(remoteEntries);
    }
    await _stopWs(source);
    return;
  }

  final alreadyBootstrapped = await bootstrapDone();
  if (!alreadyBootstrapped) {
    // First start: full sync (with overlay if resumeBootstrapIfEnabled is true).
    await fullSync(
      source: source,
      showOverlay: resumeBootstrapIfEnabled,
      markBootstrapDoneOnSuccess: true,
    );
    return;
  }

  // Before appending: show "syncing" and refresh.
  bus.stateNotifier.value = RemoteSyncState.syncing;
  bus.progressNotifier.value = const RemoteSyncProgress(
    done: 0,
    total: 0,
    showOverlay: false,
  );
  _notifyRemoteVisibilityChanged(source);

  await source.appendRemoteEntriesFromDb();

  // Force a refresh of the home view (cache/lens).
  _notifyRemoteVisibilityChanged(source);
  _forceCollectionRefresh(source);

  await _ensureRealtimeStack(source);
  unawaited(_startWsAlreadyEnsured());
  _requestProgressiveSync(
    source: source,
    delay: const Duration(milliseconds: 200),
  );
}
/// Flips the persisted "remote enabled" flag and applies the new state.
///
/// All other settings are carried over unchanged. When the result is
/// "disabled", the bus is marked disabled, in-memory remote entries are
/// removed and the realtime stack is stopped. When the result is "enabled",
/// [onAppStart] takes over; the overlay (progress bar) is requested only if
/// the bootstrap has never completed.
Future<void> toggleRemote({required CollectionSource source}) async {
  final current = await RemoteSettings.load();
  final updated = RemoteSettings(
    enabled: !current.enabled,
    baseUrl: current.baseUrl,
    indexPath: current.indexPath,
    email: current.email,
    password: current.password,
    wsUrl: current.wsUrl,
  );
  await updated.save();

  if (updated.enabled) {
    // PATCH 2: on the very first run, show the overlay during bootstrap.
    final isFirstRun = !(await bootstrapDone());
    await onAppStart(source: source, resumeBootstrapIfEnabled: isFirstRun);
    return;
  }

  RemoteSyncBus.instance.setDisabled();
  _notifyRemoteVisibilityChanged(source);
  _forceCollectionRefresh(source);
  final remoteEntries = {
    for (final entry in source.allEntries)
      if (entry.origin == RemoteOrigin.value) entry,
  };
  if (remoteEntries.isNotEmpty) {
    source.removeEntriesFromMemory(remoteEntries);
  }
  await _stopWs(source);
}
// ------------------------------------------------------------
// full sync legacy (progress + overlay)
// ------------------------------------------------------------
/// Performs a full ("legacy") sync: fetches the complete remote index,
/// replaces the remote rows in the local database and reloads them into
/// [source].
///
/// Only one full sync runs at a time; a call made while another is in flight
/// returns immediately. [showOverlay] is forwarded to the bus operation.
/// With [markBootstrapDoneOnSuccess] the bootstrap marker is persisted once
/// the data has been stored and loaded.
///
/// Outcomes:
/// - remote disabled: the bus is marked disabled, nothing is fetched;
/// - blank base URL: the bus goes to server-down, no retry is scheduled;
/// - success: the bus finishes up-to-date, pending retry bookkeeping is
///   cleared, the realtime stack is ensured, the WebSocket connection is
///   started and a progressive sync is requested;
/// - any error after the settings were loaded: logged, the bus goes to
///   server-down and a retry is scheduled. The error is not rethrown.
///
/// The in-flight flag is released in a `finally` that also covers loading
/// the settings, so a failure there can no longer block every later sync.
Future<void> fullSync({
  required CollectionSource source,
  required bool showOverlay,
  bool markBootstrapDoneOnSuccess = false,
}) async {
  if (_syncInFlight) return;
  _syncInFlight = true;
  try {
    final s = await RemoteSettings.load();
    if (!s.enabled) {
      RemoteSyncBus.instance.setDisabled();
      _notifyRemoteVisibilityChanged(source);
      _forceCollectionRefresh(source);
      return;
    }
    try {
      if (s.baseUrl.trim().isEmpty) {
        RemoteSyncBus.instance.stateNotifier.value = RemoteSyncState.serverDown;
        RemoteSyncBus.instance.progressNotifier.value = null;
        _notifyRemoteVisibilityChanged(source);
        _forceCollectionRefresh(source);
        return;
      }

      // Credentials are optional for the JSON index client.
      RemoteAuth? auth;
      if (s.email.isNotEmpty && s.password.isNotEmpty) {
        auth = RemoteAuth(baseUrl: s.baseUrl, email: s.email, password: s.password);
      }
      final client = RemoteJsonClient(s.baseUrl, s.indexPath, auth: auth);

      // Quick reachability check before the potentially long download.
      await client.ping().timeout(const Duration(seconds: 3));
      final items = await client.fetchAll().timeout(const Duration(seconds: 30));
      final total = items.length;

      final opId = RemoteSyncBus.instance.start(total: total, showOverlay: showOverlay);
      _notifyRemoteVisibilityChanged(source);

      final repo = RemoteRepository(localMediaDb.rawDb);
      await repo.deleteAllRemotes();

      const chunk = 200;
      final serverIds = items.map((e) => e.id).where((v) => v.isNotEmpty).toSet();
      // Insert in slices so progress can be reported between them.
      for (var i = 0; i < total; i += chunk) {
        final end = (i + chunk < total) ? i + chunk : total;
        await repo.upsertAll(items.sublist(i, end), chunkSize: chunk);
        RemoteSyncBus.instance.update(opId: opId, done: end, total: total);
      }
      await repo.pruneMissingRemotes(serverIds);

      await source.appendRemoteEntriesFromDb();
      _notifyRemoteVisibilityChanged(source);
      _forceCollectionRefresh(source);

      if (markBootstrapDoneOnSuccess) await _setBootstrapDone();

      RemoteSyncBus.instance.finishUpToDate(opId: opId);
      _notifyRemoteVisibilityChanged(source);
      _forceCollectionRefresh(source);

      // The sync succeeded: a pending retry would only repeat it, and a stale
      // window start would shorten the budget of the next outage.
      _retryTimer?.cancel();
      _retryTimer = null;
      _retryStartTime = null;

      await _ensureRealtimeStack(source);
      unawaited(_startWsAlreadyEnsured());
      _requestProgressiveSync(source: source, delay: const Duration(milliseconds: 300));
    } catch (e, st) {
      debugPrint('[remote] fullSync error: $e\n$st');
      RemoteSyncBus.instance.stateNotifier.value = RemoteSyncState.serverDown;
      RemoteSyncBus.instance.progressNotifier.value = null;
      _notifyRemoteVisibilityChanged(source);
      _forceCollectionRefresh(source);
      // Keep the start of an ongoing retry window; open a new one otherwise.
      _retryStartTime ??= DateTime.now();
      _scheduleRetry(source);
    }
  } finally {
    _syncInFlight = false;
  }
}
// ------------------------------------------------------------
// retry
// ------------------------------------------------------------
/// Arms (or re-arms) a 30-second timer that probes the server and resumes
/// syncing once it answers.
///
/// When the timer fires:
/// - remote disabled meanwhile: the retry window is closed, nothing else
///   happens;
/// - more than 5 minutes since the retry window started: remote entries are
///   removed from memory, remote is switched off in the persisted settings,
///   the bus is marked disabled and the realtime stack is stopped;
/// - otherwise the server is pinged (3-second timeout). On success a
///   [fullSync] without overlay is started; on failure the retry is
///   re-scheduled.
///
/// The window start is cleared whenever the window ends (disabled, timed out,
/// or ping succeeded). Without that, re-enabling remote after an automatic
/// switch-off would inherit the old start time and be switched off again on
/// the first retry.
void _scheduleRetry(CollectionSource source) {
  _retryTimer?.cancel();
  _retryTimer = Timer(const Duration(seconds: 30), () async {
    final s = await RemoteSettings.load();
    if (!s.enabled) {
      // Nothing to retry while disabled; a later outage gets a fresh window.
      _retryStartTime = null;
      return;
    }

    final windowStart = _retryStartTime;
    if (windowStart != null &&
        DateTime.now().difference(windowStart) > const Duration(minutes: 5)) {
      debugPrint('[remote] retry timeout -> disattivo remote');
      _retryStartTime = null;
      final remotes = source.allEntries.where((e) => e.origin == RemoteOrigin.value).toSet();
      if (remotes.isNotEmpty) source.removeEntriesFromMemory(remotes);
      final upd = RemoteSettings(
        enabled: false,
        baseUrl: s.baseUrl,
        indexPath: s.indexPath,
        email: s.email,
        password: s.password,
        wsUrl: s.wsUrl,
      );
      await upd.save();
      RemoteSyncBus.instance.setDisabled();
      _notifyRemoteVisibilityChanged(source);
      _forceCollectionRefresh(source);
      await _stopWs(source);
      return;
    }

    debugPrint('[remote] retry ping…');
    try {
      // Built inside the try so that a failure while constructing the client
      // re-schedules the retry instead of ending the retry chain.
      final auth = (s.email.isNotEmpty && s.password.isNotEmpty)
          ? RemoteAuth(baseUrl: s.baseUrl, email: s.email, password: s.password)
          : null;
      final retryClient = RemoteJsonClient(s.baseUrl, s.indexPath, auth: auth);
      await retryClient.ping().timeout(const Duration(seconds: 3));
      debugPrint('[remote] retry OK -> riprendo sync');
      _retryTimer = null;
      _retryStartTime = null;
      unawaited(fullSync(source: source, showOverlay: false));
    } catch (_) {
      debugPrint('[remote] retry fallito');
      _scheduleRetry(source);
    }
  });
}
/// Re-establishes realtime sync when the app returns to the foreground.
///
/// Does nothing while remote is disabled. Otherwise ensures the realtime
/// stack, starts the WebSocket connection without waiting for it, and
/// requests a progressive sync after a short delay.
Future<void> onResume(CollectionSource source) async {
  final settings = await RemoteSettings.load();
  if (settings.enabled) {
    await _ensureRealtimeStack(source);
    unawaited(_startWsAlreadyEnsured());
    _requestProgressiveSync(
      source: source,
      delay: const Duration(milliseconds: 200),
    );
  }
}
}