aves_mio22/lib/remote.ok/remote_ws_client.dart.ok
2026-04-18 20:05:02 +02:00

157 lines
3.8 KiB
Text

import 'dart:async';
import 'dart:convert';
import 'package:flutter/foundation.dart';
import 'package:web_socket_channel/web_socket_channel.dart';
import 'remote_state_store.dart';
/// Asynchronous callback that receives one decoded WebSocket message.
///
/// The map is the JSON object decoded from a single incoming frame.
typedef WsEventHandler = Future<void> Function(Map<String, dynamic> msg);
/// WebSocket client that authenticates against the remote server, forwards
/// incoming events to [onEvent] and acknowledges them by `event_id`.
///
/// Message handling implemented by this class:
/// * after connecting, an `auth` message with the token and session id is sent;
/// * `auth_error` triggers [onNeedRecoverySync] and closes the connection;
/// * `auth_ok` triggers [onNeedRecoverySync] only if `need_full_sync` is true;
/// * `ping` is answered with `pong`;
/// * any other message is checked against [deduper], dispatched to [onEvent]
///   and, when it carries an `event_id`, marked as seen and acknowledged.
class RemoteWsClient {
  RemoteWsClient({
    required this.wsUrl,
    required this.store,
    required this.deduper,
    required this.onEvent,
    required this.onNeedRecoverySync,
  });

  /// WebSocket endpoint, e.g. `wss://prova-ws.patachina.it`.
  final String wsUrl;

  /// Store used to obtain the session id sent during the auth handshake.
  final RemoteStateStore store;

  /// Tracks already-processed event ids so replays are only acknowledged.
  final EventRingDeduper deduper;

  /// Invoked for every message that is not a control message or a duplicate.
  final WsEventHandler onEvent;

  /// Invoked whenever the caller should run a recovery sync (auth failure,
  /// server request, handler failure, stream closed or stream error).
  final VoidCallback onNeedRecoverySync;

  WebSocketChannel? _ch;
  StreamSubscription? _sub;

  /// Re-entrancy guard for [connect]. Only [connect] itself may change it.
  bool _connecting = false;

  /// Whether a channel is currently held by this client.
  ///
  /// This becomes true as soon as the channel object is created; it does not
  /// prove that the server accepted the auth handshake.
  bool get isConnected => _ch != null;

  /// Opens a fresh connection and sends the auth handshake with [token].
  ///
  /// Any existing connection is closed first. Calls made while another
  /// [connect] is still in progress return immediately without doing anything.
  Future<void> connect({required String token}) async {
    if (_connecting) return;
    _connecting = true;
    try {
      // Drop any previous connection before opening a new one. This must not
      // reset `_connecting`, otherwise a second connect() could slip in while
      // the awaits below are pending and open a duplicate channel.
      await close();
      await deduper.init();
      final sessionId = await store.getOrCreateSessionId();

      final channel = WebSocketChannel.connect(Uri.parse(wsUrl));
      _ch = channel;
      _sub = channel.stream.listen(
        _handleMessage,
        onDone: () {
          debugPrint('[remote][ws] stream closed');
          _cleanup();
          // The connection dropped: ask the owner to resynchronise.
          onNeedRecoverySync();
        },
        onError: (e) {
          debugPrint('[remote][ws] stream error=$e');
          _cleanup();
          onNeedRecoverySync();
        },
        cancelOnError: false,
      );

      // Auth handshake (mirrors the server implementation).
      _send({'type': 'auth', 'token': token, 'session_id': sessionId});
    } finally {
      // Never leave the guard set, whatever happened above.
      _connecting = false;
    }
  }

  /// Cancels the stream subscription and closes the channel sink.
  ///
  /// Errors raised while releasing either resource are logged and ignored, so
  /// this method does not throw because of them.
  Future<void> close() async {
    final sub = _sub;
    final channel = _ch;
    // Detach first so concurrent callers never see a half-closed connection.
    _sub = null;
    _ch = null;
    await _release(sub, channel);
  }

  /// Processes one raw frame received from the channel stream.
  Future<void> _handleMessage(dynamic raw) async {
    final msg = _decode(raw);
    if (msg == null) return;

    final type = msg['type'];

    // Explicit authentication failure reported by the server.
    if (type == 'auth_error') {
      debugPrint('[remote][ws] auth_error=${msg['error']}');
      onNeedRecoverySync();
      await close();
      return;
    }

    if (type == 'auth_ok') {
      // The server may request a full/progressive sync.
      if (msg['need_full_sync'] == true) onNeedRecoverySync();
      return;
    }

    if (type == 'ping') {
      _send({'type': 'pong'});
      return;
    }

    final eventId = msg['event_id']?.toString();

    // Already processed: acknowledge again and stop.
    if (eventId != null && deduper.has(eventId)) {
      _send({'type': 'ack', 'event_id': eventId});
      return;
    }

    try {
      await onEvent(msg);
    } catch (e) {
      debugPrint('[remote][ws] onEvent error=$e');
      onNeedRecoverySync();
    } finally {
      // Always ACK, to avoid a retry storm and a forced need_full_sync.
      if (eventId != null) {
        try {
          await deduper.mark(eventId);
        } catch (e) {
          // A failure to record the id must not prevent the ACK below.
          debugPrint('[remote][ws] deduper.mark error=$e');
        }
        _send({'type': 'ack', 'event_id': eventId});
      }
    }
  }

  /// JSON-encodes [obj] and writes it to the current channel, if any.
  ///
  /// Sending is best-effort: failures are logged and otherwise ignored.
  void _send(Map<String, dynamic> obj) {
    try {
      _ch?.sink.add(jsonEncode(obj));
    } catch (e) {
      debugPrint('[remote][ws] send error=$e');
    }
  }

  /// Decodes [raw] as a JSON object.
  ///
  /// Returns `null` when the payload is not valid JSON or is not an object.
  Map<String, dynamic>? _decode(dynamic raw) {
    try {
      final s = raw is String ? raw : raw.toString();
      final v = jsonDecode(s);
      return v is Map<String, dynamic> ? v : null;
    } catch (_) {
      return null;
    }
  }

  /// Synchronously detaches the current subscription and channel and releases
  /// them in the background.
  ///
  /// Used from the stream callbacks, which cannot await.
  void _cleanup() {
    final sub = _sub;
    final channel = _ch;
    _sub = null;
    _ch = null;
    if (sub != null || channel != null) {
      unawaited(_release(sub, channel));
    }
  }

  /// Cancels [sub] and closes the sink of [channel], each independently, so a
  /// failure in one step never prevents the other from running.
  Future<void> _release(
    StreamSubscription? sub,
    WebSocketChannel? channel,
  ) async {
    try {
      await sub?.cancel();
    } catch (e) {
      debugPrint('[remote][ws] subscription cancel error=$e');
    }
    try {
      await channel?.sink.close();
    } catch (e) {
      debugPrint('[remote][ws] sink close error=$e');
    }
  }
}