import 'dart:async'; import 'dart:convert'; import 'package:flutter/foundation.dart'; import 'package:web_socket_channel/web_socket_channel.dart'; import 'remote_state_store.dart'; typedef WsEventHandler = Future Function(Map msg); class RemoteWsClient { RemoteWsClient({ required this.wsUrl, required this.store, required this.deduper, required this.onEvent, required this.onNeedRecoverySync, }); final String wsUrl; // es: wss://prova-ws.patachina.it final RemoteStateStore store; final EventRingDeduper deduper; final WsEventHandler onEvent; final VoidCallback onNeedRecoverySync; WebSocketChannel? _ch; StreamSubscription? _sub; bool _connecting = false; bool get isConnected => _ch != null; Future connect({required String token}) async { if (_connecting) return; _connecting = true; try { // ✅ se esiste già una connessione, chiudila prima await close(); await deduper.init(); final sessionId = await store.getOrCreateSessionId(); final uri = Uri.parse(wsUrl); _ch = WebSocketChannel.connect(uri); _sub = _ch!.stream.listen( (raw) async { final msg = _decode(raw); if (msg == null) return; final type = msg['type']; // ✅ auth_error esplicito dal server if (type == 'auth_error') { debugPrint('[remote][ws] auth_error=${msg['error']}'); onNeedRecoverySync(); await close(); return; } if (type == 'auth_ok') { // se server chiede full/progressive sync if (msg['need_full_sync'] == true) onNeedRecoverySync(); return; } if (type == 'ping') { _send({'type': 'pong'}); return; } final eventId = msg['event_id']?.toString(); // dedupe -> ACK e stop if (eventId != null && deduper.has(eventId)) { _send({'type': 'ack', 'event_id': eventId}); return; } try { await onEvent(msg); } catch (e) { debugPrint('[remote][ws] onEvent error=$e'); onNeedRecoverySync(); } finally { // ACK sempre, per evitare retry storm e need_full_sync if (eventId != null) { await deduper.mark(eventId); _send({'type': 'ack', 'event_id': eventId}); } } }, onDone: () { debugPrint('[remote][ws] stream closed'); _cleanup(); // opzionale: recovery sync se cade la connessione onNeedRecoverySync(); }, onError: (e) { debugPrint('[remote][ws] stream error=$e'); _cleanup(); onNeedRecoverySync(); }, cancelOnError: false, ); // auth handshake (come server.js) _send({'type': 'auth', 'token': token, 'session_id': sessionId}); } finally { // ✅ importantissimo: mai lasciare _connecting=true _connecting = false; } } Future close() async { try { // ✅ prima chiudi sink, poi cleanup await _sub?.cancel(); _sub = null; final ch = _ch; _ch = null; if (ch != null) { await ch.sink.close(); } } catch (_) { // ignore } finally { _cleanup(); } } void _send(Map obj) { try { _ch?.sink.add(jsonEncode(obj)); } catch (_) {} } Map? _decode(dynamic raw) { try { final s = raw is String ? raw : raw.toString(); final v = jsonDecode(s); return v is Map ? v : null; } catch (_) { return null; } } void _cleanup() { _sub?.cancel(); _sub = null; _ch = null; _connecting = false; } }