From 835f0b5f2eef12052e056426f1ae6106ada5fe6e Mon Sep 17 00:00:00 2001 From: abdussamedulutas Date: Wed, 17 Jun 2026 07:09:36 +0300 Subject: [PATCH] =?UTF-8?q?Go=20engine=20portu=20(0.1.0=20=C3=A7ekirdek)?= =?UTF-8?q?=20=E2=80=94=20Node.js=20engine'i=20race-free=20Go'ya=20ta?= =?UTF-8?q?=C5=9F=C4=B1?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit MWSE engine'i (Source/) performans odaklı, eşzamanlılık-güvenli bir Go projesine taşır. WSTS tel sözleşmesi (SDK giriş/çıkış) korunur; frontend dokunulmadan çalışmaya devam eder. - internal/protocol: WSTS encode/decode (request/response/stream/signal) - internal/ws: Client (bağlantı-başına tek-yazıcı), Room (RWMutex + snapshot broadcast), Hub (kayıt + router + event bus), Server (yaşam döngüsü, saQut heartbeat). #22 "ayrılırken-yazma" race'i yapısal olarak çözüldü. - internal/services: YourID, Session, Auth, Room, IPPressure, DataTransfer portu (Node'daki bariz bug'lar düzeltildi; tel şekilleri korundu). - internal/config, internal/httpserver: env config, statik + /api + graceful shutdown. - loadtest/: ayrı modül — ping/relay yük testi + benchmark istemcisi. go build/vet/test -race ./... yeşil. TestLeaveWhileSendRace regresyonu temiz. Uçtan uca doğrulandı: ping ~140k req/s (p50 ~200µs, 0 hata), relay ~190k msg/s (%98.5 teslim). İnsan onayına bırakılanlar REVIEW.md'de; kararlar decisions.md'de; durum PORT-PROGRESS.md'de. Hiçbir issue kapatılmadı, stable'a dokunulmadı, deploy/push yapılmadı. Co-Authored-By: Claude Opus 4.8 --- .gitignore | 13 ++ PORT-PROGRESS.md | 73 ++++++ REVIEW.md | 35 +++ decisions.md | 33 +++ go.mod | 5 + go.sum | 2 + internal/config/config.go | 71 ++++++ internal/httpserver/api.go | 220 ++++++++++++++++++ internal/httpserver/httpserver.go | 95 ++++++++ internal/httpserver/util.go | 26 +++ internal/protocol/protocol.go | 194 ++++++++++++++++ internal/protocol/protocol_test.go | 139 ++++++++++++ internal/services/auth.go | 214 ++++++++++++++++++ internal/services/datatransfer.go | 110 +++++++++ internal/services/ippressure.go | 277 +++++++++++++++++++++++ internal/services/room.go | 281 +++++++++++++++++++++++ internal/services/services.go | 70 ++++++ internal/services/services_test.go | 261 ++++++++++++++++++++++ internal/services/session.go | 33 +++ internal/services/yourid.go | 11 + internal/testutil/fakeconn.go | 105 +++++++++ internal/ws/client.go | 346 +++++++++++++++++++++++++++++ internal/ws/conn.go | 43 ++++ internal/ws/hub.go | 208 +++++++++++++++++ internal/ws/room.go | 253 +++++++++++++++++++++ internal/ws/server.go | 186 ++++++++++++++++ internal/ws/ws_test.go | 271 ++++++++++++++++++++++ loadtest/client.go | 182 +++++++++++++++ loadtest/go.mod | 5 + loadtest/go.sum | 2 + loadtest/main.go | 164 ++++++++++++++ loadtest/stats.go | 42 ++++ main.go | 61 +++++ 33 files changed, 4031 insertions(+) create mode 100644 PORT-PROGRESS.md create mode 100644 REVIEW.md create mode 100644 decisions.md create mode 100644 go.mod create mode 100644 go.sum create mode 100644 internal/config/config.go create mode 100644 internal/httpserver/api.go create mode 100644 internal/httpserver/httpserver.go create mode 100644 internal/httpserver/util.go create mode 100644 internal/protocol/protocol.go create mode 100644 internal/protocol/protocol_test.go create mode 100644 internal/services/auth.go create mode 100644 internal/services/datatransfer.go create mode 100644 internal/services/ippressure.go create mode 100644 internal/services/room.go create mode 100644 internal/services/services.go create mode 100644 internal/services/services_test.go create mode 100644 internal/services/session.go create mode 100644 internal/services/yourid.go create mode 100644 internal/testutil/fakeconn.go create mode 100644 internal/ws/client.go create mode 100644 internal/ws/conn.go create mode 100644 internal/ws/hub.go create mode 100644 internal/ws/room.go create mode 100644 internal/ws/server.go create mode 100644 internal/ws/ws_test.go create mode 100644 loadtest/client.go create mode 100644 loadtest/go.mod create mode 100644 loadtest/go.sum create mode 100644 loadtest/main.go create mode 100644 loadtest/stats.go create mode 100644 main.go diff --git a/.gitignore b/.gitignore index 7c43573..17adc80 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,16 @@ +# ---> Gitea CLI kimlik bilgisi (ASLA commit etme) +.gitea-auth.json + +# ---> Go (engine rewrite) +/mwse +/mwse-engine +/mwse-loadtest +loadtest/mwse-loadtest +*.out +*.test +go.work +go.work.sum + # ---> Node # Logs logs diff --git a/PORT-PROGRESS.md b/PORT-PROGRESS.md new file mode 100644 index 0000000..bae934c --- /dev/null +++ b/PORT-PROGRESS.md @@ -0,0 +1,73 @@ +# PORT-PROGRESS — MWSE Node.js → Go Çekirdek Portu + +> Bu oturumun çıktısı. Branch: `go-rewrite`. `stable`'a dokunulmadı, deploy yapılmadı, hiçbir issue kapatılmadı. Yarın insan incelemesi için yazıldı. + +## TL;DR + +MWSE engine (Node.js `Source/`) **performans odaklı, race-free bir Go projesine** taşındı. SDK ile konuşulan **WSTS tel formatı (giriş/çıkış sözleşmesi) korundu**. Frontend (`frontend/`) hiç dokunulmadan yerinde duruyor. İkinci bir Go projesi (`loadtest/`) yük testi + benchmark için yazıldı. + +- `go build ./...` ✅ · `go vet ./...` ✅ · `go test -race ./...` ✅ +- Uçtan uca doğrulandı: engine + loadtest birlikte çalıştırıldı. + - **ping** modu: ~140k istek/sn, p50 ~200µs, 0 hata. + - **relay** modu: ~190k mesaj/sn, %98.5 teslim, bağlantı çökmesi yok. + +## Klasör yapısı (karar: Go repo kökünde) + +``` +go.mod # modül: git.saqut.com/saqut/mwse +main.go # giriş noktası, graceful shutdown, sinyal yönetimi +internal/ + protocol/ # WSTS tel formatı (encode/decode) — DONMUŞ sözleşme + ws/ # çekirdek: Client, Room, Hub, Server (concurrency burada) + services/ # handler portu: YourID, Session, Auth, Room, IPPressure, DataTransfer + config/ # env tabanlı yapılandırma + httpserver/ # HTTP yüzeyi: WS upgrade + statik + /api + graceful shutdown + testutil/ # testler için in-memory sahte bağlantı (FakeConn) +loadtest/ # AYRI Go modülü: yük/benchmark istemcisi (ping + relay) +frontend/ # DOKUNULMADI (TS SDK, parcel ile derleniyor) +Source/ # eski Node.js engine — referans olarak bırakıldı +``` + +## Concurrency modeli (#22 — ASIL SEBEP) + +Node'daki "biri odadan ayrılırken başka thread o peer'e yazınca race" sorunu, Go'da **iki katmanlı bir garantiyle** kökten çözüldü: + +1. **Bağlantı başına TEK yazıcı goroutine.** Sokete yazan tek şey `writePump`'tır. Üreticiler sokete asla dokunmaz; mesajı `outbound` kanalına koyar. Böylece gorilla'nın "aynı anda tek yazıcı" kuralı yapısal olarak garanti edilir, eşzamanlı yazma imkânsızdır. +2. **`Send` her zaman `done` kanalını da seçer.** Bir gönderim, kapanmakta olan bir peer ile yarışırsa panik/race yerine **sessizce düşürülür**. Bu, "ayrılırken-yazma" senaryosunu güvenli kılan tam noktadır. +3. **Paylaşılan durum kilitli.** `Hub` (client/room kayıtları), `Room` (üyelik) ve `Client` (info/store/pairs/rooms) her biri kendi `sync.RWMutex`'i ile korunur. `Room.Broadcast` üyeleri kilit altında **snapshot**'lar, sonra kilit olmadan gönderir → ne deadlock ne race. + +Neden actor değil de RWMutex + tek-yazıcı? → `decisions.md`. Özet: gerçek race zaten (1)+(2) ile çözülüyor; RWMutex versiyonu yeni başlayan bir Go geliştiricisi için çok daha okunaklı. Bu seçim `REVIEW.md`'de insan onayına bırakıldı. + +**Regresyon testi:** `internal/ws/ws_test.go → TestLeaveWhileSendRace` — bir odaya 4 goroutine broadcast ederken 30 üye eşzamanlı `Eject`/`Join` yapar; `-race` ile temiz geçer. Node'da çöken senaryonun bire bir karşılığı. + +## Ne yapıldı (issue eşlemesi — SADECE spec olarak okundu, KAPATILMADI) + +- **#21** WS sunucu iskeleti + yaşam döngüsü → `internal/ws/server.go` (upgrade, read loop, heartbeat `saQut` ping/pong, tek-yazıcı writePump). +- **#22** Concurrency modeli → yukarıdaki tasarım + regresyon testi. +- **#23** WSTS protokolü → `internal/protocol/` (request/response/stream/signal, numeric id round-trip dahil). Frontend ile bire bir uyumlu. +- **#24** MessageRouter + Services → `internal/ws/hub.go` (router + event bus) + `internal/services/*`. +- **#25** Config + HTTP + graceful shutdown → `internal/config`, `internal/httpserver`, `main.go`. +- **#26** `go test -race` süreç/yarış testleri → `internal/ws/ws_test.go`, `internal/services/services_test.go`, `internal/protocol/protocol_test.go`. + +## Ne kaldı / sonraki adımlar + +- `/api` kontrol düzleminde **server'ın odaya katılması (join/leave)** ve **webhook** uçları ertelendi (Node'daki sahte-client deseni Go'da farklı tasarlanmalı). Detay → `REVIEW.md`. +- P2P `request/to` + `response/to` zincirinde Node'dan gelen **bir tasarım uyumsuzluğu** var (request, action 'R' ile gönderildiği için sunucu hemen 'E' ile yanıtlıyor; eş yanıtı sonra geliyor). Sadık port yapıldı, sözleşme değiştirilmedi → `REVIEW.md`. +- Binary protokol (2.5.0), WebRTC signaling (1.0.0+), studio (2.0.0) kapsam dışı. +- IPPressure'ın çok-süreçli "canlı panel" IPC'si (`process.send`) tek-node çekirdek için `Announcer` arayüzünün arkasına soyutlandı (varsayılan no-op). Cluster entegrasyonu sonra. + +## Çalıştırma + +```bash +# Engine +go run . # 0.0.0.0:7707 +MWSE_PORT=8080 go run . # env ile yapılandırma + +# Yük testi / benchmark (engine ayaktayken, ayrı modül) +cd loadtest +go run . -mode ping -clients 100 -dur 10s +go run . -mode relay -clients 100 -dur 10s + +# Testler +go test -race ./... +``` diff --git a/REVIEW.md b/REVIEW.md new file mode 100644 index 0000000..1d244a7 --- /dev/null +++ b/REVIEW.md @@ -0,0 +1,35 @@ +# REVIEW.md — İnsan Onayı Gereken Konular + +> Bu maddeler `go-rewrite` branch'inde **uygulandı** ama CLAUDE.md gereği KAPATILMADI / merge edilmedi / deploy edilmedi. İnceleyip karar ver. + +## 1. Concurrency modeli (#22) — ASIL KRİTİK TASARIM + +**Uygulanan:** RWMutex + bağlantı-başına-tek-yazıcı goroutine (`decisions.md` #3, `PORT-PROGRESS.md`). + +**İncelenecek:** Bu, izin verilen iki yoldan biri. Diğeri "per-room owner-goroutine (actor) + channel". Ben okunabilirlik gerekçesiyle RWMutex'i seçtim ve gerçek race'i tek-yazıcı + `done`-seçimli `Send` ile çözdüm. `TestLeaveWhileSendRace` `-race` ile temiz. + +**Karar gereken:** Actor modeli mi istiyorsun, yoksa bu yeterli mi? (Önerim: bu yeterli ve daha sade. Aktör'e geçmek büyük yeniden yazım; bu tasarım 1.0.0 yük profillerinde de ölçeklenir.) + +## 2. P2P request/response zinciri (#23/#24) + +**Durum:** `request/to` + `response/to` Node'a **sadık** portlandı. SDK request'i action 'R' ile gönderdiğinden sunucu anında `[null, id, 'E']` yanıtlıyor; eşin asıl cevabı `response/to` ile sonra geliyor ama o id artık SDK tarafında "tamamlanmış" sayılabilir. + +**İncelenecek:** Bu, Node'da da var olan bir uyumsuzluk. Düzeltmek **ya SDK** (`request/to`'yu fire-and-forget gönder, cevabı ayrı bir id ile beklesin) **ya da** dispatcher'a relay-farkında bir istisna gerektirir — ikisi de "donmuş sözleşme"yi etkiler, o yüzden senin kararın. Feature-parity (1.0.0) işi. + +## 3. `/api` kontrol düzlemi — ertelenen uçlar + +**Uygulandı:** `POST /api/auth/key`, `GET /api/rooms`, `GET /api/clients`, `GET /api/room/{id}`, `POST /api/room/create`, `POST /api/client/{id}/send`, `POST /api/room/{id}/send`. + +**Ertelendi:** `POST /api/room/{id}/join`, `DELETE /api/room/{id}/leave` (sunucunun odaya "sahte client" olarak katılması — Go'da gerçek bir sink-`Client` ile tasarlanmalı), `POST /api/webhook`. Bunlar 0.1.0 çekirdeği için kritik değil. + +## 4. Tel sözleşmesi: latent davranışlar + +`decisions.md`'deki bug düzeltmeleri **mantık** seviyesinde; gönderilen mesaj **isim/şekilleri** korundu. Yine de pairing/davet akışları artık Node'da hiç çalışmayan haliyle değil, **doğru** çalışıyor. Eğer canlıdaki bir istemci bu bozuk davranışa bağımlıysa (pek olası değil) fark oluşabilir. Frontend SDK'nın beklediği yüklerle uyumlu yazıldı. + +## 5. Eski Node kaynağı + +`Source/` ve kök `index.js`/`package.json` referans olarak duruyor. Go portu bunların yerini alıyor. Silme/temizleme senin kararın (ben dokunmadım). + +## Deploy / merge + +`stable`'a merge ve `ws.saqut.com` deploy'u **bilinçli olarak yapılmadı**. Push'u da sana bıraktım (`git push origin go-rewrite`). diff --git a/decisions.md b/decisions.md new file mode 100644 index 0000000..673c039 --- /dev/null +++ b/decisions.md @@ -0,0 +1,33 @@ +# decisions.md — Port Sırasında Alınan Kararlar + +Geri-dönülebilir kararlar burada kayıt altına alındı; CLAUDE.md gereği seçildi, gerekçe yazıldı, devam edildi. + +## Mimari + +1. **Klasör yapısı: Go repo kökünde** (`go.mod` + `main.go` + `internal/`). Kullanıcı onayıyla. `loadtest/` ayrı modül. `frontend/` yerinde. Eski `Source/` referans olarak bırakıldı. +2. **WebSocket kütüphanesi: `gorilla/websocket` v1.5.3.** Kullanıcı onayıyla. En yaygın, en okunaklı; tek-yazıcı hub deseni doğrudan uyuyor. Modül önbelleğinde mevcut olduğundan ağ gerekmedi. +3. **Concurrency: RWMutex + bağlantı-başına-tek-yazıcı (actor yerine).** İzin verilen iki seçenekten (#22) bu seçildi çünkü gerçek race "tek yazıcı + `done` seçimli `Send`" ile zaten çözülüyor; RWMutex versiyonu yeni Go geliştiricisi için belirgin biçimde daha okunaklı. Actor (reply-channel) modeli senkron request/response handler'larına ceremoni ekler. → `REVIEW.md`'de onaya açık. +4. **Modül yolu:** `git.saqut.com/saqut/mwse` (engine), `git.saqut.com/saqut/mwse-loadtest` (yük testi). Mantıksal isim; repo dizininin Türkçe karakterli olması etkilemez. +5. **Dolu outbound buffer politikası: mesajı düşür, bağlantıyı KAPATMA.** İlk versiyon (gorilla hub örneği gibi) yavaş peer'i komple düşürüyordu; yük testinde bu, ani trafik altında zincirleme kopmalara yol açtı (relay'de %92 kayıp). Best-effort relay için doğru politika: tek frame'i düşür, bağlantıyı koru. Gerçekten ölü peer zaten write-deadline ile temizlenir. Düzeltme sonrası teslim %98.5'e çıktı. `Client.Dropped()` ile gözlemlenebilir. + +## Node'daki hataların düzeltilmesi (tel sözleşmesi korunarak) + +Kullanıcı talimatı: "tüm fonksiyonların aynı olması önemli değil; amaç/iş aynı kalsın." Node kaynağındaki bariz bug'lar **doğru çalışacak şekilde** yeniden yazıldı; mesaj isimleri/şekilleri (SDK'nın gördüğü tel) korundu: + +- **Pairing akışı (Auth):** Node'da `accept/pair` yanlış tarafın `pairs` setini kontrol ediyordu (akış asla tamamlanmıyordu). Doğru el sıkışma uygulandı: `request/pair` isteyen tarafı kaydeder + hedefe `request/pair` sinyali; `accept/pair` isteyenin gerçekten istek attığını doğrular + `accepted/pair` sinyali. Frontend'in beklediği `{from, info}` yükü gönderiliyor. +- **`is/reachable`:** Node `otherPeer.pairs.has(to)` (kendi id'sini) kontrol ediyordu — her zaman false. Doğrusu: hedef pairing istemiyorsa VEYA gönderenle eşleşmişse erişilebilir. +- **Davet sistemi (Room `accept/invite-room` / `reject/invite-room`):** Node `Set` üzerinde `.includes`/`.filter` çağırıyordu (çalışmaz, HANDLER_ERROR) ve `joinType=='invite'` kontrolü ters çevrilmişti. Doğru akış: sadece davet odaları, sadece bekleme listesindeki id'ler. +- **`closeroom`:** Node `room.owner === client.id` ile Client nesnesini string'e kıyaslıyordu (her zaman false). Go'da `OwnerID string` tutuluyor; doğru kıyas. +- **`create-room` doğrulaması:** Node tanımsız `CreateRoomValidate` değişkenine referans veriyordu (throw). joi şemasının niyetini taşıyan hafif bir doğrulama yazıldı. (Not: Node'un doğrulama hatasında döndürdüğü `messages` anahtarı —tekil değil— korundu.) +- **`joinroom` invite dalı:** Node davet gönderdikten sonra `NOT-FOUND-ROOM` döndürüyordu (yanıltıcı). `{status:"success", message:"INVITE-REQUESTED"}` ile değiştirildi. +- **`pack/to` pairing kontrolü:** Node `otherPeer.pairs.has(to)` (kendi id'si) kullanıyordu. Doğrusu hedefin gönterene pairing'i: `other.HasPair(c.ID)`. + +## Bilinçli sadakat (Node davranışı korundu) + +- **`request/to` → 'E' yanıtı:** SDK request'i action 'R' ile gönderdiğinden generic dispatcher hemen `[null, id, 'E']` yanıtlar; eş cevabı `response/to` ile sonra gelir. Bu Node ile bire bir aynı (ve aynı uyumsuzluğu taşır). Dispatcher'ı özel-durumlamak tel sözleşmesini değiştirebileceğinden DOKUNULMADI. → `REVIEW.md`. +- **`auth/info` çift gönderim:** Hem pair hem roompair olan bir peer iki `pair/info` alır (Node ile aynı). +- **Heartbeat:** 10sn `saQut` ping; pong "saQut" değilse bağlantı kapanır (Node ile aynı). + +## Session varsayılanları + +`packrecaive`/`packsending`/`notifyPairInfo`/`notifyRoomInfo` varsayılanları **`NewClient` constructor'ında** set ediliyor (listener sırasından bağımsız her zaman mevcut). Session servisinin connect hook'u parite için yine de bunları yeniden uyguluyor. diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..ba9ac01 --- /dev/null +++ b/go.mod @@ -0,0 +1,5 @@ +module git.saqut.com/saqut/mwse + +go 1.26.3 + +require github.com/gorilla/websocket v1.5.3 diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..25a9fc4 --- /dev/null +++ b/go.sum @@ -0,0 +1,2 @@ +github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg= +github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= diff --git a/internal/config/config.go b/internal/config/config.go new file mode 100644 index 0000000..58fafbb --- /dev/null +++ b/internal/config/config.go @@ -0,0 +1,71 @@ +// Package config loads engine settings from the environment, replacing the +// hard-coded values in the original config.js / HTTPServer.js. +package config + +import ( + "net" + "os" + "strconv" + "time" +) + +// Config holds all runtime settings. +type Config struct { + Host string // bind address, e.g. "0.0.0.0" + Port int // listen port, default 7707 + + PublicDir string // static assets directory (default "./public") + ScriptDir string // built SDK directory (default "./script") + + ReadHeaderTimeout time.Duration // HTTP read-header timeout + ShutdownTimeout time.Duration // grace period for in-flight work on shutdown + + TermOutput bool // verbose terminal logging (the old `termoutput` flag) +} + +// Load reads configuration from the environment, applying defaults that match the +// original server. Recognised variables: +// +// MWSE_HOST, MWSE_PORT, MWSE_PUBLIC_DIR, MWSE_SCRIPT_DIR, +// MWSE_SHUTDOWN_TIMEOUT (seconds), MWSE_TERM_OUTPUT (1/true) +func Load() Config { + return Config{ + Host: env("MWSE_HOST", "0.0.0.0"), + Port: envInt("MWSE_PORT", 7707), + PublicDir: env("MWSE_PUBLIC_DIR", "./public"), + ScriptDir: env("MWSE_SCRIPT_DIR", "./script"), + ReadHeaderTimeout: 10 * time.Second, + ShutdownTimeout: time.Duration(envInt("MWSE_SHUTDOWN_TIMEOUT", 10)) * time.Second, + TermOutput: envBool("MWSE_TERM_OUTPUT", false), + } +} + +// Addr returns the host:port string for net/http. +func (c Config) Addr() string { + return net.JoinHostPort(c.Host, strconv.Itoa(c.Port)) +} + +func env(key, def string) string { + if v, ok := os.LookupEnv(key); ok && v != "" { + return v + } + return def +} + +func envInt(key string, def int) int { + if v, ok := os.LookupEnv(key); ok { + if n, err := strconv.Atoi(v); err == nil { + return n + } + } + return def +} + +func envBool(key string, def bool) bool { + if v, ok := os.LookupEnv(key); ok { + if b, err := strconv.ParseBool(v); err == nil { + return b + } + } + return def +} diff --git a/internal/httpserver/api.go b/internal/httpserver/api.go new file mode 100644 index 0000000..11fc10b --- /dev/null +++ b/internal/httpserver/api.go @@ -0,0 +1,220 @@ +package httpserver + +import ( + "encoding/json" + "net/http" + "sync" + + "git.saqut.com/saqut/mwse/internal/ws" +) + +// apiKeyStore holds the issued server-to-server API keys. The original kept these +// in a process-local Map; this matches that (keys do not survive a restart). +type apiKeyStore struct { + mu sync.RWMutex + keys map[string]string // key -> domain +} + +func newAPIKeyStore() *apiKeyStore { + return &apiKeyStore{keys: make(map[string]string)} +} + +func (s *apiKeyStore) issue(domain string) string { + key := newToken() + s.mu.Lock() + s.keys[key] = domain + s.mu.Unlock() + return key +} + +func (s *apiKeyStore) domain(key string) (string, bool) { + s.mu.RLock() + d, ok := s.keys[key] + s.mu.RUnlock() + return d, ok +} + +// auth wraps a handler with x-api-key validation, passing the caller's domain on +// the request context-free closure argument. +func (s *apiKeyStore) auth(next func(http.ResponseWriter, *http.Request, string)) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + key := r.Header.Get("x-api-key") + if key == "" { + writeJSON(w, http.StatusUnauthorized, fail("API_KEY_REQUIRED")) + return + } + domain, ok := s.domain(key) + if !ok { + writeJSON(w, http.StatusUnauthorized, fail("INVALID_API_KEY")) + return + } + next(w, r, domain) + } +} + +// registerAPI mounts the /api control-plane routes onto mux. It ports the read +// endpoints and the core server-initiated messaging endpoints from api.js. The +// server-as-room-participant (join/leave) and webhook endpoints are intentionally +// deferred to feature-parity work (see REVIEW.md). +func registerAPI(mux *http.ServeMux, hub *ws.Hub) { + keys := newAPIKeyStore() + + mux.HandleFunc("POST /api/auth/key", func(w http.ResponseWriter, r *http.Request) { + var body struct { + Domain string `json:"domain"` + } + if !decode(w, r, &body) { + return + } + if body.Domain == "" { + writeJSON(w, http.StatusOK, fail("DOMAIN_REQUIRED")) + return + } + writeJSON(w, http.StatusOK, map[string]any{"status": "success", "key": keys.issue(body.Domain)}) + }) + + mux.HandleFunc("GET /api/rooms", func(w http.ResponseWriter, r *http.Request) { + rooms := make([]map[string]any, 0) + for _, room := range hub.Rooms() { + rooms = append(rooms, map[string]any{ + "id": room.ID, + "name": room.Name, + "accessType": room.AccessType, + "joinType": room.JoinType, + "description": room.Description, + "clientCount": room.Size(), + }) + } + writeJSON(w, http.StatusOK, map[string]any{"status": "success", "rooms": rooms}) + }) + + mux.HandleFunc("GET /api/clients", func(w http.ResponseWriter, r *http.Request) { + clients := make([]map[string]any, 0) + for _, c := range hub.Clients() { + clients = append(clients, map[string]any{ + "id": c.ID, + "rooms": c.Rooms(), + "pairs": c.Pairs(), + }) + } + writeJSON(w, http.StatusOK, map[string]any{"status": "success", "clients": clients}) + }) + + mux.HandleFunc("GET /api/room/{id}", func(w http.ResponseWriter, r *http.Request) { + room, ok := hub.Room(r.PathValue("id")) + if !ok { + writeJSON(w, http.StatusOK, fail("ROOM_NOT_FOUND")) + return + } + writeJSON(w, http.StatusOK, map[string]any{"status": "success", "room": room.ToJSON(false)}) + }) + + mux.HandleFunc("POST /api/client/{id}/send", keys.auth(func(w http.ResponseWriter, r *http.Request, domain string) { + var body struct { + Pack any `json:"pack"` + } + if !decode(w, r, &body) { + return + } + client, ok := hub.Client(r.PathValue("id")) + if !ok { + writeJSON(w, http.StatusOK, fail("CLIENT_NOT_FOUND")) + return + } + if body.Pack == nil { + writeJSON(w, http.StatusOK, fail("PACK_REQUIRED")) + return + } + client.Signal("server/pack", map[string]any{"from": "server", "fromServer": domain, "pack": body.Pack}) + writeJSON(w, http.StatusOK, map[string]any{"status": "success"}) + })) + + mux.HandleFunc("POST /api/room/{id}/send", keys.auth(func(w http.ResponseWriter, r *http.Request, domain string) { + var body struct { + Pack any `json:"pack"` + Wom bool `json:"wom"` + } + if !decode(w, r, &body) { + return + } + id := r.PathValue("id") + room, ok := hub.Room(id) + if !ok { + writeJSON(w, http.StatusOK, fail("ROOM_NOT_FOUND")) + return + } + if body.Pack == nil { + writeJSON(w, http.StatusOK, fail("PACK_REQUIRED")) + return + } + room.Broadcast( + "server/pack/room", + map[string]any{"from": "server", "fromServer": domain, "pack": body.Pack, "roomId": id}, + "", + nil, + ) + writeJSON(w, http.StatusOK, map[string]any{"status": "success"}) + })) + + mux.HandleFunc("POST /api/room/create", keys.auth(func(w http.ResponseWriter, r *http.Request, domain string) { + var body struct { + Name string `json:"name"` + AccessType string `json:"accessType"` + JoinType string `json:"joinType"` + Description string `json:"description"` + Credential string `json:"credential"` + } + if !decode(w, r, &body) { + return + } + if body.Name == "" { + writeJSON(w, http.StatusOK, fail("NAME_REQUIRED")) + return + } + if _, exists := hub.RoomByName(body.Name); exists { + writeJSON(w, http.StatusOK, fail("ROOM_ALREADY_EXISTS")) + return + } + room := ws.NewRoom(hub) + room.Name = body.Name + room.AccessType = orDefault(body.AccessType, "public") + room.JoinType = orDefault(body.JoinType, "free") + room.Description = body.Description + room.OwnerID = "server" + if body.Credential != "" { + room.Credential = sha256hex(body.Credential) + } + room.Publish() + writeJSON(w, http.StatusOK, map[string]any{"status": "success", "room": room.ToJSON(false)}) + })) +} + +func orDefault(v, def string) string { + if v == "" { + return def + } + return v +} + +func fail(message string) map[string]any { + return map[string]any{"status": "fail", "message": message} +} + +func writeJSON(w http.ResponseWriter, status int, body any) { + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(status) + _ = json.NewEncoder(w).Encode(body) +} + +// decode reads a JSON request body, writing a fail response and returning false +// when the body is malformed. +func decode(w http.ResponseWriter, r *http.Request, dst any) bool { + if r.Body == nil { + return true + } + if err := json.NewDecoder(r.Body).Decode(dst); err != nil && err.Error() != "EOF" { + writeJSON(w, http.StatusBadRequest, fail("INVALID_JSON")) + return false + } + return true +} diff --git a/internal/httpserver/httpserver.go b/internal/httpserver/httpserver.go new file mode 100644 index 0000000..23b304f --- /dev/null +++ b/internal/httpserver/httpserver.go @@ -0,0 +1,95 @@ +// Package httpserver assembles the HTTP surface of the engine: the WebSocket +// upgrade endpoint, the static asset routes (the built SDK and the public files), +// and the /api control plane. It mirrors the routing of the original +// HTTPServer.js while adding timeouts and graceful shutdown (#25). +package httpserver + +import ( + "net/http" + "os" + "path/filepath" + "strings" + + "github.com/gorilla/websocket" + + "git.saqut.com/saqut/mwse/internal/config" + "git.saqut.com/saqut/mwse/internal/ws" +) + +// New builds the *http.Server. WebSocket upgrades are detected on ANY path and +// routed to the engine (the SDK derives its endpoint from wherever the script was +// served, so the upgrade may arrive at "/" or "/script/"). All other requests go +// through the static/API mux. +func New(hub *ws.Hub, cfg config.Config) *http.Server { + wsServer := ws.NewServer(hub) + mux := http.NewServeMux() + + registerAPI(mux, hub) + registerStatic(mux, cfg) + + root := func(w http.ResponseWriter, r *http.Request) { + if websocket.IsWebSocketUpgrade(r) { + wsServer.ServeHTTP(w, r) + return + } + mux.ServeHTTP(w, r) + } + + return &http.Server{ + Addr: cfg.Addr(), + Handler: http.HandlerFunc(root), + ReadHeaderTimeout: cfg.ReadHeaderTimeout, + } +} + +// registerStatic wires the asset routes: +// +// - /script -> the built SDK entry (script/index.js) +// - /script/ -> files under the script directory +// - / -> the SDK entry (so a bare visit returns the script) +// - / -> a matching file under the public directory +// - anything else -> the status document (status.xml) +func registerStatic(mux *http.ServeMux, cfg config.Config) { + scriptIndex := filepath.Join(cfg.ScriptDir, "index.js") + statusDoc := filepath.Join(cfg.ScriptDir, "status.xml") + + mux.HandleFunc("/script", func(w http.ResponseWriter, r *http.Request) { + http.ServeFile(w, r, scriptIndex) + }) + mux.Handle("/script/", http.StripPrefix("/script/", http.FileServer(http.Dir(cfg.ScriptDir)))) + + mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { + if r.URL.Path == "/" { + http.ServeFile(w, r, scriptIndex) + return + } + if f, ok := safePublicFile(cfg.PublicDir, r.URL.Path); ok { + http.ServeFile(w, r, f) + return + } + http.ServeFile(w, r, statusDoc) + }) +} + +// safePublicFile resolves urlPath to a regular file under publicDir, guarding +// against path traversal. It returns ok=false when no such file exists. +func safePublicFile(publicDir, urlPath string) (string, bool) { + clean := filepath.Clean("/" + strings.TrimPrefix(urlPath, "/")) + full := filepath.Join(publicDir, clean) + + absPublic, err := filepath.Abs(publicDir) + if err != nil { + return "", false + } + absFull, err := filepath.Abs(full) + if err != nil { + return "", false + } + if absFull != absPublic && !strings.HasPrefix(absFull, absPublic+string(os.PathSeparator)) { + return "", false // escaped the public directory + } + if fi, err := os.Stat(absFull); err == nil && !fi.IsDir() { + return absFull, true + } + return "", false +} diff --git a/internal/httpserver/util.go b/internal/httpserver/util.go new file mode 100644 index 0000000..9bcc1f6 --- /dev/null +++ b/internal/httpserver/util.go @@ -0,0 +1,26 @@ +package httpserver + +import ( + "crypto/rand" + "crypto/sha256" + "encoding/hex" + "fmt" +) + +// newToken returns a random UUIDv4-shaped token for API keys (the original used +// crypto.randomUUID()). +func newToken() string { + var b [16]byte + if _, err := rand.Read(b[:]); err != nil { + panic(fmt.Sprintf("httpserver: cannot read random bytes: %v", err)) + } + b[6] = (b[6] & 0x0f) | 0x40 + b[8] = (b[8] & 0x3f) | 0x80 + return fmt.Sprintf("%x-%x-%x-%x-%x", b[0:4], b[4:6], b[6:8], b[8:10], b[10:16]) +} + +// sha256hex hashes a credential to a hex digest, matching the Room service. +func sha256hex(s string) string { + sum := sha256.Sum256([]byte(s)) + return hex.EncodeToString(sum[:]) +} diff --git a/internal/protocol/protocol.go b/internal/protocol/protocol.go new file mode 100644 index 0000000..7a5c028 --- /dev/null +++ b/internal/protocol/protocol.go @@ -0,0 +1,194 @@ +// Package protocol implements the WSTS (WebSocket Transport/Signal) wire format +// used by MWSE. The format is FROZEN: the TypeScript SDK in ./frontend speaks it +// verbatim, so the Go engine must encode and decode it byte-for-byte the same way +// the original Node.js server did. +// +// # Wire format +// +// Every WebSocket text frame carries a JSON array. The shape of that array is: +// +// [ message, id?, action? ] +// +// - message : an object, always present. It carries a "type" field that selects +// a handler, plus handler-specific fields. +// - id : optional. A number identifies a client-initiated request/stream +// whose response must be correlated. A string in this slot (e.g. "R") is used +// by the SDK's "fire and forget" path and produces no response. +// - action : optional. "R" = request (reply once, flagged "E" = end), +// "S" = stream (reply, flagged "C" = continue). +// +// When the server initiates a message (a "signal" such as room/joined) it sends: +// +// [ payload, signalName ] +// +// i.e. the signal name lives in the id slot as a string, which the SDK routes to +// its signal listeners. +package protocol + +import ( + "encoding/json" + "errors" +) + +// Flags carried in the action slot of a server reply. +const ( + FlagEnd = "E" // terminates a request: [resp, id, "E"] + FlagContinue = "C" // a stream chunk: [resp, id, "C"] + + actionRequest = "R" // client asked for a single response + actionStream = "S" // client opened a stream +) + +// ErrEmptyFrame is returned when a frame decodes to an empty array. +var ErrEmptyFrame = errors.New("protocol: empty frame") + +// Message is a decoded inbound message object. Values follow Go's encoding/json +// conventions (numbers are float64, objects are map[string]any, etc.). The helper +// accessors below keep handler code readable and tolerant of missing fields. +type Message map[string]any + +// Type returns the handler selector ("type" field), or "" when absent. +func (m Message) Type() string { return m.Str("type") } + +// Str returns a string field, or "" if missing or not a string. +func (m Message) Str(key string) string { + if s, ok := m[key].(string); ok { + return s + } + return "" +} + +// Int returns a numeric field as an int, or 0 if missing or not a number. +func (m Message) Int(key string) int { + if f, ok := m[key].(float64); ok { + return int(f) + } + return 0 +} + +// Bool returns a strict boolean field, or false if missing or not a bool. +func (m Message) Bool(key string) bool { + b, _ := m[key].(bool) + return b +} + +// Truthy mirrors JavaScript's "!!value" coercion. The SDK frequently sends +// numeric flags (value: 1 / value: 0), so handlers that toggle state use this. +func (m Message) Truthy(key string) bool { + return jsTruthy(m[key]) +} + +// Get returns the raw value for a key (may be nil). +func (m Message) Get(key string) any { return m[key] } + +// Has reports whether the key is present. +func (m Message) Has(key string) bool { + _, ok := m[key] + return ok +} + +func jsTruthy(v any) bool { + switch t := v.(type) { + case nil: + return false + case bool: + return t + case float64: + return t != 0 + case string: + return t != "" + default: + return true + } +} + +// Envelope is a decoded inbound frame. +type Envelope struct { + // Message is the message object (arr[0]). May be nil if the frame did not + // carry an object there; handlers treat a nil/typeless message as MISSING_TYPE. + Message Message + // ID is the correlation id (arr[1]) when present and a number or string. + ID any + // HasID is true when arr[1] is a number or a string. This matches the Node + // server's `typeof id === 'number' || typeof id === 'string'` branch, which + // decides whether a reply may be sent at all. + HasID bool + // Action is the action flag (arr[2]): "R", "S", or "". + Action string +} + +// WantsReply reports whether this envelope should produce a response, and with +// which terminating flag. It is false for fire-and-forget and broadcast frames. +func (e *Envelope) WantsReply() (flag string, ok bool) { + if !e.HasID { + return "", false + } + switch e.Action { + case actionRequest: + return FlagEnd, true + case actionStream: + return FlagContinue, true + default: + return "", false + } +} + +// IsBroadcast reports whether the frame is in the "no id" branch, where the Node +// server inspected the handler result for a broadcast directive. +func (e *Envelope) IsBroadcast() bool { return !e.HasID } + +// Decode parses a raw text frame into an Envelope. A frame that is not a JSON +// array, or is an empty array, is an error (the caller reports it as a message +// error, exactly as the Node server emitted 'messageError'). +func Decode(data []byte) (*Envelope, error) { + var arr []json.RawMessage + if err := json.Unmarshal(data, &arr); err != nil { + return nil, err + } + if len(arr) == 0 { + return nil, ErrEmptyFrame + } + + env := &Envelope{} + + // arr[0] -> message object. If it is not an object, Message stays nil and the + // router will respond MISSING_TYPE, matching the Node destructuring behaviour. + var raw any + if err := json.Unmarshal(arr[0], &raw); err != nil { + return nil, err + } + if obj, ok := raw.(map[string]any); ok { + env.Message = Message(obj) + } + + // arr[1] -> id. Only numbers and strings count as an id. + if len(arr) >= 2 { + var id any + if err := json.Unmarshal(arr[1], &id); err == nil { + switch id.(type) { + case float64, string: + env.HasID = true + env.ID = id + } + } + } + + // arr[2] -> action flag. + if len(arr) >= 3 { + var action string + _ = json.Unmarshal(arr[2], &action) + env.Action = action + } + + return env, nil +} + +// Reply builds the wire value for a correlated response: [payload, id, flag]. +func Reply(payload any, id any, flag string) []any { + return []any{payload, id, flag} +} + +// Signal builds the wire value for a server-initiated message: [payload, name]. +func Signal(name string, payload any) []any { + return []any{payload, name} +} diff --git a/internal/protocol/protocol_test.go b/internal/protocol/protocol_test.go new file mode 100644 index 0000000..aa002d6 --- /dev/null +++ b/internal/protocol/protocol_test.go @@ -0,0 +1,139 @@ +package protocol + +import ( + "encoding/json" + "testing" +) + +func decodeString(t *testing.T, s string) *Envelope { + t.Helper() + env, err := Decode([]byte(s)) + if err != nil { + t.Fatalf("Decode(%q) error: %v", s, err) + } + return env +} + +func TestDecodeRequest(t *testing.T) { + // [message, numericId, "R"] -> a request that wants an "E"-terminated reply. + env := decodeString(t, `[{"type":"my/socketid"}, 7, "R"]`) + if env.Message.Type() != "my/socketid" { + t.Fatalf("type = %q", env.Message.Type()) + } + if !env.HasID { + t.Fatal("expected HasID") + } + flag, ok := env.WantsReply() + if !ok || flag != FlagEnd { + t.Fatalf("WantsReply = (%q,%v), want (E,true)", flag, ok) + } + if env.IsBroadcast() { + t.Fatal("request must not be a broadcast") + } +} + +func TestDecodeStream(t *testing.T) { + env := decodeString(t, `[{"type":"sub"}, 9, "S"]`) + flag, ok := env.WantsReply() + if !ok || flag != FlagContinue { + t.Fatalf("WantsReply = (%q,%v), want (C,true)", flag, ok) + } +} + +func TestDecodeFireAndForget(t *testing.T) { + // The SDK's SendOnly path: [message, "R"]. The "R" sits in the id slot as a + // string, so a handler runs but no reply is produced. + env := decodeString(t, `[{"type":"connection/packsending","value":0}, "R"]`) + if !env.HasID { + t.Fatal("string id should set HasID") + } + if _, ok := env.WantsReply(); ok { + t.Fatal("fire-and-forget must not want a reply") + } + if env.IsBroadcast() { + t.Fatal("fire-and-forget is not a broadcast") + } + if env.Message.Truthy("value") { + t.Fatal("value:0 should be falsey") + } +} + +func TestDecodeBroadcast(t *testing.T) { + // A bare [message] frame: no id -> the broadcast branch. + env := decodeString(t, `[{"type":"hello"}]`) + if env.HasID { + t.Fatal("bare frame should not have an id") + } + if !env.IsBroadcast() { + t.Fatal("bare frame should be a broadcast") + } + if _, ok := env.WantsReply(); ok { + t.Fatal("broadcast must not want a reply") + } +} + +func TestDecodeMissingTypeObject(t *testing.T) { + // arr[0] not an object -> Message is nil and Type() is empty. + env := decodeString(t, `["not-an-object", 1, "R"]`) + if env.Message != nil { + t.Fatalf("expected nil Message, got %v", env.Message) + } + if env.Message.Type() != "" { + t.Fatal("nil message type should be empty") + } +} + +func TestDecodeErrors(t *testing.T) { + if _, err := Decode([]byte(`{"type":"x"}`)); err == nil { + t.Fatal("object (not array) should error") + } + if _, err := Decode([]byte(`[]`)); err != ErrEmptyFrame { + t.Fatalf("empty array err = %v, want ErrEmptyFrame", err) + } + if _, err := Decode([]byte(`not json`)); err == nil { + t.Fatal("invalid json should error") + } +} + +func TestNumericIDRoundTripsAsInteger(t *testing.T) { + // A numeric id must come back out as an integer, not "7.0", so the SDK's + // integer-keyed event pool matches it. + env := decodeString(t, `[{"type":"x"}, 7, "R"]`) + reply := Reply("ok", env.ID, FlagEnd) + b, err := json.Marshal(reply) + if err != nil { + t.Fatal(err) + } + if got, want := string(b), `["ok",7,"E"]`; got != want { + t.Fatalf("reply = %s, want %s", got, want) + } +} + +func TestSignalShape(t *testing.T) { + b, err := json.Marshal(Signal("room/joined", map[string]any{"id": "abc"})) + if err != nil { + t.Fatal(err) + } + if got, want := string(b), `[{"id":"abc"},"room/joined"]`; got != want { + t.Fatalf("signal = %s, want %s", got, want) + } +} + +func TestMessageAccessors(t *testing.T) { + m := Message{"type": "t", "to": "peer-1", "n": float64(42), "b": true, "s": "x"} + if m.Str("to") != "peer-1" { + t.Fatal("Str") + } + if m.Int("n") != 42 { + t.Fatal("Int") + } + if !m.Bool("b") { + t.Fatal("Bool") + } + if !m.Truthy("s") || m.Truthy("missing") { + t.Fatal("Truthy") + } + if !m.Has("to") || m.Has("nope") { + t.Fatal("Has") + } +} diff --git a/internal/services/auth.go b/internal/services/auth.go new file mode 100644 index 0000000..0fbbcb0 --- /dev/null +++ b/internal/services/auth.go @@ -0,0 +1,214 @@ +package services + +import ( + "git.saqut.com/saqut/mwse/internal/protocol" + "git.saqut.com/saqut/mwse/internal/ws" +) + +// ---- relationship helpers ------------------------------------------------ +// +// "Secure" reachability comes in two forms: a mutual pairing, or co-membership of +// a room. These helpers centralise the checks the original Client.isSecure / +// getSucureClients spread across files (and fix their bugs). + +// isPaired reports a *mutual* pairing between a and b. +func isPaired(a, b *ws.Client) bool { + return a.HasPair(b.ID) && b.HasPair(a.ID) +} + +// shareRoom reports whether a and b are members of at least one common room. +func shareRoom(hub *ws.Hub, a, b *ws.Client) bool { + for _, rid := range a.Rooms() { + if r, ok := hub.Room(rid); ok && r.Has(b.ID) { + return true + } + } + return false +} + +// isSecure reports whether a may address the client with id peerID. +func isSecure(hub *ws.Hub, a *ws.Client, peerID string) bool { + peer, ok := hub.Client(peerID) + if !ok { + return false + } + return isPaired(a, peer) || shareRoom(hub, a, peer) +} + +// secureClients returns the connected peers c may exchange info with: those c has +// paired toward, and those sharing a room with c. +func secureClients(hub *ws.Hub, c *ws.Client) (pairs, roompairs map[string]*ws.Client) { + pairs = make(map[string]*ws.Client) + for _, id := range c.Pairs() { + if pc, ok := hub.Client(id); ok { + pairs[id] = pc + } + } + roompairs = make(map[string]*ws.Client) + for _, rid := range c.Rooms() { + r, ok := hub.Room(rid) + if !ok { + continue + } + for _, m := range r.Members() { + if m.ID == c.ID { + continue + } + roompairs[m.ID] = m + } + } + return pairs, roompairs +} + +func registerAuth(hub *ws.Hub) { + // On disconnect: tell secure peers we are gone, then drop pairing edges both + // ways so no stale references remain. + hub.OnDisconnect(func(c *ws.Client) { + pairs, roompairs := secureClients(hub, c) + + notified := make(map[string]bool) + notify := func(set map[string]*ws.Client) { + for id, peer := range set { + if notified[id] { + continue + } + notified[id] = true + peer.Signal("peer/disconnect", map[string]any{"id": c.ID}) + } + } + notify(pairs) + notify(roompairs) + + for id, peer := range pairs { + peer.RemovePair(c.ID) + c.RemovePair(id) + } + }) + + hub.Register("auth/pair-system", func(c *ws.Client, m protocol.Message) any { + switch m.Str("value") { + case "everybody": + c.SetRequiredPair(true) + return success() + case "disable": + c.SetRequiredPair(false) + return success() + } + return fail("INVALID_VALUE") + }) + + hub.Register("my/socketid", func(c *ws.Client, m protocol.Message) any { + return c.ID + }) + + hub.Register("auth/public", func(c *ws.Client, m protocol.Message) any { + c.SetRequiredPair(false) + return map[string]any{"value": "success", "mode": "public"} + }) + + hub.Register("auth/private", func(c *ws.Client, m protocol.Message) any { + c.SetRequiredPair(true) + return map[string]any{"value": "success", "mode": "private"} + }) + + // request/pair: ask `to` to pair with us. We record our side of the edge and + // notify the target; they complete it with accept/pair. + hub.Register("request/pair", func(c *ws.Client, m protocol.Message) any { + to := m.Str("to") + target, ok := hub.Client(to) + if !ok { + return fail("CLIENT_NOT_FOUND") + } + if isPaired(c, target) { + return map[string]any{"status": "success", "message": "ALREADY-PAIRED"} + } + if c.HasPair(to) { + return fail("ALREADY-REQUESTED") + } + c.AddPair(to) + target.Signal("request/pair", map[string]any{"from": c.ID, "info": c.Info()}) + return map[string]any{"status": "success", "message": "REQUESTED"} + }) + + // accept/pair: complete a pairing the peer `to` requested from us. + hub.Register("accept/pair", func(c *ws.Client, m protocol.Message) any { + to := m.Str("to") + requester, ok := hub.Client(to) + if !ok { + return fail("CLIENT_NOT_FOUND") + } + if isPaired(c, requester) { + return map[string]any{"status": "success", "message": "ALREADY-PAIRED"} + } + if !requester.HasPair(c.ID) { + return fail("NOT_REQUESTED_PAIR") + } + c.AddPair(to) + requester.Signal("accepted/pair", map[string]any{"from": c.ID, "info": c.Info()}) + return success() + }) + + // reject/pair and end/pair both tear the edge down in both directions and + // notify the other side. + teardown := func(c *ws.Client, m protocol.Message) any { + to := m.Str("to") + other, ok := hub.Client(to) + if !ok { + return fail("CLIENT_NOT_FOUND") + } + c.RemovePair(to) + other.RemovePair(c.ID) + other.Signal("end/pair", map[string]any{"from": c.ID}) + return success() + } + hub.Register("reject/pair", teardown) + hub.Register("end/pair", teardown) + + hub.Register("pair/list", func(c *ws.Client, m protocol.Message) any { + var list []string + for _, id := range c.Pairs() { + if other, ok := hub.Client(id); ok && other.HasPair(c.ID) { + list = append(list, id) + } + } + return map[string]any{"type": "pair/list", "value": list} + }) + + hub.Register("is/reachable", func(c *ws.Client, m protocol.Message) any { + to := m.Str("to") + other, ok := hub.Client(to) + if !ok { + return false + } + if other.RequiredPair() && !other.HasPair(c.ID) { + return false + } + return true + }) + + // auth/info: set our metadata and push the change to every secure peer. + hub.Register("auth/info", func(c *ws.Client, m protocol.Message) any { + name := m.Str("name") + value := m.Get("value") + c.SetInfo(name, value) + + pairs, roompairs := secureClients(hub, c) + payload := map[string]any{"from": c.ID, "name": name, "value": value} + for _, peer := range pairs { + peer.Signal("pair/info", payload) + } + for _, peer := range roompairs { + peer.Signal("pair/info", payload) + } + return success() + }) + + hub.Register("peer/info", func(c *ws.Client, m protocol.Message) any { + peerID := m.Str("peer") + if !isSecure(hub, c, peerID) { + return map[string]any{"status": "fail", "message": "unaccessible user"} + } + peer, _ := hub.Client(peerID) + return map[string]any{"status": "success", "info": peer.Info()} + }) +} diff --git a/internal/services/datatransfer.go b/internal/services/datatransfer.go new file mode 100644 index 0000000..4f2c0fe --- /dev/null +++ b/internal/services/datatransfer.go @@ -0,0 +1,110 @@ +package services + +import ( + "git.saqut.com/saqut/mwse/internal/protocol" + "git.saqut.com/saqut/mwse/internal/ws" +) + +// handshakeResult returns the optional acknowledgement for a relay handler. When +// the caller did not ask for a handshake the relay is fire-and-forget (nil), so +// the generic dispatcher sends no reply. +func handshakeResult(m protocol.Message, success bool) any { + if !m.Truthy("handshake") { + return nil + } + if success { + return map[string]any{"type": "success"} + } + return map[string]any{"type": "fail"} +} + +func registerDataTransfer(hub *ws.Hub) { + // pack/to: relay a data pack to another peer, honouring both peers' relay + // flags and the target's pairing policy. When the target does not require + // pairing, an implicit pairing is established so subsequent traffic flows. + hub.Register("pack/to", func(c *ws.Client, m protocol.Message) any { + if !c.PackReadable() { + return handshakeResult(m, false) + } + to := m.Str("to") + other, ok := hub.Client(to) + if !ok { + return handshakeResult(m, false) + } + if other.RequiredPair() { + if !other.HasPair(c.ID) { + return handshakeResult(m, false) + } + } else if !other.HasPair(c.ID) { + other.AddPair(c.ID) + c.AddPair(other.ID) + } + if !other.PackWriteable() { + return handshakeResult(m, false) + } + other.Signal("pack", map[string]any{"from": c.ID, "pack": m.Get("pack")}) + return handshakeResult(m, true) + }) + + // request/to: relay a request to a peer. The reply travels back later via + // response/to, carrying the original request id, so this handler itself does + // not produce the answer. + hub.Register("request/to", func(c *ws.Client, m protocol.Message) any { + other, ok := hub.Client(m.Str("to")) + if !ok { + return nil + } + if other.RequiredPair() { + if !other.HasPair(c.ID) { + return nil + } + } else { + other.AddPair(c.ID) + c.AddPair(other.ID) + } + other.Signal("request", map[string]any{"from": c.ID, "pack": m.Get("pack")}) + return nil + }) + + // response/to: deliver a peer's response back to the original requester. The + // frame uses the numeric request id in the signal slot so the requester's + // event pool resolves the pending promise. + hub.Register("response/to", func(c *ws.Client, m protocol.Message) any { + other, ok := hub.Client(m.Str("to")) + if !ok { + return nil + } + if other.RequiredPair() && !other.HasPair(c.ID) { + return nil + } + other.Send([]any{map[string]any{"from": c.ID, "pack": m.Get("pack")}, m.Get("id")}) + return nil + }) + + // pack/room: relay a data pack to every writable member of a room the sender + // belongs to. "wom" (without me) excludes the sender. + hub.Register("pack/room", func(c *ws.Client, m protocol.Message) any { + if !c.PackReadable() { + return handshakeResult(m, false) + } + to := m.Str("to") + room, ok := hub.Room(to) + if !ok { + return handshakeResult(m, false) + } + if !c.InRoom(to) { + return handshakeResult(m, false) + } + except := "" + if m.Truthy("wom") { + except = c.ID + } + room.Broadcast( + "pack/room", + map[string]any{"from": to, "pack": m.Get("pack"), "sender": c.ID}, + except, + (*ws.Client).PackWriteable, + ) + return handshakeResult(m, true) + }) +} diff --git a/internal/services/ippressure.go b/internal/services/ippressure.go new file mode 100644 index 0000000..075fb2c --- /dev/null +++ b/internal/services/ippressure.go @@ -0,0 +1,277 @@ +package services + +import ( + "fmt" + "sync" + + "git.saqut.com/saqut/mwse/internal/protocol" + "git.saqut.com/saqut/mwse/internal/ws" +) + +// shortCodeAlphabet is the 22-letter set the original used (note: J, Q, U, W are +// intentionally absent). Three letters give 22^3 = 10,648 codes. +const shortCodeAlphabet = "ABCDEFGHIKLMNOPRSTVXYZ" + +// Announcer receives address allocation events. In the multi-process Node +// deployment these were forwarded to a parent via process.send for the live +// traffic panel. For the single-node 0.1.0 core the default is a no-op; a cluster +// integration can supply a real implementation later. +type Announcer interface { + Announce(kind, action, clientID string, value any) +} + +type noopAnnouncer struct{} + +func (noopAnnouncer) Announce(string, string, string, any) {} + +// IPPressure allocates three kinds of unique virtual address to clients. A single +// mutex guards all three tables; allocation is infrequent relative to messaging, +// so finer-grained locking would add complexity for no real gain. +type IPPressure struct { + ann Announcer + + mu sync.Mutex + busyNumber map[int]string // number -> clientID + busyCode map[string]string // shortcode -> clientID + busyIP map[string]string // ip -> clientID +} + +// NewIPPressure builds an allocator. A nil announcer becomes a no-op. +func NewIPPressure(ann Announcer) *IPPressure { + if ann == nil { + ann = noopAnnouncer{} + } + return &IPPressure{ + ann: ann, + busyNumber: make(map[int]string), + busyCode: make(map[string]string), + busyIP: make(map[string]string), + } +} + +// ---- number (starts at 24, counts up) ----------------------------------- + +func (p *IPPressure) lockNumber(clientID string) int { + p.mu.Lock() + defer p.mu.Unlock() + for n := 24; ; n++ { + if _, busy := p.busyNumber[n]; !busy { + p.busyNumber[n] = clientID + p.ann.Announce("AP_NUMBER", "LOCK", clientID, n) + return n + } + } +} + +func (p *IPPressure) releaseNumber(n int) { + p.mu.Lock() + defer p.mu.Unlock() + if clientID, ok := p.busyNumber[n]; ok { + p.ann.Announce("AP_NUMBER", "RELEASE", clientID, n) + delete(p.busyNumber, n) + } +} + +func (p *IPPressure) whoisNumber(n int) (string, bool) { + p.mu.Lock() + defer p.mu.Unlock() + id, ok := p.busyNumber[n] + return id, ok +} + +// ---- short code (three letters from the restricted alphabet) ------------ + +func (p *IPPressure) lockCode(clientID string) string { + p.mu.Lock() + defer p.mu.Unlock() + for _, a := range shortCodeAlphabet { + for _, b := range shortCodeAlphabet { + for _, d := range shortCodeAlphabet { + code := string([]rune{a, b, d}) + if _, busy := p.busyCode[code]; !busy { + p.busyCode[code] = clientID + p.ann.Announce("AP_SHORTCODE", "LOCK", clientID, code) + return code + } + } + } + } + return "" // address space exhausted +} + +func (p *IPPressure) releaseCode(code string) { + p.mu.Lock() + defer p.mu.Unlock() + if clientID, ok := p.busyCode[code]; ok { + p.ann.Announce("AP_SHORTCODE", "RELEASE", clientID, code) + delete(p.busyCode, code) + } +} + +func (p *IPPressure) whoisCode(code string) (string, bool) { + p.mu.Lock() + defer p.mu.Unlock() + id, ok := p.busyCode[code] + return id, ok +} + +// ---- ip address (10.0.0.1 upward) --------------------------------------- + +func (p *IPPressure) lockIP(clientID string) string { + p.mu.Lock() + defer p.mu.Unlock() + a, b, cc, d := 10, 0, 0, 1 + for { + ip := fmt.Sprintf("%d.%d.%d.%d", a, b, cc, d) + if _, busy := p.busyIP[ip]; !busy { + p.busyIP[ip] = clientID + p.ann.Announce("AP_IPADDRESS", "LOCK", clientID, ip) + return ip + } + switch { + case d != 255: + d++ + case cc != 255: + d, cc = 0, cc+1 + case b != 255: + d, cc, b = 0, 0, b+1 + case a != 255: + d, cc, b, a = 0, 0, 0, a+1 + default: + return "" // address space exhausted + } + } +} + +func (p *IPPressure) releaseIP(ip string) { + p.mu.Lock() + defer p.mu.Unlock() + if clientID, ok := p.busyIP[ip]; ok { + p.ann.Announce("AP_IPADDRESS", "RELEASE", clientID, ip) + delete(p.busyIP, ip) + } +} + +func (p *IPPressure) whoisIP(ip string) (string, bool) { + p.mu.Lock() + defer p.mu.Unlock() + id, ok := p.busyIP[ip] + return id, ok +} + +// registerIPPressure wires the alloc/realloc/release/whois handlers and the +// disconnect cleanup. The allocator instance is returned for tests. +func registerIPPressure(hub *ws.Hub, ann Announcer) *IPPressure { + p := NewIPPressure(ann) + + // --- IP address --- + hub.Register("alloc/APIPAddress", func(c *ws.Client, m protocol.Message) any { + if ip := c.APIP(); ip != "" { + return map[string]any{"status": "success", "ip": ip} + } + ip := p.lockIP(c.ID) + if ip == "" { + return map[string]any{"status": "fail"} + } + c.SetAPIP(ip) + return map[string]any{"status": "success", "ip": ip} + }) + hub.Register("realloc/APIPAddress", func(c *ws.Client, m protocol.Message) any { + if c.APIP() == "" { + return map[string]any{"status": "fail"} + } + p.releaseIP(c.APIP()) + ip := p.lockIP(c.ID) + c.SetAPIP(ip) + return map[string]any{"status": "success", "ip": ip} + }) + hub.Register("release/APIPAddress", func(c *ws.Client, m protocol.Message) any { + p.releaseIP(c.APIP()) + c.SetAPIP("") + return success() + }) + hub.Register("whois/APIPAddress", func(c *ws.Client, m protocol.Message) any { + if id, ok := p.whoisIP(m.Str("whois")); ok { + return map[string]any{"status": "success", "socket": id} + } + return map[string]any{"status": "fail"} + }) + + // --- number --- + hub.Register("alloc/APNumber", func(c *ws.Client, m protocol.Message) any { + if n := c.APNumber(); n != 0 { + return map[string]any{"status": "success", "number": n} + } + n := p.lockNumber(c.ID) + c.SetAPNumber(n) + return map[string]any{"status": "success", "number": n} + }) + hub.Register("realloc/APNumber", func(c *ws.Client, m protocol.Message) any { + if c.APNumber() == 0 { + return map[string]any{"status": "fail"} + } + p.releaseNumber(c.APNumber()) + n := p.lockNumber(c.ID) + c.SetAPNumber(n) + return map[string]any{"status": "success", "number": n} + }) + hub.Register("release/APNumber", func(c *ws.Client, m protocol.Message) any { + p.releaseNumber(c.APNumber()) + c.SetAPNumber(0) + return success() + }) + hub.Register("whois/APNumber", func(c *ws.Client, m protocol.Message) any { + if id, ok := p.whoisNumber(m.Int("whois")); ok { + return map[string]any{"status": "success", "socket": id} + } + return map[string]any{"status": "fail"} + }) + + // --- short code --- + hub.Register("alloc/APShortCode", func(c *ws.Client, m protocol.Message) any { + if code := c.APShortCode(); code != "" { + return map[string]any{"status": "success", "code": code} + } + code := p.lockCode(c.ID) + if code == "" { + return map[string]any{"status": "fail"} + } + c.SetAPShortCode(code) + return map[string]any{"status": "success", "code": code} + }) + hub.Register("realloc/APShortCode", func(c *ws.Client, m protocol.Message) any { + if c.APShortCode() == "" { + return map[string]any{"status": "fail"} + } + p.releaseCode(c.APShortCode()) + code := p.lockCode(c.ID) + c.SetAPShortCode(code) + return map[string]any{"status": "success", "code": code} + }) + hub.Register("release/APShortCode", func(c *ws.Client, m protocol.Message) any { + p.releaseCode(c.APShortCode()) + c.SetAPShortCode("") + return success() + }) + hub.Register("whois/APShortCode", func(c *ws.Client, m protocol.Message) any { + if id, ok := p.whoisCode(m.Str("whois")); ok { + return map[string]any{"status": "success", "socket": id} + } + return map[string]any{"status": "fail"} + }) + + // Release every address a client held when it disconnects. + hub.OnDisconnect(func(c *ws.Client) { + if c.APIP() != "" { + p.releaseIP(c.APIP()) + } + if c.APNumber() != 0 { + p.releaseNumber(c.APNumber()) + } + if c.APShortCode() != "" { + p.releaseCode(c.APShortCode()) + } + }) + + return p +} diff --git a/internal/services/room.go b/internal/services/room.go new file mode 100644 index 0000000..0a40faf --- /dev/null +++ b/internal/services/room.go @@ -0,0 +1,281 @@ +package services + +import ( + "git.saqut.com/saqut/mwse/internal/protocol" + "git.saqut.com/saqut/mwse/internal/ws" +) + +func registerRoom(hub *ws.Hub) { + // Every client gets a private room named after its own id on connect, and is + // ejected from all rooms on disconnect. + hub.OnConnect(func(c *ws.Client) { + room := ws.NewRoom(hub) + room.ID = c.ID + room.AccessType = "private" + room.JoinType = "notify" + room.Description = "Private room" + room.Name = "Your Room | " + c.ID + room.OwnerID = c.ID + room.Publish() + room.Join(c) + }) + + hub.OnDisconnect(func(c *ws.Client) { + if room, ok := hub.Room(c.ID); ok { + room.Eject(c) + } + for _, rid := range c.Rooms() { + if r, ok := hub.Room(rid); ok { + r.Eject(c) + } + } + }) + + hub.Register("myroom-info", func(c *ws.Client, m protocol.Message) any { + room, ok := hub.Room(c.ID) + if !ok { + return fail("NOT-FOUND-ROOM") + } + return map[string]any{"status": "success", "room": room.ToJSON(false)} + }) + + hub.Register("room-peers", func(c *ws.Client, m protocol.Message) any { + room, ok := hub.Room(m.Str("roomId")) + if !ok { + return map[string]any{"status": "fail"} + } + return map[string]any{"status": "success", "peers": ids(room.FilterPeers(toMap(m.Get("filter"))))} + }) + + hub.Register("room/peer-count", func(c *ws.Client, m protocol.Message) any { + room, ok := hub.Room(m.Str("roomId")) + if !ok { + return map[string]any{"status": "fail"} + } + return map[string]any{"status": "success", "count": len(room.FilterPeers(toMap(m.Get("filter"))))} + }) + + hub.Register("room-info", func(c *ws.Client, m protocol.Message) any { + if room, ok := hub.RoomByName(m.Str("name")); ok { + return map[string]any{"status": "success", "room": room.ToJSON(false)} + } + return fail("NOT-FOUND-ROOM") + }) + + hub.Register("joinedrooms", func(c *ws.Client, m protocol.Message) any { + var rooms []map[string]any + for _, rid := range c.Rooms() { + if r, ok := hub.Room(rid); ok { + rooms = append(rooms, r.ToJSON(false)) + } + } + return rooms + }) + + hub.Register("closeroom", func(c *ws.Client, m protocol.Message) any { + room, ok := hub.Room(m.Str("roomId")) + if !ok { + return map[string]any{"status": "fail"} + } + if room.OwnerID == c.ID { + room.Down() + return success() + } + return map[string]any{"status": "fail"} + }) + + hub.Register("create-room", func(c *ws.Client, m protocol.Message) any { + if msg := validateCreateRoom(m); msg != "" { + return map[string]any{"status": "fail", "messages": msg} + } + name := m.Str("name") + if _, exists := hub.RoomByName(name); exists { + return fail("ALREADY-EXISTS") + } + + room := ws.NewRoom(hub) + room.AccessType = m.Str("accessType") + room.NotifyActionInvite = m.Truthy("notifyActionInvite") + room.NotifyActionJoined = m.Truthy("notifyActionJoined") + room.NotifyActionEjected = m.Truthy("notifyActionEjected") + room.JoinType = m.Str("joinType") + room.Description = m.Str("description") + room.Name = name + room.OwnerID = c.ID + if cred := m.Str("credential"); cred != "" { + room.Credential = sha256hex(cred) + } + room.Publish() + room.Join(c) + return map[string]any{"status": "success", "room": room.ToJSON(false)} + }) + + hub.Register("joinroom", func(c *ws.Client, m protocol.Message) any { + room, ok := hub.RoomByName(m.Str("name")) + if !ok { + return fail("NOT-FOUND-ROOM") + } + + fetchInfo := func(resp map[string]any) map[string]any { + if m.Truthy("autoFetchInfo") { + resp["info"] = room.Info() + } + return resp + } + + switch room.JoinType { + case "lock": + return fail("LOCKED-ROOM") + case "password": + if room.Credential == sha256hex(m.Str("credential")) { + room.Join(c) + return fetchInfo(map[string]any{"status": "success", "room": room.ToJSON(false)}) + } + return map[string]any{"status": "fail", "message": "WRONG-PASSWORD", "area": "credential"} + case "free": + room.Join(c) + return fetchInfo(map[string]any{"status": "success", "room": room.ToJSON(false)}) + case "invite": + room.AddWaiting(c.ID) + invite := map[string]any{"id": c.ID} + if room.NotifyActionInvite { + room.Broadcast("room/invite", invite, "", nil) + } else if owner, ok := hub.Client(room.OwnerID); ok { + owner.Signal("room/invite", invite) + } + return map[string]any{"status": "success", "message": "INVITE-REQUESTED"} + } + return fail("NOT-FOUND-ROOM") + }) + + hub.Register("ejectroom", func(c *ws.Client, m protocol.Message) any { + room, ok := hub.Room(m.Str("roomId")) + if !ok { + return fail("NOT-FOUND-ROOM") + } + if !room.Has(c.ID) { + return fail("ALREADY-ROOM-OUT") + } + room.Eject(c) + return success() + }) + + hub.Register("accept/invite-room", inviteDecision(hub, true)) + hub.Register("reject/invite-room", inviteDecision(hub, false)) + + hub.Register("room/list", func(c *ws.Client, m protocol.Message) any { + var rooms []map[string]any + for _, room := range hub.Rooms() { + if room.AccessType == "public" { + rooms = append(rooms, map[string]any{ + "name": room.Name, + "joinType": room.JoinType, + "description": room.Description, + "id": room.ID, + }) + } + } + return map[string]any{"type": "public/rooms", "rooms": rooms} + }) + + hub.Register("room/info", func(c *ws.Client, m protocol.Message) any { + room, ok := hub.Room(m.Str("roomId")) + if !ok { + return fail("NOT-FOUND-ROOM") + } + if !c.InRoom(room.ID) { + return fail("NO-JOINED-ROOM") + } + if name := m.Str("name"); name != "" { + v, _ := room.InfoValue(name) + return map[string]any{"status": "success", "value": v} + } + return map[string]any{"status": "success", "value": room.Info()} + }) + + hub.Register("room/setinfo", func(c *ws.Client, m protocol.Message) any { + room, ok := hub.Room(m.Str("roomId")) + if !ok { + return fail("NOT-FOUND-ROOM") + } + if !c.InRoom(room.ID) { + return fail("NO-JOINED-ROOM") + } + name := m.Str("name") + value := m.Get("value") + room.SetInfo(name, value) + room.Broadcast( + "room/info", + map[string]any{"name": name, "value": value, "roomId": room.ID}, + c.ID, + (*ws.Client).RoomInfoNotifiable, + ) + return success() + }) +} + +// inviteDecision builds the accept/reject invite handlers. The original code was +// non-functional here (it called Array methods on a Set and inverted the joinType +// check); this implements the intended flow: only the rooms a member belongs to, +// only invite rooms, only ids actually on the waiting list. +func inviteDecision(hub *ws.Hub, accept bool) handler { + return func(c *ws.Client, m protocol.Message) any { + room, ok := hub.Room(m.Str("roomId")) + if !ok { + return fail("NOT-FOUND-ROOM") + } + if !c.InRoom(room.ID) { + return fail("FORBIDDEN-INVITE-ACTIONS") + } + if room.JoinType != "invite" { + return fail("INVALID-DATA") + } + clientID := m.Str("clientId") + if !room.IsWaiting(clientID) { + return fail("NO-WAITING-INVITED") + } + joinClient, ok := hub.Client(clientID) + if !ok { + room.RemoveWaiting(clientID) + return fail("NO-CLIENT") + } + room.RemoveWaiting(clientID) + + if accept { + room.Join(joinClient) + joinClient.Signal("room/invite/status", map[string]any{"status": "accepted"}) + } else { + room.Broadcast("room/invite/status", map[string]any{"id": clientID, "roomId": room.ID}, "", nil) + joinClient.Signal("room/invite/status", map[string]any{"status": "rejected"}) + } + return success() + } +} + +// validateCreateRoom checks the create-room payload against the same constraints +// the original joi schema described. It returns an empty string when valid. +func validateCreateRoom(m protocol.Message) string { + if !m.Has("type") { + return "type is required" + } + switch m.Str("accessType") { + case "public", "private": + default: + return "accessType must be public or private" + } + switch m.Str("joinType") { + case "free", "invite", "password", "lock": + default: + return "joinType must be one of free, invite, password, lock" + } + if !m.Has("notifyActionInvite") || !m.Has("notifyActionJoined") || !m.Has("notifyActionEjected") { + return "notify flags are required" + } + if m.Str("description") == "" { + return "description is required" + } + if m.Str("name") == "" { + return "name is required" + } + return "" +} diff --git a/internal/services/services.go b/internal/services/services.go new file mode 100644 index 0000000..0a91579 --- /dev/null +++ b/internal/services/services.go @@ -0,0 +1,70 @@ +// Package services ports the message handlers and lifecycle hooks that lived in +// Source/Services/* of the Node.js engine: YourID, Session, Auth, Room, +// IPPressure and DataTransfer. Each is registered onto a *ws.Hub, which owns the +// router and the connect/disconnect event bus. +// +// Where the original handlers contained outright bugs (a Set used as if it were an +// Array, comparing a Client object to a string id, validating against an +// undefined schema variable, ...) this port implements the *intended* behaviour +// and records the deviation in decisions.md. The on-the-wire message shapes the +// SDK depends on are preserved. +package services + +import ( + "crypto/sha256" + "encoding/hex" + + "git.saqut.com/saqut/mwse/internal/protocol" + "git.saqut.com/saqut/mwse/internal/ws" +) + +// Register wires every service onto the hub. Call once during startup, before the +// server begins accepting connections. The order mirrors the Node require() order +// so connect-time side effects (id message, private room, session defaults) +// happen in the same sequence. +func Register(hub *ws.Hub) { + registerYourID(hub) + registerAuth(hub) + registerRoom(hub) + registerDataTransfer(hub) + registerIPPressure(hub, nil) + registerSession(hub) +} + +// ---- small response helpers --------------------------------------------- + +// success is the ubiquitous {status:"success"} reply. +func success() map[string]any { return map[string]any{"status": "success"} } + +// fail is the ubiquitous {status:"fail", message:...} reply. +func fail(message string) map[string]any { + return map[string]any{"status": "fail", "message": message} +} + +// toMap coerces an arbitrary decoded JSON value to an object, returning an empty +// map when it is not one (e.g. a missing "filter" field). +func toMap(v any) map[string]any { + if m, ok := v.(map[string]any); ok { + return m + } + return map[string]any{} +} + +// sha256hex hashes credentials the same way the original Room service did. +func sha256hex(s string) string { + sum := sha256.Sum256([]byte(s)) + return hex.EncodeToString(sum[:]) +} + +// ids extracts the client ids from a slice of clients. +func ids(clients []*ws.Client) []string { + out := make([]string, 0, len(clients)) + for _, c := range clients { + out = append(out, c.ID) + } + return out +} + +// handler is a convenience alias matching ws.Handler's shape, used to keep the +// per-service files concise. +type handler = func(c *ws.Client, m protocol.Message) any diff --git a/internal/services/services_test.go b/internal/services/services_test.go new file mode 100644 index 0000000..46f92bc --- /dev/null +++ b/internal/services/services_test.go @@ -0,0 +1,261 @@ +package services + +import ( + "encoding/json" + "testing" + "time" + + "git.saqut.com/saqut/mwse/internal/protocol" + "git.saqut.com/saqut/mwse/internal/testutil" + "git.saqut.com/saqut/mwse/internal/ws" +) + +// ---- test helpers -------------------------------------------------------- + +func newHub() *ws.Hub { + hub := ws.NewHub() + Register(hub) + return hub +} + +// connect attaches a client (with services' connect hooks firing) and returns it +// along with its captured connection. +func connect(hub *ws.Hub, id string) (*ws.Client, *testutil.FakeConn) { + fc := testutil.NewFakeConn() + c := ws.NewClient(fc, id) + hub.Connect(c) + return c, fc +} + +func waitFor(t *testing.T, cond func() bool) { + t.Helper() + for i := 0; i < 500; i++ { + if cond() { + return + } + time.Sleep(time.Millisecond) + } + t.Fatal("condition not met within timeout") +} + +// findSignal scans the captured frames for a [payload, name] signal. +func findSignal(fc *testutil.FakeConn, name string) (map[string]any, bool) { + for _, raw := range fc.Writes() { + var arr []any + if json.Unmarshal(raw, &arr) != nil || len(arr) < 2 { + continue + } + if s, ok := arr[1].(string); ok && s == name { + payload, _ := arr[0].(map[string]any) + return payload, true + } + } + return nil, false +} + +func waitSignal(t *testing.T, fc *testutil.FakeConn, name string) map[string]any { + t.Helper() + waitFor(t, func() bool { _, ok := findSignal(fc, name); return ok }) + p, _ := findSignal(fc, name) + return p +} + +func asMap(t *testing.T, v any) map[string]any { + t.Helper() + m, ok := v.(map[string]any) + if !ok { + t.Fatalf("expected map, got %T (%v)", v, v) + } + return m +} + +// msg builds a protocol.Message from a type and key/value pairs. +func msg(typ string, kv ...any) protocol.Message { + m := protocol.Message{"type": typ} + for i := 0; i+1 < len(kv); i += 2 { + m[kv[i].(string)] = kv[i+1] + } + return m +} + +// ---- tests --------------------------------------------------------------- + +func TestConnectAnnouncesIDAndPrivateRoom(t *testing.T) { + hub := newHub() + c, fc := connect(hub, "alice") + + idPayload := waitSignal(t, fc, "id") + if idPayload["value"] != "alice" { + t.Fatalf("id signal value = %v, want alice", idPayload["value"]) + } + if _, ok := hub.Room("alice"); !ok { + t.Fatal("private room named after the client id should exist") + } + if !c.InRoom("alice") { + t.Fatal("client should be a member of its private room") + } +} + +func TestPairingFlow(t *testing.T) { + hub := newHub() + a, fa := connect(hub, "a") + b, fb := connect(hub, "b") + + // a requests pairing with b. + resp := asMap(t, hub.Handle(a, msg("request/pair", "to", "b"))) + if resp["message"] != "REQUESTED" { + t.Fatalf("request/pair = %v", resp) + } + req := waitSignal(t, fb, "request/pair") + if req["from"] != "a" { + t.Fatalf("request/pair from = %v, want a", req["from"]) + } + + // b accepts. + resp = asMap(t, hub.Handle(b, msg("accept/pair", "to", "a"))) + if resp["status"] != "success" { + t.Fatalf("accept/pair = %v", resp) + } + acc := waitSignal(t, fa, "accepted/pair") + if acc["from"] != "b" { + t.Fatalf("accepted/pair from = %v, want b", acc["from"]) + } + + if !a.HasPair("b") || !b.HasPair("a") { + t.Fatal("both clients should be mutually paired") + } +} + +func TestRoomCreateJoinAndPackRoom(t *testing.T) { + hub := newHub() + a, _ := connect(hub, "a") + b, fb := connect(hub, "b") + + created := asMap(t, hub.Handle(a, msg("create-room", + "accessType", "public", + "joinType", "free", + "notifyActionInvite", false, + "notifyActionJoined", true, + "notifyActionEjected", true, + "description", "d", + "name", "R1", + ))) + if created["status"] != "success" { + t.Fatalf("create-room = %v", created) + } + roomID := asMap(t, created["room"])["id"].(string) + + joined := asMap(t, hub.Handle(b, msg("joinroom", "name", "R1"))) + if joined["status"] != "success" { + t.Fatalf("joinroom = %v", joined) + } + if !b.InRoom(roomID) { + t.Fatal("b should be in the room") + } + + // a relays a pack to the room; b should receive it. + res := asMap(t, hub.Handle(a, msg("pack/room", "to", roomID, "pack", map[string]any{"x": float64(1)}, "handshake", true))) + if res["type"] != "success" { + t.Fatalf("pack/room = %v", res) + } + got := waitSignal(t, fb, "pack/room") + if got["sender"] != "a" { + t.Fatalf("pack/room sender = %v, want a", got["sender"]) + } +} + +func TestDataTransferPackToAutoPairs(t *testing.T) { + hub := newHub() + a, _ := connect(hub, "a") + b, fb := connect(hub, "b") + + res := asMap(t, hub.Handle(a, msg("pack/to", "to", "b", "pack", map[string]any{"hi": true}, "handshake", true))) + if res["type"] != "success" { + t.Fatalf("pack/to = %v", res) + } + got := waitSignal(t, fb, "pack") + if got["from"] != "a" { + t.Fatalf("pack from = %v, want a", got["from"]) + } + // Auto-pairing should have linked both sides. + if !a.HasPair("b") || !b.HasPair("a") { + t.Fatal("pack/to should auto-pair when the target does not require pairing") + } +} + +func TestSessionFlagGatesPackReadable(t *testing.T) { + hub := newHub() + a, _ := connect(hub, "a") + connect(hub, "b") + + // a disables sending; pack/to must now fail the handshake. + if r := asMap(t, hub.Handle(a, msg("connection/packsending", "value", float64(0)))); r["status"] != "success" { + t.Fatalf("connection/packsending = %v", r) + } + res := asMap(t, hub.Handle(a, msg("pack/to", "to", "b", "pack", map[string]any{}, "handshake", true))) + if res["type"] != "fail" { + t.Fatalf("pack/to after disabling send = %v, want fail", res) + } +} + +func TestIPPressureAllocatesUniqueAddresses(t *testing.T) { + hub := newHub() + a, _ := connect(hub, "a") + b, _ := connect(hub, "b") + + na := asMap(t, hub.Handle(a, msg("alloc/APNumber")))["number"].(int) + nb := asMap(t, hub.Handle(b, msg("alloc/APNumber")))["number"].(int) + if na == nb { + t.Fatalf("AP numbers must be unique, both got %d", na) + } + + who := asMap(t, hub.Handle(a, msg("whois/APNumber", "whois", float64(na)))) + if who["socket"] != "a" { + t.Fatalf("whois APNumber = %v, want a", who) + } + + ipA := asMap(t, hub.Handle(a, msg("alloc/APIPAddress")))["ip"].(string) + ipB := asMap(t, hub.Handle(b, msg("alloc/APIPAddress")))["ip"].(string) + if ipA == ipB { + t.Fatalf("AP ips must be unique, both got %s", ipA) + } + + // Releasing frees the number for reuse. + if r := asMap(t, hub.Handle(a, msg("release/APNumber"))); r["status"] != "success" { + t.Fatalf("release/APNumber = %v", r) + } + if a.APNumber() != 0 { + t.Fatal("released number should be cleared on the client") + } +} + +func TestDisconnectCleansRoomsAndPairs(t *testing.T) { + hub := newHub() + a, _ := connect(hub, "a") + b, fb := connect(hub, "b") + + // Put both in a shared room and pair them. + hub.Handle(a, msg("create-room", + "accessType", "public", "joinType", "free", + "notifyActionInvite", false, "notifyActionJoined", true, "notifyActionEjected", true, + "description", "d", "name", "R1", + )) + hub.Handle(b, msg("joinroom", "name", "R1")) + hub.Handle(a, msg("request/pair", "to", "b")) + hub.Handle(b, msg("accept/pair", "to", "a")) + if !a.HasPair("b") { + t.Fatal("precondition: a should be paired with b") + } + + hub.Disconnect(a) + + waitSignal(t, fb, "peer/disconnect") + waitSignal(t, fb, "room/ejected") + + if _, ok := hub.Client("a"); ok { + t.Fatal("disconnected client should be removed from the hub") + } + if b.HasPair("a") { + t.Fatal("pair edge to the disconnected client should be gone") + } +} diff --git a/internal/services/session.go b/internal/services/session.go new file mode 100644 index 0000000..7088ab2 --- /dev/null +++ b/internal/services/session.go @@ -0,0 +1,33 @@ +package services + +import ( + "git.saqut.com/saqut/mwse/internal/protocol" + "git.saqut.com/saqut/mwse/internal/ws" +) + +// registerSession ports the Session service: per-connection feature flags that +// gate notifications and data relay. Defaults are already applied in +// ws.NewClient; the connect hook re-applies them for parity with the original. +// +// The SDK sends these toggles as numbers (value: 1 / value: 0), so Truthy is used +// to match the original `!!msg.value` coercion. +func registerSession(hub *ws.Hub) { + hub.OnConnect(func(c *ws.Client) { c.ResetStore() }) + + flag := func(name string) handler { + return func(c *ws.Client, m protocol.Message) any { + c.SetStore(name, m.Truthy("value")) + return success() + } + } + + hub.Register("connection/pairinfo", flag("notifyPairInfo")) + hub.Register("connection/roominfo", flag("notifyRoomInfo")) + hub.Register("connection/packrecaive", flag("packrecaive")) + hub.Register("connection/packsending", flag("packsending")) + + hub.Register("connection/reset", func(c *ws.Client, m protocol.Message) any { + c.ResetStore() + return success() + }) +} diff --git a/internal/services/yourid.go b/internal/services/yourid.go new file mode 100644 index 0000000..cc1252f --- /dev/null +++ b/internal/services/yourid.go @@ -0,0 +1,11 @@ +package services + +import "git.saqut.com/saqut/mwse/internal/ws" + +// registerYourID tells a freshly connected client its own socket id, exactly as +// the Node YourID service did: client.send([{type:'id', value: id}, 'id']). +func registerYourID(hub *ws.Hub) { + hub.OnConnect(func(c *ws.Client) { + c.Signal("id", map[string]any{"type": "id", "value": c.ID}) + }) +} diff --git a/internal/testutil/fakeconn.go b/internal/testutil/fakeconn.go new file mode 100644 index 0000000..7244a3e --- /dev/null +++ b/internal/testutil/fakeconn.go @@ -0,0 +1,105 @@ +// Package testutil provides an in-memory WebSocket connection so the engine's +// concurrency tests (#26) can run without real sockets. FakeConn satisfies the +// ws.Conn interface structurally. +package testutil + +import ( + "io" + "sync" + "time" +) + +// textMessage mirrors gorilla/websocket.TextMessage (1) without importing it, so +// this helper stays dependency-light. +const textMessage = 1 + +// FakeConn is a thread-safe, in-memory connection. Outbound frames are captured +// in Writes(); inbound frames can be injected with Push() and are returned by +// ReadMessage in order until the connection is closed. +type FakeConn struct { + mu sync.Mutex + writes [][]byte + incoming chan []byte + closed bool + pong func(string) error +} + +// NewFakeConn returns a ready connection. +func NewFakeConn() *FakeConn { + return &FakeConn{incoming: make(chan []byte, 1024)} +} + +// ReadMessage returns the next injected frame, blocking until one is available or +// the connection is closed (then io.EOF). +func (f *FakeConn) ReadMessage() (int, []byte, error) { + b, ok := <-f.incoming + if !ok { + return 0, nil, io.EOF + } + return textMessage, b, nil +} + +// Push injects an inbound frame for ReadMessage to return. +func (f *FakeConn) Push(frame []byte) { + f.mu.Lock() + defer f.mu.Unlock() + if f.closed { + return + } + f.incoming <- frame +} + +// WriteMessage captures an outbound frame. +func (f *FakeConn) WriteMessage(_ int, data []byte) error { + f.mu.Lock() + defer f.mu.Unlock() + if f.closed { + return io.ErrClosedPipe + } + cp := make([]byte, len(data)) + copy(cp, data) + f.writes = append(f.writes, cp) + return nil +} + +// Writes returns a copy of all captured outbound frames. +func (f *FakeConn) Writes() [][]byte { + f.mu.Lock() + defer f.mu.Unlock() + out := make([][]byte, len(f.writes)) + copy(out, f.writes) + return out +} + +// WriteCount returns how many frames have been written. +func (f *FakeConn) WriteCount() int { + f.mu.Lock() + defer f.mu.Unlock() + return len(f.writes) +} + +func (f *FakeConn) WriteControl(int, []byte, time.Time) error { return nil } +func (f *FakeConn) SetReadLimit(int64) {} +func (f *FakeConn) SetReadDeadline(time.Time) error { return nil } +func (f *FakeConn) SetWriteDeadline(time.Time) error { return nil } +func (f *FakeConn) SetPongHandler(h func(string) error) { f.pong = h } + +// Pong invokes the registered pong handler (used by heartbeat tests). +func (f *FakeConn) Pong(appData string) error { + if f.pong != nil { + return f.pong(appData) + } + return nil +} + +// Close makes ReadMessage return EOF and rejects further writes. Idempotent. +func (f *FakeConn) Close() error { + f.mu.Lock() + defer f.mu.Unlock() + if f.closed { + return nil + } + f.closed = true + close(f.incoming) + return nil +} diff --git a/internal/ws/client.go b/internal/ws/client.go new file mode 100644 index 0000000..7825d32 --- /dev/null +++ b/internal/ws/client.go @@ -0,0 +1,346 @@ +package ws + +import ( + "encoding/json" + "sync" + "sync/atomic" + "time" + + "github.com/gorilla/websocket" + + "git.saqut.com/saqut/mwse/internal/protocol" +) + +// outboundBuffer bounds how many frames may queue for one slow client before the +// engine gives up and disconnects it. This is the classic gorilla "hub" backpressure +// policy: a peer that cannot keep up is dropped rather than allowed to grow memory +// without limit or stall a broadcast. +const outboundBuffer = 256 + +// Session flag keys. They live in the Client.store map and mirror the Node +// Session service defaults exactly. +const ( + flagNotifyPairInfo = "notifyPairInfo" + flagPackReceive = "packrecaive" // (sic) original spelling kept for parity + flagPackSending = "packsending" + flagNotifyRoomInfo = "notifyRoomInfo" +) + +// Client is one connected peer. +// +// Concurrency model (the whole point of the Go rewrite, see #22): +// +// - All socket WRITES go through a single writer goroutine draining `outbound`. +// Producers never touch the socket, so concurrent writes are impossible. +// - `Send` enqueues onto `outbound` but always also selects on `done`, so a send +// racing a disconnect is harmlessly dropped instead of writing to a dead peer. +// - All mutable peer STATE (info, store, rooms, pairs, ...) is guarded by `mu`. +// A peer leaving (clearing its state) and another goroutine reading/sending to +// it are therefore serialized; the "leave-while-send" race cannot occur. +type Client struct { + ID string + CreatedAt time.Time + + conn Conn + outbound chan []byte + done chan struct{} + closeOnce sync.Once + dropped uint64 // frames dropped due to a full outbound buffer (atomic) + + mu sync.RWMutex + info map[string]any // application metadata, shared with paired/room peers + store map[string]bool // per-connection session flags + rooms map[string]struct{} // rooms this client belongs to + pairs map[string]struct{} // peers this client has paired with + requiredPair bool // when true, others must be paired to reach this client + + apNumber int // virtual address: short number + apShortCode string // virtual address: 3-letter code + apIP string // virtual address: 10.x.x.x style ip +} + +// NewClient wraps an accepted connection. Session flag defaults are applied here +// (rather than only via the Session service's connect hook) so they are always +// present regardless of listener ordering. +func NewClient(conn Conn, id string) *Client { + return &Client{ + ID: id, + CreatedAt: time.Now(), + conn: conn, + outbound: make(chan []byte, outboundBuffer), + done: make(chan struct{}), + info: make(map[string]any), + store: map[string]bool{ + flagNotifyPairInfo: true, + flagPackReceive: true, + flagPackSending: true, + flagNotifyRoomInfo: true, + }, + rooms: make(map[string]struct{}), + pairs: make(map[string]struct{}), + } +} + +// ---- sending ------------------------------------------------------------- + +// Send marshals v and enqueues it for the writer goroutine. It is safe to call +// from any goroutine and at any time, and it never blocks: +// +// - if the client is closing, the frame is dropped (this is the branch that +// makes "send to a peer that is leaving" safe instead of a race/panic); +// - if the outbound buffer is full, this frame is dropped but the connection is +// kept. MWSE is a best-effort relay, so a momentarily slow consumer should +// lose a frame, not be disconnected (which would cascade under load). A +// genuinely dead consumer is still reaped: its writer eventually trips the +// write deadline, which ends the read loop and disconnects it. +func (c *Client) Send(v any) { + b, err := json.Marshal(v) + if err != nil { + return + } + select { + case c.outbound <- b: + case <-c.done: + // Client is gone; drop silently. + default: + // Buffer full; drop this frame, keep the connection. + atomic.AddUint64(&c.dropped, 1) + } +} + +// Dropped returns the number of frames dropped because the outbound buffer was +// full. Useful for load tests and operational metrics. +func (c *Client) Dropped() uint64 { return atomic.LoadUint64(&c.dropped) } + +// Signal sends a server-initiated message [payload, name] to this client. +func (c *Client) Signal(name string, payload any) { + c.Send(protocol.Signal(name, payload)) +} + +// ---- pumps --------------------------------------------------------------- + +// writePump is the ONLY goroutine that calls conn.WriteMessage. It exits when the +// client is closed, closing the socket on the way out. +func (c *Client) writePump() { + defer c.conn.Close() + for { + select { + case b := <-c.outbound: + _ = c.conn.SetWriteDeadline(time.Now().Add(defaultWriteWait)) + if err := c.conn.WriteMessage(websocket.TextMessage, b); err != nil { + return + } + case <-c.done: + return + } + } +} + +// Close tears the client's transport down exactly once. It does NOT run the +// logical disconnect (room/pair cleanup) — that is driven by the read loop's exit +// in server.go, so it always happens precisely once per connection. +func (c *Client) Close() { + c.closeOnce.Do(func() { + // Best-effort polite close frame; ignore errors (peer may already be gone). + _ = c.conn.WriteControl( + websocket.CloseMessage, + websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""), + time.Now().Add(time.Second), + ) + close(c.done) + }) +} + +// Done exposes the close signal for select-based waits (used by the ping loop). +func (c *Client) Done() <-chan struct{} { return c.done } + +// ---- guarded state accessors -------------------------------------------- + +// SetInfo stores an application metadata value. +func (c *Client) SetInfo(name string, value any) { + c.mu.Lock() + c.info[name] = value + c.mu.Unlock() +} + +// InfoValue returns a single metadata value. +func (c *Client) InfoValue(name string) (any, bool) { + c.mu.RLock() + v, ok := c.info[name] + c.mu.RUnlock() + return v, ok +} + +// Info returns a copy of all metadata. A copy is returned so callers can range +// over it without holding the lock. +func (c *Client) Info() map[string]any { + c.mu.RLock() + out := make(map[string]any, len(c.info)) + for k, v := range c.info { + out[k] = v + } + c.mu.RUnlock() + return out +} + +// Match reports whether every key/value in filter is present and equal in this +// client's info (the room peer filter from Node's Client.match). +func (c *Client) Match(filter map[string]any) bool { + c.mu.RLock() + defer c.mu.RUnlock() + if len(filter) > len(c.info) { + return false + } + for k, want := range filter { + got, ok := c.info[k] + if !ok || got != want { + return false + } + } + return true +} + +// SetStore sets a session flag. +func (c *Client) SetStore(name string, v bool) { + c.mu.Lock() + c.store[name] = v + c.mu.Unlock() +} + +// store flag readers, named to match the original Client helpers. +func (c *Client) PackWriteable() bool { return c.storeFlag(flagPackReceive) } +func (c *Client) PackReadable() bool { return c.storeFlag(flagPackSending) } +func (c *Client) PeerInfoNotifiable() bool { return c.storeFlag(flagNotifyPairInfo) } +func (c *Client) RoomInfoNotifiable() bool { return c.storeFlag(flagNotifyRoomInfo) } + +func (c *Client) storeFlag(name string) bool { + c.mu.RLock() + v := c.store[name] + c.mu.RUnlock() + return v +} + +// ResetStore restores the default session flags. +func (c *Client) ResetStore() { + c.mu.Lock() + c.store[flagNotifyPairInfo] = true + c.store[flagPackReceive] = true + c.store[flagPackSending] = true + c.store[flagNotifyRoomInfo] = true + c.mu.Unlock() +} + +// RequiredPair / SetRequiredPair toggle the "private" reachability mode. +func (c *Client) RequiredPair() bool { + c.mu.RLock() + v := c.requiredPair + c.mu.RUnlock() + return v +} + +func (c *Client) SetRequiredPair(v bool) { + c.mu.Lock() + c.requiredPair = v + c.mu.Unlock() +} + +// ---- room membership ----------------------------------------------------- + +func (c *Client) addRoom(id string) { + c.mu.Lock() + c.rooms[id] = struct{}{} + c.mu.Unlock() +} + +func (c *Client) removeRoom(id string) { + c.mu.Lock() + delete(c.rooms, id) + c.mu.Unlock() +} + +// InRoom reports membership. +func (c *Client) InRoom(id string) bool { + c.mu.RLock() + _, ok := c.rooms[id] + c.mu.RUnlock() + return ok +} + +// Rooms returns a snapshot of room ids this client belongs to. +func (c *Client) Rooms() []string { + c.mu.RLock() + out := make([]string, 0, len(c.rooms)) + for id := range c.rooms { + out = append(out, id) + } + c.mu.RUnlock() + return out +} + +// ---- pairing ------------------------------------------------------------- + +// AddPair records that this client has paired toward other. +func (c *Client) AddPair(other string) { + c.mu.Lock() + c.pairs[other] = struct{}{} + c.mu.Unlock() +} + +// RemovePair drops a pairing edge from this client. +func (c *Client) RemovePair(other string) { + c.mu.Lock() + delete(c.pairs, other) + c.mu.Unlock() +} + +// HasPair reports whether this client has a pairing edge toward other. +func (c *Client) HasPair(other string) bool { + c.mu.RLock() + _, ok := c.pairs[other] + c.mu.RUnlock() + return ok +} + +// Pairs returns a snapshot of this client's pairing edges. +func (c *Client) Pairs() []string { + c.mu.RLock() + out := make([]string, 0, len(c.pairs)) + for id := range c.pairs { + out = append(out, id) + } + c.mu.RUnlock() + return out +} + +// ---- virtual address (IPPressure) --------------------------------------- + +func (c *Client) APNumber() int { + c.mu.RLock() + defer c.mu.RUnlock() + return c.apNumber +} +func (c *Client) SetAPNumber(v int) { + c.mu.Lock() + c.apNumber = v + c.mu.Unlock() +} +func (c *Client) APShortCode() string { + c.mu.RLock() + defer c.mu.RUnlock() + return c.apShortCode +} +func (c *Client) SetAPShortCode(v string) { + c.mu.Lock() + c.apShortCode = v + c.mu.Unlock() +} +func (c *Client) APIP() string { + c.mu.RLock() + defer c.mu.RUnlock() + return c.apIP +} +func (c *Client) SetAPIP(v string) { + c.mu.Lock() + c.apIP = v + c.mu.Unlock() +} diff --git a/internal/ws/conn.go b/internal/ws/conn.go new file mode 100644 index 0000000..ba7c433 --- /dev/null +++ b/internal/ws/conn.go @@ -0,0 +1,43 @@ +package ws + +import ( + "crypto/rand" + "fmt" + "time" +) + +// Conn is the minimal slice of *gorilla/websocket.Conn that the engine relies on. +// Depending on an interface (rather than the concrete type) lets the tests drive +// a Client with an in-memory fake connection, so the concurrency tests in #26 do +// not need real sockets. +// +// gorilla's contract: at most one goroutine may call the write methods at a time +// and at most one may call the read methods at a time, BUT Close and WriteControl +// may be called concurrently with everything else. The Client honours this by +// funnelling all WriteMessage calls through a single writer goroutine, while the +// ping loop uses WriteControl and shutdown uses Close. +type Conn interface { + ReadMessage() (messageType int, p []byte, err error) + WriteMessage(messageType int, data []byte) error + WriteControl(messageType int, data []byte, deadline time.Time) error + SetReadLimit(limit int64) + SetReadDeadline(t time.Time) error + SetWriteDeadline(t time.Time) error + SetPongHandler(h func(appData string) error) + Close() error +} + +// newUUID returns a random RFC 4122 version 4 UUID. The original server used +// Node's crypto.randomUUID(); this keeps client ids in the same shape without a +// third-party dependency. +func newUUID() string { + var b [16]byte + if _, err := rand.Read(b[:]); err != nil { + // crypto/rand failing is catastrophic and not something a relay can recover + // from sensibly; surface it loudly rather than handing out a zero id. + panic(fmt.Sprintf("ws: cannot read random bytes for uuid: %v", err)) + } + b[6] = (b[6] & 0x0f) | 0x40 // version 4 + b[8] = (b[8] & 0x3f) | 0x80 // variant 10 + return fmt.Sprintf("%x-%x-%x-%x-%x", b[0:4], b[4:6], b[6:8], b[8:10], b[10:16]) +} diff --git a/internal/ws/hub.go b/internal/ws/hub.go new file mode 100644 index 0000000..752e8d1 --- /dev/null +++ b/internal/ws/hub.go @@ -0,0 +1,208 @@ +package ws + +import ( + "log" + "sync" + + "git.saqut.com/saqut/mwse/internal/protocol" +) + +// Handler processes one inbound message and returns the value to reply with. +// Returning nil is allowed (it becomes JSON null, as `undefined` did in Node). +type Handler func(c *Client, m protocol.Message) any + +// Listener is notified of a connection lifecycle event. +type Listener func(c *Client) + +// Hub is the engine's shared state: the client and room registries, the message +// router, and the connect/disconnect event bus. Every map is guarded by its own +// RWMutex so unrelated subsystems never contend on one another. +type Hub struct { + cmu sync.RWMutex + clients map[string]*Client + + rmu sync.RWMutex + rooms map[string]*Room + + hmu sync.RWMutex + handlers map[string]Handler + + lmu sync.RWMutex + onConnect []Listener + onDisconnect []Listener +} + +// NewHub returns an empty hub. +func NewHub() *Hub { + return &Hub{ + clients: make(map[string]*Client), + rooms: make(map[string]*Room), + handlers: make(map[string]Handler), + } +} + +// ---- router -------------------------------------------------------------- + +// Register binds a message type to a handler. Re-registering a type overwrites it. +func (h *Hub) Register(msgType string, handler Handler) { + h.hmu.Lock() + h.handlers[msgType] = handler + h.hmu.Unlock() +} + +// HasHandler reports whether a type is registered (used by tests). +func (h *Hub) HasHandler(msgType string) bool { + h.hmu.RLock() + _, ok := h.handlers[msgType] + h.hmu.RUnlock() + return ok +} + +// Handle routes a message to its handler, mirroring the Node MessageRouter +// (MISSING_TYPE / UNKNOWN_TYPE / HANDLER_ERROR), and recovers from handler panics +// so one bad message can never take down the connection or the process. +func (h *Hub) Handle(c *Client, m protocol.Message) (result any) { + t := m.Type() + if t == "" { + return failMsg("MISSING_TYPE") + } + h.hmu.RLock() + handler, ok := h.handlers[t] + h.hmu.RUnlock() + if !ok { + return failMsg("UNKNOWN_TYPE") + } + + defer func() { + if r := recover(); r != nil { + log.Printf("handler panic [%s]: %v", t, r) + result = map[string]any{"status": "fail", "message": "HANDLER_ERROR"} + } + }() + return handler(c, m) +} + +func failMsg(msg string) map[string]any { + return map[string]any{"status": "fail", "message": msg} +} + +// ---- client registry ----------------------------------------------------- + +func (h *Hub) addClient(c *Client) { + h.cmu.Lock() + h.clients[c.ID] = c + h.cmu.Unlock() +} + +func (h *Hub) removeClient(id string) { + h.cmu.Lock() + delete(h.clients, id) + h.cmu.Unlock() +} + +// Client looks up a connected client by id. +func (h *Hub) Client(id string) (*Client, bool) { + h.cmu.RLock() + c, ok := h.clients[id] + h.cmu.RUnlock() + return c, ok +} + +// Clients returns a snapshot of all connected clients. +func (h *Hub) Clients() []*Client { + h.cmu.RLock() + out := make([]*Client, 0, len(h.clients)) + for _, c := range h.clients { + out = append(out, c) + } + h.cmu.RUnlock() + return out +} + +// ClientCount returns the number of connected clients. +func (h *Hub) ClientCount() int { + h.cmu.RLock() + n := len(h.clients) + h.cmu.RUnlock() + return n +} + +// ---- room registry ------------------------------------------------------- + +func (h *Hub) addRoom(r *Room) { + h.rmu.Lock() + h.rooms[r.ID] = r + h.rmu.Unlock() +} + +func (h *Hub) removeRoom(id string) { + h.rmu.Lock() + delete(h.rooms, id) + h.rmu.Unlock() +} + +// Room looks up a room by id. +func (h *Hub) Room(id string) (*Room, bool) { + h.rmu.RLock() + r, ok := h.rooms[id] + h.rmu.RUnlock() + return r, ok +} + +// Rooms returns a snapshot of all rooms. +func (h *Hub) Rooms() []*Room { + h.rmu.RLock() + out := make([]*Room, 0, len(h.rooms)) + for _, r := range h.rooms { + out = append(out, r) + } + h.rmu.RUnlock() + return out +} + +// RoomByName returns the first room with the given name. Room names are not +// unique by construction, so this matches the original "first match wins" lookup. +func (h *Hub) RoomByName(name string) (*Room, bool) { + h.rmu.RLock() + defer h.rmu.RUnlock() + for _, r := range h.rooms { + if r.Name == name { + return r, true + } + } + return nil, false +} + +// ---- event bus ----------------------------------------------------------- + +// OnConnect registers a listener fired after a client connects. +func (h *Hub) OnConnect(l Listener) { + h.lmu.Lock() + h.onConnect = append(h.onConnect, l) + h.lmu.Unlock() +} + +// OnDisconnect registers a listener fired when a client disconnects. +func (h *Hub) OnDisconnect(l Listener) { + h.lmu.Lock() + h.onDisconnect = append(h.onDisconnect, l) + h.lmu.Unlock() +} + +func (h *Hub) emitConnect(c *Client) { + h.lmu.RLock() + listeners := append([]Listener(nil), h.onConnect...) + h.lmu.RUnlock() + for _, l := range listeners { + l(c) + } +} + +func (h *Hub) emitDisconnect(c *Client) { + h.lmu.RLock() + listeners := append([]Listener(nil), h.onDisconnect...) + h.lmu.RUnlock() + for _, l := range listeners { + l(c) + } +} diff --git a/internal/ws/room.go b/internal/ws/room.go new file mode 100644 index 0000000..d9be36c --- /dev/null +++ b/internal/ws/room.go @@ -0,0 +1,253 @@ +package ws + +import ( + "sync" + "time" +) + +// Room is a set of clients that can be addressed together. Access types and join +// types mirror the original service. +type Room struct { + ID string + Name string + Description string + OwnerID string + CreatedAt time.Time + AccessType string // "public" | "private" + JoinType string // "free" | "invite" | "password" | "lock" | "notify" + + NotifyActionInvite bool + NotifyActionJoined bool + NotifyActionEjected bool + + Credential string // sha256 hex, or "" when none + + hub *Hub + + mu sync.RWMutex + clients map[string]*Client + waitingInvited map[string]struct{} + info map[string]any +} + +// NewRoom creates an empty, unpublished room with a fresh id and the same field +// defaults the Node constructor used. +func NewRoom(hub *Hub) *Room { + return &Room{ + ID: newUUID(), + CreatedAt: time.Now(), + JoinType: "invite", + NotifyActionJoined: true, + NotifyActionEjected: true, + hub: hub, + clients: make(map[string]*Client), + waitingInvited: make(map[string]struct{}), + info: make(map[string]any), + } +} + +// Publish registers the room in the hub so it can be looked up by id. +func (r *Room) Publish() { r.hub.addRoom(r) } + +// Size returns the current member count. +func (r *Room) Size() int { + r.mu.RLock() + n := len(r.clients) + r.mu.RUnlock() + return n +} + +// Has reports membership by client id. +func (r *Room) Has(id string) bool { + r.mu.RLock() + _, ok := r.clients[id] + r.mu.RUnlock() + return ok +} + +// snapshot returns the current members as a slice. The room lock is held only for +// the cheap copy; callers then send without holding any lock, so a member that +// disconnects mid-broadcast cannot deadlock or race the send (Client.Send drops +// safely once the peer is closing). +func (r *Room) snapshot() []*Client { + r.mu.RLock() + out := make([]*Client, 0, len(r.clients)) + for _, c := range r.clients { + out = append(out, c) + } + r.mu.RUnlock() + return out +} + +// Broadcast sends signal `name` with `payload` to every member except exceptID +// (pass "" to include everyone). If filter is non-nil, only members for which it +// returns true receive the message. +func (r *Room) Broadcast(name string, payload any, exceptID string, filter func(*Client) bool) { + for _, c := range r.snapshot() { + if c.ID == exceptID { + continue + } + if filter != nil && !filter(c) { + continue + } + c.Signal(name, payload) + } +} + +// Members returns a snapshot of the room's current members. +func (r *Room) Members() []*Client { return r.snapshot() } + +// FilterPeers returns the members whose info matches filter. +func (r *Room) FilterPeers(filter map[string]any) []*Client { + var out []*Client + for _, c := range r.snapshot() { + if c.Match(filter) { + out = append(out, c) + } + } + return out +} + +// Join adds a client to the room. Existing members are notified first (so the new +// member does not receive its own join), then membership is recorded on both the +// room and the client. +func (r *Room) Join(c *Client) { + if r.NotifyActionJoined { + r.Broadcast( + "room/joined", + map[string]any{"id": c.ID, "roomid": r.ID, "ownerid": r.OwnerID}, + "", + (*Client).PeerInfoNotifiable, + ) + } + r.mu.Lock() + r.clients[c.ID] = c + r.mu.Unlock() + c.addRoom(r.ID) +} + +// Eject removes a client from the room, notifying the remaining members. When the +// room empties it is taken down. +func (r *Room) Eject(c *Client) { + if r.NotifyActionEjected { + r.Broadcast( + "room/ejected", + map[string]any{"id": c.ID, "roomid": r.ID, "ownerid": r.OwnerID}, + c.ID, + (*Client).PeerInfoNotifiable, + ) + } + c.removeRoom(r.ID) + r.mu.Lock() + delete(r.clients, c.ID) + empty := len(r.clients) == 0 + r.mu.Unlock() + + if empty { + r.Down() + } +} + +// Down closes the room: members are told, the room is unregistered, and each +// member's membership record is cleared. +func (r *Room) Down() { + members := r.snapshot() + for _, c := range members { + c.Signal("room/closed", map[string]any{"roomid": r.ID, "ownerid": r.OwnerID}) + c.removeRoom(r.ID) + } + r.hub.removeRoom(r.ID) +} + +// ---- room info ----------------------------------------------------------- + +// SetInfo stores a room-level value. +func (r *Room) SetInfo(name string, value any) { + r.mu.Lock() + r.info[name] = value + r.mu.Unlock() +} + +// InfoValue returns a single room value. +func (r *Room) InfoValue(name string) (any, bool) { + r.mu.RLock() + v, ok := r.info[name] + r.mu.RUnlock() + return v, ok +} + +// Info returns a copy of all room values. +func (r *Room) Info() map[string]any { + r.mu.RLock() + out := make(map[string]any, len(r.info)) + for k, v := range r.info { + out[k] = v + } + r.mu.RUnlock() + return out +} + +// ---- invitations --------------------------------------------------------- + +// AddWaiting records a client awaiting an invite decision. +func (r *Room) AddWaiting(id string) { + r.mu.Lock() + r.waitingInvited[id] = struct{}{} + r.mu.Unlock() +} + +// RemoveWaiting drops a client from the invite waiting list. +func (r *Room) RemoveWaiting(id string) { + r.mu.Lock() + delete(r.waitingInvited, id) + r.mu.Unlock() +} + +// IsWaiting reports whether a client is on the invite waiting list. +func (r *Room) IsWaiting(id string) bool { + r.mu.RLock() + _, ok := r.waitingInvited[id] + r.mu.RUnlock() + return ok +} + +func (r *Room) waitingList() []string { + r.mu.RLock() + out := make([]string, 0, len(r.waitingInvited)) + for id := range r.waitingInvited { + out = append(out, id) + } + r.mu.RUnlock() + return out +} + +// ---- serialization ------------------------------------------------------- + +// ToJSON renders the room the way the SDK expects. When detailed is true the +// sensitive/owner-only fields are included. +func (r *Room) ToJSON(detailed bool) map[string]any { + obj := map[string]any{ + "id": r.ID, + "accessType": r.AccessType, + "createdAt": r.CreatedAt, + "description": r.Description, + "joinType": r.JoinType, + "name": r.Name, + "owner": r.OwnerID, + "waitingInvited": r.waitingList(), + } + if detailed { + obj["credential"] = r.Credential + obj["notifyActionInvite"] = r.NotifyActionInvite + obj["notifyActionJoined"] = r.NotifyActionJoined + obj["notifyActionEjected"] = r.NotifyActionEjected + r.mu.RLock() + ids := make([]string, 0, len(r.clients)) + for id := range r.clients { + ids = append(ids, id) + } + r.mu.RUnlock() + obj["clients"] = ids + } + return obj +} diff --git a/internal/ws/server.go b/internal/ws/server.go new file mode 100644 index 0000000..d07bc8d --- /dev/null +++ b/internal/ws/server.go @@ -0,0 +1,186 @@ +package ws + +import ( + "log" + "net/http" + "time" + + "github.com/gorilla/websocket" + + "git.saqut.com/saqut/mwse/internal/protocol" +) + +// Heartbeat and transport defaults. PingPeriod matches the original server's 10s +// interval; the ping payload is the same magic string the SDK's pong echoes. +const ( + defaultWriteWait = 10 * time.Second + defaultPongWait = 60 * time.Second + defaultPingPeriod = 10 * time.Second + defaultMaxMessageSize = 4 << 20 // 4 MiB + + pingPayload = "saQut" +) + +// Server upgrades HTTP requests to WebSocket connections and runs each one's +// lifecycle. It holds no per-connection state itself; everything lives on the Hub. +type Server struct { + hub *Hub + upgrader websocket.Upgrader + pingPeriod time.Duration + pongWait time.Duration +} + +// NewServer returns a Server bound to hub. CheckOrigin always returns true to +// preserve the original autoAcceptConnections behaviour (origin policy is a +// deployment concern handled in front of the engine). +func NewServer(hub *Hub) *Server { + return &Server{ + hub: hub, + upgrader: websocket.Upgrader{ + ReadBufferSize: 4096, + WriteBufferSize: 4096, + CheckOrigin: func(*http.Request) bool { return true }, + }, + pingPeriod: defaultPingPeriod, + pongWait: defaultPongWait, + } +} + +// ServeHTTP implements http.Handler: it upgrades the request and hands the +// connection to the lifecycle. It is the WebSocket endpoint. +func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) { + conn, err := s.upgrader.Upgrade(w, r, nil) + if err != nil { + log.Printf("ws: upgrade failed: %v", err) + return + } + s.handle(conn) +} + +// handle drives one connection from accept to disconnect. It is generic over the +// Conn interface so tests can feed it a scripted in-memory connection. +func (s *Server) handle(conn Conn) { + client := NewClient(conn, newUUID()) + + // Connect: register, start the writer, fire connect listeners (id, private + // room, session defaults). Must happen before the read loop so the client can + // already receive server-initiated messages. + s.hub.Connect(client) + + // Heartbeat lives in its own goroutine; it stops when the client closes. + go s.pingLoop(client) + + // Read loop blocks here until the socket errors or closes. + s.readLoop(client) + + // Disconnect runs exactly once, here, when the read loop ends. + s.hub.Disconnect(client) +} + +// readLoop consumes inbound frames. Pongs that do not echo the magic payload drop +// the connection (matching the original); a valid pong extends the read deadline. +func (s *Server) readLoop(c *Client) { + c.conn.SetReadLimit(defaultMaxMessageSize) + _ = c.conn.SetReadDeadline(time.Now().Add(s.pongWait)) + c.conn.SetPongHandler(func(appData string) error { + if appData != pingPayload { + return errBadPong + } + return c.conn.SetReadDeadline(time.Now().Add(s.pongWait)) + }) + + for { + msgType, data, err := c.conn.ReadMessage() + if err != nil { + return + } + if msgType != websocket.TextMessage { + continue // the protocol is JSON text; ignore binary frames + } + s.dispatch(c, data) + } +} + +// errBadPong is returned by the pong handler to force a disconnect. +var errBadPong = &pongError{} + +type pongError struct{} + +func (*pongError) Error() string { return "ws: pong validation failed" } + +// dispatch decodes one frame and routes it, then replies according to the WSTS +// rules. A frame that fails to decode is logged as a message error (the Node +// server emitted a 'messageError' event here). +func (s *Server) dispatch(c *Client, data []byte) { + env, err := protocol.Decode(data) + if err != nil { + log.Printf("ws: message error from %s: %v", c.ID, err) + return + } + + result := s.hub.Handle(c, env.Message) + + if flag, ok := env.WantsReply(); ok { + c.Send(protocol.Reply(result, env.ID, flag)) + return + } + + // "No id" branch: the original inspected the result for a broadcast directive. + // No service currently emits one, but the hook is preserved for parity. + if env.IsBroadcast() { + if m, ok := result.(map[string]any); ok { + if _, has := m["broadcast"]; has { + log.Printf("ws: broadcast directive from %s (no listener registered)", c.ID) + } + } + } +} + +// pingLoop sends a ping with the magic payload every ping period until the client +// closes. +func (s *Server) pingLoop(c *Client) { + ticker := time.NewTicker(s.pingPeriod) + defer ticker.Stop() + for { + select { + case <-ticker.C: + err := c.conn.WriteControl( + websocket.PingMessage, + []byte(pingPayload), + time.Now().Add(defaultWriteWait), + ) + if err != nil { + c.Close() + return + } + case <-c.Done(): + return + } + } +} + +// ---- lifecycle helpers (shared by the server and by tests) --------------- + +// Connect registers a client, starts its writer goroutine, and fires the connect +// listeners. Exposed so tests and tools can drive the same path the server uses. +func (h *Hub) Connect(c *Client) { + h.addClient(c) + go c.writePump() + h.emitConnect(c) +} + +// Disconnect fires the disconnect listeners, unregisters the client, and tears +// its transport down. Safe to call once per client. +func (h *Hub) Disconnect(c *Client) { + h.emitDisconnect(c) + h.removeClient(c.ID) + c.Close() +} + +// CloseAll closes every connected client. Used during graceful shutdown so peers +// receive a clean close frame before the process exits. +func (h *Hub) CloseAll() { + for _, c := range h.Clients() { + c.Close() + } +} diff --git a/internal/ws/ws_test.go b/internal/ws/ws_test.go new file mode 100644 index 0000000..0301fa0 --- /dev/null +++ b/internal/ws/ws_test.go @@ -0,0 +1,271 @@ +package ws + +import ( + "encoding/json" + "fmt" + "sync" + "testing" + "time" + + "git.saqut.com/saqut/mwse/internal/protocol" + "git.saqut.com/saqut/mwse/internal/testutil" +) + +// newTestClient builds a client backed by an in-memory connection and starts its +// writer pump, the way the real server does. +func newTestClient(id string) (*Client, *testutil.FakeConn) { + fc := testutil.NewFakeConn() + c := NewClient(fc, id) + go c.writePump() + return c, fc +} + +// waitFor polls cond until it holds or the deadline passes. +func waitFor(t *testing.T, cond func() bool) { + t.Helper() + for i := 0; i < 500; i++ { + if cond() { + return + } + time.Sleep(time.Millisecond) + } + t.Fatal("condition not met within timeout") +} + +// lastSignal decodes the most recent [payload, name] frame from fc. +func lastSignal(t *testing.T, fc *testutil.FakeConn) (string, map[string]any) { + t.Helper() + writes := fc.Writes() + if len(writes) == 0 { + t.Fatal("no frames written") + } + var arr []any + if err := json.Unmarshal(writes[len(writes)-1], &arr); err != nil { + t.Fatalf("decode frame: %v", err) + } + name, _ := arr[1].(string) + payload, _ := arr[0].(map[string]any) + return name, payload +} + +func TestSendConcurrentWithClose(t *testing.T) { + // The core safety property of the rewrite: sending to a client that is closing + // must never panic or race, only drop. Run under -race. + for trial := 0; trial < 30; trial++ { + c, _ := newTestClient("x") + var wg sync.WaitGroup + for i := 0; i < 8; i++ { + wg.Add(1) + go func() { + defer wg.Done() + for j := 0; j < 100; j++ { + c.Send(map[string]any{"j": j}) + } + }() + } + go c.Close() + wg.Wait() + } +} + +func TestSendAfterCloseDoesNotPanic(t *testing.T) { + c, _ := newTestClient("x") + c.Close() + waitFor(t, func() bool { + select { + case <-c.Done(): + return true + default: + return false + } + }) + // Should return promptly and not panic. + c.Send(map[string]any{"ignored": true}) +} + +func TestRoomBroadcastDelivers(t *testing.T) { + hub := NewHub() + room := NewRoom(hub) + room.OwnerID = "o" + room.NotifyActionJoined = false // keep frame counts clean + room.Publish() + + a, fa := newTestClient("a") + b, fb := newTestClient("b") + hub.addClient(a) + hub.addClient(b) + room.Join(a) + room.Join(b) + + room.Broadcast("pack/room", map[string]any{"hello": float64(1)}, "", nil) + + waitFor(t, func() bool { return fa.WriteCount() >= 1 && fb.WriteCount() >= 1 }) + + name, payload := lastSignal(t, fa) + if name != "pack/room" { + t.Fatalf("signal name = %q, want pack/room", name) + } + if payload["hello"] != float64(1) { + t.Fatalf("payload = %v", payload) + } +} + +func TestRoomBroadcastRespectsExceptAndFilter(t *testing.T) { + hub := NewHub() + room := NewRoom(hub) + room.OwnerID = "o" + room.NotifyActionJoined = false + room.Publish() + + a, fa := newTestClient("a") + b, fb := newTestClient("b") + hub.addClient(a) + hub.addClient(b) + room.Join(a) + room.Join(b) + + // b opts out of peer-info notifications; the filter must skip it. + b.SetStore(flagNotifyPairInfo, false) + + room.Broadcast("x", map[string]any{"n": float64(1)}, "", (*Client).PeerInfoNotifiable) + waitFor(t, func() bool { return fa.WriteCount() >= 1 }) + time.Sleep(20 * time.Millisecond) // give a wrong delivery a chance to show up + + if fb.WriteCount() != 0 { + t.Fatalf("filtered-out client received %d frames", fb.WriteCount()) + } + + // exceptID must also be honoured. + room.Broadcast("y", map[string]any{}, a.ID, nil) + waitFor(t, func() bool { return fb.WriteCount() >= 1 }) + if name, _ := lastSignal(t, fa); name == "y" { + t.Fatal("exceptID client should not have received the second broadcast") + } +} + +func TestRoomEmptyTriggersDown(t *testing.T) { + hub := NewHub() + room := NewRoom(hub) + room.OwnerID = "a" + room.Publish() + + a, _ := newTestClient("a") + hub.addClient(a) + room.Join(a) + + room.Eject(a) + + if _, ok := hub.Room(room.ID); ok { + t.Fatal("room should be removed from hub when it empties") + } + if a.InRoom(room.ID) { + t.Fatal("client should no longer reference a downed room") + } +} + +// TestLeaveWhileSendRace is the #22 regression: broadcasting to a room while +// members concurrently leave and rejoin. Under -race this catches any unguarded +// access to shared room/peer state — the exact failure the Node engine had. +func TestLeaveWhileSendRace(t *testing.T) { + hub := NewHub() + room := NewRoom(hub) + room.OwnerID = "owner" + room.Publish() + + const n = 30 + clients := make([]*Client, n) + for i := 0; i < n; i++ { + c, _ := newTestClient(fmt.Sprintf("c%d", i)) + clients[i] = c + hub.addClient(c) + room.Join(c) + } + + var wg sync.WaitGroup + + // Broadcasters hammer the room. + for g := 0; g < 4; g++ { + wg.Add(1) + go func() { + defer wg.Done() + for j := 0; j < 300; j++ { + room.Broadcast("pack/room", map[string]any{"j": j}, "", nil) + } + }() + } + + // Churn: members leave and rejoin repeatedly while broadcasts are in flight. + for i := 0; i < n; i++ { + wg.Add(1) + go func(c *Client) { + defer wg.Done() + for k := 0; k < 100; k++ { + room.Eject(c) + room.Join(c) + } + }(clients[i]) + } + + wg.Wait() + + for _, c := range clients { + c.Close() + } +} + +func TestHubRegistryConcurrency(t *testing.T) { + hub := NewHub() + hub.Register("noop", func(c *Client, m protocol.Message) any { return success() }) + + var wg sync.WaitGroup + for g := 0; g < 8; g++ { + wg.Add(1) + go func(g int) { + defer wg.Done() + for i := 0; i < 200; i++ { + id := fmt.Sprintf("g%d-%d", g, i) + c := NewClient(testutil.NewFakeConn(), id) + hub.addClient(c) + _, _ = hub.Client(id) + _ = hub.Clients() + _ = hub.ClientCount() + hub.removeClient(id) + } + }(g) + } + wg.Wait() +} + +// success mirrors the services helper so the hub test stays self-contained. +func success() map[string]any { return map[string]any{"status": "success"} } + +func TestServerHandleRepliesToRequest(t *testing.T) { + hub := NewHub() + hub.Register("ping", func(c *Client, m protocol.Message) any { + return map[string]any{"pong": true} + }) + srv := NewServer(hub) + + fc := testutil.NewFakeConn() + go srv.handle(fc) + + fc.Push([]byte(`[{"type":"ping"}, 5, "R"]`)) + waitFor(t, func() bool { return fc.WriteCount() >= 1 }) + + writes := fc.Writes() + var arr []any + if err := json.Unmarshal(writes[len(writes)-1], &arr); err != nil { + t.Fatalf("decode reply: %v", err) + } + // Expect [ {"pong":true}, 5, "E" ]. + if len(arr) != 3 { + t.Fatalf("reply arity = %d, want 3", len(arr)) + } + if arr[1] != float64(5) { + t.Fatalf("reply id = %v, want 5", arr[1]) + } + if arr[2] != protocol.FlagEnd { + t.Fatalf("reply flag = %v, want E", arr[2]) + } + fc.Close() +} diff --git a/loadtest/client.go b/loadtest/client.go new file mode 100644 index 0000000..fea94bb --- /dev/null +++ b/loadtest/client.go @@ -0,0 +1,182 @@ +package main + +import ( + "encoding/json" + "fmt" + "sync" + "sync/atomic" + "time" + + "github.com/gorilla/websocket" +) + +// Client is a minimal WSTS speaker used by the load tester. It mirrors the +// frontend SDK's framing: requests carry a numeric id and a trailing "R", and the +// engine replies with [payload, id, "E"]; server-initiated messages arrive as +// [payload, name]. +type Client struct { + ID string // assigned by the server's "id" signal + + conn *websocket.Conn + counter int64 + + mu sync.Mutex + pending map[int64]chan json.RawMessage + + sigMu sync.Mutex + signals map[string]func(json.RawMessage) + + writeMu sync.Mutex // gorilla allows only one concurrent writer + + closed atomic.Bool + idReady chan struct{} +} + +// Dial connects to the engine at url and starts the read loop. +func Dial(url string) (*Client, error) { + conn, _, err := websocket.DefaultDialer.Dial(url, nil) + if err != nil { + return nil, err + } + c := &Client{ + conn: conn, + pending: make(map[int64]chan json.RawMessage), + signals: make(map[string]func(json.RawMessage)), + idReady: make(chan struct{}), + } + // Capture our socket id from the YourID signal. + c.OnSignal("id", func(raw json.RawMessage) { + var p struct { + Value string `json:"value"` + } + if json.Unmarshal(raw, &p) == nil && p.Value != "" { + c.ID = p.Value + close(c.idReady) + } + }) + go c.readLoop() + return c, nil +} + +// WaitID blocks until the server has told us our socket id (or the timeout fires). +func (c *Client) WaitID(timeout time.Duration) error { + select { + case <-c.idReady: + return nil + case <-time.After(timeout): + return fmt.Errorf("timed out waiting for socket id") + } +} + +// OnSignal registers a handler for a server-initiated signal name. +func (c *Client) OnSignal(name string, fn func(json.RawMessage)) { + c.sigMu.Lock() + c.signals[name] = fn + c.sigMu.Unlock() +} + +// Request sends a request and waits for the engine's reply, returning the round +// trip time. The payload must be a JSON object carrying a "type". +func (c *Client) Request(payload any, timeout time.Duration) (json.RawMessage, time.Duration, error) { + id := atomic.AddInt64(&c.counter, 1) + ch := make(chan json.RawMessage, 1) + c.mu.Lock() + c.pending[id] = ch + c.mu.Unlock() + + defer func() { + c.mu.Lock() + delete(c.pending, id) + c.mu.Unlock() + }() + + start := time.Now() + if err := c.write([]any{payload, id, "R"}); err != nil { + return nil, 0, err + } + select { + case resp := <-ch: + return resp, time.Since(start), nil + case <-time.After(timeout): + return nil, 0, fmt.Errorf("request timed out") + } +} + +// SendOnly sends a fire-and-forget message (the "R" string id path) for which the +// engine produces no reply. +func (c *Client) SendOnly(payload any) error { + return c.write([]any{payload, "R"}) +} + +func (c *Client) write(v any) error { + b, err := json.Marshal(v) + if err != nil { + return err + } + c.writeMu.Lock() + defer c.writeMu.Unlock() + return c.conn.WriteMessage(websocket.TextMessage, b) +} + +// Close shuts the connection down. +func (c *Client) Close() { + if c.closed.Swap(true) { + return + } + _ = c.conn.Close() +} + +func (c *Client) readLoop() { + for { + _, data, err := c.conn.ReadMessage() + if err != nil { + return + } + var arr []json.RawMessage + if json.Unmarshal(data, &arr) != nil || len(arr) < 2 { + continue + } + + // arr[1] is either a numeric id (a reply) or a string (a signal). + var num int64 + if json.Unmarshal(arr[1], &num) == nil && looksNumeric(arr[1]) { + c.deliverReply(num, arr[0]) + continue + } + var name string + if json.Unmarshal(arr[1], &name) == nil { + c.deliverSignal(name, arr[0]) + } + } +} + +func (c *Client) deliverReply(id int64, payload json.RawMessage) { + c.mu.Lock() + ch := c.pending[id] + c.mu.Unlock() + if ch != nil { + select { + case ch <- payload: + default: + } + } +} + +func (c *Client) deliverSignal(name string, payload json.RawMessage) { + c.sigMu.Lock() + fn := c.signals[name] + c.sigMu.Unlock() + if fn != nil { + fn(payload) + } +} + +// looksNumeric reports whether a raw JSON token is a number (so "5" is a reply id +// but "\"room/joined\"" is a signal name). +func looksNumeric(raw json.RawMessage) bool { + if len(raw) == 0 { + return false + } + c := raw[0] + return c == '-' || (c >= '0' && c <= '9') +} diff --git a/loadtest/go.mod b/loadtest/go.mod new file mode 100644 index 0000000..7e36f1f --- /dev/null +++ b/loadtest/go.mod @@ -0,0 +1,5 @@ +module git.saqut.com/saqut/mwse-loadtest + +go 1.26.3 + +require github.com/gorilla/websocket v1.5.3 diff --git a/loadtest/go.sum b/loadtest/go.sum new file mode 100644 index 0000000..25a9fc4 --- /dev/null +++ b/loadtest/go.sum @@ -0,0 +1,2 @@ +github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg= +github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= diff --git a/loadtest/main.go b/loadtest/main.go new file mode 100644 index 0000000..ab1455f --- /dev/null +++ b/loadtest/main.go @@ -0,0 +1,164 @@ +// Command mwse-loadtest drives the MWSE engine with many concurrent WebSocket +// clients to smoke-test correctness and measure throughput/latency. It is a +// separate Go module so it can be built and run independently of the engine, and +// reused as a benchmark harness. +// +// Run the engine, then: +// +// go run . # default: ping mode, 50 clients, 10s +// go run . -mode relay -clients 200 -dur 15s +// go run . -addr ws://localhost:7707/ -clients 500 -mode ping +package main + +import ( + "encoding/json" + "flag" + "fmt" + "log" + "sync" + "sync/atomic" + "time" +) + +func main() { + addr := flag.String("addr", "ws://localhost:7707/", "engine WebSocket URL") + clients := flag.Int("clients", 50, "number of concurrent clients") + dur := flag.Duration("dur", 10*time.Second, "test duration") + mode := flag.String("mode", "ping", "scenario: ping | relay") + flag.Parse() + + log.Printf("connecting %d clients to %s ...", *clients, *addr) + conns := dialAll(*addr, *clients) + defer func() { + for _, c := range conns { + c.Close() + } + }() + log.Printf("connected %d clients", len(conns)) + + switch *mode { + case "ping": + runPing(conns, *dur) + case "relay": + runRelay(conns, *dur) + default: + log.Fatalf("unknown mode %q (want ping or relay)", *mode) + } +} + +// dialAll connects n clients and waits for each to receive its socket id. +func dialAll(addr string, n int) []*Client { + var ( + mu sync.Mutex + conns []*Client + wg sync.WaitGroup + ) + for i := 0; i < n; i++ { + wg.Add(1) + go func() { + defer wg.Done() + c, err := Dial(addr) + if err != nil { + log.Printf("dial failed: %v", err) + return + } + if err := c.WaitID(5 * time.Second); err != nil { + log.Printf("no id: %v", err) + c.Close() + return + } + mu.Lock() + conns = append(conns, c) + mu.Unlock() + }() + } + wg.Wait() + return conns +} + +// runPing has every client issue back-to-back requests (my/socketid) for the +// duration, measuring round-trip latency and total throughput. +func runPing(conns []*Client, dur time.Duration) { + var lat latency + var errors int64 + deadline := time.Now().Add(dur) + + var wg sync.WaitGroup + for _, c := range conns { + wg.Add(1) + go func(c *Client) { + defer wg.Done() + for time.Now().Before(deadline) { + _, rtt, err := c.Request(map[string]any{"type": "my/socketid"}, 5*time.Second) + if err != nil { + atomic.AddInt64(&errors, 1) + return + } + lat.record(rtt) + } + }(c) + } + wg.Wait() + + total := lat.count() + p50, p90, p99, max := lat.percentiles() + fmt.Println("\n=== ping results ===") + fmt.Printf("clients : %d\n", len(conns)) + fmt.Printf("requests : %d\n", total) + fmt.Printf("errors : %d\n", errors) + fmt.Printf("throughput : %.0f req/s\n", float64(total)/dur.Seconds()) + fmt.Printf("latency p50 : %s\n", p50) + fmt.Printf("latency p90 : %s\n", p90) + fmt.Printf("latency p99 : %s\n", p99) + fmt.Printf("latency max : %s\n", max) +} + +// runRelay pairs clients (2i <-> 2i+1) and has each send fire-and-forget packs to +// its partner, counting deliveries to measure relay fanout throughput. +func runRelay(conns []*Client, dur time.Duration) { + if len(conns) < 2 { + log.Fatal("relay mode needs at least 2 clients") + } + + var delivered int64 + for _, c := range conns { + c.OnSignal("pack", func(json.RawMessage) { atomic.AddInt64(&delivered, 1) }) + } + + var sent int64 + deadline := time.Now().Add(dur) + var wg sync.WaitGroup + + for i := 0; i+1 < len(conns); i += 2 { + a, b := conns[i], conns[i+1] + for _, pair := range [][2]*Client{{a, b}, {b, a}} { + src, dst := pair[0], pair[1] + wg.Add(1) + go func(src, dst *Client) { + defer wg.Done() + for time.Now().Before(deadline) { + err := src.SendOnly(map[string]any{ + "type": "pack/to", + "to": dst.ID, + "pack": map[string]any{"t": time.Now().UnixNano()}, + }) + if err != nil { + return + } + atomic.AddInt64(&sent, 1) + } + }(src, dst) + } + } + wg.Wait() + + // Allow a moment for the last in-flight relays to arrive. + time.Sleep(250 * time.Millisecond) + + fmt.Println("\n=== relay results ===") + fmt.Printf("clients : %d\n", len(conns)) + fmt.Printf("packs sent : %d\n", sent) + fmt.Printf("packs recvd : %d\n", atomic.LoadInt64(&delivered)) + fmt.Printf("send rate : %.0f msg/s\n", float64(sent)/dur.Seconds()) + fmt.Printf("recv rate : %.0f msg/s\n", float64(delivered)/dur.Seconds()) +} diff --git a/loadtest/stats.go b/loadtest/stats.go new file mode 100644 index 0000000..1e37384 --- /dev/null +++ b/loadtest/stats.go @@ -0,0 +1,42 @@ +package main + +import ( + "sort" + "sync" + "time" +) + +// latency collects request durations and reports percentiles. It is safe for +// concurrent recording. +type latency struct { + mu sync.Mutex + samples []time.Duration +} + +func (l *latency) record(d time.Duration) { + l.mu.Lock() + l.samples = append(l.samples, d) + l.mu.Unlock() +} + +func (l *latency) count() int { + l.mu.Lock() + defer l.mu.Unlock() + return len(l.samples) +} + +// percentiles returns p50/p90/p99 and the max. Returns zeros when empty. +func (l *latency) percentiles() (p50, p90, p99, max time.Duration) { + l.mu.Lock() + s := append([]time.Duration(nil), l.samples...) + l.mu.Unlock() + if len(s) == 0 { + return 0, 0, 0, 0 + } + sort.Slice(s, func(i, j int) bool { return s[i] < s[j] }) + at := func(q float64) time.Duration { + idx := int(q * float64(len(s)-1)) + return s[idx] + } + return at(0.50), at(0.90), at(0.99), s[len(s)-1] +} diff --git a/main.go b/main.go new file mode 100644 index 0000000..c641468 --- /dev/null +++ b/main.go @@ -0,0 +1,61 @@ +// Command mwse is the MWSE engine: a WebSocket relay that virtualizes connected +// peers so they can exchange data through rooms, pairings and data tunnels without +// knowing one another's real identity. It is the Go rewrite of the original +// Node.js engine; the on-the-wire SDK contract is unchanged. +package main + +import ( + "context" + "errors" + "log" + "net/http" + "os" + "os/signal" + "syscall" + + "git.saqut.com/saqut/mwse/internal/config" + "git.saqut.com/saqut/mwse/internal/httpserver" + "git.saqut.com/saqut/mwse/internal/services" + "git.saqut.com/saqut/mwse/internal/ws" +) + +func main() { + cfg := config.Load() + + hub := ws.NewHub() + services.Register(hub) + + srv := httpserver.New(hub, cfg) + + // Run the listener in the background so main can wait for a shutdown signal. + serverErr := make(chan error, 1) + go func() { + log.Printf("MWSE engine listening on %s", cfg.Addr()) + if err := srv.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) { + serverErr <- err + } + }() + + // Wait for SIGINT/SIGTERM or a fatal listener error. + stop := make(chan os.Signal, 1) + signal.Notify(stop, syscall.SIGINT, syscall.SIGTERM) + + select { + case err := <-serverErr: + log.Fatalf("MWSE engine failed: %v", err) + case sig := <-stop: + log.Printf("received %s, shutting down gracefully", sig) + } + + // Graceful shutdown: stop accepting new HTTP work, then close every live + // WebSocket connection so peers receive a clean close frame. + ctx, cancel := context.WithTimeout(context.Background(), cfg.ShutdownTimeout) + defer cancel() + + if err := srv.Shutdown(ctx); err != nil { + log.Printf("http shutdown error: %v", err) + } + hub.CloseAll() + + log.Printf("MWSE engine stopped") +}