"use strict"; let websocket = require("websocket"); let {http} = require("./HTTPServer"); let {randomUUID} = require("crypto"); const { Client } = require("./Client.js"); const { termoutput } = require("./config"); const { CLIENT_CREATED, CLIENT_DESTROY } = require("./IPC"); const stats = require("./stats"); termoutput && console.log("Web Socket Protocol is ready"); http.addListener("upgrade",() => { termoutput && console.log("HTTP Upgrading to WebSocket"); }) let wsServer = new websocket.server({ httpServer: http, autoAcceptConnections: true }); /* process.send({ core: "writestat", writeBytes:0, readBytes:0, totalBytes:0, recaivedPacket:0, sendedPacket:0, totalPacket:0 })*/ let global = new Map(); let clients = new Map(); wsServer.addListener("connect",(socket) => { let xClient = new Client(); let id = randomUUID(); socket.id = id; xClient.id = id; xClient.socket = socket; xClient.created_at = new Date(); Client.clients.set(id, xClient); clients.set(id, xClient); stats.mwse_clients++; CLIENT_CREATED(id); emit("connect", global, xClient); let oldw = 0, oldr = 0; let timer = setInterval(()=>{ let writed = socket.socket.bytesRead - oldr; let readed = socket.socket.bytesWritten - oldw; stats.ws_total_bytes += (writed + readed); stats.ws_readed_bytes += readed; stats.ws_writed_bytes += writed; oldr = socket.socket.bytesRead; oldw = socket.socket.bytesWritten; }, 1000) socket.addListener("close",()=>{ stats.mwse_clients--; emit("disconnect", global, xClient); CLIENT_DESTROY(id); Client.clients.delete(id); clearInterval(timer); clearInterval(pingTimer); }); let pingTimer = setInterval(()=> socket.ping('saQut') , 10_000); socket.addListener("pong",validationText => { if(validationText.toString('utf8') != "saQut"){ socket.close(); } }) socket.addListener("message",({type,utf8Data}) => { stats.ws_recaived_packs++; stats.ws_total_packs++; if(type == "utf8") { let json; try{ json = JSON.parse(utf8Data); emit('services', global, xClient, json); let [payload, id, action] = json; if(typeof id === "string") { action = id; id = void 0; }; emitService(global, xClient, id, payload, action); }catch{ emit("messageError", global, xClient, utf8Data); } } }); }); /** * @type {Map} */ let events = new Map(); /** * @type {Map} */ let services = [] /** * * @param {string} event * @param {(global:Map, client:Client, data:any) => any} func */ exports.addListener = (event, func) => { if(!events.has(event)) { events.set(event,[]); }; events.get(event).push(func); }; /** * * @param {string} event * @param {(data:{global:Map, client:Client, message:any,response:Function,end:Function,next:Function}) => any} func */ exports.addService = (func) => { services.push(func); }; function emit(event,...args) { if(events.has(event)) { for (const callback of events.get(event)) { callback(...args); } }; }; /** * * @param {Map} global * @param {Client} local * @param {number} id * @param {{[key:string]:any}} payload * @param {"R"|"S"} action [R]EQUEST flag or [S]TREAM flag */ async function emitService(global, client, id, payload, action) { let willContinue = false; for (const callback of services) { await callback({ message: payload, action, client, global, messageId: id, response:(obj)=>{ id != undefined && client.send([obj, id, 'C']) // continue ([C]ONTINUE flag) }, end:(obj)=>{ id != undefined && client.send([obj, id, 'E']) // stopped data stream (this channel) ([E]ND flag) }, next:function(){ willContinue = true; } }); if(willContinue === false) break; } }; exports.websocket = wsServer;