MWSE/internal/services/datastore.go

165 lines
5.0 KiB
Go

package services
import (
"time"
"git.saqut.com/saqut/mwse/internal/datastore"
"git.saqut.com/saqut/mwse/internal/protocol"
"git.saqut.com/saqut/mwse/internal/ws"
)
// registerDatastore wires the #45 shared data layer: active sync (collections with
// CRUD broadcast), passive sync (a fast merge pool), and temp/permanent
// datastores. The returned store is handed back so a janitor can reclaim expired
// temp stores/pools.
//
// Wire surface (additive):
//
// Active sync / collection / datastore:
//
//	data/open   {id?, kind?, primary?, expires?} -> {status, id, primary, kind, records}
//	            (kind = "temp" | "permanent". The payload field is "kind", NOT "type":
//	            "type" is reserved by WSTS to select the handler.)
//	data/set    {id, record} -> {status, key, seq} + broadcast data/op(set)
//	data/delete {id, key}    -> {status, seq}      + broadcast data/op(delete)
//	data/get    {id, key?}   -> {status, record|records}
//
// Passive sync (merge pool):
//
//	sync/open {id?, expires?} -> {status, id, items}
//	sync/push {id, items}     -> {status, added} + broadcast sync/add
//	sync/pull {id}            -> {status, items}
//
// Broadcasts are server signals delivered to every *other* subscriber:
//
//	[ {id, op, record|key, seq}, "data/op" ]
//	[ {id, items}, "sync/add" ]
//
// "expires" is read as a whole number of seconds. Values too large to be
// represented as a time.Duration are saturated instead of being allowed to
// overflow.
func registerDatastore(hub *ws.Hub) *datastore.Store {
	store := datastore.NewStore()

	// maxTTLSeconds is the largest whole number of seconds that still fits in a
	// time.Duration (an int64 count of nanoseconds).
	const maxTTLSeconds = int64(1<<63-1) / int64(time.Second)

	// clampTTL converts a client-supplied second count into a Duration. The
	// count is saturated to +/-maxTTLSeconds first, because an unchecked
	// multiplication by time.Second wraps around for huge inputs and could even
	// flip the sign. In-range values (including zero and negatives) are
	// converted exactly as before.
	clampTTL := func(secs int64) time.Duration {
		switch {
		case secs > maxTTLSeconds:
			secs = maxTTLSeconds
		case secs < -maxTTLSeconds:
			secs = -maxTTLSeconds
		}
		return time.Duration(secs) * time.Second
	}

	// Signal a set of subscriber ids, skipping the originator (it already holds the
	// change and learns the authoritative seq from its own reply). Ids that the
	// hub no longer knows are skipped silently.
	broadcast := func(subs []string, except, name string, payload map[string]any) {
		for _, id := range subs {
			if id == except {
				continue
			}
			if cl, ok := hub.Client(id); ok {
				cl.Signal(name, payload)
			}
		}
	}

	// A departed client must leave no residual subscription anywhere.
	hub.OnDisconnect(func(c *ws.Client) { store.UnsubscribeAll(c.ID) })

	// ---- active sync / collection / datastore -------------------------------

	// data/open: open the datastore, subscribe the caller, and reply with the
	// current snapshot.
	hub.Register("data/open", func(c *ws.Client, m protocol.Message) any {
		// Anything other than an explicit "permanent" is treated as temp.
		kind := datastore.Temp
		if m.Str("kind") == string(datastore.Permanent) {
			kind = datastore.Permanent
		}
		ttl := clampTTL(int64(m.Int("expires")))
		ds := store.Open(m.Str("id"), kind, m.Str("primary"), ttl)
		ds.Subscribe(c.ID)
		return map[string]any{
			"status":  "success",
			"id":      ds.ID,
			"primary": ds.Primary,
			"kind":    string(ds.Kind),
			"records": ds.Snapshot(),
		}
	})

	// data/set: store a record and notify the other subscribers.
	hub.Register("data/set", func(c *ws.Client, m protocol.Message) any {
		ds, ok := store.Get(m.Str("id"))
		if !ok {
			return fail("NOT_FOUND")
		}
		rec, ok := ds.Set(toMap(m.Get("record")), store.Now())
		if !ok {
			return fail("PRIMARY_KEY_REQUIRED")
		}
		broadcast(ds.Subscribers(), c.ID, "data/op", map[string]any{
			"id":     ds.ID,
			"op":     "set",
			"record": rec,
			"seq":    rec.Seq,
		})
		return map[string]any{"status": "success", "key": rec.Key, "seq": rec.Seq}
	})

	// data/delete: remove a record by key and notify the other subscribers.
	hub.Register("data/delete", func(c *ws.Client, m protocol.Message) any {
		ds, ok := store.Get(m.Str("id"))
		if !ok {
			return fail("NOT_FOUND")
		}
		key := m.Str("key")
		seq, ok := ds.Delete(key)
		if !ok {
			return fail("NO_SUCH_KEY")
		}
		broadcast(ds.Subscribers(), c.ID, "data/op", map[string]any{
			"id":  ds.ID,
			"op":  "delete",
			"key": key,
			"seq": seq,
		})
		return map[string]any{"status": "success", "seq": seq}
	})

	// data/get: return one record when a non-empty key is given, otherwise the
	// whole snapshot. The caller is not subscribed by this request.
	hub.Register("data/get", func(c *ws.Client, m protocol.Message) any {
		ds, ok := store.Get(m.Str("id"))
		if !ok {
			return fail("NOT_FOUND")
		}
		if key := m.Str("key"); key != "" {
			rec, ok := ds.Get(key)
			if !ok {
				return fail("NO_SUCH_KEY")
			}
			return map[string]any{"status": "success", "record": rec}
		}
		return map[string]any{"status": "success", "records": ds.Snapshot()}
	})

	// ---- passive sync (merge pool) ------------------------------------------

	// sync/open: open the pool, subscribe the caller, and reply with its items.
	hub.Register("sync/open", func(c *ws.Client, m protocol.Message) any {
		ttl := clampTTL(int64(m.Int("expires")))
		p := store.OpenPool(m.Str("id"), ttl)
		p.Subscribe(c.ID)
		return map[string]any{"status": "success", "id": p.ID, "items": p.Items()}
	})

	// sync/push: push items into the pool; the other subscribers are signalled
	// only when PushPool reports at least one added item.
	hub.Register("sync/push", func(c *ws.Client, m protocol.Message) any {
		items := toSlice(m.Get("items"))
		added, subs, ok := store.PushPool(m.Str("id"), items)
		if !ok {
			return fail("NOT_FOUND")
		}
		if len(added) > 0 {
			broadcast(subs, c.ID, "sync/add", map[string]any{"id": m.Str("id"), "items": added})
		}
		return map[string]any{"status": "success", "added": len(added)}
	})

	// sync/pull: return the pool's items without subscribing the caller.
	hub.Register("sync/pull", func(c *ws.Client, m protocol.Message) any {
		p, ok := store.GetPool(m.Str("id"))
		if !ok {
			return fail("NOT_FOUND")
		}
		return map[string]any{"status": "success", "items": p.Items()}
	})

	return store
}
// toSlice returns v unchanged when its dynamic type is []any (a decoded JSON
// array). Every other value, including a nil interface, yields nil.
func toSlice(v any) []any {
	switch arr := v.(type) {
	case []any:
		return arr
	default:
		return nil
	}
}