photo_server_json_con_aves22/ws-server.js
2026-04-18 20:14:42 +02:00

317 lines
10 KiB
JavaScript

// ws-server.js
require("dotenv").config();
const WebSocket = require("ws");
const jwt = require("jsonwebtoken");
const { randomUUID } = require("crypto");
const dbWs = require("./db/dbWs");
// Listening address, taken from the environment (dotenv has already loaded
// .env at this point). `||` means an unset OR empty variable falls back to the
// default: port 4002 on all interfaces.
const PORT = process.env.WS_PORT || 4002;
const HOST = process.env.WS_HOST || "0.0.0.0";
// Secret handed to jwt.verify() when a client sends an "auth" message.
const JWT_SECRET = process.env.JWT_SECRET;
// Refuse to start without a secret: log the problem and exit with status 1
// before the server is created.
if (!JWT_SECRET) {
console.error("❌ ERRORE: JWT_SECRET non definito nel .env");
process.exit(1);
}
// The WebSocket server instance; this object is what the module exports and
// what the broadcast helpers further down are attached to.
const wss = new WebSocket.Server({ port: PORT, host: HOST });
// NOTE(review): this line is printed right after construction, not from a
// "listening" event, so it does not by itself confirm that the bind succeeded.
console.log(`🚀 WebSocket server attivo su ws://${HOST}:${PORT}`);
// =====================================================
// SESSION AND PENDING-EVENT CLEANUP ON SERVER RESTART
// =====================================================
/**
 * Deletes every stored session and every pending event through dbWs.
 * Called once, fire-and-forget, while the module loads. A failure is logged
 * and swallowed, so the returned promise never rejects because of the
 * database.
 */
async function purgeWsStateOnStartup() {
  try {
    console.log("🧹 [WS CLEANUP] Pulizia sessioni e pending al riavvio server...");
    // Sessions first, then pending events; the second call is skipped if the first fails.
    await dbWs.deleteAllSessions();
    await dbWs.deleteAllPendingEvents();
    console.log("🧹 [WS CLEANUP] Completata: tutte le sessioni e pending rimossi");
  } catch (cleanupError) {
    console.error("❌ [WS CLEANUP ERROR]:", cleanupError);
  }
}
purgeWsStateOnStartup();
// Map session_id → ws: the socket currently registered for each session.
// An entry is added when a client authenticates and removed when its socket closes.
const wsBySession = new Map();
// =====================================================
// CONSTANTS
// =====================================================
// A disconnected session whose last ack is more recent than this keeps its
// pending events; once past it the session is flagged need_full_sync instead.
const WS_DORMANT_MS = 2 * 60 * 1000; // 2 minutes
// Upper bound of the "dormant" band. Beyond it the session is logged as
// expired, but the action taken (need_full_sync) is the same as for dormant.
const WS_RETENTION_MS = 30 * 24 * 60 * 60 * 1000; // 30 days
// Period of the retry loop in milliseconds; also the first argument passed to
// dbWs.getPendingEventsOlderThan (presumably the minimum event age — confirm in db/dbWs).
const RETRY_INTERVAL_MS = 3000;
// When an event's `retries` reaches this value the retry loop gives up on it:
// the session is flagged need_full_sync and the pending event is deleted.
const MAX_RETRIES = 40;
// Second argument to dbWs.getPendingEventsOlderThan (presumably the maximum
// number of events fetched per tick — confirm in db/dbWs).
const MAX_RETRY_BATCH = 500;
// =====================================================
// HELPER: session dormancy
// =====================================================
/**
 * Milliseconds elapsed since the session's last recorded ack.
 *
 * @param {string} session_id - Key handed to dbWs.getSession.
 * @returns {Promise<number>} `Date.now() - last_ack`, or `Infinity` when the
 *   session row is missing or its `last_ack` is falsy (null, 0, undefined).
 */
async function getSessionDormancyMs(session_id) {
  const sessionRow = await dbWs.getSession(session_id);
  const lastAck = sessionRow ? sessionRow.last_ack : undefined;
  // No row or no ack yet: treat the session as dormant forever.
  return lastAck ? Date.now() - lastAck : Infinity;
}
// =====================================================
// SEND HELPER WITH LOGGING
// =====================================================
/**
 * Serialises `msg` to JSON, logs it together with a timestamp and the
 * socket's user ("??" when the socket has no user yet), then writes it to
 * the socket.
 *
 * @param {object} ws - Socket-like object exposing `send` and, optionally, `user`.
 * @param {*} msg - Value to serialise with JSON.stringify.
 */
function sendToClient(ws, msg) {
  const serialized = JSON.stringify(msg);
  const sentAt = new Date().toISOString();
  const recipient = ws.user || "??";
  console.log(`📤 [WS SEND] at=${sentAt} → user=${recipient}: ${serialized}`);
  ws.send(serialized);
}
// =====================================================
// CLIENT CONNECTION
// =====================================================

/**
 * Text to log for an incoming frame. When the parsed message carries a
 * `token` field, the JWT is masked so it never reaches the logs; any other
 * frame is returned exactly as received.
 */
function incomingFrameForLog(rawText, data) {
  if (data !== null && typeof data === "object" && data.token !== undefined) {
    return JSON.stringify({ ...data, token: "[REDACTED]" });
  }
  return rawText;
}

/**
 * Removes `ws` from wsBySession, but only when the entry for its session
 * still points at this very socket. A client that reconnects with the same
 * session_id replaces the entry; the old socket closing afterwards must not
 * evict the new one.
 */
function unregisterSocket(ws) {
  if (ws.session_id && wsBySession.get(ws.session_id) === ws) {
    wsBySession.delete(ws.session_id);
  }
}

/**
 * Handles an "auth" message: verifies the JWT, binds user and session to the
 * socket, creates or refreshes the session row and answers "auth_ok".
 * On any failure the socket is rolled back to unauthenticated, receives
 * "auth_error" and is closed.
 */
async function handleAuthMessage(ws, data) {
  console.log(`🔐 [WS AUTH] Richiesta auth at=${new Date().toISOString()}`);
  try {
    const payload = jwt.verify(data.token, JWT_SECRET);
    const user = payload.name;
    // A token without a name used to authenticate as user "undefined".
    if (user == null || user === "") {
      throw new Error("Token payload has no name");
    }
    // NOTE(review): session_id is supplied by the client and is not checked
    // against the user stored for that session; the session row schema is not
    // visible here, so the ownership check is left as a follow-up.
    const session_id = data.session_id || randomUUID();
    // Re-auth under a different session: drop the old registration first.
    if (ws.session_id !== session_id) {
      unregisterSocket(ws);
    }
    // Flags are set before the awaits so that messages pipelined right after
    // "auth" are already treated as authenticated, as before.
    ws.user = user;
    ws.session_id = session_id;
    ws.authenticated = true;
    wsBySession.set(session_id, ws);
    console.log(`🔐 [WS AUTH OK] user=${user} session=${session_id}`);

    const existing = await dbWs.getSession(session_id);
    if (!existing) {
      console.log(`🆕 [WS SESSION] Creo nuova sessione=${session_id}`);
      await dbWs.createSession(session_id, user);
    } else {
      console.log(`♻️ [WS SESSION] Sessione esistente, aggiorno last_ack`);
      await dbWs.updateSessionAck(session_id);
    }
    // Re-read the row so need_full_sync reflects what is stored right now.
    const sessionRow = await dbWs.getSession(session_id);
    sendToClient(ws, {
      type: "auth_ok",
      user,
      session_id,
      need_full_sync: !!sessionRow.need_full_sync,
    });
  } catch (err) {
    console.error("❌ [WS AUTH ERROR]:", err);
    // Roll back: a socket whose auth failed must not keep receiving events.
    ws.authenticated = false;
    unregisterSocket(ws);
    try {
      sendToClient(ws, { type: "auth_error", error: err.message });
    } catch (sendErr) {
      console.error("❌ [WS AUTH ERROR]:", sendErr);
    } finally {
      ws.close();
    }
  }
}

/**
 * Routes one parsed client message by its `type`:
 * "auth" → authentication; everything else requires an authenticated socket;
 * "ack" (with event_id) → delete the pending event and refresh last_ack;
 * "pong" → refresh last_ack; "recovery_done" → clear need_full_sync and
 * refresh last_ack; any other type is only logged.
 */
async function handleClientMessage(ws, data) {
  if (data.type === "auth") {
    await handleAuthMessage(ws, data);
    return;
  }
  if (!ws.authenticated) {
    sendToClient(ws, {
      type: "error",
      message: "Not authenticated",
    });
    return;
  }
  if (data.type === "ack" && data.event_id) {
    console.log(`🟢 [WS ACK] user=${ws.user} session=${ws.session_id} event_id=${data.event_id}`);
    await dbWs.deletePendingEvent(data.event_id);
    await dbWs.updateSessionAck(ws.session_id);
    return;
  }
  if (data.type === "pong") {
    console.log(`🏓 [WS PONG] user=${ws.user} session=${ws.session_id}`);
    await dbWs.updateSessionAck(ws.session_id);
    return;
  }
  if (data.type === "recovery_done") {
    console.log(`🔄 [WS RECOVERY DONE] user=${ws.user} session=${ws.session_id}`);
    await dbWs.clearNeedFullSync(ws.session_id);
    await dbWs.updateSessionAck(ws.session_id);
    return;
  }
  console.log(`📨 [WS MESSAGE] user=${ws.user} data=${JSON.stringify(data)}`);
}

wss.on("connection", (ws) => {
  console.log(`🔌 [WS CONNECT] Nuovo client at=${new Date().toISOString()}`);
  ws.authenticated = false;
  ws.user = null;
  ws.session_id = null;

  ws.on("message", async (msg) => {
    const rawText = msg.toString();
    let data;
    try {
      data = JSON.parse(rawText);
    } catch (err) {
      console.log(`📩 [WS RAW] at=${new Date().toISOString()} msg=${rawText}`);
      console.error("❌ Errore parsing JSON:", err);
      return;
    }
    console.log(`📩 [WS RAW] at=${new Date().toISOString()} msg=${incomingFrameForLog(rawText, data)}`);

    // Valid JSON that is not an object (null, a number, a string) has no
    // `type`; reading it off `null` used to throw inside this async listener.
    if (data === null || typeof data !== "object") {
      console.error("❌ [WS MESSAGE ERROR]: payload is not a JSON object");
      return;
    }

    // Nothing awaits this listener, so an error escaping it would surface as
    // an unhandled promise rejection. Log it here instead.
    try {
      await handleClientMessage(ws, data);
    } catch (err) {
      console.error(`❌ [WS MESSAGE ERROR] user=${ws.user} session=${ws.session_id}:`, err);
    }
  });

  // An "error" event with no listener makes the EventEmitter throw, which
  // would take the whole server down on a single bad frame.
  ws.on("error", (err) => {
    console.error(`❌ [WS SOCKET ERROR] user=${ws.user} session=${ws.session_id}:`, err);
  });

  ws.on("close", () => {
    console.log(`❌ [WS DISCONNECT] user=${ws.user} session=${ws.session_id} at=${new Date().toISOString()}`);
    unregisterSocket(ws);
  });
});
// =====================================================
// RELIABLE PER-USER BROADCAST
// =====================================================
/**
 * Delivers `payload` to every stored session of `user`, each copy tagged with
 * its own freshly generated `event_id`.
 *
 * Per session:
 *  - socket registered, open and authenticated → the event is stored as
 *    pending and then sent; the pending row stays until the client acks it;
 *  - otherwise, dormancy below WS_DORMANT_MS → stored as pending only;
 *  - otherwise → the session is flagged need_full_sync and nothing is stored.
 *
 * @param {string} user - User whose sessions are looked up via dbWs.getSessionsByUser.
 * @param {object} payload - Event body; it is copied, never mutated.
 * @returns {Promise<void>} Rejects if a dbWs call fails; a failing socket
 *   write does not reject (the event is already stored as pending).
 */
wss.broadcastToUserReliable = async function (user, payload) {
  console.log(`📣 [WS BROADCAST] user=${user} payload=${JSON.stringify(payload)} at=${new Date().toISOString()}`);
  const sessions = await dbWs.getSessionsByUser(user);
  if (!sessions || sessions.length === 0) {
    console.log(`⚠️ [WS BROADCAST] Nessuna sessione attiva per user=${user}`);
    return;
  }
  for (const s of sessions) {
    const ws = wsBySession.get(s.session_id);
    const event_id = randomUUID();
    const msg = { ...payload, event_id };

    if (ws && ws.readyState === WebSocket.OPEN && ws.authenticated) {
      // Persist BEFORE sending: otherwise a fast client can ack an event that
      // is not stored yet (the ack deletes nothing and the row is then
      // re-sent), and a throwing send would lose the event altogether.
      await dbWs.addPendingEvent(event_id, s.session_id, msg);
      try {
        ws.send(JSON.stringify(msg));
        console.log(` 📤 consegnato a session=${s.session_id}`);
      } catch (err) {
        // The event is already pending, so it stays available for a later
        // resend; keep going with the user's other sessions.
        console.error(`❌ [WS BROADCAST] send failed session=${s.session_id}:`, err);
      }
      continue;
    }

    // Dormancy only matters for sessions without a live socket, so the lookup
    // is skipped for connected ones.
    const dormancy = await getSessionDormancyMs(s.session_id);
    if (dormancy < WS_DORMANT_MS) {
      console.log(` 🟦 session=${s.session_id} offline <2min → salvo in pending`);
      await dbWs.addPendingEvent(event_id, s.session_id, msg);
      continue;
    }
    if (dormancy < WS_RETENTION_MS) {
      console.log(` 🟨 session=${s.session_id} offline >2min → need_full_sync`);
      await dbWs.setSessionNeedFullSync(s.session_id, true);
      continue;
    }
    console.log(` 🟥 session=${s.session_id} offline >30gg → need_full_sync (full)`);
    await dbWs.setSessionNeedFullSync(s.session_id, true);
  }
};
// Alias kept for callers that use the shorter name.
wss.broadcastToUser = wss.broadcastToUserReliable;
// =====================================================
// ADMIN BROADCAST
// =====================================================
/**
 * Sends `msg`, serialised once, to every connected client that is open,
 * authenticated and whose user is exactly "Admin". Fire-and-forget: nothing
 * is stored for later redelivery.
 *
 * @param {*} msg - Value to serialise with JSON.stringify.
 */
wss.broadcastToAdmins = function (msg) {
  const serialized = JSON.stringify(msg);
  console.log(`📣 [WS BROADCAST ADMINS] → ${serialized}`);

  const isLiveAdmin = (socket) =>
    socket.readyState === WebSocket.OPEN &&
    socket.authenticated &&
    socket.user === "Admin";

  for (const socket of wss.clients) {
    if (!isLiveAdmin(socket)) {
      continue;
    }
    console.log(` ↳ inviato a Admin`);
    socket.send(serialized);
  }
};
// =====================================================
// RETRY LOOP
// =====================================================
// True while a tick is still running. setInterval does not wait for an async
// callback, so a slow tick would otherwise overlap the next one and resend
// the same events twice.
let retryTickInProgress = false;

/**
 * Decides what to do with one pending event:
 *  - retries reached MAX_RETRIES → flag the session need_full_sync and delete the event;
 *  - socket registered, open and authenticated → resend and bump the retry counter;
 *  - session dormancy below WS_DORMANT_MS → leave the event untouched for a later tick;
 *  - otherwise → flag the session need_full_sync and delete the event.
 */
async function processPendingEvent(ev) {
  if (ev.retries >= MAX_RETRIES) {
    console.warn(`⏱️ [WS TIMEOUT] event_id=${ev.event_id} session=${ev.session_id} → recovery`);
    await dbWs.setSessionNeedFullSync(ev.session_id, true);
    await dbWs.deletePendingEvent(ev.event_id);
    return;
  }

  const ws = wsBySession.get(ev.session_id);
  if (ws && ws.readyState === WebSocket.OPEN && ws.authenticated) {
    console.log(`🔁 [WS RETRY] event_id=${ev.event_id} retries=${ev.retries} session=${ev.session_id}`);
    // The stored payload is sent as-is when it is already a string.
    const payload = typeof ev.payload === "string" ? ev.payload : JSON.stringify(ev.payload);
    try {
      ws.send(payload);
    } catch (err) {
      console.error(`❌ [WS RETRY] send failed event_id=${ev.event_id} session=${ev.session_id}:`, err);
    }
    // Count the attempt even when the write failed, so an event that can
    // never be written still reaches MAX_RETRIES and falls back to recovery.
    await dbWs.updatePendingEventRetry(ev.event_id);
    return;
  }

  const dormancy = await getSessionDormancyMs(ev.session_id);
  if (dormancy < WS_DORMANT_MS) {
    console.log(`🟦 [WS WAIT] session=${ev.session_id} offline <2min → pending conservato`);
    return;
  }
  // Dormant and expired sessions get the same treatment; only the log differs.
  if (dormancy < WS_RETENTION_MS) {
    console.log(`🟨 [WS DORMANT] session=${ev.session_id} offline >2min → need_full_sync`);
  } else {
    console.log(`🟥 [WS EXPIRED] session=${ev.session_id} offline >30gg → need_full_sync`);
  }
  await dbWs.setSessionNeedFullSync(ev.session_id, true);
  await dbWs.deletePendingEvent(ev.event_id);
}

setInterval(async () => {
  if (retryTickInProgress) {
    console.warn(`⏱️ [WS RETRY LOOP] previous tick still running, skipping at=${new Date().toISOString()}`);
    return;
  }
  retryTickInProgress = true;
  try {
    console.log(`⏱️ [WS RETRY LOOP] tick at=${new Date().toISOString()}`);
    const pending = await dbWs.getPendingEventsOlderThan(RETRY_INTERVAL_MS, MAX_RETRY_BATCH);
    for (const ev of pending ?? []) {
      // One failing event must not block the rest of the batch.
      try {
        await processPendingEvent(ev);
      } catch (err) {
        console.error(`❌ [WS RETRY LOOP ERROR] event_id=${ev.event_id} session=${ev.session_id}:`, err);
      }
    }
  } catch (err) {
    // Nothing awaits this callback: an escaping error would be an unhandled
    // promise rejection.
    console.error("❌ [WS RETRY LOOP ERROR]:", err);
  } finally {
    retryTickInProgress = false;
  }
}, RETRY_INTERVAL_MS);
module.exports = wss;