Multitasking & Some bugs fixed

This commit is contained in:
Abdussamed ULUTAŞ 2023-03-04 22:57:51 +03:00
parent e4f644a346
commit c630e20b46
10 changed files with 148 additions and 45 deletions

View File

@ -93,7 +93,7 @@ Client.prototype.pairList = function(){
Client.prototype.send = function(obj){ Client.prototype.send = function(obj){
if(this.isProxy) if(this.isProxy)
{ {
CLIENT_SEND_MESSAGE(this.id, obj) CLIENT_SEND_MESSAGE(this.id, obj, this.proxyProcess)
}else{ }else{
this.socket.sendUTF(JSON.stringify(obj)); this.socket.sendUTF(JSON.stringify(obj));
} }

View File

@ -1,7 +1,7 @@
const { Room } = require("./Services/Room");
process.on('message',data => { process.on('message',data => {
const {Client} = require("./Client.js") const { Client } = require("./Client.js");
const { Room } = require("./Services/Room");
switch(data.type) switch(data.type)
{ {
case "CLIENT_CREATED":{ case "CLIENT_CREATED":{
@ -22,7 +22,10 @@ process.on('message',data => {
case "CLIENT_SEND_MESSAGE":{ case "CLIENT_SEND_MESSAGE":{
slog("CLIENT_SEND_MESSAGE"); slog("CLIENT_SEND_MESSAGE");
let client = Client.clients.get(data.uuid); let client = Client.clients.get(data.uuid);
client.send(data.message) if(client.isProxy != true)
{
client.send(data.message)
}
break; break;
} }
case "CLIENT_DESTROY":{ case "CLIENT_DESTROY":{
@ -36,6 +39,34 @@ process.on('message',data => {
Room.rooms.set(room.id, room); Room.rooms.set(room.id, room);
break; break;
} }
case "ROOM_UPDATE_PROP":{
slog("ROOM_UPDATE_PROP");
let room = Room.rooms.get(data.uuid);
room[data.name] = data.value;
break;
}
case "ROOM_JOIN_CLIENT":{
slog("ROOM_JOIN_CLIENT");
let room = Room.rooms.get(data.uuid);
let client = Client.clients.get(data.client);
if(room && client)
{
client.rooms.add(room.id);
room.clients.set(client.id, client);
}
break;
}
case "ROOM_EJECT_CLIENT":{
slog("ROOM_EJECT_CLIENT");
let room = Room.rooms.get(data.uuid);
let client = Client.clients.get(data.client);
if(room && client)
{
client.rooms.delete(room.id);
room.clients.delete(client.id, client);
}
break;
}
case "ROOM_DESTROY":{ case "ROOM_DESTROY":{
slog("ROOM_DESTROY"); slog("ROOM_DESTROY");
Room.rooms.delete(data.value); Room.rooms.delete(data.value);
@ -62,13 +93,14 @@ function CLIENT_UPDATE_PROP(uuid, name, value)
value value
}) })
}; };
function CLIENT_SEND_MESSAGE(uuid, message) function CLIENT_SEND_MESSAGE(uuid, message, clusterPid)
{ {
mlog("CLIENT_SEND_MESSAGE"); mlog("CLIENT_SEND_MESSAGE");
process.send({ process.send({
type:'CLIENT_SEND_MESSAGE', type:'CLIENT_SEND_MESSAGE',
uuid: uuid, uuid: uuid,
message message,
process:clusterPid
}) })
}; };
function CLIENT_DESTROY(uuid) function CLIENT_DESTROY(uuid)
@ -91,6 +123,36 @@ function ROOM_CREATED(room)
}) })
}; };
function ROOM_UPDATE_PROP(uuid, name, value)
{
mlog("ROOM_UPDATE_PROP");
process.send({
type:'ROOM_UPDATE_PROP',
uuid: uuid,
name,
value
})
};
function ROOM_JOIN_CLIENT(uuid, client)
{
mlog("ROOM_JOIN_CLIENT");
process.send({
type:'ROOM_JOIN_CLIENT',
uuid: uuid,
client
})
};
function ROOM_EJECT_CLIENT(uuid, client)
{
mlog("ROOM_EJECT_CLIENT");
process.send({
type:'ROOM_EJECT_CLIENT',
uuid: uuid,
client
})
};
function ROOM_DESTROY(room) function ROOM_DESTROY(room)
{ {
mlog("ROOM_DESTROY"); mlog("ROOM_DESTROY");
@ -115,6 +177,9 @@ exports.CLIENT_UPDATE_PROP = CLIENT_UPDATE_PROP;
exports.CLIENT_DESTROY = CLIENT_DESTROY; exports.CLIENT_DESTROY = CLIENT_DESTROY;
exports.CLIENT_SEND_MESSAGE = CLIENT_SEND_MESSAGE; exports.CLIENT_SEND_MESSAGE = CLIENT_SEND_MESSAGE;
exports.ROOM_CREATED = ROOM_CREATED; exports.ROOM_CREATED = ROOM_CREATED;
exports.ROOM_UPDATE_PROP = ROOM_UPDATE_PROP;
exports.ROOM_JOIN_CLIENT = ROOM_JOIN_CLIENT;
exports.ROOM_EJECT_CLIENT = ROOM_EJECT_CLIENT;
exports.ROOM_DESTROY = ROOM_DESTROY; exports.ROOM_DESTROY = ROOM_DESTROY;
exports.mlog = mlog; exports.mlog = mlog;
exports.slog = slog; exports.slog = slog;

View File

@ -87,6 +87,7 @@ addService(({
let {to,pack, handshake,wom} = message; let {to,pack, handshake,wom} = message;
if(Room.rooms.has(to)) if(Room.rooms.has(to))
{ {
console.log("Oda da ", Room.rooms.get(to).clients.size,"kişi var")
if(!client.rooms.has(to)) if(!client.rooms.has(to))
{ {
return handshake && end({ return handshake && end({
@ -94,9 +95,9 @@ addService(({
}) })
}; };
Room.rooms.get(to).send([{ Room.rooms.get(to).send([{
from: client.id, from: to,
pack: pack pack: pack
}, 'pack'], wom ? client.id : void 0); }, 'pack/room'], wom ? client.id : void 0);
handshake && end({ handshake && end({
type: 'success' type: 'success'
}) })

View File

@ -96,12 +96,15 @@ class APShortCode{
} }
} }
static release(code){ static release(code){
APShortCode.busyCodes.delete(code); if(APShortCode.busyCodes.has(code))
process.send({ {
type: 'AP_SHORTCODE/RELEASE', process.send({
uuid: APShortCode.busyCodes.get(code).id, type: 'AP_SHORTCODE/RELEASE',
value: code uuid: APShortCode.busyCodes.get(code).id,
}) value: code
})
APShortCode.busyCodes.delete(code);
}
} }
static whois(num){ static whois(num){
return APShortCode.busyCodes.get(num)?.id; return APShortCode.busyCodes.get(num)?.id;
@ -183,12 +186,15 @@ class APIPAddress{
} }
} }
static release(code){ static release(code){
process.send({ if(APIPAddress.busyIP.has(code))
type: 'AP_IPADDRESS/RELEASE', {
uuid: APIPAddress.busyIP.get(code).id, process.send({
value: code type: 'AP_IPADDRESS/RELEASE',
}) uuid: APIPAddress.busyIP.get(code).id,
APIPAddress.busyIP.delete(code); value: code
})
APIPAddress.busyIP.delete(code);
}
} }
static whois(num){ static whois(num){
return APIPAddress.busyIP.get(num)?.id; return APIPAddress.busyIP.get(num)?.id;
@ -381,8 +387,8 @@ process.on('message',({type, uuid, value}) => {
{ {
case "AP_NUMBER/LOCK":{ case "AP_NUMBER/LOCK":{
console.log("S",process.pid, 'IPPressure SYNCED') console.log("S",process.pid, 'IPPressure SYNCED')
APNumber.busyNumbers.set(value, client);
let client = Client.clients.get(uuid); let client = Client.clients.get(uuid);
APNumber.busyNumbers.set(value, client);
if(client) if(client)
{ {
client.APNumber = value; client.APNumber = value;
@ -401,8 +407,8 @@ process.on('message',({type, uuid, value}) => {
} }
case "AP_SHORTCODE/LOCK":{ case "AP_SHORTCODE/LOCK":{
console.log("S",process.pid, 'IPPressure SYNCED') console.log("S",process.pid, 'IPPressure SYNCED')
APShortCode.busyCodes.set(value, client);
let client = Client.clients.get(uuid); let client = Client.clients.get(uuid);
APShortCode.busyCodes.set(value, client);
if(client) if(client)
{ {
client.APShortCode = value; client.APShortCode = value;
@ -421,8 +427,8 @@ process.on('message',({type, uuid, value}) => {
} }
case "AP_IPADDRESS/LOCK":{ case "AP_IPADDRESS/LOCK":{
console.log("S",process.pid, 'IPPressure SYNCED') console.log("S",process.pid, 'IPPressure SYNCED')
APIPAddress.busyIP.set(value, client);
let client = Client.clients.get(uuid); let client = Client.clients.get(uuid);
APIPAddress.busyIP.set(value, client);
if(client) if(client)
{ {
client.APIPAddress = value; client.APIPAddress = value;

View File

@ -3,7 +3,7 @@ let {randomUUID,createHash} = require("crypto");
const joi = require("joi"); const joi = require("joi");
let {addService,addListener} = require("../WebSocket.js"); let {addService,addListener} = require("../WebSocket.js");
const { termoutput } = require("../config.js"); const { termoutput } = require("../config.js");
const { ROOM_CREATED, ROOM_DESTROY } = require("../IPC.js"); const { ROOM_CREATED, ROOM_DESTROY, ROOM_UPDATE_PROP, ROOM_JOIN_CLIENT, ROOM_EJECT_CLIENT } = require("../IPC.js");
let term = require("terminal-kit").terminal; let term = require("terminal-kit").terminal;
function Sha256(update) function Sha256(update)
@ -65,6 +65,14 @@ function Room()
* @type {string[]} * @type {string[]}
*/ */
this.waitingInvited = new Set(); this.waitingInvited = new Set();
this.sync = function(...args){
process.nextTick(()=>{
for (const name of args) {
ROOM_UPDATE_PROP(this.id, name, this[name]);
}
})
};
} }
/** /**
* @param {Room} room * @param {Room} room
@ -107,7 +115,10 @@ Room.fromJSON = function(data, room){
room.description = data.description; room.description = data.description;
room.joinType = data.joinType; room.joinType = data.joinType;
room.name = data.name; room.name = data.name;
room.owner = data.owner.id; if(data.owner && Client.clients.has(data.owner))
{
room.owner = Client.clients.get(data.owner);
}
room.waitingInvited = new Set(data.waitingInvited); room.waitingInvited = new Set(data.waitingInvited);
obj.credential = data.credential; obj.credential = data.credential;
obj.notifyActionInvite = data.notifyActionInvite; obj.notifyActionInvite = data.notifyActionInvite;
@ -145,6 +156,7 @@ Room.prototype.join = function(client){
}; };
client.rooms.add(this.id); client.rooms.add(this.id);
this.clients.set(client.id, client); this.clients.set(client.id, client);
ROOM_JOIN_CLIENT(this.id, client.id);
termoutput && term.green("Client Room joined ").white(this.name," in ").yellow(this.clients.size + "").white(" clients")('\n'); termoutput && term.green("Client Room joined ").white(this.name," in ").yellow(this.clients.size + "").white(" clients")('\n');
}; };
Room.prototype.down = function(){ Room.prototype.down = function(){
@ -170,6 +182,7 @@ Room.prototype.eject = function(client){
} }
client.rooms.delete(this.id); client.rooms.delete(this.id);
this.clients.delete(client.id); this.clients.delete(client.id);
ROOM_EJECT_CLIENT(this.id, client.id);
if(this.clients.size == 0) if(this.clients.size == 0)
{ {
@ -193,8 +206,8 @@ addListener('connect',(global, client)=>{
room.id = client.id; room.id = client.id;
room.name = "Your Room | " + client.id; room.name = "Your Room | " + client.id;
room.owner = client; room.owner = client;
room.join(client);
room.publish(); room.publish();
room.join(client);
}); });
addListener('disconnect',(global, client)=>{ addListener('disconnect',(global, client)=>{
@ -335,8 +348,8 @@ addService(({
{ {
room.credential = Sha256(message.credential + ""); room.credential = Sha256(message.credential + "");
} }
room.join(client);
room.publish(); room.publish();
room.join(client);
end({ end({
status: "success", status: "success",
room: room.toJSON() room: room.toJSON()

View File

@ -70,9 +70,9 @@ export default class MWSE extends EventTarget {
this.peer(from, true).emit('request', scope); this.peer(from, true).emit('request', scope);
this.peer('me').emit('request', scope); this.peer('me').emit('request', scope);
}) })
this.EventPooling.signal('pack/room',(payload : {to:string,pack:any}) => { this.EventPooling.signal('pack/room',(payload : {from:string,pack:any}) => {
let {to,pack} = payload; let {from,pack} = payload;
this.room(to).emit('message', pack); this.room(from).emit('message', pack);
}) })
this.EventPooling.signal('room/joined',(payload : {id:string,roomid:any,ownerid:string}) => { this.EventPooling.signal('room/joined',(payload : {id:string,roomid:any,ownerid:string}) => {
let {id,roomid} = payload; let {id,roomid} = payload;

View File

@ -11,6 +11,7 @@ async function main()
{ {
if(cluster.isPrimary == false) if(cluster.isPrimary == false)
{ {
console.log("Slave", process.pid);
// This process is a worker / slave // This process is a worker / slave
// Compile source code and run // Compile source code and run
require("./Source/index"); require("./Source/index");
@ -19,11 +20,12 @@ async function main()
}; };
// This process is a primary / master // This process is a primary / master
console.log("Master", process.pid);
// Worker process list // Worker process list
const master = new Map(); const master = new Map();
const coreCount = 2 // os.cpus().length; const coreCount = os.cpus().length;
for(let index = 0; index < coreCount; index++) for(let index = 0; index < coreCount; index++)
{ {
// Open slave process // Open slave process
@ -34,10 +36,18 @@ async function main()
worker.message( worker.message(
// This process want to send payload to sibling process with IPC // This process want to send payload to sibling process with IPC
(workerId, payload) =>{ (workerId, payload) =>{
for (const [siblingWorkerId,{send}] of master) { // Check Target worker
if(payload.process)
{
master.get(payload.process).send({
...payload,
pid: worker.id
})
}else for (const [siblingWorkerId,{send}] of master) {
// No sending to itself // No sending to itself
if(workerId !== siblingWorkerId) if(workerId !== siblingWorkerId)
{ {
// Send command to sibling with IPC // Send command to sibling with IPC
send({ send({
...payload, ...payload,

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

View File

@ -7,6 +7,7 @@
<title>Document</title> <title>Document</title>
</head> </head>
<body> <body>
<h2><pre id="log"></pre></h2>
<h1 id="message"></h1> <h1 id="message"></h1>
<script src="./script/index.js"></script> <script src="./script/index.js"></script>
<script> <script>
@ -30,22 +31,29 @@
}); });
await room.createRoom(); await room.createRoom();
room.on('message',(...args)=>{ room.on('message',(...args)=>{
console.log(args) gg.r++
gg()
}); });
iroom = room; iroom = room;
document.addEventListener("click",async () => { setInterval(()=>{
const r = wsjs.virtualPressure.allocAPIPAddress(); room.send({
debugger; type: "merhaba"
console.log(r); }, true)
}) gg.w++;
gg()
}, 200)
}); });
wsjs.on('peer',(peer)=>{
peer.on('message',(...args)=>{
console.log(args);
})
})
}; };
function gg()
{
log.innerHTML = `${gg.w} packet writed\n${gg.r} packet recaived`
}
gg.w = 0;
gg.r = 0;
setInterval(()=>{
window.location.reload();
}, 30000)
main(); main();
</script> </script>
</body> </body>