From e4f644a346cf75dd21e8691a1e5c2cb104fde16b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Abdussamed=20ULUTA=C5=9E?= Date: Wed, 1 Mar 2023 00:33:07 +0300 Subject: [PATCH] Multitasking completed for Client, only room left --- Source/Client.js | 13 +++- Source/{Notify.js => IPC.js} | 60 +++++++++++++++++- Source/Services/Auth.js | 8 ++- Source/Services/IPPressure.js | 115 +++++++++++++++++++++++++++++++++- Source/Services/Room.js | 41 +++++++++++- Source/WebSocket.js | 8 +-- index.js | 74 +++++++++++++--------- test.html | 24 ++----- 8 files changed, 280 insertions(+), 63 deletions(-) rename Source/{Notify.js => IPC.js} (52%) diff --git a/Source/Client.js b/Source/Client.js index 72b061b..abf64d0 100644 --- a/Source/Client.js +++ b/Source/Client.js @@ -1,4 +1,4 @@ -const { CLIENT_SEND_MESSAGE } = require("./Notify"); +const { CLIENT_SEND_MESSAGE, CLIENT_UPDATE_PROP } = require("./IPC"); function Client() { @@ -26,6 +26,14 @@ function Client() this.isProxy = false; this.proxyProcess = null; + + this.sync = function(...args){ + process.nextTick(()=>{ + for (const name of args) { + CLIENT_UPDATE_PROP(this.id, name, this[name]); + } + }) + }; }; /** * @type {Map} @@ -39,6 +47,7 @@ Client.prototype.peerRequest = function(client){ let info = {}; this.store.forEach((value, name) => info[name] = value); this.pairs.add(client.id); + this.sync('pairs'); client.send([{ from: this.id, info @@ -50,6 +59,7 @@ Client.prototype.peerRequest = function(client){ */ Client.prototype.acceptPeerRequest = function(client){ this.pairs.add(client.id); + this.sync('pairs'); client.send([{ from: this.id },'accepted/pair']); @@ -60,6 +70,7 @@ Client.prototype.peerRequest = function(client){ Client.prototype.rejectPeerRequest = function(client){ this.pairs.delete(client.id); client.pairs.delete(this.id); + this.sync('pairs'); client.send([{ from: this.id },'rejected/pair']); diff --git a/Source/Notify.js b/Source/IPC.js similarity index 52% rename from Source/Notify.js rename to Source/IPC.js index 9b34a38..4f08811 100644 --- a/Source/Notify.js +++ b/Source/IPC.js @@ -1,8 +1,11 @@ +const { Room } = require("./Services/Room"); + process.on('message',data => { const {Client} = require("./Client.js") switch(data.type) { case "CLIENT_CREATED":{ + slog("CLIENT_CREATED"); let client = new Client(); client.isProxy = true; client.proxyProcess = data.pid; @@ -11,25 +14,39 @@ process.on('message',data => { break; } case "CLIENT_UPDATE_PROP":{ + slog("CLIENT_UPDATE_PROP"); let client = Client.clients.get(data.uuid); - client[data.name] = value; + client[data.name] = data.value; break; } case "CLIENT_SEND_MESSAGE":{ + slog("CLIENT_SEND_MESSAGE"); let client = Client.clients.get(data.uuid); client.send(data.message) break; } case "CLIENT_DESTROY":{ + slog("CLIENT_DESTROY"); Client.clients.delete(data.uuid); break; } + case "ROOM_CREATED":{ + slog("ROOM_CREATED"); + let room = Room.fromJSON(data.value); + Room.rooms.set(room.id, room); + break; + } + case "ROOM_DESTROY":{ + slog("ROOM_DESTROY"); + Room.rooms.delete(data.value); + break; + } } }); function CLIENT_CREATED(uuid) { - console.log(process.pid, "CLIENT_CREATED"); + mlog("CLIENT_CREATED"); process.send({ type:'CLIENT_CREATED', uuid: uuid @@ -37,6 +54,7 @@ function CLIENT_CREATED(uuid) }; function CLIENT_UPDATE_PROP(uuid, name, value) { + mlog("CLIENT_UPDATE_PROP"); process.send({ type:'CLIENT_UPDATE_PROP', uuid: uuid, @@ -46,6 +64,7 @@ function CLIENT_UPDATE_PROP(uuid, name, value) }; function CLIENT_SEND_MESSAGE(uuid, message) { + mlog("CLIENT_SEND_MESSAGE"); process.send({ type:'CLIENT_SEND_MESSAGE', uuid: uuid, @@ -54,13 +73,48 @@ function CLIENT_SEND_MESSAGE(uuid, message) }; function CLIENT_DESTROY(uuid) { + mlog("CLIENT_DESTROY"); process.send({ type:'CLIENT_DESTROY', uuid: uuid }) }; + +function ROOM_CREATED(room) +{ + mlog("ROOM_CREATED"); + let raw = room.toJSON(true); + process.send({ + type:'ROOM_CREATED', + value: raw + }) +}; + +function ROOM_DESTROY(room) +{ + mlog("ROOM_DESTROY"); + process.send({ + type:'ROOM_DESTROY', + value: room.id + }) +}; + +function mlog(command) +{ + console.log("M",process.pid, command) +} +function slog(command) +{ + console.log("S",process.pid, command) +} + + exports.CLIENT_CREATED = CLIENT_CREATED; exports.CLIENT_UPDATE_PROP = CLIENT_UPDATE_PROP; exports.CLIENT_DESTROY = CLIENT_DESTROY; -exports.CLIENT_SEND_MESSAGE = CLIENT_SEND_MESSAGE; \ No newline at end of file +exports.CLIENT_SEND_MESSAGE = CLIENT_SEND_MESSAGE; +exports.ROOM_CREATED = ROOM_CREATED; +exports.ROOM_DESTROY = ROOM_DESTROY; +exports.mlog = mlog; +exports.slog = slog; \ No newline at end of file diff --git a/Source/Services/Auth.js b/Source/Services/Auth.js index e12e256..660096a 100644 --- a/Source/Services/Auth.js +++ b/Source/Services/Auth.js @@ -1,4 +1,5 @@ const { Client } = require("../Client.js"); +const { CLIENT_UPDATE_PROP } = require("../IPC.js"); let {addService, addListener} = require("../WebSocket.js"); addService(({ @@ -13,13 +14,16 @@ addService(({ case "auth/pair-system":{ if(value == 'everybody') { - client.requiredPair = true; + client.requiredPair = true; end({status:"success"}); + client.sync('requiredPair'); } if(value == 'disable') { client.requiredPair = false; end({status:"success"}); + client.sync('requiredPair'); + //CLIENT_UPDATE_PROP(client.id, 'requiredPair', client.requiredPair); } break; } @@ -29,6 +33,7 @@ addService(({ } case 'auth/public':{ client.requiredPair = false; + client.sync('requiredPair'); return end({ value: 'success', mode: 'public' @@ -36,6 +41,7 @@ addService(({ } case 'auth/private':{ client.requiredPair = true; + client.sync('requiredPair'); return end({ value: 'success', mode: 'private' diff --git a/Source/Services/IPPressure.js b/Source/Services/IPPressure.js index 1f9ac98..1a05d58 100644 --- a/Source/Services/IPPressure.js +++ b/Source/Services/IPPressure.js @@ -3,6 +3,7 @@ let {addService, addListener} = require("../WebSocket.js"); let { randomBytes } = require("node:crypto"); +const { slog } = require("../IPC"); class APNumber{ /** @@ -11,14 +12,20 @@ class APNumber{ static busyNumbers = new Map(); /** * @type {number} + * @param {Client} client */ - static lock() + static lock(client) { let c = 24; while(true){ if(!APNumber.busyNumbers.has(c)) { - APNumber.busyNumbers.set(c,true); + APNumber.busyNumbers.set(c,client); + process.send({ + type: 'AP_NUMBER/LOCK', + uuid: client.id, + value: c + }) return c; } c++; @@ -30,6 +37,11 @@ class APNumber{ static release(num) { APNumber.busyNumbers.delete(num); + process.send({ + type: 'AP_NUMBER/RELEASE', + uuid: APNumber.busyNumbers.get(num).id, + value: num + }) } static whois(num){ return APNumber.busyNumbers.get(num)?.id; @@ -56,6 +68,11 @@ class APShortCode{ if(APShortCode.busyCodes.has(code) == false) { APShortCode.busyCodes.set(code, client); + process.send({ + type: 'AP_SHORTCODE/LOCK', + uuid: APShortCode.busyCodes.get(num).id, + value: code + }) return code; } if(!thirdLetter.end()) @@ -80,6 +97,11 @@ class APShortCode{ } static release(code){ APShortCode.busyCodes.delete(code); + process.send({ + type: 'AP_SHORTCODE/RELEASE', + uuid: APShortCode.busyCodes.get(code).id, + value: code + }) } static whois(num){ return APShortCode.busyCodes.get(num)?.id; @@ -122,6 +144,11 @@ class APIPAddress{ if(APIPAddress.busyIP.has(code) == false) { APIPAddress.busyIP.set(code, client); + process.send({ + type: 'AP_IPADDRESS/LOCK', + uuid: APIPAddress.busyIP.get(code).id, + value: code + }) return code; } if(D != 255) @@ -156,6 +183,11 @@ class APIPAddress{ } } static release(code){ + process.send({ + type: 'AP_IPADDRESS/RELEASE', + uuid: APIPAddress.busyIP.get(code).id, + value: code + }) APIPAddress.busyIP.delete(code); } static whois(num){ @@ -167,7 +199,6 @@ exports.APNumber = APNumber; exports.APShortCode = APShortCode; exports.APIPAddress = APIPAddress; - addService(({ client, message, @@ -186,6 +217,7 @@ addService(({ }; let value = APIPAddress.lock(client); client.APIPAddress = value; + client.sync('APIPAddress'); end({ status : "success", ip : value @@ -201,6 +233,7 @@ addService(({ }; let value = APNumber.lock(client); client.APNumber = value; + client.sync('APNumber'); end({ status : "success", number : value @@ -216,6 +249,7 @@ addService(({ }; let value = APShortCode.lock(client); client.APShortCode = value; + client.sync('APShortCode'); end({ status : "success", code : value @@ -230,6 +264,7 @@ addService(({ } APIPAddress.release(client.APIPAddress); let value = APIPAddress.lock(client); + client.sync('APIPAddress'); end({ status : "success", ip : value @@ -244,6 +279,7 @@ addService(({ } APNumber.release(client.APNumber); let value = APNumber.lock(client); + client.sync('APNumber'); end({ status : "success", number : value @@ -258,6 +294,7 @@ addService(({ } APShortCode.release(client.APShortCode); let value = APShortCode.lock(client); + client.sync('APShortCode'); end({ status : "success", code : value @@ -266,6 +303,8 @@ addService(({ } case "release/APIPAddress":{ APIPAddress.release(client.APIPAddress); + client.APIPAddress = void 0; + client.sync('APShortCode'); end({ status : "success" }) @@ -273,6 +312,8 @@ addService(({ } case "release/APNumber":{ APNumber.release(client.APNumber); + client.APNumber = void 0; + client.sync('APIPAddress'); end({ status : "success" }) @@ -280,6 +321,8 @@ addService(({ } case "release/APShortCode":{ APShortCode.release(client.APShortCode); + client.APShortCode = void 0; + client.sync('APIPAddress'); end({ status : "success" }) @@ -333,6 +376,72 @@ addService(({ } }) +process.on('message',({type, uuid, value}) => { + switch(type) + { + case "AP_NUMBER/LOCK":{ + console.log("S",process.pid, 'IPPressure SYNCED') + APNumber.busyNumbers.set(value, client); + let client = Client.clients.get(uuid); + if(client) + { + client.APNumber = value; + client.sync('APNumber'); + } + } + case "AP_NUMBER/RELEASE":{ + console.log("S",process.pid, 'IPPressure SYNCED') + APNumber.busyNumbers.delete(value); + let client = Client.clients.get(uuid); + if(client) + { + client.APNumber = void 0; + client.sync('APNumber'); + } + } + case "AP_SHORTCODE/LOCK":{ + console.log("S",process.pid, 'IPPressure SYNCED') + APShortCode.busyCodes.set(value, client); + let client = Client.clients.get(uuid); + if(client) + { + client.APShortCode = value; + client.sync('APShortCode'); + } + } + case "AP_SHORTCODE/RELEASE":{ + console.log("S",process.pid, 'IPPressure SYNCED') + APShortCode.busyCodes.delete(value); + let client = Client.clients.get(uuid); + if(client) + { + client.APShortCode = void 0; + client.sync('APShortCode'); + } + } + case "AP_IPADDRESS/LOCK":{ + console.log("S",process.pid, 'IPPressure SYNCED') + APIPAddress.busyIP.set(value, client); + let client = Client.clients.get(uuid); + if(client) + { + client.APIPAddress = value; + client.sync('APIPAddress'); + } + } + case "AP_IPADDRESS/RELEASE":{ + console.log("S",process.pid, 'IPPressure SYNCED') + APIPAddress.busyIP.delete(value); + let client = Client.clients.get(uuid); + if(client) + { + client.APIPAddress = void 0; + client.sync('APIPAddress'); + } + } + } +}) + addListener('disconnect',(global, client)=>{ if(client.APIPAddress != 0) { diff --git a/Source/Services/Room.js b/Source/Services/Room.js index 6d26d40..8c693f9 100644 --- a/Source/Services/Room.js +++ b/Source/Services/Room.js @@ -3,6 +3,7 @@ let {randomUUID,createHash} = require("crypto"); const joi = require("joi"); let {addService,addListener} = require("../WebSocket.js"); const { termoutput } = require("../config.js"); +const { ROOM_CREATED, ROOM_DESTROY } = require("../IPC.js"); let term = require("terminal-kit").terminal; function Sha256(update) @@ -70,9 +71,10 @@ function Room() */ Room.prototype.publish = function(room){ Room.rooms.set(this.id, this); + ROOM_CREATED(this); termoutput && term.green("Room Published ").white(this.name," in ").yellow(this.clients.size).white(" clients")('\n'); }; -Room.prototype.toJSON = function(){ +Room.prototype.toJSON = function(detailed){ let obj = {}; obj.id = this.id; obj.accessType = this.accessType; @@ -82,8 +84,44 @@ Room.prototype.toJSON = function(){ obj.name = this.name; obj.owner = this.owner.id; obj.waitingInvited = [...this.waitingInvited]; + if(detailed) + { + obj.credential = this.credential; + obj.notifyActionInvite = this.notifyActionInvite; + obj.notifyActionJoined = this.notifyActionJoined; + obj.notifyActionEjected = this.notifyActionEjected; + obj.clients = [...this.clients.keys()]; + } return obj; }; +/** + * @param {Object} data + * @param {Room} room + */ +Room.fromJSON = function(data, room){ + room = room || new Room(); + let obj = {}; + room.id = data.id; + room.accessType = data.accessType; + room.createdAt = data.createdAt; + room.description = data.description; + room.joinType = data.joinType; + room.name = data.name; + room.owner = data.owner.id; + room.waitingInvited = new Set(data.waitingInvited); + obj.credential = data.credential; + obj.notifyActionInvite = data.notifyActionInvite; + obj.notifyActionJoined = data.notifyActionJoined; + obj.notifyActionEjected = data.notifyActionEjected; + obj.clients = new Map( + data.clients.map(e => ([ + e, // map key + Client.clients.get(e) // map value + ]) + ) + ) + return room; +}; Room.prototype.send = function(obj, withOut){ for (const client of this.clients.values()) { if(client.id != withOut) @@ -116,6 +154,7 @@ Room.prototype.down = function(){ ownerid: this.owner.id },'room/closed']); Room.rooms.delete(this.id); + ROOM_DESTROY(this) }; /** * @param {Client} client diff --git a/Source/WebSocket.js b/Source/WebSocket.js index c972c86..e0fbc1e 100644 --- a/Source/WebSocket.js +++ b/Source/WebSocket.js @@ -5,7 +5,7 @@ let {http} = require("./HTTPServer"); let {randomUUID} = require("crypto"); const { Client } = require("./Client.js"); const { termoutput } = require("./config"); -const { CLIENT_CREATED, CLIENT_DESTROY } = require("./Notify"); +const { CLIENT_CREATED, CLIENT_DESTROY } = require("./IPC"); termoutput && console.log("Web Socket Protocol is ready"); http.addListener("upgrade",() => { @@ -20,10 +20,6 @@ let wsServer = new websocket.server({ let global = new Map(); let clients = new Map(); wsServer.addListener("connect",(socket) => { - - socket.send("worker id " + process.pid); - return; - let xClient = new Client(); let id = randomUUID(); socket.id = id; @@ -33,6 +29,8 @@ wsServer.addListener("connect",(socket) => { Client.clients.set(id, xClient); clients.set(id, xClient); + console.log("Client:", id,"on worker pid:",process.pid) + CLIENT_CREATED(id); emit("connect", global, xClient); diff --git a/index.js b/index.js index 231b10b..a49cf8b 100644 --- a/index.js +++ b/index.js @@ -1,66 +1,78 @@ /** @type {import('node:cluster').Cluster} */ const cluster = require("cluster"); const os = require("os"); -const {randomInt} = require("crypto"); +/** + * Use Round Robin algorithm for cluster process load balancer + */ cluster.schedulingPolicy = cluster.SCHED_RR; async function main() { - let master = new Map(); - - if(cluster.isPrimary) + if(cluster.isPrimary == false) { - let e = 0|0 - while (e < 10) - { - e++; - let flow = await generateFlow(); - flow.send({ - TYPE:"start" - }) - master.set(flow.id, flow); - flow.message((_id, obj) =>{ - for (const [id,{send}] of master) { - if(_id !== id) + // This process is a worker / slave + // Compile source code and run + require("./Source/index"); + // stay here + return; + }; + + // This process is a primary / master + + // Worker process list + const master = new Map(); + + const coreCount = 2 // os.cpus().length; + for(let index = 0; index < coreCount; index++) + { + // Open slave process + let worker = await generateFlow(); + // Save process with id + master.set(worker.id, worker); + // Listen process for commands + worker.message( + // This process want to send payload to sibling process with IPC + (workerId, payload) =>{ + for (const [siblingWorkerId,{send}] of master) { + // No sending to itself + if(workerId !== siblingWorkerId) { + // Send command to sibling with IPC send({ - ...obj, - pid: flow.id + ...payload, + pid: worker.id }) } } - }) - }; - }else{ - let Application; - let synced = []; - process.on("message",data => { - switch(data.TYPE) - { - case "start":{ - Application = require("./Source/index"); - break; - } } - }) + ) } } + + + async function generateFlow(N) { + // Mirror this process with for (low-level os multitasking) const worker = cluster.fork(); + // Wait process is online await new Promise(ok => { worker.addListener("online",()=> { ok() }) }); + // Get process pid on the os let id = worker.process.pid; + // Simplification wrapping send and get events with IPC's event functions return { id, send: message => worker.send(message), message: (callback) => worker.addListener("message", e => callback(id,e)) } } + +// Run immediately process.nextTick(main); \ No newline at end of file diff --git a/test.html b/test.html index 46e430d..8f203d6 100644 --- a/test.html +++ b/test.html @@ -33,15 +33,11 @@ console.log(args) }); iroom = room; - /*setInterval(()=>{ - wsjs.server.tranferToServer({ - e0:Math.random(), - e1:Math.random(), - e2:Math.random(), - e3:Math.random(), - e4:Math.random() - }) - }, 10)*/ + document.addEventListener("click",async () => { + const r = wsjs.virtualPressure.allocAPIPAddress(); + debugger; + console.log(r); + }) }); wsjs.on('peer',(peer)=>{ peer.on('message',(...args)=>{ @@ -50,15 +46,7 @@ }) }; - let ws = new WebSocket("ws://localhost:7707"); - ws.onmessage = function({data}){ - message.innerText = data; - } - - // main(); - /*for (const iterator of Array.from({length:90}).fill(0)) { - main(); - }*/ + main(); \ No newline at end of file