// ws-server.js
//
// WebSocket server with JWT authentication and "reliable" per-user delivery:
// every event sent to a session is stored as a pending event until the client
// acknowledges it, and a periodic loop retries or expires unacknowledged events.
require("dotenv").config();

const WebSocket = require("ws");
const jwt = require("jsonwebtoken");
const { randomUUID } = require("crypto");
const dbWs = require("./db/dbWs");

const PORT = process.env.WS_PORT || 4002;
const HOST = process.env.WS_HOST || "0.0.0.0";
const JWT_SECRET = process.env.JWT_SECRET;

// Without a secret no token can be verified, so refuse to start.
if (!JWT_SECRET) {
  console.error("โŒ ERRORE: JWT_SECRET non definito nel .env");
  process.exit(1);
}

const wss = new WebSocket.Server({ port: PORT, host: HOST });
console.log(`๐Ÿš€ WebSocket server attivo su ws://${HOST}:${PORT}`);

// =====================================================
// SESSION AND PENDING-EVENT CLEANUP ON SERVER RESTART
// =====================================================
// Fire-and-forget: errors are logged, never thrown.
// NOTE(review): this runs concurrently with incoming connections; a client
// that authenticates before the cleanup finishes may have its freshly created
// session row deleted. Confirm whether startup should wait for it.
(async () => {
  try {
    console.log("๐Ÿงน [WS CLEANUP] Pulizia sessioni e pending al riavvio server...");
    await dbWs.deleteAllSessions();
    await dbWs.deleteAllPendingEvents();
    console.log("๐Ÿงน [WS CLEANUP] Completata: tutte le sessioni e pending rimossi");
  } catch (err) {
    console.error("โŒ [WS CLEANUP ERROR]:", err);
  }
})();

// Map session_id -> live WebSocket for that session.
const wsBySession = new Map();

// =====================================================
// CONSTANTS
// =====================================================
const WS_DORMANT_MS = 2 * 60 * 1000; // 2 minutes
const WS_RETENTION_MS = 30 * 24 * 60 * 60 * 1000; // 30 days
const RETRY_INTERVAL_MS = 3000;
const MAX_RETRIES = 40;
const MAX_RETRY_BATCH = 500;

// =====================================================
// HELPER: session dormancy
// =====================================================
/**
 * Milliseconds elapsed since the session's last acknowledgement.
 *
 * @param {string} session_id - Session to look up.
 * @returns {Promise<number>} Elapsed milliseconds, or Infinity when the session
 *   does not exist or has no (truthy) last_ack.
 */
async function getSessionDormancyMs(session_id) {
  const session = await dbWs.getSession(session_id);
  if (!session || !session.last_ack) return Infinity;
  // assumes last_ack is an epoch-milliseconds number - TODO confirm against dbWs
  return Date.now() - session.last_ack;
}

/**
 * True when the socket exists, is open and has completed authentication.
 *
 * @param {object|undefined} ws - Socket taken from wsBySession (may be undefined).
 * @returns {boolean}
 */
function isDeliverable(ws) {
  return Boolean(ws && ws.readyState === WebSocket.OPEN && ws.authenticated);
}

// =====================================================
// SEND WITH LOGGING
// =====================================================
/**
 * Serializes msg as JSON, logs it and sends it on the socket.
 *
 * @param {object} ws - Target socket.
 * @param {object} msg - JSON-serializable message.
 */
function sendToClient(ws, msg) {
  const json = JSON.stringify(msg);
  console.log(`๐Ÿ“ค [WS SEND] at=${new Date().toISOString()} โ†’ user=${ws.user || "??"}: ${json}`);
  ws.send(json);
}

// =====================================================
// JWT AUTHENTICATION
// =====================================================
/**
 * Handles an "auth" message: verifies the JWT, binds user and session to the
 * socket, creates or refreshes the session row and replies with auth_ok.
 * On any failure replies with auth_error and closes the socket.
 *
 * @param {object} ws - Socket that sent the message.
 * @param {object} data - Parsed message; reads data.token and data.session_id.
 * @returns {Promise<void>}
 */
async function handleAuth(ws, data) {
  console.log(`๐Ÿ” [WS AUTH] Richiesta auth at=${new Date().toISOString()}`);
  try {
    const payload = jwt.verify(data.token, JWT_SECRET);
    const user = payload.name;
    // NOTE(review): a client-supplied session_id is accepted without checking
    // that the stored session belongs to this user - confirm whether ownership
    // must be verified before reusing it.
    const session_id = data.session_id || randomUUID();

    ws.user = user;
    ws.session_id = session_id;
    ws.authenticated = true;
    wsBySession.set(session_id, ws);
    console.log(`๐Ÿ” [WS AUTH OK] user=${user} session=${session_id}`);

    const existing = await dbWs.getSession(session_id);
    if (!existing) {
      console.log(`๐Ÿ†• [WS SESSION] Creo nuova sessione=${session_id}`);
      await dbWs.createSession(session_id, user);
    } else {
      console.log(`โ™ป๏ธ [WS SESSION] Sessione esistente, aggiorno last_ack`);
      await dbWs.updateSessionAck(session_id);
    }

    // Re-read so need_full_sync reflects the row as stored after create/update.
    const sessionRow = await dbWs.getSession(session_id);
    sendToClient(ws, {
      type: "auth_ok",
      user,
      session_id,
      need_full_sync: !!sessionRow.need_full_sync,
    });
  } catch (err) {
    console.error("โŒ [WS AUTH ERROR]:", err);
    sendToClient(ws, { type: "auth_error", error: err.message });
    ws.close();
  }
}

// =====================================================
// CLIENT CONNECTION
// =====================================================
wss.on("connection", (ws) => {
  console.log(`๐Ÿ”Œ [WS CONNECT] Nuovo client at=${new Date().toISOString()}`);
  ws.authenticated = false;
  ws.user = null;
  ws.session_id = null;

  ws.on("message", async (msg) => {
    // NOTE(review): the raw frame is logged verbatim, which includes the JWT
    // carried by "auth" messages - consider redacting it.
    console.log(`๐Ÿ“ฉ [WS RAW] at=${new Date().toISOString()} msg=${msg.toString()}`);

    let data;
    try {
      data = JSON.parse(msg.toString());
    } catch (err) {
      console.error("โŒ Errore parsing JSON:", err);
      return;
    }

    // -----------------------------
    // JWT AUTHENTICATION
    // -----------------------------
    if (data.type === "auth") {
      await handleAuth(ws, data);
      return;
    }

    // -----------------------------
    // UNAUTHENTICATED MESSAGES
    // -----------------------------
    if (!ws.authenticated) {
      sendToClient(ws, {
        type: "error",
        message: "Not authenticated",
      });
      return;
    }

    // A failing DB call here would otherwise surface as an unhandled promise
    // rejection, because nothing awaits this event-listener's promise.
    try {
      // -----------------------------
      // Event ACK
      // -----------------------------
      if (data.type === "ack" && data.event_id) {
        console.log(`๐ŸŸข [WS ACK] user=${ws.user} session=${ws.session_id} event_id=${data.event_id}`);
        await dbWs.deletePendingEvent(data.event_id);
        await dbWs.updateSessionAck(ws.session_id);
        return;
      }

      // -----------------------------
      // PONG (keepalive)
      // -----------------------------
      if (data.type === "pong") {
        console.log(`๐Ÿ“ [WS PONG] user=${ws.user} session=${ws.session_id}`);
        await dbWs.updateSessionAck(ws.session_id);
        return;
      }

      // -----------------------------
      // RECOVERY DONE
      // -----------------------------
      if (data.type === "recovery_done") {
        console.log(`๐Ÿ”„ [WS RECOVERY DONE] user=${ws.user} session=${ws.session_id}`);
        await dbWs.clearNeedFullSync(ws.session_id);
        await dbWs.updateSessionAck(ws.session_id);
        return;
      }

      // -----------------------------
      // OTHER MESSAGES
      // -----------------------------
      console.log(`๐Ÿ“จ [WS MESSAGE] user=${ws.user} data=${JSON.stringify(data)}`);
    } catch (err) {
      console.error("[WS MESSAGE ERROR]:", err);
    }
  });

  ws.on("close", () => {
    console.log(`โŒ [WS DISCONNECT] user=${ws.user} session=${ws.session_id} at=${new Date().toISOString()}`);
    // Only drop the mapping if it still points at THIS socket: when a session
    // reconnects on a new socket before the old one closes, the late close of
    // the old socket must not unregister the live one.
    if (ws.session_id && wsBySession.get(ws.session_id) === ws) {
      wsBySession.delete(ws.session_id);
    }
  });
});

// =====================================================
// RELIABLE PER-USER BROADCAST
// =====================================================
/**
 * Delivers payload to every stored session of a user. Each session gets its
 * own event_id:
 *  - online session: the event is stored as pending, then sent;
 *  - offline for less than WS_DORMANT_MS: stored as pending only;
 *  - offline longer: the session is flagged need_full_sync instead.
 *
 * @param {string} user - User whose sessions receive the payload.
 * @param {object} payload - JSON-serializable event; event_id is added per session.
 * @returns {Promise<void>} Rejects if a dbWs call fails.
 */
wss.broadcastToUserReliable = async function (user, payload) {
  console.log(`๐Ÿ“ฃ [WS BROADCAST] user=${user} payload=${JSON.stringify(payload)} at=${new Date().toISOString()}`);

  const sessions = await dbWs.getSessionsByUser(user);
  if (!sessions || sessions.length === 0) {
    console.log(`โš ๏ธ [WS BROADCAST] Nessuna sessione attiva per user=${user}`);
    return;
  }

  for (const s of sessions) {
    const ws = wsBySession.get(s.session_id);
    const event_id = randomUUID();
    const msg = { ...payload, event_id };

    if (isDeliverable(ws)) {
      console.log(` ๐Ÿ“ค consegnato a session=${s.session_id}`);
      // Persist BEFORE sending: if the client acks quickly, the ack handler
      // must find the pending row to delete; otherwise the row would be
      // inserted after the ack and retried until MAX_RETRIES.
      await dbWs.addPendingEvent(event_id, s.session_id, msg);
      ws.send(JSON.stringify(msg));
      continue;
    }

    // Dormancy is only relevant for sessions that are not currently online.
    const dormancy = await getSessionDormancyMs(s.session_id);

    if (dormancy < WS_DORMANT_MS) {
      console.log(` ๐ŸŸฆ session=${s.session_id} offline <2min โ†’ salvo in pending`);
      await dbWs.addPendingEvent(event_id, s.session_id, msg);
      continue;
    }

    if (dormancy < WS_RETENTION_MS) {
      console.log(` ๐ŸŸจ session=${s.session_id} offline >2min โ†’ need_full_sync`);
      await dbWs.setSessionNeedFullSync(s.session_id, true);
      continue;
    }

    console.log(` ๐ŸŸฅ session=${s.session_id} offline >30gg โ†’ need_full_sync (full)`);
    await dbWs.setSessionNeedFullSync(s.session_id, true);
  }
};

wss.broadcastToUser = wss.broadcastToUserReliable;

// =====================================================
// ADMIN BROADCAST
// =====================================================
/**
 * Sends msg (best effort, no pending/ack tracking) to every open,
 * authenticated socket whose user is exactly "Admin".
 *
 * @param {object} msg - JSON-serializable message.
 */
wss.broadcastToAdmins = function (msg) {
  const json = JSON.stringify(msg);
  console.log(`๐Ÿ“ฃ [WS BROADCAST ADMINS] โ†’ ${json}`);

  for (const client of wss.clients) {
    if (
      client.readyState === WebSocket.OPEN &&
      client.authenticated &&
      client.user === "Admin"
    ) {
      console.log(` โ†ณ inviato a Admin`);
      client.send(json);
    }
  }
};

// =====================================================
// RETRY LOOP
// =====================================================
/**
 * Processes one pending event: gives up after MAX_RETRIES, resends it when the
 * session is online, keeps it while the session is only briefly offline, and
 * otherwise drops it and flags the session for a full sync.
 *
 * @param {object} ev - Pending event row; reads event_id, session_id, retries, payload.
 * @returns {Promise<void>}
 */
async function retryPendingEvent(ev) {
  if (ev.retries >= MAX_RETRIES) {
    console.warn(`โฑ๏ธ [WS TIMEOUT] event_id=${ev.event_id} session=${ev.session_id} โ†’ recovery`);
    await dbWs.setSessionNeedFullSync(ev.session_id, true);
    await dbWs.deletePendingEvent(ev.event_id);
    return;
  }

  const ws = wsBySession.get(ev.session_id);
  if (isDeliverable(ws)) {
    console.log(`๐Ÿ” [WS RETRY] event_id=${ev.event_id} retries=${ev.retries} session=${ev.session_id}`);
    // The stored payload may already be a JSON string or still an object.
    const payload = typeof ev.payload === "string" ? ev.payload : JSON.stringify(ev.payload);
    ws.send(payload);
    await dbWs.updatePendingEventRetry(ev.event_id);
    return;
  }

  const dormancy = await getSessionDormancyMs(ev.session_id);

  if (dormancy < WS_DORMANT_MS) {
    console.log(`๐ŸŸฆ [WS WAIT] session=${ev.session_id} offline <2min โ†’ pending conservato`);
    return;
  }

  if (dormancy < WS_RETENTION_MS) {
    console.log(`๐ŸŸจ [WS DORMANT] session=${ev.session_id} offline >2min โ†’ need_full_sync`);
  } else {
    console.log(`๐ŸŸฅ [WS EXPIRED] session=${ev.session_id} offline >30gg โ†’ need_full_sync`);
  }
  await dbWs.setSessionNeedFullSync(ev.session_id, true);
  await dbWs.deletePendingEvent(ev.event_id);
}

// NOTE(review): ticks are not serialized; if one tick takes longer than
// RETRY_INTERVAL_MS the next one starts while it is still running - confirm
// whether overlapping ticks can double-send events.
setInterval(async () => {
  console.log(`โฑ๏ธ [WS RETRY LOOP] tick at=${new Date().toISOString()}`);
  try {
    const pending = await dbWs.getPendingEventsOlderThan(RETRY_INTERVAL_MS, MAX_RETRY_BATCH);
    for (const ev of pending) {
      try {
        await retryPendingEvent(ev);
      } catch (err) {
        // One failing event must not abort the rest of the batch.
        console.error(`[WS RETRY LOOP ERROR] event_id=${ev.event_id}:`, err);
      }
    }
  } catch (err) {
    // Nothing awaits a timer callback: swallow-and-log instead of letting the
    // rejection go unhandled.
    console.error("[WS RETRY LOOP ERROR]:", err);
  }
}, RETRY_INTERVAL_MS);

module.exports = wss;