165 lines
5.0 KiB
Go
165 lines
5.0 KiB
Go
package services
|
|
|
|
import (
|
|
"time"
|
|
|
|
"git.saqut.com/saqut/mwse/internal/datastore"
|
|
"git.saqut.com/saqut/mwse/internal/protocol"
|
|
"git.saqut.com/saqut/mwse/internal/ws"
|
|
)
|
|
|
|
// registerDatastore wires the #45 shared data layer: active sync (collections with
|
|
// CRUD broadcast), passive sync (a fast merge pool), and temp/permanent
|
|
// datastores. The returned store is handed back so a janitor can reclaim expired
|
|
// temp stores/pools.
|
|
//
|
|
// Wire surface (additive):
|
|
//
|
|
// Active sync / collection / datastore:
|
|
// data/open {id?, kind?, primary?, expires?} -> {status, id, primary, kind, records}
|
|
// (kind = "temp" | "permanent". The payload field is "kind", NOT "type":
|
|
// "type" is reserved by WSTS to select the handler.)
|
|
// data/set {id, record} -> {status, key, seq} + broadcast data/op(set)
|
|
// data/delete {id, key} -> {status, seq} + broadcast data/op(delete)
|
|
// data/get {id, key?} -> {status, record|records}
|
|
//
|
|
// Passive sync (merge pool):
|
|
// sync/open {id?, expires?} -> {status, id, items}
|
|
// sync/push {id, items} -> {status, added} + broadcast sync/add
|
|
// sync/pull {id} -> {status, items}
|
|
//
|
|
// Broadcasts are server signals delivered to every *other* subscriber:
|
|
//
|
|
// [ {id, op, record|key, seq}, "data/op" ]
|
|
// [ {id, items}, "sync/add" ]
|
|
func registerDatastore(hub *ws.Hub) *datastore.Store {
|
|
store := datastore.NewStore()
|
|
|
|
// Signal a set of subscriber ids, skipping the originator (it already holds the
|
|
// change and learns the authoritative seq from its own reply).
|
|
broadcast := func(subs []string, except, name string, payload map[string]any) {
|
|
for _, id := range subs {
|
|
if id == except {
|
|
continue
|
|
}
|
|
if cl, ok := hub.Client(id); ok {
|
|
cl.Signal(name, payload)
|
|
}
|
|
}
|
|
}
|
|
|
|
// A departed client must leave no residual subscription anywhere.
|
|
hub.OnDisconnect(func(c *ws.Client) { store.UnsubscribeAll(c.ID) })
|
|
|
|
// ---- active sync / collection / datastore -------------------------------
|
|
|
|
hub.Register("data/open", func(c *ws.Client, m protocol.Message) any {
|
|
kind := datastore.Temp
|
|
if m.Str("kind") == string(datastore.Permanent) {
|
|
kind = datastore.Permanent
|
|
}
|
|
ttl := time.Duration(m.Int("expires")) * time.Second
|
|
ds := store.Open(m.Str("id"), kind, m.Str("primary"), ttl)
|
|
ds.Subscribe(c.ID)
|
|
return map[string]any{
|
|
"status": "success",
|
|
"id": ds.ID,
|
|
"primary": ds.Primary,
|
|
"kind": string(ds.Kind),
|
|
"records": ds.Snapshot(),
|
|
}
|
|
})
|
|
|
|
hub.Register("data/set", func(c *ws.Client, m protocol.Message) any {
|
|
ds, ok := store.Get(m.Str("id"))
|
|
if !ok {
|
|
return fail("NOT_FOUND")
|
|
}
|
|
rec, ok := ds.Set(toMap(m.Get("record")), store.Now())
|
|
if !ok {
|
|
return fail("PRIMARY_KEY_REQUIRED")
|
|
}
|
|
broadcast(ds.Subscribers(), c.ID, "data/op", map[string]any{
|
|
"id": ds.ID,
|
|
"op": "set",
|
|
"record": rec,
|
|
"seq": rec.Seq,
|
|
})
|
|
return map[string]any{"status": "success", "key": rec.Key, "seq": rec.Seq}
|
|
})
|
|
|
|
hub.Register("data/delete", func(c *ws.Client, m protocol.Message) any {
|
|
ds, ok := store.Get(m.Str("id"))
|
|
if !ok {
|
|
return fail("NOT_FOUND")
|
|
}
|
|
key := m.Str("key")
|
|
seq, ok := ds.Delete(key)
|
|
if !ok {
|
|
return fail("NO_SUCH_KEY")
|
|
}
|
|
broadcast(ds.Subscribers(), c.ID, "data/op", map[string]any{
|
|
"id": ds.ID,
|
|
"op": "delete",
|
|
"key": key,
|
|
"seq": seq,
|
|
})
|
|
return map[string]any{"status": "success", "seq": seq}
|
|
})
|
|
|
|
hub.Register("data/get", func(c *ws.Client, m protocol.Message) any {
|
|
ds, ok := store.Get(m.Str("id"))
|
|
if !ok {
|
|
return fail("NOT_FOUND")
|
|
}
|
|
if key := m.Str("key"); key != "" {
|
|
rec, ok := ds.Get(key)
|
|
if !ok {
|
|
return fail("NO_SUCH_KEY")
|
|
}
|
|
return map[string]any{"status": "success", "record": rec}
|
|
}
|
|
return map[string]any{"status": "success", "records": ds.Snapshot()}
|
|
})
|
|
|
|
// ---- passive sync (merge pool) ------------------------------------------
|
|
|
|
hub.Register("sync/open", func(c *ws.Client, m protocol.Message) any {
|
|
ttl := time.Duration(m.Int("expires")) * time.Second
|
|
p := store.OpenPool(m.Str("id"), ttl)
|
|
p.Subscribe(c.ID)
|
|
return map[string]any{"status": "success", "id": p.ID, "items": p.Items()}
|
|
})
|
|
|
|
hub.Register("sync/push", func(c *ws.Client, m protocol.Message) any {
|
|
items := toSlice(m.Get("items"))
|
|
added, subs, ok := store.PushPool(m.Str("id"), items)
|
|
if !ok {
|
|
return fail("NOT_FOUND")
|
|
}
|
|
if len(added) > 0 {
|
|
broadcast(subs, c.ID, "sync/add", map[string]any{"id": m.Str("id"), "items": added})
|
|
}
|
|
return map[string]any{"status": "success", "added": len(added)}
|
|
})
|
|
|
|
hub.Register("sync/pull", func(c *ws.Client, m protocol.Message) any {
|
|
p, ok := store.GetPool(m.Str("id"))
|
|
if !ok {
|
|
return fail("NOT_FOUND")
|
|
}
|
|
return map[string]any{"status": "success", "items": p.Items()}
|
|
})
|
|
|
|
return store
|
|
}
|
|
|
|
// toSlice coerces a decoded JSON value to a slice, returning nil when it is not
|
|
// an array.
|
|
func toSlice(v any) []any {
|
|
if s, ok := v.([]any); ok {
|
|
return s
|
|
}
|
|
return nil
|
|
}
|