Merge branch 'filetransport' into stable

This commit is contained in:
Abdussamed 2023-06-09 22:41:29 +03:00
commit 27e517238f
8 changed files with 415 additions and 39 deletions

View File

@ -27,6 +27,9 @@ function Client()
* @type {Set<string>} * @type {Set<string>}
*/ */
this.rooms = new Set(); this.rooms = new Set();
/**
* @type {Set<string>}
*/
this.pairs = new Set(); this.pairs = new Set();
this.requiredPair = false; this.requiredPair = false;
@ -172,7 +175,7 @@ Client.prototype.rejectPeerRequest = function(client){
this.sync('pairs'); this.sync('pairs');
client.send([{ client.send([{
from: this.id from: this.id
},'rejected/pair']); },'end/pair']);
}; };
/** /**
* @param {Client|string} client * @param {Client|string} client
@ -185,6 +188,9 @@ Client.prototype.isPaired = function(client){
} }
return client.pairs.has(this.id) && this.pairs.has(client.id); return client.pairs.has(this.id) && this.pairs.has(client.id);
}; };
/**
* @returns {string[]}
*/
Client.prototype.pairList = function(){ Client.prototype.pairList = function(){
return [...this.pairs.values()].filter(e => this.isPaired(e)); return [...this.pairs.values()].filter(e => this.isPaired(e));
}; };

View File

@ -127,10 +127,28 @@ addService(({
client.rejectPeerRequest(pairclient); client.rejectPeerRequest(pairclient);
break; break;
} }
case 'end/pair':{
if(Client.clients.has(to)){
return end({
status: 'fail',
message: 'CLIENT-NOT-FOUND'
})
};
let pairclient = Client.clients.get(to);
if(!pairclient.pairs.has(client.id))
{
return end({
status: 'success',
message: 'NOT-PAIRED'
})
}
client.rejectPeerRequest(pairclient);
break;
}
case 'pair/list':{ case 'pair/list':{
end({ end({
type:'pair/list', type:'pair/list',
value: pairList value: client.pairList()
}) })
break; break;
} }
@ -149,38 +167,6 @@ addService(({
} }
break; break;
} }
// case 'auth/check':{
// let auth = client.store.has('user');
// return end({
// value: auth
// })
// }
// case 'auth/login':{
// if(username == '*' && password == '*')
// {
// return end({
// status: 'success'
// })
// }else{
// return end({
// status: 'fail'
// })
// }
// }
// case 'auth/logout':{
// let auth = client.store.has('user');
// if(auth)
// {
// client.store.delete('user');
// return end({
// status: 'success'
// })
// }else{
// return end({
// status: 'fail'
// })
// }
// }
case 'auth/info':{ case 'auth/info':{
client.info.set(name, value); client.info.set(name, value);
let clients = client.getSucureClients(); let clients = client.getSucureClients();

262
frontend/P2PFileSender.ts Normal file
View File

@ -0,0 +1,262 @@
import "webrtc-adapter";
import WebRTC from "./WebRTC";
import Peer from "./Peer";
export default class P2PFileSender
{
public rtc : RTCPeerConnection;
public peer : Peer;
public webrtc : WebRTC;
public totalSize : number = 0;
public isReady : boolean = false;
public isStarted : boolean = false;
public isSending : boolean = false;
public isRecaiving : boolean = false;
public processedSize : number = 0;
public recaivedFile? : File;
public bufferSizePerChannel : number = 10e6;
public bufferSizePerPack : number = 10e3;
public safeBufferSizePerPack : number = 10e3 - 1;
public constructor(webrtc : WebRTC, peer : Peer)
{
this.webrtc = webrtc;
this.rtc = webrtc.rtc;
this.peer = peer;
}
public async RecaiveFile(
_rtc: RTCPeerConnection,
fileMetadata: {name:string, type:string},
channelCount: number,
_totalSize: number,
onEnded: Function
)
{
//let totals = {};
// let index = 0;
/*setChannelStatus(Array.from({length:channelCount}).map((e, index) => {
return {
name: `${index+1}. Kanal`,
current: 0,
currentTotal: 0,
total: 0
}
}));*/
let parts : Blob[] = [];
this.webrtc.on('datachannel',(datachannel:RTCDataChannel) => {
//let channelIndex = index++;
let current = 0;
let totalSize = 0;
let currentPart = 0;
let bufferAmount : ArrayBuffer[] = [];
datachannel.onmessage = function({data}){
if(totalSize == 0)
{
let {
size,
part,
} = JSON.parse(data);
totalSize = size;
currentPart = part;
/*updateChannelStatus(channelIndex, n => {
return {
...n,
total: totalSize,
current: 0
}
});*/
datachannel.send("READY");
}else{
current += data.byteLength;
bufferAmount.push(data);
/*updateChannelStatus(channelIndex, n => {
return {
...n,
current: data.byteLength + n.current,
currentTotal: data.byteLength + n.currentTotal,
}
});
setProcessedSize(n => n + data.byteLength);*/
if(current == totalSize)
{
parts[currentPart] = new Blob(bufferAmount);
bufferAmount = [];
//totals[datachannel.label] += totalSize;
totalSize = 0;
currentPart = 0;
current = 0;
datachannel.send("TOTAL_RECAIVED");
}
}
};
datachannel.onclose = () => {
channelCount--;
if(channelCount == 0)
{
let file = new File(parts, fileMetadata.name, {
type: fileMetadata.type,
lastModified: +new Date
});
onEnded(file);
}
};
})
}
public async SendFile(
file: File,
metadata: object
)
{
this.isSending = true;
this.isStarted = true;
let buffer = await file.arrayBuffer();
let partCount = Math.ceil(buffer.byteLength / 10e6);
let channelCount = Math.min(5, partCount);
if(this.webrtc.iceStatus != "connected")
{
throw new Error("WebRTC is a not ready")
}
this.peer.send({
type: 'file',
name: file.name,
size: file.size,
mimetype: file.type,
partCount,
channelCount,
metadata: metadata
});
let channels : RTCDataChannel[] = [];
for(let channelIndex = 0; channelIndex < channelCount; channelIndex++)
{
let channel = this.rtc.createDataChannel("\\?\\file_" + channelIndex);
channel.binaryType = "arraybuffer";
await new Promise(ok => {
channel.onopen = () => {
ok(void 0);
}
});
channels.push(channel);
};
let currentPart = 0;
let next = () => {
if(currentPart < partCount)
{
let bufferPart = buffer.slice(currentPart * 10e6, currentPart * 10e6 + 10e6)
currentPart++;
return [bufferPart, currentPart - 1];
};
return [false,0];
};
let spyChannelIndex = channels.length;
await new Promise(ok => {
for (let channelIndex = 0; channelIndex < channels.length; channelIndex++)
{
this.sendPartition(
channels[channelIndex],
next,
channelIndex,
() => {
spyChannelIndex--;
if(spyChannelIndex == 0)
{
this.isSending = false;
this.isStarted = false;
ok(undefined)
}
}
);
}
})
}
protected sendPartition(
channel: RTCDataChannel,
nextblob10mb: () => (number | ArrayBuffer)[] | (number | boolean)[],
_channelIndex: number,
onEnded: Function
)
{
let [currentBuffer,currentPartition] = nextblob10mb();
let currentPart = 0;
let next = () => {
if(!(currentBuffer instanceof ArrayBuffer))
{
return;
}
let bufferPart = currentBuffer.slice(currentPart * 16e3, currentPart * 16e3 + 16e3)
currentPart++;
if(bufferPart.byteLength != 0)
{
/*
updateChannelStatus(channelIndex, n => {
return {
...n,
current: bufferPart.byteLength + n.current,
currentTotal: bufferPart.byteLength + n.currentTotal
}
});
setProcessedSize(n => n + bufferPart.byteLength);
*/
return bufferPart
}
};
channel.addEventListener("message",({data}) => {
if(data == "READY")
{
this.sendFileChannel(channel, next)
}
if(data == "TOTAL_RECAIVED")
{
[currentBuffer,currentPartition] = nextblob10mb();
currentPart = 0;
if(currentBuffer != false)
{
/*updateChannelStatus(channelIndex, n => {
return {
...n,
total: currentBuffer.byteLength,
current: 0,
}
});*/
channel.send(JSON.stringify({
size: (currentBuffer as ArrayBuffer).byteLength,
part: currentPartition
}))
}else{
channel.close();
onEnded();
}
}
});
channel.send(JSON.stringify({
size: (currentBuffer as ArrayBuffer).byteLength,
part: currentPartition
}))
}
protected sendFileChannel(
channel: RTCDataChannel,
getNextBlob: () => ArrayBuffer | undefined
)
{
channel.addEventListener("bufferedamountlow",function(){
let buffer = getNextBlob();
if(buffer)
{
channel.send(buffer);
}
});
channel.bufferedAmountLowThreshold = 16e3 - 1;
let c = getNextBlob();
c && channel.send(c);
}
};

View File

@ -119,6 +119,68 @@ export default class Peer extends EventTarget
value: 'disable' value: 'disable'
}); });
} }
async requestPair()
{
let {message,status} = await this.mwse.EventPooling.request({
type:'request/pair',
to: this.socketId
});
if(
message == "ALREADY-PAIRED" ||
message == "ALREADY-REQUESTED"
)
{
console.warn("Already paired or pair requested")
};
if(status == "fail")
{
console.error("Request Pair Error",status, message);
return false;
}
return true;
}
async endPair()
{
await this.mwse.EventPooling.request({
type:'end/pair',
to: this.socketId
});
this.forget();
}
async acceptPair()
{
let {message,status} = await this.mwse.EventPooling.request({
type:'accept/pair',
to: this.socketId
});
if(status == "fail")
{
console.error("Pair Error",status, message);
return false;
}
return true;
}
async rejectPair()
{
let {message,status} = await this.mwse.EventPooling.request({
type:'reject/pair',
to: this.socketId
});
if(status == "fail")
{
console.error("Pair Error",status, message);
return false;
}
return true;
}
async getPairedList() : Promise<string>
{
let {value} = await this.mwse.EventPooling.request({
type:'pair/list',
to: this.socketId
});
return value;
}
async send(pack: any){ async send(pack: any){
let isOpenedP2P = this.peerConnection && this.rtc?.active; let isOpenedP2P = this.peerConnection && this.rtc?.active;
let isOpenedServer = this.mwse.server.connected; let isOpenedServer = this.mwse.server.connected;

View File

@ -1,3 +1,4 @@
import P2PFileSender from "./P2PFileSender";
import Peer from "./Peer"; import Peer from "./Peer";
import "webrtc-adapter"; import "webrtc-adapter";
interface TransferStreamInfo interface TransferStreamInfo
@ -46,6 +47,8 @@ export default class WebRTC
public peer? : Peer; public peer? : Peer;
public FileTransportChannel? : P2PFileSender;
constructor( constructor(
rtcConfig?: RTCConfiguration, rtcConfig?: RTCConfiguration,
rtcServers?: RTCIceServer[] rtcServers?: RTCIceServer[]
@ -394,6 +397,46 @@ export default class WebRTC
this.sendingStream.clear(); this.sendingStream.clear();
} }
public async SendFile(file:File, meta: object)
{
if(!this.peer)
{
throw new Error("Peer is not ready");
}
this.FileTransportChannel = new P2PFileSender(this, this.peer);
await this.FileTransportChannel.SendFile(file, meta);
}
public async RecaiveFile(
chnlCount:number,
filemeta: {
name: string;
type: string;
},
totalSize: number
) : Promise<File>
{
if(!this.peer)
{
throw new Error("Peer is not ready");
}
this.FileTransportChannel = new P2PFileSender(this, this.peer);
return await new Promise(recaivedFile => {
if(this.FileTransportChannel)
{
this.FileTransportChannel.RecaiveFile(
this.rtc,
filemeta,
chnlCount,
totalSize,
(file: File) => {
recaivedFile(file)
}
);
}
})
}
} }
WebRTC.requireGC = false; WebRTC.requireGC = false;

View File

@ -99,13 +99,30 @@ export default class MWSE extends EventTarget {
}) })
this.EventPooling.signal("pair/info", (payload : {from : string,name: string, value: string | number | boolean}) => { this.EventPooling.signal("pair/info", (payload : {from : string,name: string, value: string | number | boolean}) => {
let {from, name, value} = payload; let {from, name, value} = payload;
let peer = this.peer(from); let peer = this.peer(from);
peer.info.info[name] = value; peer.info.info[name] = value;
peer.emit("info", name, value); peer.emit("info", name, value);
}) })
this.EventPooling.signal("request/pair", (payload : {from : string,info: any}) => {
let {from, info} = payload;
let peer = this.peer(from);
peer.info.info = info;
peer.emit("request/pair", peer);
this.peer('me').emit('request/pair', peer);
})
this.EventPooling.signal("accepted/pair", (payload : {from : string,info: any}) => {
let {from, info} = payload;
let peer = this.peer(from);
peer.info.info = info;
peer.emit("accepted/pair", peer);
this.peer('me').emit('accepted/pairr', peer);
})
this.EventPooling.signal("end/pair", (payload : {from : string,info: any}) => {
let {from, info} = payload;
let peer = this.peer(from);
peer.emit("endPair", info);
this.peer('me').emit('endPair', from, info);
})
} }
public room(options: IRoomOptions | string) : Room public room(options: IRoomOptions | string) : Room
{ {

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long