275 lines
No EOL
8.6 KiB
Text
275 lines
No EOL
8.6 KiB
Text
// ws-server.js
|
||
require("dotenv").config();
|
||
const WebSocket = require("ws");
|
||
const jwt = require("jsonwebtoken");
|
||
const { randomUUID } = require("crypto");
|
||
const dbWs = require("./db/dbWs");
|
||
|
||
const PORT = process.env.WS_PORT || 4002;
|
||
const HOST = process.env.WS_HOST || "0.0.0.0";
|
||
const JWT_SECRET = process.env.JWT_SECRET;
|
||
|
||
if (!JWT_SECRET) {
|
||
console.error("❌ ERRORE: JWT_SECRET non definito nel .env");
|
||
process.exit(1);
|
||
}
|
||
|
||
const wss = new WebSocket.Server({ port: PORT, host: HOST });
|
||
console.log(`🚀 WebSocket server attivo su ws://${HOST}:${PORT}`);
|
||
|
||
// Mappa session_id → ws (in memoria)
|
||
const wsBySession = new Map();
|
||
|
||
// =====================================================
|
||
// COSTANTI (coerenti con sync.js)
|
||
// =====================================================
|
||
|
||
// finestra WS “dormant”: 2 minuti
|
||
const WS_DORMANT_MS = 120000;
|
||
|
||
// retry tick
|
||
const RETRY_INTERVAL_MS = 3000;
|
||
|
||
// massimo retry: 40 * 3s = 120s = 2 minuti
|
||
const MAX_RETRIES = 40;
|
||
|
||
// limita lavoro per tick
|
||
const MAX_RETRY_BATCH = 500;
|
||
|
||
// =====================================================
|
||
// FUNZIONE DI INVIO CON LOG
|
||
// =====================================================
|
||
function sendToClient(ws, msg) {
|
||
const json = JSON.stringify(msg);
|
||
console.log(`📤 [WS SEND] → a ${ws.user || "??"}: ${json}`);
|
||
ws.send(json);
|
||
}
|
||
|
||
// =====================================================
|
||
// CONNESSIONE CLIENT
|
||
// =====================================================
|
||
wss.on("connection", (ws) => {
|
||
console.log("🔌 Nuovo client connesso");
|
||
|
||
ws.authenticated = false;
|
||
ws.user = null;
|
||
ws.session_id = null;
|
||
|
||
ws.on("message", async (msg) => {
|
||
console.log("📩 [WS RAW MESSAGE]:", msg.toString());
|
||
|
||
let data;
|
||
try {
|
||
data = JSON.parse(msg.toString());
|
||
} catch (err) {
|
||
console.error("❌ Errore parsing JSON:", err);
|
||
return;
|
||
}
|
||
|
||
// -----------------------------
|
||
// AUTENTICAZIONE JWT
|
||
// -----------------------------
|
||
if (data.type === "auth") {
|
||
try {
|
||
const payload = jwt.verify(data.token, JWT_SECRET);
|
||
const user = payload.name;
|
||
const session_id = data.session_id || randomUUID();
|
||
|
||
ws.user = user;
|
||
ws.session_id = session_id;
|
||
ws.authenticated = true;
|
||
|
||
wsBySession.set(session_id, ws);
|
||
|
||
// Recupera o crea sessione
|
||
const existing = await dbWs.getSession(session_id);
|
||
if (!existing) {
|
||
await dbWs.createSession(session_id, user);
|
||
} else {
|
||
// “touch” last_ack all’auth (come segnale di presenza)
|
||
await dbWs.updateSessionAck(session_id);
|
||
}
|
||
|
||
const sessionRow = await dbWs.getSession(session_id);
|
||
|
||
// ✅ invio auth_ok con need_full_sync, ma NON lo azzero qui
|
||
sendToClient(ws, {
|
||
type: "auth_ok",
|
||
user,
|
||
session_id,
|
||
need_full_sync: !!sessionRow.need_full_sync,
|
||
});
|
||
|
||
console.log(`🔐 Client autenticato: ${user} (session=${session_id})`);
|
||
} catch (err) {
|
||
sendToClient(ws, {
|
||
type: "auth_error",
|
||
error: err.message,
|
||
});
|
||
ws.close();
|
||
}
|
||
return;
|
||
}
|
||
|
||
// -----------------------------
|
||
// MESSAGGI NON AUTORIZZATI
|
||
// -----------------------------
|
||
if (!ws.authenticated) {
|
||
sendToClient(ws, {
|
||
type: "error",
|
||
message: "Not authenticated",
|
||
});
|
||
return;
|
||
}
|
||
|
||
// -----------------------------
|
||
// ACK evento (reliable delivery)
|
||
// -----------------------------
|
||
if (data.type === "ack" && data.event_id) {
|
||
await dbWs.deletePendingEvent(data.event_id);
|
||
await dbWs.updateSessionAck(ws.session_id);
|
||
return;
|
||
}
|
||
|
||
// -----------------------------
|
||
// PONG (keepalive)
|
||
// -----------------------------
|
||
if (data.type === "pong") {
|
||
await dbWs.updateSessionAck(ws.session_id);
|
||
return;
|
||
}
|
||
|
||
// -----------------------------
|
||
// RECOVERY DONE (client)
|
||
// Il client invia questo messaggio DOPO aver completato progressive/full
|
||
// quando ha ricevuto need_full_sync=true.
|
||
// -----------------------------
|
||
if (data.type === "recovery_done") {
|
||
await dbWs.clearNeedFullSync(ws.session_id);
|
||
await dbWs.updateSessionAck(ws.session_id);
|
||
console.log(`✅ recovery_done ricevuto: session=${ws.session_id} user=${ws.user}`);
|
||
return;
|
||
}
|
||
|
||
// -----------------------------
|
||
// ALTRI MESSAGGI DAL CLIENT
|
||
// -----------------------------
|
||
console.log(`📨 Messaggio da ${ws.user}:`, data);
|
||
});
|
||
|
||
ws.on("close", () => {
|
||
console.log(`❌ Client disconnesso: ${ws.user || "sconosciuto"}`);
|
||
if (ws.session_id) {
|
||
wsBySession.delete(ws.session_id);
|
||
}
|
||
});
|
||
});
|
||
|
||
// =====================================================
|
||
// BROADCAST AFFIDABILE PER UTENTE (event_id + pending)
|
||
// =====================================================
|
||
wss.broadcastToUserReliable = async function (user, payload) {
|
||
const sessions = await dbWs.getSessionsByUser(user);
|
||
if (!sessions || sessions.length === 0) return;
|
||
|
||
console.log(`📣 [WS BROADCAST USER=${user}] → payload=${JSON.stringify(payload)}`);
|
||
|
||
for (const s of sessions) {
|
||
const ws = wsBySession.get(s.session_id);
|
||
|
||
// event_id diverso per ogni sessione
|
||
const event_id = randomUUID();
|
||
const msg = { ...payload, event_id };
|
||
const json = JSON.stringify(msg);
|
||
|
||
if (ws && ws.readyState === WebSocket.OPEN && ws.authenticated) {
|
||
console.log(` ↳ inviato a session=${s.session_id}`);
|
||
ws.send(json);
|
||
|
||
// salva pending event per retry/ack
|
||
await dbWs.addPendingEvent(event_id, s.session_id, msg);
|
||
} else {
|
||
// sessione non attiva: NON perdiamo l’evento subito,
|
||
// ma qui segnaliamo che al prossimo reconnect potrebbe servire recovery
|
||
await dbWs.setSessionNeedFullSync(s.session_id, true);
|
||
// NB: l’evento verrà gestito dal retry loop se era già pending,
|
||
// oppure verrà “coperto” dalla progressive/full successiva.
|
||
}
|
||
}
|
||
};
|
||
|
||
// Mantieni compatibilità
|
||
wss.broadcastToUser = wss.broadcastToUserReliable;
|
||
|
||
// =====================================================
|
||
// BROADCAST PER ADMIN
|
||
// =====================================================
|
||
wss.broadcastToAdmins = function (msg) {
|
||
const json = JSON.stringify(msg);
|
||
console.log(`📣 [WS BROADCAST ADMINS] → ${json}`);
|
||
|
||
for (const client of wss.clients) {
|
||
if (
|
||
client.readyState === WebSocket.OPEN &&
|
||
client.authenticated &&
|
||
client.user === "Admin"
|
||
) {
|
||
console.log(` ↳ inviato a Admin`);
|
||
client.send(json);
|
||
}
|
||
}
|
||
};
|
||
|
||
// =====================================================
|
||
// RETRY + TIMEOUT (2 minuti)
|
||
// =====================================================
|
||
setInterval(async () => {
|
||
const pending = await dbWs.getPendingEventsOlderThan(RETRY_INTERVAL_MS, MAX_RETRY_BATCH);
|
||
|
||
for (const ev of pending) {
|
||
const ws = wsBySession.get(ev.session_id);
|
||
|
||
// se troppe retry -> consideriamo sessione “dormiente” e passiamo a recovery
|
||
if (ev.retries >= MAX_RETRIES) {
|
||
console.warn(`⏱️ Timeout evento: ${ev.event_id} session=${ev.session_id}`);
|
||
|
||
// marca recovery
|
||
await dbWs.setSessionNeedFullSync(ev.session_id, true);
|
||
|
||
// elimina questo pending (evita accumulo infinito)
|
||
await dbWs.deletePendingEvent(ev.event_id);
|
||
|
||
// non cancelliamo la sessione: la recovery avverrà al reconnect
|
||
continue;
|
||
}
|
||
|
||
// retry se WS attivo
|
||
if (ws && ws.readyState === WebSocket.OPEN && ws.authenticated) {
|
||
console.log(`🔁 Retry event_id=${ev.event_id} session=${ev.session_id}`);
|
||
|
||
// payload in DB è TEXT JSON (stringa)
|
||
const payload = typeof ev.payload === "string" ? ev.payload : JSON.stringify(ev.payload);
|
||
ws.send(payload);
|
||
|
||
await dbWs.updatePendingEventRetry(ev.event_id);
|
||
continue;
|
||
}
|
||
|
||
// WS non attivo: applichiamo la logica “entro 2 minuti via WS”
|
||
const sessionRow = await dbWs.getSession(ev.session_id);
|
||
const lastAck = sessionRow?.last_ack || 0;
|
||
const dormant = !lastAck || (Date.now() - lastAck) > WS_DORMANT_MS;
|
||
|
||
if (dormant) {
|
||
// oltre finestra: passa a recovery
|
||
await dbWs.setSessionNeedFullSync(ev.session_id, true);
|
||
await dbWs.deletePendingEvent(ev.event_id);
|
||
} else {
|
||
// entro finestra: NON cancellare pending,
|
||
// lo consegneremo quando il client si riconnette (retry loop continuerà a tentare)
|
||
// opzionale: nessuna azione
|
||
}
|
||
}
|
||
}, RETRY_INTERVAL_MS);
|
||
|
||
module.exports = wss; |