// Source: MWSE/internal/notify/store.go (219 lines, 5.7 KiB, Go)

// Package notify implements store-and-forward notifications (#43) and their
// reply-bearing "suit" variant (#44). A notification addressed to a client that
// is offline is held until the client connects; one addressed to an online client
// is delivered immediately. Every notification carries a trace id so its delivery
// (and, for suit notifications, its reply) can be queried later.
//
// The store is bounded in two independent ways so it can never grow without
// limit, even if a target never reconnects: an expiry per notification and a hard
// per-target cap (oldest dropped first). A janitor periodically purges expired
// entries; tests drive expiry deterministically through an injectable clock.
package notify
import (
"crypto/rand"
"encoding/hex"
"sync"
"time"
)
// Notification is a single store-and-forward message for one target client,
// together with the delivery and reply bookkeeping kept alongside it.
type Notification struct {
	// Addressing and payload.
	Trace string // unique id; the key used to query delivery/reply status
	From  string // sender: a client id, or "server" for server-originated
	To    string // target client id
	Pack  any    // opaque payload handed to the target
	Suit  bool   // true when a reply is expected (#44)

	// Lifetime.
	CreatedAt time.Time
	ExpiresAt time.Time // the zero time means the notification never expires

	// Delivery state.
	Delivered   bool
	DeliveredAt time.Time

	// Reply state.
	Replied   bool
	Reply     any
	RepliedAt time.Time
}
// expired reports whether the notification's expiry lies strictly before now.
// A notification whose ExpiresAt is the zero time never expires.
func (n *Notification) expired(now time.Time) bool {
	if n.ExpiresAt.IsZero() {
		return false
	}
	return n.ExpiresAt.Before(now)
}
// Store is the in-memory notification registry. Both maps are guarded by mu,
// which every method that reads or writes them acquires. SetClock writes the
// time source without taking mu.
type Store struct {
	mu sync.Mutex

	pending map[string][]*Notification // target id -> queued, undelivered (oldest first)
	byTrace map[string]*Notification   // trace id -> notification, for status and reply lookups

	now          func() time.Time // time source; replaceable through SetClock
	defaultTTL   time.Duration    // used by Put when the caller passes ttl <= 0
	maxPerTarget int              // cap on queued notifications per target
}
// NewStore builds an empty Store with the default settings: the real clock
// (time.Now), a 24-hour default TTL, and at most 1024 queued notifications per
// target.
func NewStore() *Store {
	const (
		ttl      = 24 * time.Hour
		queueCap = 1024
	)

	s := new(Store)
	s.pending = map[string][]*Notification{}
	s.byTrace = map[string]*Notification{}
	s.now = time.Now
	s.defaultTTL = ttl
	s.maxPerTarget = queueCap
	return s
}
// SetClock replaces the time source; intended for deterministic tests. Passing
// nil restores the real clock (time.Now) instead of leaving the store with a
// nil function that would panic on the next operation.
//
// SetClock does not take the store's lock, so call it before the store is
// shared with other goroutines.
func (s *Store) SetClock(now func() time.Time) {
	if now == nil {
		now = time.Now
	}
	s.now = now
}
// Put records a notification from `from` addressed to `to` and returns a
// snapshot of it with the trace id, creation time and expiry filled in.
//
// pack is the opaque payload and suit marks the notification as expecting a
// reply. A ttl of zero or less selects the store default.
//
// The notification is appended to the target's pending queue and indexed by
// its trace id. When the queue is already at the per-target cap, the oldest
// entries are evicted first; evicted entries also leave the trace index, so
// their status can no longer be queried.
func (s *Store) Put(from, to string, pack any, suit bool, ttl time.Duration) Notification {
	if ttl <= 0 {
		ttl = s.defaultTTL
	}
	now := s.now()
	n := &Notification{
		Trace:     newTrace(),
		From:      from,
		To:        to,
		Pack:      pack,
		Suit:      suit,
		CreatedAt: now,
		ExpiresAt: now.Add(ttl),
	}

	s.mu.Lock()
	defer s.mu.Unlock()

	q := s.pending[to]
	// Enforce the per-target cap by evicting the oldest queued notifications.
	for len(q) > 0 && len(q) >= s.maxPerTarget {
		delete(s.byTrace, q[0].Trace)
		// Clear the slot before re-slicing: the backing array stays alive, and
		// a leftover pointer in it would keep the evicted notification (and its
		// payload) reachable.
		q[0] = nil
		q = q[1:]
	}
	s.pending[to] = append(q, n)
	s.byTrace[n.Trace] = n
	return *n
}
// Drain empties the pending queue for `to` and returns snapshots of the
// notifications in it that have not expired, each stamped as delivered at the
// current time. Expired entries are not returned and are removed from the
// trace index; delivered ones remain in the trace index so their status can
// still be queried. Drain returns nil when nothing is queued for the target.
func (s *Store) Drain(to string) []Notification {
	at := s.now()

	s.mu.Lock()
	defer s.mu.Unlock()

	queued := s.pending[to]
	if len(queued) == 0 {
		return nil
	}

	delivered := make([]Notification, 0, len(queued))
	for i := range queued {
		n := queued[i]
		if !n.expired(at) {
			n.Delivered = true
			n.DeliveredAt = at
			delivered = append(delivered, *n)
			continue
		}
		// Expired while waiting: forget it entirely.
		delete(s.byTrace, n.Trace)
	}
	delete(s.pending, to)
	return delivered
}
// Status looks up a notification by trace id and returns a snapshot of it.
// The boolean is false when the trace is unknown or the notification has
// expired; an expired entry is only hidden here, not removed.
func (s *Store) Status(trace string) (Notification, bool) {
	at := s.now()

	s.mu.Lock()
	defer s.mu.Unlock()

	if n, found := s.byTrace[trace]; found && !n.expired(at) {
		return *n, true
	}
	return Notification{}, false
}
// Reply stores a reply on the suit notification identified by trace and
// returns the updated snapshot. The boolean is false, and nothing is changed,
// when the trace is unknown, the notification has expired, or it is not a
// suit. A reply to a notification that already has one replaces the earlier
// reply and its timestamp.
func (s *Store) Reply(trace string, reply any) (Notification, bool) {
	at := s.now()

	s.mu.Lock()
	defer s.mu.Unlock()

	n, found := s.byTrace[trace]
	if !found {
		return Notification{}, false
	}
	if n.expired(at) || !n.Suit {
		return Notification{}, false
	}

	n.Replied, n.Reply, n.RepliedAt = true, reply, at
	return *n, true
}
// PurgeExpired removes expired notifications from both the trace index and
// the pending queues, and returns how many entries were removed from the trace
// index. A janitor calls this periodically. Drain also discards expired
// entries as it encounters them; Status and Reply only hide expired entries
// and leave their removal to this method.
func (s *Store) PurgeExpired() int {
	now := s.now()

	s.mu.Lock()
	defer s.mu.Unlock()

	removed := 0
	for trace, n := range s.byTrace {
		if n.expired(now) {
			delete(s.byTrace, trace)
			removed++
		}
	}

	for to, q := range s.pending {
		// Filter in place, reusing the queue's backing array.
		kept := q[:0]
		for _, n := range q {
			if !n.expired(now) {
				kept = append(kept, n)
			}
		}
		if len(kept) == 0 {
			delete(s.pending, to)
			continue
		}
		// Clear the vacated tail: the backing array is still referenced by
		// kept, and stale pointers left in it would keep the expired
		// notifications (and their payloads) reachable.
		for i := len(kept); i < len(q); i++ {
			q[i] = nil
		}
		s.pending[to] = kept
	}
	return removed
}
// PendingCount returns the length of the pending queue for the given target,
// or 0 when the target has no queue (test aid).
func (s *Store) PendingCount(to string) int {
	s.mu.Lock()
	count := len(s.pending[to])
	s.mu.Unlock()
	return count
}
// StartJanitor runs PurgeExpired every interval in a background goroutine
// until the returned stop function is called. stop is safe to call more than
// once and from several goroutines; it returns only after the janitor
// goroutine has exited, so nothing is left running afterwards.
//
// A non-positive interval disables the janitor: no goroutine is started and
// the returned stop function does nothing. (time.NewTicker panics on such an
// interval, which inside a background goroutine would take the whole process
// down.) With the janitor disabled, expired entries are removed only by
// explicit PurgeExpired calls and by Drain.
func (s *Store) StartJanitor(interval time.Duration) (stop func()) {
	if interval <= 0 {
		return func() {}
	}

	done := make(chan struct{})   // closed by stop to request shutdown
	exited := make(chan struct{}) // closed by the goroutine on its way out

	go func() {
		defer close(exited)
		t := time.NewTicker(interval)
		defer t.Stop()
		for {
			select {
			case <-t.C:
				s.PurgeExpired()
			case <-done:
				return
			}
		}
	}()

	var once sync.Once
	return func() {
		once.Do(func() { close(done) })
		<-exited
	}
}
// newTrace returns a fresh trace id: 16 bytes read from crypto/rand, rendered
// as 32 lowercase hexadecimal characters. The error from rand.Read is
// deliberately discarded, so a failing entropy source is not reported here.
func newTrace() string {
	raw := make([]byte, 16)
	_, _ = rand.Read(raw)
	return hex.EncodeToString(raw)
}