diff --git a/decisions.md b/decisions.md index b187d70..d72d17a 100644 --- a/decisions.md +++ b/decisions.md @@ -58,6 +58,18 @@ Hedef: çok yüksek bağlantı sayısı + sürekli mesaj trafiği; limitleri/poo **Regression:** `internal/ws` `TestServerNoReplyOnNilResult`; `internal/services` `TestRequestResponseRoundTrip` (out-of-band cevap aynı id ile geri geliyor, erken yanıt yok). +## Data-sync alt sistemleri (#43 / #44 / #45) + +16. **`services.Register` artık `*Registry` döndürür ve `...Option` alır.** Eski çağrılar (`Register(hub)`) dönüşü yok sayarak çalışmaya devam eder. Registry, ömür yönetimi gereken store'ları (notify, datastore) main'e açar; main bunların janitor'larını başlatır ve shutdown'da durdurur (goroutine sızıntısı yok). 3. taraf entegrasyonları (#44 trigger) `WithNotifyTrigger` ile enjekte edilir; varsayılan no-op (IPPressure'daki `Announcer` deseniyle aynı). + +17. **#43 Notify (store-and-forward) + #44 Suit:** `internal/notify` saf, transport-bağımsız store (enjekte edilebilir clock). Hedef offline ise mesaj kuyruğa girer, bağlanınca `notify` sinyali ile teslim edilir; her bildirimde `trace` id → `notify/status` ile sorgulanır. **Leak yok:** her bildirimde TTL (vars. 24s) + hedef başına sınır (vars. 1024, en eski düşer) + periyodik janitor. **Suit (#44):** `suit:true` bildirime client `notify/reply` ile cevap verir; cevap 3. taraf sunucuya `NotifyTrigger` ile **manuel tetiklenir** (poll yok) ve origin client online ise `notify/reply` sinyali ile bildirilir. + +18. **#45 Datastore + Active/Passive Sync:** `internal/datastore` saf paket (ws bağımlılığı yok — data kurallarını hub/socket olmadan test edilebilir kılar; servis katmanı abone id'lerini çözüp sinyal atar). + - **Active sync / collection (CRUD broadcast):** `data/open`/`data/set`/`data/delete`/`data/get`. Sunucu otoriter kopyayı tutar; her mutasyon datastore kilidi altında **monoton seq** ile damgalanır → **çakışma çözümü arrival-time** (kilidi en son alan kazanır), seq her broadcast'te taşınır, client'lar yakınsar. Set/delete diğer abonelere `data/op` sinyali ile yayılır (origin hariç; o seq'i kendi yanıtından öğrenir). + - **Passive sync (merge pool):** `sync/open`/`sync/push`/`sync/pull`. Öğeler kanonik JSON hash'iyle **dedupe** edilir; push yalnızca yeni delta'yı `sync/add` ile yayar → ortak havuzda toplanır, hepsi eşit olana kadar push/pull sürer. + - **Datastore tipi:** `kind:"temp"|"permanent"` (alan adı bilinçli olarak `kind`; `type` WSTS handler seçici olduğu için kullanılamaz). temp TTL ile expire, permanent kalıcı. id boşsa public id üretilir. + - **Leak yok:** temp store/pool TTL + janitor; disconnect'te `UnsubscribeAll` ile abonelik kalıntısı bırakılmaz. + ## Session varsayılanları `packrecaive`/`packsending`/`notifyPairInfo`/`notifyRoomInfo` varsayılanları **`NewClient` constructor'ında** set ediliyor (listener sırasından bağımsız her zaman mevcut). Session servisinin connect hook'u parite için yine de bunları yeniden uyguluyor. diff --git a/internal/datastore/datastore.go b/internal/datastore/datastore.go new file mode 100644 index 0000000..f61a098 --- /dev/null +++ b/internal/datastore/datastore.go @@ -0,0 +1,279 @@ +// Package datastore implements the shared data layer of #45: server-authoritative +// collections that clients mutate with CRUD operations (active sync) and a fast +// merge pool that clients converge through (passive sync), plus temp/permanent +// datastores addressable by a public id. +// +// The package is deliberately free of any transport dependency: it owns the data +// and the conflict rules, and returns the subscriber ids that a mutation must be +// broadcast to. The service layer (internal/services) resolves those ids to +// connections and emits the signals. That split keeps the data rules unit-testable +// without a hub or a socket. +// +// # Conflict resolution +// +// Every mutation is serialized through the datastore's lock and stamped with a +// monotonically increasing sequence number. "Arrival-time priority" (the rule the +// issue asks for) therefore falls out naturally: concurrent writes are ordered by +// the moment they acquire the lock, and the last arrival wins. The sequence number +// rides along on every broadcast so clients converge to the same state. +package datastore + +import ( + "crypto/rand" + "encoding/hex" + "sort" + "sync" + "time" +) + +// Kind selects the lifetime of a datastore. +type Kind string + +const ( + Temp Kind = "temp" // expires after a TTL (default) + Permanent Kind = "permanent" // never expires +) + +// Record is one row of a collection, keyed by the value of the datastore's +// primary field. +type Record struct { + Key string `json:"key"` + Data map[string]any `json:"data"` + Seq uint64 `json:"seq"` + Updated time.Time `json:"-"` +} + +// Datastore is a server-authoritative shared table. +type Datastore struct { + ID string + Kind Kind + Primary string // primary key field name (default "id") + ExpiresAt time.Time + + mu sync.RWMutex + records map[string]*Record + subscribers map[string]struct{} + seq uint64 +} + +func (d *Datastore) expired(now time.Time) bool { + return !d.ExpiresAt.IsZero() && now.After(d.ExpiresAt) +} + +// Subscribe registers a client id as interested in this datastore's broadcasts. +func (d *Datastore) Subscribe(clientID string) { + d.mu.Lock() + d.subscribers[clientID] = struct{}{} + d.mu.Unlock() +} + +// Unsubscribe removes a client id. +func (d *Datastore) Unsubscribe(clientID string) { + d.mu.Lock() + delete(d.subscribers, clientID) + d.mu.Unlock() +} + +// Subscribers returns the current subscriber ids. +func (d *Datastore) Subscribers() []string { + d.mu.RLock() + defer d.mu.RUnlock() + out := make([]string, 0, len(d.subscribers)) + for id := range d.subscribers { + out = append(out, id) + } + return out +} + +// Set upserts a record and returns it stamped with a fresh sequence number. The +// primary key is read from data[Primary]; if absent or not a string, ok is false. +func (d *Datastore) Set(data map[string]any, now time.Time) (Record, bool) { + key, ok := data[d.Primary].(string) + if !ok || key == "" { + return Record{}, false + } + d.mu.Lock() + defer d.mu.Unlock() + d.seq++ + rec := &Record{Key: key, Data: data, Seq: d.seq, Updated: now} + d.records[key] = rec + return *rec, true +} + +// Delete removes a record, returning the sequence number of the delete op (so the +// broadcast can be ordered against sets) and whether a record existed. +func (d *Datastore) Delete(key string) (uint64, bool) { + d.mu.Lock() + defer d.mu.Unlock() + if _, ok := d.records[key]; !ok { + return 0, false + } + d.seq++ + delete(d.records, key) + return d.seq, true +} + +// Get returns a copy of one record. +func (d *Datastore) Get(key string) (Record, bool) { + d.mu.RLock() + defer d.mu.RUnlock() + rec, ok := d.records[key] + if !ok { + return Record{}, false + } + return *rec, true +} + +// Snapshot returns all records ordered by sequence number (insertion/update +// order), which is the order a freshly-opening client should apply them in. +func (d *Datastore) Snapshot() []Record { + d.mu.RLock() + defer d.mu.RUnlock() + out := make([]Record, 0, len(d.records)) + for _, rec := range d.records { + out = append(out, *rec) + } + sort.Slice(out, func(i, j int) bool { return out[i].Seq < out[j].Seq }) + return out +} + +// Store owns all named datastores (active sync) and merge pools (passive sync). +type Store struct { + mu sync.Mutex + stores map[string]*Datastore + pools map[string]*Pool + now func() time.Time + ttl time.Duration +} + +// NewStore returns an empty store. Temp datastores and pools default to a 1h TTL. +func NewStore() *Store { + return &Store{ + stores: make(map[string]*Datastore), + pools: make(map[string]*Pool), + now: time.Now, + ttl: time.Hour, + } +} + +// SetClock replaces the time source; intended for deterministic tests. +func (s *Store) SetClock(now func() time.Time) { s.now = now } + +// Open returns the datastore with the given id, creating it if absent. An empty +// id is replaced with a fresh public id. An empty primary defaults to "id". A +// zero ttl on a temp datastore uses the store default; permanent ignores ttl. +func (s *Store) Open(id string, kind Kind, primary string, ttl time.Duration) *Datastore { + s.mu.Lock() + defer s.mu.Unlock() + + if id == "" { + id = newID() + } + if existing, ok := s.stores[id]; ok { + return existing + } + if primary == "" { + primary = "id" + } + if kind != Permanent { + kind = Temp + } + d := &Datastore{ + ID: id, + Kind: kind, + Primary: primary, + records: make(map[string]*Record), + subscribers: make(map[string]struct{}), + } + if kind == Temp { + if ttl <= 0 { + ttl = s.ttl + } + d.ExpiresAt = s.now().Add(ttl) + } + s.stores[id] = d + return d +} + +// Get returns an existing, non-expired datastore. +func (s *Store) Get(id string) (*Datastore, bool) { + now := s.now() + s.mu.Lock() + defer s.mu.Unlock() + d, ok := s.stores[id] + if !ok || d.expired(now) { + return nil, false + } + return d, true +} + +// UnsubscribeAll drops a client id from every datastore and pool. Called on +// disconnect so a departed client leaves no residual subscription. +func (s *Store) UnsubscribeAll(clientID string) { + s.mu.Lock() + stores := make([]*Datastore, 0, len(s.stores)) + for _, d := range s.stores { + stores = append(stores, d) + } + pools := make([]*Pool, 0, len(s.pools)) + for _, p := range s.pools { + pools = append(pools, p) + } + s.mu.Unlock() + for _, d := range stores { + d.Unsubscribe(clientID) + } + for _, p := range pools { + p.Unsubscribe(clientID) + } +} + +// PurgeExpired removes expired temp datastores and pools, returning the total +// number removed. +func (s *Store) PurgeExpired() int { + now := s.now() + s.mu.Lock() + defer s.mu.Unlock() + removed := 0 + for id, d := range s.stores { + if d.expired(now) { + delete(s.stores, id) + removed++ + } + } + for id, p := range s.pools { + if p.expired(now) { + delete(s.pools, id) + removed++ + } + } + return removed +} + +// StartJanitor purges expired temp datastores every interval until stop is called. +func (s *Store) StartJanitor(interval time.Duration) (stop func()) { + done := make(chan struct{}) + go func() { + t := time.NewTicker(interval) + defer t.Stop() + for { + select { + case <-t.C: + s.PurgeExpired() + case <-done: + return + } + } + }() + var once sync.Once + return func() { once.Do(func() { close(done) }) } +} + +// Now exposes the store clock to the service layer so record timestamps share it. +func (s *Store) Now() time.Time { return s.now() } + +func newID() string { + var b [12]byte + _, _ = rand.Read(b[:]) + return hex.EncodeToString(b[:]) +} diff --git a/internal/datastore/datastore_test.go b/internal/datastore/datastore_test.go new file mode 100644 index 0000000..08556cd --- /dev/null +++ b/internal/datastore/datastore_test.go @@ -0,0 +1,184 @@ +package datastore + +import ( + "sync" + "testing" + "time" +) + +func TestDatastoreSetGetSnapshot(t *testing.T) { + s := NewStore() + d := s.Open("notes", Temp, "id", 0) + + r1, ok := d.Set(map[string]any{"id": "a", "v": 1}, s.Now()) + if !ok || r1.Seq != 1 { + t.Fatalf("first set = %+v ok=%v", r1, ok) + } + r2, _ := d.Set(map[string]any{"id": "b", "v": 2}, s.Now()) + if r2.Seq != 2 { + t.Fatalf("second set seq = %d, want 2", r2.Seq) + } + + // Update of a existing key gets a fresh (higher) seq — last write wins. + r1b, _ := d.Set(map[string]any{"id": "a", "v": 99}, s.Now()) + if r1b.Seq <= r2.Seq { + t.Fatalf("update seq = %d, want > %d", r1b.Seq, r2.Seq) + } + + got, ok := d.Get("a") + if !ok || got.Data["v"] != 99 { + t.Fatalf("get a = %+v", got) + } + + snap := d.Snapshot() + if len(snap) != 2 { + t.Fatalf("snapshot len = %d, want 2", len(snap)) + } + // Snapshot is ordered by seq; the updated "a" now sorts last. + if snap[len(snap)-1].Key != "a" { + t.Fatalf("snapshot order wrong: %+v", snap) + } +} + +func TestDatastoreSetRequiresPrimary(t *testing.T) { + s := NewStore() + d := s.Open("x", Temp, "id", 0) + if _, ok := d.Set(map[string]any{"noid": true}, s.Now()); ok { + t.Fatal("set without the primary key should fail") + } +} + +func TestDatastoreDelete(t *testing.T) { + s := NewStore() + d := s.Open("x", Temp, "id", 0) + d.Set(map[string]any{"id": "a"}, s.Now()) + + seq, ok := d.Delete("a") + if !ok || seq == 0 { + t.Fatalf("delete = seq %d ok %v", seq, ok) + } + if _, ok := d.Get("a"); ok { + t.Fatal("record should be gone after delete") + } + if _, ok := d.Delete("a"); ok { + t.Fatal("deleting a missing key should fail") + } +} + +// TestConcurrentSetsResolveByArrival is the #45 conflict-resolution guarantee: +// concurrent writes are serialized by the datastore lock, every write gets a +// unique monotonic seq, and the final value is whichever write arrived last. +func TestConcurrentSetsResolveByArrival(t *testing.T) { + s := NewStore() + d := s.Open("race", Temp, "id", 0) + + const n = 200 + var wg sync.WaitGroup + seqs := make([]uint64, n) + for i := 0; i < n; i++ { + wg.Add(1) + go func(i int) { + defer wg.Done() + rec, _ := d.Set(map[string]any{"id": "k", "writer": i}, s.Now()) + seqs[i] = rec.Seq + }(i) + } + wg.Wait() + + // All seqs unique and within [1,n]. + seen := make(map[uint64]bool, n) + var max uint64 + for _, sq := range seqs { + if sq == 0 || sq > n || seen[sq] { + t.Fatalf("bad/duplicate seq %d", sq) + } + seen[sq] = true + if sq > max { + max = sq + } + } + // The surviving record must be the one with the highest seq (last arrival). + final, _ := d.Get("k") + if final.Seq != max { + t.Fatalf("final record seq = %d, want max %d", final.Seq, max) + } +} + +func TestTempExpiryAndPurge(t *testing.T) { + s := NewStore() + now := time.Unix(0, 0) + s.SetClock(func() time.Time { return now }) + + s.Open("temp1", Temp, "id", time.Second) + s.Open("perm1", Permanent, "id", 0) + + now = now.Add(2 * time.Second) + if _, ok := s.Get("temp1"); ok { + t.Fatal("temp store should have expired") + } + if _, ok := s.Get("perm1"); !ok { + t.Fatal("permanent store must not expire") + } + if removed := s.PurgeExpired(); removed != 1 { + t.Fatalf("purge removed %d, want 1", removed) + } +} + +func TestOpenIsIdempotentAndGeneratesID(t *testing.T) { + s := NewStore() + a := s.Open("", Temp, "", 0) + if a.ID == "" { + t.Fatal("empty id should be replaced with a generated one") + } + if a.Primary != "id" { + t.Fatalf("default primary = %q, want id", a.Primary) + } + b := s.Open(a.ID, Temp, "", 0) + if a != b { + t.Fatal("opening an existing id should return the same datastore") + } +} + +func TestPoolMergeDedupesAndTracksDelta(t *testing.T) { + s := NewStore() + p := s.OpenPool("p1", 0) + _ = p + + added, _, ok := s.PushPool("p1", []any{"x", "y", "x"}) + if !ok || len(added) != 2 { + t.Fatalf("first push added %v (ok=%v), want 2 unique", added, ok) + } + // Re-pushing existing items adds nothing. + added, _, _ = s.PushPool("p1", []any{"x", "y"}) + if len(added) != 0 { + t.Fatalf("re-push added %v, want 0", added) + } + // A new item is the only delta. + added, _, _ = s.PushPool("p1", []any{"x", "z"}) + if len(added) != 1 || added[0] != "z" { + t.Fatalf("push delta = %v, want [z]", added) + } + + pool, _ := s.GetPool("p1") + if len(pool.Items()) != 3 { + t.Fatalf("pool size = %d, want 3", len(pool.Items())) + } +} + +func TestUnsubscribeAllClearsEverywhere(t *testing.T) { + s := NewStore() + d := s.Open("d", Temp, "id", 0) + p := s.OpenPool("p", 0) + d.Subscribe("alice") + p.Subscribe("alice") + d.Subscribe("bob") + + s.UnsubscribeAll("alice") + + if len(d.Subscribers()) != 1 || d.Subscribers()[0] != "bob" { + t.Fatalf("datastore subscribers = %v, want [bob]", d.Subscribers()) + } + if len(p.Subscribers()) != 0 { + t.Fatalf("pool subscribers = %v, want empty", p.Subscribers()) + } +} diff --git a/internal/datastore/pool.go b/internal/datastore/pool.go new file mode 100644 index 0000000..b1a4536 --- /dev/null +++ b/internal/datastore/pool.go @@ -0,0 +1,125 @@ +package datastore + +import ( + "crypto/sha256" + "encoding/hex" + "encoding/json" + "time" +) + +// Pool is the passive-sync primitive (#45 part 1): a shared set of unique items +// that clients converge by pushing their local items and pulling the merged +// result. Items are deduplicated by the hash of their canonical JSON, so pushing +// the same item from many clients adds it exactly once. Insertion order is kept so +// every client observes the pool in the same order. +type Pool struct { + ID string + ExpiresAt time.Time + + items map[string]any // contentHash -> item + order []string // contentHashes in insertion order + subscribers map[string]struct{} +} + +func (p *Pool) expired(now time.Time) bool { + return !p.ExpiresAt.IsZero() && now.After(p.ExpiresAt) +} + +// Subscribe / Unsubscribe / Subscribers mirror Datastore's, guarded by the +// owning Store's lock (pool mutation always happens under that lock). +func (p *Pool) Subscribe(clientID string) { p.subscribers[clientID] = struct{}{} } +func (p *Pool) Unsubscribe(clientID string) { delete(p.subscribers, clientID) } +func (p *Pool) Subscribers() []string { + out := make([]string, 0, len(p.subscribers)) + for id := range p.subscribers { + out = append(out, id) + } + return out +} + +// Items returns the merged pool in insertion order. +func (p *Pool) Items() []any { + out := make([]any, 0, len(p.order)) + for _, h := range p.order { + out = append(out, p.items[h]) + } + return out +} + +// merge adds items not already present, returning only the newly added ones (so +// the caller broadcasts the minimal delta). Convergence is reached when a client +// has pushed and pulled until no side has new items. +func (p *Pool) merge(items []any) []any { + added := make([]any, 0, len(items)) + for _, it := range items { + h := contentHash(it) + if _, ok := p.items[h]; ok { + continue + } + p.items[h] = it + p.order = append(p.order, h) + added = append(added, it) + } + return added +} + +// OpenPool returns the pool with the given id, creating it if absent. An empty id +// is replaced with a fresh public id; a zero ttl uses the store default. Pools are +// always temporary (passive sync is for fast, ephemeral convergence). +func (s *Store) OpenPool(id string, ttl time.Duration) *Pool { + s.mu.Lock() + defer s.mu.Unlock() + if id == "" { + id = newID() + } + if p, ok := s.pools[id]; ok { + return p + } + if ttl <= 0 { + ttl = s.ttl + } + p := &Pool{ + ID: id, + ExpiresAt: s.now().Add(ttl), + items: make(map[string]any), + subscribers: make(map[string]struct{}), + } + s.pools[id] = p + return p +} + +// GetPool returns an existing, non-expired pool. +func (s *Store) GetPool(id string) (*Pool, bool) { + now := s.now() + s.mu.Lock() + defer s.mu.Unlock() + p, ok := s.pools[id] + if !ok || p.expired(now) { + return nil, false + } + return p, true +} + +// PushPool merges items into the pool under the store lock and returns the newly +// added items plus the current subscriber list for broadcasting. +func (s *Store) PushPool(id string, items []any) (added []any, subscribers []string, ok bool) { + now := s.now() + s.mu.Lock() + defer s.mu.Unlock() + p, exists := s.pools[id] + if !exists || p.expired(now) { + return nil, nil, false + } + return p.merge(items), p.Subscribers(), true +} + +// contentHash is the dedup key: the SHA-256 of the item's canonical JSON. +func contentHash(item any) string { + b, err := json.Marshal(item) + if err != nil { + // Non-serialisable items fall back to a per-call unique key (never deduped). + return newID() + } + sum := sha256.Sum256(b) + return hex.EncodeToString(sum[:]) +} diff --git a/internal/notify/store.go b/internal/notify/store.go new file mode 100644 index 0000000..1563a6b --- /dev/null +++ b/internal/notify/store.go @@ -0,0 +1,218 @@ +// Package notify implements store-and-forward notifications (#43) and their +// reply-bearing "suit" variant (#44). A notification addressed to a client that +// is offline is held until the client connects; one addressed to an online client +// is delivered immediately. Every notification carries a trace id so its delivery +// (and, for suit notifications, its reply) can be queried later. +// +// The store is bounded in two independent ways so it can never grow without +// limit, even if a target never reconnects: an expiry per notification and a hard +// per-target cap (oldest dropped first). A janitor periodically purges expired +// entries; tests drive expiry deterministically through an injectable clock. +package notify + +import ( + "crypto/rand" + "encoding/hex" + "sync" + "time" +) + +// Notification is one store-and-forward message addressed to a client id. +type Notification struct { + Trace string // unique id used to query delivery/reply status + From string // sender: a client id, or "server" for server-originated + To string // target client id + Pack any // opaque payload delivered to the target + Suit bool // true => a reply is expected (#44) + + CreatedAt time.Time + ExpiresAt time.Time // zero == never expires + Delivered bool + DeliveredAt time.Time + + Replied bool + Reply any + RepliedAt time.Time +} + +func (n *Notification) expired(now time.Time) bool { + return !n.ExpiresAt.IsZero() && now.After(n.ExpiresAt) +} + +// Store is the in-memory notification registry. All methods are safe for +// concurrent use. +type Store struct { + mu sync.Mutex + pending map[string][]*Notification // target id -> queued, undelivered + byTrace map[string]*Notification // trace -> notification (status + reply) + + now func() time.Time + defaultTTL time.Duration + maxPerTarget int +} + +// NewStore returns an empty store with sensible defaults (24h TTL, 1024 queued +// notifications per offline target). +func NewStore() *Store { + return &Store{ + pending: make(map[string][]*Notification), + byTrace: make(map[string]*Notification), + now: time.Now, + defaultTTL: 24 * time.Hour, + maxPerTarget: 1024, + } +} + +// SetClock replaces the time source; intended for deterministic tests. +func (s *Store) SetClock(now func() time.Time) { s.now = now } + +// Put records a notification for `to`. A zero ttl uses the store default. The +// returned Notification is a snapshot; the trace id is filled in. +func (s *Store) Put(from, to string, pack any, suit bool, ttl time.Duration) Notification { + if ttl <= 0 { + ttl = s.defaultTTL + } + now := s.now() + n := &Notification{ + Trace: newTrace(), + From: from, + To: to, + Pack: pack, + Suit: suit, + CreatedAt: now, + ExpiresAt: now.Add(ttl), + } + + s.mu.Lock() + defer s.mu.Unlock() + + q := s.pending[to] + // Enforce the per-target cap by evicting the oldest queued notifications. + for len(q) >= s.maxPerTarget && len(q) > 0 { + evicted := q[0] + q = q[1:] + delete(s.byTrace, evicted.Trace) + } + s.pending[to] = append(q, n) + s.byTrace[n.Trace] = n + return *n +} + +// Drain returns every non-expired pending notification for `to`, marking each +// delivered and removing it from the pending queue. Delivered notifications stay +// in the trace index (until they expire) so their status can still be queried. +func (s *Store) Drain(to string) []Notification { + now := s.now() + s.mu.Lock() + defer s.mu.Unlock() + + q := s.pending[to] + if len(q) == 0 { + return nil + } + delete(s.pending, to) + + out := make([]Notification, 0, len(q)) + for _, n := range q { + if n.expired(now) { + delete(s.byTrace, n.Trace) + continue + } + n.Delivered = true + n.DeliveredAt = now + out = append(out, *n) + } + return out +} + +// Status returns a snapshot of the notification with the given trace. +func (s *Store) Status(trace string) (Notification, bool) { + now := s.now() + s.mu.Lock() + defer s.mu.Unlock() + n, ok := s.byTrace[trace] + if !ok || n.expired(now) { + return Notification{}, false + } + return *n, true +} + +// Reply records a client's reply to a suit notification and returns the updated +// snapshot. It fails if the trace is unknown, expired, or not a suit. +func (s *Store) Reply(trace string, reply any) (Notification, bool) { + now := s.now() + s.mu.Lock() + defer s.mu.Unlock() + n, ok := s.byTrace[trace] + if !ok || n.expired(now) || !n.Suit { + return Notification{}, false + } + n.Replied = true + n.Reply = reply + n.RepliedAt = now + return *n, true +} + +// PurgeExpired removes expired notifications from both indexes and returns how +// many were removed. A janitor calls this periodically; Drain/Status/Reply also +// drop expired entries lazily as they are touched. +func (s *Store) PurgeExpired() int { + now := s.now() + s.mu.Lock() + defer s.mu.Unlock() + + removed := 0 + for trace, n := range s.byTrace { + if n.expired(now) { + delete(s.byTrace, trace) + removed++ + } + } + for to, q := range s.pending { + kept := q[:0] + for _, n := range q { + if !n.expired(now) { + kept = append(kept, n) + } + } + if len(kept) == 0 { + delete(s.pending, to) + } else { + s.pending[to] = kept + } + } + return removed +} + +// PendingCount reports how many notifications are queued for a target (test aid). +func (s *Store) PendingCount(to string) int { + s.mu.Lock() + defer s.mu.Unlock() + return len(s.pending[to]) +} + +// StartJanitor runs PurgeExpired every interval until the returned stop function +// is called. main() owns the lifecycle; the stop function makes it leak-free. +func (s *Store) StartJanitor(interval time.Duration) (stop func()) { + done := make(chan struct{}) + go func() { + t := time.NewTicker(interval) + defer t.Stop() + for { + select { + case <-t.C: + s.PurgeExpired() + case <-done: + return + } + } + }() + var once sync.Once + return func() { once.Do(func() { close(done) }) } +} + +func newTrace() string { + var b [16]byte + _, _ = rand.Read(b[:]) + return hex.EncodeToString(b[:]) +} diff --git a/internal/notify/store_test.go b/internal/notify/store_test.go new file mode 100644 index 0000000..a76ff71 --- /dev/null +++ b/internal/notify/store_test.go @@ -0,0 +1,121 @@ +package notify + +import ( + "testing" + "time" +) + +func TestPutAndDrainDelivers(t *testing.T) { + s := NewStore() + n := s.Put("alice", "bob", map[string]any{"text": "hi"}, false, 0) + if n.Trace == "" { + t.Fatal("Put should assign a trace id") + } + if s.PendingCount("bob") != 1 { + t.Fatalf("pending for bob = %d, want 1", s.PendingCount("bob")) + } + + got := s.Drain("bob") + if len(got) != 1 || got[0].Trace != n.Trace { + t.Fatalf("Drain = %+v, want the queued notification", got) + } + if !got[0].Delivered { + t.Fatal("drained notification should be marked delivered") + } + if s.PendingCount("bob") != 0 { + t.Fatal("pending should be empty after drain") + } + + // Status still works after delivery. + st, ok := s.Status(n.Trace) + if !ok || !st.Delivered { + t.Fatalf("Status after drain = %+v, ok=%v", st, ok) + } +} + +func TestExpiryDropsNotification(t *testing.T) { + s := NewStore() + now := time.Unix(1000, 0) + s.SetClock(func() time.Time { return now }) + + s.Put("srv", "bob", "payload", false, 5*time.Second) + + // Advance past expiry; the queued message must not be delivered. + now = now.Add(10 * time.Second) + if got := s.Drain("bob"); len(got) != 0 { + t.Fatalf("expired notification was delivered: %+v", got) + } + if removed := s.PurgeExpired(); removed != 0 { + // Drain already dropped it from byTrace; nothing left to purge. + t.Fatalf("PurgeExpired removed %d, want 0 (already dropped on drain)", removed) + } +} + +func TestPurgeExpiredReclaimsUndeliveredTraces(t *testing.T) { + s := NewStore() + now := time.Unix(0, 0) + s.SetClock(func() time.Time { return now }) + + // A target that never connects: its messages must still be reclaimed. + s.Put("srv", "ghost", "a", false, time.Second) + s.Put("srv", "ghost", "b", false, time.Second) + + now = now.Add(2 * time.Second) + removed := s.PurgeExpired() + if removed != 2 { + t.Fatalf("PurgeExpired removed %d, want 2", removed) + } + if s.PendingCount("ghost") != 0 { + t.Fatal("expired pending queue should be gone") + } +} + +func TestPerTargetCapEvictsOldest(t *testing.T) { + s := NewStore() + s.maxPerTarget = 3 + + var traces []string + for i := 0; i < 5; i++ { + traces = append(traces, s.Put("srv", "bob", i, false, 0).Trace) + } + if s.PendingCount("bob") != 3 { + t.Fatalf("pending = %d, want capped at 3", s.PendingCount("bob")) + } + // The two oldest must have been evicted from the trace index too. + if _, ok := s.Status(traces[0]); ok { + t.Fatal("oldest notification should have been evicted") + } + if _, ok := s.Status(traces[4]); !ok { + t.Fatal("newest notification should be retained") + } +} + +func TestSuitReply(t *testing.T) { + s := NewStore() + n := s.Put("srv", "bob", "question", true, 0) + + // A non-suit reply target must fail. + plain := s.Put("srv", "bob", "fyi", false, 0) + if _, ok := s.Reply(plain.Trace, "nope"); ok { + t.Fatal("replying to a non-suit notification should fail") + } + + updated, ok := s.Reply(n.Trace, map[string]any{"answer": 42}) + if !ok || !updated.Replied { + t.Fatalf("Reply = %+v ok=%v", updated, ok) + } + st, _ := s.Status(n.Trace) + if !st.Replied { + t.Fatal("status should reflect the reply") + } + if m, _ := st.Reply.(map[string]any); m["answer"] != 42 { + t.Fatalf("stored reply = %v", st.Reply) + } +} + +func TestUnknownTraceStatus(t *testing.T) { + s := NewStore() + if _, ok := s.Status("does-not-exist"); ok { + t.Fatal("unknown trace should not resolve") + } +} diff --git a/internal/services/datastore.go b/internal/services/datastore.go new file mode 100644 index 0000000..b24cd15 --- /dev/null +++ b/internal/services/datastore.go @@ -0,0 +1,164 @@ +package services + +import ( + "time" + + "git.saqut.com/saqut/mwse/internal/datastore" + "git.saqut.com/saqut/mwse/internal/protocol" + "git.saqut.com/saqut/mwse/internal/ws" +) + +// registerDatastore wires the #45 shared data layer: active sync (collections with +// CRUD broadcast), passive sync (a fast merge pool), and temp/permanent +// datastores. The returned store is handed back so a janitor can reclaim expired +// temp stores/pools. +// +// Wire surface (additive): +// +// Active sync / collection / datastore: +// data/open {id?, kind?, primary?, expires?} -> {status, id, primary, kind, records} +// (kind = "temp" | "permanent". The payload field is "kind", NOT "type": +// "type" is reserved by WSTS to select the handler.) +// data/set {id, record} -> {status, key, seq} + broadcast data/op(set) +// data/delete {id, key} -> {status, seq} + broadcast data/op(delete) +// data/get {id, key?} -> {status, record|records} +// +// Passive sync (merge pool): +// sync/open {id?, expires?} -> {status, id, items} +// sync/push {id, items} -> {status, added} + broadcast sync/add +// sync/pull {id} -> {status, items} +// +// Broadcasts are server signals delivered to every *other* subscriber: +// +// [ {id, op, record|key, seq}, "data/op" ] +// [ {id, items}, "sync/add" ] +func registerDatastore(hub *ws.Hub) *datastore.Store { + store := datastore.NewStore() + + // Signal a set of subscriber ids, skipping the originator (it already holds the + // change and learns the authoritative seq from its own reply). + broadcast := func(subs []string, except, name string, payload map[string]any) { + for _, id := range subs { + if id == except { + continue + } + if cl, ok := hub.Client(id); ok { + cl.Signal(name, payload) + } + } + } + + // A departed client must leave no residual subscription anywhere. + hub.OnDisconnect(func(c *ws.Client) { store.UnsubscribeAll(c.ID) }) + + // ---- active sync / collection / datastore ------------------------------- + + hub.Register("data/open", func(c *ws.Client, m protocol.Message) any { + kind := datastore.Temp + if m.Str("kind") == string(datastore.Permanent) { + kind = datastore.Permanent + } + ttl := time.Duration(m.Int("expires")) * time.Second + ds := store.Open(m.Str("id"), kind, m.Str("primary"), ttl) + ds.Subscribe(c.ID) + return map[string]any{ + "status": "success", + "id": ds.ID, + "primary": ds.Primary, + "kind": string(ds.Kind), + "records": ds.Snapshot(), + } + }) + + hub.Register("data/set", func(c *ws.Client, m protocol.Message) any { + ds, ok := store.Get(m.Str("id")) + if !ok { + return fail("NOT_FOUND") + } + rec, ok := ds.Set(toMap(m.Get("record")), store.Now()) + if !ok { + return fail("PRIMARY_KEY_REQUIRED") + } + broadcast(ds.Subscribers(), c.ID, "data/op", map[string]any{ + "id": ds.ID, + "op": "set", + "record": rec, + "seq": rec.Seq, + }) + return map[string]any{"status": "success", "key": rec.Key, "seq": rec.Seq} + }) + + hub.Register("data/delete", func(c *ws.Client, m protocol.Message) any { + ds, ok := store.Get(m.Str("id")) + if !ok { + return fail("NOT_FOUND") + } + key := m.Str("key") + seq, ok := ds.Delete(key) + if !ok { + return fail("NO_SUCH_KEY") + } + broadcast(ds.Subscribers(), c.ID, "data/op", map[string]any{ + "id": ds.ID, + "op": "delete", + "key": key, + "seq": seq, + }) + return map[string]any{"status": "success", "seq": seq} + }) + + hub.Register("data/get", func(c *ws.Client, m protocol.Message) any { + ds, ok := store.Get(m.Str("id")) + if !ok { + return fail("NOT_FOUND") + } + if key := m.Str("key"); key != "" { + rec, ok := ds.Get(key) + if !ok { + return fail("NO_SUCH_KEY") + } + return map[string]any{"status": "success", "record": rec} + } + return map[string]any{"status": "success", "records": ds.Snapshot()} + }) + + // ---- passive sync (merge pool) ------------------------------------------ + + hub.Register("sync/open", func(c *ws.Client, m protocol.Message) any { + ttl := time.Duration(m.Int("expires")) * time.Second + p := store.OpenPool(m.Str("id"), ttl) + p.Subscribe(c.ID) + return map[string]any{"status": "success", "id": p.ID, "items": p.Items()} + }) + + hub.Register("sync/push", func(c *ws.Client, m protocol.Message) any { + items := toSlice(m.Get("items")) + added, subs, ok := store.PushPool(m.Str("id"), items) + if !ok { + return fail("NOT_FOUND") + } + if len(added) > 0 { + broadcast(subs, c.ID, "sync/add", map[string]any{"id": m.Str("id"), "items": added}) + } + return map[string]any{"status": "success", "added": len(added)} + }) + + hub.Register("sync/pull", func(c *ws.Client, m protocol.Message) any { + p, ok := store.GetPool(m.Str("id")) + if !ok { + return fail("NOT_FOUND") + } + return map[string]any{"status": "success", "items": p.Items()} + }) + + return store +} + +// toSlice coerces a decoded JSON value to a slice, returning nil when it is not +// an array. +func toSlice(v any) []any { + if s, ok := v.([]any); ok { + return s + } + return nil +} diff --git a/internal/services/datasync_test.go b/internal/services/datasync_test.go new file mode 100644 index 0000000..b43b77c --- /dev/null +++ b/internal/services/datasync_test.go @@ -0,0 +1,152 @@ +package services + +import ( + "encoding/json" + "testing" + + "git.saqut.com/saqut/mwse/internal/datastore" + "git.saqut.com/saqut/mwse/internal/testutil" + "git.saqut.com/saqut/mwse/internal/ws" +) + +// waitDataOp waits for a data/op broadcast whose "op" field matches (the stream +// may contain several — e.g. a set followed by a delete). +func waitDataOp(t *testing.T, fc *testutil.FakeConn, op string) map[string]any { + t.Helper() + find := func() (map[string]any, bool) { + for _, raw := range fc.Writes() { + var arr []any + if json.Unmarshal(raw, &arr) != nil || len(arr) != 2 || arr[1] != "data/op" { + continue + } + if p, ok := arr[0].(map[string]any); ok && p["op"] == op { + return p, true + } + } + return nil, false + } + waitFor(t, func() bool { _, ok := find(); return ok }) + p, _ := find() + return p +} + +// newHubReg builds a hub with services registered and returns the Registry so a +// test can inspect the long-lived stores. +func newHubReg() (*ws.Hub, *Registry) { + hub := ws.NewHub() + reg := Register(hub) + return hub, reg +} + +// TestActiveSyncBroadcast is the #45 collection/CRUD-broadcast core: a set by one +// subscriber is broadcast to every other subscriber with the authoritative seq. +func TestActiveSyncBroadcast(t *testing.T) { + hub, _ := newHubReg() + a, _ := connect(hub, "alice") + b, fb := connect(hub, "bob") + + // Both open the same datastore by id. + opened := asMap(t, hub.Handle(a, msg("data/open", "id", "shared", "kind", "temp", "primary", "id"))) + id := opened["id"].(string) + hub.Handle(b, msg("data/open", "id", id)) + + // alice sets a record. + set := asMap(t, hub.Handle(a, msg("data/set", "id", id, "record", map[string]any{"id": "k1", "v": float64(7)}))) + if set["status"] != "success" { + t.Fatalf("data/set = %v", set) + } + + // bob receives the data/op broadcast (set). + op := waitSignal(t, fb, "data/op") + if op["op"] != "set" { + t.Fatalf("op = %v, want set", op["op"]) + } + rec := asMap(t, op["record"]) + if rec["key"] != "k1" { + t.Fatalf("broadcast record key = %v, want k1", rec["key"]) + } + + // bob can read the record back from the authoritative copy. (An in-process + // reply carries the Go value; over a real socket it is the same shape as JSON.) + got := asMap(t, hub.Handle(b, msg("data/get", "id", id, "key", "k1"))) + gotRec := got["record"].(datastore.Record) + if gotRec.Key != "k1" { + t.Fatalf("data/get = %v", got) + } +} + +func TestActiveSyncDeleteBroadcast(t *testing.T) { + hub, _ := newHubReg() + a, _ := connect(hub, "alice") + b, fb := connect(hub, "bob") + + hub.Handle(a, msg("data/open", "id", "d", "kind", "temp")) + hub.Handle(b, msg("data/open", "id", "d")) + hub.Handle(a, msg("data/set", "id", "d", "record", map[string]any{"id": "x"})) + + del := asMap(t, hub.Handle(a, msg("data/delete", "id", "d", "key", "x"))) + if del["status"] != "success" { + t.Fatalf("data/delete = %v", del) + } + op := waitDataOp(t, fb, "delete") + if op["key"] != "x" { + t.Fatalf("delete broadcast = %v", op) + } +} + +// TestPassiveSyncConvergence is the #45 merge-pool core: pushes are deduped and +// only the delta is broadcast, so subscribers converge. +func TestPassiveSyncConvergence(t *testing.T) { + hub, _ := newHubReg() + a, _ := connect(hub, "alice") + b, fb := connect(hub, "bob") + + opened := asMap(t, hub.Handle(a, msg("sync/open", "id", "pool"))) + id := opened["id"].(string) + hub.Handle(b, msg("sync/open", "id", id)) + + // alice pushes two items; bob gets them as a delta. + res := asMap(t, hub.Handle(a, msg("sync/push", "id", id, "items", []any{"x", "y"}))) + if res["added"] != 2 { + t.Fatalf("push added = %v, want 2", res["added"]) + } + add := waitSignal(t, fb, "sync/add") + if items, _ := add["items"].([]any); len(items) != 2 { + t.Fatalf("sync/add items = %v, want 2", add["items"]) + } + + // bob pulls and sees the full converged pool. + pull := asMap(t, hub.Handle(b, msg("sync/pull", "id", id))) + if items, _ := pull["items"].([]any); len(items) != 2 { + t.Fatalf("sync/pull items = %v, want 2", pull["items"]) + } + + // A push with one new + one existing item broadcasts only the new one. + res = asMap(t, hub.Handle(b, msg("sync/push", "id", id, "items", []any{"y", "z"}))) + if res["added"] != 1 { + t.Fatalf("second push added = %v, want 1", res["added"]) + } +} + +// TestDataSubscriptionClearedOnDisconnect proves no residual subscription is left +// behind (no leak) when a subscriber disconnects. +func TestDataSubscriptionClearedOnDisconnect(t *testing.T) { + hub, reg := newHubReg() + a, _ := connect(hub, "alice") + connect(hub, "bob") + + hub.Handle(a, msg("data/open", "id", "d", "kind", "temp")) + ds, ok := reg.Data.Get("d") + if !ok { + t.Fatal("datastore should exist") + } + if len(ds.Subscribers()) != 1 { + t.Fatalf("subscribers before disconnect = %v, want [alice]", ds.Subscribers()) + } + + hub.Disconnect(a) + + if len(ds.Subscribers()) != 0 { + t.Fatalf("subscribers after disconnect = %v, want empty", ds.Subscribers()) + } +} diff --git a/internal/services/notify.go b/internal/services/notify.go new file mode 100644 index 0000000..a395e7d --- /dev/null +++ b/internal/services/notify.go @@ -0,0 +1,95 @@ +package services + +import ( + "time" + + "git.saqut.com/saqut/mwse/internal/notify" + "git.saqut.com/saqut/mwse/internal/protocol" + "git.saqut.com/saqut/mwse/internal/ws" +) + +// registerNotify wires the store-and-forward notification service (#43) and its +// reply-bearing suit variant (#44). The returned store is handed to the caller so +// a janitor can reclaim expired entries. +// +// Wire surface (all additive — the frozen relay contract is untouched): +// +// - notify/send {to, pack, expires?, suit?} -> {status, trace} +// - notify/status {trace} -> {status, delivered, replied, reply} +// - notify/reply {trace, pack} -> {status} (client answering a suit) +// +// Delivery to the target is the server signal: +// +// [ {from, pack, trace, suit}, "notify" ] +func registerNotify(hub *ws.Hub, trigger NotifyTrigger) *notify.Store { + store := notify.NewStore() + + deliver := func(c *ws.Client, n notify.Notification) { + c.Signal("notify", map[string]any{ + "from": n.From, + "pack": n.Pack, + "trace": n.Trace, + "suit": n.Suit, + }) + } + + // flush drains and delivers everything queued for a connected client. + flush := func(c *ws.Client) { + for _, n := range store.Drain(c.ID) { + deliver(c, n) + } + } + + // On connect, deliver anything that arrived while the client was offline. + hub.OnConnect(flush) + + hub.Register("notify/send", func(c *ws.Client, m protocol.Message) any { + to := m.Str("to") + if to == "" { + return fail("TO_REQUIRED") + } + ttl := time.Duration(m.Int("expires")) * time.Second // 0 => store default + n := store.Put(c.ID, to, m.Get("pack"), m.Truthy("suit"), ttl) + + // If the target is connected right now, deliver immediately; otherwise it + // stays queued until the target's next connect. + if target, ok := hub.Client(to); ok { + flush(target) + } + return map[string]any{"status": "success", "trace": n.Trace} + }) + + hub.Register("notify/status", func(c *ws.Client, m protocol.Message) any { + n, ok := store.Status(m.Str("trace")) + if !ok { + return fail("NOT_FOUND") + } + return map[string]any{ + "status": "success", + "delivered": n.Delivered, + "replied": n.Replied, + "reply": n.Reply, + } + }) + + hub.Register("notify/reply", func(c *ws.Client, m protocol.Message) any { + n, ok := store.Reply(m.Str("trace"), m.Get("pack")) + if !ok { + return fail("NOT_FOUND_OR_NOT_SUIT") + } + // Push the reply outward to the 3rd-party application server (#44): MWSE + // triggers it manually rather than having the server poll notify/status. + trigger.NotifyReplied(n) + // If the original sender is an online client, signal it the reply too. + if origin, ok := hub.Client(n.From); ok { + origin.Signal("notify/reply", map[string]any{ + "trace": n.Trace, + "from": c.ID, + "pack": n.Reply, + }) + } + return success() + }) + + return store +} diff --git a/internal/services/notify_test.go b/internal/services/notify_test.go new file mode 100644 index 0000000..11e4ba7 --- /dev/null +++ b/internal/services/notify_test.go @@ -0,0 +1,110 @@ +package services + +import ( + "sync" + "testing" + + "git.saqut.com/saqut/mwse/internal/notify" + "git.saqut.com/saqut/mwse/internal/ws" +) + +// recTrigger records suit replies pushed outward (the #44 3rd-party trigger). +type recTrigger struct { + mu sync.Mutex + got []notify.Notification +} + +func (r *recTrigger) NotifyReplied(n notify.Notification) { + r.mu.Lock() + r.got = append(r.got, n) + r.mu.Unlock() +} + +func (r *recTrigger) count() int { + r.mu.Lock() + defer r.mu.Unlock() + return len(r.got) +} + +// TestNotifyOfflineThenDeliverOnConnect is the #43 store-and-forward core: a +// message left for an offline client is delivered when that client connects. +func TestNotifyOfflineThenDeliverOnConnect(t *testing.T) { + hub := newHub() + a, _ := connect(hub, "alice") + + res := asMap(t, hub.Handle(a, msg("notify/send", "to", "bob", "pack", map[string]any{"text": "hi"}))) + if res["status"] != "success" { + t.Fatalf("notify/send = %v", res) + } + trace, _ := res["trace"].(string) + if trace == "" { + t.Fatal("notify/send should return a trace id") + } + + // bob was offline; now connects and must receive the queued notification. + _, fb := connect(hub, "bob") + sig := waitSignal(t, fb, "notify") + if sig["trace"] != trace { + t.Fatalf("delivered trace = %v, want %s", sig["trace"], trace) + } + if p, _ := sig["pack"].(map[string]any); p["text"] != "hi" { + t.Fatalf("delivered pack = %v", sig["pack"]) + } + + // Status reports delivered. + st := asMap(t, hub.Handle(a, msg("notify/status", "trace", trace))) + if st["delivered"] != true { + t.Fatalf("status = %v, want delivered", st) + } +} + +// TestNotifyImmediateWhenOnline delivers without waiting when the target is up. +func TestNotifyImmediateWhenOnline(t *testing.T) { + hub := newHub() + a, _ := connect(hub, "alice") + _, fb := connect(hub, "bob") + + hub.Handle(a, msg("notify/send", "to", "bob", "pack", map[string]any{"text": "now"})) + sig := waitSignal(t, fb, "notify") + if p, _ := sig["pack"].(map[string]any); p["text"] != "now" { + t.Fatalf("immediate delivery pack = %v", sig["pack"]) + } +} + +// TestNotifySuitReply is the #44 reply path: a suit notification's reply reaches +// the 3rd-party trigger and is signalled back to the origin client. +func TestNotifySuitReply(t *testing.T) { + trig := &recTrigger{} + hub := ws.NewHub() + Register(hub, WithNotifyTrigger(trig)) + + a, fa := connect(hub, "alice") + b, fb := connect(hub, "bob") + + res := asMap(t, hub.Handle(a, msg("notify/send", "to", "bob", "suit", true, "pack", map[string]any{"q": "ok?"}))) + trace, _ := res["trace"].(string) + + sig := waitSignal(t, fb, "notify") + if sig["suit"] != true { + t.Fatalf("notify suit flag = %v, want true", sig["suit"]) + } + + // bob replies to the suit. + rep := asMap(t, hub.Handle(b, msg("notify/reply", "trace", trace, "pack", map[string]any{"a": "yes"}))) + if rep["status"] != "success" { + t.Fatalf("notify/reply = %v", rep) + } + + // The 3rd-party trigger fired, and the origin (alice) got the reply signal. + waitFor(t, func() bool { return trig.count() == 1 }) + got := waitSignal(t, fa, "notify/reply") + if p, _ := got["pack"].(map[string]any); p["a"] != "yes" { + t.Fatalf("origin reply signal pack = %v", got["pack"]) + } + + // A non-suit reply must be rejected. + plain := asMap(t, hub.Handle(a, msg("notify/send", "to", "bob", "pack", map[string]any{}))) + if r := asMap(t, hub.Handle(b, msg("notify/reply", "trace", plain["trace"], "pack", map[string]any{}))); r["status"] != "fail" { + t.Fatalf("reply to non-suit = %v, want fail", r) + } +} diff --git a/internal/services/services.go b/internal/services/services.go index 0a91579..c3c2dd4 100644 --- a/internal/services/services.go +++ b/internal/services/services.go @@ -14,21 +14,71 @@ import ( "crypto/sha256" "encoding/hex" + "git.saqut.com/saqut/mwse/internal/datastore" + "git.saqut.com/saqut/mwse/internal/notify" "git.saqut.com/saqut/mwse/internal/protocol" "git.saqut.com/saqut/mwse/internal/ws" ) -// Register wires every service onto the hub. Call once during startup, before the -// server begins accepting connections. The order mirrors the Node require() order -// so connect-time side effects (id message, private room, session defaults) -// happen in the same sequence. -func Register(hub *ws.Hub) { +// NotifyTrigger is invoked when a suit notification receives a reply, so MWSE can +// push the result to a 3rd-party application server (#44) instead of that server +// polling for it. The default is a no-op; a real HTTP-backed implementation lives +// in the bridge wiring (#46). +type NotifyTrigger interface { + NotifyReplied(n notify.Notification) +} + +type noopTrigger struct{} + +func (noopTrigger) NotifyReplied(notify.Notification) {} + +// options collects the externally-wired integrations a deployment may supply. +type options struct { + notifyTrigger NotifyTrigger +} + +// Option configures Register. +type Option func(*options) + +// WithNotifyTrigger sets the 3rd-party trigger fired on suit replies (#44). +func WithNotifyTrigger(t NotifyTrigger) Option { + return func(o *options) { + if t != nil { + o.notifyTrigger = t + } + } +} + +// Registry exposes the long-lived stores the services own so the caller (main) +// can manage their lifecycle — for example start the notify/datastore janitors +// that reclaim expired entries. +type Registry struct { + Notify *notify.Store + Data *datastore.Store +} + +// Register wires every service onto the hub and returns a Registry of the stores +// that need lifecycle management. Call once during startup, before the server +// begins accepting connections. The order mirrors the Node require() order so +// connect-time side effects (id message, private room, session defaults) happen +// in the same sequence; the data-sync services (#43–#45) are appended after. +func Register(hub *ws.Hub, opts ...Option) *Registry { + o := options{notifyTrigger: noopTrigger{}} + for _, apply := range opts { + apply(&o) + } + registerYourID(hub) registerAuth(hub) registerRoom(hub) registerDataTransfer(hub) registerIPPressure(hub, nil) registerSession(hub) + + return &Registry{ + Notify: registerNotify(hub, o.notifyTrigger), + Data: registerDatastore(hub), + } } // ---- small response helpers --------------------------------------------- diff --git a/main.go b/main.go index c641468..059f923 100644 --- a/main.go +++ b/main.go @@ -12,6 +12,7 @@ import ( "os" "os/signal" "syscall" + "time" "git.saqut.com/saqut/mwse/internal/config" "git.saqut.com/saqut/mwse/internal/httpserver" @@ -23,7 +24,15 @@ func main() { cfg := config.Load() hub := ws.NewHub() - services.Register(hub) + reg := services.Register(hub) + + // The notify and datastore stores hold entries with a TTL; their janitors + // reclaim expired ones so memory cannot grow without bound. They are stopped + // during shutdown so no goroutine is leaked. + stopNotify := reg.Notify.StartJanitor(time.Minute) + stopData := reg.Data.StartJanitor(time.Minute) + defer stopNotify() + defer stopData() srv := httpserver.New(hub, cfg)