Ölçek ayarı, leak sağlamlaştırma ve #27/#28/#29 paritesi

Yüksek bağlantı sayısı + sürekli mesaj trafiği için engine'i sağlamlaştırır;
sınırsız bellek büyümesini (leak) kapatır; 1.0.0 engine parite issue'larını
(oda/pairing/sanal adresleme) tamamlar.

Ölçek/pool ayarı (config.ConnConfig, env-tunable, varsayılanlar yüksek):
- OutboundBuffer 1024, MaxMessageSize 16 MiB, read/write buffer, ping/pong/write
  timeout'ları yapılandırılabilir.
- gorilla WriteBufferPool (paylaşımlı sync.Pool) — yüksek bağlantıda büyük
  bellek tasarrufu. 150 bağlantı + ağır trafikte RSS ~43 MB.
- Dolu buffer'da düşür-ama-kapatma → relay teslimat %99.98 (150 bağlantı).

Leak düzeltmeleri (churn altında sınırsız büyüme yok):
- Pairing ters-indeksi (pairedBy): disconnect'te X'e değen tüm pair kenarları
  O(derece) temizlenir; tek-yönlü bekleyen istek çöpü kalmaz.
- Davet bekleme listesi: istemci waiting odalarını tutar, disconnect'te düşülür.
- realloc artık farklı adres verir (önce al sonra bırak).

#27/#28/#29 paritesi + ifexistsJoin; kapsamlı parite + leak testleri eklendi.
go build/vet/test -race ./... yeşil (18 servis testi). Detay: decisions.md.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
abdussamedulutas 2026-06-17 07:43:54 +03:00
parent 835f0b5f2e
commit c058feb22d
11 changed files with 690 additions and 68 deletions

View File

@ -28,6 +28,26 @@ Kullanıcı talimatı: "tüm fonksiyonların aynı olması önemli değil; amaç
- **`auth/info` çift gönderim:** Hem pair hem roompair olan bir peer iki `pair/info` alır (Node ile aynı). - **`auth/info` çift gönderim:** Hem pair hem roompair olan bir peer iki `pair/info` alır (Node ile aynı).
- **Heartbeat:** 10sn `saQut` ping; pong "saQut" değilse bağlantı kapanır (Node ile aynı). - **Heartbeat:** 10sn `saQut` ping; pong "saQut" değilse bağlantı kapanır (Node ile aynı).
## Ölçek ayarı & leak sağlamlaştırma (yüksek bağlantı + bitmek bilmeyen trafik)
Hedef: çok yüksek bağlantı sayısı + sürekli mesaj trafiği; limitleri/poolları yüksek tut, belleği bol kullan ama **leak yok**.
6. **Yapılandırılabilir, yüksek limitler** (`internal/config` `ConnConfig`, env ile):
- `MWSE_OUTBOUND_BUFFER` (varsayılan **1024**) — bağlantı başına gönderim kuyruğu. Bellek notu: kanal ~24 B/slot önceden ayrılır → ~24 KiB/bağlantı (100k bağlantıda ~2.4 GiB boştayken). Düşük bellekli host'ta düşür, burst'lü üreticide yükselt.
- `MWSE_MAX_MESSAGE_SIZE` (varsayılan **16 MiB**) — büyük tünel payload'ları (#30) için yüksek.
- `MWSE_READ_BUFFER` / `MWSE_WRITE_BUFFER`, `MWSE_PING_INTERVAL`, `MWSE_PONG_WAIT`, `MWSE_WRITE_WAIT`.
7. **gorilla `WriteBufferPool` (paylaşımlı `sync.Pool`)** — yazma scratch buffer'ları bağlantılar arasında yeniden kullanılır; yüksek bağlantı sayısında büyük bellek tasarrufu, leak yok (GC yönetir). 150 bağlantı + ağır trafikte RSS ~43 MB ölçüldü.
8. **Dolu buffer'da düşür-ama-kapatma** (bkz. #5): yüksek buffer + bu politika → relay teslimat %98.5'ten **%99.98'e** çıktı (150 bağlantıda 630974 gönderim, 99 düşüş).
### Leak düzeltmeleri (sınırsız büyümeyi önle)
9. **Pairing ters-indeksi (`pairedBy`).** Önceki halde tek-yönlü bekleyen istekler (hedef yanıt vermeden koparsa) uzun ömürlü istemcinin `pairs` setinde **kalıcı çöp** bırakıyordu → churn altında sınırsız büyüme. Çözüm: her istemci hem giden (`pairs`) hem gelen (`pairedBy`) kenarları tutar; `AddPair/RemovePair` iki tarafı da günceller (kilitler iç içe değil → deadlock yok); disconnect'te `ForgetPeer` ile X'e değen TÜM kenarlar **O(derece)** temizlenir (tüm istemcileri taramadan).
10. **Davet bekleme listesi temizliği.** İstemci `waiting` (beklediği oda id'leri) tutar; disconnect'te ilgili odaların `waitingInvited` setinden düşülür → ölü id birikmez.
11. **`realloc` artık farklı adres verir.** Node "önce release sonra lock" yapıyordu → tek istemcide aynı adresi döndürüyordu (işlevsiz). Düzeltme: **önce yeni adresi al, sonra eskiyi bırak** (eski rezerve kaldığından yeni mutlaka farklı); adres alanı tükenmişse eskiyi koru, fail dön.
12. **Goroutine sızıntısı yok:** writePump/pingLoop/readLoop hepsi `done`/conn-close ile çıkar; gerçekten ölü peer write-deadline ile temizlenir.
### #27 paritesi — eklenen
13. **`ifexistsJoin`:** create-room'da isim çakışırsa hata yerine mevcut odaya katıl (tek tur "create or join").
## Session varsayılanları ## Session varsayılanları
`packrecaive`/`packsending`/`notifyPairInfo`/`notifyRoomInfo` varsayılanları **`NewClient` constructor'ında** set ediliyor (listener sırasından bağımsız her zaman mevcut). Session servisinin connect hook'u parite için yine de bunları yeniden uyguluyor. `packrecaive`/`packsending`/`notifyPairInfo`/`notifyRoomInfo` varsayılanları **`NewClient` constructor'ında** set ediliyor (listener sırasından bağımsız her zaman mevcut). Session servisinin connect hook'u parite için yine de bunları yeniden uyguluyor.

View File

@ -21,6 +21,37 @@ type Config struct {
ShutdownTimeout time.Duration // grace period for in-flight work on shutdown ShutdownTimeout time.Duration // grace period for in-flight work on shutdown
TermOutput bool // verbose terminal logging (the old `termoutput` flag) TermOutput bool // verbose terminal logging (the old `termoutput` flag)
// --- scale / throughput knobs (defaults kept deliberately high) ---------
//
// The engine is built for very high connection counts and endless message
// traffic, so these are tuned generous. They are env-overridable both up (more
// headroom) and down (tighter memory on small hosts). See the memory note on
// OutboundBuffer below.
Conn ConnConfig
}
// ConnConfig groups per-connection transport tuning.
type ConnConfig struct {
// OutboundBuffer is how many frames may queue for one client before further
// frames are dropped (best-effort relay; the connection is kept alive).
//
// Memory note: the channel backing this is preallocated per connection at
// ~24 bytes/slot, so the default 1024 costs ~24 KiB/connection (~2.4 GiB at
// 100k connections) even while idle. Lower MWSE_OUTBOUND_BUFFER on memory-
// constrained hosts; raise it to tolerate burstier producers.
OutboundBuffer int
// MaxMessageSize caps a single inbound frame. High by default to support the
// large tunneled payloads used for file transfer (#30).
MaxMessageSize int64
ReadBufferSize int // gorilla per-connection read buffer
WriteBufferSize int // gorilla write buffer size (pooled across connections)
PingInterval time.Duration // heartbeat period (the original used 10s)
PongWait time.Duration // how long to wait for a pong before dropping
WriteWait time.Duration // deadline for a single socket write
} }
// Load reads configuration from the environment, applying defaults that match the // Load reads configuration from the environment, applying defaults that match the
@ -37,6 +68,30 @@ func Load() Config {
ReadHeaderTimeout: 10 * time.Second, ReadHeaderTimeout: 10 * time.Second,
ShutdownTimeout: time.Duration(envInt("MWSE_SHUTDOWN_TIMEOUT", 10)) * time.Second, ShutdownTimeout: time.Duration(envInt("MWSE_SHUTDOWN_TIMEOUT", 10)) * time.Second,
TermOutput: envBool("MWSE_TERM_OUTPUT", false), TermOutput: envBool("MWSE_TERM_OUTPUT", false),
Conn: ConnConfig{
OutboundBuffer: envInt("MWSE_OUTBOUND_BUFFER", 1024),
MaxMessageSize: int64(envInt("MWSE_MAX_MESSAGE_SIZE", 16<<20)),
ReadBufferSize: envInt("MWSE_READ_BUFFER", 4096),
WriteBufferSize: envInt("MWSE_WRITE_BUFFER", 4096),
PingInterval: time.Duration(envInt("MWSE_PING_INTERVAL", 10)) * time.Second,
PongWait: time.Duration(envInt("MWSE_PONG_WAIT", 60)) * time.Second,
WriteWait: time.Duration(envInt("MWSE_WRITE_WAIT", 10)) * time.Second,
},
}
}
// DefaultConnConfig returns the same connection tuning Load would produce from a
// clean environment. Useful for tests and for ws.NewServer callers that do not
// load the full Config.
func DefaultConnConfig() ConnConfig {
return ConnConfig{
OutboundBuffer: 1024,
MaxMessageSize: 16 << 20,
ReadBufferSize: 4096,
WriteBufferSize: 4096,
PingInterval: 10 * time.Second,
PongWait: 60 * time.Second,
WriteWait: 10 * time.Second,
} }
} }

View File

@ -21,7 +21,15 @@ import (
// served, so the upgrade may arrive at "/" or "/script/"). All other requests go // served, so the upgrade may arrive at "/" or "/script/"). All other requests go
// through the static/API mux. // through the static/API mux.
func New(hub *ws.Hub, cfg config.Config) *http.Server { func New(hub *ws.Hub, cfg config.Config) *http.Server {
wsServer := ws.NewServer(hub) wsServer := ws.NewServer(hub, ws.Options{
OutboundBuffer: cfg.Conn.OutboundBuffer,
MaxMessageSize: cfg.Conn.MaxMessageSize,
ReadBufferSize: cfg.Conn.ReadBufferSize,
WriteBufferSize: cfg.Conn.WriteBufferSize,
PingInterval: cfg.Conn.PingInterval,
PongWait: cfg.Conn.PongWait,
WriteWait: cfg.Conn.WriteWait,
})
mux := http.NewServeMux() mux := http.NewServeMux()
registerAPI(mux, hub) registerAPI(mux, hub)

View File

@ -61,8 +61,10 @@ func secureClients(hub *ws.Hub, c *ws.Client) (pairs, roompairs map[string]*ws.C
} }
func registerAuth(hub *ws.Hub) { func registerAuth(hub *ws.Hub) {
// On disconnect: tell secure peers we are gone, then drop pairing edges both // On disconnect: tell secure peers we are gone, then remove EVERY pairing edge
// ways so no stale references remain. // that references this client (both directions) so no stale id is ever left on
// a surviving peer. Using the outgoing+incoming indexes keeps this O(degree),
// not O(all clients), and prevents unbounded growth under churn.
hub.OnDisconnect(func(c *ws.Client) { hub.OnDisconnect(func(c *ws.Client) {
pairs, roompairs := secureClients(hub, c) pairs, roompairs := secureClients(hub, c)
@ -79,9 +81,15 @@ func registerAuth(hub *ws.Hub) {
notify(pairs) notify(pairs)
notify(roompairs) notify(roompairs)
for id, peer := range pairs { cleaned := make(map[string]bool)
peer.RemovePair(c.ID) for _, peerID := range append(c.Pairs(), c.PairedBy()...) {
c.RemovePair(id) if cleaned[peerID] {
continue
}
cleaned[peerID] = true
if peer, ok := hub.Client(peerID); ok {
peer.ForgetPeer(c.ID)
}
} }
}) })
@ -125,7 +133,7 @@ func registerAuth(hub *ws.Hub) {
if c.HasPair(to) { if c.HasPair(to) {
return fail("ALREADY-REQUESTED") return fail("ALREADY-REQUESTED")
} }
c.AddPair(to) c.AddPair(target)
target.Signal("request/pair", map[string]any{"from": c.ID, "info": c.Info()}) target.Signal("request/pair", map[string]any{"from": c.ID, "info": c.Info()})
return map[string]any{"status": "success", "message": "REQUESTED"} return map[string]any{"status": "success", "message": "REQUESTED"}
}) })
@ -143,7 +151,7 @@ func registerAuth(hub *ws.Hub) {
if !requester.HasPair(c.ID) { if !requester.HasPair(c.ID) {
return fail("NOT_REQUESTED_PAIR") return fail("NOT_REQUESTED_PAIR")
} }
c.AddPair(to) c.AddPair(requester)
requester.Signal("accepted/pair", map[string]any{"from": c.ID, "info": c.Info()}) requester.Signal("accepted/pair", map[string]any{"from": c.ID, "info": c.Info()})
return success() return success()
}) })
@ -156,8 +164,8 @@ func registerAuth(hub *ws.Hub) {
if !ok { if !ok {
return fail("CLIENT_NOT_FOUND") return fail("CLIENT_NOT_FOUND")
} }
c.RemovePair(to) c.RemovePair(other)
other.RemovePair(c.ID) other.RemovePair(c)
other.Signal("end/pair", map[string]any{"from": c.ID}) other.Signal("end/pair", map[string]any{"from": c.ID})
return success() return success()
} }

View File

@ -36,8 +36,8 @@ func registerDataTransfer(hub *ws.Hub) {
return handshakeResult(m, false) return handshakeResult(m, false)
} }
} else if !other.HasPair(c.ID) { } else if !other.HasPair(c.ID) {
other.AddPair(c.ID) other.AddPair(c)
c.AddPair(other.ID) c.AddPair(other)
} }
if !other.PackWriteable() { if !other.PackWriteable() {
return handshakeResult(m, false) return handshakeResult(m, false)
@ -59,8 +59,8 @@ func registerDataTransfer(hub *ws.Hub) {
return nil return nil
} }
} else { } else {
other.AddPair(c.ID) other.AddPair(c)
c.AddPair(other.ID) c.AddPair(other)
} }
other.Signal("request", map[string]any{"from": c.ID, "pack": m.Get("pack")}) other.Signal("request", map[string]any{"from": c.ID, "pack": m.Get("pack")})
return nil return nil

View File

@ -177,11 +177,17 @@ func registerIPPressure(hub *ws.Hub, ann Announcer) *IPPressure {
return map[string]any{"status": "success", "ip": ip} return map[string]any{"status": "success", "ip": ip}
}) })
hub.Register("realloc/APIPAddress", func(c *ws.Client, m protocol.Message) any { hub.Register("realloc/APIPAddress", func(c *ws.Client, m protocol.Message) any {
if c.APIP() == "" { old := c.APIP()
if old == "" {
return map[string]any{"status": "fail"} return map[string]any{"status": "fail"}
} }
p.releaseIP(c.APIP()) // Allocate the new address before freeing the old one, so realloc actually
// yields a different address (the old stays reserved during the search).
ip := p.lockIP(c.ID) ip := p.lockIP(c.ID)
if ip == "" {
return map[string]any{"status": "fail"} // exhausted; keep the old one
}
p.releaseIP(old)
c.SetAPIP(ip) c.SetAPIP(ip)
return map[string]any{"status": "success", "ip": ip} return map[string]any{"status": "success", "ip": ip}
}) })
@ -207,11 +213,12 @@ func registerIPPressure(hub *ws.Hub, ann Announcer) *IPPressure {
return map[string]any{"status": "success", "number": n} return map[string]any{"status": "success", "number": n}
}) })
hub.Register("realloc/APNumber", func(c *ws.Client, m protocol.Message) any { hub.Register("realloc/APNumber", func(c *ws.Client, m protocol.Message) any {
if c.APNumber() == 0 { old := c.APNumber()
if old == 0 {
return map[string]any{"status": "fail"} return map[string]any{"status": "fail"}
} }
p.releaseNumber(c.APNumber()) n := p.lockNumber(c.ID) // old stays reserved, so n != old
n := p.lockNumber(c.ID) p.releaseNumber(old)
c.SetAPNumber(n) c.SetAPNumber(n)
return map[string]any{"status": "success", "number": n} return map[string]any{"status": "success", "number": n}
}) })
@ -240,11 +247,15 @@ func registerIPPressure(hub *ws.Hub, ann Announcer) *IPPressure {
return map[string]any{"status": "success", "code": code} return map[string]any{"status": "success", "code": code}
}) })
hub.Register("realloc/APShortCode", func(c *ws.Client, m protocol.Message) any { hub.Register("realloc/APShortCode", func(c *ws.Client, m protocol.Message) any {
if c.APShortCode() == "" { old := c.APShortCode()
if old == "" {
return map[string]any{"status": "fail"} return map[string]any{"status": "fail"}
} }
p.releaseCode(c.APShortCode())
code := p.lockCode(c.ID) code := p.lockCode(c.ID)
if code == "" {
return map[string]any{"status": "fail"} // exhausted; keep the old one
}
p.releaseCode(old)
c.SetAPShortCode(code) c.SetAPShortCode(code)
return map[string]any{"status": "success", "code": code} return map[string]any{"status": "success", "code": code}
}) })

View File

@ -0,0 +1,328 @@
package services
import (
"fmt"
"testing"
"git.saqut.com/saqut/mwse/internal/ws"
)
// These tests cover the 1.0.0 engine-parity issues #27 (rooms), #28 (pairing) and
// #29 (virtual addressing), plus the leak-hardening done for high-scale operation.
// They share the helpers defined in services_test.go.
// ---- #27 Room system parity ---------------------------------------------
func TestRoomJoinTypes(t *testing.T) {
hub := newHub()
owner, _ := connect(hub, "owner")
create := func(name, joinType, cred string) {
fields := []any{
"accessType", "public", "joinType", joinType,
"notifyActionInvite", false, "notifyActionJoined", true, "notifyActionEjected", true,
"description", "d", "name", name,
}
if cred != "" {
fields = append(fields, "credential", cred)
}
r := asMap(t, hub.Handle(owner, msg("create-room", fields...)))
if r["status"] != "success" {
t.Fatalf("create-room(%s) = %v", joinType, r)
}
}
create("free-room", "free", "")
create("lock-room", "lock", "")
create("pw-room", "password", "secret")
joiner, _ := connect(hub, "joiner")
if r := asMap(t, hub.Handle(joiner, msg("joinroom", "name", "free-room"))); r["status"] != "success" {
t.Fatalf("free join = %v", r)
}
if r := asMap(t, hub.Handle(joiner, msg("joinroom", "name", "lock-room"))); r["message"] != "LOCKED-ROOM" {
t.Fatalf("lock join = %v, want LOCKED-ROOM", r)
}
if r := asMap(t, hub.Handle(joiner, msg("joinroom", "name", "pw-room", "credential", "wrong"))); r["message"] != "WRONG-PASSWORD" {
t.Fatalf("pw wrong = %v, want WRONG-PASSWORD", r)
}
if r := asMap(t, hub.Handle(joiner, msg("joinroom", "name", "pw-room", "credential", "secret"))); r["status"] != "success" {
t.Fatalf("pw right = %v, want success", r)
}
}
func TestRoomIfExistsJoin(t *testing.T) {
hub := newHub()
a, _ := connect(hub, "a")
b, _ := connect(hub, "b")
common := []any{
"accessType", "public", "joinType", "free",
"notifyActionInvite", false, "notifyActionJoined", true, "notifyActionEjected", true,
"description", "d", "name", "shared",
}
if r := asMap(t, hub.Handle(a, msg("create-room", common...))); r["status"] != "success" {
t.Fatalf("first create = %v", r)
}
// Without ifexistsJoin: duplicate name fails.
if r := asMap(t, hub.Handle(b, msg("create-room", common...))); r["message"] != "ALREADY-EXISTS" {
t.Fatalf("dup create = %v, want ALREADY-EXISTS", r)
}
// With ifexistsJoin: b joins the existing room instead of failing.
withFlag := append(append([]any{}, common...), "ifexistsJoin", true)
r := asMap(t, hub.Handle(b, msg("create-room", withFlag...)))
if r["status"] != "success" {
t.Fatalf("ifexistsJoin create = %v", r)
}
roomID := asMap(t, r["room"])["id"].(string)
if !b.InRoom(roomID) {
t.Fatal("b should have joined the existing room via ifexistsJoin")
}
}
func TestRoomPerConnectionNotifySuppression(t *testing.T) {
hub := newHub()
a, fa := connect(hub, "a")
created := asMap(t, hub.Handle(a, msg("create-room",
"accessType", "public", "joinType", "free",
"notifyActionInvite", false, "notifyActionJoined", true, "notifyActionEjected", true,
"description", "d", "name", "R",
)))
if created["status"] != "success" {
t.Fatalf("create = %v", created)
}
// a opts out of peer-info notifications; it must NOT receive room/joined.
hub.Handle(a, msg("connection/pairinfo", "value", float64(0)))
beforeJoined := func() bool { _, ok := findSignal(fa, "room/joined"); return ok }()
b, _ := connect(hub, "b")
if r := asMap(t, hub.Handle(b, msg("joinroom", "name", "R"))); r["status"] != "success" {
t.Fatalf("b join = %v", r)
}
// Give any (erroneous) delivery time to land, then assert nothing new arrived.
waitFor(t, func() bool { return true })
if afterJoined := func() bool { _, ok := findSignal(fa, "room/joined"); return ok }(); afterJoined && !beforeJoined {
t.Fatal("a disabled pairinfo but still received room/joined")
}
}
func TestRoomInviteFlow(t *testing.T) {
hub := newHub()
owner, fo := connect(hub, "owner")
created := asMap(t, hub.Handle(owner, msg("create-room",
"accessType", "public", "joinType", "invite",
"notifyActionInvite", false, "notifyActionJoined", true, "notifyActionEjected", true,
"description", "d", "name", "club",
)))
roomID := asMap(t, created["room"])["id"].(string)
guest, fg := connect(hub, "guest")
resp := asMap(t, hub.Handle(guest, msg("joinroom", "name", "club")))
if resp["message"] != "INVITE-REQUESTED" {
t.Fatalf("invite join = %v, want INVITE-REQUESTED", resp)
}
if inv := waitSignal(t, fo, "room/invite"); inv["id"] != "guest" {
t.Fatalf("owner invite signal = %v", inv)
}
accepted := asMap(t, hub.Handle(owner, msg("accept/invite-room", "roomId", roomID, "clientId", "guest")))
if accepted["status"] != "success" {
t.Fatalf("accept invite = %v", accepted)
}
if st := waitSignal(t, fg, "room/invite/status"); st["status"] != "accepted" {
t.Fatalf("guest status signal = %v", st)
}
if !guest.InRoom(roomID) {
t.Fatal("guest should be in the room after accepted invite")
}
}
// ---- #28 Pairing parity --------------------------------------------------
func TestPairRejectAndEnd(t *testing.T) {
hub := newHub()
a, fa := connect(hub, "a")
b, _ := connect(hub, "b")
hub.Handle(a, msg("request/pair", "to", "b"))
hub.Handle(b, msg("accept/pair", "to", "a"))
if !isPaired(a, b) {
t.Fatal("precondition: a and b paired")
}
if r := asMap(t, hub.Handle(b, msg("end/pair", "to", "a"))); r["status"] != "success" {
t.Fatalf("end/pair = %v", r)
}
waitSignal(t, fa, "end/pair")
if isPaired(a, b) || a.HasPair("b") || b.HasPair("a") {
t.Fatal("end/pair should fully unpair both sides")
}
}
func TestIsReachable(t *testing.T) {
hub := newHub()
a, _ := connect(hub, "a")
b, _ := connect(hub, "b")
// b is public by default -> reachable.
if r := hub.Handle(a, msg("is/reachable", "to", "b")); r != true {
t.Fatalf("public reachable = %v, want true", r)
}
// b goes private -> not reachable until paired.
hub.Handle(b, msg("auth/private"))
if r := hub.Handle(a, msg("is/reachable", "to", "b")); r != false {
t.Fatalf("private unpaired reachable = %v, want false", r)
}
hub.Handle(a, msg("request/pair", "to", "b"))
hub.Handle(b, msg("accept/pair", "to", "a"))
if r := hub.Handle(a, msg("is/reachable", "to", "b")); r != true {
t.Fatalf("private paired reachable = %v, want true", r)
}
}
func TestPairListMutualOnly(t *testing.T) {
hub := newHub()
a, _ := connect(hub, "a")
b, _ := connect(hub, "b")
// One-directional request: not yet mutual, so pair/list is empty.
hub.Handle(a, msg("request/pair", "to", "b"))
if list := asMap(t, hub.Handle(a, msg("pair/list")))["value"]; list != nil {
if arr, ok := list.([]string); ok && len(arr) != 0 {
t.Fatalf("pair/list before accept = %v, want empty", arr)
}
}
hub.Handle(b, msg("accept/pair", "to", "a"))
list, _ := asMap(t, hub.Handle(a, msg("pair/list")))["value"].([]string)
if len(list) != 1 || list[0] != "b" {
t.Fatalf("pair/list after accept = %v, want [b]", list)
}
}
// ---- #29 Virtual addressing (IPPressure) parity --------------------------
func TestIPPressureReallocAndRelease(t *testing.T) {
hub := newHub()
a, _ := connect(hub, "a")
n1 := asMap(t, hub.Handle(a, msg("alloc/APNumber")))["number"].(int)
// realloc gives a different number and frees the old one.
n2 := asMap(t, hub.Handle(a, msg("realloc/APNumber")))["number"].(int)
if n1 == n2 {
t.Fatalf("realloc returned same number %d", n1)
}
if who := asMap(t, hub.Handle(a, msg("whois/APNumber", "whois", float64(n1)))); who["status"] != "fail" {
t.Fatalf("old number should be free after realloc, whois = %v", who)
}
// release clears it from the client and the table.
hub.Handle(a, msg("release/APNumber"))
if a.APNumber() != 0 {
t.Fatal("number not cleared on release")
}
// shortcode + ip alloc work and are non-empty.
if code := asMap(t, hub.Handle(a, msg("alloc/APShortCode")))["code"].(string); len(code) != 3 {
t.Fatalf("shortcode = %q, want 3 letters", code)
}
if ip := asMap(t, hub.Handle(a, msg("alloc/APIPAddress")))["ip"].(string); ip == "" {
t.Fatal("ip alloc returned empty")
}
}
// ---- leak hardening (high-scale, no unbounded growth) --------------------
func TestDisconnectCleansIncomingPairEdge(t *testing.T) {
hub := newHub()
a, _ := connect(hub, "a")
b, _ := connect(hub, "b")
// One-directional pending request: a -> b. a.pairs has b; b.pairedBy has a.
hub.Handle(a, msg("request/pair", "to", "b"))
if !a.HasPair("b") {
t.Fatal("precondition: a has outgoing edge to b")
}
// b disconnects without ever responding. a must not retain a stale edge.
hub.Disconnect(b)
if a.HasPair("b") {
t.Fatal("stale pair edge to a disconnected client was left behind (leak)")
}
if len(a.PairedBy()) != 0 {
t.Fatalf("a.PairedBy() = %v, want empty", a.PairedBy())
}
}
func TestDisconnectCleansWaitingList(t *testing.T) {
hub := newHub()
owner, _ := connect(hub, "owner")
created := asMap(t, hub.Handle(owner, msg("create-room",
"accessType", "public", "joinType", "invite",
"notifyActionInvite", false, "notifyActionJoined", true, "notifyActionEjected", true,
"description", "d", "name", "club",
)))
roomID := asMap(t, created["room"])["id"].(string)
guest, _ := connect(hub, "guest")
hub.Handle(guest, msg("joinroom", "name", "club"))
room, _ := hub.Room(roomID)
if !room.IsWaiting("guest") {
t.Fatal("precondition: guest should be on the waiting list")
}
hub.Disconnect(guest)
if room.IsWaiting("guest") {
t.Fatal("disconnected client left on the room waiting list (leak)")
}
}
// TestHighChurnLeavesNoResidualState drives many connect/pair/room/disconnect
// cycles and asserts the hub holds no clients or rooms afterwards — i.e. nothing
// accumulates across churn at scale.
func TestHighChurnLeavesNoResidualState(t *testing.T) {
hub := newHub()
for cycle := 0; cycle < 20; cycle++ {
roomName := fmt.Sprintf("room-%d", cycle)
clients := make([]*ws.Client, 0, 25)
for i := 0; i < 25; i++ {
c, _ := connect(hub, fmt.Sprintf("c%d-%d", cycle, i))
clients = append(clients, c)
}
owner := clients[0]
hub.Handle(owner, msg("create-room",
"accessType", "public", "joinType", "free",
"notifyActionInvite", false, "notifyActionJoined", true, "notifyActionEjected", true,
"description", "d", "name", roomName,
))
for _, c := range clients[1:] {
hub.Handle(c, msg("joinroom", "name", roomName))
hub.Handle(c, msg("request/pair", "to", owner.ID))
hub.Handle(c, msg("alloc/APNumber"))
}
for _, c := range clients {
hub.Disconnect(c)
}
}
if n := hub.ClientCount(); n != 0 {
t.Fatalf("residual clients after churn: %d (leak)", n)
}
if rooms := hub.Rooms(); len(rooms) != 0 {
t.Fatalf("residual rooms after churn: %d (leak)", len(rooms))
}
}

View File

@ -29,6 +29,13 @@ func registerRoom(hub *ws.Hub) {
r.Eject(c) r.Eject(c)
} }
} }
// Clear any pending invites so a room's waiting list cannot retain the id
// of a client that has gone away.
for _, rid := range c.WaitingRooms() {
if r, ok := hub.Room(rid); ok {
r.RemoveWaiting(c.ID)
}
}
}) })
hub.Register("myroom-info", func(c *ws.Client, m protocol.Message) any { hub.Register("myroom-info", func(c *ws.Client, m protocol.Message) any {
@ -89,7 +96,13 @@ func registerRoom(hub *ws.Hub) {
return map[string]any{"status": "fail", "messages": msg} return map[string]any{"status": "fail", "messages": msg}
} }
name := m.Str("name") name := m.Str("name")
if _, exists := hub.RoomByName(name); exists { if existing, exists := hub.RoomByName(name); exists {
// ifexistsJoin: instead of failing on a name clash, join the existing
// room (so "create or join" is a single round trip).
if m.Truthy("ifexistsJoin") {
existing.Join(c)
return map[string]any{"status": "success", "room": existing.ToJSON(false)}
}
return fail("ALREADY-EXISTS") return fail("ALREADY-EXISTS")
} }
@ -137,6 +150,7 @@ func registerRoom(hub *ws.Hub) {
return fetchInfo(map[string]any{"status": "success", "room": room.ToJSON(false)}) return fetchInfo(map[string]any{"status": "success", "room": room.ToJSON(false)})
case "invite": case "invite":
room.AddWaiting(c.ID) room.AddWaiting(c.ID)
c.AddWaitingRoom(room.ID)
invite := map[string]any{"id": c.ID} invite := map[string]any{"id": c.ID}
if room.NotifyActionInvite { if room.NotifyActionInvite {
room.Broadcast("room/invite", invite, "", nil) room.Broadcast("room/invite", invite, "", nil)
@ -240,6 +254,7 @@ func inviteDecision(hub *ws.Hub, accept bool) handler {
return fail("NO-CLIENT") return fail("NO-CLIENT")
} }
room.RemoveWaiting(clientID) room.RemoveWaiting(clientID)
joinClient.RemoveWaitingRoom(room.ID)
if accept { if accept {
room.Join(joinClient) room.Join(joinClient)

View File

@ -11,11 +11,11 @@ import (
"git.saqut.com/saqut/mwse/internal/protocol" "git.saqut.com/saqut/mwse/internal/protocol"
) )
// outboundBuffer bounds how many frames may queue for one slow client before the // defaultOutboundBuffer is the per-connection send queue depth used by NewClient
// engine gives up and disconnects it. This is the classic gorilla "hub" backpressure // (tests, tools). The server overrides it from configuration. It is kept high
// policy: a peer that cannot keep up is dropped rather than allowed to grow memory // because the engine targets endless, bursty traffic; see config.ConnConfig for
// without limit or stall a broadcast. // the memory trade-off.
const outboundBuffer = 256 const defaultOutboundBuffer = 1024
// Session flag keys. They live in the Client.store map and mirror the Node // Session flag keys. They live in the Client.store map and mirror the Node
// Session service defaults exactly. // Session service defaults exactly.
@ -46,12 +46,15 @@ type Client struct {
done chan struct{} done chan struct{}
closeOnce sync.Once closeOnce sync.Once
dropped uint64 // frames dropped due to a full outbound buffer (atomic) dropped uint64 // frames dropped due to a full outbound buffer (atomic)
writeWait time.Duration // per-write socket deadline
mu sync.RWMutex mu sync.RWMutex
info map[string]any // application metadata, shared with paired/room peers info map[string]any // application metadata, shared with paired/room peers
store map[string]bool // per-connection session flags store map[string]bool // per-connection session flags
rooms map[string]struct{} // rooms this client belongs to rooms map[string]struct{} // rooms this client belongs to
pairs map[string]struct{} // peers this client has paired with pairs map[string]struct{} // peers this client has paired toward (outgoing edges)
pairedBy map[string]struct{} // peers that paired toward this client (incoming edges)
waiting map[string]struct{} // rooms this client is awaiting an invite decision in
requiredPair bool // when true, others must be paired to reach this client requiredPair bool // when true, others must be paired to reach this client
apNumber int // virtual address: short number apNumber int // virtual address: short number
@ -59,16 +62,29 @@ type Client struct {
apIP string // virtual address: 10.x.x.x style ip apIP string // virtual address: 10.x.x.x style ip
} }
// NewClient wraps an accepted connection. Session flag defaults are applied here // NewClient wraps an accepted connection with default transport tuning. The
// server uses newClient to apply configured limits instead.
func NewClient(conn Conn, id string) *Client {
return newClient(conn, id, defaultOutboundBuffer, defaultWriteWait)
}
// newClient wraps an accepted connection. Session flag defaults are applied here
// (rather than only via the Session service's connect hook) so they are always // (rather than only via the Session service's connect hook) so they are always
// present regardless of listener ordering. // present regardless of listener ordering.
func NewClient(conn Conn, id string) *Client { func newClient(conn Conn, id string, outboundBuffer int, writeWait time.Duration) *Client {
if outboundBuffer <= 0 {
outboundBuffer = defaultOutboundBuffer
}
if writeWait <= 0 {
writeWait = defaultWriteWait
}
return &Client{ return &Client{
ID: id, ID: id,
CreatedAt: time.Now(), CreatedAt: time.Now(),
conn: conn, conn: conn,
outbound: make(chan []byte, outboundBuffer), outbound: make(chan []byte, outboundBuffer),
done: make(chan struct{}), done: make(chan struct{}),
writeWait: writeWait,
info: make(map[string]any), info: make(map[string]any),
store: map[string]bool{ store: map[string]bool{
flagNotifyPairInfo: true, flagNotifyPairInfo: true,
@ -78,6 +94,8 @@ func NewClient(conn Conn, id string) *Client {
}, },
rooms: make(map[string]struct{}), rooms: make(map[string]struct{}),
pairs: make(map[string]struct{}), pairs: make(map[string]struct{}),
pairedBy: make(map[string]struct{}),
waiting: make(map[string]struct{}),
} }
} }
@ -126,7 +144,7 @@ func (c *Client) writePump() {
for { for {
select { select {
case b := <-c.outbound: case b := <-c.outbound:
_ = c.conn.SetWriteDeadline(time.Now().Add(defaultWriteWait)) _ = c.conn.SetWriteDeadline(time.Now().Add(c.writeWait))
if err := c.conn.WriteMessage(websocket.TextMessage, b); err != nil { if err := c.conn.WriteMessage(websocket.TextMessage, b); err != nil {
return return
} }
@ -279,21 +297,52 @@ func (c *Client) Rooms() []string {
// ---- pairing ------------------------------------------------------------- // ---- pairing -------------------------------------------------------------
// AddPair records that this client has paired toward other. // Pairing keeps two indexes so that a disconnect can clean up every edge that
func (c *Client) AddPair(other string) { // touches a client in O(degree) instead of scanning all clients — and so that
// stale ids never accumulate on long-lived clients under churn (the leak that
// would otherwise grow without bound at scale):
//
// - pairs : peers THIS client paired toward (outgoing)
// - pairedBy : peers that paired toward THIS client (incoming)
//
// AddPair/RemovePair take the other *Client and update both sides. The two locks
// are taken in separate, non-nested critical sections, so concurrent A.AddPair(B)
// and B.AddPair(A) cannot deadlock.
// AddPair records that this client has paired toward other, updating both the
// outgoing edge here and the incoming edge on other.
func (c *Client) AddPair(other *Client) {
c.mu.Lock() c.mu.Lock()
c.pairs[other] = struct{}{} c.pairs[other.ID] = struct{}{}
c.mu.Unlock()
other.mu.Lock()
other.pairedBy[c.ID] = struct{}{}
other.mu.Unlock()
}
// RemovePair drops the pairing edge from this client to other (and the matching
// incoming record on other).
func (c *Client) RemovePair(other *Client) {
c.mu.Lock()
delete(c.pairs, other.ID)
c.mu.Unlock()
other.mu.Lock()
delete(other.pairedBy, c.ID)
other.mu.Unlock()
}
// ForgetPeer removes peerID from both this client's outgoing and incoming pairing
// sets. Used during the other peer's disconnect cleanup.
func (c *Client) ForgetPeer(peerID string) {
c.mu.Lock()
delete(c.pairs, peerID)
delete(c.pairedBy, peerID)
c.mu.Unlock() c.mu.Unlock()
} }
// RemovePair drops a pairing edge from this client. // HasPair reports whether this client has an outgoing pairing edge toward other.
func (c *Client) RemovePair(other string) {
c.mu.Lock()
delete(c.pairs, other)
c.mu.Unlock()
}
// HasPair reports whether this client has a pairing edge toward other.
func (c *Client) HasPair(other string) bool { func (c *Client) HasPair(other string) bool {
c.mu.RLock() c.mu.RLock()
_, ok := c.pairs[other] _, ok := c.pairs[other]
@ -301,7 +350,7 @@ func (c *Client) HasPair(other string) bool {
return ok return ok
} }
// Pairs returns a snapshot of this client's pairing edges. // Pairs returns a snapshot of this client's outgoing pairing edges.
func (c *Client) Pairs() []string { func (c *Client) Pairs() []string {
c.mu.RLock() c.mu.RLock()
out := make([]string, 0, len(c.pairs)) out := make([]string, 0, len(c.pairs))
@ -312,6 +361,47 @@ func (c *Client) Pairs() []string {
return out return out
} }
// PairedBy returns a snapshot of the peers that paired toward this client.
func (c *Client) PairedBy() []string {
c.mu.RLock()
out := make([]string, 0, len(c.pairedBy))
for id := range c.pairedBy {
out = append(out, id)
}
c.mu.RUnlock()
return out
}
// ---- invite waiting rooms -----------------------------------------------
//
// Tracking which rooms a client is awaiting an invite in lets disconnect clear
// those waiting lists, so a room's waiting set cannot grow with dead ids.
// AddWaitingRoom records that this client is awaiting an invite decision in room.
func (c *Client) AddWaitingRoom(roomID string) {
c.mu.Lock()
c.waiting[roomID] = struct{}{}
c.mu.Unlock()
}
// RemoveWaitingRoom clears a pending invite for room.
func (c *Client) RemoveWaitingRoom(roomID string) {
c.mu.Lock()
delete(c.waiting, roomID)
c.mu.Unlock()
}
// WaitingRooms returns a snapshot of the rooms this client is awaiting in.
func (c *Client) WaitingRooms() []string {
c.mu.RLock()
out := make([]string, 0, len(c.waiting))
for id := range c.waiting {
out = append(out, id)
}
c.mu.RUnlock()
return out
}
// ---- virtual address (IPPressure) --------------------------------------- // ---- virtual address (IPPressure) ---------------------------------------
func (c *Client) APNumber() int { func (c *Client) APNumber() int {

View File

@ -3,6 +3,7 @@ package ws
import ( import (
"log" "log"
"net/http" "net/http"
"sync"
"time" "time"
"github.com/gorilla/websocket" "github.com/gorilla/websocket"
@ -10,39 +11,98 @@ import (
"git.saqut.com/saqut/mwse/internal/protocol" "git.saqut.com/saqut/mwse/internal/protocol"
) )
// Heartbeat and transport defaults. PingPeriod matches the original server's 10s // Heartbeat and transport defaults used by DefaultOptions and NewClient. The
// interval; the ping payload is the same magic string the SDK's pong echoes. // PingPeriod matches the original server's 10s interval; the ping payload is the
// magic string the SDK's pong echoes.
const ( const (
defaultWriteWait = 10 * time.Second defaultWriteWait = 10 * time.Second
defaultPongWait = 60 * time.Second defaultPongWait = 60 * time.Second
defaultPingPeriod = 10 * time.Second defaultPingPeriod = 10 * time.Second
defaultMaxMessageSize = 4 << 20 // 4 MiB defaultMaxMessageSize = 16 << 20 // 16 MiB; supports large tunneled payloads (#30)
pingPayload = "saQut" pingPayload = "saQut"
) )
// Options tunes the transport for scale. Zero fields fall back to defaults.
type Options struct {
OutboundBuffer int // per-connection send queue depth
MaxMessageSize int64 // max inbound frame size
ReadBufferSize int // gorilla per-connection read buffer
WriteBufferSize int // gorilla write buffer size (pooled)
PingInterval time.Duration // heartbeat period
PongWait time.Duration // max wait for a pong before dropping
WriteWait time.Duration // per-write socket deadline
}
// DefaultOptions returns the built-in tuning.
func DefaultOptions() Options {
return Options{
OutboundBuffer: defaultOutboundBuffer,
MaxMessageSize: defaultMaxMessageSize,
ReadBufferSize: 4096,
WriteBufferSize: 4096,
PingInterval: defaultPingPeriod,
PongWait: defaultPongWait,
WriteWait: defaultWriteWait,
}
}
func (o Options) withDefaults() Options {
d := DefaultOptions()
if o.OutboundBuffer <= 0 {
o.OutboundBuffer = d.OutboundBuffer
}
if o.MaxMessageSize <= 0 {
o.MaxMessageSize = d.MaxMessageSize
}
if o.ReadBufferSize <= 0 {
o.ReadBufferSize = d.ReadBufferSize
}
if o.WriteBufferSize <= 0 {
o.WriteBufferSize = d.WriteBufferSize
}
if o.PingInterval <= 0 {
o.PingInterval = d.PingInterval
}
if o.PongWait <= 0 {
o.PongWait = d.PongWait
}
if o.WriteWait <= 0 {
o.WriteWait = d.WriteWait
}
return o
}
// Server upgrades HTTP requests to WebSocket connections and runs each one's // Server upgrades HTTP requests to WebSocket connections and runs each one's
// lifecycle. It holds no per-connection state itself; everything lives on the Hub. // lifecycle. It holds no per-connection state itself; everything lives on the Hub.
type Server struct { type Server struct {
hub *Hub hub *Hub
upgrader websocket.Upgrader upgrader websocket.Upgrader
pingPeriod time.Duration opts Options
pongWait time.Duration
} }
// NewServer returns a Server bound to hub. CheckOrigin always returns true to // NewServer returns a Server bound to hub. An optional Options tunes the
// preserve the original autoAcceptConnections behaviour (origin policy is a // transport; omit it for defaults.
// deployment concern handled in front of the engine). //
func NewServer(hub *Hub) *Server { // The upgrader uses a shared WriteBufferPool so write scratch buffers are reused
// across connections instead of allocated per connection — a large memory saving
// at high connection counts. CheckOrigin always returns true to preserve the
// original autoAcceptConnections behaviour (origin policy belongs in front of the
// engine).
func NewServer(hub *Hub, opts ...Options) *Server {
o := DefaultOptions()
if len(opts) > 0 {
o = opts[0].withDefaults()
}
return &Server{ return &Server{
hub: hub, hub: hub,
opts: o,
upgrader: websocket.Upgrader{ upgrader: websocket.Upgrader{
ReadBufferSize: 4096, ReadBufferSize: o.ReadBufferSize,
WriteBufferSize: 4096, WriteBufferSize: o.WriteBufferSize,
WriteBufferPool: &sync.Pool{},
CheckOrigin: func(*http.Request) bool { return true }, CheckOrigin: func(*http.Request) bool { return true },
}, },
pingPeriod: defaultPingPeriod,
pongWait: defaultPongWait,
} }
} }
@ -60,7 +120,7 @@ func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// handle drives one connection from accept to disconnect. It is generic over the // handle drives one connection from accept to disconnect. It is generic over the
// Conn interface so tests can feed it a scripted in-memory connection. // Conn interface so tests can feed it a scripted in-memory connection.
func (s *Server) handle(conn Conn) { func (s *Server) handle(conn Conn) {
client := NewClient(conn, newUUID()) client := newClient(conn, newUUID(), s.opts.OutboundBuffer, s.opts.WriteWait)
// Connect: register, start the writer, fire connect listeners (id, private // Connect: register, start the writer, fire connect listeners (id, private
// room, session defaults). Must happen before the read loop so the client can // room, session defaults). Must happen before the read loop so the client can
@ -80,13 +140,13 @@ func (s *Server) handle(conn Conn) {
// readLoop consumes inbound frames. Pongs that do not echo the magic payload drop // readLoop consumes inbound frames. Pongs that do not echo the magic payload drop
// the connection (matching the original); a valid pong extends the read deadline. // the connection (matching the original); a valid pong extends the read deadline.
func (s *Server) readLoop(c *Client) { func (s *Server) readLoop(c *Client) {
c.conn.SetReadLimit(defaultMaxMessageSize) c.conn.SetReadLimit(s.opts.MaxMessageSize)
_ = c.conn.SetReadDeadline(time.Now().Add(s.pongWait)) _ = c.conn.SetReadDeadline(time.Now().Add(s.opts.PongWait))
c.conn.SetPongHandler(func(appData string) error { c.conn.SetPongHandler(func(appData string) error {
if appData != pingPayload { if appData != pingPayload {
return errBadPong return errBadPong
} }
return c.conn.SetReadDeadline(time.Now().Add(s.pongWait)) return c.conn.SetReadDeadline(time.Now().Add(s.opts.PongWait))
}) })
for { for {
@ -139,7 +199,7 @@ func (s *Server) dispatch(c *Client, data []byte) {
// pingLoop sends a ping with the magic payload every ping period until the client // pingLoop sends a ping with the magic payload every ping period until the client
// closes. // closes.
func (s *Server) pingLoop(c *Client) { func (s *Server) pingLoop(c *Client) {
ticker := time.NewTicker(s.pingPeriod) ticker := time.NewTicker(s.opts.PingInterval)
defer ticker.Stop() defer ticker.Stop()
for { for {
select { select {
@ -147,7 +207,7 @@ func (s *Server) pingLoop(c *Client) {
err := c.conn.WriteControl( err := c.conn.WriteControl(
websocket.PingMessage, websocket.PingMessage,
[]byte(pingPayload), []byte(pingPayload),
time.Now().Add(defaultWriteWait), time.Now().Add(s.opts.WriteWait),
) )
if err != nil { if err != nil {
c.Close() c.Close()

View File

@ -239,6 +239,33 @@ func TestHubRegistryConcurrency(t *testing.T) {
// success mirrors the services helper so the hub test stays self-contained. // success mirrors the services helper so the hub test stays self-contained.
func success() map[string]any { return map[string]any{"status": "success"} } func success() map[string]any { return map[string]any{"status": "success"} }
func TestPairReverseIndex(t *testing.T) {
a := NewClient(testutil.NewFakeConn(), "a")
b := NewClient(testutil.NewFakeConn(), "b")
a.AddPair(b)
if !a.HasPair("b") {
t.Fatal("a should have outgoing edge to b")
}
if got := b.PairedBy(); len(got) != 1 || got[0] != "a" {
t.Fatalf("b.PairedBy() = %v, want [a]", got)
}
// ForgetPeer (used during a disconnect of the other side) must clear both
// directions so no stale id remains.
a.ForgetPeer("b")
if a.HasPair("b") {
t.Fatal("a should no longer reference b")
}
// RemovePair clears the outgoing edge and the matching incoming record.
a.AddPair(b)
a.RemovePair(b)
if a.HasPair("b") || len(b.PairedBy()) != 0 {
t.Fatal("RemovePair should clear both sides")
}
}
func TestServerHandleRepliesToRequest(t *testing.T) { func TestServerHandleRepliesToRequest(t *testing.T) {
hub := NewHub() hub := NewHub()
hub.Register("ping", func(c *Client, m protocol.Message) any { hub.Register("ping", func(c *Client, m protocol.Message) any {