aves_mio22/lib/remote/remote_ws_client.dart
2026-04-18 20:05:02 +02:00

351 lines
9.3 KiB
Dart
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import 'dart:async';
import 'dart:convert';
import 'package:flutter/foundation.dart';
import 'package:web_socket_channel/web_socket_channel.dart';
import 'package:web_socket_channel/io.dart';
import 'remote_state_store.dart';
/// Asynchronous callback that receives one decoded message (a JSON object as
/// a map) and completes with a `bool` telling the caller whether the message
/// was handled.
typedef WsEventHandler = Future<bool> Function(Map<String, dynamic> msg);
/// WebSocket client for the remote event channel.
///
/// After opening the socket it sends an `auth` message carrying the token and
/// the session id obtained from [store], then forwards every incoming JSON
/// object to [onEvent]. An event is acknowledged (`ack`) only when [onEvent]
/// completes with `true`, or when [deduper] reports it as already seen.
///
/// When the stream ends or errors, a debounced [onNeedRecoverySync] is
/// requested and, if [autoReconnect] is set, a reconnect is scheduled with
/// exponential backoff between [reconnectDelay] and [maxReconnectDelay].
class RemoteWsClient {
  RemoteWsClient({
    required this.wsUrl,
    required this.store,
    required this.deduper,
    required this.onEvent,
    required this.onNeedRecoverySync,
    this.autoReconnect = true,
    this.reconnectDelay = const Duration(milliseconds: 1500),
    this.maxReconnectDelay = const Duration(seconds: 30),
    this.dormantWindow = const Duration(minutes: 2), // mirrors sync.js WS_DORMANT_MS
    this.pingInterval = const Duration(seconds: 25), // mobile keepalive
    this.needFullSyncDelay = const Duration(milliseconds: 800), // mirrors sync.js WS_NEED_FULLSYNC_DELAY_MS
  });

  /// URL of the WebSocket endpoint.
  final String wsUrl;

  /// Persistent storage for the session id and the "last seen" timestamp.
  final RemoteStateStore store;

  /// Tracks event ids that were already applied, so duplicates are only ACKed.
  final EventRingDeduper deduper;

  /// Must complete with `true` ONLY when the event was applied successfully
  /// (or when durable work that will certainly complete has been scheduled).
  final WsEventHandler onEvent;

  /// Invoked (debounced) when a recovery sync, progressive or full, is needed.
  final VoidCallback onNeedRecoverySync;

  /// Whether a reconnect is scheduled automatically after the stream ends.
  final bool autoReconnect;

  /// Initial (and minimum) delay between reconnect attempts.
  final Duration reconnectDelay;

  /// Upper bound for the reconnect backoff.
  final Duration maxReconnectDelay;

  /// If the last message is older than this when connecting, the session id
  /// is reset before authenticating.
  final Duration dormantWindow;

  /// Ping interval handed to [IOWebSocketChannel.connect].
  final Duration pingInterval;

  /// Delay before triggering the recovery requested by `need_full_sync`.
  final Duration needFullSyncDelay;

  WebSocketChannel? _ch;
  StreamSubscription<dynamic>? _sub;
  bool _connecting = false;
  bool _closing = false;

  // Bumped by close(); lets an in-flight connect() detect that it has been
  // superseded while it was awaiting.
  int _epoch = 0;

  Timer? _reconnectTimer;
  Duration _currentReconnectDelay = Duration.zero;
  String? _lastToken;

  // "Last seen" timestamp (ms since epoch), persisted through [store].
  int _lastSeenMs = 0;

  // Handshake state: true once `auth_ok` has been received.
  bool _authed = false;

  // The server asked for a recovery (need_full_sync=true) and is waiting for
  // "recovery_done".
  bool _needRecoveryDoneAck = false;

  // Recovery debounce (avoids recovery storms).
  Timer? _recoveryDebounce;
  bool _recoveryScheduled = false;

  /// Whether a channel is open and the server has confirmed authentication.
  bool get isConnected => _ch != null && _authed;

  /// Opens the socket and sends the `auth` handshake using [token].
  ///
  /// Does nothing when a connect is already in progress or the client is
  /// already connected; [token] is still remembered for later reconnects.
  /// Errors thrown by [store], [deduper] or URL parsing propagate to the
  /// caller.
  Future<void> connect({required String token}) async {
    _lastToken = token;
    if (_connecting || isConnected) return;

    _connecting = true;
    _closing = false;
    final epoch = _epoch;
    try {
      // An explicit connect supersedes any pending automatic reconnect.
      _reconnectTimer?.cancel();
      _reconnectTimer = null;

      // Drop a previous (not yet authenticated) channel. This deliberately
      // does not go through close(): close() would reset `_connecting` and
      // cancel a pending recovery request.
      await _teardownChannel();

      await deduper.init();
      final sessionId = await store.getOrCreateSessionId();

      // Load the persisted "last seen" timestamp, if any.
      _lastSeenMs = await store.getWsLastSeenMs();

      // If the client was dormant for longer than the window before even
      // connecting, reset the session id (same behaviour as sync.js).
      final now = DateTime.now().millisecondsSinceEpoch;
      final dormant = _lastSeenMs != 0 &&
          (now - _lastSeenMs) > dormantWindow.inMilliseconds;
      if (dormant) {
        debugPrint('[remote][ws] dormant (> ${dormantWindow.inSeconds}s) before connect -> reset session_id');
        await store.resetSessionId();
        await store.clearWsLastSeenMs();
      }
      final finalSessionId =
          dormant ? await store.getOrCreateSessionId() : sessionId;

      // close() ran while this method was awaiting: do not open a socket.
      if (epoch != _epoch) {
        debugPrint('[remote][ws] connect aborted: closed during setup');
        return;
      }

      final channel = IOWebSocketChannel.connect(
        Uri.parse(wsUrl),
        pingInterval: pingInterval,
      );
      _ch = channel;
      _authed = false;
      _needRecoveryDoneAck = false;

      _sub = channel.stream.listen(
        _handleMessage,
        onDone: () {
          debugPrint('[remote][ws] stream closed');
          _handleChannelLost();
        },
        onError: (Object e) {
          debugPrint('[remote][ws] stream error=$e');
          _handleChannelLost();
        },
        cancelOnError: false,
      );

      // Auth handshake.
      _send({'type': 'auth', 'token': token, 'session_id': finalSessionId});
    } finally {
      // Only release the guard if this attempt is still the current one;
      // after a close() the flag belongs to whoever connects next.
      if (epoch == _epoch) _connecting = false;
    }
  }

  /// Cancels pending reconnect/recovery timers and closes the socket.
  ///
  /// Also aborts a [connect] that is still awaiting its setup steps.
  Future<void> close() async {
    _closing = true;
    _epoch++;
    _connecting = false;
    _reconnectTimer?.cancel();
    _reconnectTimer = null;
    _recoveryDebounce?.cancel();
    _recoveryDebounce = null;
    _recoveryScheduled = false;
    try {
      await _teardownChannel();
    } finally {
      _closing = false;
    }
  }

  /// Call when a recovery requested by the server (need_full_sync=true) has
  /// completed.
  ///
  /// Sends "recovery_done" at most once per request: it is a no-op unless the
  /// last `auth_ok` asked for a full sync and the client is still connected.
  void notifyRecoveryCompleted() {
    if (!_needRecoveryDoneAck) return;
    if (!isConnected) return;
    _needRecoveryDoneAck = false;
    _send({'type': 'recovery_done'});
  }

  // --------------------------
  // incoming messages
  // --------------------------

  /// Handles one raw frame: handshake replies, ping, then events.
  Future<void> _handleMessage(dynamic raw) async {
    _touchLastSeen();
    final msg = _decode(raw);
    if (msg == null) return;
    final type = msg['type']?.toString();

    if (type == 'auth_error') {
      debugPrint('[remote][ws] auth_error=${msg['error']}');
      // Close first: close() cancels the recovery debounce, so scheduling
      // the recovery before closing would silently drop it.
      await close();
      _scheduleRecovery();
      return;
    }

    if (type == 'auth_ok') {
      _authed = true;
      // The connection really works now, so this is the point where the
      // backoff is reset. Resetting on every attempt would pin the delay to
      // its minimum, because socket failures arrive through the stream.
      _currentReconnectDelay = reconnectDelay;
      final needFull = msg['need_full_sync'] == true;
      debugPrint('[remote][ws] auth_ok user=${msg['user']} session=${msg['session_id']} need_full_sync=$needFull');
      if (needFull) {
        _needRecoveryDoneAck = true;
        // Same as sync.js: the recovery is delayed.
        _scheduleRecovery(delay: needFullSyncDelay);
      }
      return;
    }

    if (type == 'ping') {
      _send({'type': 'pong'});
      return;
    }

    final eventId = msg['event_id']?.toString();

    // Duplicate: ACK and stop.
    if (eventId != null && deduper.has(eventId)) {
      _send({'type': 'ack', 'event_id': eventId});
      return;
    }

    // ACK only when the handler reports success.
    var handled = false;
    try {
      handled = await onEvent(msg);
    } catch (e, st) {
      debugPrint('[remote][ws] onEvent error=$e\n$st');
      handled = false;
      // On failure ask for a recovery, but do NOT ACK.
      _scheduleRecovery();
    }

    if (eventId != null && handled) {
      await deduper.mark(eventId);
      _send({'type': 'ack', 'event_id': eventId});
    }
    // Otherwise no ACK is sent; the server is expected to redeliver.
  }

  /// Reacts to the stream ending or failing.
  void _handleChannelLost() {
    _cleanupChannelOnly();
    if (!_closing) {
      _scheduleRecovery();
      _scheduleReconnect();
    }
  }

  // --------------------------
  // reconnect logic (sync.js-like)
  // --------------------------

  /// Schedules a single reconnect attempt and doubles the next delay.
  void _scheduleReconnect() {
    if (!autoReconnect) return;
    // Nothing to do when connected, connecting, or a timer is already armed.
    if (isConnected || _connecting || _reconnectTimer != null) return;

    final token = _lastToken;
    if (token == null || token.isEmpty) return;

    final delay = _currentReconnectDelay == Duration.zero
        ? reconnectDelay
        : _currentReconnectDelay;
    _currentReconnectDelay = _nextDelay(delay);
    debugPrint('[remote][ws] scheduling reconnect in ${delay.inMilliseconds}ms');

    _reconnectTimer = Timer(delay, () async {
      _reconnectTimer = null;
      // State may have changed while the timer was pending.
      if (_closing || isConnected || _connecting) return;
      try {
        await connect(token: token);
      } catch (e) {
        debugPrint('[remote][ws] reconnect failed: $e');
        _cleanupChannelOnly();
        _scheduleReconnect();
      }
    });
  }

  /// Returns twice [current], bounded to
  /// `[reconnectDelay, maxReconnectDelay]`.
  ///
  /// If [maxReconnectDelay] is smaller than [reconnectDelay] the upper bound
  /// is taken to be [reconnectDelay] instead of throwing, which `clamp` would
  /// do for an inverted range.
  Duration _nextDelay(Duration current) {
    final floorMs = reconnectDelay.inMilliseconds;
    final configuredMaxMs = maxReconnectDelay.inMilliseconds;
    final ceilingMs = configuredMaxMs < floorMs ? floorMs : configuredMaxMs;
    final doubledMs = current.inMilliseconds * 2;
    if (doubledMs < floorMs) return Duration(milliseconds: floorMs);
    if (doubledMs > ceilingMs) return Duration(milliseconds: ceilingMs);
    return Duration(milliseconds: doubledMs);
  }

  // --------------------------
  // recovery debounce
  // --------------------------

  /// Requests [onNeedRecoverySync] after [delay]; requests made while one is
  /// already pending are ignored.
  void _scheduleRecovery({Duration delay = const Duration(milliseconds: 250)}) {
    if (_recoveryScheduled) return;
    _recoveryScheduled = true;
    _recoveryDebounce?.cancel();
    _recoveryDebounce = Timer(delay, () {
      _recoveryScheduled = false;
      try {
        onNeedRecoverySync();
      } catch (e) {
        // Best effort: a failing callback must not break the timer zone.
        debugPrint('[remote][ws] onNeedRecoverySync error=$e');
      }
    });
  }

  // --------------------------
  // utils
  // --------------------------

  /// Records "now" as the last time a frame was received and persists it
  /// without waiting for the write.
  void _touchLastSeen() {
    _lastSeenMs = DateTime.now().millisecondsSinceEpoch;
    unawaited(store.setWsLastSeenMs(_lastSeenMs));
  }

  /// JSON-encodes [obj] and writes it to the current channel, if any.
  void _send(Map<String, dynamic> obj) {
    try {
      _ch?.sink.add(jsonEncode(obj));
    } catch (e) {
      // Best effort: the sink may already be closed.
      debugPrint('[remote][ws] send failed: $e');
    }
  }

  /// Decodes a frame into a JSON object, or returns `null` when the frame is
  /// not valid JSON or is not an object.
  ///
  /// Text frames are parsed directly; binary frames are UTF-8 decoded first.
  Map<String, dynamic>? _decode(dynamic raw) {
    try {
      final String text;
      if (raw is String) {
        text = raw;
      } else if (raw is List<int>) {
        text = utf8.decode(raw);
      } else {
        return null;
      }
      final decoded = jsonDecode(text);
      return decoded is Map<String, dynamic> ? decoded : null;
    } on FormatException {
      return null;
    }
  }

  /// Cancels the subscription and closes the sink of the current channel.
  ///
  /// The fields are cleared synchronously before awaiting, so a channel that
  /// is opened while this is still running is never touched. Each step has
  /// its own error handling so a failing cancel cannot leave the sink open.
  Future<void> _teardownChannel() async {
    final sub = _sub;
    final channel = _ch;
    _sub = null;
    _ch = null;
    _authed = false;
    try {
      await sub?.cancel();
    } catch (e) {
      debugPrint('[remote][ws] subscription cancel failed: $e');
    }
    try {
      await channel?.sink.close();
    } catch (e) {
      debugPrint('[remote][ws] sink close failed: $e');
    }
  }

  /// Synchronously forgets the current channel after its stream has ended.
  void _cleanupChannelOnly() {
    final sub = _sub;
    _sub = null;
    _ch = null;
    _authed = false;
    if (sub != null) unawaited(sub.cancel());
  }
}