MWSE/internal/ws/ws_test.go

299 lines
7.1 KiB
Go

package ws
import (
"encoding/json"
"fmt"
"sync"
"testing"
"time"
"git.saqut.com/saqut/mwse/internal/protocol"
"git.saqut.com/saqut/mwse/internal/testutil"
)
// newTestClient wires a new Client to an in-memory FakeConn and launches the
// client's writePump in its own goroutine, the way the real server does. Both
// the client and the connection are returned so a test can drive one and
// inspect the other.
func newTestClient(id string) (*Client, *testutil.FakeConn) {
	conn := testutil.NewFakeConn()
	client := NewClient(conn, id)
	go client.writePump()
	return client, conn
}
// waitFor evaluates cond up to 500 times, sleeping one millisecond after each
// unsuccessful check, and returns as soon as cond reports true. If cond never
// holds, the calling test is aborted with t.Fatal.
func waitFor(t *testing.T, cond func() bool) {
	t.Helper()
	for attempt := 1; !cond(); attempt++ {
		time.Sleep(time.Millisecond)
		if attempt == 500 {
			t.Fatal("condition not met within timeout")
		}
	}
}
// lastSignal decodes the most recently written frame on fc as a JSON array of
// the form [payload, name, ...] and returns the signal name and the payload.
//
// The calling test is failed immediately if nothing has been written, if the
// frame is not a JSON array, or if the array has fewer than two elements. A
// name that is not a JSON string, or a payload that is not a JSON object, is
// returned as its zero value so the caller's own comparison reports it.
func lastSignal(t *testing.T, fc *testutil.FakeConn) (string, map[string]any) {
	t.Helper()
	writes := fc.Writes()
	if len(writes) == 0 {
		t.Fatal("no frames written")
	}
	raw := writes[len(writes)-1]
	var arr []any
	if err := json.Unmarshal(raw, &arr); err != nil {
		t.Fatalf("decode frame: %v", err)
	}
	// Guard the indexing below: a short frame would otherwise panic with an
	// index-out-of-range error instead of producing a readable test failure.
	if len(arr) < 2 {
		t.Fatalf("frame %s has %d elements, want at least 2", raw, len(arr))
	}
	name, _ := arr[1].(string)
	payload, _ := arr[0].(map[string]any)
	return name, payload
}
// TestSendConcurrentWithClose checks the core safety property of the rewrite:
// sending to a client that is closing must never panic or race, only drop.
// Run under -race.
//
// Each trial starts several sender goroutines that call Send in a loop and one
// goroutine that calls Close, then waits for all of them before moving on.
func TestSendConcurrentWithClose(t *testing.T) {
	const (
		trials  = 30
		senders = 8
		sends   = 100
	)
	for trial := 0; trial < trials; trial++ {
		c, _ := newTestClient("x")
		var wg sync.WaitGroup
		for i := 0; i < senders; i++ {
			wg.Add(1)
			go func() {
				defer wg.Done()
				for j := 0; j < sends; j++ {
					c.Send(map[string]any{"j": j})
				}
			}()
		}
		// Close runs concurrently with the senders. It is tracked by the
		// WaitGroup so that every trial has finished closing its client
		// before the next one starts and no Close goroutine outlives the test.
		wg.Add(1)
		go func() {
			defer wg.Done()
			c.Close()
		}()
		wg.Wait()
	}
}
// TestSendAfterCloseDoesNotPanic closes a client, waits until its Done channel
// becomes readable, and then calls Send once more to check that a late send
// does not panic.
func TestSendAfterCloseDoesNotPanic(t *testing.T) {
	client, _ := newTestClient("x")
	client.Close()

	// Non-blocking probe of the Done channel.
	closed := func() bool {
		select {
		case <-client.Done():
			return true
		default:
		}
		return false
	}
	waitFor(t, closed)

	// Should return promptly and not panic.
	client.Send(map[string]any{"ignored": true})
}
// TestRoomBroadcastDelivers joins two clients to a published room, broadcasts
// one "pack/room" signal with no exceptID and no filter, and verifies that
// each member's connection ends up with that signal and its payload.
func TestRoomBroadcastDelivers(t *testing.T) {
	hub := NewHub()
	room := NewRoom(hub)
	room.OwnerID = "o"
	room.NotifyActionJoined = false // keep frame counts clean
	room.Publish()
	a, fa := newTestClient("a")
	b, fb := newTestClient("b")
	hub.addClient(a)
	hub.addClient(b)
	room.Join(a)
	room.Join(b)
	room.Broadcast("pack/room", map[string]any{"hello": float64(1)}, "", nil)
	waitFor(t, func() bool { return fa.WriteCount() >= 1 && fb.WriteCount() >= 1 })

	// Inspect the frame on every member's connection, not just the first
	// one, so a broadcast that corrupts a later recipient is caught too.
	recipients := []struct {
		id   string
		conn *testutil.FakeConn
	}{
		{a.ID, fa},
		{b.ID, fb},
	}
	for _, r := range recipients {
		name, payload := lastSignal(t, r.conn)
		if name != "pack/room" {
			t.Fatalf("client %s: signal name = %q, want pack/room", r.id, name)
		}
		// JSON numbers decode to float64, hence the explicit conversion.
		if payload["hello"] != float64(1) {
			t.Fatalf("client %s: payload = %v", r.id, payload)
		}
	}
}
// TestRoomBroadcastRespectsExceptAndFilter exercises the two ways a Broadcast
// call can skip a room member: the filter predicate and the exceptID argument.
func TestRoomBroadcastRespectsExceptAndFilter(t *testing.T) {
	hub := NewHub()
	room := NewRoom(hub)
	room.OwnerID = "o"
	room.NotifyActionJoined = false
	room.Publish()

	listener, listenerConn := newTestClient("a")
	optedOut, optedOutConn := newTestClient("b")
	hub.addClient(listener)
	hub.addClient(optedOut)
	room.Join(listener)
	room.Join(optedOut)

	// The second client opts out of peer-info notifications; the filter must
	// skip it.
	optedOut.SetStore(flagNotifyPairInfo, false)
	room.Broadcast("x", map[string]any{"n": float64(1)}, "", (*Client).PeerInfoNotifiable)
	listenerReached := func() bool { return listenerConn.WriteCount() >= 1 }
	waitFor(t, listenerReached)
	// Give a wrong delivery a chance to show up before asserting its absence.
	time.Sleep(20 * time.Millisecond)
	if optedOutConn.WriteCount() != 0 {
		t.Fatalf("filtered-out client received %d frames", optedOutConn.WriteCount())
	}

	// exceptID must also be honoured: this time only the second client may
	// receive the frame.
	room.Broadcast("y", map[string]any{}, listener.ID, nil)
	optedOutReached := func() bool { return optedOutConn.WriteCount() >= 1 }
	waitFor(t, optedOutReached)
	latest, _ := lastSignal(t, listenerConn)
	if latest == "y" {
		t.Fatal("exceptID client should not have received the second broadcast")
	}
}
// TestRoomEmptyTriggersDown joins a single client to a published room, ejects
// it again, and expects that the hub no longer lists the room and that the
// client no longer reports membership in it.
func TestRoomEmptyTriggersDown(t *testing.T) {
	hub := NewHub()
	room := NewRoom(hub)
	room.OwnerID = "a"
	room.Publish()

	member, _ := newTestClient("a")
	hub.addClient(member)
	room.Join(member)
	room.Eject(member)

	_, stillListed := hub.Room(room.ID)
	if stillListed {
		t.Fatal("room should be removed from hub when it empties")
	}
	if stillMember := member.InRoom(room.ID); stillMember {
		t.Fatal("client should no longer reference a downed room")
	}
}
// TestLeaveWhileSendRace is the #22 regression: broadcasting to a room while
// members concurrently leave and rejoin. Under -race this catches any unguarded
// access to shared room/peer state — the exact failure the Node engine had.
//
// The test makes no assertions of its own; it passes when every goroutine
// finishes without a panic or a race-detector report.
func TestLeaveWhileSendRace(t *testing.T) {
	const (
		memberCount    = 30
		broadcasters   = 4
		broadcastsEach = 300
		churnRounds    = 100
	)

	hub := NewHub()
	room := NewRoom(hub)
	room.OwnerID = "owner"
	room.Publish()

	members := make([]*Client, 0, memberCount)
	for idx := 0; idx < memberCount; idx++ {
		member, _ := newTestClient(fmt.Sprintf("c%d", idx))
		members = append(members, member)
		hub.addClient(member)
		room.Join(member)
	}

	var pending sync.WaitGroup

	// Broadcasters hammer the room.
	for b := 0; b < broadcasters; b++ {
		pending.Add(1)
		go func() {
			defer pending.Done()
			for j := 0; j < broadcastsEach; j++ {
				room.Broadcast("pack/room", map[string]any{"j": j}, "", nil)
			}
		}()
	}

	// Churn: members leave and rejoin repeatedly while broadcasts are in flight.
	for _, member := range members {
		pending.Add(1)
		go func(c *Client) {
			defer pending.Done()
			for round := 0; round < churnRounds; round++ {
				room.Eject(c)
				room.Join(c)
			}
		}(member)
	}

	pending.Wait()

	// Close every client once all the traffic has stopped.
	for _, member := range members {
		member.Close()
	}
}
// TestHubRegistryConcurrency has several goroutines add, look up, list, count
// and remove clients on one shared hub at the same time. It makes no
// assertions; it is meant to surface panics or race-detector reports.
func TestHubRegistryConcurrency(t *testing.T) {
	const (
		workers        = 8
		roundsPerWorke = 200
	)

	hub := NewHub()
	hub.Register("noop", func(_ *Client, _ protocol.Message) any { return success() })

	var pending sync.WaitGroup

	// churn runs one worker's add/query/remove cycle with ids unique to it.
	churn := func(worker int) {
		defer pending.Done()
		for round := 0; round < roundsPerWorke; round++ {
			id := fmt.Sprintf("g%d-%d", worker, round)
			client := NewClient(testutil.NewFakeConn(), id)
			hub.addClient(client)
			// Results are discarded on purpose: only the concurrent access matters.
			_, _ = hub.Client(id)
			_ = hub.Clients()
			_ = hub.ClientCount()
			hub.removeClient(id)
		}
	}

	for w := 0; w < workers; w++ {
		pending.Add(1)
		go churn(w)
	}
	pending.Wait()
}
// success returns a newly allocated map holding the single entry
// "status": "success". It mirrors the services helper so the hub test stays
// self-contained.
func success() map[string]any {
	reply := make(map[string]any, 1)
	reply["status"] = "success"
	return reply
}
// TestPairReverseIndex checks the bookkeeping around client pairing: after
// a.AddPair(b), a must report b as a pair and b.PairedBy() must list a;
// ForgetPeer must drop a's reference to b; and RemovePair must leave neither
// side referencing the other.
func TestPairReverseIndex(t *testing.T) {
	src := NewClient(testutil.NewFakeConn(), "a")
	dst := NewClient(testutil.NewFakeConn(), "b")

	src.AddPair(dst)
	if !src.HasPair("b") {
		t.Fatal("a should have outgoing edge to b")
	}
	incoming := dst.PairedBy()
	if len(incoming) != 1 || incoming[0] != "a" {
		t.Fatalf("b.PairedBy() = %v, want [a]", incoming)
	}

	// ForgetPeer (used during a disconnect of the other side) must clear both
	// directions so no stale id remains.
	src.ForgetPeer("b")
	if src.HasPair("b") {
		t.Fatal("a should no longer reference b")
	}

	// RemovePair clears the outgoing edge and the matching incoming record.
	src.AddPair(dst)
	src.RemovePair(dst)
	bothCleared := !src.HasPair("b") && len(dst.PairedBy()) == 0
	if !bothCleared {
		t.Fatal("RemovePair should clear both sides")
	}
}
// TestServerHandleRepliesToRequest pushes the request frame
// [{"type":"ping"}, 5, "R"] into a connection served by srv.handle and checks
// the last frame written back: it must be a three-element array that echoes
// the request id 5 and carries protocol.FlagEnd as its flag.
func TestServerHandleRepliesToRequest(t *testing.T) {
	hub := NewHub()
	hub.Register("ping", func(c *Client, m protocol.Message) any {
		return map[string]any{"pong": true}
	})
	srv := NewServer(hub)
	fc := testutil.NewFakeConn()
	// Deferred so the connection is closed on every exit path, including
	// when a t.Fatalf below aborts the test.
	defer fc.Close()
	go srv.handle(fc)
	fc.Push([]byte(`[{"type":"ping"}, 5, "R"]`))
	waitFor(t, func() bool { return fc.WriteCount() >= 1 })
	writes := fc.Writes()
	var arr []any
	if err := json.Unmarshal(writes[len(writes)-1], &arr); err != nil {
		t.Fatalf("decode reply: %v", err)
	}
	// Expect [ {"pong":true}, 5, "E" ].
	if len(arr) != 3 {
		t.Fatalf("reply arity = %d, want 3", len(arr))
	}
	// JSON numbers decode to float64, so the id is compared as one.
	if arr[1] != float64(5) {
		t.Fatalf("reply id = %v, want 5", arr[1])
	}
	if arr[2] != protocol.FlagEnd {
		t.Fatalf("reply flag = %v, want E", arr[2])
	}
}