MWSE/internal/datastore/datastore.go

280 lines
7.2 KiB
Go

// Package datastore implements the shared data layer of #45: server-authoritative
// collections that clients mutate with CRUD operations (active sync) and a fast
// merge pool that clients converge through (passive sync), plus temp/permanent
// datastores addressable by a public id.
//
// The package is deliberately free of any transport dependency: it owns the data
// and the conflict rules, and returns the subscriber ids that a mutation must be
// broadcast to. The service layer (internal/services) resolves those ids to
// connections and emits the signals. That split keeps the data rules unit-testable
// without a hub or a socket.
//
// # Conflict resolution
//
// Every mutation is serialized through the datastore's lock and stamped with a
// monotonically increasing sequence number. "Arrival-time priority" (the rule the
// issue asks for) therefore falls out naturally: concurrent writes are ordered by
// the moment they acquire the lock, and the last arrival wins. The sequence number
// rides along on every broadcast so clients converge to the same state.
package datastore
import (
"crypto/rand"
"encoding/hex"
"sort"
"sync"
"time"
)
// Kind selects the lifetime of a datastore.
type Kind string
const (
Temp Kind = "temp" // expires after a TTL (default)
Permanent Kind = "permanent" // never expires
)
// Record is one row of a collection, keyed by the value of the datastore's
// primary field.
type Record struct {
Key string `json:"key"`
Data map[string]any `json:"data"`
Seq uint64 `json:"seq"`
Updated time.Time `json:"-"`
}
// Datastore is a server-authoritative shared table.
type Datastore struct {
ID string
Kind Kind
Primary string // primary key field name (default "id")
ExpiresAt time.Time
mu sync.RWMutex
records map[string]*Record
subscribers map[string]struct{}
seq uint64
}
func (d *Datastore) expired(now time.Time) bool {
return !d.ExpiresAt.IsZero() && now.After(d.ExpiresAt)
}
// Subscribe registers a client id as interested in this datastore's broadcasts.
func (d *Datastore) Subscribe(clientID string) {
d.mu.Lock()
d.subscribers[clientID] = struct{}{}
d.mu.Unlock()
}
// Unsubscribe removes a client id.
func (d *Datastore) Unsubscribe(clientID string) {
d.mu.Lock()
delete(d.subscribers, clientID)
d.mu.Unlock()
}
// Subscribers returns the current subscriber ids.
func (d *Datastore) Subscribers() []string {
d.mu.RLock()
defer d.mu.RUnlock()
out := make([]string, 0, len(d.subscribers))
for id := range d.subscribers {
out = append(out, id)
}
return out
}
// Set upserts a record and returns it stamped with a fresh sequence number. The
// primary key is read from data[Primary]; if absent or not a string, ok is false.
func (d *Datastore) Set(data map[string]any, now time.Time) (Record, bool) {
key, ok := data[d.Primary].(string)
if !ok || key == "" {
return Record{}, false
}
d.mu.Lock()
defer d.mu.Unlock()
d.seq++
rec := &Record{Key: key, Data: data, Seq: d.seq, Updated: now}
d.records[key] = rec
return *rec, true
}
// Delete removes a record, returning the sequence number of the delete op (so the
// broadcast can be ordered against sets) and whether a record existed.
func (d *Datastore) Delete(key string) (uint64, bool) {
d.mu.Lock()
defer d.mu.Unlock()
if _, ok := d.records[key]; !ok {
return 0, false
}
d.seq++
delete(d.records, key)
return d.seq, true
}
// Get returns a copy of one record.
func (d *Datastore) Get(key string) (Record, bool) {
d.mu.RLock()
defer d.mu.RUnlock()
rec, ok := d.records[key]
if !ok {
return Record{}, false
}
return *rec, true
}
// Snapshot returns all records ordered by sequence number (insertion/update
// order), which is the order a freshly-opening client should apply them in.
func (d *Datastore) Snapshot() []Record {
d.mu.RLock()
defer d.mu.RUnlock()
out := make([]Record, 0, len(d.records))
for _, rec := range d.records {
out = append(out, *rec)
}
sort.Slice(out, func(i, j int) bool { return out[i].Seq < out[j].Seq })
return out
}
// Store owns all named datastores (active sync) and merge pools (passive sync).
type Store struct {
mu sync.Mutex
stores map[string]*Datastore
pools map[string]*Pool
now func() time.Time
ttl time.Duration
}
// NewStore returns an empty store. Temp datastores and pools default to a 1h TTL.
func NewStore() *Store {
return &Store{
stores: make(map[string]*Datastore),
pools: make(map[string]*Pool),
now: time.Now,
ttl: time.Hour,
}
}
// SetClock replaces the time source; intended for deterministic tests.
func (s *Store) SetClock(now func() time.Time) { s.now = now }
// Open returns the datastore with the given id, creating it if absent. An empty
// id is replaced with a fresh public id. An empty primary defaults to "id". A
// zero ttl on a temp datastore uses the store default; permanent ignores ttl.
func (s *Store) Open(id string, kind Kind, primary string, ttl time.Duration) *Datastore {
s.mu.Lock()
defer s.mu.Unlock()
if id == "" {
id = newID()
}
if existing, ok := s.stores[id]; ok {
return existing
}
if primary == "" {
primary = "id"
}
if kind != Permanent {
kind = Temp
}
d := &Datastore{
ID: id,
Kind: kind,
Primary: primary,
records: make(map[string]*Record),
subscribers: make(map[string]struct{}),
}
if kind == Temp {
if ttl <= 0 {
ttl = s.ttl
}
d.ExpiresAt = s.now().Add(ttl)
}
s.stores[id] = d
return d
}
// Get returns an existing, non-expired datastore.
func (s *Store) Get(id string) (*Datastore, bool) {
now := s.now()
s.mu.Lock()
defer s.mu.Unlock()
d, ok := s.stores[id]
if !ok || d.expired(now) {
return nil, false
}
return d, true
}
// UnsubscribeAll drops a client id from every datastore and pool. Called on
// disconnect so a departed client leaves no residual subscription.
func (s *Store) UnsubscribeAll(clientID string) {
s.mu.Lock()
stores := make([]*Datastore, 0, len(s.stores))
for _, d := range s.stores {
stores = append(stores, d)
}
pools := make([]*Pool, 0, len(s.pools))
for _, p := range s.pools {
pools = append(pools, p)
}
s.mu.Unlock()
for _, d := range stores {
d.Unsubscribe(clientID)
}
for _, p := range pools {
p.Unsubscribe(clientID)
}
}
// PurgeExpired removes expired temp datastores and pools, returning the total
// number removed.
func (s *Store) PurgeExpired() int {
now := s.now()
s.mu.Lock()
defer s.mu.Unlock()
removed := 0
for id, d := range s.stores {
if d.expired(now) {
delete(s.stores, id)
removed++
}
}
for id, p := range s.pools {
if p.expired(now) {
delete(s.pools, id)
removed++
}
}
return removed
}
// StartJanitor purges expired temp datastores every interval until stop is called.
func (s *Store) StartJanitor(interval time.Duration) (stop func()) {
done := make(chan struct{})
go func() {
t := time.NewTicker(interval)
defer t.Stop()
for {
select {
case <-t.C:
s.PurgeExpired()
case <-done:
return
}
}
}()
var once sync.Once
return func() { once.Do(func() { close(done) }) }
}
// Now exposes the store clock to the service layer so record timestamps share it.
func (s *Store) Now() time.Time { return s.now() }
func newID() string {
var b [12]byte
_, _ = rand.Read(b[:])
return hex.EncodeToString(b[:])
}