136 lines
3.6 KiB
Text
136 lines
3.6 KiB
Text
// db/dbWs.js
|
|
console.log(">>> dbWs.js CARICATO:", __filename);
|
|
const knex = require("./knex");
|
|
|
|
// ===============================
|
|
// INIT TABELLE WS
|
|
// ===============================
|
|
async function initWsTables() {
|
|
// ---- ws_sessions ----
|
|
const hasSessions = await knex.schema.hasTable("ws_sessions");
|
|
if (!hasSessions) {
|
|
await knex.schema.createTable("ws_sessions", (t) => {
|
|
t.string("session_id").primary();
|
|
t.string("user").notNullable();
|
|
t.integer("connected_at").notNullable();
|
|
t.integer("last_ack").notNullable();
|
|
t.boolean("need_full_sync").notNullable().defaultTo(false);
|
|
});
|
|
|
|
console.log("✔ Tabella 'ws_sessions' creata");
|
|
} else {
|
|
console.log("✔ Tabella 'ws_sessions' già esistente");
|
|
}
|
|
|
|
// ---- ws_pending_events ----
|
|
const hasPending = await knex.schema.hasTable("ws_pending_events");
|
|
if (!hasPending) {
|
|
await knex.schema.createTable("ws_pending_events", (t) => {
|
|
t.string("event_id").primary();
|
|
t.string("session_id").notNullable();
|
|
t.text("payload").notNullable();
|
|
t.integer("sent_at").notNullable();
|
|
t.integer("retries").notNullable().defaultTo(0);
|
|
});
|
|
|
|
console.log("✔ Tabella 'ws_pending_events' creata");
|
|
} else {
|
|
console.log("✔ Tabella 'ws_pending_events' già esistente");
|
|
}
|
|
}
|
|
|
|
// ===============================
|
|
// SESSIONI
|
|
// ===============================
|
|
async function createSession(session_id, user) {
|
|
const now = Date.now();
|
|
await knex("ws_sessions").insert({
|
|
session_id,
|
|
user,
|
|
connected_at: now,
|
|
last_ack: now,
|
|
need_full_sync: false
|
|
});
|
|
}
|
|
|
|
async function getSession(session_id) {
|
|
return knex("ws_sessions").where({ session_id }).first();
|
|
}
|
|
|
|
async function updateSessionAck(session_id) {
|
|
await knex("ws_sessions")
|
|
.where({ session_id })
|
|
.update({ last_ack: Date.now() });
|
|
}
|
|
|
|
async function setSessionNeedFullSync(session_id, value) {
|
|
await knex("ws_sessions")
|
|
.where({ session_id })
|
|
.update({ need_full_sync: value });
|
|
}
|
|
|
|
// 🔥 FUNZIONE MANCANTE — AGGIUNTA ORA
|
|
async function clearNeedFullSync(session_id) {
|
|
await knex("ws_sessions")
|
|
.where({ session_id })
|
|
.update({ need_full_sync: false });
|
|
console.log(">>> clearNeedFullSync() ESEGUITA per", session_id);
|
|
}
|
|
|
|
async function deleteSession(session_id) {
|
|
await knex("ws_sessions").where({ session_id }).del();
|
|
await knex("ws_pending_events").where({ session_id }).del();
|
|
}
|
|
|
|
async function getSessionsByUser(user) {
|
|
return knex("ws_sessions").where({ user });
|
|
}
|
|
|
|
// ===============================
|
|
// EVENTI PENDENTI
|
|
// ===============================
|
|
async function addPendingEvent(event_id, session_id, payload) {
|
|
await knex("ws_pending_events").insert({
|
|
event_id,
|
|
session_id,
|
|
payload: JSON.stringify(payload),
|
|
sent_at: Date.now(),
|
|
retries: 0
|
|
});
|
|
}
|
|
|
|
async function deletePendingEvent(event_id) {
|
|
await knex("ws_pending_events").where({ event_id }).del();
|
|
}
|
|
|
|
|
|
async function getPendingEventsOlderThan(msAgo, limit = 500) {
|
|
const threshold = Date.now() - msAgo;
|
|
return knex("ws_pending_events")
|
|
.where("sent_at", "<", threshold)
|
|
.limit(limit);
|
|
}
|
|
|
|
async function updatePendingEventRetry(event_id) {
|
|
await knex("ws_pending_events")
|
|
.where({ event_id })
|
|
.update({
|
|
sent_at: Date.now(),
|
|
retries: knex.raw("retries + 1")
|
|
});
|
|
}
|
|
|
|
module.exports = {
|
|
initWsTables,
|
|
createSession,
|
|
getSession,
|
|
updateSessionAck,
|
|
setSessionNeedFullSync,
|
|
clearNeedFullSync, // <--- AGGIUNTO QUI
|
|
deleteSession,
|
|
getSessionsByUser,
|
|
addPendingEvent,
|
|
deletePendingEvent,
|
|
getPendingEventsOlderThan,
|
|
updatePendingEventRetry
|
|
};
|