package datastore

import (
	"crypto/sha256"
	"encoding/hex"
	"encoding/json"
	"time"
)

// Pool is the passive-sync primitive (#45 part 1): a shared set of unique items
// that clients converge by pushing their local items and pulling the merged
// result. Items are deduplicated by the hash of their canonical JSON, so pushing
// the same item from many clients adds it exactly once. Insertion order is kept so
// every client observes the pool in the same order.
type Pool struct {
	ID string
	// ExpiresAt is the instant after which the pool counts as expired; the zero
	// value means the pool never expires.
	ExpiresAt time.Time

	items       map[string]any      // contentHash -> item
	order       []string            // contentHashes in insertion order
	subscribers map[string]struct{} // set of subscribed client ids
}

// expired reports whether now is strictly after ExpiresAt. A pool with a zero
// ExpiresAt is never expired.
func (p *Pool) expired(now time.Time) bool {
	return !p.ExpiresAt.IsZero() && now.After(p.ExpiresAt)
}

// Subscribe / Unsubscribe / Subscribers mirror Datastore's, guarded by the
// owning Store's lock (pool mutation always happens under that lock).

// Subscribe registers clientID as a subscriber; subscribing twice is a no-op.
func (p *Pool) Subscribe(clientID string) {
	// Allocate lazily so a zero-value Pool does not panic on a nil map write.
	if p.subscribers == nil {
		p.subscribers = make(map[string]struct{})
	}
	p.subscribers[clientID] = struct{}{}
}

// Unsubscribe removes clientID from the subscriber set; unknown ids are ignored.
func (p *Pool) Unsubscribe(clientID string) {
	delete(p.subscribers, clientID)
}

// Subscribers returns a fresh slice holding every subscribed client id. The
// order follows map iteration and is therefore unspecified.
func (p *Pool) Subscribers() []string {
	out := make([]string, 0, len(p.subscribers))
	for id := range p.subscribers {
		out = append(out, id)
	}
	return out
}

// Items returns the merged pool in insertion order. The result is always a
// non-nil slice, even when the pool is empty.
func (p *Pool) Items() []any {
	out := make([]any, 0, len(p.order))
	for _, h := range p.order {
		out = append(out, p.items[h])
	}
	return out
}

// merge adds items not already present, returning only the newly added ones (so
// the caller broadcasts the minimal delta). Convergence is reached when a client
// has pushed and pulled until no side has new items. Duplicates inside the same
// batch are collapsed as well, because each accepted item is recorded before the
// next one is examined.
func (p *Pool) merge(items []any) []any {
	// Allocate lazily so a zero-value Pool does not panic on a nil map write.
	if p.items == nil {
		p.items = make(map[string]any)
	}
	added := make([]any, 0, len(items))
	for _, it := range items {
		h := contentHash(it)
		if _, ok := p.items[h]; ok {
			continue
		}
		p.items[h] = it
		p.order = append(p.order, h)
		added = append(added, it)
	}
	return added
}

// OpenPool returns the pool with the given id, creating it if absent. An empty id
// is replaced with a fresh public id; a zero (or negative) ttl uses the store
// default. Pools are always temporary (passive sync is for fast, ephemeral
// convergence).
//
// A pool that is still registered but has already expired is treated as absent
// and replaced by a fresh, empty pool. GetPool and PushPool both reject expired
// pools, so returning the stale one would hand the caller a pool that can never
// be looked up or pushed to.
func (s *Store) OpenPool(id string, ttl time.Duration) *Pool {
	s.mu.Lock()
	defer s.mu.Unlock()

	if id == "" {
		id = newID()
	}
	now := s.now()
	if p, ok := s.pools[id]; ok && !p.expired(now) {
		return p
	}
	if ttl <= 0 {
		ttl = s.ttl
	}
	p := &Pool{
		ID:          id,
		ExpiresAt:   now.Add(ttl),
		items:       make(map[string]any),
		subscribers: make(map[string]struct{}),
	}
	s.pools[id] = p
	return p
}

// GetPool returns an existing, non-expired pool. The boolean is false when the
// id is unknown or the pool has expired.
func (s *Store) GetPool(id string) (*Pool, bool) {
	now := s.now()
	s.mu.Lock()
	defer s.mu.Unlock()

	p, ok := s.pools[id]
	if !ok || p.expired(now) {
		return nil, false
	}
	return p, true
}

// PushPool merges items into the pool under the store lock and returns the newly
// added items plus the current subscriber list for broadcasting. ok is false,
// and both slices are nil, when the pool is unknown or expired.
func (s *Store) PushPool(id string, items []any) (added []any, subscribers []string, ok bool) {
	now := s.now()
	s.mu.Lock()
	defer s.mu.Unlock()

	p, exists := s.pools[id]
	if !exists || p.expired(now) {
		return nil, nil, false
	}
	return p.merge(items), p.Subscribers(), true
}

// contentHash is the dedup key: the hex-encoded SHA-256 of the item's JSON
// encoding as produced by json.Marshal.
func contentHash(item any) string {
	b, err := json.Marshal(item)
	if err != nil {
		// Non-serialisable items fall back to a per-call unique key (never deduped).
		return newID()
	}
	sum := sha256.Sum256(b)
	return hex.EncodeToString(sum[:])
}