MWSE/internal/datastore/pool.go

126 lines
3.5 KiB
Go

package datastore
import (
"crypto/sha256"
"encoding/hex"
"encoding/json"
"time"
)
// Pool is the passive-sync primitive (#45 part 1): a shared set of unique items
// that clients converge by pushing their local items and pulling the merged
// result. Items are deduplicated by the hash of their canonical JSON, so pushing
// the same item from many clients adds it exactly once. Insertion order is kept so
// every client observes the pool in the same order.
type Pool struct {
ID string
ExpiresAt time.Time
items map[string]any // contentHash -> item
order []string // contentHashes in insertion order
subscribers map[string]struct{}
}
func (p *Pool) expired(now time.Time) bool {
return !p.ExpiresAt.IsZero() && now.After(p.ExpiresAt)
}
// Subscribe / Unsubscribe / Subscribers mirror Datastore's, guarded by the
// owning Store's lock (pool mutation always happens under that lock).
func (p *Pool) Subscribe(clientID string) { p.subscribers[clientID] = struct{}{} }
func (p *Pool) Unsubscribe(clientID string) { delete(p.subscribers, clientID) }
func (p *Pool) Subscribers() []string {
out := make([]string, 0, len(p.subscribers))
for id := range p.subscribers {
out = append(out, id)
}
return out
}
// Items returns the merged pool in insertion order.
func (p *Pool) Items() []any {
out := make([]any, 0, len(p.order))
for _, h := range p.order {
out = append(out, p.items[h])
}
return out
}
// merge adds items not already present, returning only the newly added ones (so
// the caller broadcasts the minimal delta). Convergence is reached when a client
// has pushed and pulled until no side has new items.
func (p *Pool) merge(items []any) []any {
added := make([]any, 0, len(items))
for _, it := range items {
h := contentHash(it)
if _, ok := p.items[h]; ok {
continue
}
p.items[h] = it
p.order = append(p.order, h)
added = append(added, it)
}
return added
}
// OpenPool returns the pool with the given id, creating it if absent. An empty id
// is replaced with a fresh public id; a zero ttl uses the store default. Pools are
// always temporary (passive sync is for fast, ephemeral convergence).
func (s *Store) OpenPool(id string, ttl time.Duration) *Pool {
s.mu.Lock()
defer s.mu.Unlock()
if id == "" {
id = newID()
}
if p, ok := s.pools[id]; ok {
return p
}
if ttl <= 0 {
ttl = s.ttl
}
p := &Pool{
ID: id,
ExpiresAt: s.now().Add(ttl),
items: make(map[string]any),
subscribers: make(map[string]struct{}),
}
s.pools[id] = p
return p
}
// GetPool returns an existing, non-expired pool.
func (s *Store) GetPool(id string) (*Pool, bool) {
now := s.now()
s.mu.Lock()
defer s.mu.Unlock()
p, ok := s.pools[id]
if !ok || p.expired(now) {
return nil, false
}
return p, true
}
// PushPool merges items into the pool under the store lock and returns the newly
// added items plus the current subscriber list for broadcasting.
func (s *Store) PushPool(id string, items []any) (added []any, subscribers []string, ok bool) {
now := s.now()
s.mu.Lock()
defer s.mu.Unlock()
p, exists := s.pools[id]
if !exists || p.expired(now) {
return nil, nil, false
}
return p.merge(items), p.Subscribers(), true
}
// contentHash is the dedup key: the SHA-256 of the item's canonical JSON.
func contentHash(item any) string {
b, err := json.Marshal(item)
if err != nil {
// Non-serialisable items fall back to a per-call unique key (never deduped).
return newID()
}
sum := sha256.Sum256(b)
return hex.EncodeToString(sum[:])
}