219 lines
5.7 KiB
Go
219 lines
5.7 KiB
Go
// Package notify implements store-and-forward notifications (#43) and their
|
|
// reply-bearing "suit" variant (#44). A notification addressed to a client that
|
|
// is offline is held until the client connects; one addressed to an online client
|
|
// is delivered immediately. Every notification carries a trace id so its delivery
|
|
// (and, for suit notifications, its reply) can be queried later.
|
|
//
|
|
// The store is bounded in two independent ways so it can never grow without
|
|
// limit, even if a target never reconnects: an expiry per notification and a hard
|
|
// per-target cap (oldest dropped first). A janitor periodically purges expired
|
|
// entries; tests drive expiry deterministically through an injectable clock.
|
|
package notify
|
|
|
|
import (
|
|
"crypto/rand"
|
|
"encoding/hex"
|
|
"sync"
|
|
"time"
|
|
)
|
|
|
|
// Notification is one store-and-forward message addressed to a client id.
|
|
type Notification struct {
|
|
Trace string // unique id used to query delivery/reply status
|
|
From string // sender: a client id, or "server" for server-originated
|
|
To string // target client id
|
|
Pack any // opaque payload delivered to the target
|
|
Suit bool // true => a reply is expected (#44)
|
|
|
|
CreatedAt time.Time
|
|
ExpiresAt time.Time // zero == never expires
|
|
Delivered bool
|
|
DeliveredAt time.Time
|
|
|
|
Replied bool
|
|
Reply any
|
|
RepliedAt time.Time
|
|
}
|
|
|
|
func (n *Notification) expired(now time.Time) bool {
|
|
return !n.ExpiresAt.IsZero() && now.After(n.ExpiresAt)
|
|
}
|
|
|
|
// Store is the in-memory notification registry. All methods are safe for
|
|
// concurrent use.
|
|
type Store struct {
|
|
mu sync.Mutex
|
|
pending map[string][]*Notification // target id -> queued, undelivered
|
|
byTrace map[string]*Notification // trace -> notification (status + reply)
|
|
|
|
now func() time.Time
|
|
defaultTTL time.Duration
|
|
maxPerTarget int
|
|
}
|
|
|
|
// NewStore returns an empty store with sensible defaults (24h TTL, 1024 queued
|
|
// notifications per offline target).
|
|
func NewStore() *Store {
|
|
return &Store{
|
|
pending: make(map[string][]*Notification),
|
|
byTrace: make(map[string]*Notification),
|
|
now: time.Now,
|
|
defaultTTL: 24 * time.Hour,
|
|
maxPerTarget: 1024,
|
|
}
|
|
}
|
|
|
|
// SetClock replaces the time source; intended for deterministic tests.
|
|
func (s *Store) SetClock(now func() time.Time) { s.now = now }
|
|
|
|
// Put records a notification for `to`. A zero ttl uses the store default. The
|
|
// returned Notification is a snapshot; the trace id is filled in.
|
|
func (s *Store) Put(from, to string, pack any, suit bool, ttl time.Duration) Notification {
|
|
if ttl <= 0 {
|
|
ttl = s.defaultTTL
|
|
}
|
|
now := s.now()
|
|
n := &Notification{
|
|
Trace: newTrace(),
|
|
From: from,
|
|
To: to,
|
|
Pack: pack,
|
|
Suit: suit,
|
|
CreatedAt: now,
|
|
ExpiresAt: now.Add(ttl),
|
|
}
|
|
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
|
|
q := s.pending[to]
|
|
// Enforce the per-target cap by evicting the oldest queued notifications.
|
|
for len(q) >= s.maxPerTarget && len(q) > 0 {
|
|
evicted := q[0]
|
|
q = q[1:]
|
|
delete(s.byTrace, evicted.Trace)
|
|
}
|
|
s.pending[to] = append(q, n)
|
|
s.byTrace[n.Trace] = n
|
|
return *n
|
|
}
|
|
|
|
// Drain returns every non-expired pending notification for `to`, marking each
|
|
// delivered and removing it from the pending queue. Delivered notifications stay
|
|
// in the trace index (until they expire) so their status can still be queried.
|
|
func (s *Store) Drain(to string) []Notification {
|
|
now := s.now()
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
|
|
q := s.pending[to]
|
|
if len(q) == 0 {
|
|
return nil
|
|
}
|
|
delete(s.pending, to)
|
|
|
|
out := make([]Notification, 0, len(q))
|
|
for _, n := range q {
|
|
if n.expired(now) {
|
|
delete(s.byTrace, n.Trace)
|
|
continue
|
|
}
|
|
n.Delivered = true
|
|
n.DeliveredAt = now
|
|
out = append(out, *n)
|
|
}
|
|
return out
|
|
}
|
|
|
|
// Status returns a snapshot of the notification with the given trace.
|
|
func (s *Store) Status(trace string) (Notification, bool) {
|
|
now := s.now()
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
n, ok := s.byTrace[trace]
|
|
if !ok || n.expired(now) {
|
|
return Notification{}, false
|
|
}
|
|
return *n, true
|
|
}
|
|
|
|
// Reply records a client's reply to a suit notification and returns the updated
|
|
// snapshot. It fails if the trace is unknown, expired, or not a suit.
|
|
func (s *Store) Reply(trace string, reply any) (Notification, bool) {
|
|
now := s.now()
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
n, ok := s.byTrace[trace]
|
|
if !ok || n.expired(now) || !n.Suit {
|
|
return Notification{}, false
|
|
}
|
|
n.Replied = true
|
|
n.Reply = reply
|
|
n.RepliedAt = now
|
|
return *n, true
|
|
}
|
|
|
|
// PurgeExpired removes expired notifications from both indexes and returns how
|
|
// many were removed. A janitor calls this periodically; Drain/Status/Reply also
|
|
// drop expired entries lazily as they are touched.
|
|
func (s *Store) PurgeExpired() int {
|
|
now := s.now()
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
|
|
removed := 0
|
|
for trace, n := range s.byTrace {
|
|
if n.expired(now) {
|
|
delete(s.byTrace, trace)
|
|
removed++
|
|
}
|
|
}
|
|
for to, q := range s.pending {
|
|
kept := q[:0]
|
|
for _, n := range q {
|
|
if !n.expired(now) {
|
|
kept = append(kept, n)
|
|
}
|
|
}
|
|
if len(kept) == 0 {
|
|
delete(s.pending, to)
|
|
} else {
|
|
s.pending[to] = kept
|
|
}
|
|
}
|
|
return removed
|
|
}
|
|
|
|
// PendingCount reports how many notifications are queued for a target (test aid).
|
|
func (s *Store) PendingCount(to string) int {
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
return len(s.pending[to])
|
|
}
|
|
|
|
// StartJanitor runs PurgeExpired every interval until the returned stop function
|
|
// is called. main() owns the lifecycle; the stop function makes it leak-free.
|
|
func (s *Store) StartJanitor(interval time.Duration) (stop func()) {
|
|
done := make(chan struct{})
|
|
go func() {
|
|
t := time.NewTicker(interval)
|
|
defer t.Stop()
|
|
for {
|
|
select {
|
|
case <-t.C:
|
|
s.PurgeExpired()
|
|
case <-done:
|
|
return
|
|
}
|
|
}
|
|
}()
|
|
var once sync.Once
|
|
return func() { once.Do(func() { close(done) }) }
|
|
}
|
|
|
|
func newTrace() string {
|
|
var b [16]byte
|
|
_, _ = rand.Read(b[:])
|
|
return hex.EncodeToString(b[:])
|
|
}
|