#43/#44/#45: Notify (store-and-forward + suit) ve data-sync alt sistemleri
- internal/notify: store-and-forward (#43) + suit/yanıtlı (#44). Offline hedefe bırakılan mesaj bağlanınca teslim; trace id ile durum sorgulanır; suit cevabı 3. taraf trigger'a manuel iletilir. TTL + hedef-başı sınır + janitor → leak yok. - internal/datastore: active sync/collection (CRUD broadcast, arrival-time seq ile çakışma çözümü) + passive sync (dedupe merge pool) + temp/permanent datastore. Saf paket (ws bağımsız), servis katmanı I/O yapar. - services.Register artık *Registry döndürür + ...Option alır (WithNotifyTrigger). main.go janitor'ları başlatır/durdurur. Eski Register(hub) çağrıları çalışır. Testler: internal/notify, internal/datastore birim testleri + services notify_test/datasync_test (offline teslim, suit reply+trigger, CRUD broadcast, concurrent arrival-order, passive convergence, disconnect unsubscribe). go test -race ./... yeşil. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
parent
945b7621a4
commit
441093bad6
12
decisions.md
12
decisions.md
|
|
@ -58,6 +58,18 @@ Hedef: çok yüksek bağlantı sayısı + sürekli mesaj trafiği; limitleri/poo
|
|||
|
||||
**Regression:** `internal/ws` `TestServerNoReplyOnNilResult`; `internal/services` `TestRequestResponseRoundTrip` (out-of-band cevap aynı id ile geri geliyor, erken yanıt yok).
|
||||
|
||||
## Data-sync alt sistemleri (#43 / #44 / #45)
|
||||
|
||||
16. **`services.Register` artık `*Registry` döndürür ve `...Option` alır.** Eski çağrılar (`Register(hub)`) dönüşü yok sayarak çalışmaya devam eder. Registry, ömür yönetimi gereken store'ları (notify, datastore) main'e açar; main bunların janitor'larını başlatır ve shutdown'da durdurur (goroutine sızıntısı yok). 3. taraf entegrasyonları (#44 trigger) `WithNotifyTrigger` ile enjekte edilir; varsayılan no-op (IPPressure'daki `Announcer` deseniyle aynı).
|
||||
|
||||
17. **#43 Notify (store-and-forward) + #44 Suit:** `internal/notify` saf, transport-bağımsız store (enjekte edilebilir clock). Hedef offline ise mesaj kuyruğa girer, bağlanınca `notify` sinyali ile teslim edilir; her bildirimde `trace` id → `notify/status` ile sorgulanır. **Leak yok:** her bildirimde TTL (vars. 24s) + hedef başına sınır (vars. 1024, en eski düşer) + periyodik janitor. **Suit (#44):** `suit:true` bildirime client `notify/reply` ile cevap verir; cevap 3. taraf sunucuya `NotifyTrigger` ile **manuel tetiklenir** (poll yok) ve origin client online ise `notify/reply` sinyali ile bildirilir.
|
||||
|
||||
18. **#45 Datastore + Active/Passive Sync:** `internal/datastore` saf paket (ws bağımlılığı yok — data kurallarını hub/socket olmadan test edilebilir kılar; servis katmanı abone id'lerini çözüp sinyal atar).
|
||||
- **Active sync / collection (CRUD broadcast):** `data/open`/`data/set`/`data/delete`/`data/get`. Sunucu otoriter kopyayı tutar; her mutasyon datastore kilidi altında **monoton seq** ile damgalanır → **çakışma çözümü arrival-time** (kilidi en son alan kazanır), seq her broadcast'te taşınır, client'lar yakınsar. Set/delete diğer abonelere `data/op` sinyali ile yayılır (origin hariç; o seq'i kendi yanıtından öğrenir).
|
||||
- **Passive sync (merge pool):** `sync/open`/`sync/push`/`sync/pull`. Öğeler kanonik JSON hash'iyle **dedupe** edilir; push yalnızca yeni delta'yı `sync/add` ile yayar → ortak havuzda toplanır, hepsi eşit olana kadar push/pull sürer.
|
||||
- **Datastore tipi:** `kind:"temp"|"permanent"` (alan adı bilinçli olarak `kind`; `type` WSTS handler seçici olduğu için kullanılamaz). temp TTL ile expire, permanent kalıcı. id boşsa public id üretilir.
|
||||
- **Leak yok:** temp store/pool TTL + janitor; disconnect'te `UnsubscribeAll` ile abonelik kalıntısı bırakılmaz.
|
||||
|
||||
## Session varsayılanları
|
||||
|
||||
`packrecaive`/`packsending`/`notifyPairInfo`/`notifyRoomInfo` varsayılanları **`NewClient` constructor'ında** set ediliyor (listener sırasından bağımsız her zaman mevcut). Session servisinin connect hook'u parite için yine de bunları yeniden uyguluyor.
|
||||
|
|
|
|||
|
|
@ -0,0 +1,279 @@
|
|||
// Package datastore implements the shared data layer of #45: server-authoritative
|
||||
// collections that clients mutate with CRUD operations (active sync) and a fast
|
||||
// merge pool that clients converge through (passive sync), plus temp/permanent
|
||||
// datastores addressable by a public id.
|
||||
//
|
||||
// The package is deliberately free of any transport dependency: it owns the data
|
||||
// and the conflict rules, and returns the subscriber ids that a mutation must be
|
||||
// broadcast to. The service layer (internal/services) resolves those ids to
|
||||
// connections and emits the signals. That split keeps the data rules unit-testable
|
||||
// without a hub or a socket.
|
||||
//
|
||||
// # Conflict resolution
|
||||
//
|
||||
// Every mutation is serialized through the datastore's lock and stamped with a
|
||||
// monotonically increasing sequence number. "Arrival-time priority" (the rule the
|
||||
// issue asks for) therefore falls out naturally: concurrent writes are ordered by
|
||||
// the moment they acquire the lock, and the last arrival wins. The sequence number
|
||||
// rides along on every broadcast so clients converge to the same state.
|
||||
package datastore
|
||||
|
||||
import (
|
||||
"crypto/rand"
|
||||
"encoding/hex"
|
||||
"sort"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
// Kind selects the lifetime of a datastore.
|
||||
type Kind string
|
||||
|
||||
const (
|
||||
Temp Kind = "temp" // expires after a TTL (default)
|
||||
Permanent Kind = "permanent" // never expires
|
||||
)
|
||||
|
||||
// Record is one row of a collection, keyed by the value of the datastore's
|
||||
// primary field.
|
||||
type Record struct {
|
||||
Key string `json:"key"`
|
||||
Data map[string]any `json:"data"`
|
||||
Seq uint64 `json:"seq"`
|
||||
Updated time.Time `json:"-"`
|
||||
}
|
||||
|
||||
// Datastore is a server-authoritative shared table.
|
||||
type Datastore struct {
|
||||
ID string
|
||||
Kind Kind
|
||||
Primary string // primary key field name (default "id")
|
||||
ExpiresAt time.Time
|
||||
|
||||
mu sync.RWMutex
|
||||
records map[string]*Record
|
||||
subscribers map[string]struct{}
|
||||
seq uint64
|
||||
}
|
||||
|
||||
func (d *Datastore) expired(now time.Time) bool {
|
||||
return !d.ExpiresAt.IsZero() && now.After(d.ExpiresAt)
|
||||
}
|
||||
|
||||
// Subscribe registers a client id as interested in this datastore's broadcasts.
|
||||
func (d *Datastore) Subscribe(clientID string) {
|
||||
d.mu.Lock()
|
||||
d.subscribers[clientID] = struct{}{}
|
||||
d.mu.Unlock()
|
||||
}
|
||||
|
||||
// Unsubscribe removes a client id.
|
||||
func (d *Datastore) Unsubscribe(clientID string) {
|
||||
d.mu.Lock()
|
||||
delete(d.subscribers, clientID)
|
||||
d.mu.Unlock()
|
||||
}
|
||||
|
||||
// Subscribers returns the current subscriber ids.
|
||||
func (d *Datastore) Subscribers() []string {
|
||||
d.mu.RLock()
|
||||
defer d.mu.RUnlock()
|
||||
out := make([]string, 0, len(d.subscribers))
|
||||
for id := range d.subscribers {
|
||||
out = append(out, id)
|
||||
}
|
||||
return out
|
||||
}
|
||||
|
||||
// Set upserts a record and returns it stamped with a fresh sequence number. The
|
||||
// primary key is read from data[Primary]; if absent or not a string, ok is false.
|
||||
func (d *Datastore) Set(data map[string]any, now time.Time) (Record, bool) {
|
||||
key, ok := data[d.Primary].(string)
|
||||
if !ok || key == "" {
|
||||
return Record{}, false
|
||||
}
|
||||
d.mu.Lock()
|
||||
defer d.mu.Unlock()
|
||||
d.seq++
|
||||
rec := &Record{Key: key, Data: data, Seq: d.seq, Updated: now}
|
||||
d.records[key] = rec
|
||||
return *rec, true
|
||||
}
|
||||
|
||||
// Delete removes a record, returning the sequence number of the delete op (so the
|
||||
// broadcast can be ordered against sets) and whether a record existed.
|
||||
func (d *Datastore) Delete(key string) (uint64, bool) {
|
||||
d.mu.Lock()
|
||||
defer d.mu.Unlock()
|
||||
if _, ok := d.records[key]; !ok {
|
||||
return 0, false
|
||||
}
|
||||
d.seq++
|
||||
delete(d.records, key)
|
||||
return d.seq, true
|
||||
}
|
||||
|
||||
// Get returns a copy of one record.
|
||||
func (d *Datastore) Get(key string) (Record, bool) {
|
||||
d.mu.RLock()
|
||||
defer d.mu.RUnlock()
|
||||
rec, ok := d.records[key]
|
||||
if !ok {
|
||||
return Record{}, false
|
||||
}
|
||||
return *rec, true
|
||||
}
|
||||
|
||||
// Snapshot returns all records ordered by sequence number (insertion/update
|
||||
// order), which is the order a freshly-opening client should apply them in.
|
||||
func (d *Datastore) Snapshot() []Record {
|
||||
d.mu.RLock()
|
||||
defer d.mu.RUnlock()
|
||||
out := make([]Record, 0, len(d.records))
|
||||
for _, rec := range d.records {
|
||||
out = append(out, *rec)
|
||||
}
|
||||
sort.Slice(out, func(i, j int) bool { return out[i].Seq < out[j].Seq })
|
||||
return out
|
||||
}
|
||||
|
||||
// Store owns all named datastores (active sync) and merge pools (passive sync).
|
||||
type Store struct {
|
||||
mu sync.Mutex
|
||||
stores map[string]*Datastore
|
||||
pools map[string]*Pool
|
||||
now func() time.Time
|
||||
ttl time.Duration
|
||||
}
|
||||
|
||||
// NewStore returns an empty store. Temp datastores and pools default to a 1h TTL.
|
||||
func NewStore() *Store {
|
||||
return &Store{
|
||||
stores: make(map[string]*Datastore),
|
||||
pools: make(map[string]*Pool),
|
||||
now: time.Now,
|
||||
ttl: time.Hour,
|
||||
}
|
||||
}
|
||||
|
||||
// SetClock replaces the time source; intended for deterministic tests.
|
||||
func (s *Store) SetClock(now func() time.Time) { s.now = now }
|
||||
|
||||
// Open returns the datastore with the given id, creating it if absent. An empty
|
||||
// id is replaced with a fresh public id. An empty primary defaults to "id". A
|
||||
// zero ttl on a temp datastore uses the store default; permanent ignores ttl.
|
||||
func (s *Store) Open(id string, kind Kind, primary string, ttl time.Duration) *Datastore {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
|
||||
if id == "" {
|
||||
id = newID()
|
||||
}
|
||||
if existing, ok := s.stores[id]; ok {
|
||||
return existing
|
||||
}
|
||||
if primary == "" {
|
||||
primary = "id"
|
||||
}
|
||||
if kind != Permanent {
|
||||
kind = Temp
|
||||
}
|
||||
d := &Datastore{
|
||||
ID: id,
|
||||
Kind: kind,
|
||||
Primary: primary,
|
||||
records: make(map[string]*Record),
|
||||
subscribers: make(map[string]struct{}),
|
||||
}
|
||||
if kind == Temp {
|
||||
if ttl <= 0 {
|
||||
ttl = s.ttl
|
||||
}
|
||||
d.ExpiresAt = s.now().Add(ttl)
|
||||
}
|
||||
s.stores[id] = d
|
||||
return d
|
||||
}
|
||||
|
||||
// Get returns an existing, non-expired datastore.
|
||||
func (s *Store) Get(id string) (*Datastore, bool) {
|
||||
now := s.now()
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
d, ok := s.stores[id]
|
||||
if !ok || d.expired(now) {
|
||||
return nil, false
|
||||
}
|
||||
return d, true
|
||||
}
|
||||
|
||||
// UnsubscribeAll drops a client id from every datastore and pool. Called on
|
||||
// disconnect so a departed client leaves no residual subscription.
|
||||
func (s *Store) UnsubscribeAll(clientID string) {
|
||||
s.mu.Lock()
|
||||
stores := make([]*Datastore, 0, len(s.stores))
|
||||
for _, d := range s.stores {
|
||||
stores = append(stores, d)
|
||||
}
|
||||
pools := make([]*Pool, 0, len(s.pools))
|
||||
for _, p := range s.pools {
|
||||
pools = append(pools, p)
|
||||
}
|
||||
s.mu.Unlock()
|
||||
for _, d := range stores {
|
||||
d.Unsubscribe(clientID)
|
||||
}
|
||||
for _, p := range pools {
|
||||
p.Unsubscribe(clientID)
|
||||
}
|
||||
}
|
||||
|
||||
// PurgeExpired removes expired temp datastores and pools, returning the total
|
||||
// number removed.
|
||||
func (s *Store) PurgeExpired() int {
|
||||
now := s.now()
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
removed := 0
|
||||
for id, d := range s.stores {
|
||||
if d.expired(now) {
|
||||
delete(s.stores, id)
|
||||
removed++
|
||||
}
|
||||
}
|
||||
for id, p := range s.pools {
|
||||
if p.expired(now) {
|
||||
delete(s.pools, id)
|
||||
removed++
|
||||
}
|
||||
}
|
||||
return removed
|
||||
}
|
||||
|
||||
// StartJanitor purges expired temp datastores every interval until stop is called.
|
||||
func (s *Store) StartJanitor(interval time.Duration) (stop func()) {
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
t := time.NewTicker(interval)
|
||||
defer t.Stop()
|
||||
for {
|
||||
select {
|
||||
case <-t.C:
|
||||
s.PurgeExpired()
|
||||
case <-done:
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
var once sync.Once
|
||||
return func() { once.Do(func() { close(done) }) }
|
||||
}
|
||||
|
||||
// Now exposes the store clock to the service layer so record timestamps share it.
|
||||
func (s *Store) Now() time.Time { return s.now() }
|
||||
|
||||
func newID() string {
|
||||
var b [12]byte
|
||||
_, _ = rand.Read(b[:])
|
||||
return hex.EncodeToString(b[:])
|
||||
}
|
||||
|
|
@ -0,0 +1,184 @@
|
|||
package datastore
|
||||
|
||||
import (
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
func TestDatastoreSetGetSnapshot(t *testing.T) {
|
||||
s := NewStore()
|
||||
d := s.Open("notes", Temp, "id", 0)
|
||||
|
||||
r1, ok := d.Set(map[string]any{"id": "a", "v": 1}, s.Now())
|
||||
if !ok || r1.Seq != 1 {
|
||||
t.Fatalf("first set = %+v ok=%v", r1, ok)
|
||||
}
|
||||
r2, _ := d.Set(map[string]any{"id": "b", "v": 2}, s.Now())
|
||||
if r2.Seq != 2 {
|
||||
t.Fatalf("second set seq = %d, want 2", r2.Seq)
|
||||
}
|
||||
|
||||
// Update of a existing key gets a fresh (higher) seq — last write wins.
|
||||
r1b, _ := d.Set(map[string]any{"id": "a", "v": 99}, s.Now())
|
||||
if r1b.Seq <= r2.Seq {
|
||||
t.Fatalf("update seq = %d, want > %d", r1b.Seq, r2.Seq)
|
||||
}
|
||||
|
||||
got, ok := d.Get("a")
|
||||
if !ok || got.Data["v"] != 99 {
|
||||
t.Fatalf("get a = %+v", got)
|
||||
}
|
||||
|
||||
snap := d.Snapshot()
|
||||
if len(snap) != 2 {
|
||||
t.Fatalf("snapshot len = %d, want 2", len(snap))
|
||||
}
|
||||
// Snapshot is ordered by seq; the updated "a" now sorts last.
|
||||
if snap[len(snap)-1].Key != "a" {
|
||||
t.Fatalf("snapshot order wrong: %+v", snap)
|
||||
}
|
||||
}
|
||||
|
||||
func TestDatastoreSetRequiresPrimary(t *testing.T) {
|
||||
s := NewStore()
|
||||
d := s.Open("x", Temp, "id", 0)
|
||||
if _, ok := d.Set(map[string]any{"noid": true}, s.Now()); ok {
|
||||
t.Fatal("set without the primary key should fail")
|
||||
}
|
||||
}
|
||||
|
||||
func TestDatastoreDelete(t *testing.T) {
|
||||
s := NewStore()
|
||||
d := s.Open("x", Temp, "id", 0)
|
||||
d.Set(map[string]any{"id": "a"}, s.Now())
|
||||
|
||||
seq, ok := d.Delete("a")
|
||||
if !ok || seq == 0 {
|
||||
t.Fatalf("delete = seq %d ok %v", seq, ok)
|
||||
}
|
||||
if _, ok := d.Get("a"); ok {
|
||||
t.Fatal("record should be gone after delete")
|
||||
}
|
||||
if _, ok := d.Delete("a"); ok {
|
||||
t.Fatal("deleting a missing key should fail")
|
||||
}
|
||||
}
|
||||
|
||||
// TestConcurrentSetsResolveByArrival is the #45 conflict-resolution guarantee:
|
||||
// concurrent writes are serialized by the datastore lock, every write gets a
|
||||
// unique monotonic seq, and the final value is whichever write arrived last.
|
||||
func TestConcurrentSetsResolveByArrival(t *testing.T) {
|
||||
s := NewStore()
|
||||
d := s.Open("race", Temp, "id", 0)
|
||||
|
||||
const n = 200
|
||||
var wg sync.WaitGroup
|
||||
seqs := make([]uint64, n)
|
||||
for i := 0; i < n; i++ {
|
||||
wg.Add(1)
|
||||
go func(i int) {
|
||||
defer wg.Done()
|
||||
rec, _ := d.Set(map[string]any{"id": "k", "writer": i}, s.Now())
|
||||
seqs[i] = rec.Seq
|
||||
}(i)
|
||||
}
|
||||
wg.Wait()
|
||||
|
||||
// All seqs unique and within [1,n].
|
||||
seen := make(map[uint64]bool, n)
|
||||
var max uint64
|
||||
for _, sq := range seqs {
|
||||
if sq == 0 || sq > n || seen[sq] {
|
||||
t.Fatalf("bad/duplicate seq %d", sq)
|
||||
}
|
||||
seen[sq] = true
|
||||
if sq > max {
|
||||
max = sq
|
||||
}
|
||||
}
|
||||
// The surviving record must be the one with the highest seq (last arrival).
|
||||
final, _ := d.Get("k")
|
||||
if final.Seq != max {
|
||||
t.Fatalf("final record seq = %d, want max %d", final.Seq, max)
|
||||
}
|
||||
}
|
||||
|
||||
func TestTempExpiryAndPurge(t *testing.T) {
|
||||
s := NewStore()
|
||||
now := time.Unix(0, 0)
|
||||
s.SetClock(func() time.Time { return now })
|
||||
|
||||
s.Open("temp1", Temp, "id", time.Second)
|
||||
s.Open("perm1", Permanent, "id", 0)
|
||||
|
||||
now = now.Add(2 * time.Second)
|
||||
if _, ok := s.Get("temp1"); ok {
|
||||
t.Fatal("temp store should have expired")
|
||||
}
|
||||
if _, ok := s.Get("perm1"); !ok {
|
||||
t.Fatal("permanent store must not expire")
|
||||
}
|
||||
if removed := s.PurgeExpired(); removed != 1 {
|
||||
t.Fatalf("purge removed %d, want 1", removed)
|
||||
}
|
||||
}
|
||||
|
||||
func TestOpenIsIdempotentAndGeneratesID(t *testing.T) {
|
||||
s := NewStore()
|
||||
a := s.Open("", Temp, "", 0)
|
||||
if a.ID == "" {
|
||||
t.Fatal("empty id should be replaced with a generated one")
|
||||
}
|
||||
if a.Primary != "id" {
|
||||
t.Fatalf("default primary = %q, want id", a.Primary)
|
||||
}
|
||||
b := s.Open(a.ID, Temp, "", 0)
|
||||
if a != b {
|
||||
t.Fatal("opening an existing id should return the same datastore")
|
||||
}
|
||||
}
|
||||
|
||||
func TestPoolMergeDedupesAndTracksDelta(t *testing.T) {
|
||||
s := NewStore()
|
||||
p := s.OpenPool("p1", 0)
|
||||
_ = p
|
||||
|
||||
added, _, ok := s.PushPool("p1", []any{"x", "y", "x"})
|
||||
if !ok || len(added) != 2 {
|
||||
t.Fatalf("first push added %v (ok=%v), want 2 unique", added, ok)
|
||||
}
|
||||
// Re-pushing existing items adds nothing.
|
||||
added, _, _ = s.PushPool("p1", []any{"x", "y"})
|
||||
if len(added) != 0 {
|
||||
t.Fatalf("re-push added %v, want 0", added)
|
||||
}
|
||||
// A new item is the only delta.
|
||||
added, _, _ = s.PushPool("p1", []any{"x", "z"})
|
||||
if len(added) != 1 || added[0] != "z" {
|
||||
t.Fatalf("push delta = %v, want [z]", added)
|
||||
}
|
||||
|
||||
pool, _ := s.GetPool("p1")
|
||||
if len(pool.Items()) != 3 {
|
||||
t.Fatalf("pool size = %d, want 3", len(pool.Items()))
|
||||
}
|
||||
}
|
||||
|
||||
func TestUnsubscribeAllClearsEverywhere(t *testing.T) {
|
||||
s := NewStore()
|
||||
d := s.Open("d", Temp, "id", 0)
|
||||
p := s.OpenPool("p", 0)
|
||||
d.Subscribe("alice")
|
||||
p.Subscribe("alice")
|
||||
d.Subscribe("bob")
|
||||
|
||||
s.UnsubscribeAll("alice")
|
||||
|
||||
if len(d.Subscribers()) != 1 || d.Subscribers()[0] != "bob" {
|
||||
t.Fatalf("datastore subscribers = %v, want [bob]", d.Subscribers())
|
||||
}
|
||||
if len(p.Subscribers()) != 0 {
|
||||
t.Fatalf("pool subscribers = %v, want empty", p.Subscribers())
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,125 @@
|
|||
package datastore
|
||||
|
||||
import (
|
||||
"crypto/sha256"
|
||||
"encoding/hex"
|
||||
"encoding/json"
|
||||
"time"
|
||||
)
|
||||
|
||||
// Pool is the passive-sync primitive (#45 part 1): a shared set of unique items
|
||||
// that clients converge by pushing their local items and pulling the merged
|
||||
// result. Items are deduplicated by the hash of their canonical JSON, so pushing
|
||||
// the same item from many clients adds it exactly once. Insertion order is kept so
|
||||
// every client observes the pool in the same order.
|
||||
type Pool struct {
|
||||
ID string
|
||||
ExpiresAt time.Time
|
||||
|
||||
items map[string]any // contentHash -> item
|
||||
order []string // contentHashes in insertion order
|
||||
subscribers map[string]struct{}
|
||||
}
|
||||
|
||||
func (p *Pool) expired(now time.Time) bool {
|
||||
return !p.ExpiresAt.IsZero() && now.After(p.ExpiresAt)
|
||||
}
|
||||
|
||||
// Subscribe / Unsubscribe / Subscribers mirror Datastore's, guarded by the
|
||||
// owning Store's lock (pool mutation always happens under that lock).
|
||||
func (p *Pool) Subscribe(clientID string) { p.subscribers[clientID] = struct{}{} }
|
||||
func (p *Pool) Unsubscribe(clientID string) { delete(p.subscribers, clientID) }
|
||||
func (p *Pool) Subscribers() []string {
|
||||
out := make([]string, 0, len(p.subscribers))
|
||||
for id := range p.subscribers {
|
||||
out = append(out, id)
|
||||
}
|
||||
return out
|
||||
}
|
||||
|
||||
// Items returns the merged pool in insertion order.
|
||||
func (p *Pool) Items() []any {
|
||||
out := make([]any, 0, len(p.order))
|
||||
for _, h := range p.order {
|
||||
out = append(out, p.items[h])
|
||||
}
|
||||
return out
|
||||
}
|
||||
|
||||
// merge adds items not already present, returning only the newly added ones (so
|
||||
// the caller broadcasts the minimal delta). Convergence is reached when a client
|
||||
// has pushed and pulled until no side has new items.
|
||||
func (p *Pool) merge(items []any) []any {
|
||||
added := make([]any, 0, len(items))
|
||||
for _, it := range items {
|
||||
h := contentHash(it)
|
||||
if _, ok := p.items[h]; ok {
|
||||
continue
|
||||
}
|
||||
p.items[h] = it
|
||||
p.order = append(p.order, h)
|
||||
added = append(added, it)
|
||||
}
|
||||
return added
|
||||
}
|
||||
|
||||
// OpenPool returns the pool with the given id, creating it if absent. An empty id
|
||||
// is replaced with a fresh public id; a zero ttl uses the store default. Pools are
|
||||
// always temporary (passive sync is for fast, ephemeral convergence).
|
||||
func (s *Store) OpenPool(id string, ttl time.Duration) *Pool {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
if id == "" {
|
||||
id = newID()
|
||||
}
|
||||
if p, ok := s.pools[id]; ok {
|
||||
return p
|
||||
}
|
||||
if ttl <= 0 {
|
||||
ttl = s.ttl
|
||||
}
|
||||
p := &Pool{
|
||||
ID: id,
|
||||
ExpiresAt: s.now().Add(ttl),
|
||||
items: make(map[string]any),
|
||||
subscribers: make(map[string]struct{}),
|
||||
}
|
||||
s.pools[id] = p
|
||||
return p
|
||||
}
|
||||
|
||||
// GetPool returns an existing, non-expired pool.
|
||||
func (s *Store) GetPool(id string) (*Pool, bool) {
|
||||
now := s.now()
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
p, ok := s.pools[id]
|
||||
if !ok || p.expired(now) {
|
||||
return nil, false
|
||||
}
|
||||
return p, true
|
||||
}
|
||||
|
||||
// PushPool merges items into the pool under the store lock and returns the newly
|
||||
// added items plus the current subscriber list for broadcasting.
|
||||
func (s *Store) PushPool(id string, items []any) (added []any, subscribers []string, ok bool) {
|
||||
now := s.now()
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
p, exists := s.pools[id]
|
||||
if !exists || p.expired(now) {
|
||||
return nil, nil, false
|
||||
}
|
||||
return p.merge(items), p.Subscribers(), true
|
||||
}
|
||||
|
||||
// contentHash is the dedup key: the SHA-256 of the item's canonical JSON.
|
||||
func contentHash(item any) string {
|
||||
b, err := json.Marshal(item)
|
||||
if err != nil {
|
||||
// Non-serialisable items fall back to a per-call unique key (never deduped).
|
||||
return newID()
|
||||
}
|
||||
sum := sha256.Sum256(b)
|
||||
return hex.EncodeToString(sum[:])
|
||||
}
|
||||
|
|
@ -0,0 +1,218 @@
|
|||
// Package notify implements store-and-forward notifications (#43) and their
|
||||
// reply-bearing "suit" variant (#44). A notification addressed to a client that
|
||||
// is offline is held until the client connects; one addressed to an online client
|
||||
// is delivered immediately. Every notification carries a trace id so its delivery
|
||||
// (and, for suit notifications, its reply) can be queried later.
|
||||
//
|
||||
// The store is bounded in two independent ways so it can never grow without
|
||||
// limit, even if a target never reconnects: an expiry per notification and a hard
|
||||
// per-target cap (oldest dropped first). A janitor periodically purges expired
|
||||
// entries; tests drive expiry deterministically through an injectable clock.
|
||||
package notify
|
||||
|
||||
import (
|
||||
"crypto/rand"
|
||||
"encoding/hex"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
// Notification is one store-and-forward message addressed to a client id.
|
||||
type Notification struct {
|
||||
Trace string // unique id used to query delivery/reply status
|
||||
From string // sender: a client id, or "server" for server-originated
|
||||
To string // target client id
|
||||
Pack any // opaque payload delivered to the target
|
||||
Suit bool // true => a reply is expected (#44)
|
||||
|
||||
CreatedAt time.Time
|
||||
ExpiresAt time.Time // zero == never expires
|
||||
Delivered bool
|
||||
DeliveredAt time.Time
|
||||
|
||||
Replied bool
|
||||
Reply any
|
||||
RepliedAt time.Time
|
||||
}
|
||||
|
||||
func (n *Notification) expired(now time.Time) bool {
|
||||
return !n.ExpiresAt.IsZero() && now.After(n.ExpiresAt)
|
||||
}
|
||||
|
||||
// Store is the in-memory notification registry. All methods are safe for
|
||||
// concurrent use.
|
||||
type Store struct {
|
||||
mu sync.Mutex
|
||||
pending map[string][]*Notification // target id -> queued, undelivered
|
||||
byTrace map[string]*Notification // trace -> notification (status + reply)
|
||||
|
||||
now func() time.Time
|
||||
defaultTTL time.Duration
|
||||
maxPerTarget int
|
||||
}
|
||||
|
||||
// NewStore returns an empty store with sensible defaults (24h TTL, 1024 queued
|
||||
// notifications per offline target).
|
||||
func NewStore() *Store {
|
||||
return &Store{
|
||||
pending: make(map[string][]*Notification),
|
||||
byTrace: make(map[string]*Notification),
|
||||
now: time.Now,
|
||||
defaultTTL: 24 * time.Hour,
|
||||
maxPerTarget: 1024,
|
||||
}
|
||||
}
|
||||
|
||||
// SetClock replaces the time source; intended for deterministic tests.
|
||||
func (s *Store) SetClock(now func() time.Time) { s.now = now }
|
||||
|
||||
// Put records a notification for `to`. A zero ttl uses the store default. The
|
||||
// returned Notification is a snapshot; the trace id is filled in.
|
||||
func (s *Store) Put(from, to string, pack any, suit bool, ttl time.Duration) Notification {
|
||||
if ttl <= 0 {
|
||||
ttl = s.defaultTTL
|
||||
}
|
||||
now := s.now()
|
||||
n := &Notification{
|
||||
Trace: newTrace(),
|
||||
From: from,
|
||||
To: to,
|
||||
Pack: pack,
|
||||
Suit: suit,
|
||||
CreatedAt: now,
|
||||
ExpiresAt: now.Add(ttl),
|
||||
}
|
||||
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
|
||||
q := s.pending[to]
|
||||
// Enforce the per-target cap by evicting the oldest queued notifications.
|
||||
for len(q) >= s.maxPerTarget && len(q) > 0 {
|
||||
evicted := q[0]
|
||||
q = q[1:]
|
||||
delete(s.byTrace, evicted.Trace)
|
||||
}
|
||||
s.pending[to] = append(q, n)
|
||||
s.byTrace[n.Trace] = n
|
||||
return *n
|
||||
}
|
||||
|
||||
// Drain returns every non-expired pending notification for `to`, marking each
|
||||
// delivered and removing it from the pending queue. Delivered notifications stay
|
||||
// in the trace index (until they expire) so their status can still be queried.
|
||||
func (s *Store) Drain(to string) []Notification {
|
||||
now := s.now()
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
|
||||
q := s.pending[to]
|
||||
if len(q) == 0 {
|
||||
return nil
|
||||
}
|
||||
delete(s.pending, to)
|
||||
|
||||
out := make([]Notification, 0, len(q))
|
||||
for _, n := range q {
|
||||
if n.expired(now) {
|
||||
delete(s.byTrace, n.Trace)
|
||||
continue
|
||||
}
|
||||
n.Delivered = true
|
||||
n.DeliveredAt = now
|
||||
out = append(out, *n)
|
||||
}
|
||||
return out
|
||||
}
|
||||
|
||||
// Status returns a snapshot of the notification with the given trace.
|
||||
func (s *Store) Status(trace string) (Notification, bool) {
|
||||
now := s.now()
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
n, ok := s.byTrace[trace]
|
||||
if !ok || n.expired(now) {
|
||||
return Notification{}, false
|
||||
}
|
||||
return *n, true
|
||||
}
|
||||
|
||||
// Reply records a client's reply to a suit notification and returns the updated
|
||||
// snapshot. It fails if the trace is unknown, expired, or not a suit.
|
||||
func (s *Store) Reply(trace string, reply any) (Notification, bool) {
|
||||
now := s.now()
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
n, ok := s.byTrace[trace]
|
||||
if !ok || n.expired(now) || !n.Suit {
|
||||
return Notification{}, false
|
||||
}
|
||||
n.Replied = true
|
||||
n.Reply = reply
|
||||
n.RepliedAt = now
|
||||
return *n, true
|
||||
}
|
||||
|
||||
// PurgeExpired removes expired notifications from both indexes and returns how
|
||||
// many were removed. A janitor calls this periodically; Drain/Status/Reply also
|
||||
// drop expired entries lazily as they are touched.
|
||||
func (s *Store) PurgeExpired() int {
|
||||
now := s.now()
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
|
||||
removed := 0
|
||||
for trace, n := range s.byTrace {
|
||||
if n.expired(now) {
|
||||
delete(s.byTrace, trace)
|
||||
removed++
|
||||
}
|
||||
}
|
||||
for to, q := range s.pending {
|
||||
kept := q[:0]
|
||||
for _, n := range q {
|
||||
if !n.expired(now) {
|
||||
kept = append(kept, n)
|
||||
}
|
||||
}
|
||||
if len(kept) == 0 {
|
||||
delete(s.pending, to)
|
||||
} else {
|
||||
s.pending[to] = kept
|
||||
}
|
||||
}
|
||||
return removed
|
||||
}
|
||||
|
||||
// PendingCount reports how many notifications are queued for a target (test aid).
|
||||
func (s *Store) PendingCount(to string) int {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
return len(s.pending[to])
|
||||
}
|
||||
|
||||
// StartJanitor runs PurgeExpired every interval until the returned stop function
|
||||
// is called. main() owns the lifecycle; the stop function makes it leak-free.
|
||||
func (s *Store) StartJanitor(interval time.Duration) (stop func()) {
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
t := time.NewTicker(interval)
|
||||
defer t.Stop()
|
||||
for {
|
||||
select {
|
||||
case <-t.C:
|
||||
s.PurgeExpired()
|
||||
case <-done:
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
var once sync.Once
|
||||
return func() { once.Do(func() { close(done) }) }
|
||||
}
|
||||
|
||||
func newTrace() string {
|
||||
var b [16]byte
|
||||
_, _ = rand.Read(b[:])
|
||||
return hex.EncodeToString(b[:])
|
||||
}
|
||||
|
|
@ -0,0 +1,121 @@
|
|||
package notify
|
||||
|
||||
import (
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
func TestPutAndDrainDelivers(t *testing.T) {
|
||||
s := NewStore()
|
||||
n := s.Put("alice", "bob", map[string]any{"text": "hi"}, false, 0)
|
||||
if n.Trace == "" {
|
||||
t.Fatal("Put should assign a trace id")
|
||||
}
|
||||
if s.PendingCount("bob") != 1 {
|
||||
t.Fatalf("pending for bob = %d, want 1", s.PendingCount("bob"))
|
||||
}
|
||||
|
||||
got := s.Drain("bob")
|
||||
if len(got) != 1 || got[0].Trace != n.Trace {
|
||||
t.Fatalf("Drain = %+v, want the queued notification", got)
|
||||
}
|
||||
if !got[0].Delivered {
|
||||
t.Fatal("drained notification should be marked delivered")
|
||||
}
|
||||
if s.PendingCount("bob") != 0 {
|
||||
t.Fatal("pending should be empty after drain")
|
||||
}
|
||||
|
||||
// Status still works after delivery.
|
||||
st, ok := s.Status(n.Trace)
|
||||
if !ok || !st.Delivered {
|
||||
t.Fatalf("Status after drain = %+v, ok=%v", st, ok)
|
||||
}
|
||||
}
|
||||
|
||||
func TestExpiryDropsNotification(t *testing.T) {
|
||||
s := NewStore()
|
||||
now := time.Unix(1000, 0)
|
||||
s.SetClock(func() time.Time { return now })
|
||||
|
||||
s.Put("srv", "bob", "payload", false, 5*time.Second)
|
||||
|
||||
// Advance past expiry; the queued message must not be delivered.
|
||||
now = now.Add(10 * time.Second)
|
||||
if got := s.Drain("bob"); len(got) != 0 {
|
||||
t.Fatalf("expired notification was delivered: %+v", got)
|
||||
}
|
||||
if removed := s.PurgeExpired(); removed != 0 {
|
||||
// Drain already dropped it from byTrace; nothing left to purge.
|
||||
t.Fatalf("PurgeExpired removed %d, want 0 (already dropped on drain)", removed)
|
||||
}
|
||||
}
|
||||
|
||||
func TestPurgeExpiredReclaimsUndeliveredTraces(t *testing.T) {
|
||||
s := NewStore()
|
||||
now := time.Unix(0, 0)
|
||||
s.SetClock(func() time.Time { return now })
|
||||
|
||||
// A target that never connects: its messages must still be reclaimed.
|
||||
s.Put("srv", "ghost", "a", false, time.Second)
|
||||
s.Put("srv", "ghost", "b", false, time.Second)
|
||||
|
||||
now = now.Add(2 * time.Second)
|
||||
removed := s.PurgeExpired()
|
||||
if removed != 2 {
|
||||
t.Fatalf("PurgeExpired removed %d, want 2", removed)
|
||||
}
|
||||
if s.PendingCount("ghost") != 0 {
|
||||
t.Fatal("expired pending queue should be gone")
|
||||
}
|
||||
}
|
||||
|
||||
func TestPerTargetCapEvictsOldest(t *testing.T) {
|
||||
s := NewStore()
|
||||
s.maxPerTarget = 3
|
||||
|
||||
var traces []string
|
||||
for i := 0; i < 5; i++ {
|
||||
traces = append(traces, s.Put("srv", "bob", i, false, 0).Trace)
|
||||
}
|
||||
if s.PendingCount("bob") != 3 {
|
||||
t.Fatalf("pending = %d, want capped at 3", s.PendingCount("bob"))
|
||||
}
|
||||
// The two oldest must have been evicted from the trace index too.
|
||||
if _, ok := s.Status(traces[0]); ok {
|
||||
t.Fatal("oldest notification should have been evicted")
|
||||
}
|
||||
if _, ok := s.Status(traces[4]); !ok {
|
||||
t.Fatal("newest notification should be retained")
|
||||
}
|
||||
}
|
||||
|
||||
func TestSuitReply(t *testing.T) {
|
||||
s := NewStore()
|
||||
n := s.Put("srv", "bob", "question", true, 0)
|
||||
|
||||
// A non-suit reply target must fail.
|
||||
plain := s.Put("srv", "bob", "fyi", false, 0)
|
||||
if _, ok := s.Reply(plain.Trace, "nope"); ok {
|
||||
t.Fatal("replying to a non-suit notification should fail")
|
||||
}
|
||||
|
||||
updated, ok := s.Reply(n.Trace, map[string]any{"answer": 42})
|
||||
if !ok || !updated.Replied {
|
||||
t.Fatalf("Reply = %+v ok=%v", updated, ok)
|
||||
}
|
||||
st, _ := s.Status(n.Trace)
|
||||
if !st.Replied {
|
||||
t.Fatal("status should reflect the reply")
|
||||
}
|
||||
if m, _ := st.Reply.(map[string]any); m["answer"] != 42 {
|
||||
t.Fatalf("stored reply = %v", st.Reply)
|
||||
}
|
||||
}
|
||||
|
||||
func TestUnknownTraceStatus(t *testing.T) {
|
||||
s := NewStore()
|
||||
if _, ok := s.Status("does-not-exist"); ok {
|
||||
t.Fatal("unknown trace should not resolve")
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,164 @@
|
|||
package services
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
"git.saqut.com/saqut/mwse/internal/datastore"
|
||||
"git.saqut.com/saqut/mwse/internal/protocol"
|
||||
"git.saqut.com/saqut/mwse/internal/ws"
|
||||
)
|
||||
|
||||
// registerDatastore wires the #45 shared data layer: active sync (collections with
|
||||
// CRUD broadcast), passive sync (a fast merge pool), and temp/permanent
|
||||
// datastores. The returned store is handed back so a janitor can reclaim expired
|
||||
// temp stores/pools.
|
||||
//
|
||||
// Wire surface (additive):
|
||||
//
|
||||
// Active sync / collection / datastore:
|
||||
// data/open {id?, kind?, primary?, expires?} -> {status, id, primary, kind, records}
|
||||
// (kind = "temp" | "permanent". The payload field is "kind", NOT "type":
|
||||
// "type" is reserved by WSTS to select the handler.)
|
||||
// data/set {id, record} -> {status, key, seq} + broadcast data/op(set)
|
||||
// data/delete {id, key} -> {status, seq} + broadcast data/op(delete)
|
||||
// data/get {id, key?} -> {status, record|records}
|
||||
//
|
||||
// Passive sync (merge pool):
|
||||
// sync/open {id?, expires?} -> {status, id, items}
|
||||
// sync/push {id, items} -> {status, added} + broadcast sync/add
|
||||
// sync/pull {id} -> {status, items}
|
||||
//
|
||||
// Broadcasts are server signals delivered to every *other* subscriber:
|
||||
//
|
||||
// [ {id, op, record|key, seq}, "data/op" ]
|
||||
// [ {id, items}, "sync/add" ]
|
||||
func registerDatastore(hub *ws.Hub) *datastore.Store {
|
||||
store := datastore.NewStore()
|
||||
|
||||
// Signal a set of subscriber ids, skipping the originator (it already holds the
|
||||
// change and learns the authoritative seq from its own reply).
|
||||
broadcast := func(subs []string, except, name string, payload map[string]any) {
|
||||
for _, id := range subs {
|
||||
if id == except {
|
||||
continue
|
||||
}
|
||||
if cl, ok := hub.Client(id); ok {
|
||||
cl.Signal(name, payload)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// A departed client must leave no residual subscription anywhere.
|
||||
hub.OnDisconnect(func(c *ws.Client) { store.UnsubscribeAll(c.ID) })
|
||||
|
||||
// ---- active sync / collection / datastore -------------------------------
|
||||
|
||||
hub.Register("data/open", func(c *ws.Client, m protocol.Message) any {
|
||||
kind := datastore.Temp
|
||||
if m.Str("kind") == string(datastore.Permanent) {
|
||||
kind = datastore.Permanent
|
||||
}
|
||||
ttl := time.Duration(m.Int("expires")) * time.Second
|
||||
ds := store.Open(m.Str("id"), kind, m.Str("primary"), ttl)
|
||||
ds.Subscribe(c.ID)
|
||||
return map[string]any{
|
||||
"status": "success",
|
||||
"id": ds.ID,
|
||||
"primary": ds.Primary,
|
||||
"kind": string(ds.Kind),
|
||||
"records": ds.Snapshot(),
|
||||
}
|
||||
})
|
||||
|
||||
hub.Register("data/set", func(c *ws.Client, m protocol.Message) any {
|
||||
ds, ok := store.Get(m.Str("id"))
|
||||
if !ok {
|
||||
return fail("NOT_FOUND")
|
||||
}
|
||||
rec, ok := ds.Set(toMap(m.Get("record")), store.Now())
|
||||
if !ok {
|
||||
return fail("PRIMARY_KEY_REQUIRED")
|
||||
}
|
||||
broadcast(ds.Subscribers(), c.ID, "data/op", map[string]any{
|
||||
"id": ds.ID,
|
||||
"op": "set",
|
||||
"record": rec,
|
||||
"seq": rec.Seq,
|
||||
})
|
||||
return map[string]any{"status": "success", "key": rec.Key, "seq": rec.Seq}
|
||||
})
|
||||
|
||||
hub.Register("data/delete", func(c *ws.Client, m protocol.Message) any {
|
||||
ds, ok := store.Get(m.Str("id"))
|
||||
if !ok {
|
||||
return fail("NOT_FOUND")
|
||||
}
|
||||
key := m.Str("key")
|
||||
seq, ok := ds.Delete(key)
|
||||
if !ok {
|
||||
return fail("NO_SUCH_KEY")
|
||||
}
|
||||
broadcast(ds.Subscribers(), c.ID, "data/op", map[string]any{
|
||||
"id": ds.ID,
|
||||
"op": "delete",
|
||||
"key": key,
|
||||
"seq": seq,
|
||||
})
|
||||
return map[string]any{"status": "success", "seq": seq}
|
||||
})
|
||||
|
||||
hub.Register("data/get", func(c *ws.Client, m protocol.Message) any {
|
||||
ds, ok := store.Get(m.Str("id"))
|
||||
if !ok {
|
||||
return fail("NOT_FOUND")
|
||||
}
|
||||
if key := m.Str("key"); key != "" {
|
||||
rec, ok := ds.Get(key)
|
||||
if !ok {
|
||||
return fail("NO_SUCH_KEY")
|
||||
}
|
||||
return map[string]any{"status": "success", "record": rec}
|
||||
}
|
||||
return map[string]any{"status": "success", "records": ds.Snapshot()}
|
||||
})
|
||||
|
||||
// ---- passive sync (merge pool) ------------------------------------------
|
||||
|
||||
hub.Register("sync/open", func(c *ws.Client, m protocol.Message) any {
|
||||
ttl := time.Duration(m.Int("expires")) * time.Second
|
||||
p := store.OpenPool(m.Str("id"), ttl)
|
||||
p.Subscribe(c.ID)
|
||||
return map[string]any{"status": "success", "id": p.ID, "items": p.Items()}
|
||||
})
|
||||
|
||||
hub.Register("sync/push", func(c *ws.Client, m protocol.Message) any {
|
||||
items := toSlice(m.Get("items"))
|
||||
added, subs, ok := store.PushPool(m.Str("id"), items)
|
||||
if !ok {
|
||||
return fail("NOT_FOUND")
|
||||
}
|
||||
if len(added) > 0 {
|
||||
broadcast(subs, c.ID, "sync/add", map[string]any{"id": m.Str("id"), "items": added})
|
||||
}
|
||||
return map[string]any{"status": "success", "added": len(added)}
|
||||
})
|
||||
|
||||
hub.Register("sync/pull", func(c *ws.Client, m protocol.Message) any {
|
||||
p, ok := store.GetPool(m.Str("id"))
|
||||
if !ok {
|
||||
return fail("NOT_FOUND")
|
||||
}
|
||||
return map[string]any{"status": "success", "items": p.Items()}
|
||||
})
|
||||
|
||||
return store
|
||||
}
|
||||
|
||||
// toSlice coerces a decoded JSON value to a slice, returning nil when it is not
|
||||
// an array.
|
||||
func toSlice(v any) []any {
|
||||
if s, ok := v.([]any); ok {
|
||||
return s
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
|
@ -0,0 +1,152 @@
|
|||
package services
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"testing"
|
||||
|
||||
"git.saqut.com/saqut/mwse/internal/datastore"
|
||||
"git.saqut.com/saqut/mwse/internal/testutil"
|
||||
"git.saqut.com/saqut/mwse/internal/ws"
|
||||
)
|
||||
|
||||
// waitDataOp waits for a data/op broadcast whose "op" field matches (the stream
|
||||
// may contain several — e.g. a set followed by a delete).
|
||||
func waitDataOp(t *testing.T, fc *testutil.FakeConn, op string) map[string]any {
|
||||
t.Helper()
|
||||
find := func() (map[string]any, bool) {
|
||||
for _, raw := range fc.Writes() {
|
||||
var arr []any
|
||||
if json.Unmarshal(raw, &arr) != nil || len(arr) != 2 || arr[1] != "data/op" {
|
||||
continue
|
||||
}
|
||||
if p, ok := arr[0].(map[string]any); ok && p["op"] == op {
|
||||
return p, true
|
||||
}
|
||||
}
|
||||
return nil, false
|
||||
}
|
||||
waitFor(t, func() bool { _, ok := find(); return ok })
|
||||
p, _ := find()
|
||||
return p
|
||||
}
|
||||
|
||||
// newHubReg builds a hub with services registered and returns the Registry so a
|
||||
// test can inspect the long-lived stores.
|
||||
func newHubReg() (*ws.Hub, *Registry) {
|
||||
hub := ws.NewHub()
|
||||
reg := Register(hub)
|
||||
return hub, reg
|
||||
}
|
||||
|
||||
// TestActiveSyncBroadcast is the #45 collection/CRUD-broadcast core: a set by one
|
||||
// subscriber is broadcast to every other subscriber with the authoritative seq.
|
||||
func TestActiveSyncBroadcast(t *testing.T) {
|
||||
hub, _ := newHubReg()
|
||||
a, _ := connect(hub, "alice")
|
||||
b, fb := connect(hub, "bob")
|
||||
|
||||
// Both open the same datastore by id.
|
||||
opened := asMap(t, hub.Handle(a, msg("data/open", "id", "shared", "kind", "temp", "primary", "id")))
|
||||
id := opened["id"].(string)
|
||||
hub.Handle(b, msg("data/open", "id", id))
|
||||
|
||||
// alice sets a record.
|
||||
set := asMap(t, hub.Handle(a, msg("data/set", "id", id, "record", map[string]any{"id": "k1", "v": float64(7)})))
|
||||
if set["status"] != "success" {
|
||||
t.Fatalf("data/set = %v", set)
|
||||
}
|
||||
|
||||
// bob receives the data/op broadcast (set).
|
||||
op := waitSignal(t, fb, "data/op")
|
||||
if op["op"] != "set" {
|
||||
t.Fatalf("op = %v, want set", op["op"])
|
||||
}
|
||||
rec := asMap(t, op["record"])
|
||||
if rec["key"] != "k1" {
|
||||
t.Fatalf("broadcast record key = %v, want k1", rec["key"])
|
||||
}
|
||||
|
||||
// bob can read the record back from the authoritative copy. (An in-process
|
||||
// reply carries the Go value; over a real socket it is the same shape as JSON.)
|
||||
got := asMap(t, hub.Handle(b, msg("data/get", "id", id, "key", "k1")))
|
||||
gotRec := got["record"].(datastore.Record)
|
||||
if gotRec.Key != "k1" {
|
||||
t.Fatalf("data/get = %v", got)
|
||||
}
|
||||
}
|
||||
|
||||
func TestActiveSyncDeleteBroadcast(t *testing.T) {
|
||||
hub, _ := newHubReg()
|
||||
a, _ := connect(hub, "alice")
|
||||
b, fb := connect(hub, "bob")
|
||||
|
||||
hub.Handle(a, msg("data/open", "id", "d", "kind", "temp"))
|
||||
hub.Handle(b, msg("data/open", "id", "d"))
|
||||
hub.Handle(a, msg("data/set", "id", "d", "record", map[string]any{"id": "x"}))
|
||||
|
||||
del := asMap(t, hub.Handle(a, msg("data/delete", "id", "d", "key", "x")))
|
||||
if del["status"] != "success" {
|
||||
t.Fatalf("data/delete = %v", del)
|
||||
}
|
||||
op := waitDataOp(t, fb, "delete")
|
||||
if op["key"] != "x" {
|
||||
t.Fatalf("delete broadcast = %v", op)
|
||||
}
|
||||
}
|
||||
|
||||
// TestPassiveSyncConvergence is the #45 merge-pool core: pushes are deduped and
|
||||
// only the delta is broadcast, so subscribers converge.
|
||||
func TestPassiveSyncConvergence(t *testing.T) {
|
||||
hub, _ := newHubReg()
|
||||
a, _ := connect(hub, "alice")
|
||||
b, fb := connect(hub, "bob")
|
||||
|
||||
opened := asMap(t, hub.Handle(a, msg("sync/open", "id", "pool")))
|
||||
id := opened["id"].(string)
|
||||
hub.Handle(b, msg("sync/open", "id", id))
|
||||
|
||||
// alice pushes two items; bob gets them as a delta.
|
||||
res := asMap(t, hub.Handle(a, msg("sync/push", "id", id, "items", []any{"x", "y"})))
|
||||
if res["added"] != 2 {
|
||||
t.Fatalf("push added = %v, want 2", res["added"])
|
||||
}
|
||||
add := waitSignal(t, fb, "sync/add")
|
||||
if items, _ := add["items"].([]any); len(items) != 2 {
|
||||
t.Fatalf("sync/add items = %v, want 2", add["items"])
|
||||
}
|
||||
|
||||
// bob pulls and sees the full converged pool.
|
||||
pull := asMap(t, hub.Handle(b, msg("sync/pull", "id", id)))
|
||||
if items, _ := pull["items"].([]any); len(items) != 2 {
|
||||
t.Fatalf("sync/pull items = %v, want 2", pull["items"])
|
||||
}
|
||||
|
||||
// A push with one new + one existing item broadcasts only the new one.
|
||||
res = asMap(t, hub.Handle(b, msg("sync/push", "id", id, "items", []any{"y", "z"})))
|
||||
if res["added"] != 1 {
|
||||
t.Fatalf("second push added = %v, want 1", res["added"])
|
||||
}
|
||||
}
|
||||
|
||||
// TestDataSubscriptionClearedOnDisconnect proves no residual subscription is left
|
||||
// behind (no leak) when a subscriber disconnects.
|
||||
func TestDataSubscriptionClearedOnDisconnect(t *testing.T) {
|
||||
hub, reg := newHubReg()
|
||||
a, _ := connect(hub, "alice")
|
||||
connect(hub, "bob")
|
||||
|
||||
hub.Handle(a, msg("data/open", "id", "d", "kind", "temp"))
|
||||
ds, ok := reg.Data.Get("d")
|
||||
if !ok {
|
||||
t.Fatal("datastore should exist")
|
||||
}
|
||||
if len(ds.Subscribers()) != 1 {
|
||||
t.Fatalf("subscribers before disconnect = %v, want [alice]", ds.Subscribers())
|
||||
}
|
||||
|
||||
hub.Disconnect(a)
|
||||
|
||||
if len(ds.Subscribers()) != 0 {
|
||||
t.Fatalf("subscribers after disconnect = %v, want empty", ds.Subscribers())
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,95 @@
|
|||
package services
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
"git.saqut.com/saqut/mwse/internal/notify"
|
||||
"git.saqut.com/saqut/mwse/internal/protocol"
|
||||
"git.saqut.com/saqut/mwse/internal/ws"
|
||||
)
|
||||
|
||||
// registerNotify wires the store-and-forward notification service (#43) and its
|
||||
// reply-bearing suit variant (#44). The returned store is handed to the caller so
|
||||
// a janitor can reclaim expired entries.
|
||||
//
|
||||
// Wire surface (all additive — the frozen relay contract is untouched):
|
||||
//
|
||||
// - notify/send {to, pack, expires?, suit?} -> {status, trace}
|
||||
// - notify/status {trace} -> {status, delivered, replied, reply}
|
||||
// - notify/reply {trace, pack} -> {status} (client answering a suit)
|
||||
//
|
||||
// Delivery to the target is the server signal:
|
||||
//
|
||||
// [ {from, pack, trace, suit}, "notify" ]
|
||||
func registerNotify(hub *ws.Hub, trigger NotifyTrigger) *notify.Store {
|
||||
store := notify.NewStore()
|
||||
|
||||
deliver := func(c *ws.Client, n notify.Notification) {
|
||||
c.Signal("notify", map[string]any{
|
||||
"from": n.From,
|
||||
"pack": n.Pack,
|
||||
"trace": n.Trace,
|
||||
"suit": n.Suit,
|
||||
})
|
||||
}
|
||||
|
||||
// flush drains and delivers everything queued for a connected client.
|
||||
flush := func(c *ws.Client) {
|
||||
for _, n := range store.Drain(c.ID) {
|
||||
deliver(c, n)
|
||||
}
|
||||
}
|
||||
|
||||
// On connect, deliver anything that arrived while the client was offline.
|
||||
hub.OnConnect(flush)
|
||||
|
||||
hub.Register("notify/send", func(c *ws.Client, m protocol.Message) any {
|
||||
to := m.Str("to")
|
||||
if to == "" {
|
||||
return fail("TO_REQUIRED")
|
||||
}
|
||||
ttl := time.Duration(m.Int("expires")) * time.Second // 0 => store default
|
||||
n := store.Put(c.ID, to, m.Get("pack"), m.Truthy("suit"), ttl)
|
||||
|
||||
// If the target is connected right now, deliver immediately; otherwise it
|
||||
// stays queued until the target's next connect.
|
||||
if target, ok := hub.Client(to); ok {
|
||||
flush(target)
|
||||
}
|
||||
return map[string]any{"status": "success", "trace": n.Trace}
|
||||
})
|
||||
|
||||
hub.Register("notify/status", func(c *ws.Client, m protocol.Message) any {
|
||||
n, ok := store.Status(m.Str("trace"))
|
||||
if !ok {
|
||||
return fail("NOT_FOUND")
|
||||
}
|
||||
return map[string]any{
|
||||
"status": "success",
|
||||
"delivered": n.Delivered,
|
||||
"replied": n.Replied,
|
||||
"reply": n.Reply,
|
||||
}
|
||||
})
|
||||
|
||||
hub.Register("notify/reply", func(c *ws.Client, m protocol.Message) any {
|
||||
n, ok := store.Reply(m.Str("trace"), m.Get("pack"))
|
||||
if !ok {
|
||||
return fail("NOT_FOUND_OR_NOT_SUIT")
|
||||
}
|
||||
// Push the reply outward to the 3rd-party application server (#44): MWSE
|
||||
// triggers it manually rather than having the server poll notify/status.
|
||||
trigger.NotifyReplied(n)
|
||||
// If the original sender is an online client, signal it the reply too.
|
||||
if origin, ok := hub.Client(n.From); ok {
|
||||
origin.Signal("notify/reply", map[string]any{
|
||||
"trace": n.Trace,
|
||||
"from": c.ID,
|
||||
"pack": n.Reply,
|
||||
})
|
||||
}
|
||||
return success()
|
||||
})
|
||||
|
||||
return store
|
||||
}
|
||||
|
|
@ -0,0 +1,110 @@
|
|||
package services
|
||||
|
||||
import (
|
||||
"sync"
|
||||
"testing"
|
||||
|
||||
"git.saqut.com/saqut/mwse/internal/notify"
|
||||
"git.saqut.com/saqut/mwse/internal/ws"
|
||||
)
|
||||
|
||||
// recTrigger records suit replies pushed outward (the #44 3rd-party trigger).
|
||||
type recTrigger struct {
|
||||
mu sync.Mutex
|
||||
got []notify.Notification
|
||||
}
|
||||
|
||||
func (r *recTrigger) NotifyReplied(n notify.Notification) {
|
||||
r.mu.Lock()
|
||||
r.got = append(r.got, n)
|
||||
r.mu.Unlock()
|
||||
}
|
||||
|
||||
func (r *recTrigger) count() int {
|
||||
r.mu.Lock()
|
||||
defer r.mu.Unlock()
|
||||
return len(r.got)
|
||||
}
|
||||
|
||||
// TestNotifyOfflineThenDeliverOnConnect is the #43 store-and-forward core: a
|
||||
// message left for an offline client is delivered when that client connects.
|
||||
func TestNotifyOfflineThenDeliverOnConnect(t *testing.T) {
|
||||
hub := newHub()
|
||||
a, _ := connect(hub, "alice")
|
||||
|
||||
res := asMap(t, hub.Handle(a, msg("notify/send", "to", "bob", "pack", map[string]any{"text": "hi"})))
|
||||
if res["status"] != "success" {
|
||||
t.Fatalf("notify/send = %v", res)
|
||||
}
|
||||
trace, _ := res["trace"].(string)
|
||||
if trace == "" {
|
||||
t.Fatal("notify/send should return a trace id")
|
||||
}
|
||||
|
||||
// bob was offline; now connects and must receive the queued notification.
|
||||
_, fb := connect(hub, "bob")
|
||||
sig := waitSignal(t, fb, "notify")
|
||||
if sig["trace"] != trace {
|
||||
t.Fatalf("delivered trace = %v, want %s", sig["trace"], trace)
|
||||
}
|
||||
if p, _ := sig["pack"].(map[string]any); p["text"] != "hi" {
|
||||
t.Fatalf("delivered pack = %v", sig["pack"])
|
||||
}
|
||||
|
||||
// Status reports delivered.
|
||||
st := asMap(t, hub.Handle(a, msg("notify/status", "trace", trace)))
|
||||
if st["delivered"] != true {
|
||||
t.Fatalf("status = %v, want delivered", st)
|
||||
}
|
||||
}
|
||||
|
||||
// TestNotifyImmediateWhenOnline delivers without waiting when the target is up.
|
||||
func TestNotifyImmediateWhenOnline(t *testing.T) {
|
||||
hub := newHub()
|
||||
a, _ := connect(hub, "alice")
|
||||
_, fb := connect(hub, "bob")
|
||||
|
||||
hub.Handle(a, msg("notify/send", "to", "bob", "pack", map[string]any{"text": "now"}))
|
||||
sig := waitSignal(t, fb, "notify")
|
||||
if p, _ := sig["pack"].(map[string]any); p["text"] != "now" {
|
||||
t.Fatalf("immediate delivery pack = %v", sig["pack"])
|
||||
}
|
||||
}
|
||||
|
||||
// TestNotifySuitReply is the #44 reply path: a suit notification's reply reaches
|
||||
// the 3rd-party trigger and is signalled back to the origin client.
|
||||
func TestNotifySuitReply(t *testing.T) {
|
||||
trig := &recTrigger{}
|
||||
hub := ws.NewHub()
|
||||
Register(hub, WithNotifyTrigger(trig))
|
||||
|
||||
a, fa := connect(hub, "alice")
|
||||
b, fb := connect(hub, "bob")
|
||||
|
||||
res := asMap(t, hub.Handle(a, msg("notify/send", "to", "bob", "suit", true, "pack", map[string]any{"q": "ok?"})))
|
||||
trace, _ := res["trace"].(string)
|
||||
|
||||
sig := waitSignal(t, fb, "notify")
|
||||
if sig["suit"] != true {
|
||||
t.Fatalf("notify suit flag = %v, want true", sig["suit"])
|
||||
}
|
||||
|
||||
// bob replies to the suit.
|
||||
rep := asMap(t, hub.Handle(b, msg("notify/reply", "trace", trace, "pack", map[string]any{"a": "yes"})))
|
||||
if rep["status"] != "success" {
|
||||
t.Fatalf("notify/reply = %v", rep)
|
||||
}
|
||||
|
||||
// The 3rd-party trigger fired, and the origin (alice) got the reply signal.
|
||||
waitFor(t, func() bool { return trig.count() == 1 })
|
||||
got := waitSignal(t, fa, "notify/reply")
|
||||
if p, _ := got["pack"].(map[string]any); p["a"] != "yes" {
|
||||
t.Fatalf("origin reply signal pack = %v", got["pack"])
|
||||
}
|
||||
|
||||
// A non-suit reply must be rejected.
|
||||
plain := asMap(t, hub.Handle(a, msg("notify/send", "to", "bob", "pack", map[string]any{})))
|
||||
if r := asMap(t, hub.Handle(b, msg("notify/reply", "trace", plain["trace"], "pack", map[string]any{}))); r["status"] != "fail" {
|
||||
t.Fatalf("reply to non-suit = %v, want fail", r)
|
||||
}
|
||||
}
|
||||
|
|
@ -14,21 +14,71 @@ import (
|
|||
"crypto/sha256"
|
||||
"encoding/hex"
|
||||
|
||||
"git.saqut.com/saqut/mwse/internal/datastore"
|
||||
"git.saqut.com/saqut/mwse/internal/notify"
|
||||
"git.saqut.com/saqut/mwse/internal/protocol"
|
||||
"git.saqut.com/saqut/mwse/internal/ws"
|
||||
)
|
||||
|
||||
// Register wires every service onto the hub. Call once during startup, before the
|
||||
// server begins accepting connections. The order mirrors the Node require() order
|
||||
// so connect-time side effects (id message, private room, session defaults)
|
||||
// happen in the same sequence.
|
||||
func Register(hub *ws.Hub) {
|
||||
// NotifyTrigger is invoked when a suit notification receives a reply, so MWSE can
|
||||
// push the result to a 3rd-party application server (#44) instead of that server
|
||||
// polling for it. The default is a no-op; a real HTTP-backed implementation lives
|
||||
// in the bridge wiring (#46).
|
||||
type NotifyTrigger interface {
|
||||
NotifyReplied(n notify.Notification)
|
||||
}
|
||||
|
||||
type noopTrigger struct{}
|
||||
|
||||
func (noopTrigger) NotifyReplied(notify.Notification) {}
|
||||
|
||||
// options collects the externally-wired integrations a deployment may supply.
|
||||
type options struct {
|
||||
notifyTrigger NotifyTrigger
|
||||
}
|
||||
|
||||
// Option configures Register.
|
||||
type Option func(*options)
|
||||
|
||||
// WithNotifyTrigger sets the 3rd-party trigger fired on suit replies (#44).
|
||||
func WithNotifyTrigger(t NotifyTrigger) Option {
|
||||
return func(o *options) {
|
||||
if t != nil {
|
||||
o.notifyTrigger = t
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Registry exposes the long-lived stores the services own so the caller (main)
|
||||
// can manage their lifecycle — for example start the notify/datastore janitors
|
||||
// that reclaim expired entries.
|
||||
type Registry struct {
|
||||
Notify *notify.Store
|
||||
Data *datastore.Store
|
||||
}
|
||||
|
||||
// Register wires every service onto the hub and returns a Registry of the stores
|
||||
// that need lifecycle management. Call once during startup, before the server
|
||||
// begins accepting connections. The order mirrors the Node require() order so
|
||||
// connect-time side effects (id message, private room, session defaults) happen
|
||||
// in the same sequence; the data-sync services (#43–#45) are appended after.
|
||||
func Register(hub *ws.Hub, opts ...Option) *Registry {
|
||||
o := options{notifyTrigger: noopTrigger{}}
|
||||
for _, apply := range opts {
|
||||
apply(&o)
|
||||
}
|
||||
|
||||
registerYourID(hub)
|
||||
registerAuth(hub)
|
||||
registerRoom(hub)
|
||||
registerDataTransfer(hub)
|
||||
registerIPPressure(hub, nil)
|
||||
registerSession(hub)
|
||||
|
||||
return &Registry{
|
||||
Notify: registerNotify(hub, o.notifyTrigger),
|
||||
Data: registerDatastore(hub),
|
||||
}
|
||||
}
|
||||
|
||||
// ---- small response helpers ---------------------------------------------
|
||||
|
|
|
|||
11
main.go
11
main.go
|
|
@ -12,6 +12,7 @@ import (
|
|||
"os"
|
||||
"os/signal"
|
||||
"syscall"
|
||||
"time"
|
||||
|
||||
"git.saqut.com/saqut/mwse/internal/config"
|
||||
"git.saqut.com/saqut/mwse/internal/httpserver"
|
||||
|
|
@ -23,7 +24,15 @@ func main() {
|
|||
cfg := config.Load()
|
||||
|
||||
hub := ws.NewHub()
|
||||
services.Register(hub)
|
||||
reg := services.Register(hub)
|
||||
|
||||
// The notify and datastore stores hold entries with a TTL; their janitors
|
||||
// reclaim expired ones so memory cannot grow without bound. They are stopped
|
||||
// during shutdown so no goroutine is leaked.
|
||||
stopNotify := reg.Notify.StartJanitor(time.Minute)
|
||||
stopData := reg.Data.StartJanitor(time.Minute)
|
||||
defer stopNotify()
|
||||
defer stopData()
|
||||
|
||||
srv := httpserver.New(hub, cfg)
|
||||
|
||||
|
|
|
|||
Loading…
Reference in New Issue