package services import ( "time" "git.saqut.com/saqut/mwse/internal/datastore" "git.saqut.com/saqut/mwse/internal/protocol" "git.saqut.com/saqut/mwse/internal/ws" ) // registerDatastore wires the #45 shared data layer: active sync (collections with // CRUD broadcast), passive sync (a fast merge pool), and temp/permanent // datastores. The returned store is handed back so a janitor can reclaim expired // temp stores/pools. // // Wire surface (additive): // // Active sync / collection / datastore: // data/open {id?, kind?, primary?, expires?} -> {status, id, primary, kind, records} // (kind = "temp" | "permanent". The payload field is "kind", NOT "type": // "type" is reserved by WSTS to select the handler.) // data/set {id, record} -> {status, key, seq} + broadcast data/op(set) // data/delete {id, key} -> {status, seq} + broadcast data/op(delete) // data/get {id, key?} -> {status, record|records} // // Passive sync (merge pool): // sync/open {id?, expires?} -> {status, id, items} // sync/push {id, items} -> {status, added} + broadcast sync/add // sync/pull {id} -> {status, items} // // Broadcasts are server signals delivered to every *other* subscriber: // // [ {id, op, record|key, seq}, "data/op" ] // [ {id, items}, "sync/add" ] func registerDatastore(hub *ws.Hub) *datastore.Store { store := datastore.NewStore() // Signal a set of subscriber ids, skipping the originator (it already holds the // change and learns the authoritative seq from its own reply). broadcast := func(subs []string, except, name string, payload map[string]any) { for _, id := range subs { if id == except { continue } if cl, ok := hub.Client(id); ok { cl.Signal(name, payload) } } } // A departed client must leave no residual subscription anywhere. hub.OnDisconnect(func(c *ws.Client) { store.UnsubscribeAll(c.ID) }) // ---- active sync / collection / datastore ------------------------------- hub.Register("data/open", func(c *ws.Client, m protocol.Message) any { kind := datastore.Temp if m.Str("kind") == string(datastore.Permanent) { kind = datastore.Permanent } ttl := time.Duration(m.Int("expires")) * time.Second ds := store.Open(m.Str("id"), kind, m.Str("primary"), ttl) ds.Subscribe(c.ID) return map[string]any{ "status": "success", "id": ds.ID, "primary": ds.Primary, "kind": string(ds.Kind), "records": ds.Snapshot(), } }) hub.Register("data/set", func(c *ws.Client, m protocol.Message) any { ds, ok := store.Get(m.Str("id")) if !ok { return fail("NOT_FOUND") } rec, ok := ds.Set(toMap(m.Get("record")), store.Now()) if !ok { return fail("PRIMARY_KEY_REQUIRED") } broadcast(ds.Subscribers(), c.ID, "data/op", map[string]any{ "id": ds.ID, "op": "set", "record": rec, "seq": rec.Seq, }) return map[string]any{"status": "success", "key": rec.Key, "seq": rec.Seq} }) hub.Register("data/delete", func(c *ws.Client, m protocol.Message) any { ds, ok := store.Get(m.Str("id")) if !ok { return fail("NOT_FOUND") } key := m.Str("key") seq, ok := ds.Delete(key) if !ok { return fail("NO_SUCH_KEY") } broadcast(ds.Subscribers(), c.ID, "data/op", map[string]any{ "id": ds.ID, "op": "delete", "key": key, "seq": seq, }) return map[string]any{"status": "success", "seq": seq} }) hub.Register("data/get", func(c *ws.Client, m protocol.Message) any { ds, ok := store.Get(m.Str("id")) if !ok { return fail("NOT_FOUND") } if key := m.Str("key"); key != "" { rec, ok := ds.Get(key) if !ok { return fail("NO_SUCH_KEY") } return map[string]any{"status": "success", "record": rec} } return map[string]any{"status": "success", "records": ds.Snapshot()} }) // ---- passive sync (merge pool) ------------------------------------------ hub.Register("sync/open", func(c *ws.Client, m protocol.Message) any { ttl := time.Duration(m.Int("expires")) * time.Second p := store.OpenPool(m.Str("id"), ttl) p.Subscribe(c.ID) return map[string]any{"status": "success", "id": p.ID, "items": p.Items()} }) hub.Register("sync/push", func(c *ws.Client, m protocol.Message) any { items := toSlice(m.Get("items")) added, subs, ok := store.PushPool(m.Str("id"), items) if !ok { return fail("NOT_FOUND") } if len(added) > 0 { broadcast(subs, c.ID, "sync/add", map[string]any{"id": m.Str("id"), "items": added}) } return map[string]any{"status": "success", "added": len(added)} }) hub.Register("sync/pull", func(c *ws.Client, m protocol.Message) any { p, ok := store.GetPool(m.Str("id")) if !ok { return fail("NOT_FOUND") } return map[string]any{"status": "success", "items": p.Items()} }) return store } // toSlice coerces a decoded JSON value to a slice, returning nil when it is not // an array. func toSlice(v any) []any { if s, ok := v.([]any); ok { return s } return nil }