import 'dart:async';
import 'dart:convert';

import 'package:flutter/foundation.dart';
import 'package:web_socket_channel/io.dart';
import 'package:web_socket_channel/web_socket_channel.dart';

import 'remote_state_store.dart';

/// Callback invoked for every decoded server message that is not part of the
/// auth / ping handshake.
///
/// The returned future is expected to complete with a `bool`; see
/// [RemoteWsClient.onEvent] for the acknowledgement contract.
typedef WsEventHandler = Future Function(Map msg);

/// WebSocket client with auth handshake, per-event ACKs, deduplication,
/// debounced recovery requests and exponential-backoff reconnects.
///
/// Protocol as implemented here:
/// * after opening the socket the client sends
///   `{type: auth, token, session_id}`;
/// * `auth_ok` marks the connection as authenticated; when it carries
///   `need_full_sync == true` a recovery sync is requested and the client is
///   expected to call [notifyRecoveryCompleted] afterwards;
/// * `auth_error` closes the connection and requests a recovery;
/// * `ping` is answered with `pong`;
/// * any other message is passed to [onEvent] and ACKed only when handled.
class RemoteWsClient {
  RemoteWsClient({
    required this.wsUrl,
    required this.store,
    required this.deduper,
    required this.onEvent,
    required this.onNeedRecoverySync,
    this.autoReconnect = true,
    this.reconnectDelay = const Duration(milliseconds: 1500),
    this.maxReconnectDelay = const Duration(seconds: 30),
    // Mirrors sync.js WS_DORMANT_MS.
    this.dormantWindow = const Duration(minutes: 2),
    // Keepalive for mobile connections.
    this.pingInterval = const Duration(seconds: 25),
    // Mirrors sync.js WS_NEED_FULLSYNC_DELAY_MS.
    this.needFullSyncDelay = const Duration(milliseconds: 800),
  });

  /// URL of the WebSocket endpoint.
  final String wsUrl;

  /// Persistent store used for the session id and the "last seen" timestamp.
  final RemoteStateStore store;

  /// Remembers already-applied event ids so redelivered events are only ACKed.
  final EventRingDeduper deduper;

  /// Event handler.
  ///
  /// Must complete with `true` ONLY when the event has been applied
  /// successfully (or when durable work that will certainly complete has been
  /// scheduled). Any other outcome leaves the event un-ACKed.
  final WsEventHandler onEvent;

  /// Invoked (debounced) when a recovery sync (progressive or full) is needed.
  final VoidCallback onNeedRecoverySync;

  /// Whether to reconnect automatically after the stream closes or errors.
  final bool autoReconnect;

  /// Initial (and minimum) delay between reconnect attempts.
  final Duration reconnectDelay;

  /// Upper bound for the exponential reconnect backoff.
  final Duration maxReconnectDelay;

  /// If the last received message is older than this when connecting, the
  /// session id is reset before authenticating.
  final Duration dormantWindow;

  /// Ping interval passed to [IOWebSocketChannel.connect].
  final Duration pingInterval;

  /// Delay before requesting recovery when the server asks for a full sync.
  final Duration needFullSyncDelay;

  WebSocketChannel? _ch;
  StreamSubscription<dynamic>? _sub;

  bool _connecting = false;
  bool _closing = false;

  Timer? _reconnectTimer;
  Duration _currentReconnectDelay = Duration.zero;

  String? _lastToken;

  // "Last seen" timestamp in epoch milliseconds, persisted in [store].
  int _lastSeenMs = 0;

  // Handshake state: true once `auth_ok` has been received.
  bool _authed = false;

  // The server requested recovery (need_full_sync=true) and is waiting for
  // a "recovery_done" message.
  bool _needRecoveryDoneAck = false;

  // Recovery debounce (avoids request storms).
  Timer? _recoveryDebounce;
  bool _recoveryScheduled = false;

  /// Whether a channel exists and the server has confirmed authentication.
  bool get isConnected => _ch != null && _authed;

  /// Opens the socket and starts the auth handshake with [token].
  ///
  /// Does nothing (apart from remembering [token] for later reconnects) when
  /// a connect is already in progress or the client is already connected.
  /// Errors thrown by the store, the deduper or URL parsing propagate to the
  /// caller; socket-level failures surface later through the stream and
  /// trigger the reconnect logic.
  Future<void> connect({required String token}) async {
    _lastToken = token;

    // Avoid a double connect.
    if (_connecting || isConnected) return;

    _connecting = true;
    _closing = false;

    try {
      // Tear down any previous (e.g. half-open, not yet authenticated)
      // connection. `_connecting` stays true across this call so a concurrent
      // connect() cannot slip through while we are awaiting below.
      await close();

      await deduper.init();

      var sessionId = await store.getOrCreateSessionId();

      // Load the persisted "last seen" timestamp, if any.
      _lastSeenMs = await store.getWsLastSeenMs();

      // If the client was dormant for longer than [dormantWindow] before this
      // connect, start with a fresh session id (same behavior as sync.js).
      final now = DateTime.now().millisecondsSinceEpoch;
      final dormant = _lastSeenMs != 0 &&
          (now - _lastSeenMs) > dormantWindow.inMilliseconds;
      if (dormant) {
        debugPrint('[remote][ws] dormant (> ${dormantWindow.inSeconds}s) before connect -> reset session_id');
        await store.resetSessionId();
        await store.clearWsLastSeenMs();
        sessionId = await store.getOrCreateSessionId();
      }

      final uri = Uri.parse(wsUrl);

      // IOWebSocketChannel supports a ping interval (keepalive on mobile).
      final channel = IOWebSocketChannel.connect(
        uri,
        pingInterval: pingInterval,
      );
      _ch = channel;

      _authed = false;
      _needRecoveryDoneAck = false;

      // NOTE: the reconnect backoff is intentionally NOT reset here. Opening
      // the channel does not mean the connection succeeded; resetting on
      // every attempt would keep the backoff pinned at [reconnectDelay].
      // It is reset when `auth_ok` arrives instead.

      _sub = channel.stream.listen(
        (raw) {
          unawaited(_handleMessage(raw));
        },
        onDone: () {
          debugPrint('[remote][ws] stream closed');
          _handleDisconnect();
        },
        onError: (Object e) {
          debugPrint('[remote][ws] stream error=$e');
          _handleDisconnect();
        },
        cancelOnError: false,
      );

      // Auth handshake.
      _send({'type': 'auth', 'token': token, 'session_id': sessionId});
    } finally {
      _connecting = false;
    }
  }

  /// Closes the connection and cancels pending reconnect / recovery timers.
  ///
  /// Never throws: errors while cancelling the subscription or closing the
  /// sink are logged and ignored.
  Future<void> close() async {
    _closing = true;

    _reconnectTimer?.cancel();
    _reconnectTimer = null;

    _recoveryDebounce?.cancel();
    _recoveryDebounce = null;
    _recoveryScheduled = false;

    try {
      await _sub?.cancel();
      _sub = null;

      final ch = _ch;
      _ch = null;
      _authed = false;

      if (ch != null) {
        await ch.sink.close();
      }
    } catch (e) {
      // Best effort: the connection is being discarded anyway.
      debugPrint('[remote][ws] close error=$e');
    } finally {
      _cleanupAll();
    }
  }

  /// To be called once a server-requested recovery (need_full_sync=true) has
  /// completed.
  ///
  /// Sends "recovery_done" at most once per request (until a new
  /// need_full_sync arrives). Does nothing when no acknowledgement is pending
  /// or the client is not connected.
  void notifyRecoveryCompleted() {
    if (!_needRecoveryDoneAck) return;
    if (!isConnected) return;

    _needRecoveryDoneAck = false;
    _send({'type': 'recovery_done'});
  }

  // --------------------------
  // incoming messages
  // --------------------------

  /// Processes one raw frame received from the socket.
  Future<void> _handleMessage(dynamic raw) async {
    try {
      _touchLastSeen();

      final msg = _decode(raw);
      if (msg == null) return;

      final type = msg['type']?.toString();

      // auth_error from the server.
      if (type == 'auth_error') {
        debugPrint('[remote][ws] auth_error=${msg['error']}');
        // Close first: close() cancels any pending recovery timer, so the
        // recovery must be scheduled afterwards or it would never fire.
        await close();
        _scheduleRecovery();
        return;
      }

      // auth_ok: the server may ask for a recovery sync.
      if (type == 'auth_ok') {
        _authed = true;
        // The connection is confirmed: start the backoff from scratch for
        // the next disconnect.
        _currentReconnectDelay = reconnectDelay;

        final needFull = msg['need_full_sync'] == true;
        debugPrint('[remote][ws] auth_ok user=${msg['user']} session=${msg['session_id']} need_full_sync=$needFull');

        if (needFull) {
          _needRecoveryDoneAck = true;
          // Like sync.js: delayed recovery.
          _scheduleRecovery(delay: needFullSyncDelay);
        }
        return;
      }

      // ping/pong
      if (type == 'ping') {
        _send({'type': 'pong'});
        return;
      }

      final eventId = msg['event_id']?.toString();

      // Already applied: just ACK again and stop.
      if (eventId != null && deduper.has(eventId)) {
        _send({'type': 'ack', 'event_id': eventId});
        return;
      }

      // Event handling: ACK ONLY when handled == true.
      var handled = false;
      try {
        handled = await onEvent(msg);
      } catch (e, st) {
        debugPrint('[remote][ws] onEvent error=$e\n$st');
        handled = false;
        // On failure ask for a recovery, but do NOT ACK.
        _scheduleRecovery();
      }

      if (eventId != null && handled) {
        await deduper.mark(eventId);
        _send({'type': 'ack', 'event_id': eventId});
      }
      // Otherwise: no ACK is sent for this event.
    } catch (e, st) {
      // This runs as a fire-and-forget stream callback; without this guard an
      // error here would surface as an unhandled asynchronous error.
      debugPrint('[remote][ws] message handling error=$e\n$st');
    }
  }

  /// Shared reaction to the stream ending (either done or error).
  void _handleDisconnect() {
    _cleanupChannelOnly();
    if (_closing) return;

    _scheduleRecovery();
    _scheduleReconnect();
  }

  // --------------------------
  // reconnect logic (sync.js-like)
  // --------------------------

  /// Arms a single reconnect timer using exponential backoff.
  void _scheduleReconnect() {
    if (!autoReconnect) return;

    // Nothing to do when already connected, already connecting, or when a
    // reconnect timer is already pending.
    if (isConnected) return;
    if (_connecting) return;
    if (_reconnectTimer != null) return;

    final token = _lastToken;
    if (token == null || token.isEmpty) return;

    final delay = _currentReconnectDelay == Duration.zero
        ? reconnectDelay
        : _currentReconnectDelay;

    _currentReconnectDelay = _nextDelay(delay);

    debugPrint('[remote][ws] scheduling reconnect in ${delay.inMilliseconds}ms');

    _reconnectTimer = Timer(delay, () async {
      _reconnectTimer = null;

      // Re-check: the state may have changed while the timer was pending.
      if (_closing || isConnected || _connecting) return;

      try {
        await connect(token: token);
      } catch (e) {
        debugPrint('[remote][ws] reconnect failed: $e');
        _cleanupChannelOnly();
        _scheduleReconnect();
      }
    });
  }

  /// Doubles [current], bounded to `[reconnectDelay, maxReconnectDelay]`.
  Duration _nextDelay(Duration current) {
    final lower = reconnectDelay.inMilliseconds;
    final configuredUpper = maxReconnectDelay.inMilliseconds;
    // Guard against a misconfigured maximum smaller than the minimum, which
    // would make clamp() throw.
    final upper = configuredUpper < lower ? lower : configuredUpper;

    final nextMs = (current.inMilliseconds * 2).clamp(lower, upper);
    return Duration(milliseconds: nextMs);
  }

  // --------------------------
  // recovery debounce
  // --------------------------

  /// Requests a recovery sync after [delay]; calls made while one is already
  /// pending are ignored.
  void _scheduleRecovery({Duration delay = const Duration(milliseconds: 250)}) {
    if (_recoveryScheduled) return;
    _recoveryScheduled = true;

    _recoveryDebounce?.cancel();
    _recoveryDebounce = Timer(delay, () {
      _recoveryScheduled = false;
      try {
        onNeedRecoverySync();
      } catch (e, st) {
        // The callback is external code; it must not break the timer zone.
        debugPrint('[remote][ws] onNeedRecoverySync error=$e\n$st');
      }
    });
  }

  // --------------------------
  // utils
  // --------------------------

  /// Records "now" as the last time a frame was received and persists it.
  void _touchLastSeen() {
    _lastSeenMs = DateTime.now().millisecondsSinceEpoch;
    unawaited(store.setWsLastSeenMs(_lastSeenMs));
  }

  /// JSON-encodes [obj] and writes it to the socket, if there is one.
  void _send(Map obj) {
    try {
      _ch?.sink.add(jsonEncode(obj));
    } catch (e) {
      // Best effort: a failed send is followed by a stream error/close, which
      // drives the reconnect logic.
      debugPrint('[remote][ws] send error=$e');
    }
  }

  /// Decodes a frame into a JSON object, or returns null when the frame is
  /// not valid JSON or not a JSON object.
  Map? _decode(dynamic raw) {
    try {
      final String text;
      if (raw is String) {
        text = raw;
      } else if (raw is List<int>) {
        // Binary frame: interpret the bytes as UTF-8 text.
        text = utf8.decode(raw);
      } else {
        text = raw.toString();
      }

      final decoded = jsonDecode(text);
      return decoded is Map ? decoded : null;
    } catch (_) {
      // Malformed frames are deliberately dropped.
      return null;
    }
  }

  /// Forgets the current channel and subscription.
  ///
  /// Deliberately leaves `_connecting` untouched: that flag is owned by
  /// [connect], which calls [close] (and therefore this method) while it is
  /// still in progress.
  void _cleanupChannelOnly() {
    unawaited(_sub?.cancel());
    _sub = null;
    _ch = null;
    _authed = false;
  }

  /// Channel cleanup plus reset of the closing flag.
  void _cleanupAll() {
    _cleanupChannelOnly();
    _closing = false;
  }
}