MWSE/internal/services/notify.go

96 lines
2.8 KiB
Go

package services
import (
"time"
"git.saqut.com/saqut/mwse/internal/notify"
"git.saqut.com/saqut/mwse/internal/protocol"
"git.saqut.com/saqut/mwse/internal/ws"
)
// registerNotify installs the store-and-forward notification handlers (#43)
// together with the reply-carrying "suit" flavour (#44) on hub. It creates a
// fresh notify.Store and returns it so the caller can run a janitor that
// reclaims expired entries.
//
// Everything here is additive; the frozen relay contract is not touched.
//
// Requests handled:
//
//   - notify/send   {to, pack, expires?, suit?} -> {status, trace}
//   - notify/status {trace}                     -> {status, delivered, replied, reply}
//   - notify/reply  {trace, pack}               -> {status} (a client answering a suit)
//
// Signals emitted:
//
//   - to the target of a notification:  [ {from, pack, trace, suit}, "notify" ]
//   - to the original sender, if it is connected when a reply arrives:
//     [ {trace, from, pack}, "notify/reply" ]
//
// trigger.NotifyReplied is invoked for every accepted reply so the 3rd-party
// application server is told about it instead of polling notify/status.
func registerNotify(hub *ws.Hub, trigger NotifyTrigger) *notify.Store {
	store := notify.NewStore()

	// pushPending empties the recipient's queue and signals each queued
	// notification to it, in the order the store hands them back.
	pushPending := func(recipient *ws.Client) {
		pending := store.Drain(recipient.ID)
		for _, item := range pending {
			recipient.Signal("notify", map[string]any{
				"from":  item.From,
				"pack":  item.Pack,
				"trace": item.Trace,
				"suit":  item.Suit,
			})
		}
	}

	// A client that just connected receives whatever was queued for it while
	// it was away.
	hub.OnConnect(pushPending)

	// onSend queues a notification for "to" and, when that client is
	// connected, drains its queue to it straight away.
	onSend := func(sender *ws.Client, msg protocol.Message) any {
		recipientID := msg.Str("to")
		if recipientID == "" {
			return fail("TO_REQUIRED")
		}

		// "expires" is given in seconds; 0 selects the store's default TTL.
		// NOTE(review): a negative "expires" yields a negative TTL here; how
		// the store treats that is not visible from this file — confirm.
		lifetime := time.Second * time.Duration(msg.Int("expires"))
		queued := store.Put(sender.ID, recipientID, msg.Get("pack"), msg.Truthy("suit"), lifetime)

		// Offline targets keep the entry queued until their next connect.
		target, online := hub.Client(recipientID)
		if online {
			pushPending(target)
		}

		return map[string]any{
			"status": "success",
			"trace":  queued.Trace,
		}
	}

	// onStatus reports the delivery/reply state recorded for a trace.
	// NOTE(review): the requesting client is not consulted, so any caller
	// that knows a trace can read its status — confirm this is intended.
	onStatus := func(_ *ws.Client, msg protocol.Message) any {
		entry, found := store.Status(msg.Str("trace"))
		if !found {
			return fail("NOT_FOUND")
		}
		return map[string]any{
			"status":    "success",
			"delivered": entry.Delivered,
			"replied":   entry.Replied,
			"reply":     entry.Reply,
		}
	}

	// onReply records a client's answer to a suit, notifies the application
	// server through trigger, and forwards the answer to the original sender
	// when that sender is currently connected.
	onReply := func(replier *ws.Client, msg protocol.Message) any {
		answered, found := store.Reply(msg.Str("trace"), msg.Get("pack"))
		if !found {
			return fail("NOT_FOUND_OR_NOT_SUIT")
		}

		// MWSE pushes the reply outward (#44) rather than waiting for the
		// application server to poll notify/status.
		trigger.NotifyReplied(answered)

		origin, online := hub.Client(answered.From)
		if online {
			origin.Signal("notify/reply", map[string]any{
				"trace": answered.Trace,
				"from":  replier.ID,
				"pack":  answered.Reply,
			})
		}

		return success()
	}

	hub.Register("notify/send", onSend)
	hub.Register("notify/status", onStatus)
	hub.Register("notify/reply", onReply)

	return store
}