diff --git a/Source/Client.js b/Source/Client.js index 800e4ed..e1f9220 100644 --- a/Source/Client.js +++ b/Source/Client.js @@ -1,5 +1,5 @@ const { CLIENT_SEND_MESSAGE, CLIENT_UPDATE_PROP } = require("./IPC"); - +const stats = require("./stats"); function Client() { /** @@ -95,6 +95,7 @@ Client.prototype.send = function(obj){ { CLIENT_SEND_MESSAGE(this.id, obj, this.proxyProcess) }else{ + stats.ws_sended_packs++; this.socket.sendUTF(JSON.stringify(obj)); } }; diff --git a/Source/HTTPServer.js b/Source/HTTPServer.js index 147bb97..8dfe180 100644 --- a/Source/HTTPServer.js +++ b/Source/HTTPServer.js @@ -6,6 +6,7 @@ let compression = require("compression"); let {resolve} = require("path"); const { termoutput } = require("./config"); let server = http.createServer(); +const stats = require("./stats"); let app = express(); server.addListener("request", app); app.use(compression({ @@ -35,6 +36,9 @@ app.get("/webrtc.adapter.js",(request, response)=>{ app.get("/",(request, response)=>{ response.sendFile(resolve("./script/index.html")) }); +app.post("/stats",(request, response)=>{ + response.json(stats.others); +}); app.get("*",(request, response)=>{ response.sendFile(resolve("./script/status.xml")) }); \ No newline at end of file diff --git a/Source/WebSocket.js b/Source/WebSocket.js index e0fbc1e..63c6ceb 100644 --- a/Source/WebSocket.js +++ b/Source/WebSocket.js @@ -6,6 +6,7 @@ let {randomUUID} = require("crypto"); const { Client } = require("./Client.js"); const { termoutput } = require("./config"); const { CLIENT_CREATED, CLIENT_DESTROY } = require("./IPC"); +const stats = require("./stats"); termoutput && console.log("Web Socket Protocol is ready"); http.addListener("upgrade",() => { @@ -16,9 +17,21 @@ let wsServer = new websocket.server({ httpServer: http, autoAcceptConnections: true }); +/* +process.send({ + core: "writestat", + writeBytes:0, + readBytes:0, + totalBytes:0, + recaivedPacket:0, + sendedPacket:0, + totalPacket:0 +})*/ let global = new Map(); let clients = new Map(); + + wsServer.addListener("connect",(socket) => { let xClient = new Client(); let id = randomUUID(); @@ -34,12 +47,29 @@ wsServer.addListener("connect",(socket) => { CLIENT_CREATED(id); emit("connect", global, xClient); + + let oldw = 0, oldr = 0; + let timer = setInterval(()=>{ + let writed = socket.socket.bytesRead - oldr; + let readed = socket.socket.bytesWritten - oldw; + stats.ws_total_bytes += (writed + readed); + stats.ws_readed_bytes += readed; + stats.ws_writed_bytes += writed; + oldr = socket.socket.bytesRead; + oldw = socket.socket.bytesWritten; + }, 1000) + socket.addListener("close",()=>{ emit("disconnect", global, xClient); CLIENT_DESTROY(id); Client.clients.set(id, xClient); + clearInterval(timer); }); + socket.addListener("message",({type,utf8Data}) => { + stats.ws_recaived_packs++; + stats.ws_total_packs++; + if(type == "utf8") { let json; diff --git a/Source/stats.js b/Source/stats.js new file mode 100644 index 0000000..bd00ffc --- /dev/null +++ b/Source/stats.js @@ -0,0 +1,50 @@ +exports.ws_writed_bytes = 0; +exports.ws_readed_bytes = 0; +exports.ws_total_bytes = 0; +exports.ws_sended_packs = 0; +exports.ws_recaived_packs = 0; +exports.ws_total_packs = 0; + + +process.send({ + core: "writestat", + ws_writed_bytes: exports.ws_writed_bytes, + ws_readed_bytes: exports.ws_readed_bytes, + ws_total_bytes: exports.ws_total_bytes, + ws_sended_packs: exports.ws_sended_packs, + ws_recaived_packs: exports.ws_recaived_packs, + ws_total_packs: exports.ws_total_packs +}); +setInterval(()=>{ + process.send({ + core: "writestat", + ws_writed_bytes: exports.ws_writed_bytes, + ws_readed_bytes: exports.ws_readed_bytes, + ws_total_bytes: exports.ws_total_bytes, + ws_sended_packs: exports.ws_sended_packs, + ws_recaived_packs: exports.ws_recaived_packs, + ws_total_packs: exports.ws_total_packs + }) + exports.ws_writed_bytes = 0; + exports.ws_readed_bytes = 0; + exports.ws_total_bytes = 0; + exports.ws_sended_packs = 0; + exports.ws_recaived_packs = 0; + exports.ws_total_packs = 0; + process.send({ + core: "readstat", + ws_writed_bytes: exports.ws_writed_bytes, + ws_readed_bytes: exports.ws_readed_bytes, + ws_total_bytes: exports.ws_total_bytes, + ws_sended_packs: exports.ws_sended_packs, + ws_recaived_packs: exports.ws_recaived_packs, + ws_total_packs: exports.ws_total_packs + }) +}, 3000) + +process.on('message', stat => { + if(stat.type == ':stats:') + { + exports.others = stat.data; + } +}) diff --git a/index.js b/index.js index 7073bde..2180803 100644 --- a/index.js +++ b/index.js @@ -1,6 +1,7 @@ /** @type {import('node:cluster').Cluster} */ const cluster = require("cluster"); const os = require("os"); +let {randomUUID} = require("crypto"); /** * Use Round Robin algorithm for cluster process load balancer @@ -37,7 +38,38 @@ async function main() // This process want to send payload to sibling process with IPC (workerId, payload) =>{ // Check Target worker - if(payload.process) + if(payload.core) + { + switch(payload.core) + { + case "writestat":{ + master.get(workerId).setStats(payload) + break; + } + case "readstat":{ + master.get(workerId).send({ + type: ':stats:', + data: [ + ...master.entries() + ].map(( + [, master] + ) => { + let e = master.getStats(); + return { + core: master.uuid, + ws_writed_bytes:e.ws_writed_bytes, + ws_readed_bytes:e.ws_readed_bytes, + ws_total_bytes:e.ws_total_bytes, + ws_sended_packs:e.ws_sended_packs, + ws_recaived_packs:e.ws_recaived_packs, + ws_total_packs:e.ws_total_packs + } + }) + }) + break; + } + } + }else if(payload.process) { master.get(payload.process).send({ ...payload, @@ -63,7 +95,7 @@ async function main() -async function generateFlow(N) +async function generateFlow() { // Mirror this process with for (low-level os multitasking) const worker = cluster.fork(); @@ -75,12 +107,17 @@ async function generateFlow(N) }); // Get process pid on the os let id = worker.process.pid; + + let stats = {}; // Simplification wrapping send and get events with IPC's event functions return { id, + uuid: randomUUID(), send: message => worker.send(message), - message: (callback) => worker.addListener("message", e => callback(id,e)) + message: (callback) => worker.addListener("message", e => callback(id,e)), + getStats: () => stats, + setStats: e => Object.assign(stats, e) } } diff --git a/script/index.html b/script/index.html index e8805ae..bd34043 100644 --- a/script/index.html +++ b/script/index.html @@ -31,6 +31,7 @@ flex: 1 1 25%; max-width: 100%; overflow: auto; + margin-bottom: 50px; } .speed-container > div canvas{ max-width: 100%; @@ -597,7 +598,7 @@ { iTargetSpeed = value; createCanvas.ps.innerText = value; - createCanvas.pw.innerText = pressure; + createCanvas.pw.innerText = pressure + '%'; draw() } function convertSpeedToAngle(options) { @@ -630,6 +631,7 @@ update } } + /* let elem = document.querySelector(".speed-container"); for(let e = 0; e < 8; e++) { @@ -638,6 +640,39 @@ elem.appendChild(container); meter.update(54,25) } + */ + let elem = document.querySelector(".speed-container"); + let meters = new Map(); + async function reloadData() + { + let data = await fetchData(); + setTimeout(reloadData, 3000) + }; + let isFirst = true; + async function fetchData() + { + let response = await fetch("/stats",{ + method: "post", + credentials: "same-origin", + cache: "no-cache", + mode:"no-cors" + }).then(e => e.json()); + for (const { + ws_total_packs, + core + } of response) { + if(!meters.has(core)) + { + let meter = CreateMeter(); + let container = meter.createCanvas(); + elem.appendChild(container); + meters.set(core, meter) + }; + let _meter = meters.get(core); + _meter.update(ws_total_packs,0) + } + } + reloadData(); \ No newline at end of file diff --git a/test.html b/test.html index f22bcbe..3cf3de0 100644 --- a/test.html +++ b/test.html @@ -51,9 +51,9 @@ } gg.w = 0; gg.r = 0; - setInterval(()=>{ + /*setInterval(()=>{ window.location.reload(); - }, 30000) + }, 30000)*/ main();