// db/dbWs.js console.log(">>> dbWs.js CARICATO:", __filename); const knex = require("./knex"); // =============================== // INIT TABELLE WS // =============================== async function initWsTables() { // ---- ws_sessions ---- const hasSessions = await knex.schema.hasTable("ws_sessions"); if (!hasSessions) { await knex.schema.createTable("ws_sessions", (t) => { t.string("session_id").primary(); t.string("user").notNullable(); t.integer("connected_at").notNullable(); t.integer("last_ack").notNullable(); t.boolean("need_full_sync").notNullable().defaultTo(false); }); console.log("✔ Tabella 'ws_sessions' creata"); } else { console.log("✔ Tabella 'ws_sessions' già esistente"); } // ---- ws_pending_events ---- const hasPending = await knex.schema.hasTable("ws_pending_events"); if (!hasPending) { await knex.schema.createTable("ws_pending_events", (t) => { t.string("event_id").primary(); t.string("session_id").notNullable(); t.text("payload").notNullable(); t.integer("sent_at").notNullable(); t.integer("retries").notNullable().defaultTo(0); }); console.log("✔ Tabella 'ws_pending_events' creata"); } else { console.log("✔ Tabella 'ws_pending_events' già esistente"); } } // =============================== // SESSIONI // =============================== async function createSession(session_id, user) { const now = Date.now(); await knex("ws_sessions").insert({ session_id, user, connected_at: now, last_ack: now, need_full_sync: false }); } async function getSession(session_id) { return knex("ws_sessions").where({ session_id }).first(); } async function updateSessionAck(session_id) { await knex("ws_sessions") .where({ session_id }) .update({ last_ack: Date.now() }); } async function setSessionNeedFullSync(session_id, value) { await knex("ws_sessions") .where({ session_id }) .update({ need_full_sync: value }); } async function clearNeedFullSync(session_id) { await knex("ws_sessions") .where({ session_id }) .update({ need_full_sync: false }); console.log(">>> clearNeedFullSync() ESEGUITA per", session_id); } async function deleteSession(session_id) { await knex("ws_sessions").where({ session_id }).del(); await knex("ws_pending_events").where({ session_id }).del(); } async function getSessionsByUser(user) { return knex("ws_sessions").where({ user }); } // =============================== // PULIZIA COMPLETA (server restart) // =============================== async function deleteAllSessions() { console.log(">>> deleteAllSessions() – elimino tutte le sessioni WS"); return knex("ws_sessions").del(); } async function deleteAllPendingEvents() { console.log(">>> deleteAllPendingEvents() – elimino tutti i pending events"); return knex("ws_pending_events").del(); } // =============================== // EVENTI PENDENTI // =============================== async function addPendingEvent(event_id, session_id, payload) { await knex("ws_pending_events").insert({ event_id, session_id, payload: JSON.stringify(payload), sent_at: Date.now(), retries: 0 }); } async function deletePendingEvent(event_id) { await knex("ws_pending_events").where({ event_id }).del(); } async function getPendingEventsOlderThan(msAgo, limit = 500) { const threshold = Date.now() - msAgo; return knex("ws_pending_events") .where("sent_at", "<", threshold) .limit(limit); } async function updatePendingEventRetry(event_id) { await knex("ws_pending_events") .where({ event_id }) .update({ sent_at: Date.now(), retries: knex.raw("retries + 1") }); } module.exports = { initWsTables, createSession, getSession, updateSessionAck, setSessionNeedFullSync, clearNeedFullSync, deleteSession, getSessionsByUser, // ⭐ aggiunte per pulizia server deleteAllSessions, deleteAllPendingEvents, addPendingEvent, deletePendingEvent, getPendingEventsOlderThan, updatePendingEventRetry };