MWSE/sdk/webrtc/FileSender.js

140 lines
4.8 KiB
JavaScript

// P2P file transfer over dedicated RTCDataChannels.
//
// The file is split into PART_SIZE (10 MB) partitions; up to 5 parallel
// channels each carry one partition at a time so large files don't stall.
// Protocol per channel:
// sender → JSON { size, idx } — announce partition size + index
// receiver → 'READY' — ready to receive chunks
// sender → ArrayBuffer chunks — CHUNK_SIZE bytes each
// receiver → 'ACK' — all bytes received, send next part
// sender → close() — no more partitions for this channel
//
// Emits: 'progress'({ sent, total }), 'complete', 'error'(err)
import MWSEEventTarget from '../EventTarget.js';
// Size of one partition. Each partition is announced with its own
// { size, idx } message and handed to whichever channel asks for the next one.
const PART_SIZE = 10 * 1024 * 1024; // 10 MB per partition
// Size of each binary message written to a channel; also used to derive the
// back-pressure limits in _sendPartition.
const CHUNK_SIZE = 16 * 1024; // 16 KB chunk size
/**
 * Peer-to-peer file transfer over dedicated data channels labelled
 * `mwse/file/<n>`.
 *
 * Emits `'progress'` ({ sent, total }), `'complete'` and `'error'` (err).
 */
export default class FileSender extends MWSEEventTarget {
  /**
   * @param {object} pc - PeerConnection instance. This class calls
   *   `pc.on('datachannel', cb)` and `pc.createDataChannel(label)` on it.
   */
  constructor(pc) {
    super();
    this._pc = pc;
  }

  /**
   * Set up a receiver. Must be called before the sender opens channels.
   *
   * @param {number} partCount - Number of partitions the sender will transmit.
   * @param {(file: Blob) => void} onFile - Called once, with the partitions
   *   concatenated in index order, after the last partition has arrived.
   */
  receive(partCount, onFile) {
    const parts = new Array(partCount);
    let remaining = partCount;

    // An empty file has no partitions, so the sender opens no channel at all;
    // deliver the empty result right away instead of waiting forever.
    if (remaining === 0) {
      onFile(new Blob(parts));
      return;
    }

    this._pc.on('datachannel', ch => {
      // Only channels created by send() belong to this transfer.
      if (!ch.label.startsWith('mwse/file/')) return;
      _receivePartition(ch, parts, () => {
        remaining -= 1;
        if (remaining === 0) onFile(new Blob(parts));
      });
    });
  }

  /**
   * Send a File or Blob.
   *
   * The whole file is read into memory, cut into PART_SIZE partitions and
   * distributed over at most five channels; each channel pulls the next
   * unsent partition when it finishes the previous one.
   *
   * @param {Blob} file - Anything exposing `arrayBuffer()`.
   * @returns {Promise<void>} Resolves after every channel has finished (and
   *   `'complete'` was emitted); rejects with the first failure (after
   *   `'error'` was emitted).
   */
  async send(file) {
    const MAX_CHANNELS = 5;
    const buffer = await file.arrayBuffer();
    const total = buffer.byteLength;
    const partCount = Math.ceil(total / PART_SIZE);
    const chanCount = Math.min(MAX_CHANNELS, partCount);

    // Nothing to transmit: without this early exit no channel would ever be
    // created and the returned promise would never settle.
    if (partCount === 0) {
      this.emit('complete');
      return;
    }

    let sent = 0;
    let pending = chanCount;
    let partIdx = 0;

    // Hands out partitions in index order; null once all have been taken.
    const nextPart = () => {
      if (partIdx >= partCount) return null;
      const start = partIdx * PART_SIZE;
      const end = Math.min(start + PART_SIZE, total);
      const part = { data: buffer.slice(start, end), idx: partIdx };
      partIdx += 1;
      return part;
    };

    return new Promise((resolve, reject) => {
      const channels = [];
      let settled = false;

      // First failure wins: stop the other channels, notify listeners, reject.
      const fail = err => {
        if (settled) return;
        settled = true;
        for (const open of channels) open.close();
        this.emit('error', err);
        reject(err);
      };

      // A channel has no more partitions to send.
      const finishChannel = ch => {
        ch.close();
        pending -= 1;
        if (pending === 0 && !settled) {
          settled = true;
          this.emit('complete');
          resolve();
        }
      };

      const reportProgress = bytes => {
        sent += bytes;
        this.emit('progress', { sent, total });
      };

      try {
        for (let i = 0; i < chanCount; i++) {
          const ch = this._pc.createDataChannel(`mwse/file/${i}`);
          channels.push(ch);
          ch.binaryType = 'arraybuffer';
          ch.onerror = fail;
          ch.onopen = () => {
            const part = nextPart();
            if (!part) {
              finishChannel(ch);
              return;
            }
            try {
              _sendPartition(ch, part, nextPart, reportProgress, () => finishChannel(ch), fail);
            } catch (err) {
              // A synchronous throw here would otherwise vanish inside the
              // event handler and leave the promise pending.
              fail(err);
            }
          };
        }
      } catch (err) {
        fail(err);
      }
    });
  }
}
// ---- Private helpers --------------------------------------------------------
/**
 * Receive every partition the sender pushes over one data channel.
 *
 * Per partition the channel delivers a JSON string `{ size, idx }`, answered
 * with 'READY', followed by binary chunks. Once `size` bytes have arrived the
 * chunks are stored as a Blob in `parts[idx]`, 'ACK' is sent and `onPartDone`
 * is called. The same channel may then announce another partition.
 *
 * @param {object} ch - Data channel; `binaryType` and `onmessage` are
 *   overwritten and `send()` is used for 'READY' / 'ACK'.
 * @param {Array<Blob>} parts - Shared output array, filled by partition index.
 * @param {() => void} onPartDone - Called once per completed partition.
 */
function _receivePartition(ch, parts, onPartDone) {
  // Ask for ArrayBuffers explicitly: where the default binaryType is 'blob'
  // the chunks have no byteLength and the byte count below would become NaN.
  ch.binaryType = 'arraybuffer';

  let meta = null; // { size, idx } of the partition currently being received
  let chunks = [];
  let received = 0;

  ch.onmessage = ({ data }) => {
    if (typeof data === 'string') {
      // First message per partition: metadata.
      const { size, idx } = JSON.parse(data);
      meta = { size, idx };
      chunks = [];
      received = 0;
      ch.send('READY');
      return;
    }

    // Binary data with no announced partition cannot be placed anywhere.
    if (meta === null) return;

    chunks.push(data);
    // Tolerate Blob chunks (size) as well as ArrayBuffers/views (byteLength).
    received += typeof data.byteLength === 'number' ? data.byteLength : data.size;
    if (received < meta.size) return;

    parts[meta.idx] = new Blob(chunks);
    meta = null;
    chunks = [];
    received = 0;
    ch.send('ACK');
    onPartDone();
  };
}
/**
 * Stream one partition, and then any further partitions handed out by
 * `nextPart`, over a single open data channel.
 *
 * Per partition: announce `{ size, idx }` as JSON, wait for 'READY', write the
 * data in CHUNK_SIZE pieces while respecting back-pressure, then wait for
 * 'ACK' before moving on. When `nextPart()` returns a falsy value after an
 * ACK, `onDone` is called.
 *
 * @param {object} ch - Open data channel; `onmessage`, `onerror`,
 *   `onbufferedamountlow` and `bufferedAmountLowThreshold` are overwritten.
 * @param {{data: ArrayBuffer, idx: number}} part - First partition to send.
 * @param {() => ({data: ArrayBuffer, idx: number}|null)} nextPart - Supplies
 *   the next partition, or null when none are left.
 * @param {(bytes: number) => void} onProgress - Called after each chunk is
 *   handed to the channel, with that chunk's length.
 * @param {() => void} onDone - Called when this channel has nothing left to send.
 * @param {(err: *) => void} onError - Called at most once, with either the
 *   channel's error event or an exception thrown by `ch.send()`.
 */
function _sendPartition(ch, part, nextPart, onProgress, onDone, onError) {
  let current = part;
  let offset = 0;
  let failed = false;

  const fail = err => {
    if (failed) return;
    failed = true;
    onError(err);
  };

  // Announce the current partition to the receiver. send() may throw (for
  // example when the channel is no longer open); report that through onError
  // instead of letting it escape from an event handler.
  const announce = () => {
    try {
      ch.send(JSON.stringify({ size: current.data.byteLength, idx: current.idx }));
    } catch (err) {
      fail(err);
    }
  };

  const pump = () => {
    if (failed) return;
    const size = current.data.byteLength;
    while (offset < size) {
      // Back off when the buffer is getting full; onbufferedamountlow resumes.
      if (ch.bufferedAmount > CHUNK_SIZE * 8) return;
      const end = Math.min(offset + CHUNK_SIZE, size);
      const chunk = current.data.slice(offset, end);
      try {
        ch.send(chunk);
      } catch (err) {
        fail(err);
        return;
      }
      // Advance before notifying so a throwing listener cannot cause the same
      // chunk to be sent twice.
      offset = end;
      onProgress(chunk.byteLength);
    }
  };

  ch.bufferedAmountLowThreshold = CHUNK_SIZE;
  ch.onbufferedamountlow = pump;
  ch.onerror = fail;
  ch.onmessage = ({ data }) => {
    if (failed) return;
    if (data === 'READY') {
      pump();
      return;
    }
    if (data !== 'ACK') return;

    const next = nextPart();
    if (!next) {
      onDone();
      return;
    }
    current = next;
    offset = 0;
    announce();
  };

  announce();
}