diff --git a/decisions.md b/decisions.md index 673c039..eb219ca 100644 --- a/decisions.md +++ b/decisions.md @@ -28,6 +28,26 @@ Kullanıcı talimatı: "tüm fonksiyonların aynı olması önemli değil; amaç - **`auth/info` çift gönderim:** Hem pair hem roompair olan bir peer iki `pair/info` alır (Node ile aynı). - **Heartbeat:** 10sn `saQut` ping; pong "saQut" değilse bağlantı kapanır (Node ile aynı). +## Ölçek ayarı & leak sağlamlaştırma (yüksek bağlantı + bitmek bilmeyen trafik) + +Hedef: çok yüksek bağlantı sayısı + sürekli mesaj trafiği; limitleri/poolları yüksek tut, belleği bol kullan ama **leak yok**. + +6. **Yapılandırılabilir, yüksek limitler** (`internal/config` `ConnConfig`, env ile): + - `MWSE_OUTBOUND_BUFFER` (varsayılan **1024**) — bağlantı başına gönderim kuyruğu. Bellek notu: kanal ~24 B/slot önceden ayrılır → ~24 KiB/bağlantı (100k bağlantıda ~2.4 GiB boştayken). Düşük bellekli host'ta düşür, burst'lü üreticide yükselt. + - `MWSE_MAX_MESSAGE_SIZE` (varsayılan **16 MiB**) — büyük tünel payload'ları (#30) için yüksek. + - `MWSE_READ_BUFFER` / `MWSE_WRITE_BUFFER`, `MWSE_PING_INTERVAL`, `MWSE_PONG_WAIT`, `MWSE_WRITE_WAIT`. +7. **gorilla `WriteBufferPool` (paylaşımlı `sync.Pool`)** — yazma scratch buffer'ları bağlantılar arasında yeniden kullanılır; yüksek bağlantı sayısında büyük bellek tasarrufu, leak yok (GC yönetir). 150 bağlantı + ağır trafikte RSS ~43 MB ölçüldü. +8. **Dolu buffer'da düşür-ama-kapatma** (bkz. #5): yüksek buffer + bu politika → relay teslimat %98.5'ten **%99.98'e** çıktı (150 bağlantıda 630974 gönderim, 99 düşüş). + +### Leak düzeltmeleri (sınırsız büyümeyi önle) +9. **Pairing ters-indeksi (`pairedBy`).** Önceki halde tek-yönlü bekleyen istekler (hedef yanıt vermeden koparsa) uzun ömürlü istemcinin `pairs` setinde **kalıcı çöp** bırakıyordu → churn altında sınırsız büyüme. Çözüm: her istemci hem giden (`pairs`) hem gelen (`pairedBy`) kenarları tutar; `AddPair/RemovePair` iki tarafı da günceller (kilitler iç içe değil → deadlock yok); disconnect'te `ForgetPeer` ile X'e değen TÜM kenarlar **O(derece)** temizlenir (tüm istemcileri taramadan). +10. **Davet bekleme listesi temizliği.** İstemci `waiting` (beklediği oda id'leri) tutar; disconnect'te ilgili odaların `waitingInvited` setinden düşülür → ölü id birikmez. +11. **`realloc` artık farklı adres verir.** Node "önce release sonra lock" yapıyordu → tek istemcide aynı adresi döndürüyordu (işlevsiz). Düzeltme: **önce yeni adresi al, sonra eskiyi bırak** (eski rezerve kaldığından yeni mutlaka farklı); adres alanı tükenmişse eskiyi koru, fail dön. +12. **Goroutine sızıntısı yok:** writePump/pingLoop/readLoop hepsi `done`/conn-close ile çıkar; gerçekten ölü peer write-deadline ile temizlenir. + +### #27 paritesi — eklenen +13. **`ifexistsJoin`:** create-room'da isim çakışırsa hata yerine mevcut odaya katıl (tek tur "create or join"). + ## Session varsayılanları `packrecaive`/`packsending`/`notifyPairInfo`/`notifyRoomInfo` varsayılanları **`NewClient` constructor'ında** set ediliyor (listener sırasından bağımsız her zaman mevcut). Session servisinin connect hook'u parite için yine de bunları yeniden uyguluyor. diff --git a/internal/config/config.go b/internal/config/config.go index 58fafbb..8d2caf8 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -21,6 +21,37 @@ type Config struct { ShutdownTimeout time.Duration // grace period for in-flight work on shutdown TermOutput bool // verbose terminal logging (the old `termoutput` flag) + + // --- scale / throughput knobs (defaults kept deliberately high) --------- + // + // The engine is built for very high connection counts and endless message + // traffic, so these are tuned generous. They are env-overridable both up (more + // headroom) and down (tighter memory on small hosts). See the memory note on + // OutboundBuffer below. + Conn ConnConfig +} + +// ConnConfig groups per-connection transport tuning. +type ConnConfig struct { + // OutboundBuffer is how many frames may queue for one client before further + // frames are dropped (best-effort relay; the connection is kept alive). + // + // Memory note: the channel backing this is preallocated per connection at + // ~24 bytes/slot, so the default 1024 costs ~24 KiB/connection (~2.4 GiB at + // 100k connections) even while idle. Lower MWSE_OUTBOUND_BUFFER on memory- + // constrained hosts; raise it to tolerate burstier producers. + OutboundBuffer int + + // MaxMessageSize caps a single inbound frame. High by default to support the + // large tunneled payloads used for file transfer (#30). + MaxMessageSize int64 + + ReadBufferSize int // gorilla per-connection read buffer + WriteBufferSize int // gorilla write buffer size (pooled across connections) + + PingInterval time.Duration // heartbeat period (the original used 10s) + PongWait time.Duration // how long to wait for a pong before dropping + WriteWait time.Duration // deadline for a single socket write } // Load reads configuration from the environment, applying defaults that match the @@ -37,6 +68,30 @@ func Load() Config { ReadHeaderTimeout: 10 * time.Second, ShutdownTimeout: time.Duration(envInt("MWSE_SHUTDOWN_TIMEOUT", 10)) * time.Second, TermOutput: envBool("MWSE_TERM_OUTPUT", false), + Conn: ConnConfig{ + OutboundBuffer: envInt("MWSE_OUTBOUND_BUFFER", 1024), + MaxMessageSize: int64(envInt("MWSE_MAX_MESSAGE_SIZE", 16<<20)), + ReadBufferSize: envInt("MWSE_READ_BUFFER", 4096), + WriteBufferSize: envInt("MWSE_WRITE_BUFFER", 4096), + PingInterval: time.Duration(envInt("MWSE_PING_INTERVAL", 10)) * time.Second, + PongWait: time.Duration(envInt("MWSE_PONG_WAIT", 60)) * time.Second, + WriteWait: time.Duration(envInt("MWSE_WRITE_WAIT", 10)) * time.Second, + }, + } +} + +// DefaultConnConfig returns the same connection tuning Load would produce from a +// clean environment. Useful for tests and for ws.NewServer callers that do not +// load the full Config. +func DefaultConnConfig() ConnConfig { + return ConnConfig{ + OutboundBuffer: 1024, + MaxMessageSize: 16 << 20, + ReadBufferSize: 4096, + WriteBufferSize: 4096, + PingInterval: 10 * time.Second, + PongWait: 60 * time.Second, + WriteWait: 10 * time.Second, } } diff --git a/internal/httpserver/httpserver.go b/internal/httpserver/httpserver.go index 23b304f..b3d0362 100644 --- a/internal/httpserver/httpserver.go +++ b/internal/httpserver/httpserver.go @@ -21,7 +21,15 @@ import ( // served, so the upgrade may arrive at "/" or "/script/"). All other requests go // through the static/API mux. func New(hub *ws.Hub, cfg config.Config) *http.Server { - wsServer := ws.NewServer(hub) + wsServer := ws.NewServer(hub, ws.Options{ + OutboundBuffer: cfg.Conn.OutboundBuffer, + MaxMessageSize: cfg.Conn.MaxMessageSize, + ReadBufferSize: cfg.Conn.ReadBufferSize, + WriteBufferSize: cfg.Conn.WriteBufferSize, + PingInterval: cfg.Conn.PingInterval, + PongWait: cfg.Conn.PongWait, + WriteWait: cfg.Conn.WriteWait, + }) mux := http.NewServeMux() registerAPI(mux, hub) diff --git a/internal/services/auth.go b/internal/services/auth.go index 0fbbcb0..77261a0 100644 --- a/internal/services/auth.go +++ b/internal/services/auth.go @@ -61,8 +61,10 @@ func secureClients(hub *ws.Hub, c *ws.Client) (pairs, roompairs map[string]*ws.C } func registerAuth(hub *ws.Hub) { - // On disconnect: tell secure peers we are gone, then drop pairing edges both - // ways so no stale references remain. + // On disconnect: tell secure peers we are gone, then remove EVERY pairing edge + // that references this client (both directions) so no stale id is ever left on + // a surviving peer. Using the outgoing+incoming indexes keeps this O(degree), + // not O(all clients), and prevents unbounded growth under churn. hub.OnDisconnect(func(c *ws.Client) { pairs, roompairs := secureClients(hub, c) @@ -79,9 +81,15 @@ func registerAuth(hub *ws.Hub) { notify(pairs) notify(roompairs) - for id, peer := range pairs { - peer.RemovePair(c.ID) - c.RemovePair(id) + cleaned := make(map[string]bool) + for _, peerID := range append(c.Pairs(), c.PairedBy()...) { + if cleaned[peerID] { + continue + } + cleaned[peerID] = true + if peer, ok := hub.Client(peerID); ok { + peer.ForgetPeer(c.ID) + } } }) @@ -125,7 +133,7 @@ func registerAuth(hub *ws.Hub) { if c.HasPair(to) { return fail("ALREADY-REQUESTED") } - c.AddPair(to) + c.AddPair(target) target.Signal("request/pair", map[string]any{"from": c.ID, "info": c.Info()}) return map[string]any{"status": "success", "message": "REQUESTED"} }) @@ -143,7 +151,7 @@ func registerAuth(hub *ws.Hub) { if !requester.HasPair(c.ID) { return fail("NOT_REQUESTED_PAIR") } - c.AddPair(to) + c.AddPair(requester) requester.Signal("accepted/pair", map[string]any{"from": c.ID, "info": c.Info()}) return success() }) @@ -156,8 +164,8 @@ func registerAuth(hub *ws.Hub) { if !ok { return fail("CLIENT_NOT_FOUND") } - c.RemovePair(to) - other.RemovePair(c.ID) + c.RemovePair(other) + other.RemovePair(c) other.Signal("end/pair", map[string]any{"from": c.ID}) return success() } diff --git a/internal/services/datatransfer.go b/internal/services/datatransfer.go index 4f2c0fe..afe5480 100644 --- a/internal/services/datatransfer.go +++ b/internal/services/datatransfer.go @@ -36,8 +36,8 @@ func registerDataTransfer(hub *ws.Hub) { return handshakeResult(m, false) } } else if !other.HasPair(c.ID) { - other.AddPair(c.ID) - c.AddPair(other.ID) + other.AddPair(c) + c.AddPair(other) } if !other.PackWriteable() { return handshakeResult(m, false) @@ -59,8 +59,8 @@ func registerDataTransfer(hub *ws.Hub) { return nil } } else { - other.AddPair(c.ID) - c.AddPair(other.ID) + other.AddPair(c) + c.AddPair(other) } other.Signal("request", map[string]any{"from": c.ID, "pack": m.Get("pack")}) return nil diff --git a/internal/services/ippressure.go b/internal/services/ippressure.go index 075fb2c..9b6c55f 100644 --- a/internal/services/ippressure.go +++ b/internal/services/ippressure.go @@ -177,11 +177,17 @@ func registerIPPressure(hub *ws.Hub, ann Announcer) *IPPressure { return map[string]any{"status": "success", "ip": ip} }) hub.Register("realloc/APIPAddress", func(c *ws.Client, m protocol.Message) any { - if c.APIP() == "" { + old := c.APIP() + if old == "" { return map[string]any{"status": "fail"} } - p.releaseIP(c.APIP()) + // Allocate the new address before freeing the old one, so realloc actually + // yields a different address (the old stays reserved during the search). ip := p.lockIP(c.ID) + if ip == "" { + return map[string]any{"status": "fail"} // exhausted; keep the old one + } + p.releaseIP(old) c.SetAPIP(ip) return map[string]any{"status": "success", "ip": ip} }) @@ -207,11 +213,12 @@ func registerIPPressure(hub *ws.Hub, ann Announcer) *IPPressure { return map[string]any{"status": "success", "number": n} }) hub.Register("realloc/APNumber", func(c *ws.Client, m protocol.Message) any { - if c.APNumber() == 0 { + old := c.APNumber() + if old == 0 { return map[string]any{"status": "fail"} } - p.releaseNumber(c.APNumber()) - n := p.lockNumber(c.ID) + n := p.lockNumber(c.ID) // old stays reserved, so n != old + p.releaseNumber(old) c.SetAPNumber(n) return map[string]any{"status": "success", "number": n} }) @@ -240,11 +247,15 @@ func registerIPPressure(hub *ws.Hub, ann Announcer) *IPPressure { return map[string]any{"status": "success", "code": code} }) hub.Register("realloc/APShortCode", func(c *ws.Client, m protocol.Message) any { - if c.APShortCode() == "" { + old := c.APShortCode() + if old == "" { return map[string]any{"status": "fail"} } - p.releaseCode(c.APShortCode()) code := p.lockCode(c.ID) + if code == "" { + return map[string]any{"status": "fail"} // exhausted; keep the old one + } + p.releaseCode(old) c.SetAPShortCode(code) return map[string]any{"status": "success", "code": code} }) diff --git a/internal/services/parity_test.go b/internal/services/parity_test.go new file mode 100644 index 0000000..ac4e693 --- /dev/null +++ b/internal/services/parity_test.go @@ -0,0 +1,328 @@ +package services + +import ( + "fmt" + "testing" + + "git.saqut.com/saqut/mwse/internal/ws" +) + +// These tests cover the 1.0.0 engine-parity issues #27 (rooms), #28 (pairing) and +// #29 (virtual addressing), plus the leak-hardening done for high-scale operation. +// They share the helpers defined in services_test.go. + +// ---- #27 Room system parity --------------------------------------------- + +func TestRoomJoinTypes(t *testing.T) { + hub := newHub() + owner, _ := connect(hub, "owner") + + create := func(name, joinType, cred string) { + fields := []any{ + "accessType", "public", "joinType", joinType, + "notifyActionInvite", false, "notifyActionJoined", true, "notifyActionEjected", true, + "description", "d", "name", name, + } + if cred != "" { + fields = append(fields, "credential", cred) + } + r := asMap(t, hub.Handle(owner, msg("create-room", fields...))) + if r["status"] != "success" { + t.Fatalf("create-room(%s) = %v", joinType, r) + } + } + + create("free-room", "free", "") + create("lock-room", "lock", "") + create("pw-room", "password", "secret") + + joiner, _ := connect(hub, "joiner") + + if r := asMap(t, hub.Handle(joiner, msg("joinroom", "name", "free-room"))); r["status"] != "success" { + t.Fatalf("free join = %v", r) + } + if r := asMap(t, hub.Handle(joiner, msg("joinroom", "name", "lock-room"))); r["message"] != "LOCKED-ROOM" { + t.Fatalf("lock join = %v, want LOCKED-ROOM", r) + } + if r := asMap(t, hub.Handle(joiner, msg("joinroom", "name", "pw-room", "credential", "wrong"))); r["message"] != "WRONG-PASSWORD" { + t.Fatalf("pw wrong = %v, want WRONG-PASSWORD", r) + } + if r := asMap(t, hub.Handle(joiner, msg("joinroom", "name", "pw-room", "credential", "secret"))); r["status"] != "success" { + t.Fatalf("pw right = %v, want success", r) + } +} + +func TestRoomIfExistsJoin(t *testing.T) { + hub := newHub() + a, _ := connect(hub, "a") + b, _ := connect(hub, "b") + + common := []any{ + "accessType", "public", "joinType", "free", + "notifyActionInvite", false, "notifyActionJoined", true, "notifyActionEjected", true, + "description", "d", "name", "shared", + } + if r := asMap(t, hub.Handle(a, msg("create-room", common...))); r["status"] != "success" { + t.Fatalf("first create = %v", r) + } + + // Without ifexistsJoin: duplicate name fails. + if r := asMap(t, hub.Handle(b, msg("create-room", common...))); r["message"] != "ALREADY-EXISTS" { + t.Fatalf("dup create = %v, want ALREADY-EXISTS", r) + } + + // With ifexistsJoin: b joins the existing room instead of failing. + withFlag := append(append([]any{}, common...), "ifexistsJoin", true) + r := asMap(t, hub.Handle(b, msg("create-room", withFlag...))) + if r["status"] != "success" { + t.Fatalf("ifexistsJoin create = %v", r) + } + roomID := asMap(t, r["room"])["id"].(string) + if !b.InRoom(roomID) { + t.Fatal("b should have joined the existing room via ifexistsJoin") + } +} + +func TestRoomPerConnectionNotifySuppression(t *testing.T) { + hub := newHub() + a, fa := connect(hub, "a") + + created := asMap(t, hub.Handle(a, msg("create-room", + "accessType", "public", "joinType", "free", + "notifyActionInvite", false, "notifyActionJoined", true, "notifyActionEjected", true, + "description", "d", "name", "R", + ))) + if created["status"] != "success" { + t.Fatalf("create = %v", created) + } + + // a opts out of peer-info notifications; it must NOT receive room/joined. + hub.Handle(a, msg("connection/pairinfo", "value", float64(0))) + beforeJoined := func() bool { _, ok := findSignal(fa, "room/joined"); return ok }() + + b, _ := connect(hub, "b") + if r := asMap(t, hub.Handle(b, msg("joinroom", "name", "R"))); r["status"] != "success" { + t.Fatalf("b join = %v", r) + } + + // Give any (erroneous) delivery time to land, then assert nothing new arrived. + waitFor(t, func() bool { return true }) + if afterJoined := func() bool { _, ok := findSignal(fa, "room/joined"); return ok }(); afterJoined && !beforeJoined { + t.Fatal("a disabled pairinfo but still received room/joined") + } +} + +func TestRoomInviteFlow(t *testing.T) { + hub := newHub() + owner, fo := connect(hub, "owner") + + created := asMap(t, hub.Handle(owner, msg("create-room", + "accessType", "public", "joinType", "invite", + "notifyActionInvite", false, "notifyActionJoined", true, "notifyActionEjected", true, + "description", "d", "name", "club", + ))) + roomID := asMap(t, created["room"])["id"].(string) + + guest, fg := connect(hub, "guest") + resp := asMap(t, hub.Handle(guest, msg("joinroom", "name", "club"))) + if resp["message"] != "INVITE-REQUESTED" { + t.Fatalf("invite join = %v, want INVITE-REQUESTED", resp) + } + if inv := waitSignal(t, fo, "room/invite"); inv["id"] != "guest" { + t.Fatalf("owner invite signal = %v", inv) + } + + accepted := asMap(t, hub.Handle(owner, msg("accept/invite-room", "roomId", roomID, "clientId", "guest"))) + if accepted["status"] != "success" { + t.Fatalf("accept invite = %v", accepted) + } + if st := waitSignal(t, fg, "room/invite/status"); st["status"] != "accepted" { + t.Fatalf("guest status signal = %v", st) + } + if !guest.InRoom(roomID) { + t.Fatal("guest should be in the room after accepted invite") + } +} + +// ---- #28 Pairing parity -------------------------------------------------- + +func TestPairRejectAndEnd(t *testing.T) { + hub := newHub() + a, fa := connect(hub, "a") + b, _ := connect(hub, "b") + + hub.Handle(a, msg("request/pair", "to", "b")) + hub.Handle(b, msg("accept/pair", "to", "a")) + if !isPaired(a, b) { + t.Fatal("precondition: a and b paired") + } + + if r := asMap(t, hub.Handle(b, msg("end/pair", "to", "a"))); r["status"] != "success" { + t.Fatalf("end/pair = %v", r) + } + waitSignal(t, fa, "end/pair") + if isPaired(a, b) || a.HasPair("b") || b.HasPair("a") { + t.Fatal("end/pair should fully unpair both sides") + } +} + +func TestIsReachable(t *testing.T) { + hub := newHub() + a, _ := connect(hub, "a") + b, _ := connect(hub, "b") + + // b is public by default -> reachable. + if r := hub.Handle(a, msg("is/reachable", "to", "b")); r != true { + t.Fatalf("public reachable = %v, want true", r) + } + + // b goes private -> not reachable until paired. + hub.Handle(b, msg("auth/private")) + if r := hub.Handle(a, msg("is/reachable", "to", "b")); r != false { + t.Fatalf("private unpaired reachable = %v, want false", r) + } + + hub.Handle(a, msg("request/pair", "to", "b")) + hub.Handle(b, msg("accept/pair", "to", "a")) + if r := hub.Handle(a, msg("is/reachable", "to", "b")); r != true { + t.Fatalf("private paired reachable = %v, want true", r) + } +} + +func TestPairListMutualOnly(t *testing.T) { + hub := newHub() + a, _ := connect(hub, "a") + b, _ := connect(hub, "b") + + // One-directional request: not yet mutual, so pair/list is empty. + hub.Handle(a, msg("request/pair", "to", "b")) + if list := asMap(t, hub.Handle(a, msg("pair/list")))["value"]; list != nil { + if arr, ok := list.([]string); ok && len(arr) != 0 { + t.Fatalf("pair/list before accept = %v, want empty", arr) + } + } + + hub.Handle(b, msg("accept/pair", "to", "a")) + list, _ := asMap(t, hub.Handle(a, msg("pair/list")))["value"].([]string) + if len(list) != 1 || list[0] != "b" { + t.Fatalf("pair/list after accept = %v, want [b]", list) + } +} + +// ---- #29 Virtual addressing (IPPressure) parity -------------------------- + +func TestIPPressureReallocAndRelease(t *testing.T) { + hub := newHub() + a, _ := connect(hub, "a") + + n1 := asMap(t, hub.Handle(a, msg("alloc/APNumber")))["number"].(int) + + // realloc gives a different number and frees the old one. + n2 := asMap(t, hub.Handle(a, msg("realloc/APNumber")))["number"].(int) + if n1 == n2 { + t.Fatalf("realloc returned same number %d", n1) + } + if who := asMap(t, hub.Handle(a, msg("whois/APNumber", "whois", float64(n1)))); who["status"] != "fail" { + t.Fatalf("old number should be free after realloc, whois = %v", who) + } + + // release clears it from the client and the table. + hub.Handle(a, msg("release/APNumber")) + if a.APNumber() != 0 { + t.Fatal("number not cleared on release") + } + + // shortcode + ip alloc work and are non-empty. + if code := asMap(t, hub.Handle(a, msg("alloc/APShortCode")))["code"].(string); len(code) != 3 { + t.Fatalf("shortcode = %q, want 3 letters", code) + } + if ip := asMap(t, hub.Handle(a, msg("alloc/APIPAddress")))["ip"].(string); ip == "" { + t.Fatal("ip alloc returned empty") + } +} + +// ---- leak hardening (high-scale, no unbounded growth) -------------------- + +func TestDisconnectCleansIncomingPairEdge(t *testing.T) { + hub := newHub() + a, _ := connect(hub, "a") + b, _ := connect(hub, "b") + + // One-directional pending request: a -> b. a.pairs has b; b.pairedBy has a. + hub.Handle(a, msg("request/pair", "to", "b")) + if !a.HasPair("b") { + t.Fatal("precondition: a has outgoing edge to b") + } + + // b disconnects without ever responding. a must not retain a stale edge. + hub.Disconnect(b) + if a.HasPair("b") { + t.Fatal("stale pair edge to a disconnected client was left behind (leak)") + } + if len(a.PairedBy()) != 0 { + t.Fatalf("a.PairedBy() = %v, want empty", a.PairedBy()) + } +} + +func TestDisconnectCleansWaitingList(t *testing.T) { + hub := newHub() + owner, _ := connect(hub, "owner") + created := asMap(t, hub.Handle(owner, msg("create-room", + "accessType", "public", "joinType", "invite", + "notifyActionInvite", false, "notifyActionJoined", true, "notifyActionEjected", true, + "description", "d", "name", "club", + ))) + roomID := asMap(t, created["room"])["id"].(string) + + guest, _ := connect(hub, "guest") + hub.Handle(guest, msg("joinroom", "name", "club")) + + room, _ := hub.Room(roomID) + if !room.IsWaiting("guest") { + t.Fatal("precondition: guest should be on the waiting list") + } + + hub.Disconnect(guest) + if room.IsWaiting("guest") { + t.Fatal("disconnected client left on the room waiting list (leak)") + } +} + +// TestHighChurnLeavesNoResidualState drives many connect/pair/room/disconnect +// cycles and asserts the hub holds no clients or rooms afterwards — i.e. nothing +// accumulates across churn at scale. +func TestHighChurnLeavesNoResidualState(t *testing.T) { + hub := newHub() + + for cycle := 0; cycle < 20; cycle++ { + roomName := fmt.Sprintf("room-%d", cycle) + clients := make([]*ws.Client, 0, 25) + for i := 0; i < 25; i++ { + c, _ := connect(hub, fmt.Sprintf("c%d-%d", cycle, i)) + clients = append(clients, c) + } + + owner := clients[0] + hub.Handle(owner, msg("create-room", + "accessType", "public", "joinType", "free", + "notifyActionInvite", false, "notifyActionJoined", true, "notifyActionEjected", true, + "description", "d", "name", roomName, + )) + for _, c := range clients[1:] { + hub.Handle(c, msg("joinroom", "name", roomName)) + hub.Handle(c, msg("request/pair", "to", owner.ID)) + hub.Handle(c, msg("alloc/APNumber")) + } + + for _, c := range clients { + hub.Disconnect(c) + } + } + + if n := hub.ClientCount(); n != 0 { + t.Fatalf("residual clients after churn: %d (leak)", n) + } + if rooms := hub.Rooms(); len(rooms) != 0 { + t.Fatalf("residual rooms after churn: %d (leak)", len(rooms)) + } +} diff --git a/internal/services/room.go b/internal/services/room.go index 0a40faf..5d61773 100644 --- a/internal/services/room.go +++ b/internal/services/room.go @@ -29,6 +29,13 @@ func registerRoom(hub *ws.Hub) { r.Eject(c) } } + // Clear any pending invites so a room's waiting list cannot retain the id + // of a client that has gone away. + for _, rid := range c.WaitingRooms() { + if r, ok := hub.Room(rid); ok { + r.RemoveWaiting(c.ID) + } + } }) hub.Register("myroom-info", func(c *ws.Client, m protocol.Message) any { @@ -89,7 +96,13 @@ func registerRoom(hub *ws.Hub) { return map[string]any{"status": "fail", "messages": msg} } name := m.Str("name") - if _, exists := hub.RoomByName(name); exists { + if existing, exists := hub.RoomByName(name); exists { + // ifexistsJoin: instead of failing on a name clash, join the existing + // room (so "create or join" is a single round trip). + if m.Truthy("ifexistsJoin") { + existing.Join(c) + return map[string]any{"status": "success", "room": existing.ToJSON(false)} + } return fail("ALREADY-EXISTS") } @@ -137,6 +150,7 @@ func registerRoom(hub *ws.Hub) { return fetchInfo(map[string]any{"status": "success", "room": room.ToJSON(false)}) case "invite": room.AddWaiting(c.ID) + c.AddWaitingRoom(room.ID) invite := map[string]any{"id": c.ID} if room.NotifyActionInvite { room.Broadcast("room/invite", invite, "", nil) @@ -240,6 +254,7 @@ func inviteDecision(hub *ws.Hub, accept bool) handler { return fail("NO-CLIENT") } room.RemoveWaiting(clientID) + joinClient.RemoveWaitingRoom(room.ID) if accept { room.Join(joinClient) diff --git a/internal/ws/client.go b/internal/ws/client.go index 7825d32..b80d93d 100644 --- a/internal/ws/client.go +++ b/internal/ws/client.go @@ -11,11 +11,11 @@ import ( "git.saqut.com/saqut/mwse/internal/protocol" ) -// outboundBuffer bounds how many frames may queue for one slow client before the -// engine gives up and disconnects it. This is the classic gorilla "hub" backpressure -// policy: a peer that cannot keep up is dropped rather than allowed to grow memory -// without limit or stall a broadcast. -const outboundBuffer = 256 +// defaultOutboundBuffer is the per-connection send queue depth used by NewClient +// (tests, tools). The server overrides it from configuration. It is kept high +// because the engine targets endless, bursty traffic; see config.ConnConfig for +// the memory trade-off. +const defaultOutboundBuffer = 1024 // Session flag keys. They live in the Client.store map and mirror the Node // Session service defaults exactly. @@ -45,13 +45,16 @@ type Client struct { outbound chan []byte done chan struct{} closeOnce sync.Once - dropped uint64 // frames dropped due to a full outbound buffer (atomic) + dropped uint64 // frames dropped due to a full outbound buffer (atomic) + writeWait time.Duration // per-write socket deadline mu sync.RWMutex info map[string]any // application metadata, shared with paired/room peers store map[string]bool // per-connection session flags rooms map[string]struct{} // rooms this client belongs to - pairs map[string]struct{} // peers this client has paired with + pairs map[string]struct{} // peers this client has paired toward (outgoing edges) + pairedBy map[string]struct{} // peers that paired toward this client (incoming edges) + waiting map[string]struct{} // rooms this client is awaiting an invite decision in requiredPair bool // when true, others must be paired to reach this client apNumber int // virtual address: short number @@ -59,16 +62,29 @@ type Client struct { apIP string // virtual address: 10.x.x.x style ip } -// NewClient wraps an accepted connection. Session flag defaults are applied here +// NewClient wraps an accepted connection with default transport tuning. The +// server uses newClient to apply configured limits instead. +func NewClient(conn Conn, id string) *Client { + return newClient(conn, id, defaultOutboundBuffer, defaultWriteWait) +} + +// newClient wraps an accepted connection. Session flag defaults are applied here // (rather than only via the Session service's connect hook) so they are always // present regardless of listener ordering. -func NewClient(conn Conn, id string) *Client { +func newClient(conn Conn, id string, outboundBuffer int, writeWait time.Duration) *Client { + if outboundBuffer <= 0 { + outboundBuffer = defaultOutboundBuffer + } + if writeWait <= 0 { + writeWait = defaultWriteWait + } return &Client{ ID: id, CreatedAt: time.Now(), conn: conn, outbound: make(chan []byte, outboundBuffer), done: make(chan struct{}), + writeWait: writeWait, info: make(map[string]any), store: map[string]bool{ flagNotifyPairInfo: true, @@ -76,8 +92,10 @@ func NewClient(conn Conn, id string) *Client { flagPackSending: true, flagNotifyRoomInfo: true, }, - rooms: make(map[string]struct{}), - pairs: make(map[string]struct{}), + rooms: make(map[string]struct{}), + pairs: make(map[string]struct{}), + pairedBy: make(map[string]struct{}), + waiting: make(map[string]struct{}), } } @@ -126,7 +144,7 @@ func (c *Client) writePump() { for { select { case b := <-c.outbound: - _ = c.conn.SetWriteDeadline(time.Now().Add(defaultWriteWait)) + _ = c.conn.SetWriteDeadline(time.Now().Add(c.writeWait)) if err := c.conn.WriteMessage(websocket.TextMessage, b); err != nil { return } @@ -279,21 +297,52 @@ func (c *Client) Rooms() []string { // ---- pairing ------------------------------------------------------------- -// AddPair records that this client has paired toward other. -func (c *Client) AddPair(other string) { +// Pairing keeps two indexes so that a disconnect can clean up every edge that +// touches a client in O(degree) instead of scanning all clients — and so that +// stale ids never accumulate on long-lived clients under churn (the leak that +// would otherwise grow without bound at scale): +// +// - pairs : peers THIS client paired toward (outgoing) +// - pairedBy : peers that paired toward THIS client (incoming) +// +// AddPair/RemovePair take the other *Client and update both sides. The two locks +// are taken in separate, non-nested critical sections, so concurrent A.AddPair(B) +// and B.AddPair(A) cannot deadlock. + +// AddPair records that this client has paired toward other, updating both the +// outgoing edge here and the incoming edge on other. +func (c *Client) AddPair(other *Client) { c.mu.Lock() - c.pairs[other] = struct{}{} + c.pairs[other.ID] = struct{}{} + c.mu.Unlock() + + other.mu.Lock() + other.pairedBy[c.ID] = struct{}{} + other.mu.Unlock() +} + +// RemovePair drops the pairing edge from this client to other (and the matching +// incoming record on other). +func (c *Client) RemovePair(other *Client) { + c.mu.Lock() + delete(c.pairs, other.ID) + c.mu.Unlock() + + other.mu.Lock() + delete(other.pairedBy, c.ID) + other.mu.Unlock() +} + +// ForgetPeer removes peerID from both this client's outgoing and incoming pairing +// sets. Used during the other peer's disconnect cleanup. +func (c *Client) ForgetPeer(peerID string) { + c.mu.Lock() + delete(c.pairs, peerID) + delete(c.pairedBy, peerID) c.mu.Unlock() } -// RemovePair drops a pairing edge from this client. -func (c *Client) RemovePair(other string) { - c.mu.Lock() - delete(c.pairs, other) - c.mu.Unlock() -} - -// HasPair reports whether this client has a pairing edge toward other. +// HasPair reports whether this client has an outgoing pairing edge toward other. func (c *Client) HasPair(other string) bool { c.mu.RLock() _, ok := c.pairs[other] @@ -301,7 +350,7 @@ func (c *Client) HasPair(other string) bool { return ok } -// Pairs returns a snapshot of this client's pairing edges. +// Pairs returns a snapshot of this client's outgoing pairing edges. func (c *Client) Pairs() []string { c.mu.RLock() out := make([]string, 0, len(c.pairs)) @@ -312,6 +361,47 @@ func (c *Client) Pairs() []string { return out } +// PairedBy returns a snapshot of the peers that paired toward this client. +func (c *Client) PairedBy() []string { + c.mu.RLock() + out := make([]string, 0, len(c.pairedBy)) + for id := range c.pairedBy { + out = append(out, id) + } + c.mu.RUnlock() + return out +} + +// ---- invite waiting rooms ----------------------------------------------- +// +// Tracking which rooms a client is awaiting an invite in lets disconnect clear +// those waiting lists, so a room's waiting set cannot grow with dead ids. + +// AddWaitingRoom records that this client is awaiting an invite decision in room. +func (c *Client) AddWaitingRoom(roomID string) { + c.mu.Lock() + c.waiting[roomID] = struct{}{} + c.mu.Unlock() +} + +// RemoveWaitingRoom clears a pending invite for room. +func (c *Client) RemoveWaitingRoom(roomID string) { + c.mu.Lock() + delete(c.waiting, roomID) + c.mu.Unlock() +} + +// WaitingRooms returns a snapshot of the rooms this client is awaiting in. +func (c *Client) WaitingRooms() []string { + c.mu.RLock() + out := make([]string, 0, len(c.waiting)) + for id := range c.waiting { + out = append(out, id) + } + c.mu.RUnlock() + return out +} + // ---- virtual address (IPPressure) --------------------------------------- func (c *Client) APNumber() int { diff --git a/internal/ws/server.go b/internal/ws/server.go index d07bc8d..c66e609 100644 --- a/internal/ws/server.go +++ b/internal/ws/server.go @@ -3,6 +3,7 @@ package ws import ( "log" "net/http" + "sync" "time" "github.com/gorilla/websocket" @@ -10,39 +11,98 @@ import ( "git.saqut.com/saqut/mwse/internal/protocol" ) -// Heartbeat and transport defaults. PingPeriod matches the original server's 10s -// interval; the ping payload is the same magic string the SDK's pong echoes. +// Heartbeat and transport defaults used by DefaultOptions and NewClient. The +// PingPeriod matches the original server's 10s interval; the ping payload is the +// magic string the SDK's pong echoes. const ( defaultWriteWait = 10 * time.Second defaultPongWait = 60 * time.Second defaultPingPeriod = 10 * time.Second - defaultMaxMessageSize = 4 << 20 // 4 MiB + defaultMaxMessageSize = 16 << 20 // 16 MiB; supports large tunneled payloads (#30) pingPayload = "saQut" ) +// Options tunes the transport for scale. Zero fields fall back to defaults. +type Options struct { + OutboundBuffer int // per-connection send queue depth + MaxMessageSize int64 // max inbound frame size + ReadBufferSize int // gorilla per-connection read buffer + WriteBufferSize int // gorilla write buffer size (pooled) + PingInterval time.Duration // heartbeat period + PongWait time.Duration // max wait for a pong before dropping + WriteWait time.Duration // per-write socket deadline +} + +// DefaultOptions returns the built-in tuning. +func DefaultOptions() Options { + return Options{ + OutboundBuffer: defaultOutboundBuffer, + MaxMessageSize: defaultMaxMessageSize, + ReadBufferSize: 4096, + WriteBufferSize: 4096, + PingInterval: defaultPingPeriod, + PongWait: defaultPongWait, + WriteWait: defaultWriteWait, + } +} + +func (o Options) withDefaults() Options { + d := DefaultOptions() + if o.OutboundBuffer <= 0 { + o.OutboundBuffer = d.OutboundBuffer + } + if o.MaxMessageSize <= 0 { + o.MaxMessageSize = d.MaxMessageSize + } + if o.ReadBufferSize <= 0 { + o.ReadBufferSize = d.ReadBufferSize + } + if o.WriteBufferSize <= 0 { + o.WriteBufferSize = d.WriteBufferSize + } + if o.PingInterval <= 0 { + o.PingInterval = d.PingInterval + } + if o.PongWait <= 0 { + o.PongWait = d.PongWait + } + if o.WriteWait <= 0 { + o.WriteWait = d.WriteWait + } + return o +} + // Server upgrades HTTP requests to WebSocket connections and runs each one's // lifecycle. It holds no per-connection state itself; everything lives on the Hub. type Server struct { - hub *Hub - upgrader websocket.Upgrader - pingPeriod time.Duration - pongWait time.Duration + hub *Hub + upgrader websocket.Upgrader + opts Options } -// NewServer returns a Server bound to hub. CheckOrigin always returns true to -// preserve the original autoAcceptConnections behaviour (origin policy is a -// deployment concern handled in front of the engine). -func NewServer(hub *Hub) *Server { +// NewServer returns a Server bound to hub. An optional Options tunes the +// transport; omit it for defaults. +// +// The upgrader uses a shared WriteBufferPool so write scratch buffers are reused +// across connections instead of allocated per connection — a large memory saving +// at high connection counts. CheckOrigin always returns true to preserve the +// original autoAcceptConnections behaviour (origin policy belongs in front of the +// engine). +func NewServer(hub *Hub, opts ...Options) *Server { + o := DefaultOptions() + if len(opts) > 0 { + o = opts[0].withDefaults() + } return &Server{ - hub: hub, + hub: hub, + opts: o, upgrader: websocket.Upgrader{ - ReadBufferSize: 4096, - WriteBufferSize: 4096, + ReadBufferSize: o.ReadBufferSize, + WriteBufferSize: o.WriteBufferSize, + WriteBufferPool: &sync.Pool{}, CheckOrigin: func(*http.Request) bool { return true }, }, - pingPeriod: defaultPingPeriod, - pongWait: defaultPongWait, } } @@ -60,7 +120,7 @@ func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) { // handle drives one connection from accept to disconnect. It is generic over the // Conn interface so tests can feed it a scripted in-memory connection. func (s *Server) handle(conn Conn) { - client := NewClient(conn, newUUID()) + client := newClient(conn, newUUID(), s.opts.OutboundBuffer, s.opts.WriteWait) // Connect: register, start the writer, fire connect listeners (id, private // room, session defaults). Must happen before the read loop so the client can @@ -80,13 +140,13 @@ func (s *Server) handle(conn Conn) { // readLoop consumes inbound frames. Pongs that do not echo the magic payload drop // the connection (matching the original); a valid pong extends the read deadline. func (s *Server) readLoop(c *Client) { - c.conn.SetReadLimit(defaultMaxMessageSize) - _ = c.conn.SetReadDeadline(time.Now().Add(s.pongWait)) + c.conn.SetReadLimit(s.opts.MaxMessageSize) + _ = c.conn.SetReadDeadline(time.Now().Add(s.opts.PongWait)) c.conn.SetPongHandler(func(appData string) error { if appData != pingPayload { return errBadPong } - return c.conn.SetReadDeadline(time.Now().Add(s.pongWait)) + return c.conn.SetReadDeadline(time.Now().Add(s.opts.PongWait)) }) for { @@ -139,7 +199,7 @@ func (s *Server) dispatch(c *Client, data []byte) { // pingLoop sends a ping with the magic payload every ping period until the client // closes. func (s *Server) pingLoop(c *Client) { - ticker := time.NewTicker(s.pingPeriod) + ticker := time.NewTicker(s.opts.PingInterval) defer ticker.Stop() for { select { @@ -147,7 +207,7 @@ func (s *Server) pingLoop(c *Client) { err := c.conn.WriteControl( websocket.PingMessage, []byte(pingPayload), - time.Now().Add(defaultWriteWait), + time.Now().Add(s.opts.WriteWait), ) if err != nil { c.Close() diff --git a/internal/ws/ws_test.go b/internal/ws/ws_test.go index 0301fa0..53c956f 100644 --- a/internal/ws/ws_test.go +++ b/internal/ws/ws_test.go @@ -239,6 +239,33 @@ func TestHubRegistryConcurrency(t *testing.T) { // success mirrors the services helper so the hub test stays self-contained. func success() map[string]any { return map[string]any{"status": "success"} } +func TestPairReverseIndex(t *testing.T) { + a := NewClient(testutil.NewFakeConn(), "a") + b := NewClient(testutil.NewFakeConn(), "b") + + a.AddPair(b) + if !a.HasPair("b") { + t.Fatal("a should have outgoing edge to b") + } + if got := b.PairedBy(); len(got) != 1 || got[0] != "a" { + t.Fatalf("b.PairedBy() = %v, want [a]", got) + } + + // ForgetPeer (used during a disconnect of the other side) must clear both + // directions so no stale id remains. + a.ForgetPeer("b") + if a.HasPair("b") { + t.Fatal("a should no longer reference b") + } + + // RemovePair clears the outgoing edge and the matching incoming record. + a.AddPair(b) + a.RemovePair(b) + if a.HasPair("b") || len(b.PairedBy()) != 0 { + t.Fatal("RemovePair should clear both sides") + } +} + func TestServerHandleRepliesToRequest(t *testing.T) { hub := NewHub() hub.Register("ping", func(c *Client, m protocol.Message) any {