#43/#44/#45: Notify (store-and-forward + suit) ve data-sync alt sistemleri

- internal/notify: store-and-forward (#43) + suit/yanıtlı (#44). Offline hedefe
  bırakılan mesaj bağlanınca teslim; trace id ile durum sorgulanır; suit cevabı
  3. taraf trigger'a manuel iletilir. TTL + hedef-başı sınır + janitor → leak yok.
- internal/datastore: active sync/collection (CRUD broadcast, arrival-time seq ile
  çakışma çözümü) + passive sync (dedupe merge pool) + temp/permanent datastore.
  Saf paket (ws bağımsız), servis katmanı I/O yapar.
- services.Register artık *Registry döndürür + ...Option alır (WithNotifyTrigger).
  main.go janitor'ları başlatır/durdurur. Eski Register(hub) çağrıları çalışır.

Testler: internal/notify, internal/datastore birim testleri + services
notify_test/datasync_test (offline teslim, suit reply+trigger, CRUD broadcast,
concurrent arrival-order, passive convergence, disconnect unsubscribe).
go test -race ./... yeşil.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
abdussamedulutas 2026-06-17 08:30:20 +03:00
parent 945b7621a4
commit 441093bad6
12 changed files with 1525 additions and 6 deletions

View File

@ -58,6 +58,18 @@ Hedef: çok yüksek bağlantı sayısı + sürekli mesaj trafiği; limitleri/poo
**Regression:** `internal/ws` `TestServerNoReplyOnNilResult`; `internal/services` `TestRequestResponseRoundTrip` (out-of-band cevap aynı id ile geri geliyor, erken yanıt yok).
## Data-sync alt sistemleri (#43 / #44 / #45)
16. **`services.Register` artık `*Registry` döndürür ve `...Option` alır.** Eski çağrılar (`Register(hub)`) dönüşü yok sayarak çalışmaya devam eder. Registry, ömür yönetimi gereken store'ları (notify, datastore) main'e açar; main bunların janitor'larını başlatır ve shutdown'da durdurur (goroutine sızıntısı yok). 3. taraf entegrasyonları (#44 trigger) `WithNotifyTrigger` ile enjekte edilir; varsayılan no-op (IPPressure'daki `Announcer` deseniyle aynı).
17. **#43 Notify (store-and-forward) + #44 Suit:** `internal/notify` saf, transport-bağımsız store (enjekte edilebilir clock). Hedef offline ise mesaj kuyruğa girer, bağlanınca `notify` sinyali ile teslim edilir; her bildirimde `trace` id → `notify/status` ile sorgulanır. **Leak yok:** her bildirimde TTL (vars. 24s) + hedef başına sınır (vars. 1024, en eski düşer) + periyodik janitor. **Suit (#44):** `suit:true` bildirime client `notify/reply` ile cevap verir; cevap 3. taraf sunucuya `NotifyTrigger` ile **manuel tetiklenir** (poll yok) ve origin client online ise `notify/reply` sinyali ile bildirilir.
18. **#45 Datastore + Active/Passive Sync:** `internal/datastore` saf paket (ws bağımlılığı yok — data kurallarını hub/socket olmadan test edilebilir kılar; servis katmanı abone id'lerini çözüp sinyal atar).
- **Active sync / collection (CRUD broadcast):** `data/open`/`data/set`/`data/delete`/`data/get`. Sunucu otoriter kopyayı tutar; her mutasyon datastore kilidi altında **monoton seq** ile damgalanır → **çakışma çözümü arrival-time** (kilidi en son alan kazanır), seq her broadcast'te taşınır, client'lar yakınsar. Set/delete diğer abonelere `data/op` sinyali ile yayılır (origin hariç; o seq'i kendi yanıtından öğrenir).
- **Passive sync (merge pool):** `sync/open`/`sync/push`/`sync/pull`. Öğeler kanonik JSON hash'iyle **dedupe** edilir; push yalnızca yeni delta'yı `sync/add` ile yayar → ortak havuzda toplanır, hepsi eşit olana kadar push/pull sürer.
- **Datastore tipi:** `kind:"temp"|"permanent"` (alan adı bilinçli olarak `kind`; `type` WSTS handler seçici olduğu için kullanılamaz). temp TTL ile expire, permanent kalıcı. id boşsa public id üretilir.
- **Leak yok:** temp store/pool TTL + janitor; disconnect'te `UnsubscribeAll` ile abonelik kalıntısı bırakılmaz.
## Session varsayılanları
`packrecaive`/`packsending`/`notifyPairInfo`/`notifyRoomInfo` varsayılanları **`NewClient` constructor'ında** set ediliyor (listener sırasından bağımsız her zaman mevcut). Session servisinin connect hook'u parite için yine de bunları yeniden uyguluyor.

View File

@ -0,0 +1,279 @@
// Package datastore implements the shared data layer of #45: server-authoritative
// collections that clients mutate with CRUD operations (active sync) and a fast
// merge pool that clients converge through (passive sync), plus temp/permanent
// datastores addressable by a public id.
//
// The package is deliberately free of any transport dependency: it owns the data
// and the conflict rules, and returns the subscriber ids that a mutation must be
// broadcast to. The service layer (internal/services) resolves those ids to
// connections and emits the signals. That split keeps the data rules unit-testable
// without a hub or a socket.
//
// # Conflict resolution
//
// Every mutation is serialized through the datastore's lock and stamped with a
// monotonically increasing sequence number. "Arrival-time priority" (the rule the
// issue asks for) therefore falls out naturally: concurrent writes are ordered by
// the moment they acquire the lock, and the last arrival wins. The sequence number
// rides along on every broadcast so clients converge to the same state.
package datastore
import (
"crypto/rand"
"encoding/hex"
"sort"
"sync"
"time"
)
// Kind selects the lifetime of a datastore.
type Kind string
const (
Temp Kind = "temp" // expires after a TTL (default)
Permanent Kind = "permanent" // never expires
)
// Record is one row of a collection, keyed by the value of the datastore's
// primary field.
type Record struct {
Key string `json:"key"`
Data map[string]any `json:"data"`
Seq uint64 `json:"seq"`
Updated time.Time `json:"-"`
}
// Datastore is a server-authoritative shared table.
type Datastore struct {
ID string
Kind Kind
Primary string // primary key field name (default "id")
ExpiresAt time.Time
mu sync.RWMutex
records map[string]*Record
subscribers map[string]struct{}
seq uint64
}
func (d *Datastore) expired(now time.Time) bool {
return !d.ExpiresAt.IsZero() && now.After(d.ExpiresAt)
}
// Subscribe registers a client id as interested in this datastore's broadcasts.
func (d *Datastore) Subscribe(clientID string) {
d.mu.Lock()
d.subscribers[clientID] = struct{}{}
d.mu.Unlock()
}
// Unsubscribe removes a client id.
func (d *Datastore) Unsubscribe(clientID string) {
d.mu.Lock()
delete(d.subscribers, clientID)
d.mu.Unlock()
}
// Subscribers returns the current subscriber ids.
func (d *Datastore) Subscribers() []string {
d.mu.RLock()
defer d.mu.RUnlock()
out := make([]string, 0, len(d.subscribers))
for id := range d.subscribers {
out = append(out, id)
}
return out
}
// Set upserts a record and returns it stamped with a fresh sequence number. The
// primary key is read from data[Primary]; if absent or not a string, ok is false.
func (d *Datastore) Set(data map[string]any, now time.Time) (Record, bool) {
key, ok := data[d.Primary].(string)
if !ok || key == "" {
return Record{}, false
}
d.mu.Lock()
defer d.mu.Unlock()
d.seq++
rec := &Record{Key: key, Data: data, Seq: d.seq, Updated: now}
d.records[key] = rec
return *rec, true
}
// Delete removes a record, returning the sequence number of the delete op (so the
// broadcast can be ordered against sets) and whether a record existed.
func (d *Datastore) Delete(key string) (uint64, bool) {
d.mu.Lock()
defer d.mu.Unlock()
if _, ok := d.records[key]; !ok {
return 0, false
}
d.seq++
delete(d.records, key)
return d.seq, true
}
// Get returns a copy of one record.
func (d *Datastore) Get(key string) (Record, bool) {
d.mu.RLock()
defer d.mu.RUnlock()
rec, ok := d.records[key]
if !ok {
return Record{}, false
}
return *rec, true
}
// Snapshot returns all records ordered by sequence number (insertion/update
// order), which is the order a freshly-opening client should apply them in.
func (d *Datastore) Snapshot() []Record {
d.mu.RLock()
defer d.mu.RUnlock()
out := make([]Record, 0, len(d.records))
for _, rec := range d.records {
out = append(out, *rec)
}
sort.Slice(out, func(i, j int) bool { return out[i].Seq < out[j].Seq })
return out
}
// Store owns all named datastores (active sync) and merge pools (passive sync).
type Store struct {
mu sync.Mutex
stores map[string]*Datastore
pools map[string]*Pool
now func() time.Time
ttl time.Duration
}
// NewStore returns an empty store. Temp datastores and pools default to a 1h TTL.
func NewStore() *Store {
return &Store{
stores: make(map[string]*Datastore),
pools: make(map[string]*Pool),
now: time.Now,
ttl: time.Hour,
}
}
// SetClock replaces the time source; intended for deterministic tests.
func (s *Store) SetClock(now func() time.Time) { s.now = now }
// Open returns the datastore with the given id, creating it if absent. An empty
// id is replaced with a fresh public id. An empty primary defaults to "id". A
// zero ttl on a temp datastore uses the store default; permanent ignores ttl.
func (s *Store) Open(id string, kind Kind, primary string, ttl time.Duration) *Datastore {
s.mu.Lock()
defer s.mu.Unlock()
if id == "" {
id = newID()
}
if existing, ok := s.stores[id]; ok {
return existing
}
if primary == "" {
primary = "id"
}
if kind != Permanent {
kind = Temp
}
d := &Datastore{
ID: id,
Kind: kind,
Primary: primary,
records: make(map[string]*Record),
subscribers: make(map[string]struct{}),
}
if kind == Temp {
if ttl <= 0 {
ttl = s.ttl
}
d.ExpiresAt = s.now().Add(ttl)
}
s.stores[id] = d
return d
}
// Get returns an existing, non-expired datastore.
func (s *Store) Get(id string) (*Datastore, bool) {
now := s.now()
s.mu.Lock()
defer s.mu.Unlock()
d, ok := s.stores[id]
if !ok || d.expired(now) {
return nil, false
}
return d, true
}
// UnsubscribeAll drops a client id from every datastore and pool. Called on
// disconnect so a departed client leaves no residual subscription.
func (s *Store) UnsubscribeAll(clientID string) {
s.mu.Lock()
stores := make([]*Datastore, 0, len(s.stores))
for _, d := range s.stores {
stores = append(stores, d)
}
pools := make([]*Pool, 0, len(s.pools))
for _, p := range s.pools {
pools = append(pools, p)
}
s.mu.Unlock()
for _, d := range stores {
d.Unsubscribe(clientID)
}
for _, p := range pools {
p.Unsubscribe(clientID)
}
}
// PurgeExpired removes expired temp datastores and pools, returning the total
// number removed.
func (s *Store) PurgeExpired() int {
now := s.now()
s.mu.Lock()
defer s.mu.Unlock()
removed := 0
for id, d := range s.stores {
if d.expired(now) {
delete(s.stores, id)
removed++
}
}
for id, p := range s.pools {
if p.expired(now) {
delete(s.pools, id)
removed++
}
}
return removed
}
// StartJanitor purges expired temp datastores every interval until stop is called.
func (s *Store) StartJanitor(interval time.Duration) (stop func()) {
done := make(chan struct{})
go func() {
t := time.NewTicker(interval)
defer t.Stop()
for {
select {
case <-t.C:
s.PurgeExpired()
case <-done:
return
}
}
}()
var once sync.Once
return func() { once.Do(func() { close(done) }) }
}
// Now exposes the store clock to the service layer so record timestamps share it.
func (s *Store) Now() time.Time { return s.now() }
func newID() string {
var b [12]byte
_, _ = rand.Read(b[:])
return hex.EncodeToString(b[:])
}

View File

@ -0,0 +1,184 @@
package datastore
import (
"sync"
"testing"
"time"
)
func TestDatastoreSetGetSnapshot(t *testing.T) {
s := NewStore()
d := s.Open("notes", Temp, "id", 0)
r1, ok := d.Set(map[string]any{"id": "a", "v": 1}, s.Now())
if !ok || r1.Seq != 1 {
t.Fatalf("first set = %+v ok=%v", r1, ok)
}
r2, _ := d.Set(map[string]any{"id": "b", "v": 2}, s.Now())
if r2.Seq != 2 {
t.Fatalf("second set seq = %d, want 2", r2.Seq)
}
// Update of a existing key gets a fresh (higher) seq — last write wins.
r1b, _ := d.Set(map[string]any{"id": "a", "v": 99}, s.Now())
if r1b.Seq <= r2.Seq {
t.Fatalf("update seq = %d, want > %d", r1b.Seq, r2.Seq)
}
got, ok := d.Get("a")
if !ok || got.Data["v"] != 99 {
t.Fatalf("get a = %+v", got)
}
snap := d.Snapshot()
if len(snap) != 2 {
t.Fatalf("snapshot len = %d, want 2", len(snap))
}
// Snapshot is ordered by seq; the updated "a" now sorts last.
if snap[len(snap)-1].Key != "a" {
t.Fatalf("snapshot order wrong: %+v", snap)
}
}
func TestDatastoreSetRequiresPrimary(t *testing.T) {
s := NewStore()
d := s.Open("x", Temp, "id", 0)
if _, ok := d.Set(map[string]any{"noid": true}, s.Now()); ok {
t.Fatal("set without the primary key should fail")
}
}
func TestDatastoreDelete(t *testing.T) {
s := NewStore()
d := s.Open("x", Temp, "id", 0)
d.Set(map[string]any{"id": "a"}, s.Now())
seq, ok := d.Delete("a")
if !ok || seq == 0 {
t.Fatalf("delete = seq %d ok %v", seq, ok)
}
if _, ok := d.Get("a"); ok {
t.Fatal("record should be gone after delete")
}
if _, ok := d.Delete("a"); ok {
t.Fatal("deleting a missing key should fail")
}
}
// TestConcurrentSetsResolveByArrival is the #45 conflict-resolution guarantee:
// concurrent writes are serialized by the datastore lock, every write gets a
// unique monotonic seq, and the final value is whichever write arrived last.
func TestConcurrentSetsResolveByArrival(t *testing.T) {
s := NewStore()
d := s.Open("race", Temp, "id", 0)
const n = 200
var wg sync.WaitGroup
seqs := make([]uint64, n)
for i := 0; i < n; i++ {
wg.Add(1)
go func(i int) {
defer wg.Done()
rec, _ := d.Set(map[string]any{"id": "k", "writer": i}, s.Now())
seqs[i] = rec.Seq
}(i)
}
wg.Wait()
// All seqs unique and within [1,n].
seen := make(map[uint64]bool, n)
var max uint64
for _, sq := range seqs {
if sq == 0 || sq > n || seen[sq] {
t.Fatalf("bad/duplicate seq %d", sq)
}
seen[sq] = true
if sq > max {
max = sq
}
}
// The surviving record must be the one with the highest seq (last arrival).
final, _ := d.Get("k")
if final.Seq != max {
t.Fatalf("final record seq = %d, want max %d", final.Seq, max)
}
}
func TestTempExpiryAndPurge(t *testing.T) {
s := NewStore()
now := time.Unix(0, 0)
s.SetClock(func() time.Time { return now })
s.Open("temp1", Temp, "id", time.Second)
s.Open("perm1", Permanent, "id", 0)
now = now.Add(2 * time.Second)
if _, ok := s.Get("temp1"); ok {
t.Fatal("temp store should have expired")
}
if _, ok := s.Get("perm1"); !ok {
t.Fatal("permanent store must not expire")
}
if removed := s.PurgeExpired(); removed != 1 {
t.Fatalf("purge removed %d, want 1", removed)
}
}
func TestOpenIsIdempotentAndGeneratesID(t *testing.T) {
s := NewStore()
a := s.Open("", Temp, "", 0)
if a.ID == "" {
t.Fatal("empty id should be replaced with a generated one")
}
if a.Primary != "id" {
t.Fatalf("default primary = %q, want id", a.Primary)
}
b := s.Open(a.ID, Temp, "", 0)
if a != b {
t.Fatal("opening an existing id should return the same datastore")
}
}
func TestPoolMergeDedupesAndTracksDelta(t *testing.T) {
s := NewStore()
p := s.OpenPool("p1", 0)
_ = p
added, _, ok := s.PushPool("p1", []any{"x", "y", "x"})
if !ok || len(added) != 2 {
t.Fatalf("first push added %v (ok=%v), want 2 unique", added, ok)
}
// Re-pushing existing items adds nothing.
added, _, _ = s.PushPool("p1", []any{"x", "y"})
if len(added) != 0 {
t.Fatalf("re-push added %v, want 0", added)
}
// A new item is the only delta.
added, _, _ = s.PushPool("p1", []any{"x", "z"})
if len(added) != 1 || added[0] != "z" {
t.Fatalf("push delta = %v, want [z]", added)
}
pool, _ := s.GetPool("p1")
if len(pool.Items()) != 3 {
t.Fatalf("pool size = %d, want 3", len(pool.Items()))
}
}
func TestUnsubscribeAllClearsEverywhere(t *testing.T) {
s := NewStore()
d := s.Open("d", Temp, "id", 0)
p := s.OpenPool("p", 0)
d.Subscribe("alice")
p.Subscribe("alice")
d.Subscribe("bob")
s.UnsubscribeAll("alice")
if len(d.Subscribers()) != 1 || d.Subscribers()[0] != "bob" {
t.Fatalf("datastore subscribers = %v, want [bob]", d.Subscribers())
}
if len(p.Subscribers()) != 0 {
t.Fatalf("pool subscribers = %v, want empty", p.Subscribers())
}
}

125
internal/datastore/pool.go Normal file
View File

@ -0,0 +1,125 @@
package datastore
import (
"crypto/sha256"
"encoding/hex"
"encoding/json"
"time"
)
// Pool is the passive-sync primitive (#45 part 1): a shared set of unique items
// that clients converge by pushing their local items and pulling the merged
// result. Items are deduplicated by the hash of their canonical JSON, so pushing
// the same item from many clients adds it exactly once. Insertion order is kept so
// every client observes the pool in the same order.
type Pool struct {
ID string
ExpiresAt time.Time
items map[string]any // contentHash -> item
order []string // contentHashes in insertion order
subscribers map[string]struct{}
}
func (p *Pool) expired(now time.Time) bool {
return !p.ExpiresAt.IsZero() && now.After(p.ExpiresAt)
}
// Subscribe / Unsubscribe / Subscribers mirror Datastore's, guarded by the
// owning Store's lock (pool mutation always happens under that lock).
func (p *Pool) Subscribe(clientID string) { p.subscribers[clientID] = struct{}{} }
func (p *Pool) Unsubscribe(clientID string) { delete(p.subscribers, clientID) }
func (p *Pool) Subscribers() []string {
out := make([]string, 0, len(p.subscribers))
for id := range p.subscribers {
out = append(out, id)
}
return out
}
// Items returns the merged pool in insertion order.
func (p *Pool) Items() []any {
out := make([]any, 0, len(p.order))
for _, h := range p.order {
out = append(out, p.items[h])
}
return out
}
// merge adds items not already present, returning only the newly added ones (so
// the caller broadcasts the minimal delta). Convergence is reached when a client
// has pushed and pulled until no side has new items.
func (p *Pool) merge(items []any) []any {
added := make([]any, 0, len(items))
for _, it := range items {
h := contentHash(it)
if _, ok := p.items[h]; ok {
continue
}
p.items[h] = it
p.order = append(p.order, h)
added = append(added, it)
}
return added
}
// OpenPool returns the pool with the given id, creating it if absent. An empty id
// is replaced with a fresh public id; a zero ttl uses the store default. Pools are
// always temporary (passive sync is for fast, ephemeral convergence).
func (s *Store) OpenPool(id string, ttl time.Duration) *Pool {
s.mu.Lock()
defer s.mu.Unlock()
if id == "" {
id = newID()
}
if p, ok := s.pools[id]; ok {
return p
}
if ttl <= 0 {
ttl = s.ttl
}
p := &Pool{
ID: id,
ExpiresAt: s.now().Add(ttl),
items: make(map[string]any),
subscribers: make(map[string]struct{}),
}
s.pools[id] = p
return p
}
// GetPool returns an existing, non-expired pool.
func (s *Store) GetPool(id string) (*Pool, bool) {
now := s.now()
s.mu.Lock()
defer s.mu.Unlock()
p, ok := s.pools[id]
if !ok || p.expired(now) {
return nil, false
}
return p, true
}
// PushPool merges items into the pool under the store lock and returns the newly
// added items plus the current subscriber list for broadcasting.
func (s *Store) PushPool(id string, items []any) (added []any, subscribers []string, ok bool) {
now := s.now()
s.mu.Lock()
defer s.mu.Unlock()
p, exists := s.pools[id]
if !exists || p.expired(now) {
return nil, nil, false
}
return p.merge(items), p.Subscribers(), true
}
// contentHash is the dedup key: the SHA-256 of the item's canonical JSON.
func contentHash(item any) string {
b, err := json.Marshal(item)
if err != nil {
// Non-serialisable items fall back to a per-call unique key (never deduped).
return newID()
}
sum := sha256.Sum256(b)
return hex.EncodeToString(sum[:])
}

218
internal/notify/store.go Normal file
View File

@ -0,0 +1,218 @@
// Package notify implements store-and-forward notifications (#43) and their
// reply-bearing "suit" variant (#44). A notification addressed to a client that
// is offline is held until the client connects; one addressed to an online client
// is delivered immediately. Every notification carries a trace id so its delivery
// (and, for suit notifications, its reply) can be queried later.
//
// The store is bounded in two independent ways so it can never grow without
// limit, even if a target never reconnects: an expiry per notification and a hard
// per-target cap (oldest dropped first). A janitor periodically purges expired
// entries; tests drive expiry deterministically through an injectable clock.
package notify
import (
"crypto/rand"
"encoding/hex"
"sync"
"time"
)
// Notification is one store-and-forward message addressed to a client id.
type Notification struct {
Trace string // unique id used to query delivery/reply status
From string // sender: a client id, or "server" for server-originated
To string // target client id
Pack any // opaque payload delivered to the target
Suit bool // true => a reply is expected (#44)
CreatedAt time.Time
ExpiresAt time.Time // zero == never expires
Delivered bool
DeliveredAt time.Time
Replied bool
Reply any
RepliedAt time.Time
}
func (n *Notification) expired(now time.Time) bool {
return !n.ExpiresAt.IsZero() && now.After(n.ExpiresAt)
}
// Store is the in-memory notification registry. All methods are safe for
// concurrent use.
type Store struct {
mu sync.Mutex
pending map[string][]*Notification // target id -> queued, undelivered
byTrace map[string]*Notification // trace -> notification (status + reply)
now func() time.Time
defaultTTL time.Duration
maxPerTarget int
}
// NewStore returns an empty store with sensible defaults (24h TTL, 1024 queued
// notifications per offline target).
func NewStore() *Store {
return &Store{
pending: make(map[string][]*Notification),
byTrace: make(map[string]*Notification),
now: time.Now,
defaultTTL: 24 * time.Hour,
maxPerTarget: 1024,
}
}
// SetClock replaces the time source; intended for deterministic tests.
func (s *Store) SetClock(now func() time.Time) { s.now = now }
// Put records a notification for `to`. A zero ttl uses the store default. The
// returned Notification is a snapshot; the trace id is filled in.
func (s *Store) Put(from, to string, pack any, suit bool, ttl time.Duration) Notification {
if ttl <= 0 {
ttl = s.defaultTTL
}
now := s.now()
n := &Notification{
Trace: newTrace(),
From: from,
To: to,
Pack: pack,
Suit: suit,
CreatedAt: now,
ExpiresAt: now.Add(ttl),
}
s.mu.Lock()
defer s.mu.Unlock()
q := s.pending[to]
// Enforce the per-target cap by evicting the oldest queued notifications.
for len(q) >= s.maxPerTarget && len(q) > 0 {
evicted := q[0]
q = q[1:]
delete(s.byTrace, evicted.Trace)
}
s.pending[to] = append(q, n)
s.byTrace[n.Trace] = n
return *n
}
// Drain returns every non-expired pending notification for `to`, marking each
// delivered and removing it from the pending queue. Delivered notifications stay
// in the trace index (until they expire) so their status can still be queried.
func (s *Store) Drain(to string) []Notification {
now := s.now()
s.mu.Lock()
defer s.mu.Unlock()
q := s.pending[to]
if len(q) == 0 {
return nil
}
delete(s.pending, to)
out := make([]Notification, 0, len(q))
for _, n := range q {
if n.expired(now) {
delete(s.byTrace, n.Trace)
continue
}
n.Delivered = true
n.DeliveredAt = now
out = append(out, *n)
}
return out
}
// Status returns a snapshot of the notification with the given trace.
func (s *Store) Status(trace string) (Notification, bool) {
now := s.now()
s.mu.Lock()
defer s.mu.Unlock()
n, ok := s.byTrace[trace]
if !ok || n.expired(now) {
return Notification{}, false
}
return *n, true
}
// Reply records a client's reply to a suit notification and returns the updated
// snapshot. It fails if the trace is unknown, expired, or not a suit.
func (s *Store) Reply(trace string, reply any) (Notification, bool) {
now := s.now()
s.mu.Lock()
defer s.mu.Unlock()
n, ok := s.byTrace[trace]
if !ok || n.expired(now) || !n.Suit {
return Notification{}, false
}
n.Replied = true
n.Reply = reply
n.RepliedAt = now
return *n, true
}
// PurgeExpired removes expired notifications from both indexes and returns how
// many were removed. A janitor calls this periodically; Drain/Status/Reply also
// drop expired entries lazily as they are touched.
func (s *Store) PurgeExpired() int {
now := s.now()
s.mu.Lock()
defer s.mu.Unlock()
removed := 0
for trace, n := range s.byTrace {
if n.expired(now) {
delete(s.byTrace, trace)
removed++
}
}
for to, q := range s.pending {
kept := q[:0]
for _, n := range q {
if !n.expired(now) {
kept = append(kept, n)
}
}
if len(kept) == 0 {
delete(s.pending, to)
} else {
s.pending[to] = kept
}
}
return removed
}
// PendingCount reports how many notifications are queued for a target (test aid).
func (s *Store) PendingCount(to string) int {
s.mu.Lock()
defer s.mu.Unlock()
return len(s.pending[to])
}
// StartJanitor runs PurgeExpired every interval until the returned stop function
// is called. main() owns the lifecycle; the stop function makes it leak-free.
func (s *Store) StartJanitor(interval time.Duration) (stop func()) {
done := make(chan struct{})
go func() {
t := time.NewTicker(interval)
defer t.Stop()
for {
select {
case <-t.C:
s.PurgeExpired()
case <-done:
return
}
}
}()
var once sync.Once
return func() { once.Do(func() { close(done) }) }
}
func newTrace() string {
var b [16]byte
_, _ = rand.Read(b[:])
return hex.EncodeToString(b[:])
}

View File

@ -0,0 +1,121 @@
package notify
import (
"testing"
"time"
)
func TestPutAndDrainDelivers(t *testing.T) {
s := NewStore()
n := s.Put("alice", "bob", map[string]any{"text": "hi"}, false, 0)
if n.Trace == "" {
t.Fatal("Put should assign a trace id")
}
if s.PendingCount("bob") != 1 {
t.Fatalf("pending for bob = %d, want 1", s.PendingCount("bob"))
}
got := s.Drain("bob")
if len(got) != 1 || got[0].Trace != n.Trace {
t.Fatalf("Drain = %+v, want the queued notification", got)
}
if !got[0].Delivered {
t.Fatal("drained notification should be marked delivered")
}
if s.PendingCount("bob") != 0 {
t.Fatal("pending should be empty after drain")
}
// Status still works after delivery.
st, ok := s.Status(n.Trace)
if !ok || !st.Delivered {
t.Fatalf("Status after drain = %+v, ok=%v", st, ok)
}
}
func TestExpiryDropsNotification(t *testing.T) {
s := NewStore()
now := time.Unix(1000, 0)
s.SetClock(func() time.Time { return now })
s.Put("srv", "bob", "payload", false, 5*time.Second)
// Advance past expiry; the queued message must not be delivered.
now = now.Add(10 * time.Second)
if got := s.Drain("bob"); len(got) != 0 {
t.Fatalf("expired notification was delivered: %+v", got)
}
if removed := s.PurgeExpired(); removed != 0 {
// Drain already dropped it from byTrace; nothing left to purge.
t.Fatalf("PurgeExpired removed %d, want 0 (already dropped on drain)", removed)
}
}
func TestPurgeExpiredReclaimsUndeliveredTraces(t *testing.T) {
s := NewStore()
now := time.Unix(0, 0)
s.SetClock(func() time.Time { return now })
// A target that never connects: its messages must still be reclaimed.
s.Put("srv", "ghost", "a", false, time.Second)
s.Put("srv", "ghost", "b", false, time.Second)
now = now.Add(2 * time.Second)
removed := s.PurgeExpired()
if removed != 2 {
t.Fatalf("PurgeExpired removed %d, want 2", removed)
}
if s.PendingCount("ghost") != 0 {
t.Fatal("expired pending queue should be gone")
}
}
func TestPerTargetCapEvictsOldest(t *testing.T) {
s := NewStore()
s.maxPerTarget = 3
var traces []string
for i := 0; i < 5; i++ {
traces = append(traces, s.Put("srv", "bob", i, false, 0).Trace)
}
if s.PendingCount("bob") != 3 {
t.Fatalf("pending = %d, want capped at 3", s.PendingCount("bob"))
}
// The two oldest must have been evicted from the trace index too.
if _, ok := s.Status(traces[0]); ok {
t.Fatal("oldest notification should have been evicted")
}
if _, ok := s.Status(traces[4]); !ok {
t.Fatal("newest notification should be retained")
}
}
func TestSuitReply(t *testing.T) {
s := NewStore()
n := s.Put("srv", "bob", "question", true, 0)
// A non-suit reply target must fail.
plain := s.Put("srv", "bob", "fyi", false, 0)
if _, ok := s.Reply(plain.Trace, "nope"); ok {
t.Fatal("replying to a non-suit notification should fail")
}
updated, ok := s.Reply(n.Trace, map[string]any{"answer": 42})
if !ok || !updated.Replied {
t.Fatalf("Reply = %+v ok=%v", updated, ok)
}
st, _ := s.Status(n.Trace)
if !st.Replied {
t.Fatal("status should reflect the reply")
}
if m, _ := st.Reply.(map[string]any); m["answer"] != 42 {
t.Fatalf("stored reply = %v", st.Reply)
}
}
func TestUnknownTraceStatus(t *testing.T) {
s := NewStore()
if _, ok := s.Status("does-not-exist"); ok {
t.Fatal("unknown trace should not resolve")
}
}

View File

@ -0,0 +1,164 @@
package services
import (
"time"
"git.saqut.com/saqut/mwse/internal/datastore"
"git.saqut.com/saqut/mwse/internal/protocol"
"git.saqut.com/saqut/mwse/internal/ws"
)
// registerDatastore wires the #45 shared data layer: active sync (collections with
// CRUD broadcast), passive sync (a fast merge pool), and temp/permanent
// datastores. The returned store is handed back so a janitor can reclaim expired
// temp stores/pools.
//
// Wire surface (additive):
//
// Active sync / collection / datastore:
// data/open {id?, kind?, primary?, expires?} -> {status, id, primary, kind, records}
// (kind = "temp" | "permanent". The payload field is "kind", NOT "type":
// "type" is reserved by WSTS to select the handler.)
// data/set {id, record} -> {status, key, seq} + broadcast data/op(set)
// data/delete {id, key} -> {status, seq} + broadcast data/op(delete)
// data/get {id, key?} -> {status, record|records}
//
// Passive sync (merge pool):
// sync/open {id?, expires?} -> {status, id, items}
// sync/push {id, items} -> {status, added} + broadcast sync/add
// sync/pull {id} -> {status, items}
//
// Broadcasts are server signals delivered to every *other* subscriber:
//
// [ {id, op, record|key, seq}, "data/op" ]
// [ {id, items}, "sync/add" ]
func registerDatastore(hub *ws.Hub) *datastore.Store {
store := datastore.NewStore()
// Signal a set of subscriber ids, skipping the originator (it already holds the
// change and learns the authoritative seq from its own reply).
broadcast := func(subs []string, except, name string, payload map[string]any) {
for _, id := range subs {
if id == except {
continue
}
if cl, ok := hub.Client(id); ok {
cl.Signal(name, payload)
}
}
}
// A departed client must leave no residual subscription anywhere.
hub.OnDisconnect(func(c *ws.Client) { store.UnsubscribeAll(c.ID) })
// ---- active sync / collection / datastore -------------------------------
hub.Register("data/open", func(c *ws.Client, m protocol.Message) any {
kind := datastore.Temp
if m.Str("kind") == string(datastore.Permanent) {
kind = datastore.Permanent
}
ttl := time.Duration(m.Int("expires")) * time.Second
ds := store.Open(m.Str("id"), kind, m.Str("primary"), ttl)
ds.Subscribe(c.ID)
return map[string]any{
"status": "success",
"id": ds.ID,
"primary": ds.Primary,
"kind": string(ds.Kind),
"records": ds.Snapshot(),
}
})
hub.Register("data/set", func(c *ws.Client, m protocol.Message) any {
ds, ok := store.Get(m.Str("id"))
if !ok {
return fail("NOT_FOUND")
}
rec, ok := ds.Set(toMap(m.Get("record")), store.Now())
if !ok {
return fail("PRIMARY_KEY_REQUIRED")
}
broadcast(ds.Subscribers(), c.ID, "data/op", map[string]any{
"id": ds.ID,
"op": "set",
"record": rec,
"seq": rec.Seq,
})
return map[string]any{"status": "success", "key": rec.Key, "seq": rec.Seq}
})
hub.Register("data/delete", func(c *ws.Client, m protocol.Message) any {
ds, ok := store.Get(m.Str("id"))
if !ok {
return fail("NOT_FOUND")
}
key := m.Str("key")
seq, ok := ds.Delete(key)
if !ok {
return fail("NO_SUCH_KEY")
}
broadcast(ds.Subscribers(), c.ID, "data/op", map[string]any{
"id": ds.ID,
"op": "delete",
"key": key,
"seq": seq,
})
return map[string]any{"status": "success", "seq": seq}
})
hub.Register("data/get", func(c *ws.Client, m protocol.Message) any {
ds, ok := store.Get(m.Str("id"))
if !ok {
return fail("NOT_FOUND")
}
if key := m.Str("key"); key != "" {
rec, ok := ds.Get(key)
if !ok {
return fail("NO_SUCH_KEY")
}
return map[string]any{"status": "success", "record": rec}
}
return map[string]any{"status": "success", "records": ds.Snapshot()}
})
// ---- passive sync (merge pool) ------------------------------------------
hub.Register("sync/open", func(c *ws.Client, m protocol.Message) any {
ttl := time.Duration(m.Int("expires")) * time.Second
p := store.OpenPool(m.Str("id"), ttl)
p.Subscribe(c.ID)
return map[string]any{"status": "success", "id": p.ID, "items": p.Items()}
})
hub.Register("sync/push", func(c *ws.Client, m protocol.Message) any {
items := toSlice(m.Get("items"))
added, subs, ok := store.PushPool(m.Str("id"), items)
if !ok {
return fail("NOT_FOUND")
}
if len(added) > 0 {
broadcast(subs, c.ID, "sync/add", map[string]any{"id": m.Str("id"), "items": added})
}
return map[string]any{"status": "success", "added": len(added)}
})
hub.Register("sync/pull", func(c *ws.Client, m protocol.Message) any {
p, ok := store.GetPool(m.Str("id"))
if !ok {
return fail("NOT_FOUND")
}
return map[string]any{"status": "success", "items": p.Items()}
})
return store
}
// toSlice coerces a decoded JSON value to a slice, returning nil when it is not
// an array.
func toSlice(v any) []any {
if s, ok := v.([]any); ok {
return s
}
return nil
}

View File

@ -0,0 +1,152 @@
package services
import (
"encoding/json"
"testing"
"git.saqut.com/saqut/mwse/internal/datastore"
"git.saqut.com/saqut/mwse/internal/testutil"
"git.saqut.com/saqut/mwse/internal/ws"
)
// waitDataOp waits for a data/op broadcast whose "op" field matches (the stream
// may contain several — e.g. a set followed by a delete).
func waitDataOp(t *testing.T, fc *testutil.FakeConn, op string) map[string]any {
t.Helper()
find := func() (map[string]any, bool) {
for _, raw := range fc.Writes() {
var arr []any
if json.Unmarshal(raw, &arr) != nil || len(arr) != 2 || arr[1] != "data/op" {
continue
}
if p, ok := arr[0].(map[string]any); ok && p["op"] == op {
return p, true
}
}
return nil, false
}
waitFor(t, func() bool { _, ok := find(); return ok })
p, _ := find()
return p
}
// newHubReg builds a hub with services registered and returns the Registry so a
// test can inspect the long-lived stores.
func newHubReg() (*ws.Hub, *Registry) {
hub := ws.NewHub()
reg := Register(hub)
return hub, reg
}
// TestActiveSyncBroadcast is the #45 collection/CRUD-broadcast core: a set by one
// subscriber is broadcast to every other subscriber with the authoritative seq.
func TestActiveSyncBroadcast(t *testing.T) {
hub, _ := newHubReg()
a, _ := connect(hub, "alice")
b, fb := connect(hub, "bob")
// Both open the same datastore by id.
opened := asMap(t, hub.Handle(a, msg("data/open", "id", "shared", "kind", "temp", "primary", "id")))
id := opened["id"].(string)
hub.Handle(b, msg("data/open", "id", id))
// alice sets a record.
set := asMap(t, hub.Handle(a, msg("data/set", "id", id, "record", map[string]any{"id": "k1", "v": float64(7)})))
if set["status"] != "success" {
t.Fatalf("data/set = %v", set)
}
// bob receives the data/op broadcast (set).
op := waitSignal(t, fb, "data/op")
if op["op"] != "set" {
t.Fatalf("op = %v, want set", op["op"])
}
rec := asMap(t, op["record"])
if rec["key"] != "k1" {
t.Fatalf("broadcast record key = %v, want k1", rec["key"])
}
// bob can read the record back from the authoritative copy. (An in-process
// reply carries the Go value; over a real socket it is the same shape as JSON.)
got := asMap(t, hub.Handle(b, msg("data/get", "id", id, "key", "k1")))
gotRec := got["record"].(datastore.Record)
if gotRec.Key != "k1" {
t.Fatalf("data/get = %v", got)
}
}
func TestActiveSyncDeleteBroadcast(t *testing.T) {
hub, _ := newHubReg()
a, _ := connect(hub, "alice")
b, fb := connect(hub, "bob")
hub.Handle(a, msg("data/open", "id", "d", "kind", "temp"))
hub.Handle(b, msg("data/open", "id", "d"))
hub.Handle(a, msg("data/set", "id", "d", "record", map[string]any{"id": "x"}))
del := asMap(t, hub.Handle(a, msg("data/delete", "id", "d", "key", "x")))
if del["status"] != "success" {
t.Fatalf("data/delete = %v", del)
}
op := waitDataOp(t, fb, "delete")
if op["key"] != "x" {
t.Fatalf("delete broadcast = %v", op)
}
}
// TestPassiveSyncConvergence is the #45 merge-pool core: pushes are deduped and
// only the delta is broadcast, so subscribers converge.
func TestPassiveSyncConvergence(t *testing.T) {
hub, _ := newHubReg()
a, _ := connect(hub, "alice")
b, fb := connect(hub, "bob")
opened := asMap(t, hub.Handle(a, msg("sync/open", "id", "pool")))
id := opened["id"].(string)
hub.Handle(b, msg("sync/open", "id", id))
// alice pushes two items; bob gets them as a delta.
res := asMap(t, hub.Handle(a, msg("sync/push", "id", id, "items", []any{"x", "y"})))
if res["added"] != 2 {
t.Fatalf("push added = %v, want 2", res["added"])
}
add := waitSignal(t, fb, "sync/add")
if items, _ := add["items"].([]any); len(items) != 2 {
t.Fatalf("sync/add items = %v, want 2", add["items"])
}
// bob pulls and sees the full converged pool.
pull := asMap(t, hub.Handle(b, msg("sync/pull", "id", id)))
if items, _ := pull["items"].([]any); len(items) != 2 {
t.Fatalf("sync/pull items = %v, want 2", pull["items"])
}
// A push with one new + one existing item broadcasts only the new one.
res = asMap(t, hub.Handle(b, msg("sync/push", "id", id, "items", []any{"y", "z"})))
if res["added"] != 1 {
t.Fatalf("second push added = %v, want 1", res["added"])
}
}
// TestDataSubscriptionClearedOnDisconnect proves no residual subscription is left
// behind (no leak) when a subscriber disconnects.
func TestDataSubscriptionClearedOnDisconnect(t *testing.T) {
hub, reg := newHubReg()
a, _ := connect(hub, "alice")
connect(hub, "bob")
hub.Handle(a, msg("data/open", "id", "d", "kind", "temp"))
ds, ok := reg.Data.Get("d")
if !ok {
t.Fatal("datastore should exist")
}
if len(ds.Subscribers()) != 1 {
t.Fatalf("subscribers before disconnect = %v, want [alice]", ds.Subscribers())
}
hub.Disconnect(a)
if len(ds.Subscribers()) != 0 {
t.Fatalf("subscribers after disconnect = %v, want empty", ds.Subscribers())
}
}

View File

@ -0,0 +1,95 @@
package services
import (
"time"
"git.saqut.com/saqut/mwse/internal/notify"
"git.saqut.com/saqut/mwse/internal/protocol"
"git.saqut.com/saqut/mwse/internal/ws"
)
// registerNotify wires the store-and-forward notification service (#43) and its
// reply-bearing suit variant (#44). The returned store is handed to the caller so
// a janitor can reclaim expired entries.
//
// Wire surface (all additive — the frozen relay contract is untouched):
//
// - notify/send {to, pack, expires?, suit?} -> {status, trace}
// - notify/status {trace} -> {status, delivered, replied, reply}
// - notify/reply {trace, pack} -> {status} (client answering a suit)
//
// Delivery to the target is the server signal:
//
// [ {from, pack, trace, suit}, "notify" ]
func registerNotify(hub *ws.Hub, trigger NotifyTrigger) *notify.Store {
store := notify.NewStore()
deliver := func(c *ws.Client, n notify.Notification) {
c.Signal("notify", map[string]any{
"from": n.From,
"pack": n.Pack,
"trace": n.Trace,
"suit": n.Suit,
})
}
// flush drains and delivers everything queued for a connected client.
flush := func(c *ws.Client) {
for _, n := range store.Drain(c.ID) {
deliver(c, n)
}
}
// On connect, deliver anything that arrived while the client was offline.
hub.OnConnect(flush)
hub.Register("notify/send", func(c *ws.Client, m protocol.Message) any {
to := m.Str("to")
if to == "" {
return fail("TO_REQUIRED")
}
ttl := time.Duration(m.Int("expires")) * time.Second // 0 => store default
n := store.Put(c.ID, to, m.Get("pack"), m.Truthy("suit"), ttl)
// If the target is connected right now, deliver immediately; otherwise it
// stays queued until the target's next connect.
if target, ok := hub.Client(to); ok {
flush(target)
}
return map[string]any{"status": "success", "trace": n.Trace}
})
hub.Register("notify/status", func(c *ws.Client, m protocol.Message) any {
n, ok := store.Status(m.Str("trace"))
if !ok {
return fail("NOT_FOUND")
}
return map[string]any{
"status": "success",
"delivered": n.Delivered,
"replied": n.Replied,
"reply": n.Reply,
}
})
hub.Register("notify/reply", func(c *ws.Client, m protocol.Message) any {
n, ok := store.Reply(m.Str("trace"), m.Get("pack"))
if !ok {
return fail("NOT_FOUND_OR_NOT_SUIT")
}
// Push the reply outward to the 3rd-party application server (#44): MWSE
// triggers it manually rather than having the server poll notify/status.
trigger.NotifyReplied(n)
// If the original sender is an online client, signal it the reply too.
if origin, ok := hub.Client(n.From); ok {
origin.Signal("notify/reply", map[string]any{
"trace": n.Trace,
"from": c.ID,
"pack": n.Reply,
})
}
return success()
})
return store
}

View File

@ -0,0 +1,110 @@
package services
import (
"sync"
"testing"
"git.saqut.com/saqut/mwse/internal/notify"
"git.saqut.com/saqut/mwse/internal/ws"
)
// recTrigger records suit replies pushed outward (the #44 3rd-party trigger).
type recTrigger struct {
mu sync.Mutex
got []notify.Notification
}
func (r *recTrigger) NotifyReplied(n notify.Notification) {
r.mu.Lock()
r.got = append(r.got, n)
r.mu.Unlock()
}
func (r *recTrigger) count() int {
r.mu.Lock()
defer r.mu.Unlock()
return len(r.got)
}
// TestNotifyOfflineThenDeliverOnConnect is the #43 store-and-forward core: a
// message left for an offline client is delivered when that client connects.
func TestNotifyOfflineThenDeliverOnConnect(t *testing.T) {
hub := newHub()
a, _ := connect(hub, "alice")
res := asMap(t, hub.Handle(a, msg("notify/send", "to", "bob", "pack", map[string]any{"text": "hi"})))
if res["status"] != "success" {
t.Fatalf("notify/send = %v", res)
}
trace, _ := res["trace"].(string)
if trace == "" {
t.Fatal("notify/send should return a trace id")
}
// bob was offline; now connects and must receive the queued notification.
_, fb := connect(hub, "bob")
sig := waitSignal(t, fb, "notify")
if sig["trace"] != trace {
t.Fatalf("delivered trace = %v, want %s", sig["trace"], trace)
}
if p, _ := sig["pack"].(map[string]any); p["text"] != "hi" {
t.Fatalf("delivered pack = %v", sig["pack"])
}
// Status reports delivered.
st := asMap(t, hub.Handle(a, msg("notify/status", "trace", trace)))
if st["delivered"] != true {
t.Fatalf("status = %v, want delivered", st)
}
}
// TestNotifyImmediateWhenOnline delivers without waiting when the target is up.
func TestNotifyImmediateWhenOnline(t *testing.T) {
hub := newHub()
a, _ := connect(hub, "alice")
_, fb := connect(hub, "bob")
hub.Handle(a, msg("notify/send", "to", "bob", "pack", map[string]any{"text": "now"}))
sig := waitSignal(t, fb, "notify")
if p, _ := sig["pack"].(map[string]any); p["text"] != "now" {
t.Fatalf("immediate delivery pack = %v", sig["pack"])
}
}
// TestNotifySuitReply is the #44 reply path: a suit notification's reply reaches
// the 3rd-party trigger and is signalled back to the origin client.
func TestNotifySuitReply(t *testing.T) {
trig := &recTrigger{}
hub := ws.NewHub()
Register(hub, WithNotifyTrigger(trig))
a, fa := connect(hub, "alice")
b, fb := connect(hub, "bob")
res := asMap(t, hub.Handle(a, msg("notify/send", "to", "bob", "suit", true, "pack", map[string]any{"q": "ok?"})))
trace, _ := res["trace"].(string)
sig := waitSignal(t, fb, "notify")
if sig["suit"] != true {
t.Fatalf("notify suit flag = %v, want true", sig["suit"])
}
// bob replies to the suit.
rep := asMap(t, hub.Handle(b, msg("notify/reply", "trace", trace, "pack", map[string]any{"a": "yes"})))
if rep["status"] != "success" {
t.Fatalf("notify/reply = %v", rep)
}
// The 3rd-party trigger fired, and the origin (alice) got the reply signal.
waitFor(t, func() bool { return trig.count() == 1 })
got := waitSignal(t, fa, "notify/reply")
if p, _ := got["pack"].(map[string]any); p["a"] != "yes" {
t.Fatalf("origin reply signal pack = %v", got["pack"])
}
// A non-suit reply must be rejected.
plain := asMap(t, hub.Handle(a, msg("notify/send", "to", "bob", "pack", map[string]any{})))
if r := asMap(t, hub.Handle(b, msg("notify/reply", "trace", plain["trace"], "pack", map[string]any{}))); r["status"] != "fail" {
t.Fatalf("reply to non-suit = %v, want fail", r)
}
}

View File

@ -14,21 +14,71 @@ import (
"crypto/sha256"
"encoding/hex"
"git.saqut.com/saqut/mwse/internal/datastore"
"git.saqut.com/saqut/mwse/internal/notify"
"git.saqut.com/saqut/mwse/internal/protocol"
"git.saqut.com/saqut/mwse/internal/ws"
)
// Register wires every service onto the hub. Call once during startup, before the
// server begins accepting connections. The order mirrors the Node require() order
// so connect-time side effects (id message, private room, session defaults)
// happen in the same sequence.
func Register(hub *ws.Hub) {
// NotifyTrigger is invoked when a suit notification receives a reply, so MWSE can
// push the result to a 3rd-party application server (#44) instead of that server
// polling for it. The default is a no-op; a real HTTP-backed implementation lives
// in the bridge wiring (#46).
type NotifyTrigger interface {
NotifyReplied(n notify.Notification)
}
type noopTrigger struct{}
func (noopTrigger) NotifyReplied(notify.Notification) {}
// options collects the externally-wired integrations a deployment may supply.
type options struct {
notifyTrigger NotifyTrigger
}
// Option configures Register.
type Option func(*options)
// WithNotifyTrigger sets the 3rd-party trigger fired on suit replies (#44).
func WithNotifyTrigger(t NotifyTrigger) Option {
return func(o *options) {
if t != nil {
o.notifyTrigger = t
}
}
}
// Registry exposes the long-lived stores the services own so the caller (main)
// can manage their lifecycle — for example start the notify/datastore janitors
// that reclaim expired entries.
type Registry struct {
Notify *notify.Store
Data *datastore.Store
}
// Register wires every service onto the hub and returns a Registry of the stores
// that need lifecycle management. Call once during startup, before the server
// begins accepting connections. The order mirrors the Node require() order so
// connect-time side effects (id message, private room, session defaults) happen
// in the same sequence; the data-sync services (#43#45) are appended after.
func Register(hub *ws.Hub, opts ...Option) *Registry {
o := options{notifyTrigger: noopTrigger{}}
for _, apply := range opts {
apply(&o)
}
registerYourID(hub)
registerAuth(hub)
registerRoom(hub)
registerDataTransfer(hub)
registerIPPressure(hub, nil)
registerSession(hub)
return &Registry{
Notify: registerNotify(hub, o.notifyTrigger),
Data: registerDatastore(hub),
}
}
// ---- small response helpers ---------------------------------------------

11
main.go
View File

@ -12,6 +12,7 @@ import (
"os"
"os/signal"
"syscall"
"time"
"git.saqut.com/saqut/mwse/internal/config"
"git.saqut.com/saqut/mwse/internal/httpserver"
@ -23,7 +24,15 @@ func main() {
cfg := config.Load()
hub := ws.NewHub()
services.Register(hub)
reg := services.Register(hub)
// The notify and datastore stores hold entries with a TTL; their janitors
// reclaim expired ones so memory cannot grow without bound. They are stopped
// during shutdown so no goroutine is leaked.
stopNotify := reg.Notify.StartJanitor(time.Minute)
stopData := reg.Data.StartJanitor(time.Minute)
defer stopNotify()
defer stopData()
srv := httpserver.New(hub, cfg)