MWSE/loadtest/main.go

165 lines
4.3 KiB
Go

// Command mwse-loadtest drives the MWSE engine with many concurrent WebSocket
// clients to smoke-test correctness and measure throughput/latency. It is a
// separate Go module so it can be built and run independently of the engine, and
// reused as a benchmark harness.
//
// Run the engine, then:
//
//	go run .                                        # default: ping mode, 50 clients, 10s
//	go run . -mode relay -clients 200 -dur 15s
//	go run . -addr ws://localhost:7707/ -clients 500 -mode ping
package main
import (
"encoding/json"
"flag"
"fmt"
"log"
"sync"
"sync/atomic"
"time"
)
func main() {
addr := flag.String("addr", "ws://localhost:7707/", "engine WebSocket URL")
clients := flag.Int("clients", 50, "number of concurrent clients")
dur := flag.Duration("dur", 10*time.Second, "test duration")
mode := flag.String("mode", "ping", "scenario: ping | relay")
flag.Parse()
log.Printf("connecting %d clients to %s ...", *clients, *addr)
conns := dialAll(*addr, *clients)
defer func() {
for _, c := range conns {
c.Close()
}
}()
log.Printf("connected %d clients", len(conns))
switch *mode {
case "ping":
runPing(conns, *dur)
case "relay":
runRelay(conns, *dur)
default:
log.Fatalf("unknown mode %q (want ping or relay)", *mode)
}
}
// dialAll starts n concurrent connection attempts against addr and returns
// the clients that completed both Dial and WaitID (with a 5 second timeout).
//
// A failed attempt is logged and dropped, so the returned slice may hold fewer
// than n clients. A client whose WaitID call fails is closed before being
// discarded. Clients are appended as their goroutines finish, so the order of
// the result is not deterministic.
func dialAll(addr string, n int) []*Client {
	var (
		pending sync.WaitGroup
		guard   sync.Mutex
		ready   []*Client
	)

	// connect performs a single dial + id wait and publishes the client on success.
	connect := func() {
		defer pending.Done()

		client, dialErr := Dial(addr)
		if dialErr != nil {
			log.Printf("dial failed: %v", dialErr)
			return
		}

		idErr := client.WaitID(5 * time.Second)
		if idErr != nil {
			log.Printf("no id: %v", idErr)
			client.Close()
			return
		}

		guard.Lock()
		defer guard.Unlock()
		ready = append(ready, client)
	}

	for remaining := n; remaining > 0; remaining-- {
		pending.Add(1)
		go connect()
	}
	pending.Wait()

	return ready
}
// runPing drives every client in conns with back-to-back "my/socketid"
// requests until dur has elapsed, then prints a summary of the run: client
// count, completed requests, errors, throughput and latency percentiles.
//
// Each request is given a 5 second timeout. A client stops issuing requests
// after its first failed request, so each client contributes at most one
// error to the count. Throughput is computed against the nominal dur.
func runPing(conns []*Client, dur time.Duration) {
	var (
		samples  latency
		failures int64
		workers  sync.WaitGroup
	)
	stopAt := time.Now().Add(dur)

	// pingLoop issues requests on one client until the deadline passes or a
	// request fails.
	pingLoop := func(cl *Client) {
		defer workers.Done()
		for {
			if !time.Now().Before(stopAt) {
				return
			}
			_, rtt, err := cl.Request(map[string]any{"type": "my/socketid"}, 5*time.Second)
			if err != nil {
				atomic.AddInt64(&failures, 1)
				return
			}
			samples.record(rtt)
		}
	}

	for _, cl := range conns {
		workers.Add(1)
		go pingLoop(cl)
	}
	workers.Wait()

	requests := samples.count()
	p50, p90, p99, worst := samples.percentiles()
	perSecond := float64(requests) / dur.Seconds()

	fmt.Println("\n=== ping results ===")
	fmt.Printf("clients : %d\n", len(conns))
	fmt.Printf("requests : %d\n", requests)
	fmt.Printf("errors : %d\n", failures)
	fmt.Printf("throughput : %.0f req/s\n", perSecond)
	fmt.Printf("latency p50 : %s\n", p50)
	fmt.Printf("latency p90 : %s\n", p90)
	fmt.Printf("latency p99 : %s\n", p99)
	fmt.Printf("latency max : %s\n", worst)
}
// runRelay pairs clients (2i <-> 2i+1) and has each send fire-and-forget packs to
// its partner, counting deliveries to measure relay fanout throughput.
//
// Every client gets a "pack" signal handler that bumps a shared delivery
// counter. Each pair then runs two sender goroutines (one per direction) that
// call SendOnly in a loop until dur has elapsed or a send fails. With an odd
// number of clients the last one has no partner and sends nothing.
//
// The process exits via log.Fatal when fewer than two clients are supplied.
// Rates are computed against the nominal dur.
func runRelay(conns []*Client, dur time.Duration) {
	if len(conns) < 2 {
		log.Fatal("relay mode needs at least 2 clients")
	}

	// delivered is updated from the signal handlers, so every access to it
	// goes through sync/atomic.
	var delivered int64
	for _, c := range conns {
		c.OnSignal("pack", func(json.RawMessage) { atomic.AddInt64(&delivered, 1) })
	}

	var sent int64
	deadline := time.Now().Add(dur)
	var wg sync.WaitGroup
	for i := 0; i+1 < len(conns); i += 2 {
		a, b := conns[i], conns[i+1]
		// One sender per direction so both partners send and receive.
		for _, pair := range [][2]*Client{{a, b}, {b, a}} {
			wg.Add(1)
			go func(src, dst *Client) {
				defer wg.Done()
				for time.Now().Before(deadline) {
					err := src.SendOnly(map[string]any{
						"type": "pack/to",
						"to":   dst.ID,
						"pack": map[string]any{"t": time.Now().UnixNano()},
					})
					if err != nil {
						// A failed send stops this direction; the failed
						// pack is not counted as sent.
						return
					}
					atomic.AddInt64(&sent, 1)
				}
			}(pair[0], pair[1])
		}
	}
	wg.Wait()

	// Allow a moment for the last in-flight relays to arrive.
	time.Sleep(250 * time.Millisecond)

	// Take a single atomic snapshot of the delivery counter. Handlers may
	// still fire after the grace period, so a plain read would race with them,
	// and reading twice could report two different values for the same run.
	recvd := atomic.LoadInt64(&delivered)
	// All senders have finished (wg.Wait above), so sent is stable; it is
	// loaded atomically only for symmetry with the writes.
	total := atomic.LoadInt64(&sent)
	secs := dur.Seconds()

	fmt.Println("\n=== relay results ===")
	fmt.Printf("clients : %d\n", len(conns))
	fmt.Printf("packs sent : %d\n", total)
	fmt.Printf("packs recvd : %d\n", recvd)
	fmt.Printf("send rate : %.0f msg/s\n", float64(total)/secs)
	fmt.Printf("recv rate : %.0f msg/s\n", float64(recvd)/secs)
}