MWSE/internal/ws/client.go

347 lines
9.7 KiB
Go

package ws
import (
	"encoding/json"
	"reflect"
	"sync"
	"sync/atomic"
	"time"

	"git.saqut.com/saqut/mwse/internal/protocol"
	"github.com/gorilla/websocket"
)
// outboundBuffer is the capacity of each client's outbound frame queue (the
// channel created in NewClient). It bounds how much memory one slow client can
// pin: when the queue is full, Send drops the new frame and increments the
// counter reported by Dropped instead of blocking the producer. The connection
// itself is kept open; a full queue alone never disconnects the client.
const outboundBuffer = 256
// Session flag keys. They are the keys of the Client.store map; NewClient and
// ResetStore set every one of them to true. According to the original author
// these defaults mirror the Node Session service exactly.
const (
// Read by PeerInfoNotifiable.
flagNotifyPairInfo = "notifyPairInfo"
// Read by PackWriteable.
flagPackReceive = "packrecaive" // (sic) original spelling kept for parity
// Read by PackReadable.
flagPackSending = "packsending"
// Read by RoomInfoNotifiable.
flagNotifyRoomInfo = "notifyRoomInfo"
)
// Client is one connected peer.
//
// Concurrency model (the whole point of the Go rewrite, see #22):
//
// - All data-frame socket WRITES go through a single writer goroutine
// (writePump) draining `outbound`. Producers never call WriteMessage, so
// concurrent data writes are impossible. The only other write is the
// best-effort close control frame that Close sends via WriteControl.
// - `Send` enqueues onto `outbound` but always also selects on `done`, so a send
// racing a disconnect is harmlessly dropped instead of writing to a dead peer.
// - All mutable peer STATE (info, store, rooms, pairs, ...) is guarded by `mu`.
// A peer leaving (clearing its state) and another goroutine reading/sending to
// it are therefore serialized; the "leave-while-send" race cannot occur.
type Client struct {
// ID is the identifier handed to NewClient.
ID string
// CreatedAt is the time at which NewClient built this value.
CreatedAt time.Time
// conn is the underlying connection; written by writePump and Close.
conn Conn
// outbound queues marshaled frames for writePump (capacity outboundBuffer).
outbound chan []byte
// done is closed by Close and signals shutdown to Send, writePump and Done.
done chan struct{}
// closeOnce makes the body of Close run at most once.
closeOnce sync.Once
dropped uint64 // frames dropped due to a full outbound buffer (atomic)
// mu guards every field declared below it.
mu sync.RWMutex
info map[string]any // application metadata, shared with paired/room peers
store map[string]bool // per-connection session flags
rooms map[string]struct{} // rooms this client belongs to
pairs map[string]struct{} // peers this client has paired with
requiredPair bool // when true, others must be paired to reach this client
apNumber int // virtual address: short number
apShortCode string // virtual address: 3-letter code
apIP string // virtual address: 10.x.x.x style ip
}
// NewClient builds a Client around an already-accepted connection.
//
// The outbound queue is created with capacity outboundBuffer, the done channel
// starts open, and the info, rooms and pairs maps start empty. All four session
// flags are set to true here (rather than only via the Session service's
// connect hook) so they are always present regardless of listener ordering.
func NewClient(conn Conn, id string) *Client {
	defaults := map[string]bool{
		flagNotifyPairInfo: true,
		flagPackReceive:    true,
		flagPackSending:    true,
		flagNotifyRoomInfo: true,
	}

	c := &Client{ID: id, conn: conn}
	c.CreatedAt = time.Now()
	c.outbound = make(chan []byte, outboundBuffer)
	c.done = make(chan struct{})
	c.info = map[string]any{}
	c.store = defaults
	c.rooms = map[string]struct{}{}
	c.pairs = map[string]struct{}{}
	return c
}
// ---- sending -------------------------------------------------------------

// Send marshals v to JSON and enqueues it for the writer goroutine. It is safe
// to call from any goroutine and at any time, and it never blocks:
//
//   - if the client is already closed, the frame is always dropped and nothing
//     is marshaled or queued (this is the branch that makes "send to a peer that
//     is leaving" safe instead of a race/panic);
//   - if v cannot be marshaled, the frame is dropped silently;
//   - if the outbound buffer is full, this frame is dropped, the Dropped counter
//     is incremented and the connection is kept. MWSE is a best-effort relay, so
//     a momentarily slow consumer should lose a frame, not be disconnected
//     (which would cascade under load). A genuinely dead consumer is still
//     reaped: its writer eventually trips the write deadline and closes the
//     socket.
func (c *Client) Send(v any) {
	// Check for shutdown on its own first. In the combined select below Go picks
	// at random among ready cases, so without this a closed client with free
	// buffer space would still accept roughly half of the frames sent to it.
	select {
	case <-c.done:
		return
	default:
	}

	b, err := json.Marshal(v)
	if err != nil {
		// Nothing useful can be relayed for an unencodable value.
		return
	}

	select {
	case c.outbound <- b:
	case <-c.done:
		// Closed while we were marshaling; drop silently.
	default:
		// Buffer full; drop this frame, keep the connection.
		atomic.AddUint64(&c.dropped, 1)
	}
}
// Dropped reports how many frames Send has discarded because the outbound
// buffer was full. The counter is read atomically, so it may be polled from any
// goroutine; useful for load tests and operational metrics.
func (c *Client) Dropped() uint64 {
	count := atomic.LoadUint64(&c.dropped)
	return count
}
// Signal sends a server-initiated message [payload, name] to this client. The
// frame is built by protocol.Signal and queued through Send, so the same
// never-blocks / best-effort delivery rules apply.
func (c *Client) Signal(name string, payload any) {
	frame := protocol.Signal(name, payload)
	c.Send(frame)
}
// ---- pumps ---------------------------------------------------------------

// writePump is the ONLY goroutine that calls conn.WriteMessage. It forwards
// queued frames as text messages, refreshing the write deadline before each
// one, and stops when either the client is closed or a write fails. The socket
// is closed on the way out in both cases.
func (c *Client) writePump() {
	defer c.conn.Close()

	for {
		var frame []byte
		select {
		case <-c.done:
			return
		case frame = <-c.outbound:
		}

		// A failure to arm the deadline is deliberately ignored; a broken
		// connection will surface as an error on the write that follows.
		_ = c.conn.SetWriteDeadline(time.Now().Add(defaultWriteWait))
		if c.conn.WriteMessage(websocket.TextMessage, frame) != nil {
			return
		}
	}
}
// Close tears the client's transport down exactly once: it attempts to send a
// normal-closure close frame (with a one second deadline) and then closes the
// done channel. Repeated calls are no-ops. It does NOT run the logical
// disconnect (room/pair cleanup) — that is driven by the read loop's exit in
// server.go, so it always happens precisely once per connection.
func (c *Client) Close() {
	shutdown := func() {
		payload := websocket.FormatCloseMessage(websocket.CloseNormalClosure, "")
		deadline := time.Now().Add(time.Second)

		// Best-effort polite close frame; ignore errors (peer may already be gone).
		_ = c.conn.WriteControl(websocket.CloseMessage, payload, deadline)

		close(c.done)
	}
	c.closeOnce.Do(shutdown)
}
// Done returns a receive-only view of the channel that Close closes, for use in
// select-based waits (used by the ping loop).
func (c *Client) Done() <-chan struct{} {
	var closed <-chan struct{} = c.done
	return closed
}
// ---- guarded state accessors --------------------------------------------

// SetInfo stores value under name in the application metadata, replacing any
// previous value for that name. The write happens under the write lock.
func (c *Client) SetInfo(name string, value any) {
	c.mu.Lock()
	defer c.mu.Unlock()

	c.info[name] = value
}
// InfoValue looks up a single metadata value under the read lock. The boolean
// result is false when name has never been set.
func (c *Client) InfoValue(name string) (any, bool) {
	c.mu.RLock()
	defer c.mu.RUnlock()

	val, found := c.info[name]
	return val, found
}
// Info returns a shallow copy of all metadata, taken under the read lock.
// Callers may range over or modify the returned map without holding the lock;
// doing so does not affect the client. The result is never nil.
func (c *Client) Info() map[string]any {
	c.mu.RLock()
	defer c.mu.RUnlock()

	snapshot := make(map[string]any, len(c.info))
	for key := range c.info {
		snapshot[key] = c.info[key]
	}
	return snapshot
}
// Match reports whether every key/value pair in filter is present with an equal
// value in this client's info (the room peer filter from Node's Client.match).
// An empty or nil filter matches every client.
//
// Values are compared with reflect.DeepEqual rather than !=. Both maps hold
// arbitrary `any` values, and comparing two interface values with != panics at
// run time when they carry the same uncomparable dynamic type (for example
// []any or map[string]any). For strings, booleans, numbers and nil the result
// is the same as before; slices and maps are compared element by element.
func (c *Client) Match(filter map[string]any) bool {
	c.mu.RLock()
	defer c.mu.RUnlock()

	// A filter with more keys than info can never be fully satisfied.
	if len(filter) > len(c.info) {
		return false
	}
	for k, want := range filter {
		got, ok := c.info[k]
		if !ok || !reflect.DeepEqual(got, want) {
			return false
		}
	}
	return true
}
// SetStore sets the session flag called name to v under the write lock. Any
// name is accepted, not only the predefined flag keys.
func (c *Client) SetStore(name string, v bool) {
	c.mu.Lock()
	defer c.mu.Unlock()

	c.store[name] = v
}
// Session flag readers, named to match the original Client helpers. Each one
// returns the current value of a single flag via storeFlag.

// PackWriteable reports the value of the "packrecaive" flag.
func (c *Client) PackWriteable() bool {
	enabled := c.storeFlag(flagPackReceive)
	return enabled
}

// PackReadable reports the value of the "packsending" flag.
func (c *Client) PackReadable() bool {
	enabled := c.storeFlag(flagPackSending)
	return enabled
}

// PeerInfoNotifiable reports the value of the "notifyPairInfo" flag.
func (c *Client) PeerInfoNotifiable() bool {
	enabled := c.storeFlag(flagNotifyPairInfo)
	return enabled
}

// RoomInfoNotifiable reports the value of the "notifyRoomInfo" flag.
func (c *Client) RoomInfoNotifiable() bool {
	enabled := c.storeFlag(flagNotifyRoomInfo)
	return enabled
}
// storeFlag reads one session flag under the read lock. A flag that has never
// been set reads as false (the map's zero value).
func (c *Client) storeFlag(name string) bool {
	c.mu.RLock()
	defer c.mu.RUnlock()

	return c.store[name]
}
// ResetStore puts the four predefined session flags back to their default
// value, true. Any other keys present in the store are left untouched.
func (c *Client) ResetStore() {
	defaults := [...]string{
		flagNotifyPairInfo,
		flagPackReceive,
		flagPackSending,
		flagNotifyRoomInfo,
	}

	c.mu.Lock()
	defer c.mu.Unlock()

	for _, flag := range defaults {
		c.store[flag] = true
	}
}
// RequiredPair reports whether the "private" reachability mode is on, i.e.
// whether the requiredPair field is currently true. Read under the read lock.
func (c *Client) RequiredPair() bool {
	c.mu.RLock()
	defer c.mu.RUnlock()

	return c.requiredPair
}
// SetRequiredPair switches the "private" reachability mode on (true) or off
// (false) under the write lock.
func (c *Client) SetRequiredPair(v bool) {
	c.mu.Lock()
	defer c.mu.Unlock()

	c.requiredPair = v
}
// ---- room membership -----------------------------------------------------

// addRoom records id in this client's room set. Adding an id that is already
// present has no further effect.
func (c *Client) addRoom(id string) {
	c.mu.Lock()
	defer c.mu.Unlock()

	c.rooms[id] = struct{}{}
}
// removeRoom deletes id from this client's room set. Removing an id that is not
// present is a no-op.
func (c *Client) removeRoom(id string) {
	c.mu.Lock()
	defer c.mu.Unlock()

	delete(c.rooms, id)
}
// InRoom reports whether id is in this client's room set. Read under the read
// lock.
func (c *Client) InRoom(id string) bool {
	c.mu.RLock()
	defer c.mu.RUnlock()

	_, member := c.rooms[id]
	return member
}
// Rooms returns a snapshot of the room ids this client belongs to, taken under
// the read lock. The slice is freshly allocated, never nil, and in no
// particular order (it follows map iteration order).
func (c *Client) Rooms() []string {
	c.mu.RLock()
	defer c.mu.RUnlock()

	ids := make([]string, len(c.rooms))
	next := 0
	for roomID := range c.rooms {
		ids[next] = roomID
		next++
	}
	return ids
}
// ---- pairing -------------------------------------------------------------

// AddPair records that this client has paired toward other. Only this client's
// own pair set is updated; adding an existing edge has no further effect.
func (c *Client) AddPair(other string) {
	c.mu.Lock()
	defer c.mu.Unlock()

	c.pairs[other] = struct{}{}
}
// RemovePair drops the pairing edge from this client toward other. Removing an
// edge that does not exist is a no-op.
func (c *Client) RemovePair(other string) {
	c.mu.Lock()
	defer c.mu.Unlock()

	delete(c.pairs, other)
}
// HasPair reports whether this client has a pairing edge toward other. Read
// under the read lock.
func (c *Client) HasPair(other string) bool {
	c.mu.RLock()
	defer c.mu.RUnlock()

	_, paired := c.pairs[other]
	return paired
}
// Pairs returns a snapshot of this client's pairing edges, taken under the read
// lock. The slice is freshly allocated, never nil, and in no particular order
// (it follows map iteration order).
func (c *Client) Pairs() []string {
	c.mu.RLock()
	defer c.mu.RUnlock()

	peers := make([]string, len(c.pairs))
	next := 0
	for peerID := range c.pairs {
		peers[next] = peerID
		next++
	}
	return peers
}
// ---- virtual address (IPPressure) ---------------------------------------

// APNumber returns the numeric part of this client's virtual address, read
// under the read lock.
func (c *Client) APNumber() int {
	c.mu.RLock()
	number := c.apNumber
	c.mu.RUnlock()
	return number
}
// SetAPNumber stores v as the numeric part of this client's virtual address,
// under the write lock.
func (c *Client) SetAPNumber(v int) {
	c.mu.Lock()
	defer c.mu.Unlock()

	c.apNumber = v
}
// APShortCode returns the short-code part of this client's virtual address,
// read under the read lock.
func (c *Client) APShortCode() string {
	c.mu.RLock()
	code := c.apShortCode
	c.mu.RUnlock()
	return code
}
// SetAPShortCode stores v as the short-code part of this client's virtual
// address, under the write lock. The value is stored as given.
func (c *Client) SetAPShortCode(v string) {
	c.mu.Lock()
	defer c.mu.Unlock()

	c.apShortCode = v
}
// APIP returns the IP-style part of this client's virtual address, read under
// the read lock.
func (c *Client) APIP() string {
	c.mu.RLock()
	ip := c.apIP
	c.mu.RUnlock()
	return ip
}
// SetAPIP stores v as the IP-style part of this client's virtual address, under
// the write lock. The value is stored as given, without validation.
func (c *Client) SetAPIP(v string) {
	c.mu.Lock()
	defer c.mu.Unlock()

	c.apIP = v
}