package services

import (
	"encoding/json"
	"testing"

	"git.saqut.com/saqut/mwse/internal/datastore"
	"git.saqut.com/saqut/mwse/internal/testutil"
	"git.saqut.com/saqut/mwse/internal/ws"
)

// waitDataOp waits for a data/op broadcast whose "op" field matches (the stream
// may contain several — e.g. a set followed by a delete) and returns the payload
// of the first such broadcast.
//
// A write only counts when it decodes as a two-element JSON array whose second
// element is "data/op" and whose first element is an object; anything else is
// skipped.
func waitDataOp(t *testing.T, fc *testutil.FakeConn, op string) map[string]any {
	t.Helper()
	find := func() (map[string]any, bool) {
		for _, raw := range fc.Writes() {
			var arr []any
			if json.Unmarshal(raw, &arr) != nil || len(arr) != 2 || arr[1] != "data/op" {
				continue
			}
			if p, ok := arr[0].(map[string]any); ok && p["op"] == op {
				return p, true
			}
		}
		return nil, false
	}
	waitFor(t, func() bool { _, ok := find(); return ok })
	// Scan once more to fetch the payload that satisfied the wait.
	p, _ := find()
	return p
}

// newHubReg builds a hub with services registered and returns the Registry so a
// test can inspect the long-lived stores.
func newHubReg() (*ws.Hub, *Registry) {
	hub := ws.NewHub()
	reg := Register(hub)
	return hub, reg
}

// TestActiveSyncBroadcast is the #45 collection/CRUD-broadcast core: a set by one
// subscriber is broadcast to the other subscriber, who can then read the record
// back. (The seq carried by the broadcast is not asserted here.)
func TestActiveSyncBroadcast(t *testing.T) {
	hub, _ := newHubReg()
	a, _ := connect(hub, "alice")
	b, fb := connect(hub, "bob")

	// Both open the same datastore by id.
	opened := asMap(t, hub.Handle(a, msg("data/open", "id", "shared", "kind", "temp", "primary", "id")))
	// Checked assertion: a malformed reply should fail this test with a message,
	// not panic and take the whole test binary down.
	id, ok := opened["id"].(string)
	if !ok {
		t.Fatalf("data/open id = %v (%T), want a string", opened["id"], opened["id"])
	}
	hub.Handle(b, msg("data/open", "id", id))

	// alice sets a record.
	set := asMap(t, hub.Handle(a, msg("data/set", "id", id, "record", map[string]any{"id": "k1", "v": float64(7)})))
	if set["status"] != "success" {
		t.Fatalf("data/set = %v", set)
	}

	// bob receives the data/op broadcast (set).
	op := waitSignal(t, fb, "data/op")
	if op["op"] != "set" {
		t.Fatalf("op = %v, want set", op["op"])
	}
	rec := asMap(t, op["record"])
	if rec["key"] != "k1" {
		t.Fatalf("broadcast record key = %v, want k1", rec["key"])
	}

	// bob can read the record back from the authoritative copy. (An in-process
	// reply carries the Go value; over a real socket it is the same shape as JSON.)
	got := asMap(t, hub.Handle(b, msg("data/get", "id", id, "key", "k1")))
	gotRec, ok := got["record"].(datastore.Record)
	if !ok {
		t.Fatalf("data/get record = %v (%T), want a datastore.Record", got["record"], got["record"])
	}
	if gotRec.Key != "k1" {
		t.Fatalf("data/get = %v", got)
	}
}

// TestActiveSyncDeleteBroadcast checks that a delete issued by one subscriber
// reaches the other subscriber as a data/op broadcast with op "delete" and the
// deleted key.
func TestActiveSyncDeleteBroadcast(t *testing.T) {
	hub, _ := newHubReg()
	a, _ := connect(hub, "alice")
	b, fb := connect(hub, "bob")

	hub.Handle(a, msg("data/open", "id", "d", "kind", "temp"))
	hub.Handle(b, msg("data/open", "id", "d"))
	hub.Handle(a, msg("data/set", "id", "d", "record", map[string]any{"id": "x"}))

	del := asMap(t, hub.Handle(a, msg("data/delete", "id", "d", "key", "x")))
	if del["status"] != "success" {
		t.Fatalf("data/delete = %v", del)
	}

	// The earlier set also produced a data/op, so match on the op name.
	op := waitDataOp(t, fb, "delete")
	if op["key"] != "x" {
		t.Fatalf("delete broadcast = %v", op)
	}
}

// TestPassiveSyncConvergence is the #45 merge-pool core: pushes are deduped and
// only the delta is broadcast, so subscribers converge.
func TestPassiveSyncConvergence(t *testing.T) {
	hub, _ := newHubReg()
	a, _ := connect(hub, "alice")
	b, fb := connect(hub, "bob")

	opened := asMap(t, hub.Handle(a, msg("sync/open", "id", "pool")))
	// Checked assertion: fail with a diagnostic instead of panicking on a
	// malformed reply.
	id, ok := opened["id"].(string)
	if !ok {
		t.Fatalf("sync/open id = %v (%T), want a string", opened["id"], opened["id"])
	}
	hub.Handle(b, msg("sync/open", "id", id))

	// alice pushes two items; bob gets them as a delta.
	res := asMap(t, hub.Handle(a, msg("sync/push", "id", id, "items", []any{"x", "y"})))
	if res["added"] != 2 {
		t.Fatalf("push added = %v, want 2", res["added"])
	}
	add := waitSignal(t, fb, "sync/add")
	if items, _ := add["items"].([]any); len(items) != 2 {
		t.Fatalf("sync/add items = %v, want 2", add["items"])
	}

	// bob pulls and sees the full converged pool.
	pull := asMap(t, hub.Handle(b, msg("sync/pull", "id", id)))
	if items, _ := pull["items"].([]any); len(items) != 2 {
		t.Fatalf("sync/pull items = %v, want 2", pull["items"])
	}

	// A push with one new + one existing item reports only the new one as added.
	// (Only the reply's count is asserted; the resulting broadcast is not.)
	res = asMap(t, hub.Handle(b, msg("sync/push", "id", id, "items", []any{"y", "z"})))
	if res["added"] != 1 {
		t.Fatalf("second push added = %v, want 1", res["added"])
	}
}

// TestDataSubscriptionClearedOnDisconnect proves no residual subscription is left
// behind (no leak) when a subscriber disconnects.
func TestDataSubscriptionClearedOnDisconnect(t *testing.T) {
	hub, reg := newHubReg()
	a, _ := connect(hub, "alice")
	// bob connects but never opens the datastore, so alice is the only subscriber.
	connect(hub, "bob")

	hub.Handle(a, msg("data/open", "id", "d", "kind", "temp"))
	ds, ok := reg.Data.Get("d")
	if !ok {
		t.Fatal("datastore should exist")
	}
	if len(ds.Subscribers()) != 1 {
		t.Fatalf("subscribers before disconnect = %v, want [alice]", ds.Subscribers())
	}

	hub.Disconnect(a)
	if len(ds.Subscribers()) != 0 {
		t.Fatalf("subscribers after disconnect = %v, want empty", ds.Subscribers())
	}
}