// MWSE/Source/WebSocket.js
//
// 170 lines
// 4.3 KiB
// JavaScript

"use strict";
let websocket = require("websocket");
let {http} = require("./HTTPServer");
let {randomUUID} = require("crypto");
const { Client } = require("./Client.js");
const { termoutput } = require("./config");
const { CLIENT_CREATED, CLIENT_DESTROY } = require("./IPC");
const stats = require("./stats");
// Announce module start-up when terminal output is enabled in the config.
if (termoutput) {
    console.log("Web Socket Protocol is ready");
}

// Trace every HTTP "upgrade" event seen by the shared HTTP server.
http.addListener("upgrade", () => {
    if (termoutput) {
        console.log("HTTP Upgrading to WebSocket");
    }
});

// WebSocket server bound to the shared HTTP server; incoming connections
// are accepted automatically (autoAcceptConnections).
let wsServer = new websocket.server({ httpServer: http, autoAcceptConnections: true });
/*
process.send({
core: "writestat",
writeBytes:0,
readBytes:0,
totalBytes:0,
recaivedPacket:0,
sendedPacket:0,
totalPacket:0
})*/
// Shared state map handed to every event listener and service callback as
// their `global` argument. Note: inside this module the name shadows
// Node's built-in `global` object.
let global = new Map();
// Client instances keyed by connection id; entries are added in the
// "connect" handler.
let clients = new Map();
/**
 * Per-connection setup. For every accepted WebSocket connection this:
 *  - creates a Client, tags it and the socket with a fresh UUID and
 *    registers it in `Client.clients` and the module-level `clients` map;
 *  - samples the underlying socket's byte counters once per second into `stats`;
 *  - pings the peer every 10 seconds and closes the connection when a pong
 *    does not echo the expected payload;
 *  - parses UTF-8 messages as JSON `[payload, id, action]` (or
 *    `[payload, action]` when the second element is a string) and forwards
 *    them to the "services" listeners and the service chain.
 *
 * Emits "connect", "disconnect", "services" and "messageError" events.
 */
wsServer.addListener("connect", (socket) => {
    const PING_PAYLOAD = 'saQut';

    const xClient = new Client();
    const id = randomUUID();
    socket.id = id;
    xClient.id = id;
    xClient.socket = socket;
    xClient.created_at = new Date();
    Client.clients.set(id, xClient);
    clients.set(id, xClient);
    stats.mwse_clients++;
    CLIENT_CREATED(id);
    emit("connect", global, xClient);

    // Byte counters of the underlying socket as seen at the previous sample.
    let lastBytesRead = 0;
    let lastBytesWritten = 0;
    const trafficTimer = setInterval(() => {
        const { bytesRead, bytesWritten } = socket.socket;
        // Bytes received go to the "readed" counter and bytes sent to the
        // "writed" counter (these two were previously swapped).
        const readDelta = bytesRead - lastBytesRead;
        const writtenDelta = bytesWritten - lastBytesWritten;
        stats.ws_total_bytes += readDelta + writtenDelta;
        stats.ws_readed_bytes += readDelta;
        stats.ws_writed_bytes += writtenDelta;
        lastBytesRead = bytesRead;
        lastBytesWritten = bytesWritten;
    }, 1000);

    const pingTimer = setInterval(() => socket.ping(PING_PAYLOAD), 10_000);

    socket.addListener("close", () => {
        // Stop the timers first so a throwing "disconnect" listener cannot
        // leave them running for a dead connection.
        clearInterval(trafficTimer);
        clearInterval(pingTimer);
        stats.mwse_clients--;
        emit("disconnect", global, xClient);
        CLIENT_DESTROY(id);
        Client.clients.delete(id);
        // Also drop the module-level reference so closed clients do not
        // accumulate in `clients`.
        clients.delete(id);
    });

    socket.addListener("pong", (validationText) => {
        // A pong must echo the ping payload; anything else closes the connection.
        if (validationText.toString('utf8') !== PING_PAYLOAD) {
            socket.close();
        }
    });

    socket.addListener("message", ({ type, utf8Data }) => {
        stats.ws_recaived_packs++;
        stats.ws_total_packs++;
        if (type !== "utf8") {
            // Non-text frames are counted but not processed.
            return;
        }
        try {
            const json = JSON.parse(utf8Data);
            emit('services', global, xClient, json);
            let [payload, messageId, action] = json;
            if (typeof messageId === "string") {
                // Two-element form [payload, action]: there is no message id.
                action = messageId;
                messageId = void 0;
            }
            // emitService is async: a failing service rejects the returned
            // promise instead of throwing here, so route that failure to the
            // same "messageError" event as synchronous failures rather than
            // leaving an unhandled rejection.
            emitService(global, xClient, messageId, payload, action).catch(() => {
                emit("messageError", global, xClient, utf8Data);
            });
        } catch {
            emit("messageError", global, xClient, utf8Data);
        }
    });
});
/**
 * Event listeners registered through `exports.addListener`, keyed by event name.
 * @type {Map<string, Function[]>}
 */
let events = new Map();
/**
 * Service callbacks registered through `exports.addService`, kept in
 * registration order (a plain array, not a map).
 * @type {Function[]}
 */
let services = []
/**
*
* @param {string} event
* @param {(global:Map<string, any>, client:Client, data:any) => any} func
*/
exports.addListener = (event, func) => {
if(!events.has(event))
{
events.set(event,[]);
};
events.get(event).push(func);
};
/**
 * Appends a service callback to the end of the service chain.
 *
 * @param {(data:{global:Map<string, any>, client:Client, message:any,response:Function,end:Function,next:Function}) => any} func Service callback to register.
 */
exports.addService = (func) => void services.push(func);
/**
 * Calls every listener registered for `event`, in registration order,
 * passing `args` through unchanged. Does nothing when the event has no
 * listeners.
 *
 * @param {string} event Name of the event to fire.
 * @param {...any} args Arguments forwarded to each listener.
 */
function emit(event, ...args)
{
    if (!events.has(event)) {
        return;
    }
    const listeners = events.get(event);
    // The length is re-read on every step, so listeners appended while
    // emitting are still visited.
    for (let index = 0; index < listeners.length; index += 1) {
        const listener = listeners[index];
        listener(...args);
    }
}
/**
 * Runs the registered services, in registration order, for one incoming
 * message. Each service is awaited before the next one starts, and the
 * chain stops at the first service that does not call `next()`.
 *
 * Every service receives an object with the message, its action flag, the
 * client, the shared map, the message id, and three functions:
 *  - `response(obj)` sends `[obj, id, 'C']` to the client ([C]ONTINUE flag);
 *  - `end(obj)` sends `[obj, id, 'E']` to the client ([E]ND flag, the data
 *    stream of this channel is stopped);
 *  - `next()` lets the following service run as well.
 * `response` and `end` send nothing when the message has no id.
 *
 * @param {Map} global Shared state map passed through to the services.
 * @param {Client} client Client the message came from; replies use `client.send`.
 * @param {number} [id] Message id; when null or undefined no reply is sent.
 * @param {{[key:string]:any}} payload Message payload.
 * @param {"R"|"S"} action [R]EQUEST flag or [S]TREAM flag
 * @returns {Promise<void>} Settles when the chain stops; rejects if a service throws or rejects.
 */
async function emitService(global, client, id, payload, action)
{
    // `== null` matches both null and undefined; an id of 0 is still valid.
    const canReply = id != null;
    for (const callback of services) {
        // Reset for every service: a `next()` call from an earlier service
        // must not keep the chain running past a later service that did
        // not ask to continue.
        let willContinue = false;
        await callback({
            message: payload,
            action,
            client,
            global,
            messageId: id,
            response: (obj) => {
                if (canReply) {
                    client.send([obj, id, 'C']);
                }
            },
            end: (obj) => {
                if (canReply) {
                    client.send([obj, id, 'E']);
                }
            },
            next: function () {
                willContinue = true;
            }
        });
        if (!willContinue) {
            break;
        }
    }
}
// Expose the WebSocket server instance created above to other modules.
exports.websocket = wsServer;