diff --git a/Source/Client.js b/Source/Client.js index dcf38fd..e1f9220 100644 --- a/Source/Client.js +++ b/Source/Client.js @@ -1,3 +1,5 @@ +const { CLIENT_SEND_MESSAGE, CLIENT_UPDATE_PROP } = require("./IPC"); +const stats = require("./stats"); function Client() { /** @@ -21,6 +23,17 @@ function Client() this.APNumber = 0; this.APShortCode = 0; this.APIPAddress = 0; + + this.isProxy = false; + this.proxyProcess = null; + + this.sync = function(...args){ + process.nextTick(()=>{ + for (const name of args) { + CLIENT_UPDATE_PROP(this.id, name, this[name]); + } + }) + }; }; /** * @type {Map} @@ -34,6 +47,7 @@ Client.prototype.peerRequest = function(client){ let info = {}; this.store.forEach((value, name) => info[name] = value); this.pairs.add(client.id); + this.sync('pairs'); client.send([{ from: this.id, info @@ -45,6 +59,7 @@ Client.prototype.peerRequest = function(client){ */ Client.prototype.acceptPeerRequest = function(client){ this.pairs.add(client.id); + this.sync('pairs'); client.send([{ from: this.id },'accepted/pair']); @@ -55,6 +70,7 @@ Client.prototype.peerRequest = function(client){ Client.prototype.rejectPeerRequest = function(client){ this.pairs.delete(client.id); client.pairs.delete(this.id); + this.sync('pairs'); client.send([{ from: this.id },'rejected/pair']); @@ -75,7 +91,13 @@ Client.prototype.pairList = function(){ }; Client.prototype.send = function(obj){ - this.socket.sendUTF(JSON.stringify(obj)); + if(this.isProxy) + { + CLIENT_SEND_MESSAGE(this.id, obj, this.proxyProcess) + }else{ + stats.ws_sended_packs++; + this.socket.sendUTF(JSON.stringify(obj)); + } }; exports.Client = Client; \ No newline at end of file diff --git a/Source/HTTPServer.js b/Source/HTTPServer.js index 47bc01b..b549be8 100644 --- a/Source/HTTPServer.js +++ b/Source/HTTPServer.js @@ -6,19 +6,27 @@ let compression = require("compression"); let {resolve} = require("path"); const { termoutput } = require("./config"); let server = http.createServer(); +const stats = require("./stats"); let app = express(); server.addListener("request", app); app.use(compression({ level: 9 })); + server.listen(7707,'0.0.0.0',() => { termoutput && console.log("HTTP Service Running..."); }); +server.addListener("error",(err)=> { + console.err(err) +}) exports.http = server; app.get("/script",(request, response)=>{ response.sendFile(resolve("./script/index.js")) }); +app.get("/test",(request, response)=>{ + response.sendFile(resolve("./script/test.html")) +}); app.get("/index.js.map",(request, response)=>{ response.sendFile(resolve("./script/index.js.map")) }); @@ -28,6 +36,12 @@ app.get("/webrtc.js",(request, response)=>{ app.get("/webrtc.adapter.js",(request, response)=>{ response.sendFile(resolve("./script/webrtc.adapter.js")) }); +app.get("/",(request, response)=>{ + response.sendFile(resolve("./script/index.html")) +}); +app.post("/stats",(request, response)=>{ + response.json(stats.others); +}); app.get("*",(request, response)=>{ response.sendFile(resolve("./script/status.xml")) }); \ No newline at end of file diff --git a/Source/IPC.js b/Source/IPC.js new file mode 100644 index 0000000..2637309 --- /dev/null +++ b/Source/IPC.js @@ -0,0 +1,187 @@ + +process.on('message',data => { + const { Client } = require("./Client.js"); + const { Room } = require("./Services/Room"); + switch(data.type) + { + case "CLIENT_CREATED":{ + slog("CLIENT_CREATED"); + let client = new Client(); + client.isProxy = true; + client.proxyProcess = data.pid; + client.id = data.uuid; + Client.clients.set(client.id, client); + break; + } + case "CLIENT_UPDATE_PROP":{ + slog("CLIENT_UPDATE_PROP"); + let client = Client.clients.get(data.uuid); + client[data.name] = data.value; + break; + } + case "CLIENT_SEND_MESSAGE":{ + //slog("CLIENT_SEND_MESSAGE"); + let client = Client.clients.get(data.uuid); + if(client.isProxy != true) + { + client.send(data.message) + } + break; + } + case "CLIENT_DESTROY":{ + slog("CLIENT_DESTROY"); + Client.clients.delete(data.uuid); + break; + } + case "ROOM_CREATED":{ + slog("ROOM_CREATED"); + let room = Room.fromJSON(data.value); + Room.rooms.set(room.id, room); + break; + } + case "ROOM_UPDATE_PROP":{ + slog("ROOM_UPDATE_PROP"); + let room = Room.rooms.get(data.uuid); + room[data.name] = data.value; + break; + } + case "ROOM_JOIN_CLIENT":{ + slog("ROOM_JOIN_CLIENT"); + let room = Room.rooms.get(data.uuid); + let client = Client.clients.get(data.client); + if(room && client) + { + client.rooms.add(room.id); + room.clients.set(client.id, client); + } + break; + } + case "ROOM_EJECT_CLIENT":{ + slog("ROOM_EJECT_CLIENT"); + let room = Room.rooms.get(data.uuid); + let client = Client.clients.get(data.client); + if(room && client) + { + client.rooms.delete(room.id); + room.clients.delete(client.id, client); + } + break; + } + case "ROOM_DESTROY":{ + slog("ROOM_DESTROY"); + Room.rooms.delete(data.value); + break; + } + } +}); + +function CLIENT_CREATED(uuid) +{ + mlog("CLIENT_CREATED"); + process.send({ + type:'CLIENT_CREATED', + uuid: uuid + }) +}; +function CLIENT_UPDATE_PROP(uuid, name, value) +{ + mlog("CLIENT_UPDATE_PROP"); + process.send({ + type:'CLIENT_UPDATE_PROP', + uuid: uuid, + name, + value + }) +}; +function CLIENT_SEND_MESSAGE(uuid, message, clusterPid) +{ + mlog("CLIENT_SEND_MESSAGE"); + process.send({ + type:'CLIENT_SEND_MESSAGE', + uuid: uuid, + message, + process:clusterPid + }) +}; +function CLIENT_DESTROY(uuid) +{ + mlog("CLIENT_DESTROY"); + process.send({ + type:'CLIENT_DESTROY', + uuid: uuid + }) +}; + + +function ROOM_CREATED(room) +{ + mlog("ROOM_CREATED"); + let raw = room.toJSON(true); + process.send({ + type:'ROOM_CREATED', + value: raw + }) +}; + +function ROOM_UPDATE_PROP(uuid, name, value) +{ + mlog("ROOM_UPDATE_PROP"); + process.send({ + type:'ROOM_UPDATE_PROP', + uuid: uuid, + name, + value + }) +}; + +function ROOM_JOIN_CLIENT(uuid, client) +{ + mlog("ROOM_JOIN_CLIENT"); + process.send({ + type:'ROOM_JOIN_CLIENT', + uuid: uuid, + client + }) +}; +function ROOM_EJECT_CLIENT(uuid, client) +{ + mlog("ROOM_EJECT_CLIENT"); + process.send({ + type:'ROOM_EJECT_CLIENT', + uuid: uuid, + client + }) +}; + +function ROOM_DESTROY(room) +{ + mlog("ROOM_DESTROY"); + process.send({ + type:'ROOM_DESTROY', + value: room.id + }) +}; + +function mlog(command) +{ + return; + console.log("M",process.pid, command) +} +function slog(command) +{ + return; + console.log("S",process.pid, command) +} + + +exports.CLIENT_CREATED = CLIENT_CREATED; +exports.CLIENT_UPDATE_PROP = CLIENT_UPDATE_PROP; +exports.CLIENT_DESTROY = CLIENT_DESTROY; +exports.CLIENT_SEND_MESSAGE = CLIENT_SEND_MESSAGE; +exports.ROOM_CREATED = ROOM_CREATED; +exports.ROOM_UPDATE_PROP = ROOM_UPDATE_PROP; +exports.ROOM_JOIN_CLIENT = ROOM_JOIN_CLIENT; +exports.ROOM_EJECT_CLIENT = ROOM_EJECT_CLIENT; +exports.ROOM_DESTROY = ROOM_DESTROY; +exports.mlog = mlog; +exports.slog = slog; \ No newline at end of file diff --git a/Source/Services/Auth.js b/Source/Services/Auth.js index e12e256..660096a 100644 --- a/Source/Services/Auth.js +++ b/Source/Services/Auth.js @@ -1,4 +1,5 @@ const { Client } = require("../Client.js"); +const { CLIENT_UPDATE_PROP } = require("../IPC.js"); let {addService, addListener} = require("../WebSocket.js"); addService(({ @@ -13,13 +14,16 @@ addService(({ case "auth/pair-system":{ if(value == 'everybody') { - client.requiredPair = true; + client.requiredPair = true; end({status:"success"}); + client.sync('requiredPair'); } if(value == 'disable') { client.requiredPair = false; end({status:"success"}); + client.sync('requiredPair'); + //CLIENT_UPDATE_PROP(client.id, 'requiredPair', client.requiredPair); } break; } @@ -29,6 +33,7 @@ addService(({ } case 'auth/public':{ client.requiredPair = false; + client.sync('requiredPair'); return end({ value: 'success', mode: 'public' @@ -36,6 +41,7 @@ addService(({ } case 'auth/private':{ client.requiredPair = true; + client.sync('requiredPair'); return end({ value: 'success', mode: 'private' diff --git a/Source/Services/IPPressure.js b/Source/Services/IPPressure.js index 1f9ac98..cd8ce09 100644 --- a/Source/Services/IPPressure.js +++ b/Source/Services/IPPressure.js @@ -3,6 +3,7 @@ let {addService, addListener} = require("../WebSocket.js"); let { randomBytes } = require("node:crypto"); +const { slog } = require("../IPC"); class APNumber{ /** @@ -11,14 +12,20 @@ class APNumber{ static busyNumbers = new Map(); /** * @type {number} + * @param {Client} client */ - static lock() + static lock(client) { let c = 24; while(true){ if(!APNumber.busyNumbers.has(c)) { - APNumber.busyNumbers.set(c,true); + APNumber.busyNumbers.set(c,client); + process.send({ + type: 'AP_NUMBER/LOCK', + uuid: client.id, + value: c + }) return c; } c++; @@ -30,6 +37,11 @@ class APNumber{ static release(num) { APNumber.busyNumbers.delete(num); + process.send({ + type: 'AP_NUMBER/RELEASE', + uuid: APNumber.busyNumbers.get(num).id, + value: num + }) } static whois(num){ return APNumber.busyNumbers.get(num)?.id; @@ -56,6 +68,11 @@ class APShortCode{ if(APShortCode.busyCodes.has(code) == false) { APShortCode.busyCodes.set(code, client); + process.send({ + type: 'AP_SHORTCODE/LOCK', + uuid: APShortCode.busyCodes.get(num).id, + value: code + }) return code; } if(!thirdLetter.end()) @@ -79,7 +96,15 @@ class APShortCode{ } } static release(code){ - APShortCode.busyCodes.delete(code); + if(APShortCode.busyCodes.has(code)) + { + process.send({ + type: 'AP_SHORTCODE/RELEASE', + uuid: APShortCode.busyCodes.get(code).id, + value: code + }) + APShortCode.busyCodes.delete(code); + } } static whois(num){ return APShortCode.busyCodes.get(num)?.id; @@ -122,6 +147,11 @@ class APIPAddress{ if(APIPAddress.busyIP.has(code) == false) { APIPAddress.busyIP.set(code, client); + process.send({ + type: 'AP_IPADDRESS/LOCK', + uuid: APIPAddress.busyIP.get(code).id, + value: code + }) return code; } if(D != 255) @@ -156,7 +186,15 @@ class APIPAddress{ } } static release(code){ - APIPAddress.busyIP.delete(code); + if(APIPAddress.busyIP.has(code)) + { + process.send({ + type: 'AP_IPADDRESS/RELEASE', + uuid: APIPAddress.busyIP.get(code).id, + value: code + }) + APIPAddress.busyIP.delete(code); + } } static whois(num){ return APIPAddress.busyIP.get(num)?.id; @@ -167,7 +205,6 @@ exports.APNumber = APNumber; exports.APShortCode = APShortCode; exports.APIPAddress = APIPAddress; - addService(({ client, message, @@ -186,6 +223,7 @@ addService(({ }; let value = APIPAddress.lock(client); client.APIPAddress = value; + client.sync('APIPAddress'); end({ status : "success", ip : value @@ -201,6 +239,7 @@ addService(({ }; let value = APNumber.lock(client); client.APNumber = value; + client.sync('APNumber'); end({ status : "success", number : value @@ -216,6 +255,7 @@ addService(({ }; let value = APShortCode.lock(client); client.APShortCode = value; + client.sync('APShortCode'); end({ status : "success", code : value @@ -230,6 +270,7 @@ addService(({ } APIPAddress.release(client.APIPAddress); let value = APIPAddress.lock(client); + client.sync('APIPAddress'); end({ status : "success", ip : value @@ -244,6 +285,7 @@ addService(({ } APNumber.release(client.APNumber); let value = APNumber.lock(client); + client.sync('APNumber'); end({ status : "success", number : value @@ -258,6 +300,7 @@ addService(({ } APShortCode.release(client.APShortCode); let value = APShortCode.lock(client); + client.sync('APShortCode'); end({ status : "success", code : value @@ -266,6 +309,8 @@ addService(({ } case "release/APIPAddress":{ APIPAddress.release(client.APIPAddress); + client.APIPAddress = void 0; + client.sync('APShortCode'); end({ status : "success" }) @@ -273,6 +318,8 @@ addService(({ } case "release/APNumber":{ APNumber.release(client.APNumber); + client.APNumber = void 0; + client.sync('APIPAddress'); end({ status : "success" }) @@ -280,6 +327,8 @@ addService(({ } case "release/APShortCode":{ APShortCode.release(client.APShortCode); + client.APShortCode = void 0; + client.sync('APIPAddress'); end({ status : "success" }) @@ -333,6 +382,72 @@ addService(({ } }) +process.on('message',({type, uuid, value}) => { + switch(type) + { + case "AP_NUMBER/LOCK":{ + console.log("S",process.pid, 'IPPressure SYNCED') + let client = Client.clients.get(uuid); + APNumber.busyNumbers.set(value, client); + if(client) + { + client.APNumber = value; + client.sync('APNumber'); + } + } + case "AP_NUMBER/RELEASE":{ + console.log("S",process.pid, 'IPPressure SYNCED') + APNumber.busyNumbers.delete(value); + let client = Client.clients.get(uuid); + if(client) + { + client.APNumber = void 0; + client.sync('APNumber'); + } + } + case "AP_SHORTCODE/LOCK":{ + console.log("S",process.pid, 'IPPressure SYNCED') + let client = Client.clients.get(uuid); + APShortCode.busyCodes.set(value, client); + if(client) + { + client.APShortCode = value; + client.sync('APShortCode'); + } + } + case "AP_SHORTCODE/RELEASE":{ + console.log("S",process.pid, 'IPPressure SYNCED') + APShortCode.busyCodes.delete(value); + let client = Client.clients.get(uuid); + if(client) + { + client.APShortCode = void 0; + client.sync('APShortCode'); + } + } + case "AP_IPADDRESS/LOCK":{ + console.log("S",process.pid, 'IPPressure SYNCED') + let client = Client.clients.get(uuid); + APIPAddress.busyIP.set(value, client); + if(client) + { + client.APIPAddress = value; + client.sync('APIPAddress'); + } + } + case "AP_IPADDRESS/RELEASE":{ + console.log("S",process.pid, 'IPPressure SYNCED') + APIPAddress.busyIP.delete(value); + let client = Client.clients.get(uuid); + if(client) + { + client.APIPAddress = void 0; + client.sync('APIPAddress'); + } + } + } +}) + addListener('disconnect',(global, client)=>{ if(client.APIPAddress != 0) { diff --git a/Source/Services/Room.js b/Source/Services/Room.js index 6d26d40..c797c0b 100644 --- a/Source/Services/Room.js +++ b/Source/Services/Room.js @@ -3,6 +3,7 @@ let {randomUUID,createHash} = require("crypto"); const joi = require("joi"); let {addService,addListener} = require("../WebSocket.js"); const { termoutput } = require("../config.js"); +const { ROOM_CREATED, ROOM_DESTROY, ROOM_UPDATE_PROP, ROOM_JOIN_CLIENT, ROOM_EJECT_CLIENT } = require("../IPC.js"); let term = require("terminal-kit").terminal; function Sha256(update) @@ -64,15 +65,24 @@ function Room() * @type {string[]} */ this.waitingInvited = new Set(); + + this.sync = function(...args){ + process.nextTick(()=>{ + for (const name of args) { + ROOM_UPDATE_PROP(this.id, name, this[name]); + } + }) + }; } /** * @param {Room} room */ Room.prototype.publish = function(room){ Room.rooms.set(this.id, this); + ROOM_CREATED(this); termoutput && term.green("Room Published ").white(this.name," in ").yellow(this.clients.size).white(" clients")('\n'); }; -Room.prototype.toJSON = function(){ +Room.prototype.toJSON = function(detailed){ let obj = {}; obj.id = this.id; obj.accessType = this.accessType; @@ -82,8 +92,47 @@ Room.prototype.toJSON = function(){ obj.name = this.name; obj.owner = this.owner.id; obj.waitingInvited = [...this.waitingInvited]; + if(detailed) + { + obj.credential = this.credential; + obj.notifyActionInvite = this.notifyActionInvite; + obj.notifyActionJoined = this.notifyActionJoined; + obj.notifyActionEjected = this.notifyActionEjected; + obj.clients = [...this.clients.keys()]; + } return obj; }; +/** + * @param {Object} data + * @param {Room} room + */ +Room.fromJSON = function(data, room){ + room = room || new Room(); + let obj = {}; + room.id = data.id; + room.accessType = data.accessType; + room.createdAt = data.createdAt; + room.description = data.description; + room.joinType = data.joinType; + room.name = data.name; + if(data.owner && Client.clients.has(data.owner)) + { + room.owner = Client.clients.get(data.owner); + } + room.waitingInvited = new Set(data.waitingInvited); + obj.credential = data.credential; + obj.notifyActionInvite = data.notifyActionInvite; + obj.notifyActionJoined = data.notifyActionJoined; + obj.notifyActionEjected = data.notifyActionEjected; + obj.clients = new Map( + data.clients.map(e => ([ + e, // map key + Client.clients.get(e) // map value + ]) + ) + ) + return room; +}; Room.prototype.send = function(obj, withOut){ for (const client of this.clients.values()) { if(client.id != withOut) @@ -107,6 +156,7 @@ Room.prototype.join = function(client){ }; client.rooms.add(this.id); this.clients.set(client.id, client); + ROOM_JOIN_CLIENT(this.id, client.id); termoutput && term.green("Client Room joined ").white(this.name," in ").yellow(this.clients.size + "").white(" clients")('\n'); }; Room.prototype.down = function(){ @@ -116,6 +166,7 @@ Room.prototype.down = function(){ ownerid: this.owner.id },'room/closed']); Room.rooms.delete(this.id); + ROOM_DESTROY(this) }; /** * @param {Client} client @@ -131,6 +182,7 @@ Room.prototype.eject = function(client){ } client.rooms.delete(this.id); this.clients.delete(client.id); + ROOM_EJECT_CLIENT(this.id, client.id); if(this.clients.size == 0) { @@ -154,8 +206,8 @@ addListener('connect',(global, client)=>{ room.id = client.id; room.name = "Your Room | " + client.id; room.owner = client; - room.join(client); room.publish(); + room.join(client); }); addListener('disconnect',(global, client)=>{ @@ -296,8 +348,8 @@ addService(({ { room.credential = Sha256(message.credential + ""); } - room.join(client); room.publish(); + room.join(client); end({ status: "success", room: room.toJSON() diff --git a/Source/WebSocket.js b/Source/WebSocket.js index 470e41e..63c6ceb 100644 --- a/Source/WebSocket.js +++ b/Source/WebSocket.js @@ -5,6 +5,8 @@ let {http} = require("./HTTPServer"); let {randomUUID} = require("crypto"); const { Client } = require("./Client.js"); const { termoutput } = require("./config"); +const { CLIENT_CREATED, CLIENT_DESTROY } = require("./IPC"); +const stats = require("./stats"); termoutput && console.log("Web Socket Protocol is ready"); http.addListener("upgrade",() => { @@ -15,11 +17,22 @@ let wsServer = new websocket.server({ httpServer: http, autoAcceptConnections: true }); +/* +process.send({ + core: "writestat", + writeBytes:0, + readBytes:0, + totalBytes:0, + recaivedPacket:0, + sendedPacket:0, + totalPacket:0 +})*/ let global = new Map(); let clients = new Map(); -wsServer.addListener("connect",(socket) => { + +wsServer.addListener("connect",(socket) => { let xClient = new Client(); let id = randomUUID(); socket.id = id; @@ -29,12 +42,34 @@ wsServer.addListener("connect",(socket) => { Client.clients.set(id, xClient); clients.set(id, xClient); + console.log("Client:", id,"on worker pid:",process.pid) + + CLIENT_CREATED(id); + emit("connect", global, xClient); + + let oldw = 0, oldr = 0; + let timer = setInterval(()=>{ + let writed = socket.socket.bytesRead - oldr; + let readed = socket.socket.bytesWritten - oldw; + stats.ws_total_bytes += (writed + readed); + stats.ws_readed_bytes += readed; + stats.ws_writed_bytes += writed; + oldr = socket.socket.bytesRead; + oldw = socket.socket.bytesWritten; + }, 1000) + socket.addListener("close",()=>{ emit("disconnect", global, xClient); + CLIENT_DESTROY(id); Client.clients.set(id, xClient); + clearInterval(timer); }); + socket.addListener("message",({type,utf8Data}) => { + stats.ws_recaived_packs++; + stats.ws_total_packs++; + if(type == "utf8") { let json; diff --git a/Source/stats.js b/Source/stats.js new file mode 100644 index 0000000..fdf776c --- /dev/null +++ b/Source/stats.js @@ -0,0 +1,53 @@ +const { mlog } = require("./IPC"); + +exports.ws_writed_bytes = 0; +exports.ws_readed_bytes = 0; +exports.ws_total_bytes = 0; +exports.ws_sended_packs = 0; +exports.ws_recaived_packs = 0; +exports.ws_total_packs = 0; + + +process.send({ + core: "writestat", + ws_writed_bytes: exports.ws_writed_bytes, + ws_readed_bytes: exports.ws_readed_bytes, + ws_total_bytes: exports.ws_total_bytes, + ws_sended_packs: exports.ws_sended_packs, + ws_recaived_packs: exports.ws_recaived_packs, + ws_total_packs: exports.ws_total_packs +}); +setInterval(()=>{ + process.send({ + core: "writestat", + ws_writed_bytes: exports.ws_writed_bytes, + ws_readed_bytes: exports.ws_readed_bytes, + ws_total_bytes: exports.ws_total_bytes, + ws_sended_packs: exports.ws_sended_packs, + ws_recaived_packs: exports.ws_recaived_packs, + ws_total_packs: exports.ws_total_packs + }) + mlog(`writed ${exports.ws_writed_bytes} bytes, readed ${exports.ws_readed_bytes} bytes`); + exports.ws_writed_bytes = 0; + exports.ws_readed_bytes = 0; + exports.ws_total_bytes = 0; + exports.ws_sended_packs = 0; + exports.ws_recaived_packs = 0; + exports.ws_total_packs = 0; + process.send({ + core: "readstat", + ws_writed_bytes: exports.ws_writed_bytes, + ws_readed_bytes: exports.ws_readed_bytes, + ws_total_bytes: exports.ws_total_bytes, + ws_sended_packs: exports.ws_sended_packs, + ws_recaived_packs: exports.ws_recaived_packs, + ws_total_packs: exports.ws_total_packs + }) +}, 3000) + +process.on('message', stat => { + if(stat.type == ':stats:') + { + exports.others = stat.data; + } +}) diff --git a/index.js b/index.js new file mode 100644 index 0000000..e57a966 --- /dev/null +++ b/index.js @@ -0,0 +1,125 @@ +/** @type {import('node:cluster').Cluster} */ +const cluster = require("cluster"); +const os = require("os"); +let {randomUUID} = require("crypto"); + +/** + * Use Round Robin algorithm for cluster process load balancer + */ +cluster.schedulingPolicy = cluster.SCHED_RR; + +async function main() +{ + if(cluster.isPrimary == false) + { + console.log("Slave Process PID:", process.pid); + // This process is a worker / slave + // Compile source code and run + require("./Source/index"); + // stay here + return; + }; + + // This process is a primary / master + console.log("Master Process PID:", process.pid); + + // Worker process list + const master = new Map(); + + const coreCount = 3 //os.cpus().length; + for(let index = 0; index < coreCount; index++) + { + // Open slave process + let worker = await generateFlow(); + // Save process with id + master.set(worker.id, worker); + // Listen process for commands + worker.message( + // This process want to send payload to sibling process with IPC + (workerId, payload) =>{ + // Check Target worker + if(payload.core) + { + switch(payload.core) + { + case "writestat":{ + master.get(workerId).setStats(payload) + break; + } + case "readstat":{ + master.get(workerId).send({ + type: ':stats:', + data: [ + ...master.entries() + ].map(( + [, master] + ) => { + let e = master.getStats(); + return { + core: master.uuid, + ws_writed_bytes:e.ws_writed_bytes, + ws_readed_bytes:e.ws_readed_bytes, + ws_total_bytes:e.ws_total_bytes, + ws_sended_packs:e.ws_sended_packs, + ws_recaived_packs:e.ws_recaived_packs, + ws_total_packs:e.ws_total_packs + } + }) + }) + break; + } + } + }else if(payload.process) + { + master.get(payload.process).send({ + ...payload, + pid: worker.id + }) + }else for (const [siblingWorkerId,{send}] of master) { + // No sending to itself + if(workerId !== siblingWorkerId) + { + + // Send command to sibling with IPC + send({ + ...payload, + pid: worker.id + }) + } + } + } + ) + } +} + + + + +async function generateFlow() +{ + // Mirror this process with for (low-level os multitasking) + const worker = cluster.fork(); + // Wait process is online + await new Promise(ok => { + worker.addListener("online",()=> { + ok() + }) + }); + // Get process pid on the os + let id = worker.process.pid; + + let stats = {}; + + // Simplification wrapping send and get events with IPC's event functions + return { + id, + uuid: randomUUID(), + send: message => worker.send(message), + message: (callback) => worker.addListener("message", e => callback(id,e)), + getStats: () => stats, + setStats: e => Object.assign(stats, e) + } +} + +// Run immediately +process.nextTick(main); \ No newline at end of file diff --git a/script/index.html b/script/index.html new file mode 100644 index 0000000..9933e70 --- /dev/null +++ b/script/index.html @@ -0,0 +1,678 @@ + + + + + + + Network meter + + +
+
+ +
+
+ + + + \ No newline at end of file diff --git a/test.html b/script/test.html similarity index 59% rename from test.html rename to script/test.html index d0ecd7a..632ddc6 100644 --- a/test.html +++ b/script/test.html @@ -7,12 +7,13 @@ Document - +

+

+