From 75d5999b4a50b8dfcfd0a57393fd993b6c790d6c Mon Sep 17 00:00:00 2001 From: abdussamedulutas Date: Wed, 17 Jun 2026 12:59:16 +0300 Subject: [PATCH] =?UTF-8?q?#36/#37/#40/#41/#47:=20WebRTC=20k=C3=BCt=C3=BCp?= =?UTF-8?q?hanesi,=20sub-network=20ve=20Studio=20UI?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit sdk/webrtc/ — kapsamlı RTCEngine kütüphanesi: - PeerConnection.js: RTCPeerConnection wrapper; connectionstatechange + iceconnectionstatechange her ikisi de izleniyor (eski 'bağlantı kopsa fark etmez' hatası bu şekilde önleniyor) - Negotiator.js: perfect negotiation (RFC 8829); polite/impolite roller, teklif çakışması rollback, ICE adayları sıraya alınıyor - StreamManager.js: addStream/replaceTrack/removeStream/setEncodings — runtime'da track ekle/çıkar/değiştir, yeniden müzakere gerekmiyor - DataChannel.js: birincil veri kanalı, bağlantı canlıyken kapanırsa otomatik yeniden oluşturulur, açılana kadar mesaj kuyruğu - MediaSources.js: camera/microphone/screen/cameraAndMic fabrika metodları, AudioBuffer→MediaStream, canvas.captureStream, AudioContext mix bus - FileSender.js: en fazla 5 paralel DataChannel üzerinden dosya transferi, progress olayı, 16 KB chunk + bufferedAmountLow akış kontrolü - index.js (RTCEngine): tüm alt sistemleri koordine eder; ICE restart exponential backoff (1s→2s→4s); Peer.js ile geriye dönük uyumlu sdk/WebRTC.js → ./webrtc/index.js'e ince re-export (Peer.js değişmedi) sdk/studio/ — masaüstü-first Miller-kolon UI (#47): - Column.js: başlık + arama filtresi + öğe listesi - ColumnView.js: yatay kayan kolon yöneticisi, popTo() ile derine git - index.js (Studio): Server→Groups→Peers→Devices→Streams→Quality hiyerarşisi; WebRTC bağlantısını otomatik başlatır (polite = küçük socketId) - style.css: koyu masaüstü teması, aktif öğe vurgusu, ilerleme çubuğu internal/services/ippressure.go — rastgele atama + sub-network (#40/#41): - lockIP/lockNumber/lockCode: önce 256 rastgele deneme, sonra sıralı yedek; yüksek bağlantı sayısında O(1) ortalama atama - SubNet: /24 sanal alt ağ (10.A.B.0/24); Alloc/Release/Whois metodları - alloc|release|whois APSubNet + APSubNetIP HTTP handler'ları eklendi - Bağlantı kesildiğinde subnet IP'leri ve prefix otomatik serbest bırakılıyor sdk/IPPressure.js: allocSubNet/releaseSubNet/allocSubNetIP/releaseSubNetIP/ querySubNetIP metodları eklendi go test -race ./... — tüm testler yeşil Co-Authored-By: Claude Sonnet 4.6 --- internal/services/ippressure.go | 419 +++++++++++++++++++++++---- internal/services/ippressure_test.go | 141 +++++++++ sdk/IPPressure.js | 40 +++ sdk/WebRTC.js | 39 +-- sdk/studio/Column.js | 137 +++++++++ sdk/studio/ColumnView.js | 93 ++++++ sdk/studio/index.js | 308 ++++++++++++++++++++ sdk/studio/style.css | 255 ++++++++++++++++ sdk/webrtc/DataChannel.js | 91 ++++++ sdk/webrtc/FileSender.js | 139 +++++++++ sdk/webrtc/MediaSources.js | 98 +++++++ sdk/webrtc/Negotiator.js | 117 ++++++++ sdk/webrtc/PeerConnection.js | 122 ++++++++ sdk/webrtc/StreamManager.js | 112 +++++++ sdk/webrtc/index.js | 223 ++++++++++++++ 15 files changed, 2239 insertions(+), 95 deletions(-) create mode 100644 internal/services/ippressure_test.go create mode 100644 sdk/studio/Column.js create mode 100644 sdk/studio/ColumnView.js create mode 100644 sdk/studio/index.js create mode 100644 sdk/studio/style.css create mode 100644 sdk/webrtc/DataChannel.js create mode 100644 sdk/webrtc/FileSender.js create mode 100644 sdk/webrtc/MediaSources.js create mode 100644 sdk/webrtc/Negotiator.js create mode 100644 sdk/webrtc/PeerConnection.js create mode 100644 sdk/webrtc/StreamManager.js create mode 100644 sdk/webrtc/index.js diff --git a/internal/services/ippressure.go b/internal/services/ippressure.go index 9b6c55f..92b54fb 100644 --- a/internal/services/ippressure.go +++ b/internal/services/ippressure.go @@ -2,20 +2,22 @@ package services import ( "fmt" + "math/rand/v2" "sync" "git.saqut.com/saqut/mwse/internal/protocol" "git.saqut.com/saqut/mwse/internal/ws" ) -// shortCodeAlphabet is the 22-letter set the original used (note: J, Q, U, W are -// intentionally absent). Three letters give 22^3 = 10,648 codes. +// shortCodeAlphabet is the 22-letter set the original used (J, Q, U, W absent). +// Three letters give 22³ = 10,648 unique codes. const shortCodeAlphabet = "ABCDEFGHIKLMNOPRSTVXYZ" -// Announcer receives address allocation events. In the multi-process Node -// deployment these were forwarded to a parent via process.send for the live -// traffic panel. For the single-node 0.1.0 core the default is a no-op; a cluster -// integration can supply a real implementation later. +// randomProbes is how many random candidates we try before falling back to a +// sequential scan. At low occupancy the first probe almost always succeeds. +const randomProbes = 256 + +// Announcer receives address allocation events for external monitoring. type Announcer interface { Announce(kind, action, clientID string, value any) } @@ -24,16 +26,89 @@ type noopAnnouncer struct{} func (noopAnnouncer) Announce(string, string, string, any) {} -// IPPressure allocates three kinds of unique virtual address to clients. A single -// mutex guards all three tables; allocation is infrequent relative to messaging, -// so finer-grained locking would add complexity for no real gain. +// SubNet is a /24 virtual sub-network (10.A.B.0/24). +// Clients in a group or room can draw IPs from within a dedicated prefix +// so they appear to share the same network segment. +// +// Thread-safety: protected by its own mutex; IPPressure's outer mutex guards +// the subnet registry but not the per-subnet host tables. +type SubNet struct { + Prefix string // "10.A.B" — the /24 prefix + Owner string // room / group id that claimed this subnet + + mu sync.Mutex + hosts map[byte]string // host byte [1..254] → clientID +} + +// Alloc assigns a random host address within this /24 to clientID. +func (sn *SubNet) Alloc(clientID string) (string, bool) { + sn.mu.Lock() + defer sn.mu.Unlock() + + // Random probe first. + for range randomProbes { + h := byte(rand.IntN(254)) + 1 // [1..254] + if _, busy := sn.hosts[h]; !busy { + sn.hosts[h] = clientID + return fmt.Sprintf("%s.%d", sn.Prefix, h), true + } + } + // Sequential fallback (subnet is more than 98% full). + for h := byte(1); h < 255; h++ { + if _, busy := sn.hosts[h]; !busy { + sn.hosts[h] = clientID + return fmt.Sprintf("%s.%d", sn.Prefix, h), true + } + } + return "", false // /24 exhausted (~254 hosts) +} + +// Release frees the host address previously allocated to clientID. +func (sn *SubNet) Release(clientID string) { + sn.mu.Lock() + defer sn.mu.Unlock() + for h, id := range sn.hosts { + if id == clientID { + delete(sn.hosts, h) + return + } + } +} + +// Whois returns the clientID that holds the given IP within this subnet. +func (sn *SubNet) Whois(ip string) (string, bool) { + sn.mu.Lock() + defer sn.mu.Unlock() + var host byte + _, err := fmt.Sscanf(ip, sn.Prefix+".%d", &host) + if err != nil { + return "", false + } + id, ok := sn.hosts[host] + return id, ok +} + +// IPPressure allocates virtual addresses to clients. +// +// Allocation strategy (#41): random probe with sequential fallback. +// At typical occupancy (< 1 % of the address space) the first random +// candidate is free, so allocation is O(1) in practice. +// +// Sub-networks (#40): callers can reserve a /24 prefix and hand out IPs +// within it. The global flat space and the subnet space are disjoint; a +// client that holds a subnet IP is also tracked in busyIP so whois/APIPAddress +// queries still work. type IPPressure struct { ann Announcer mu sync.Mutex - busyNumber map[int]string // number -> clientID - busyCode map[string]string // shortcode -> clientID - busyIP map[string]string // ip -> clientID + busyNumber map[int]string // number → clientID + busyCode map[string]string // shortcode → clientID + busyIP map[string]string // ip → clientID (both flat and subnet) + + // Sub-network registry. + subnets map[string]*SubNet // prefix ("10.A.B") → SubNet + clientSN map[string]string // clientID → subnet prefix (for disconnect cleanup) } // NewIPPressure builds an allocator. A nil announcer becomes a no-op. @@ -46,15 +121,86 @@ func NewIPPressure(ann Announcer) *IPPressure { busyNumber: make(map[int]string), busyCode: make(map[string]string), busyIP: make(map[string]string), + subnets: make(map[string]*SubNet), + clientSN: make(map[string]string), } } -// ---- number (starts at 24, counts up) ----------------------------------- +// ---- Flat IP address (10.x.x.x, random) -------------------------------- + +func (p *IPPressure) lockIP(clientID string) string { + p.mu.Lock() + defer p.mu.Unlock() + + // Random probe across the full 10.0.0.0/8. + for range randomProbes { + a := byte(rand.IntN(255)) + 1 + b := byte(rand.IntN(256)) + c := byte(rand.IntN(255)) + 1 + ip := fmt.Sprintf("10.%d.%d.%d", a, b, c) + if _, busy := p.busyIP[ip]; !busy { + p.busyIP[ip] = clientID + p.ann.Announce("AP_IPADDRESS", "LOCK", clientID, ip) + return ip + } + } + // Sequential fallback (extremely unlikely with random probes). + for a := 1; a <= 255; a++ { + for b := 0; b <= 255; b++ { + for c := 1; c <= 255; c++ { + ip := fmt.Sprintf("10.%d.%d.%d", a, b, c) + if _, busy := p.busyIP[ip]; !busy { + p.busyIP[ip] = clientID + p.ann.Announce("AP_IPADDRESS", "LOCK", clientID, ip) + return ip + } + } + } + } + return "" // address space exhausted (> ~16 million clients) +} + +func (p *IPPressure) releaseIP(ip string) { + p.mu.Lock() + defer p.mu.Unlock() + if clientID, ok := p.busyIP[ip]; ok { + p.ann.Announce("AP_IPADDRESS", "RELEASE", clientID, ip) + delete(p.busyIP, ip) + } +} + +func (p *IPPressure) whoisIP(ip string) (string, bool) { + p.mu.Lock() + defer p.mu.Unlock() + id, ok := p.busyIP[ip] + return id, ok +} + +// ---- Number (random in [24, 99999]) ------------------------------------- func (p *IPPressure) lockNumber(clientID string) int { p.mu.Lock() defer p.mu.Unlock() - for n := 24; ; n++ { + + const lo, hi = 24, 99999 + for range randomProbes { + n := rand.IntN(hi-lo+1) + lo + if _, busy := p.busyNumber[n]; !busy { + p.busyNumber[n] = clientID + p.ann.Announce("AP_NUMBER", "LOCK", clientID, n) + return n + } + } + // Sequential fallback. + for n := lo; n <= hi; n++ { + if _, busy := p.busyNumber[n]; !busy { + p.busyNumber[n] = clientID + p.ann.Announce("AP_NUMBER", "LOCK", clientID, n) + return n + } + } + // Beyond hi: keep counting up without bound. + for n := hi + 1; ; n++ { if _, busy := p.busyNumber[n]; !busy { p.busyNumber[n] = clientID p.ann.Announce("AP_NUMBER", "LOCK", clientID, n) @@ -79,11 +225,26 @@ func (p *IPPressure) whoisNumber(n int) (string, bool) { return id, ok } -// ---- short code (three letters from the restricted alphabet) ------------ +// ---- Short code (random 3-letter from restricted alphabet) -------------- func (p *IPPressure) lockCode(clientID string) string { p.mu.Lock() defer p.mu.Unlock() + + n := len(shortCodeAlphabet) + for range randomProbes { + code := string([]byte{ + shortCodeAlphabet[rand.IntN(n)], + shortCodeAlphabet[rand.IntN(n)], + shortCodeAlphabet[rand.IntN(n)], + }) + if _, busy := p.busyCode[code]; !busy { + p.busyCode[code] = clientID + p.ann.Announce("AP_SHORTCODE", "LOCK", clientID, code) + return code + } + } + // Sequential fallback. for _, a := range shortCodeAlphabet { for _, b := range shortCodeAlphabet { for _, d := range shortCodeAlphabet { @@ -96,7 +257,7 @@ func (p *IPPressure) lockCode(clientID string) string { } } } - return "" // address space exhausted + return "" // all 10,648 codes in use } func (p *IPPressure) releaseCode(code string) { @@ -115,56 +276,84 @@ func (p *IPPressure) whoisCode(code string) (string, bool) { return id, ok } -// ---- ip address (10.0.0.1 upward) --------------------------------------- +// ---- Sub-network (#40) ------------------------------------------------- -func (p *IPPressure) lockIP(clientID string) string { +// allocSubNet reserves a random /24 prefix (10.A.B.0/24) for the caller. +// The prefix is held until releaseSubNet is called or the client disconnects. +func (p *IPPressure) allocSubNet(ownerID string) (*SubNet, bool) { p.mu.Lock() defer p.mu.Unlock() - a, b, cc, d := 10, 0, 0, 1 - for { - ip := fmt.Sprintf("%d.%d.%d.%d", a, b, cc, d) - if _, busy := p.busyIP[ip]; !busy { - p.busyIP[ip] = clientID - p.ann.Announce("AP_IPADDRESS", "LOCK", clientID, ip) - return ip - } - switch { - case d != 255: - d++ - case cc != 255: - d, cc = 0, cc+1 - case b != 255: - d, cc, b = 0, 0, b+1 - case a != 255: - d, cc, b, a = 0, 0, 0, a+1 - default: - return "" // address space exhausted + + for range randomProbes { + a := byte(rand.IntN(255)) + 1 + b := byte(rand.IntN(256)) + prefix := fmt.Sprintf("10.%d.%d", a, b) + if _, busy := p.subnets[prefix]; !busy { + sn := &SubNet{Prefix: prefix, Owner: ownerID, hosts: make(map[byte]string)} + p.subnets[prefix] = sn + p.ann.Announce("AP_SUBNET", "ALLOC", ownerID, prefix) + return sn, true } } + // Sequential fallback. + for a := 1; a <= 255; a++ { + for b := 0; b <= 255; b++ { + prefix := fmt.Sprintf("10.%d.%d", a, b) + if _, busy := p.subnets[prefix]; !busy { + sn := &SubNet{Prefix: prefix, Owner: ownerID, hosts: make(map[byte]string)} + p.subnets[prefix] = sn + p.ann.Announce("AP_SUBNET", "ALLOC", ownerID, prefix) + return sn, true + } + } + } + return nil, false } -func (p *IPPressure) releaseIP(ip string) { +// getSubNet looks up the SubNet owned by ownerID. +func (p *IPPressure) getSubNet(ownerID string) (*SubNet, bool) { p.mu.Lock() defer p.mu.Unlock() - if clientID, ok := p.busyIP[ip]; ok { - p.ann.Announce("AP_IPADDRESS", "RELEASE", clientID, ip) + for _, sn := range p.subnets { + if sn.Owner == ownerID { + return sn, true + } + } + return nil, false +} + +// releaseSubNet frees a /24 prefix and removes all host addresses it contained +// from the global busyIP table. +func (p *IPPressure) releaseSubNet(prefix string) { + p.mu.Lock() + sn, ok := p.subnets[prefix] + if !ok { + p.mu.Unlock() + return + } + delete(p.subnets, prefix) + p.mu.Unlock() + + sn.mu.Lock() + defer sn.mu.Unlock() + for h, cid := range sn.hosts { + ip := fmt.Sprintf("%s.%d", prefix, h) + p.mu.Lock() delete(p.busyIP, ip) + p.mu.Unlock() + p.ann.Announce("AP_SUBNET_IP", "RELEASE", cid, ip) } + p.ann.Announce("AP_SUBNET", "RELEASE", sn.Owner, prefix) } -func (p *IPPressure) whoisIP(ip string) (string, bool) { - p.mu.Lock() - defer p.mu.Unlock() - id, ok := p.busyIP[ip] - return id, ok -} +// ---- Service registration ---------------------------------------------- // registerIPPressure wires the alloc/realloc/release/whois handlers and the // disconnect cleanup. The allocator instance is returned for tests. func registerIPPressure(hub *ws.Hub, ann Announcer) *IPPressure { p := NewIPPressure(ann) - // --- IP address --- + // --- Flat IP address --- hub.Register("alloc/APIPAddress", func(c *ws.Client, m protocol.Message) any { if ip := c.APIP(); ip != "" { return map[string]any{"status": "success", "ip": ip} @@ -181,11 +370,9 @@ func registerIPPressure(hub *ws.Hub, ann Announcer) *IPPressure { if old == "" { return map[string]any{"status": "fail"} } - // Allocate the new address before freeing the old one, so realloc actually - // yields a different address (the old stays reserved during the search). ip := p.lockIP(c.ID) if ip == "" { - return map[string]any{"status": "fail"} // exhausted; keep the old one + return map[string]any{"status": "fail"} } p.releaseIP(old) c.SetAPIP(ip) @@ -203,7 +390,7 @@ func registerIPPressure(hub *ws.Hub, ann Announcer) *IPPressure { return map[string]any{"status": "fail"} }) - // --- number --- + // --- Number --- hub.Register("alloc/APNumber", func(c *ws.Client, m protocol.Message) any { if n := c.APNumber(); n != 0 { return map[string]any{"status": "success", "number": n} @@ -217,7 +404,7 @@ func registerIPPressure(hub *ws.Hub, ann Announcer) *IPPressure { if old == 0 { return map[string]any{"status": "fail"} } - n := p.lockNumber(c.ID) // old stays reserved, so n != old + n := p.lockNumber(c.ID) p.releaseNumber(old) c.SetAPNumber(n) return map[string]any{"status": "success", "number": n} @@ -234,7 +421,7 @@ func registerIPPressure(hub *ws.Hub, ann Announcer) *IPPressure { return map[string]any{"status": "fail"} }) - // --- short code --- + // --- Short code --- hub.Register("alloc/APShortCode", func(c *ws.Client, m protocol.Message) any { if code := c.APShortCode(); code != "" { return map[string]any{"status": "success", "code": code} @@ -253,7 +440,7 @@ func registerIPPressure(hub *ws.Hub, ann Announcer) *IPPressure { } code := p.lockCode(c.ID) if code == "" { - return map[string]any{"status": "fail"} // exhausted; keep the old one + return map[string]any{"status": "fail"} } p.releaseCode(old) c.SetAPShortCode(code) @@ -271,16 +458,130 @@ func registerIPPressure(hub *ws.Hub, ann Announcer) *IPPressure { return map[string]any{"status": "fail"} }) - // Release every address a client held when it disconnects. + // --- Sub-network (#40) --- + + // alloc/APSubNet: claim a /24 prefix for a group/room. The caller becomes + // the subnet owner; its members can then call alloc/APSubNetIP. + hub.Register("alloc/APSubNet", func(c *ws.Client, m protocol.Message) any { + // Idempotent: return existing prefix if the caller already owns one. + if sn, ok := p.getSubNet(c.ID); ok { + return map[string]any{"status": "success", "prefix": sn.Prefix} + } + sn, ok := p.allocSubNet(c.ID) + if !ok { + return map[string]any{"status": "fail"} + } + // Track on the client so disconnect cleanup works. + p.mu.Lock() + p.clientSN[c.ID] = sn.Prefix + p.mu.Unlock() + return map[string]any{"status": "success", "prefix": sn.Prefix} + }) + + // release/APSubNet: free the prefix. All host IPs are released. + hub.Register("release/APSubNet", func(c *ws.Client, m protocol.Message) any { + p.mu.Lock() + prefix, had := p.clientSN[c.ID] + if had { + delete(p.clientSN, c.ID) + } + p.mu.Unlock() + if had { + p.releaseSubNet(prefix) + } + return success() + }) + + // alloc/APSubNetIP: allocate a host IP within the subnet identified by + // the "prefix" field. The caller need not be the owner. + hub.Register("alloc/APSubNetIP", func(c *ws.Client, m protocol.Message) any { + prefix := m.Str("prefix") + p.mu.Lock() + sn, ok := p.subnets[prefix] + p.mu.Unlock() + if !ok { + return map[string]any{"status": "fail", "message": "subnet not found"} + } + ip, ok := sn.Alloc(c.ID) + if !ok { + return map[string]any{"status": "fail", "message": "subnet exhausted"} + } + // Register in the global table so flat whois works. + p.mu.Lock() + p.busyIP[ip] = c.ID + p.mu.Unlock() + // If the client later disconnects, release via the subnet too. + p.mu.Lock() + p.clientSN[c.ID] = prefix + p.mu.Unlock() + c.SetAPIP(ip) + return map[string]any{"status": "success", "ip": ip} + }) + + // release/APSubNetIP: free the current subnet IP of the caller. + hub.Register("release/APSubNetIP", func(c *ws.Client, m protocol.Message) any { + ip := c.APIP() + if ip == "" { + return success() + } + p.mu.Lock() + prefix := p.clientSN[c.ID] + sn, hasSN := p.subnets[prefix] + delete(p.clientSN, c.ID) + delete(p.busyIP, ip) + p.mu.Unlock() + if hasSN { + sn.Release(c.ID) + } + c.SetAPIP("") + return success() + }) + + // whois/APSubNetIP: look up a host IP within a given subnet. + hub.Register("whois/APSubNetIP", func(c *ws.Client, m protocol.Message) any { + prefix := m.Str("prefix") + ip := m.Str("whois") + p.mu.Lock() + sn, ok := p.subnets[prefix] + p.mu.Unlock() + if !ok { + return map[string]any{"status": "fail"} + } + if id, ok := sn.Whois(ip); ok { + return map[string]any{"status": "success", "socket": id} + } + return map[string]any{"status": "fail"} + }) + + // Release every address when a client disconnects. hub.OnDisconnect(func(c *ws.Client) { - if c.APIP() != "" { - p.releaseIP(c.APIP()) + if ip := c.APIP(); ip != "" { + p.releaseIP(ip) + // Also release from subnet if applicable. + p.mu.Lock() + if prefix, ok := p.clientSN[c.ID]; ok { + if sn, snOK := p.subnets[prefix]; snOK { + sn.Release(c.ID) + } + delete(p.clientSN, c.ID) + } + p.mu.Unlock() } if c.APNumber() != 0 { p.releaseNumber(c.APNumber()) } - if c.APShortCode() != "" { - p.releaseCode(c.APShortCode()) + if code := c.APShortCode(); code != "" { + p.releaseCode(code) + } + // Release owned subnet prefix on disconnect. + p.mu.Lock() + prefix, hadSN := p.clientSN[c.ID] + if hadSN { + delete(p.clientSN, c.ID) + } + p.mu.Unlock() + if hadSN { + p.releaseSubNet(prefix) } }) diff --git a/internal/services/ippressure_test.go b/internal/services/ippressure_test.go new file mode 100644 index 0000000..5675d8b --- /dev/null +++ b/internal/services/ippressure_test.go @@ -0,0 +1,141 @@ +package services + +import ( + "strings" + "sync" + "testing" +) + +// TestRandomAllocationIsUnique verifies that 1000 concurrent IP allocations +// produce no duplicates (the random probing strategy must be collision-safe). +func TestRandomAllocationIsUnique(t *testing.T) { + p := NewIPPressure(nil) + const n = 1000 + + results := make([]string, n) + var wg sync.WaitGroup + for i := range n { + wg.Add(1) + go func(i int) { + defer wg.Done() + results[i] = p.lockIP("client-" + string(rune('A'+i%26)) + string(rune('0'+i/26))) + }(i) + } + wg.Wait() + + seen := make(map[string]int, n) + for i, ip := range results { + if ip == "" { + t.Fatalf("allocation %d returned empty string", i) + } + if prev, dup := seen[ip]; dup { + t.Fatalf("duplicate IP %s at slots %d and %d", ip, prev, i) + } + seen[ip] = i + } +} + +// TestRandomNumberAllocation checks uniqueness for number allocations. +func TestRandomNumberAllocation(t *testing.T) { + p := NewIPPressure(nil) + const n = 500 + nums := make([]int, n) + var wg sync.WaitGroup + for i := range n { + wg.Add(1) + go func(i int) { + defer wg.Done() + nums[i] = p.lockNumber("c" + string(rune('A'+i%26))) + }(i) + } + wg.Wait() + + seen := make(map[int]bool, n) + for _, n := range nums { + if seen[n] { + t.Fatalf("duplicate number %d", n) + } + seen[n] = true + } +} + +// TestSubNetAllocRelease covers the /24 subnet lifecycle. +func TestSubNetAllocRelease(t *testing.T) { + hub := newHub() + a, _ := connect(hub, "a") + b, _ := connect(hub, "b") + + // Allocate a subnet. + resp := asMap(t, hub.Handle(a, msg("alloc/APSubNet"))) + if resp["status"] != "success" { + t.Fatalf("alloc/APSubNet = %v", resp) + } + prefix := resp["prefix"].(string) + if !strings.HasPrefix(prefix, "10.") { + t.Fatalf("prefix %q should start with '10.'", prefix) + } + + // Idempotent: second alloc returns the same prefix. + resp2 := asMap(t, hub.Handle(a, msg("alloc/APSubNet"))) + if resp2["prefix"] != prefix { + t.Fatalf("second alloc/APSubNet should return same prefix, got %v vs %v", resp2["prefix"], prefix) + } + + // b requests a host IP within a's subnet. + ipResp := asMap(t, hub.Handle(b, msg("alloc/APSubNetIP", "prefix", prefix))) + if ipResp["status"] != "success" { + t.Fatalf("alloc/APSubNetIP = %v", ipResp) + } + ip := ipResp["ip"].(string) + if !strings.HasPrefix(ip, prefix+".") { + t.Fatalf("allocated IP %q should be within prefix %q", ip, prefix) + } + + // whois query finds b. + who := asMap(t, hub.Handle(a, msg("whois/APSubNetIP", "prefix", prefix, "whois", ip))) + if who["socket"] != "b" { + t.Fatalf("whois/APSubNetIP = %v, want b", who) + } + + // Release b's host IP. + if r := asMap(t, hub.Handle(b, msg("release/APSubNetIP"))); r["status"] != "success" { + t.Fatalf("release/APSubNetIP = %v", r) + } + + // Release a's subnet. + if r := asMap(t, hub.Handle(a, msg("release/APSubNet"))); r["status"] != "success" { + t.Fatalf("release/APSubNet = %v", r) + } +} + +// TestSubNetIPsAreUnique verifies that multiple clients get distinct IPs +// within the same subnet. +func TestSubNetIPsAreUnique(t *testing.T) { + p := NewIPPressure(nil) + + sn, ok := p.allocSubNet("owner") + if !ok { + t.Fatal("allocSubNet failed") + } + + const n = 50 + ips := make([]string, n) + for i := range n { + ip, ok := sn.Alloc("client-" + string(rune('A'+i))) + if !ok { + t.Fatalf("Alloc failed at %d", i) + } + ips[i] = ip + } + + seen := make(map[string]bool, n) + for _, ip := range ips { + if seen[ip] { + t.Fatalf("duplicate subnet IP %s", ip) + } + seen[ip] = true + if !strings.HasPrefix(ip, sn.Prefix+".") { + t.Fatalf("IP %q not within prefix %q", ip, sn.Prefix) + } + } +} diff --git a/sdk/IPPressure.js b/sdk/IPPressure.js index 63461c8..37a7f93 100644 --- a/sdk/IPPressure.js +++ b/sdk/IPPressure.js @@ -83,4 +83,44 @@ export class IPPressure { }); return status === 'success' ? socket : null; } + + // ---- Sub-network (#40) ----------------------------------------------- + + // Claim a /24 virtual sub-network (10.A.B.0/24). + // The caller becomes the subnet owner. Members can then call allocSubNetIP(). + async allocSubNet() { + const { status, prefix } = await this.mwse.EventPooling.request({ type: 'alloc/APSubNet' }); + if (status === 'success') { this.APSubNet = prefix; return prefix; } + throw new Error('Error allocating sub-network'); + } + + async releaseSubNet() { + const { status } = await this.mwse.EventPooling.request({ type: 'release/APSubNet' }); + if (status === 'success') { this.APSubNet = undefined; this.APSubNetIP = undefined; return; } + throw new Error('Error releasing sub-network'); + } + + // Allocate a host IP within an existing subnet. + // prefix: the "10.A.B" string returned by allocSubNet(). + async allocSubNetIP(prefix) { + const { status, ip } = await this.mwse.EventPooling.request({ + type: 'alloc/APSubNetIP', prefix + }); + if (status === 'success') { this.APSubNetIP = ip; return ip; } + throw new Error('Error allocating sub-network IP'); + } + + async releaseSubNetIP() { + const { status } = await this.mwse.EventPooling.request({ type: 'release/APSubNetIP' }); + if (status === 'success') { this.APSubNetIP = undefined; return; } + throw new Error('Error releasing sub-network IP'); + } + + // Look up which socket holds a given IP within a subnet. + async querySubNetIP(prefix, ip) { + const { status, socket } = await this.mwse.EventPooling.request({ + type: 'whois/APSubNetIP', prefix, whois: ip + }); + return status === 'success' ? socket : null; + } } diff --git a/sdk/WebRTC.js b/sdk/WebRTC.js index f9fabed..67a9304 100644 --- a/sdk/WebRTC.js +++ b/sdk/WebRTC.js @@ -1,36 +1,3 @@ -// WebRTC.js — placeholder. Full WebRTC implementation will be rewritten separately. -// Provides the interface Peer.js depends on so the rest of the SDK loads cleanly. -import MWSEEventTarget from './EventTarget.js'; - -export default class WebRTC extends MWSEEventTarget { - constructor(_rtcConfig, _rtcServers) { - super(); - this.active = false; - this.peer = null; - this.connectionStatus = 'new'; - this.iceStatus = 'new'; - this.gatheringStatus = 'new'; - this.signalingStatus = ''; - this.channel = undefined; - } - - connect() { - console.warn('WebRTC: not yet implemented — replacement coming separately'); - } - - sendStream(_stream, _name, _info) { - console.warn('WebRTC.sendStream: not yet implemented'); - } - - sendMessage(_data) { - console.warn('WebRTC.sendMessage: not yet implemented'); - } - - destroy() { - this.active = false; - this.emit('disconnected'); - } - - // send is called internally by the signaling output path in the full implementation. - send(_data) {} -} +// Re-exports the full RTCEngine from sdk/webrtc/. +// Peer.js imports this file; the actual implementation lives in sdk/webrtc/index.js. +export { default } from './webrtc/index.js'; diff --git a/sdk/studio/Column.js b/sdk/studio/Column.js new file mode 100644 index 0000000..a582925 --- /dev/null +++ b/sdk/studio/Column.js @@ -0,0 +1,137 @@ +// A single column in the Miller-column navigator. +// +// Each column holds a list of Item objects. Selecting an item calls +// item.onSelect() and marks the item as active (highlighted). The column +// also supports optional search filtering. +export default class Column { + constructor({ title, items = [], searchable = true }) { + this.title = title; + this._items = items; + this._active = null; // currently selected item element + this._root = null; + this._list = null; + this._searchable = searchable; + this._filter = ''; + this._actionsEl = null; + } + + // Build and return the column DOM element. + mount() { + const col = document.createElement('div'); + col.className = 'mwse-col'; + this._root = col; + + const header = document.createElement('div'); + header.className = 'mwse-col__header'; + header.textContent = this.title; + col.appendChild(header); + + if (this._searchable && this._items.length > 6) { + const search = document.createElement('input'); + search.type = 'text'; + search.className = 'mwse-col__search'; + search.placeholder = 'Filter…'; + search.addEventListener('input', () => { + this._filter = search.value.toLowerCase(); + this._renderItems(); + }); + col.appendChild(search); + } + + const list = document.createElement('div'); + list.className = 'mwse-col__list'; + this._list = list; + col.appendChild(list); + + this._renderItems(); + return col; + } + + // Replace the item list (re-renders the list area). + setItems(items) { + this._items = items; + this._active = null; + this._filter = ''; + this._renderItems(); + } + + // Add a persistent action button below the list. + addAction(label, className, onClick) { + if (!this._root) return; + if (!this._actionsEl) { + this._actionsEl = document.createElement('div'); + this._actionsEl.className = 'mwse-col__actions'; + this._root.appendChild(this._actionsEl); + } + const btn = document.createElement('button'); + btn.className = `mwse-btn ${className ?? ''}`; + btn.textContent = label; + btn.addEventListener('click', onClick); + this._actionsEl.appendChild(btn); + } + + // Force a re-render (e.g. after external state changes meta text). + refresh() { + this._renderItems(); + } + + // ---- Private -------------------------------------------------------- + + _renderItems() { + if (!this._list) return; + this._list.innerHTML = ''; + + const visible = this._filter + ? this._items.filter(i => i.label.toLowerCase().includes(this._filter)) + : this._items; + + for (const item of visible) { + this._list.appendChild(this._buildItem(item)); + } + } + + _buildItem(item) { + const row = document.createElement('div'); + row.className = 'mwse-item'; + + const icon = document.createElement('span'); + icon.className = 'mwse-item__icon'; + icon.textContent = item.icon ?? '○'; + row.appendChild(icon); + + const body = document.createElement('div'); + body.className = 'mwse-item__body'; + + const label = document.createElement('div'); + label.className = 'mwse-item__label'; + label.textContent = item.label; + body.appendChild(label); + + if (item.meta !== undefined) { + const meta = document.createElement('div'); + meta.className = 'mwse-item__meta'; + meta.textContent = typeof item.meta === 'function' ? item.meta() : item.meta; + body.appendChild(meta); + item._metaEl = meta; // for refresh() + } + + row.appendChild(body); + + if (item.hasChildren !== false) { + const arrow = document.createElement('span'); + arrow.className = 'mwse-item__arrow'; + arrow.textContent = '›'; + row.appendChild(arrow); + } + + row.addEventListener('click', () => { + // Deactivate previous selection. + if (this._active) this._active.classList.remove('mwse-item--active'); + row.classList.add('mwse-item--active'); + this._active = row; + item.onSelect?.(item); + }); + + return row; + } +} diff --git a/sdk/studio/ColumnView.js b/sdk/studio/ColumnView.js new file mode 100644 index 0000000..56a1585 --- /dev/null +++ b/sdk/studio/ColumnView.js @@ -0,0 +1,93 @@ +// Miller-column view: manages a horizontal row of up to 5 columns. +// Selecting an item in column N removes columns N+1…end and adds the +// new column opened by that item's onSelect callback. +import Column from './Column.js'; + +export default class ColumnView { + constructor(container) { + this._container = container; + this._columns = []; // Column instances in order + this._els = []; // corresponding DOM elements + this._root = null; + } + + mount() { + const root = document.createElement('div'); + root.className = 'mwse-studio__columns'; + this._root = root; + this._container.appendChild(root); + return this; + } + + // Push a new column to the right. Returns the Column. + pushColumn(title, items, opts = {}) { + const col = new Column({ title, items, ...opts }); + const el = col.mount(); + + this._columns.push(col); + this._els.push(el); + this._root.appendChild(el); + + // Mark the new column as active (rightmost). + this._els.forEach((e, i) => { + e.classList.toggle('mwse-col--active', i === this._els.length - 1); + }); + + // Scroll so the new column is fully visible. + requestAnimationFrame(() => { + this._root.scrollLeft = this._root.scrollWidth; + }); + + return col; + } + + // Remove all columns from index `depth` onwards, then push `title` + `items`. + replaceFrom(depth, title, items, opts) { + this._truncateTo(depth); + return this.pushColumn(title, items, opts); + } + + // Remove columns deeper than `depth` (0-based). + // Useful when the user navigates back by clicking an earlier column item. + truncateTo(depth) { + this._truncateTo(depth + 1); + } + + // Pop the rightmost column. + popColumn() { + if (this._columns.length === 0) return; + const el = this._els.pop(); + this._columns.pop(); + el.remove(); + if (this._els.length) { + this._els[this._els.length - 1].classList.add('mwse-col--active'); + } + } + + // Remove all columns from index `toDepth` (exclusive) onwards. + popTo(toDepth) { + while (this._columns.length > toDepth) this.popColumn(); + } + + // Force all columns to re-render their meta text. + refresh() { + for (const col of this._columns) col.refresh(); + } + + get depth() { return this._columns.length; } + + col(i) { return this._columns[i] ?? null; } + + // ---- Internal ------------------------------------------------------- + + _truncateTo(count) { + while (this._columns.length > count) { + const el = this._els.pop(); + this._columns.pop(); + el.remove(); + } + if (this._els.length) { + this._els[this._els.length - 1].classList.add('mwse-col--active'); + } + } +} diff --git a/sdk/studio/index.js b/sdk/studio/index.js new file mode 100644 index 0000000..29fa33b --- /dev/null +++ b/sdk/studio/index.js @@ -0,0 +1,308 @@ +// MWSE Studio — desktop-first Miller-column UI. +// +// Hierarchy: Network → Groups → Peers → Devices → Streams → Quality +// +// Usage: +// import Studio from '/sdk/studio/index.js'; +// const studio = new Studio(mwse, '#app'); +// studio.mount(); +// +// The Studio is purely additive: it renders into the given container and +// never modifies the MWSE SDK state except through the public SDK API. +import ColumnView from './ColumnView.js'; +import { MediaSources } from '../webrtc/index.js'; + +export default class Studio { + constructor(mwse, container) { + this.mwse = mwse; + + this._container = typeof container === 'string' + ? document.querySelector(container) + : container; + + this._view = new ColumnView(this._container); + this._styleInjected = false; + } + + // Mount the Studio UI inside the container element. + mount() { + this._injectStyle(); + this._container.classList.add('mwse-studio'); + + // Toolbar + const toolbar = document.createElement('div'); + toolbar.className = 'mwse-studio__toolbar'; + + const title = document.createElement('span'); + title.className = 'mwse-studio__title'; + title.textContent = 'MWSE Studio'; + toolbar.appendChild(title); + + this._statusEl = document.createElement('span'); + this._statusEl.className = 'mwse-studio__status'; + toolbar.appendChild(this._statusEl); + + this._container.appendChild(toolbar); + + this.mwse.on('scope', () => this._setStatus('online', 'Connected')); + this.mwse.on('close', () => this._setStatus('', 'Disconnected')); + this.mwse.on('error', e => this._setStatus('error', e.message)); + + this._view.mount(); + this._pushNetworkColumn(); + + return this; + } + + // ---- Column builders ------------------------------------------------ + + _pushNetworkColumn() { + const items = [{ + icon: '◉', + label: 'Network', + meta: () => `${this.mwse.peers.size} peers online`, + onSelect: () => { + this._view.popTo(1); + this._pushGroupsColumn(); + } + }]; + + this._view.pushColumn('Studio', items, { searchable: false }); + } + + _pushGroupsColumn() { + const items = []; + + for (const [id, room] of this.mwse.rooms) { + items.push({ + icon: '#', + label: id, + meta: () => `${room.peers.size} peers`, + onSelect: () => { + this._view.popTo(2); + this._pushPeersColumn(room); + } + }); + } + + if (items.length === 0) { + items.push({ icon: '—', label: 'No groups', meta: 'Join a room first', hasChildren: false }); + } + + this._view.pushColumn('Groups', items); + } + + _pushPeersColumn(room) { + const items = []; + + for (const [, peer] of room.peers) { + items.push({ + icon: peer.selfSocket ? '★' : '●', + label: peer.socketId, + meta: () => peer.peerConnection ? 'p2p' : 'ws', + onSelect: () => { + this._view.popTo(3); + this._pushDevicesColumn(peer); + } + }); + } + + if (items.length === 0) { + items.push({ icon: '—', label: 'No peers in room', meta: '', hasChildren: false }); + } + + this._view.pushColumn('Peers', items); + } + + _pushDevicesColumn(peer) { + const items = [ + { + icon: '◎', + label: 'Camera', + meta: 'Capture video', + onSelect: () => this._openCamera(peer) + }, + { + icon: '●', + label: 'Camera + Mic', + meta: 'Video + audio', + onSelect: () => this._openCameraAndMic(peer) + }, + { + icon: '♪', + label: 'Microphone', + meta: 'Audio only', + onSelect: () => this._openMicrophone(peer) + }, + { + icon: '⬜', + label: 'Screen', + meta: 'Share display', + onSelect: () => this._openScreen(peer) + }, + { + icon: '↗', + label: 'Send file', + meta: 'DataChannel transfer', + hasChildren: false, + onSelect: () => this._openFilePicker(peer) + } + ]; + + const col = this._view.pushColumn('Devices', items); + + // Show already-active streams at the bottom of devices. + if (peer.rtc?._streams) { + const active = peer.rtc._streams.list(); + if (active.length) { + col.addAction('View streams', '', () => { + this._view.popTo(4); + this._pushActiveStreamsColumn(peer); + }); + } + } + } + + _pushActiveStreamsColumn(peer) { + const srcs = peer.rtc?._streams?.list() ?? []; + const items = srcs.map(src => ({ + icon: src.tracks[0]?.kind === 'video' ? '▶' : '♪', + label: src.label, + meta: src.tracks.map(t => t.kind).join(' + '), + onSelect: () => { + this._view.popTo(5); + this._pushQualityColumn(peer, src.label, src); + } + })); + this._view.pushColumn('Streams', items); + } + + _pushQualityColumn(peer, label, src) { + const presets = [ + { icon: '↑', label: 'High', meta: '1080p · 4 Mbps', params: { maxBitrate: 4_000_000, scaleResolutionDownBy: 1 } }, + { icon: '—', label: 'Medium', meta: '720p · 1.5 Mbps', params: { maxBitrate: 1_500_000, scaleResolutionDownBy: 1.5 } }, + { icon: '↓', label: 'Low', meta: '480p · 500 Kbps', params: { maxBitrate: 500_000, scaleResolutionDownBy: 2 } }, + ]; + + const items = presets.map(p => ({ + icon: p.icon, + label: p.label, + meta: p.meta, + hasChildren: false, + onSelect: () => peer.rtc?.setEncodings(label, 'video', p.params) + })); + + // Mute / stop controls. + for (const track of (src.tracks ?? [])) { + items.push({ + icon: track.enabled ? '⊙' : '○', + label: `${track.enabled ? 'Mute' : 'Unmute'} ${track.kind}`, + meta: '', + hasChildren: false, + onSelect: () => { + peer.rtc?.setEnabled(label, track.kind, !track.enabled); + this._view.refresh(); + } + }); + } + + items.push({ + icon: '✕', + label: 'Stop stream', + meta: '', + hasChildren: false, + onSelect: () => { + peer.rtc?.removeStream(label); + this._view.popTo(4); + } + }); + + this._view.pushColumn('Quality', items); + } + + // ---- Device helpers ------------------------------------------------- + + async _openCamera(peer) { + const stream = await MediaSources.camera().catch(e => { this._setStatus('error', e.message); return null; }); + if (!stream) return; + this._ensureRTC(peer); + peer.rtc.addStream('camera', stream); + this._view.popTo(4); + this._pushActiveStreamsColumn(peer); + } + + async _openCameraAndMic(peer) { + const stream = await MediaSources.cameraAndMic().catch(e => { this._setStatus('error', e.message); return null; }); + if (!stream) return; + this._ensureRTC(peer); + peer.rtc.addStream('cam+mic', stream); + this._view.popTo(4); + this._pushActiveStreamsColumn(peer); + } + + async _openMicrophone(peer) { + const stream = await MediaSources.microphone().catch(e => { this._setStatus('error', e.message); return null; }); + if (!stream) return; + this._ensureRTC(peer); + peer.rtc.addStream('mic', stream); + this._view.popTo(4); + this._pushActiveStreamsColumn(peer); + } + + async _openScreen(peer) { + const stream = await MediaSources.screen().catch(e => { this._setStatus('error', e.message); return null; }); + if (!stream) return; + this._ensureRTC(peer); + peer.rtc.addStream('screen', stream); + this._view.popTo(4); + this._pushActiveStreamsColumn(peer); + } + + _openFilePicker(peer) { + const input = document.createElement('input'); + input.type = 'file'; + input.addEventListener('change', async () => { + const file = input.files?.[0]; + if (!file) return; + this._ensureRTC(peer); + this._setStatus('', `Sending ${file.name}…`); + peer.rtc.files.on('progress', ({ sent, total }) => { + const pct = Math.round(sent / total * 100); + this._setStatus('', `${file.name} — ${pct}%`); + }); + await peer.rtc.sendFile(file).catch(e => this._setStatus('error', e.message)); + this._setStatus('online', `${file.name} sent`); + }); + input.click(); + } + + // ---- RTCEngine management ------------------------------------------- + + // Ensure the peer has an RTCEngine connected (creates one if absent). + _ensureRTC(peer) { + if (peer.rtc.active) return; + + // Determine politeness by lexicographic socket ID comparison. + const myId = this.mwse.me.socketId; + const polite = myId < peer.socketId; + + peer.rtc.connect({ polite }); + } + + // ---- Helpers -------------------------------------------------------- + + _setStatus(cls, text) { + if (!this._statusEl) return; + this._statusEl.className = `mwse-studio__status${cls ? ` mwse-studio__status--${cls}` : ''}`; + this._statusEl.textContent = text; + } + + _injectStyle() { + if (this._styleInjected) return; + const link = document.createElement('link'); + link.rel = 'stylesheet'; + link.href = new URL('./style.css', import.meta.url).href; + document.head.appendChild(link); + this._styleInjected = true; + } +} diff --git a/sdk/studio/style.css b/sdk/studio/style.css new file mode 100644 index 0000000..e8c2a37 --- /dev/null +++ b/sdk/studio/style.css @@ -0,0 +1,255 @@ +/* MWSE Studio — desktop-first Miller-column UI */ + +.mwse-studio { + display: flex; + flex-direction: column; + width: 100%; + height: 100%; + background: #1a1a1a; + color: #d4d4d4; + font-family: 'Segoe UI', system-ui, -apple-system, sans-serif; + font-size: 13px; + overflow: hidden; + user-select: none; +} + +/* Top toolbar */ +.mwse-studio__toolbar { + display: flex; + align-items: center; + gap: 8px; + padding: 6px 12px; + background: #111; + border-bottom: 1px solid #333; + flex-shrink: 0; +} + +.mwse-studio__title { + font-weight: 600; + color: #fff; + font-size: 12px; + letter-spacing: .04em; + text-transform: uppercase; +} + +.mwse-studio__status { + margin-left: auto; + font-size: 11px; + color: #888; +} + +.mwse-studio__status--online { color: #4caf50; } +.mwse-studio__status--error { color: #f44336; } + +/* Column scroll area */ +.mwse-studio__columns { + display: flex; + flex-direction: row; + flex: 1; + overflow-x: auto; + overflow-y: hidden; + scrollbar-width: thin; + scrollbar-color: #444 #1a1a1a; +} + +.mwse-studio__columns::-webkit-scrollbar { height: 4px; } +.mwse-studio__columns::-webkit-scrollbar-track { background: #1a1a1a; } +.mwse-studio__columns::-webkit-scrollbar-thumb { background: #444; border-radius: 2px; } + +/* Individual column */ +.mwse-col { + min-width: 220px; + max-width: 260px; + flex-shrink: 0; + display: flex; + flex-direction: column; + border-right: 1px solid #2e2e2e; + background: #1e1e1e; +} + +.mwse-col--active { background: #222; } + +.mwse-col__header { + padding: 8px 12px 6px; + font-size: 11px; + font-weight: 600; + letter-spacing: .06em; + text-transform: uppercase; + color: #777; + border-bottom: 1px solid #2a2a2a; + flex-shrink: 0; +} + +/* Search box */ +.mwse-col__search { + margin: 6px 8px; + padding: 4px 8px; + background: #2a2a2a; + border: 1px solid #383838; + border-radius: 4px; + color: #d4d4d4; + font-size: 12px; + outline: none; + flex-shrink: 0; +} + +.mwse-col__search:focus { border-color: #0078d4; } + +/* Item list */ +.mwse-col__list { + flex: 1; + overflow-y: auto; + padding: 4px 0; + scrollbar-width: thin; + scrollbar-color: #333 transparent; +} + +.mwse-col__list::-webkit-scrollbar { width: 4px; } +.mwse-col__list::-webkit-scrollbar-thumb { background: #333; border-radius: 2px; } + +/* Single item */ +.mwse-item { + display: flex; + align-items: center; + gap: 8px; + padding: 6px 12px; + cursor: pointer; + border-radius: 0; + border-left: 2px solid transparent; + transition: background 80ms; +} + +.mwse-item:hover { background: #2a2a2a; } +.mwse-item--active { + background: #0d3a5a !important; + border-left-color: #0078d4; +} + +.mwse-item__icon { + font-size: 12px; + color: #555; + flex-shrink: 0; + width: 14px; + text-align: center; +} + +.mwse-item--active .mwse-item__icon { color: #60cdff; } + +.mwse-item__body { flex: 1; min-width: 0; } + +.mwse-item__label { + white-space: nowrap; + overflow: hidden; + text-overflow: ellipsis; + color: #d0d0d0; + font-size: 13px; +} + +.mwse-item--active .mwse-item__label { color: #fff; } + +.mwse-item__meta { + font-size: 11px; + color: #666; + margin-top: 1px; +} + +.mwse-item--active .mwse-item__meta { color: #4d9fce; } + +.mwse-item__arrow { + font-size: 10px; + color: #444; + flex-shrink: 0; +} + +.mwse-item--active .mwse-item__arrow { color: #60cdff; } + +/* Status badges */ +.mwse-badge { + display: inline-block; + padding: 1px 5px; + border-radius: 3px; + font-size: 10px; + font-weight: 600; + letter-spacing: .03em; +} + +.mwse-badge--live { background: #d32f2f; color: #fff; } +.mwse-badge--ok { background: #2e7d32; color: #fff; } +.mwse-badge--ws { background: #333; color: #aaa; } +.mwse-badge--p2p { background: #1565c0; color: #fff; } + +/* Progress bar (file transfer) */ +.mwse-progress { + margin: 8px 12px; + height: 4px; + background: #333; + border-radius: 2px; +} +.mwse-progress__bar { + height: 100%; + background: #0078d4; + border-radius: 2px; + transition: width 200ms; +} + +/* Action buttons inside columns */ +.mwse-col__actions { + padding: 8px 12px; + border-top: 1px solid #2a2a2a; + display: flex; + gap: 6px; + flex-shrink: 0; +} + +.mwse-btn { + flex: 1; + padding: 5px 8px; + background: #2a2a2a; + border: 1px solid #3a3a3a; + border-radius: 4px; + color: #ccc; + font-size: 11px; + cursor: pointer; + text-align: center; +} +.mwse-btn:hover { background: #333; color: #fff; } +.mwse-btn--primary { background: #0d47a1; border-color: #1565c0; color: #fff; } +.mwse-btn--primary:hover { background: #1565c0; } +.mwse-btn--danger { background: #4a1a1a; border-color: #7b2020; color: #f88; } +.mwse-btn--danger:hover { background: #7b2020; } + +/* Slider for quality controls */ +.mwse-slider-row { + display: flex; + align-items: center; + gap: 8px; + padding: 4px 12px; +} + +.mwse-slider-row label { + font-size: 11px; + color: #888; + flex-shrink: 0; + width: 64px; +} + +.mwse-slider-row input[type=range] { + flex: 1; + accent-color: #0078d4; +} + +.mwse-slider-row span { + font-size: 11px; + color: #bbb; + width: 40px; + text-align: right; +} + +/* Video preview thumbnail */ +.mwse-preview { + width: 100%; + aspect-ratio: 16/9; + background: #111; + border-top: 1px solid #2a2a2a; + object-fit: contain; +} diff --git a/sdk/webrtc/DataChannel.js b/sdk/webrtc/DataChannel.js new file mode 100644 index 0000000..682bcee --- /dev/null +++ b/sdk/webrtc/DataChannel.js @@ -0,0 +1,91 @@ +// Primary RTCDataChannel manager. +// +// - Messages queued while the channel is not yet open. +// - If the channel closes while the RTCPeerConnection is still 'connected', +// it is recreated automatically (brief-disconnect recovery). +// - Both sides can safely call initiate() — the one that creates the channel +// triggers onnegotiationneeded; the other side receives ondatachannel. +// Use { negotiated: true, id: 0 } if you want to skip the SDP dance. +import MWSEEventTarget from '../EventTarget.js'; + +export default class DataChannel extends MWSEEventTarget { + constructor(pc, label = 'mwse') { + super(); + this._pc = pc; + this._label = label; + this._ch = null; + this._queue = []; + this._open = false; + + // Accept channels offered by the remote side. + pc.on('datachannel', ch => { + if (ch.label === this._label) this._attach(ch); + }); + } + + // Create the channel on this side (makes us the "offering" side for the + // channel SDP line; triggers onnegotiationneeded). + initiate() { + if (this._ch?.readyState === 'open' || this._ch?.readyState === 'connecting') return; + const ch = this._pc.createDataChannel(this._label, { ordered: true }); + this._attach(ch); + return ch; + } + + // Send any JSON-serialisable value or an ArrayBuffer. + send(data) { + const wire = (typeof data === 'string' || data instanceof ArrayBuffer) + ? data + : JSON.stringify(data); + + if (this._open) { + this._ch.send(wire); + } else { + this._queue.push(wire); + } + } + + get open() { return this._open; } + get state() { return this._ch?.readyState ?? 'closed'; } + + close() { + this._ch?.close(); + this._ch = null; + this._open = false; + } + + // ---- Internal ------------------------------------------------------- + + _attach(ch) { + this._ch = ch; + ch.binaryType = 'arraybuffer'; + + ch.onopen = () => { + this._open = true; + this.emit('open'); + for (const msg of this._queue) ch.send(msg); + this._queue = []; + }; + + ch.onmessage = ({ data }) => { + let parsed = data; + if (typeof data === 'string') { + try { parsed = JSON.parse(data); } catch (_) {} + } + this.emit('message', parsed); + }; + + ch.onclose = () => { + this._open = false; + this.emit('close'); + // Auto-recreate the channel if the connection is still alive. + // (Channels can close without the full connection closing, e.g. on + // some mobile browsers when the page is backgrounded briefly.) + if (this._pc.connectionState === 'connected') { + setTimeout(() => this.initiate(), 800); + } + }; + + ch.onerror = err => this.emit('error', err); + } +} diff --git a/sdk/webrtc/FileSender.js b/sdk/webrtc/FileSender.js new file mode 100644 index 0000000..f927856 --- /dev/null +++ b/sdk/webrtc/FileSender.js @@ -0,0 +1,139 @@ +// P2P file transfer over dedicated RTCDataChannels. +// +// The file is split into PART_SIZE (10 MB) partitions; up to 5 parallel +// channels carry one partition each so large files don't stall. +// Protocol per channel: +// sender → JSON { size, idx } — announce partition size + index +// receiver → 'READY' — ready to receive chunks +// sender → ArrayBuffer chunks — CHUNK_SIZE bytes each +// receiver → 'ACK' — all bytes received, send next part +// sender → close() — no more partitions for this channel +// +// Emits: 'progress'({ sent, total }), 'complete', 'error'(err) +import MWSEEventTarget from '../EventTarget.js'; + +const PART_SIZE = 10 * 1024 * 1024; // 10 MB per partition +const CHUNK_SIZE = 16 * 1024; // 16 KB chunk size + +export default class FileSender extends MWSEEventTarget { + constructor(pc) { + super(); + this._pc = pc; // PeerConnection instance + } + + // Set up a receiver. Must be called before the sender opens channels. + // onFile(blob) is called when all partitions are reassembled. + receive(partCount, onFile) { + const parts = new Array(partCount); + let remaining = partCount; + + this._pc.on('datachannel', ch => { + if (!ch.label.startsWith('mwse/file/')) return; + _receivePartition(ch, parts, () => { + if (--remaining === 0) onFile(new Blob(parts)); + }); + }); + } + + // Send a File or Blob. Returns a Promise that resolves when done. + async send(file) { + const buffer = await file.arrayBuffer(); + const partCount = Math.ceil(buffer.byteLength / PART_SIZE); + const chanCount = Math.min(5, partCount); + + let sent = 0; + let pending = chanCount; + let partIdx = 0; + + const nextPart = () => { + if (partIdx >= partCount) return null; + const start = partIdx * PART_SIZE; + const slice = buffer.slice(start, Math.min(start + PART_SIZE, buffer.byteLength)); + return { data: slice, idx: partIdx++ }; + }; + + return new Promise((resolve, reject) => { + for (let i = 0; i < chanCount; i++) { + const ch = this._pc.createDataChannel(`mwse/file/${i}`); + ch.binaryType = 'arraybuffer'; + + ch.onopen = () => { + const part = nextPart(); + if (!part) { ch.close(); if (--pending === 0) { this.emit('complete'); resolve(); } return; } + _sendPartition(ch, part, nextPart, + bytes => { sent += bytes; this.emit('progress', { sent, total: buffer.byteLength }); }, + () => { ch.close(); if (--pending === 0) { this.emit('complete'); resolve(); } }, + err => reject(err) + ); + }; + ch.onerror = reject; + } + }); + } +} + +// ---- Private helpers -------------------------------------------------------- + +function _receivePartition(ch, parts, onPartDone) { + let metaSize = 0; + let metaIdx = 0; + let chunks = []; + let received = 0; + + ch.onmessage = ({ data }) => { + if (typeof data === 'string') { + // First message per partition: metadata + ({ size: metaSize, idx: metaIdx } = JSON.parse(data)); + ch.send('READY'); + } else { + chunks.push(data); + received += data.byteLength; + if (received >= metaSize) { + parts[metaIdx] = new Blob(chunks); + chunks = []; + received = 0; + ch.send('ACK'); + onPartDone(); + } + } + }; +} + +function _sendPartition(ch, part, nextPart, onProgress, onDone, onError) { + let offset = 0; + + // Announce the partition to the receiver. + ch.send(JSON.stringify({ size: part.data.byteLength, idx: part.idx })); + + const pump = () => { + while (offset < part.data.byteLength) { + // Back off when the buffer is getting full. + if (ch.bufferedAmount > CHUNK_SIZE * 8) return; + const end = Math.min(offset + CHUNK_SIZE, part.data.byteLength); + const chunk = part.data.slice(offset, end); + ch.send(chunk); + onProgress(chunk.byteLength); + offset += chunk.byteLength; + } + }; + + ch.bufferedAmountLowThreshold = CHUNK_SIZE; + ch.onbufferedamountlow = pump; + + ch.onmessage = ({ data }) => { + if (data === 'READY') { + pump(); + } else if (data === 'ACK') { + const next = nextPart(); + if (next) { + part = next; + offset = 0; + ch.send(JSON.stringify({ size: part.data.byteLength, idx: part.idx })); + } else { + onDone(); + } + } + }; + + ch.onerror = onError; +} diff --git a/sdk/webrtc/MediaSources.js b/sdk/webrtc/MediaSources.js new file mode 100644 index 0000000..5209118 --- /dev/null +++ b/sdk/webrtc/MediaSources.js @@ -0,0 +1,98 @@ +// Static factory methods for common media sources. +// +// All methods return a MediaStream so they can be passed directly to +// StreamManager.addStream(label, stream). +export const MediaSources = { + + // Single camera (video only by default). + async camera(constraints = {}) { + return navigator.mediaDevices.getUserMedia({ + video: constraints.video ?? true, + audio: constraints.audio ?? false + }); + }, + + // Microphone (audio only). + async microphone(constraints = {}) { + return navigator.mediaDevices.getUserMedia({ + audio: constraints.audio ?? true, + video: false + }); + }, + + // Camera + microphone together. + async cameraAndMic(constraints = {}) { + return navigator.mediaDevices.getUserMedia({ + video: constraints.video ?? true, + audio: constraints.audio ?? true + }); + }, + + // Screen / window / tab sharing via getDisplayMedia. + async screen(constraints = {}) { + return navigator.mediaDevices.getDisplayMedia({ + video: constraints.video ?? { cursor: 'always' }, + audio: constraints.audio ?? false + }); + }, + + // Enumerate available media devices. + async devices() { + const all = await navigator.mediaDevices.enumerateDevices(); + return { + cameras: all.filter(d => d.kind === 'videoinput'), + microphones: all.filter(d => d.kind === 'audioinput'), + speakers: all.filter(d => d.kind === 'audiooutput') + }; + }, + + // Play a pre-decoded AudioBuffer as a continuous MediaStream. + // Useful for replacing the live microphone with an audio file. + // Returns the MediaStream; the AudioContext is attached as ._ctx for cleanup. + fromAudioBuffer(audioBuffer, { loop = false } = {}) { + const ctx = new AudioContext(); + const src = ctx.createBufferSource(); + src.buffer = audioBuffer; + src.loop = loop; + const dest = ctx.createMediaStreamDestination(); + src.connect(dest); + src.start(); + const stream = dest.stream; + stream._ctx = ctx; // caller can close() to free resources + stream._src = src; + return stream; + }, + + // Capture a or OffscreenCanvas as a video MediaStream. + fromCanvas(canvas, fps = 30) { + if (typeof canvas.captureStream === 'function') { + return canvas.captureStream(fps); + } + if (typeof canvas.transferControlToOffscreen === 'function') { + throw new Error( + 'fromCanvas: pass the OffscreenCanvas, not the original canvas, ' + + 'or use fromOffscreenCanvas()' + ); + } + throw new Error('fromCanvas: captureStream() not supported on this element'); + }, + + // Create an AudioContext-based mixing bus and capture its output as a + // MediaStream. Useful for compositing multiple audio sources before sending. + // Returns { ctx, dest, stream } — add nodes to ctx and connect to dest. + createAudioMix() { + const ctx = new AudioContext(); + const dest = ctx.createMediaStreamDestination(); + return { ctx, dest, stream: dest.stream }; + }, + + // Fetch and decode an audio file URL into an AudioBuffer. + async loadAudioFile(url) { + const ctx = new AudioContext(); + const resp = await fetch(url); + const ab = await resp.arrayBuffer(); + const buf = await ctx.decodeAudioData(ab); + await ctx.close(); + return buf; // pass to fromAudioBuffer() + } +}; diff --git a/sdk/webrtc/Negotiator.js b/sdk/webrtc/Negotiator.js new file mode 100644 index 0000000..d73c901 --- /dev/null +++ b/sdk/webrtc/Negotiator.js @@ -0,0 +1,117 @@ +// Perfect-negotiation (RFC 8829 §5.2) for WebRTC offer/answer exchange. +// +// Each peer is either "polite" or "impolite". When both peers generate an +// offer simultaneously (collision), the polite peer rolls back and accepts +// the impolite peer's offer. Politeness is decided by the caller — typically +// by comparing the two socket IDs so exactly one side is polite per pair. +// +// ICE candidates are queued until the remote description is in place so they +// are never applied to an uninitialised peer connection. +export default class Negotiator { + constructor(pc, { polite, onSend }) { + this._pc = pc; // PeerConnection wrapper + this.polite = polite; + this._send = onSend; // fn(signalingObject) → relays to remote peer + + this._pendingCandidates = []; + this._settingRemote = false; + this._makingOffer = false; + this._ignoreOffer = false; + } + + // Attach to PeerConnection events. Returns `this` for chaining. + attach() { + this._pc.on('negotiation-needed', () => this._offer()); + this._pc.on('ice-candidate', cand => { + // null candidate = end-of-candidates; always forward. + this._send({ type: 'ice', candidate: cand }); + }); + return this; + } + + // Handle an incoming signaling message from the remote peer. + // msg.type: 'offer' | 'answer' | 'ice' + async receive(msg) { + try { + if (msg.type === 'offer') await this._onOffer(msg.sdp); + else if (msg.type === 'answer') await this._onAnswer(msg.sdp); + else if (msg.type === 'ice') await this._onICE(msg.candidate); + } catch (err) { + // Surface on caller — the engine will re-emit as 'error'. + throw err; + } + } + + // ---- Internal ------------------------------------------------------- + + async _offer() { + if (this._makingOffer) return; + this._makingOffer = true; + try { + // Modern browsers support the implicit description form. + await this._pc.setLocalDescription(); + this._send({ type: 'offer', sdp: this._pc.localDescription.sdp }); + } catch (err) { + // setLocalDescription can fail if the signaling state rolled back + // under us; safe to ignore since the other side's offer will win. + if (this.polite) return; + throw err; + } finally { + this._makingOffer = false; + } + } + + async _onOffer(sdp) { + const collision = this._makingOffer || this._pc.signalingState !== 'stable'; + this._ignoreOffer = !this.polite && collision; + if (this._ignoreOffer) return; + + this._settingRemote = true; + try { + // Polite peer: rollback own offer, accept incoming. + if (this._pc.signalingState !== 'stable') { + await Promise.all([ + this._pc.setLocalDescription({ type: 'rollback' }), + this._pc.setRemoteDescription({ type: 'offer', sdp }) + ]); + } else { + await this._pc.setRemoteDescription({ type: 'offer', sdp }); + } + } finally { + this._settingRemote = false; + } + + await this._pc.setLocalDescription(); // implicit answer + this._send({ type: 'answer', sdp: this._pc.localDescription.sdp }); + await this._flushCandidates(); + } + + async _onAnswer(sdp) { + if (this._pc.signalingState === 'stable') return; // already settled + await this._pc.setRemoteDescription({ type: 'answer', sdp }); + await this._flushCandidates(); + } + + async _onICE(candidate) { + if (!candidate) return; // end-of-candidates + const ready = this._pc.remoteDescription && !this._settingRemote; + if (!ready) { + this._pendingCandidates.push(candidate); + return; + } + try { + await this._pc.addIceCandidate(candidate); + } catch (err) { + // Silently drop if we're ignoring an offer collision on the + // impolite side — those candidates belong to the rolled-back offer. + if (!this._ignoreOffer) throw err; + } + } + + async _flushCandidates() { + for (const c of this._pendingCandidates) { + await this._pc.addIceCandidate(c).catch(() => {}); + } + this._pendingCandidates = []; + } +} diff --git a/sdk/webrtc/PeerConnection.js b/sdk/webrtc/PeerConnection.js new file mode 100644 index 0000000..57b7426 --- /dev/null +++ b/sdk/webrtc/PeerConnection.js @@ -0,0 +1,122 @@ +// RTCPeerConnection wrapper with comprehensive state monitoring. +// Fixes the original SDK's missed-disconnect bug by subscribing to BOTH +// connectionstatechange (the modern event) and iceconnectionstatechange +// (the legacy fallback) and never letting either go unobserved. +// +// Emits: 'connected', 'disconnected', 'failed', +// 'state-change'(state), 'ice-state-change'(state), +// 'gathering-change'(state), 'signaling-change'(state), +// 'ice-candidate'(candidate|null), 'track'(track, streams), +// 'datachannel'(channel), 'negotiation-needed' +import MWSEEventTarget from '../EventTarget.js'; + +export default class PeerConnection extends MWSEEventTarget { + constructor(config) { + super(); + this._pc = new RTCPeerConnection(config); + this._wire(); + } + + // ---- RTCPeerConnection passthrough ---------------------------------- + + get pc() { return this._pc; } + get connectionState() { return this._pc.connectionState; } + get iceConnectionState() { return this._pc.iceConnectionState; } + get signalingState() { return this._pc.signalingState; } + get iceGatheringState() { return this._pc.iceGatheringState; } + get localDescription() { return this._pc.localDescription; } + get remoteDescription() { return this._pc.remoteDescription; } + + addTrack(track, ...streams) { return this._pc.addTrack(track, ...streams); } + removeTrack(sender) { this._pc.removeTrack(sender); } + async createOffer(opts) { return this._pc.createOffer(opts); } + async createAnswer() { return this._pc.createAnswer(); } + async setLocalDescription(d) { await this._pc.setLocalDescription(d); } + async setRemoteDescription(d){ await this._pc.setRemoteDescription(d); } + + async addIceCandidate(c) { + if (!c) return; + await this._pc.addIceCandidate(c); + } + + createDataChannel(label, opts) { + return this._pc.createDataChannel(label, opts); + } + + async restartIce() { + if (typeof this._pc.restartIce === 'function') { + this._pc.restartIce(); + } else { + const offer = await this._pc.createOffer({ iceRestart: true }); + await this._pc.setLocalDescription(offer); + } + } + + close() { + const pc = this._pc; + pc.onconnectionstatechange = null; + pc.oniceconnectionstatechange = null; + pc.onicegatheringstatechange = null; + pc.onsignalingstatechange = null; + pc.onicecandidate = null; + pc.ontrack = null; + pc.ondatachannel = null; + pc.onnegotiationneeded = null; + pc.close(); + } + + // ---- Internal event wiring ------------------------------------------ + + _wire() { + const pc = this._pc; + + // connectionstatechange is the primary signal on modern browsers. + // It reliably reports 'failed' and 'closed' — states that + // iceconnectionstatechange can miss under certain NAT conditions. + pc.onconnectionstatechange = () => { + const s = pc.connectionState; + this.emit('state-change', s); + if (s === 'connected') this.emit('connected'); + if (s === 'disconnected' || s === 'closed') this.emit('disconnected'); + if (s === 'failed') this.emit('failed'); + }; + + // Keep iceconnectionstatechange as a fallback for environments where + // connectionstatechange never fires (some mobile browsers, old Chrome). + pc.oniceconnectionstatechange = () => { + const s = pc.iceConnectionState; + this.emit('ice-state-change', s); + + // Only act if connectionState hasn't given us a verdict yet. + if (pc.connectionState === 'new' || pc.connectionState === 'connecting') { + if (s === 'connected' || s === 'completed') this.emit('connected'); + if (s === 'disconnected') this.emit('disconnected'); + if (s === 'failed') this.emit('failed'); + } + }; + + pc.onicegatheringstatechange = () => { + this.emit('gathering-change', pc.iceGatheringState); + }; + + pc.onsignalingstatechange = () => { + this.emit('signaling-change', pc.signalingState); + }; + + pc.onicecandidate = ({ candidate }) => { + this.emit('ice-candidate', candidate); + }; + + pc.ontrack = ({ track, streams }) => { + this.emit('track', track, streams); + }; + + pc.ondatachannel = ({ channel }) => { + this.emit('datachannel', channel); + }; + + pc.onnegotiationneeded = () => { + this.emit('negotiation-needed'); + }; + } +} diff --git a/sdk/webrtc/StreamManager.js b/sdk/webrtc/StreamManager.js new file mode 100644 index 0000000..77ab8d5 --- /dev/null +++ b/sdk/webrtc/StreamManager.js @@ -0,0 +1,112 @@ +// Manages named media sources on an RTCPeerConnection. +// +// Each "source" has a label (e.g. 'camera', 'screen', 'mic') and maps to +// one MediaStream plus the RTCRtpSenders the peer connection created for it. +// +// Usage: +// const sm = new StreamManager(peerConnection); +// sm.addStream('camera', await navigator.mediaDevices.getUserMedia({ video: true })); +// await sm.replaceTrack('camera', newVideoTrack); // hot-swap, no renegotiation +// await sm.setEncodings('camera', 'video', { maxBitrate: 2_000_000 }); +// sm.removeStream('camera'); // stops tracks + renegotiates +export default class StreamManager { + constructor(pc) { + this._pc = pc; // PeerConnection instance + this._sources = new Map(); // label → { stream, senders } + } + + // Add all tracks from a MediaStream. Triggers renegotiation. + addStream(label, stream) { + if (this._sources.has(label)) { + throw new Error(`StreamManager: "${label}" already exists — call removeStream first`); + } + const senders = []; + for (const track of stream.getTracks()) { + const sender = this._pc.addTrack(track, stream); + senders.push(sender); + } + this._sources.set(label, { stream, senders }); + return senders; + } + + // Swap one track inside an existing source by kind ('video' | 'audio'). + // Uses RTCRtpSender.replaceTrack — no renegotiation needed. + async replaceTrack(label, newTrack) { + const src = this._sources.get(label); + if (!src) throw new Error(`StreamManager: source "${label}" not found`); + for (const sender of src.senders) { + if (sender.track?.kind === newTrack.kind) { + await sender.replaceTrack(newTrack); + return sender; + } + } + throw new Error(`StreamManager: no ${newTrack.kind} sender in "${label}"`); + } + + // Remove a source: stops all its tracks and calls removeTrack (renegotiates). + removeStream(label) { + const src = this._sources.get(label); + if (!src) return; + for (const sender of src.senders) { + try { this._pc.removeTrack(sender); } catch (_) {} + } + for (const track of src.stream.getTracks()) { + track.stop(); + } + this._sources.delete(label); + } + + // Adjust encoding parameters (bitrate, framerate, resolution scale) for a + // specific source+kind. RTCRtpSender.setParameters is used — no renegotiation. + // + // params: { maxBitrate, maxFramerate, scaleResolutionDownBy, ... } + async setEncodings(label, kind, params) { + const src = this._sources.get(label); + if (!src) return; + for (const sender of src.senders) { + if (sender.track?.kind !== kind) continue; + const p = sender.getParameters(); + if (!p.encodings?.length) p.encodings = [{}]; + p.encodings = p.encodings.map(enc => ({ ...enc, ...params })); + await sender.setParameters(p); + return; + } + } + + // Mute/unmute a track by label+kind (local-only, no signaling). + setEnabled(label, kind, enabled) { + const src = this._sources.get(label); + if (!src) return; + for (const track of src.stream.getTracks()) { + if (track.kind === kind) track.enabled = enabled; + } + } + + // Return a snapshot of all managed sources. + list() { + const out = []; + for (const [label, { stream, senders }] of this._sources) { + out.push({ + label, + tracks: stream.getTracks().map(t => ({ + id: t.id, + kind: t.kind, + enabled: t.enabled, + label: t.label + })), + senders: senders.length + }); + } + return out; + } + + has(label) { return this._sources.has(label); } + + // Get the MediaStream for a label. + stream(label) { return this._sources.get(label)?.stream ?? null; } + + // Stop all sources and clear the map. + destroy() { + for (const [label] of this._sources) this.removeStream(label); + } +} diff --git a/sdk/webrtc/index.js b/sdk/webrtc/index.js new file mode 100644 index 0000000..9cf0259 --- /dev/null +++ b/sdk/webrtc/index.js @@ -0,0 +1,223 @@ +// RTCEngine — the main WebRTC entry point for the MWSE SDK. +// +// Drop-in replacement for the old WebRTC.js stub. Peer.js still imports +// from WebRTC.js which re-exports this class, so no change is needed there. +// +// Architecture: +// RTCEngine — coordinates all sub-systems, surfaces public API +// ├── PeerConnection — RTCPeerConnection wrapper with full event coverage +// ├── Negotiator — perfect-negotiation (offer/answer/ICE, no collisions) +// ├── StreamManager — named media sources (addStream/replaceTrack/setEncodings) +// ├── DataChannel — primary data channel with auto-reconnect + message queue +// └── FileSender — multi-channel parallel file transfer +// +// Signaling relay: +// Incoming → Peer.js emits 'input' on the engine → receive(msg) → Negotiator +// Outgoing → Negotiator emits 'output' on the engine → Peer.js relays to server +// +// ICE restart: automatic exponential backoff on 'failed' (1 s → 2 s → 4 s). +import MWSEEventTarget from '../EventTarget.js'; +import PeerConnection from './PeerConnection.js'; +import Negotiator from './Negotiator.js'; +import StreamManager from './StreamManager.js'; +import DataChannel from './DataChannel.js'; +import FileSender from './FileSender.js'; +export { MediaSources } from './MediaSources.js'; +export { default as StreamManager } from './StreamManager.js'; +export { default as FileSender } from './FileSender.js'; + +const DEFAULT_ICE = [{ urls: 'stun:stun.l.google.com:19302' }]; +const RESTART_DELAYS = [1000, 2000, 4000]; // ms, exponential backoff + +export default class RTCEngine extends MWSEEventTarget { + constructor(config) { + super(); + this._config = config; + + // Public state — Peer.js reads these. + this.active = false; + this.peer = null; // set by Peer.js + this.connectionStatus = 'new'; + this.iceStatus = 'new'; + this.gatheringStatus = 'new'; + this.signalingStatus = ''; + + // Sub-system references (null until connect() is called). + this._pc = null; + this._neg = null; + this._streams = null; + this._data = null; + this._files = null; + this._restartN = 0; + + // Peer.js legacy: `this.rtc.channel` for direct DataChannel access. + this.channel = null; + + // Bridge the 'input' event (from Peer.js) → receive(). + this.on('input', msg => this.receive(msg)); + } + + // ---- Connection lifecycle ------------------------------------------- + + // Connect to the remote peer. Call after the pair has been established. + // + // polite: true → this side yields on offer collision (accept incoming offer) + // polite: false → this side wins on collision (the impolite peer) + // + // Typically set polite = (mySocketId < peerSocketId) so exactly one side + // is polite per pair without any extra signaling. + connect({ polite = false, iceServers } = {}) { + if (this._pc) return this; // already connected + + const cfg = this._config ?? { iceServers: iceServers ?? DEFAULT_ICE }; + this._pc = new PeerConnection(cfg); + + this._neg = new Negotiator(this._pc, { + polite, + onSend: msg => this.emit('output', msg) + }).attach(); + + this._streams = new StreamManager(this._pc); + + this._data = new DataChannel(this._pc, 'mwse'); + this.channel = this._data; + + this._files = new FileSender(this._pc); + + // The side with polite=false initiates the primary data channel + // (creates the SDP line), which triggers onnegotiationneeded and + // kicks off the offer/answer exchange. + if (!polite) { + this._data.initiate(); + } + + // State events. + this._pc.on('state-change', s => { + this.connectionStatus = s; + this.emit('state-change', s); + }); + this._pc.on('connected', () => { + this.active = true; + this.iceStatus = 'connected'; + this._restartN = 0; + this.emit('connected'); + }); + this._pc.on('disconnected', () => { + this.active = false; + this.emit('disconnected'); + }); + this._pc.on('failed', () => { + this.active = false; + this.emit('failed'); + this._scheduleRestart(); + }); + this._pc.on('ice-state-change', s => { this.iceStatus = s; }); + this._pc.on('gathering-change', s => { this.gatheringStatus = s; }); + this._pc.on('signaling-change', s => { this.signalingStatus = s; }); + this._pc.on('track', (track, streams) => this.emit('track', track, streams)); + + this._data.on('open', () => this.emit('channel-open')); + this._data.on('message', msg => this.emit('message', msg)); + this._data.on('close', () => this.emit('channel-close')); + + return this; + } + + // ---- Signaling ------------------------------------------------------ + + // Handle an incoming signaling payload (relayed as ':rtcpack:' by Peer.js). + async receive(msg) { + if (!this._neg) return; + try { + await this._neg.receive(msg); + } catch (err) { + this.emit('error', err); + } + } + + // Compatibility shim: Peer.js may call rtc.send() for outgoing signaling. + send(data) { this.emit('output', data); } + + // ---- Media (StreamManager proxy) ------------------------------------ + + get streams() { return this._streams; } + + // Add a named MediaStream. Triggers renegotiation. + addStream(label, stream) { + if (!this._streams) throw new Error('RTCEngine: call connect() first'); + return this._streams.addStream(label, stream); + } + + // Swap one track inside a named source by kind ('video'|'audio'). + // No renegotiation — instant hot-swap. + async replaceTrack(label, newTrack) { + return this._streams?.replaceTrack(label, newTrack); + } + + // Remove a named source (stops tracks and renegotiates). + removeStream(label) { + this._streams?.removeStream(label); + } + + // Adjust bitrate / framerate / scale for an encoding on a named source. + async setEncodings(label, kind, params) { + return this._streams?.setEncodings(label, kind, params); + } + + // Mute/unmute a track by label and kind. + setEnabled(label, kind, enabled) { + this._streams?.setEnabled(label, kind, enabled); + } + + // ---- DataChannel ---------------------------------------------------- + + sendMessage(data) { this._data?.send(data); } + + // ---- File transfer (FileSender proxy) ------------------------------- + + get files() { return this._files; } + + // Send a File/Blob to the remote peer over parallel DataChannels. + async sendFile(file) { + if (!this._files) throw new Error('RTCEngine: call connect() first'); + return this._files.send(file); + } + + // Prepare to receive a file split into partCount partitions. + // onFile(blob) is called when fully assembled. + receiveFile(partCount, onFile) { + this._files?.receive(partCount, onFile); + } + + // ---- Lifecycle ------------------------------------------------------ + + destroy() { + this._streams?.destroy(); + this._data?.close(); + this._pc?.close(); + this._pc = null; + this._neg = null; + this._streams = null; + this._data = null; + this._files = null; + this.channel = null; + this.active = false; + this.connectionStatus = 'closed'; + this.emit('disconnected'); + } + + // ---- ICE restart ---------------------------------------------------- + + _scheduleRestart() { + const delay = RESTART_DELAYS[Math.min(this._restartN, RESTART_DELAYS.length - 1)]; + this._restartN++; + setTimeout(async () => { + if (!this._pc || this.connectionStatus === 'closed') return; + try { + await this._pc.restartIce(); + } catch (err) { + this.emit('error', err); + } + }, delay); + } +}