// Package notify implements store-and-forward notifications (#43) and their // reply-bearing "suit" variant (#44). A notification addressed to a client that // is offline is held until the client connects; one addressed to an online client // is delivered immediately. Every notification carries a trace id so its delivery // (and, for suit notifications, its reply) can be queried later. // // The store is bounded in two independent ways so it can never grow without // limit, even if a target never reconnects: an expiry per notification and a hard // per-target cap (oldest dropped first). A janitor periodically purges expired // entries; tests drive expiry deterministically through an injectable clock. package notify import ( "crypto/rand" "encoding/hex" "sync" "time" ) // Notification is one store-and-forward message addressed to a client id. type Notification struct { Trace string // unique id used to query delivery/reply status From string // sender: a client id, or "server" for server-originated To string // target client id Pack any // opaque payload delivered to the target Suit bool // true => a reply is expected (#44) CreatedAt time.Time ExpiresAt time.Time // zero == never expires Delivered bool DeliveredAt time.Time Replied bool Reply any RepliedAt time.Time } func (n *Notification) expired(now time.Time) bool { return !n.ExpiresAt.IsZero() && now.After(n.ExpiresAt) } // Store is the in-memory notification registry. All methods are safe for // concurrent use. type Store struct { mu sync.Mutex pending map[string][]*Notification // target id -> queued, undelivered byTrace map[string]*Notification // trace -> notification (status + reply) now func() time.Time defaultTTL time.Duration maxPerTarget int } // NewStore returns an empty store with sensible defaults (24h TTL, 1024 queued // notifications per offline target). func NewStore() *Store { return &Store{ pending: make(map[string][]*Notification), byTrace: make(map[string]*Notification), now: time.Now, defaultTTL: 24 * time.Hour, maxPerTarget: 1024, } } // SetClock replaces the time source; intended for deterministic tests. func (s *Store) SetClock(now func() time.Time) { s.now = now } // Put records a notification for `to`. A zero ttl uses the store default. The // returned Notification is a snapshot; the trace id is filled in. func (s *Store) Put(from, to string, pack any, suit bool, ttl time.Duration) Notification { if ttl <= 0 { ttl = s.defaultTTL } now := s.now() n := &Notification{ Trace: newTrace(), From: from, To: to, Pack: pack, Suit: suit, CreatedAt: now, ExpiresAt: now.Add(ttl), } s.mu.Lock() defer s.mu.Unlock() q := s.pending[to] // Enforce the per-target cap by evicting the oldest queued notifications. for len(q) >= s.maxPerTarget && len(q) > 0 { evicted := q[0] q = q[1:] delete(s.byTrace, evicted.Trace) } s.pending[to] = append(q, n) s.byTrace[n.Trace] = n return *n } // Drain returns every non-expired pending notification for `to`, marking each // delivered and removing it from the pending queue. Delivered notifications stay // in the trace index (until they expire) so their status can still be queried. func (s *Store) Drain(to string) []Notification { now := s.now() s.mu.Lock() defer s.mu.Unlock() q := s.pending[to] if len(q) == 0 { return nil } delete(s.pending, to) out := make([]Notification, 0, len(q)) for _, n := range q { if n.expired(now) { delete(s.byTrace, n.Trace) continue } n.Delivered = true n.DeliveredAt = now out = append(out, *n) } return out } // Status returns a snapshot of the notification with the given trace. func (s *Store) Status(trace string) (Notification, bool) { now := s.now() s.mu.Lock() defer s.mu.Unlock() n, ok := s.byTrace[trace] if !ok || n.expired(now) { return Notification{}, false } return *n, true } // Reply records a client's reply to a suit notification and returns the updated // snapshot. It fails if the trace is unknown, expired, or not a suit. func (s *Store) Reply(trace string, reply any) (Notification, bool) { now := s.now() s.mu.Lock() defer s.mu.Unlock() n, ok := s.byTrace[trace] if !ok || n.expired(now) || !n.Suit { return Notification{}, false } n.Replied = true n.Reply = reply n.RepliedAt = now return *n, true } // PurgeExpired removes expired notifications from both indexes and returns how // many were removed. A janitor calls this periodically; Drain/Status/Reply also // drop expired entries lazily as they are touched. func (s *Store) PurgeExpired() int { now := s.now() s.mu.Lock() defer s.mu.Unlock() removed := 0 for trace, n := range s.byTrace { if n.expired(now) { delete(s.byTrace, trace) removed++ } } for to, q := range s.pending { kept := q[:0] for _, n := range q { if !n.expired(now) { kept = append(kept, n) } } if len(kept) == 0 { delete(s.pending, to) } else { s.pending[to] = kept } } return removed } // PendingCount reports how many notifications are queued for a target (test aid). func (s *Store) PendingCount(to string) int { s.mu.Lock() defer s.mu.Unlock() return len(s.pending[to]) } // StartJanitor runs PurgeExpired every interval until the returned stop function // is called. main() owns the lifecycle; the stop function makes it leak-free. func (s *Store) StartJanitor(interval time.Duration) (stop func()) { done := make(chan struct{}) go func() { t := time.NewTicker(interval) defer t.Stop() for { select { case <-t.C: s.PurgeExpired() case <-done: return } } }() var once sync.Once return func() { once.Do(func() { close(done) }) } } func newTrace() string { var b [16]byte _, _ = rand.Read(b[:]) return hex.EncodeToString(b[:]) }