photo_server_json_con_aves22/ws-server.js.ok
2026-04-18 20:14:42 +02:00

275 lines
No EOL
8.6 KiB
Text
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

// ws-server.js
require("dotenv").config();
const WebSocket = require("ws");
const jwt = require("jsonwebtoken");
const { randomUUID } = require("crypto");
const dbWs = require("./db/dbWs");
const PORT = process.env.WS_PORT || 4002;
const HOST = process.env.WS_HOST || "0.0.0.0";
const JWT_SECRET = process.env.JWT_SECRET;
if (!JWT_SECRET) {
console.error("❌ ERRORE: JWT_SECRET non definito nel .env");
process.exit(1);
}
const wss = new WebSocket.Server({ port: PORT, host: HOST });
console.log(`🚀 WebSocket server attivo su ws://${HOST}:${PORT}`);
// Mappa session_id → ws (in memoria)
const wsBySession = new Map();
// =====================================================
// COSTANTI (coerenti con sync.js)
// =====================================================
// finestra WS “dormant”: 2 minuti
const WS_DORMANT_MS = 120000;
// retry tick
const RETRY_INTERVAL_MS = 3000;
// massimo retry: 40 * 3s = 120s = 2 minuti
const MAX_RETRIES = 40;
// limita lavoro per tick
const MAX_RETRY_BATCH = 500;
// =====================================================
// FUNZIONE DI INVIO CON LOG
// =====================================================
function sendToClient(ws, msg) {
const json = JSON.stringify(msg);
console.log(`📤 [WS SEND] → a ${ws.user || "??"}: ${json}`);
ws.send(json);
}
// =====================================================
// CONNESSIONE CLIENT
// =====================================================
wss.on("connection", (ws) => {
console.log("🔌 Nuovo client connesso");
ws.authenticated = false;
ws.user = null;
ws.session_id = null;
ws.on("message", async (msg) => {
console.log("📩 [WS RAW MESSAGE]:", msg.toString());
let data;
try {
data = JSON.parse(msg.toString());
} catch (err) {
console.error("❌ Errore parsing JSON:", err);
return;
}
// -----------------------------
// AUTENTICAZIONE JWT
// -----------------------------
if (data.type === "auth") {
try {
const payload = jwt.verify(data.token, JWT_SECRET);
const user = payload.name;
const session_id = data.session_id || randomUUID();
ws.user = user;
ws.session_id = session_id;
ws.authenticated = true;
wsBySession.set(session_id, ws);
// Recupera o crea sessione
const existing = await dbWs.getSession(session_id);
if (!existing) {
await dbWs.createSession(session_id, user);
} else {
// “touch” last_ack allauth (come segnale di presenza)
await dbWs.updateSessionAck(session_id);
}
const sessionRow = await dbWs.getSession(session_id);
// ✅ invio auth_ok con need_full_sync, ma NON lo azzero qui
sendToClient(ws, {
type: "auth_ok",
user,
session_id,
need_full_sync: !!sessionRow.need_full_sync,
});
console.log(`🔐 Client autenticato: ${user} (session=${session_id})`);
} catch (err) {
sendToClient(ws, {
type: "auth_error",
error: err.message,
});
ws.close();
}
return;
}
// -----------------------------
// MESSAGGI NON AUTORIZZATI
// -----------------------------
if (!ws.authenticated) {
sendToClient(ws, {
type: "error",
message: "Not authenticated",
});
return;
}
// -----------------------------
// ACK evento (reliable delivery)
// -----------------------------
if (data.type === "ack" && data.event_id) {
await dbWs.deletePendingEvent(data.event_id);
await dbWs.updateSessionAck(ws.session_id);
return;
}
// -----------------------------
// PONG (keepalive)
// -----------------------------
if (data.type === "pong") {
await dbWs.updateSessionAck(ws.session_id);
return;
}
// -----------------------------
// RECOVERY DONE (client)
// Il client invia questo messaggio DOPO aver completato progressive/full
// quando ha ricevuto need_full_sync=true.
// -----------------------------
if (data.type === "recovery_done") {
await dbWs.clearNeedFullSync(ws.session_id);
await dbWs.updateSessionAck(ws.session_id);
console.log(`✅ recovery_done ricevuto: session=${ws.session_id} user=${ws.user}`);
return;
}
// -----------------------------
// ALTRI MESSAGGI DAL CLIENT
// -----------------------------
console.log(`📨 Messaggio da ${ws.user}:`, data);
});
ws.on("close", () => {
console.log(`❌ Client disconnesso: ${ws.user || "sconosciuto"}`);
if (ws.session_id) {
wsBySession.delete(ws.session_id);
}
});
});
// =====================================================
// BROADCAST AFFIDABILE PER UTENTE (event_id + pending)
// =====================================================
wss.broadcastToUserReliable = async function (user, payload) {
const sessions = await dbWs.getSessionsByUser(user);
if (!sessions || sessions.length === 0) return;
console.log(`📣 [WS BROADCAST USER=${user}] → payload=${JSON.stringify(payload)}`);
for (const s of sessions) {
const ws = wsBySession.get(s.session_id);
// event_id diverso per ogni sessione
const event_id = randomUUID();
const msg = { ...payload, event_id };
const json = JSON.stringify(msg);
if (ws && ws.readyState === WebSocket.OPEN && ws.authenticated) {
console.log(` ↳ inviato a session=${s.session_id}`);
ws.send(json);
// salva pending event per retry/ack
await dbWs.addPendingEvent(event_id, s.session_id, msg);
} else {
// sessione non attiva: NON perdiamo levento subito,
// ma qui segnaliamo che al prossimo reconnect potrebbe servire recovery
await dbWs.setSessionNeedFullSync(s.session_id, true);
// NB: levento verrà gestito dal retry loop se era già pending,
// oppure verrà “coperto” dalla progressive/full successiva.
}
}
};
// Mantieni compatibilità
wss.broadcastToUser = wss.broadcastToUserReliable;
// =====================================================
// BROADCAST PER ADMIN
// =====================================================
wss.broadcastToAdmins = function (msg) {
const json = JSON.stringify(msg);
console.log(`📣 [WS BROADCAST ADMINS] → ${json}`);
for (const client of wss.clients) {
if (
client.readyState === WebSocket.OPEN &&
client.authenticated &&
client.user === "Admin"
) {
console.log(` ↳ inviato a Admin`);
client.send(json);
}
}
};
// =====================================================
// RETRY + TIMEOUT (2 minuti)
// =====================================================
setInterval(async () => {
const pending = await dbWs.getPendingEventsOlderThan(RETRY_INTERVAL_MS, MAX_RETRY_BATCH);
for (const ev of pending) {
const ws = wsBySession.get(ev.session_id);
// se troppe retry -> consideriamo sessione “dormiente” e passiamo a recovery
if (ev.retries >= MAX_RETRIES) {
console.warn(`⏱️ Timeout evento: ${ev.event_id} session=${ev.session_id}`);
// marca recovery
await dbWs.setSessionNeedFullSync(ev.session_id, true);
// elimina questo pending (evita accumulo infinito)
await dbWs.deletePendingEvent(ev.event_id);
// non cancelliamo la sessione: la recovery avverrà al reconnect
continue;
}
// retry se WS attivo
if (ws && ws.readyState === WebSocket.OPEN && ws.authenticated) {
console.log(`🔁 Retry event_id=${ev.event_id} session=${ev.session_id}`);
// payload in DB è TEXT JSON (stringa)
const payload = typeof ev.payload === "string" ? ev.payload : JSON.stringify(ev.payload);
ws.send(payload);
await dbWs.updatePendingEventRetry(ev.event_id);
continue;
}
// WS non attivo: applichiamo la logica “entro 2 minuti via WS”
const sessionRow = await dbWs.getSession(ev.session_id);
const lastAck = sessionRow?.last_ack || 0;
const dormant = !lastAck || (Date.now() - lastAck) > WS_DORMANT_MS;
if (dormant) {
// oltre finestra: passa a recovery
await dbWs.setSessionNeedFullSync(ev.session_id, true);
await dbWs.deletePendingEvent(ev.event_id);
} else {
// entro finestra: NON cancellare pending,
// lo consegneremo quando il client si riconnette (retry loop continuerà a tentare)
// opzionale: nessuna azione
}
}
}, RETRY_INTERVAL_MS);
module.exports = wss;