157 lines
3.8 KiB
Dart
157 lines
3.8 KiB
Dart
import 'dart:async';
|
|
import 'dart:convert';
|
|
|
|
import 'package:flutter/foundation.dart';
|
|
import 'package:web_socket_channel/web_socket_channel.dart';
|
|
|
|
import 'remote_state_store.dart';
|
|
|
|
/// Asynchronous callback that receives one decoded WebSocket message.
///
/// [msg] is the JSON object of the message as a string-keyed map. The
/// returned future completes when the handler has finished processing it.
typedef WsEventHandler = Future<void> Function(Map<String, dynamic> msg);
|
|
|
|
/// WebSocket client for the remote event channel.
///
/// After [connect] the client sends an `auth` handshake and then, for every
/// incoming JSON message:
///
/// * `auth_error`: requests a recovery sync and closes the connection;
/// * `auth_ok`: requests a recovery sync when `need_full_sync` is `true`;
/// * `ping`: answers with `pong`;
/// * anything else: is forwarded to [onEvent] unless its `event_id` is
///   already known to [deduper]; messages carrying an `event_id` are always
///   acknowledged with an `ack` message.
///
/// [onNeedRecoverySync] is also invoked when the stream ends or reports an
/// error.
class RemoteWsClient {
  RemoteWsClient({
    required this.wsUrl,
    required this.store,
    required this.deduper,
    required this.onEvent,
    required this.onNeedRecoverySync,
  });

  /// WebSocket endpoint, e.g. `wss://prova-ws.patachina.it`.
  final String wsUrl;

  /// Store that supplies the session id sent in the `auth` handshake.
  final RemoteStateStore store;

  /// Remembers already processed event ids so duplicates are only ACKed.
  final EventRingDeduper deduper;

  /// Receives every message that is not handled by the protocol itself.
  final WsEventHandler onEvent;

  /// Invoked whenever the caller should run a recovery sync.
  final VoidCallback onNeedRecoverySync;

  WebSocketChannel? _ch;
  StreamSubscription? _sub;
  bool _connecting = false;

  /// Whether a channel is currently attached.
  ///
  /// This becomes `true` as soon as [connect] has created the channel; it
  /// does not wait for the server to answer the handshake.
  bool get isConnected => _ch != null;

  /// Opens a new connection and sends the `auth` handshake with [token].
  ///
  /// Any existing connection is closed first. A call made while another
  /// [connect] is still in progress returns immediately without doing
  /// anything.
  Future<void> connect({required String token}) async {
    if (_connecting) return;
    _connecting = true;

    try {
      // Drop any previous connection before opening a new one. close() must
      // not touch _connecting, otherwise this re-entrancy guard would be
      // lifted while the awaits below are still pending.
      await close();

      await deduper.init();
      final sessionId = await store.getOrCreateSessionId();

      final ch = WebSocketChannel.connect(Uri.parse(wsUrl));
      _ch = ch;
      _sub = ch.stream.listen(
        _onMessage,
        onDone: () {
          debugPrint('[remote][ws] stream closed');
          _cleanup();
          // Optional: recovery sync when the connection drops.
          onNeedRecoverySync();
        },
        onError: (Object e) {
          debugPrint('[remote][ws] stream error=$e');
          // close() detaches the fields synchronously and then releases the
          // socket; merely dropping the reference would leak it because the
          // cancelled subscription never reaches onDone.
          unawaited(close());
          onNeedRecoverySync();
        },
        cancelOnError: false,
      );

      // Auth handshake (same shape as server.js).
      _send({'type': 'auth', 'token': token, 'session_id': sessionId});
    } finally {
      // Never leave _connecting stuck at true, even when setup throws.
      _connecting = false;
    }
  }

  /// Closes the current connection, if any.
  ///
  /// The subscription and channel are detached from this client before the
  /// first `await`, so a [close] that finishes late can never tear down a
  /// connection that was opened in the meantime. Failures while cancelling or
  /// closing are logged and otherwise ignored.
  Future<void> close() async {
    final sub = _sub;
    final ch = _ch;
    _cleanup();

    try {
      await sub?.cancel();
    } catch (e) {
      debugPrint('[remote][ws] cancel error=$e');
    }

    // Attempted independently of the cancel above so a failing cancel cannot
    // leave the socket open.
    try {
      await ch?.sink.close();
    } catch (e) {
      debugPrint('[remote][ws] close error=$e');
    }
  }

  /// Handles one raw frame received from the channel.
  Future<void> _onMessage(dynamic raw) async {
    final msg = _decode(raw);
    if (msg == null) return;

    final type = msg['type'];

    // Explicit auth_error sent by the server.
    if (type == 'auth_error') {
      debugPrint('[remote][ws] auth_error=${msg['error']}');
      onNeedRecoverySync();
      await close();
      return;
    }

    if (type == 'auth_ok') {
      // The server may ask for a full/progressive sync.
      if (msg['need_full_sync'] == true) onNeedRecoverySync();
      return;
    }

    if (type == 'ping') {
      _send({'type': 'pong'});
      return;
    }

    final eventId = msg['event_id']?.toString();

    // Duplicate: ACK it again and stop.
    if (eventId != null && deduper.has(eventId)) {
      _send({'type': 'ack', 'event_id': eventId});
      return;
    }

    try {
      await onEvent(msg);
    } catch (e) {
      debugPrint('[remote][ws] onEvent error=$e');
      onNeedRecoverySync();
    } finally {
      // Always ACK, to avoid a retry storm and need_full_sync.
      if (eventId != null) {
        try {
          await deduper.mark(eventId);
        } catch (e) {
          // A failing deduper must not prevent the ACK below.
          debugPrint('[remote][ws] deduper.mark error=$e');
        }
        _send({'type': 'ack', 'event_id': eventId});
      }
    }
  }

  /// Sends [obj] as JSON text on the current channel.
  ///
  /// Does nothing when no channel is attached; send failures are logged and
  /// not rethrown.
  void _send(Map<String, dynamic> obj) {
    try {
      _ch?.sink.add(jsonEncode(obj));
    } catch (e) {
      debugPrint('[remote][ws] send error=$e');
    }
  }

  /// Decodes [raw] as a JSON object.
  ///
  /// Non-string input is converted with `toString()` first. Returns `null`
  /// when the text is not valid JSON or the decoded value is not a
  /// string-keyed map.
  Map<String, dynamic>? _decode(dynamic raw) {
    try {
      final s = raw is String ? raw : raw.toString();
      final v = jsonDecode(s);
      return v is Map<String, dynamic> ? v : null;
    } catch (_) {
      return null;
    }
  }

  /// Detaches the subscription and channel from this client.
  ///
  /// Only clears the references; releasing the resources is done by [close].
  /// It deliberately leaves `_connecting` alone, which is owned by [connect].
  void _cleanup() {
    _sub = null;
    _ch = null;
  }
}
|