170 lines
4.3 KiB
JavaScript
170 lines
4.3 KiB
JavaScript
"use strict";
|
|
|
|
let websocket = require("websocket");
|
|
let {http} = require("./HTTPServer");
|
|
let {randomUUID} = require("crypto");
|
|
const { Client } = require("./Client.js");
|
|
const { termoutput } = require("./config");
|
|
const { CLIENT_CREATED, CLIENT_DESTROY } = require("./IPC");
|
|
const stats = require("./stats");
|
|
// Startup banner. Console output in this module is gated on the `termoutput` config flag.
if (termoutput) {
    console.log("Web Socket Protocol is ready");
}

// Trace every "upgrade" event raised by the shared HTTP server.
http.addListener("upgrade", () => {
    if (termoutput) {
        console.log("HTTP Upgrading to WebSocket");
    }
});

// WebSocket server attached to the shared HTTP server.
// NOTE(review): `autoAcceptConnections: true` accepts every upgrade without an
// origin check; confirm this is intended for production use.
const wsServer = new websocket.server({
    httpServer: http,
    autoAcceptConnections: true,
});
|
|
/*
|
|
process.send({
|
|
core: "writestat",
|
|
writeBytes:0,
|
|
readBytes:0,
|
|
totalBytes:0,
|
|
recaivedPacket:0,
|
|
sendedPacket:0,
|
|
totalPacket:0
|
|
})*/
|
|
|
|
// Shared state map handed to every event listener and service handler.
// It shadows Node's built-in `global` inside this module.
const global = new Map();

// Clients registered by this module, keyed by connection id.
const clients = new Map();
|
|
|
|
|
|
/** Payload sent with every keep-alive ping; a pong carrying anything else closes the socket. */
const PING_PAYLOAD = "saQut";
/** Delay between keep-alive pings, in milliseconds. */
const PING_INTERVAL_MS = 10_000;
/** Delay between samples of the socket byte counters, in milliseconds. */
const BYTE_SAMPLE_INTERVAL_MS = 1000;

/**
 * Per-connection setup, run for every accepted WebSocket connection.
 *
 * Creates a `Client` with a fresh UUID, registers it, emits "connect", and then
 * wires up byte-counter sampling, keep-alive pings, pong validation, message
 * dispatch and close-time cleanup for the socket.
 */
wsServer.addListener("connect", (socket) => {
    const xClient = new Client();
    const id = randomUUID();
    socket.id = id;
    xClient.id = id;
    xClient.socket = socket;
    xClient.created_at = new Date();
    Client.clients.set(id, xClient);
    clients.set(id, xClient);

    stats.mwse_clients++;

    CLIENT_CREATED(id);

    emit("connect", global, xClient);

    // Once per interval, add the growth of the underlying socket's byte counters
    // to the global stats.
    // NOTE(review): the `bytesRead` delta feeds `ws_writed_bytes` and the
    // `bytesWritten` delta feeds `ws_readed_bytes`. This looks like the counters
    // are named from the peer's point of view; confirm that is intended.
    let lastBytesRead = 0;
    let lastBytesWritten = 0;
    const byteTimer = setInterval(() => {
        const { bytesRead, bytesWritten } = socket.socket;
        const writed = bytesRead - lastBytesRead;
        const readed = bytesWritten - lastBytesWritten;
        stats.ws_total_bytes += writed + readed;
        stats.ws_readed_bytes += readed;
        stats.ws_writed_bytes += writed;
        lastBytesRead = bytesRead;
        lastBytesWritten = bytesWritten;
    }, BYTE_SAMPLE_INTERVAL_MS);

    const pingTimer = setInterval(() => socket.ping(PING_PAYLOAD), PING_INTERVAL_MS);

    socket.addListener("close", () => {
        stats.mwse_clients--;
        try {
            emit("disconnect", global, xClient);
        } finally {
            // Cleanup runs even when a "disconnect" listener throws, so the
            // timers and registry entries never outlive the connection.
            clearInterval(byteTimer);
            clearInterval(pingTimer);
            CLIENT_DESTROY(id);
            Client.clients.delete(id);
            // The local registry is filled on connect; without this delete it
            // would grow by one entry per connection.
            clients.delete(id);
        }
    });

    // A pong must echo the ping payload; any other payload closes the connection.
    socket.addListener("pong", (validationText) => {
        if (validationText.toString("utf8") !== PING_PAYLOAD) {
            socket.close();
        }
    });

    socket.addListener("message", ({ type, utf8Data }) => {
        stats.ws_recaived_packs++;
        stats.ws_total_packs++;

        // Only text frames are dispatched; other frame types are counted and ignored.
        if (type !== "utf8") {
            return;
        }

        const reportMessageError = () => emit("messageError", global, xClient, utf8Data);

        try {
            const json = JSON.parse(utf8Data);
            emit("services", global, xClient, json);

            // Wire format is [payload, id, action]. When the second slot is a
            // string the message carries no id and that slot is the action.
            let [payload, messageId, action] = json;
            if (typeof messageId === "string") {
                action = messageId;
                messageId = undefined;
            }

            // emitService is async, so the surrounding try/catch cannot see a
            // failing service. Route rejections to the same "messageError" path
            // so they do not become unhandled rejections.
            emitService(global, xClient, messageId, payload, action).catch(reportMessageError);
        } catch {
            reportMessageError();
        }
    });
});
|
|
|
|
/**
 * Event listeners keyed by event name, in registration order.
 * @type {Map<string, Function[]>}
 */
const events = new Map();

/**
 * Service handlers, in registration order.
 * @type {Function[]}
 */
const services = [];
|
|
/**
|
|
*
|
|
* @param {string} event
|
|
* @param {(global:Map<string, any>, client:Client, data:any) => any} func
|
|
*/
|
|
exports.addListener = (event, func) => {
|
|
if(!events.has(event))
|
|
{
|
|
events.set(event,[]);
|
|
};
|
|
events.get(event).push(func);
|
|
};
|
|
/**
|
|
*
|
|
* @param {string} event
|
|
* @param {(data:{global:Map<string, any>, client:Client, message:any,response:Function,end:Function,next:Function}) => any} func
|
|
*/
|
|
exports.addService = (func) => {
|
|
services.push(func);
|
|
};
|
|
/**
 * Invokes every listener registered for `event`, in registration order,
 * forwarding `args` unchanged. Does nothing when the event has no listeners.
 *
 * @param {string} event
 * @param {...any} args
 */
function emit(event, ...args) {
    if (!events.has(event)) {
        return;
    }
    const listeners = events.get(event);
    for (const listener of listeners) {
        listener(...args);
    }
}
|
|
/**
 * Runs the registered service handlers one after another for a single message.
 *
 * Each handler is awaited before the next one starts. The chain moves on to the
 * following handler only when the current handler called `next()`; otherwise it
 * stops there.
 *
 * @param {Map} global shared map passed through to every handler
 * @param {Client} client client the message came from; replies go through `client.send`
 * @param {number|undefined} id message id; when it is null or undefined, `response` and `end` send nothing
 * @param {{[key:string]:any}} payload message body, exposed to handlers as `message`
 * @param {"R"|"S"} action [R]EQUEST flag or [S]TREAM flag
 * @returns {Promise<void>} settles once the chain stops; rejects if a handler throws or rejects
 */
async function emitService(global, client, id, payload, action) {
    // Loose comparison on purpose: covers both null and undefined.
    const canReply = id != undefined;

    for (const callback of services) {
        // Reset for every handler. A single shared flag would stay true after
        // the first next() and let every later handler run unconditionally.
        let willContinue = false;

        await callback({
            message: payload,
            action,
            client,
            global,
            messageId: id,
            // Reply and keep the channel open ([C]ONTINUE flag).
            response: (obj) => {
                if (canReply) {
                    client.send([obj, id, "C"]);
                }
            },
            // Reply and close this channel's data stream ([E]ND flag).
            end: (obj) => {
                if (canReply) {
                    client.send([obj, id, "E"]);
                }
            },
            next() {
                willContinue = true;
            },
        });

        if (!willContinue) {
            break;
        }
    }
}
|
|
// Expose the WebSocket server instance to other modules.
exports.websocket = wsServer;