// ws-server.js require("dotenv").config(); const WebSocket = require("ws"); const jwt = require("jsonwebtoken"); const { randomUUID } = require("crypto"); const dbWs = require("./db/dbWs"); const PORT = process.env.WS_PORT || 4002; const HOST = process.env.WS_HOST || "0.0.0.0"; const JWT_SECRET = process.env.JWT_SECRET; if (!JWT_SECRET) { console.error("❌ ERRORE: JWT_SECRET non definito nel .env"); process.exit(1); } const wss = new WebSocket.Server({ port: PORT, host: HOST }); console.log(`🚀 WebSocket server attivo su ws://${HOST}:${PORT}`); // Mappa session_id → ws (in memoria) const wsBySession = new Map(); // ===================================================== // COSTANTI (coerenti con sync.js) // ===================================================== // finestra WS “dormant”: 2 minuti const WS_DORMANT_MS = 120000; // retry tick const RETRY_INTERVAL_MS = 3000; // massimo retry: 40 * 3s = 120s = 2 minuti const MAX_RETRIES = 40; // limita lavoro per tick const MAX_RETRY_BATCH = 500; // ===================================================== // FUNZIONE DI INVIO CON LOG // ===================================================== function sendToClient(ws, msg) { const json = JSON.stringify(msg); console.log(`📤 [WS SEND] → a ${ws.user || "??"}: ${json}`); ws.send(json); } // ===================================================== // CONNESSIONE CLIENT // ===================================================== wss.on("connection", (ws) => { console.log("🔌 Nuovo client connesso"); ws.authenticated = false; ws.user = null; ws.session_id = null; ws.on("message", async (msg) => { console.log("📩 [WS RAW MESSAGE]:", msg.toString()); let data; try { data = JSON.parse(msg.toString()); } catch (err) { console.error("❌ Errore parsing JSON:", err); return; } // ----------------------------- // AUTENTICAZIONE JWT // ----------------------------- if (data.type === "auth") { try { const payload = jwt.verify(data.token, JWT_SECRET); const user = payload.name; const session_id = data.session_id || randomUUID(); ws.user = user; ws.session_id = session_id; ws.authenticated = true; wsBySession.set(session_id, ws); // Recupera o crea sessione const existing = await dbWs.getSession(session_id); if (!existing) { await dbWs.createSession(session_id, user); } else { // “touch” last_ack all’auth (come segnale di presenza) await dbWs.updateSessionAck(session_id); } const sessionRow = await dbWs.getSession(session_id); // ✅ invio auth_ok con need_full_sync, ma NON lo azzero qui sendToClient(ws, { type: "auth_ok", user, session_id, need_full_sync: !!sessionRow.need_full_sync, }); console.log(`🔐 Client autenticato: ${user} (session=${session_id})`); } catch (err) { sendToClient(ws, { type: "auth_error", error: err.message, }); ws.close(); } return; } // ----------------------------- // MESSAGGI NON AUTORIZZATI // ----------------------------- if (!ws.authenticated) { sendToClient(ws, { type: "error", message: "Not authenticated", }); return; } // ----------------------------- // ACK evento (reliable delivery) // ----------------------------- if (data.type === "ack" && data.event_id) { await dbWs.deletePendingEvent(data.event_id); await dbWs.updateSessionAck(ws.session_id); return; } // ----------------------------- // PONG (keepalive) // ----------------------------- if (data.type === "pong") { await dbWs.updateSessionAck(ws.session_id); return; } // ----------------------------- // RECOVERY DONE (client) // Il client invia questo messaggio DOPO aver completato progressive/full // quando ha ricevuto need_full_sync=true. // ----------------------------- if (data.type === "recovery_done") { await dbWs.clearNeedFullSync(ws.session_id); await dbWs.updateSessionAck(ws.session_id); console.log(`✅ recovery_done ricevuto: session=${ws.session_id} user=${ws.user}`); return; } // ----------------------------- // ALTRI MESSAGGI DAL CLIENT // ----------------------------- console.log(`📨 Messaggio da ${ws.user}:`, data); }); ws.on("close", () => { console.log(`❌ Client disconnesso: ${ws.user || "sconosciuto"}`); if (ws.session_id) { wsBySession.delete(ws.session_id); } }); }); // ===================================================== // BROADCAST AFFIDABILE PER UTENTE (event_id + pending) // ===================================================== wss.broadcastToUserReliable = async function (user, payload) { const sessions = await dbWs.getSessionsByUser(user); if (!sessions || sessions.length === 0) return; console.log(`📣 [WS BROADCAST USER=${user}] → payload=${JSON.stringify(payload)}`); for (const s of sessions) { const ws = wsBySession.get(s.session_id); // event_id diverso per ogni sessione const event_id = randomUUID(); const msg = { ...payload, event_id }; const json = JSON.stringify(msg); if (ws && ws.readyState === WebSocket.OPEN && ws.authenticated) { console.log(` ↳ inviato a session=${s.session_id}`); ws.send(json); // salva pending event per retry/ack await dbWs.addPendingEvent(event_id, s.session_id, msg); } else { // sessione non attiva: NON perdiamo l’evento subito, // ma qui segnaliamo che al prossimo reconnect potrebbe servire recovery await dbWs.setSessionNeedFullSync(s.session_id, true); // NB: l’evento verrà gestito dal retry loop se era già pending, // oppure verrà “coperto” dalla progressive/full successiva. } } }; // Mantieni compatibilità wss.broadcastToUser = wss.broadcastToUserReliable; // ===================================================== // BROADCAST PER ADMIN // ===================================================== wss.broadcastToAdmins = function (msg) { const json = JSON.stringify(msg); console.log(`📣 [WS BROADCAST ADMINS] → ${json}`); for (const client of wss.clients) { if ( client.readyState === WebSocket.OPEN && client.authenticated && client.user === "Admin" ) { console.log(` ↳ inviato a Admin`); client.send(json); } } }; // ===================================================== // RETRY + TIMEOUT (2 minuti) // ===================================================== setInterval(async () => { const pending = await dbWs.getPendingEventsOlderThan(RETRY_INTERVAL_MS, MAX_RETRY_BATCH); for (const ev of pending) { const ws = wsBySession.get(ev.session_id); // se troppe retry -> consideriamo sessione “dormiente” e passiamo a recovery if (ev.retries >= MAX_RETRIES) { console.warn(`⏱️ Timeout evento: ${ev.event_id} session=${ev.session_id}`); // marca recovery await dbWs.setSessionNeedFullSync(ev.session_id, true); // elimina questo pending (evita accumulo infinito) await dbWs.deletePendingEvent(ev.event_id); // non cancelliamo la sessione: la recovery avverrà al reconnect continue; } // retry se WS attivo if (ws && ws.readyState === WebSocket.OPEN && ws.authenticated) { console.log(`🔁 Retry event_id=${ev.event_id} session=${ev.session_id}`); // payload in DB è TEXT JSON (stringa) const payload = typeof ev.payload === "string" ? ev.payload : JSON.stringify(ev.payload); ws.send(payload); await dbWs.updatePendingEventRetry(ev.event_id); continue; } // WS non attivo: applichiamo la logica “entro 2 minuti via WS” const sessionRow = await dbWs.getSession(ev.session_id); const lastAck = sessionRow?.last_ack || 0; const dormant = !lastAck || (Date.now() - lastAck) > WS_DORMANT_MS; if (dormant) { // oltre finestra: passa a recovery await dbWs.setSessionNeedFullSync(ev.session_id, true); await dbWs.deletePendingEvent(ev.event_id); } else { // entro finestra: NON cancellare pending, // lo consegneremo quando il client si riconnette (retry loop continuerà a tentare) // opzionale: nessuna azione } } }, RETRY_INTERVAL_MS); module.exports = wss;