317 lines
10 KiB
JavaScript
317 lines
10 KiB
JavaScript
// ws-server.js
|
|
require("dotenv").config();
|
|
const WebSocket = require("ws");
|
|
const jwt = require("jsonwebtoken");
|
|
const { randomUUID } = require("crypto");
|
|
const dbWs = require("./db/dbWs");
|
|
|
|
const PORT = process.env.WS_PORT || 4002;
|
|
const HOST = process.env.WS_HOST || "0.0.0.0";
|
|
const JWT_SECRET = process.env.JWT_SECRET;
|
|
|
|
if (!JWT_SECRET) {
|
|
console.error("❌ ERRORE: JWT_SECRET non definito nel .env");
|
|
process.exit(1);
|
|
}
|
|
|
|
const wss = new WebSocket.Server({ port: PORT, host: HOST });
|
|
console.log(`🚀 WebSocket server attivo su ws://${HOST}:${PORT}`);
|
|
|
|
// =====================================================
|
|
// PULIZIA SESSIONI E PENDING AL RIAVVIO SERVER
|
|
// =====================================================
|
|
(async () => {
|
|
try {
|
|
console.log("🧹 [WS CLEANUP] Pulizia sessioni e pending al riavvio server...");
|
|
|
|
await dbWs.deleteAllSessions();
|
|
await dbWs.deleteAllPendingEvents();
|
|
|
|
console.log("🧹 [WS CLEANUP] Completata: tutte le sessioni e pending rimossi");
|
|
} catch (err) {
|
|
console.error("❌ [WS CLEANUP ERROR]:", err);
|
|
}
|
|
})();
|
|
|
|
|
|
// Mappa session_id → ws
|
|
const wsBySession = new Map();
|
|
|
|
// =====================================================
|
|
// COSTANTI
|
|
// =====================================================
|
|
const WS_DORMANT_MS = 2 * 60 * 1000; // 2 minuti
|
|
const WS_RETENTION_MS = 30 * 24 * 60 * 60 * 1000; // 30 giorni
|
|
|
|
const RETRY_INTERVAL_MS = 3000; // OK così
|
|
const MAX_RETRIES = 40; // OK così
|
|
const MAX_RETRY_BATCH = 500; // OK così
|
|
|
|
|
|
// =====================================================
|
|
// HELPER: calcolo dormancy sessione
|
|
// =====================================================
|
|
async function getSessionDormancyMs(session_id) {
|
|
const s = await dbWs.getSession(session_id);
|
|
if (!s || !s.last_ack) return Infinity;
|
|
return Date.now() - s.last_ack;
|
|
}
|
|
|
|
// =====================================================
|
|
// FUNZIONE DI INVIO CON LOG
|
|
// =====================================================
|
|
function sendToClient(ws, msg) {
|
|
const json = JSON.stringify(msg);
|
|
console.log(`📤 [WS SEND] at=${new Date().toISOString()} → user=${ws.user || "??"}: ${json}`);
|
|
ws.send(json);
|
|
}
|
|
|
|
// =====================================================
|
|
// CONNESSIONE CLIENT
|
|
// =====================================================
|
|
wss.on("connection", (ws) => {
|
|
console.log(`🔌 [WS CONNECT] Nuovo client at=${new Date().toISOString()}`);
|
|
|
|
ws.authenticated = false;
|
|
ws.user = null;
|
|
ws.session_id = null;
|
|
|
|
ws.on("message", async (msg) => {
|
|
console.log(`📩 [WS RAW] at=${new Date().toISOString()} msg=${msg.toString()}`);
|
|
|
|
let data;
|
|
try {
|
|
data = JSON.parse(msg.toString());
|
|
} catch (err) {
|
|
console.error("❌ Errore parsing JSON:", err);
|
|
return;
|
|
}
|
|
|
|
// -----------------------------
|
|
// AUTENTICAZIONE JWT
|
|
// -----------------------------
|
|
if (data.type === "auth") {
|
|
console.log(`🔐 [WS AUTH] Richiesta auth at=${new Date().toISOString()}`);
|
|
|
|
try {
|
|
const payload = jwt.verify(data.token, JWT_SECRET);
|
|
const user = payload.name;
|
|
const session_id = data.session_id || randomUUID();
|
|
|
|
ws.user = user;
|
|
ws.session_id = session_id;
|
|
ws.authenticated = true;
|
|
|
|
wsBySession.set(session_id, ws);
|
|
|
|
console.log(`🔐 [WS AUTH OK] user=${user} session=${session_id}`);
|
|
|
|
const existing = await dbWs.getSession(session_id);
|
|
if (!existing) {
|
|
console.log(`🆕 [WS SESSION] Creo nuova sessione=${session_id}`);
|
|
await dbWs.createSession(session_id, user);
|
|
} else {
|
|
console.log(`♻️ [WS SESSION] Sessione esistente, aggiorno last_ack`);
|
|
await dbWs.updateSessionAck(session_id);
|
|
}
|
|
|
|
const sessionRow = await dbWs.getSession(session_id);
|
|
|
|
sendToClient(ws, {
|
|
type: "auth_ok",
|
|
user,
|
|
session_id,
|
|
need_full_sync: !!sessionRow.need_full_sync,
|
|
});
|
|
|
|
} catch (err) {
|
|
console.error("❌ [WS AUTH ERROR]:", err);
|
|
sendToClient(ws, { type: "auth_error", error: err.message });
|
|
ws.close();
|
|
}
|
|
return;
|
|
}
|
|
|
|
// -----------------------------
|
|
// MESSAGGI NON AUTORIZZATI
|
|
// -----------------------------
|
|
if (!ws.authenticated) {
|
|
sendToClient(ws, {
|
|
type: "error",
|
|
message: "Not authenticated",
|
|
});
|
|
return;
|
|
}
|
|
|
|
// -----------------------------
|
|
// ACK evento
|
|
// -----------------------------
|
|
if (data.type === "ack" && data.event_id) {
|
|
console.log(`🟢 [WS ACK] user=${ws.user} session=${ws.session_id} event_id=${data.event_id}`);
|
|
await dbWs.deletePendingEvent(data.event_id);
|
|
await dbWs.updateSessionAck(ws.session_id);
|
|
return;
|
|
}
|
|
|
|
// -----------------------------
|
|
// PONG (keepalive)
|
|
// -----------------------------
|
|
if (data.type === "pong") {
|
|
console.log(`🏓 [WS PONG] user=${ws.user} session=${ws.session_id}`);
|
|
await dbWs.updateSessionAck(ws.session_id);
|
|
return;
|
|
}
|
|
|
|
// -----------------------------
|
|
// RECOVERY DONE
|
|
// -----------------------------
|
|
if (data.type === "recovery_done") {
|
|
console.log(`🔄 [WS RECOVERY DONE] user=${ws.user} session=${ws.session_id}`);
|
|
await dbWs.clearNeedFullSync(ws.session_id);
|
|
await dbWs.updateSessionAck(ws.session_id);
|
|
return;
|
|
}
|
|
|
|
// -----------------------------
|
|
// ALTRI MESSAGGI
|
|
// -----------------------------
|
|
console.log(`📨 [WS MESSAGE] user=${ws.user} data=${JSON.stringify(data)}`);
|
|
});
|
|
|
|
ws.on("close", () => {
|
|
console.log(`❌ [WS DISCONNECT] user=${ws.user} session=${ws.session_id} at=${new Date().toISOString()}`);
|
|
if (ws.session_id) wsBySession.delete(ws.session_id);
|
|
});
|
|
});
|
|
|
|
// =====================================================
|
|
// BROADCAST AFFIDABILE PER UTENTE
|
|
// =====================================================
|
|
wss.broadcastToUserReliable = async function (user, payload) {
|
|
console.log(`📣 [WS BROADCAST] user=${user} payload=${JSON.stringify(payload)} at=${new Date().toISOString()}`);
|
|
|
|
const sessions = await dbWs.getSessionsByUser(user);
|
|
if (!sessions || sessions.length === 0) {
|
|
console.log(`⚠️ [WS BROADCAST] Nessuna sessione attiva per user=${user}`);
|
|
return;
|
|
}
|
|
|
|
for (const s of sessions) {
|
|
const ws = wsBySession.get(s.session_id);
|
|
|
|
const event_id = randomUUID();
|
|
const msg = { ...payload, event_id };
|
|
const json = JSON.stringify(msg);
|
|
|
|
|
|
const dormancy = await getSessionDormancyMs(s.session_id);
|
|
|
|
if (ws && ws.readyState === WebSocket.OPEN && ws.authenticated) {
|
|
console.log(` 📤 consegnato a session=${s.session_id}`);
|
|
ws.send(json);
|
|
await dbWs.addPendingEvent(event_id, s.session_id, msg);
|
|
continue;
|
|
}
|
|
|
|
if (dormancy < WS_DORMANT_MS) {
|
|
console.log(` 🟦 session=${s.session_id} offline <2min → salvo in pending`);
|
|
await dbWs.addPendingEvent(event_id, s.session_id, msg);
|
|
continue;
|
|
}
|
|
|
|
if (dormancy < WS_RETENTION_MS) {
|
|
console.log(` 🟨 session=${s.session_id} offline >2min → need_full_sync`);
|
|
await dbWs.setSessionNeedFullSync(s.session_id, true);
|
|
continue;
|
|
}
|
|
|
|
console.log(` 🟥 session=${s.session_id} offline >30gg → need_full_sync (full)`);
|
|
await dbWs.setSessionNeedFullSync(s.session_id, true);
|
|
|
|
}
|
|
};
|
|
|
|
wss.broadcastToUser = wss.broadcastToUserReliable;
|
|
|
|
// =====================================================
|
|
// BROADCAST PER ADMIN
|
|
// =====================================================
|
|
wss.broadcastToAdmins = function (msg) {
|
|
const json = JSON.stringify(msg);
|
|
console.log(`📣 [WS BROADCAST ADMINS] → ${json}`);
|
|
|
|
for (const client of wss.clients) {
|
|
if (
|
|
client.readyState === WebSocket.OPEN &&
|
|
client.authenticated &&
|
|
client.user === "Admin"
|
|
) {
|
|
console.log(` ↳ inviato a Admin`);
|
|
client.send(json);
|
|
}
|
|
}
|
|
};
|
|
|
|
// =====================================================
|
|
// RETRY LOOP
|
|
// =====================================================
|
|
setInterval(async () => {
|
|
console.log(`⏱️ [WS RETRY LOOP] tick at=${new Date().toISOString()}`);
|
|
|
|
const pending = await dbWs.getPendingEventsOlderThan(RETRY_INTERVAL_MS, MAX_RETRY_BATCH);
|
|
|
|
for (const ev of pending) {
|
|
const ws = wsBySession.get(ev.session_id);
|
|
|
|
if (ev.retries >= MAX_RETRIES) {
|
|
console.warn(`⏱️ [WS TIMEOUT] event_id=${ev.event_id} session=${ev.session_id} → recovery`);
|
|
await dbWs.setSessionNeedFullSync(ev.session_id, true);
|
|
await dbWs.deletePendingEvent(ev.event_id);
|
|
continue;
|
|
}
|
|
|
|
if (ws && ws.readyState === WebSocket.OPEN && ws.authenticated) {
|
|
console.log(`🔁 [WS RETRY] event_id=${ev.event_id} retries=${ev.retries} session=${ev.session_id}`);
|
|
const payload = typeof ev.payload === "string" ? ev.payload : JSON.stringify(ev.payload);
|
|
ws.send(payload);
|
|
await dbWs.updatePendingEventRetry(ev.event_id);
|
|
continue;
|
|
}
|
|
|
|
const sessionRow = await dbWs.getSession(ev.session_id);
|
|
const lastAck = sessionRow?.last_ack || 0;
|
|
const dormant = !lastAck || (Date.now() - lastAck) > WS_DORMANT_MS;
|
|
|
|
const dormancy = await getSessionDormancyMs(ev.session_id);
|
|
|
|
if (ws && ws.readyState === WebSocket.OPEN && ws.authenticated) {
|
|
console.log(`🔁 [WS RETRY] event_id=${ev.event_id} retries=${ev.retries} session=${ev.session_id}`);
|
|
const payload = typeof ev.payload === "string" ? ev.payload : JSON.stringify(ev.payload);
|
|
ws.send(payload);
|
|
await dbWs.updatePendingEventRetry(ev.event_id);
|
|
continue;
|
|
}
|
|
|
|
if (dormancy < WS_DORMANT_MS) {
|
|
console.log(`🟦 [WS WAIT] session=${ev.session_id} offline <2min → pending conservato`);
|
|
continue;
|
|
}
|
|
|
|
if (dormancy < WS_RETENTION_MS) {
|
|
console.log(`🟨 [WS DORMANT] session=${ev.session_id} offline >2min → need_full_sync`);
|
|
await dbWs.setSessionNeedFullSync(ev.session_id, true);
|
|
await dbWs.deletePendingEvent(ev.event_id);
|
|
continue;
|
|
}
|
|
|
|
console.log(`🟥 [WS EXPIRED] session=${ev.session_id} offline >30gg → need_full_sync`);
|
|
await dbWs.setSessionNeedFullSync(ev.session_id, true);
|
|
await dbWs.deletePendingEvent(ev.event_id);
|
|
|
|
|
|
|
|
|
|
}
|
|
}, RETRY_INTERVAL_MS);
|
|
|
|
module.exports = wss;
|