185 lines
4.6 KiB
Go
185 lines
4.6 KiB
Go
package datastore
|
|
|
|
import (
|
|
"sync"
|
|
"testing"
|
|
"time"
|
|
)
|
|
|
|
func TestDatastoreSetGetSnapshot(t *testing.T) {
|
|
s := NewStore()
|
|
d := s.Open("notes", Temp, "id", 0)
|
|
|
|
r1, ok := d.Set(map[string]any{"id": "a", "v": 1}, s.Now())
|
|
if !ok || r1.Seq != 1 {
|
|
t.Fatalf("first set = %+v ok=%v", r1, ok)
|
|
}
|
|
r2, _ := d.Set(map[string]any{"id": "b", "v": 2}, s.Now())
|
|
if r2.Seq != 2 {
|
|
t.Fatalf("second set seq = %d, want 2", r2.Seq)
|
|
}
|
|
|
|
// Update of a existing key gets a fresh (higher) seq — last write wins.
|
|
r1b, _ := d.Set(map[string]any{"id": "a", "v": 99}, s.Now())
|
|
if r1b.Seq <= r2.Seq {
|
|
t.Fatalf("update seq = %d, want > %d", r1b.Seq, r2.Seq)
|
|
}
|
|
|
|
got, ok := d.Get("a")
|
|
if !ok || got.Data["v"] != 99 {
|
|
t.Fatalf("get a = %+v", got)
|
|
}
|
|
|
|
snap := d.Snapshot()
|
|
if len(snap) != 2 {
|
|
t.Fatalf("snapshot len = %d, want 2", len(snap))
|
|
}
|
|
// Snapshot is ordered by seq; the updated "a" now sorts last.
|
|
if snap[len(snap)-1].Key != "a" {
|
|
t.Fatalf("snapshot order wrong: %+v", snap)
|
|
}
|
|
}
|
|
|
|
func TestDatastoreSetRequiresPrimary(t *testing.T) {
|
|
s := NewStore()
|
|
d := s.Open("x", Temp, "id", 0)
|
|
if _, ok := d.Set(map[string]any{"noid": true}, s.Now()); ok {
|
|
t.Fatal("set without the primary key should fail")
|
|
}
|
|
}
|
|
|
|
func TestDatastoreDelete(t *testing.T) {
|
|
s := NewStore()
|
|
d := s.Open("x", Temp, "id", 0)
|
|
d.Set(map[string]any{"id": "a"}, s.Now())
|
|
|
|
seq, ok := d.Delete("a")
|
|
if !ok || seq == 0 {
|
|
t.Fatalf("delete = seq %d ok %v", seq, ok)
|
|
}
|
|
if _, ok := d.Get("a"); ok {
|
|
t.Fatal("record should be gone after delete")
|
|
}
|
|
if _, ok := d.Delete("a"); ok {
|
|
t.Fatal("deleting a missing key should fail")
|
|
}
|
|
}
|
|
|
|
// TestConcurrentSetsResolveByArrival is the #45 conflict-resolution guarantee:
|
|
// concurrent writes are serialized by the datastore lock, every write gets a
|
|
// unique monotonic seq, and the final value is whichever write arrived last.
|
|
func TestConcurrentSetsResolveByArrival(t *testing.T) {
|
|
s := NewStore()
|
|
d := s.Open("race", Temp, "id", 0)
|
|
|
|
const n = 200
|
|
var wg sync.WaitGroup
|
|
seqs := make([]uint64, n)
|
|
for i := 0; i < n; i++ {
|
|
wg.Add(1)
|
|
go func(i int) {
|
|
defer wg.Done()
|
|
rec, _ := d.Set(map[string]any{"id": "k", "writer": i}, s.Now())
|
|
seqs[i] = rec.Seq
|
|
}(i)
|
|
}
|
|
wg.Wait()
|
|
|
|
// All seqs unique and within [1,n].
|
|
seen := make(map[uint64]bool, n)
|
|
var max uint64
|
|
for _, sq := range seqs {
|
|
if sq == 0 || sq > n || seen[sq] {
|
|
t.Fatalf("bad/duplicate seq %d", sq)
|
|
}
|
|
seen[sq] = true
|
|
if sq > max {
|
|
max = sq
|
|
}
|
|
}
|
|
// The surviving record must be the one with the highest seq (last arrival).
|
|
final, _ := d.Get("k")
|
|
if final.Seq != max {
|
|
t.Fatalf("final record seq = %d, want max %d", final.Seq, max)
|
|
}
|
|
}
|
|
|
|
func TestTempExpiryAndPurge(t *testing.T) {
|
|
s := NewStore()
|
|
now := time.Unix(0, 0)
|
|
s.SetClock(func() time.Time { return now })
|
|
|
|
s.Open("temp1", Temp, "id", time.Second)
|
|
s.Open("perm1", Permanent, "id", 0)
|
|
|
|
now = now.Add(2 * time.Second)
|
|
if _, ok := s.Get("temp1"); ok {
|
|
t.Fatal("temp store should have expired")
|
|
}
|
|
if _, ok := s.Get("perm1"); !ok {
|
|
t.Fatal("permanent store must not expire")
|
|
}
|
|
if removed := s.PurgeExpired(); removed != 1 {
|
|
t.Fatalf("purge removed %d, want 1", removed)
|
|
}
|
|
}
|
|
|
|
func TestOpenIsIdempotentAndGeneratesID(t *testing.T) {
|
|
s := NewStore()
|
|
a := s.Open("", Temp, "", 0)
|
|
if a.ID == "" {
|
|
t.Fatal("empty id should be replaced with a generated one")
|
|
}
|
|
if a.Primary != "id" {
|
|
t.Fatalf("default primary = %q, want id", a.Primary)
|
|
}
|
|
b := s.Open(a.ID, Temp, "", 0)
|
|
if a != b {
|
|
t.Fatal("opening an existing id should return the same datastore")
|
|
}
|
|
}
|
|
|
|
func TestPoolMergeDedupesAndTracksDelta(t *testing.T) {
|
|
s := NewStore()
|
|
p := s.OpenPool("p1", 0)
|
|
_ = p
|
|
|
|
added, _, ok := s.PushPool("p1", []any{"x", "y", "x"})
|
|
if !ok || len(added) != 2 {
|
|
t.Fatalf("first push added %v (ok=%v), want 2 unique", added, ok)
|
|
}
|
|
// Re-pushing existing items adds nothing.
|
|
added, _, _ = s.PushPool("p1", []any{"x", "y"})
|
|
if len(added) != 0 {
|
|
t.Fatalf("re-push added %v, want 0", added)
|
|
}
|
|
// A new item is the only delta.
|
|
added, _, _ = s.PushPool("p1", []any{"x", "z"})
|
|
if len(added) != 1 || added[0] != "z" {
|
|
t.Fatalf("push delta = %v, want [z]", added)
|
|
}
|
|
|
|
pool, _ := s.GetPool("p1")
|
|
if len(pool.Items()) != 3 {
|
|
t.Fatalf("pool size = %d, want 3", len(pool.Items()))
|
|
}
|
|
}
|
|
|
|
func TestUnsubscribeAllClearsEverywhere(t *testing.T) {
|
|
s := NewStore()
|
|
d := s.Open("d", Temp, "id", 0)
|
|
p := s.OpenPool("p", 0)
|
|
d.Subscribe("alice")
|
|
p.Subscribe("alice")
|
|
d.Subscribe("bob")
|
|
|
|
s.UnsubscribeAll("alice")
|
|
|
|
if len(d.Subscribers()) != 1 || d.Subscribers()[0] != "bob" {
|
|
t.Fatalf("datastore subscribers = %v, want [bob]", d.Subscribers())
|
|
}
|
|
if len(p.Subscribers()) != 0 {
|
|
t.Fatalf("pool subscribers = %v, want empty", p.Subscribers())
|
|
}
|
|
}
|