package ws

import (
	"encoding/json"
	"sync"
	"sync/atomic"
	"time"

	"github.com/gorilla/websocket"

	"git.saqut.com/saqut/mwse/internal/protocol"
)

// defaultOutboundBuffer is the per-connection send queue depth used by NewClient
// (tests, tools). The server overrides it from configuration. It is kept high
// because the engine targets endless, bursty traffic; see config.ConnConfig for
// the memory trade-off.
//
// newClient also falls back to this value when it is handed a non-positive
// buffer size.
const defaultOutboundBuffer = 1024

// Session flag keys. They live in the Client.store map and mirror the Node
// Session service defaults exactly.
//
// Every flag is initialised to true by newClient and set back to true by
// ResetStore.
const (
	// flagNotifyPairInfo is the flag reported by PeerInfoNotifiable.
	flagNotifyPairInfo = "notifyPairInfo"
	// flagPackReceive is the flag reported by PackWriteable.
	flagPackReceive = "packrecaive" // (sic) original spelling kept for parity
	// flagPackSending is the flag reported by PackReadable.
	flagPackSending = "packsending"
	// flagNotifyRoomInfo is the flag reported by RoomInfoNotifiable.
	flagNotifyRoomInfo = "notifyRoomInfo"
)

// Client is one connected peer.
//
// Concurrency model (the whole point of the Go rewrite, see #22):
//
//   - All socket WRITES go through a single writer goroutine draining `outbound`.
//     Producers never touch the socket, so concurrent writes are impossible.
//   - `Send` enqueues onto `outbound` but always also selects on `done`, so a send
//     racing a disconnect is harmlessly dropped instead of writing to a dead peer.
//   - All mutable peer STATE (info, store, rooms, pairs, ...) is guarded by `mu`.
//     A peer leaving (clearing its state) and another goroutine reading/sending to
//     it are therefore serialized; the "leave-while-send" race cannot occur.
type Client struct { ID string CreatedAt time.Time conn Conn outbound chan []byte done chan struct{} closeOnce sync.Once dropped uint64 // frames dropped due to a full outbound buffer (atomic) writeWait time.Duration // per-write socket deadline mu sync.RWMutex info map[string]any // application metadata, shared with paired/room peers store map[string]bool // per-connection session flags rooms map[string]struct{} // rooms this client belongs to pairs map[string]struct{} // peers this client has paired toward (outgoing edges) pairedBy map[string]struct{} // peers that paired toward this client (incoming edges) waiting map[string]struct{} // rooms this client is awaiting an invite decision in requiredPair bool // when true, others must be paired to reach this client apNumber int // virtual address: short number apShortCode string // virtual address: 3-letter code apIP string // virtual address: 10.x.x.x style ip } // NewClient wraps an accepted connection with default transport tuning. The // server uses newClient to apply configured limits instead. func NewClient(conn Conn, id string) *Client { return newClient(conn, id, defaultOutboundBuffer, defaultWriteWait) } // newClient wraps an accepted connection. Session flag defaults are applied here // (rather than only via the Session service's connect hook) so they are always // present regardless of listener ordering. 
func newClient(conn Conn, id string, outboundBuffer int, writeWait time.Duration) *Client { if outboundBuffer <= 0 { outboundBuffer = defaultOutboundBuffer } if writeWait <= 0 { writeWait = defaultWriteWait } return &Client{ ID: id, CreatedAt: time.Now(), conn: conn, outbound: make(chan []byte, outboundBuffer), done: make(chan struct{}), writeWait: writeWait, info: make(map[string]any), store: map[string]bool{ flagNotifyPairInfo: true, flagPackReceive: true, flagPackSending: true, flagNotifyRoomInfo: true, }, rooms: make(map[string]struct{}), pairs: make(map[string]struct{}), pairedBy: make(map[string]struct{}), waiting: make(map[string]struct{}), } } // ---- sending ------------------------------------------------------------- // Send marshals v and enqueues it for the writer goroutine. It is safe to call // from any goroutine and at any time, and it never blocks: // // - if the client is closing, the frame is dropped (this is the branch that // makes "send to a peer that is leaving" safe instead of a race/panic); // - if the outbound buffer is full, this frame is dropped but the connection is // kept. MWSE is a best-effort relay, so a momentarily slow consumer should // lose a frame, not be disconnected (which would cascade under load). A // genuinely dead consumer is still reaped: its writer eventually trips the // write deadline, which ends the read loop and disconnects it. func (c *Client) Send(v any) { b, err := json.Marshal(v) if err != nil { return } select { case c.outbound <- b: case <-c.done: // Client is gone; drop silently. default: // Buffer full; drop this frame, keep the connection. atomic.AddUint64(&c.dropped, 1) } } // Dropped returns the number of frames dropped because the outbound buffer was // full. Useful for load tests and operational metrics. func (c *Client) Dropped() uint64 { return atomic.LoadUint64(&c.dropped) } // Signal sends a server-initiated message [payload, name] to this client. 
func (c *Client) Signal(name string, payload any) {
	msg := protocol.Signal(name, payload)
	c.Send(msg)
}

// ---- pumps ---------------------------------------------------------------

// writePump drains outbound onto the socket and is the ONLY goroutine that calls
// conn.WriteMessage. It returns when the client is closed or a write fails, and
// closes the socket on the way out.
func (c *Client) writePump() {
	defer c.conn.Close()

	for {
		var frame []byte
		select {
		case <-c.done:
			return
		case frame = <-c.outbound:
		}

		// Best-effort: a failure to arm the deadline is ignored and the write is
		// attempted anyway.
		_ = c.conn.SetWriteDeadline(time.Now().Add(c.writeWait))
		if c.conn.WriteMessage(websocket.TextMessage, frame) != nil {
			return
		}
	}
}

// Close tears the client's transport down exactly once; later calls are no-ops.
// It does NOT run the logical disconnect (room/pair cleanup) — that is driven by
// the read loop's exit in server.go, so it always happens precisely once per
// connection.
func (c *Client) Close() {
	c.closeOnce.Do(func() {
		farewell := websocket.FormatCloseMessage(websocket.CloseNormalClosure, "")
		deadline := time.Now().Add(time.Second)

		// Polite close frame, best effort only: the error is ignored because the
		// peer may already be gone.
		_ = c.conn.WriteControl(websocket.CloseMessage, farewell, deadline)

		close(c.done)
	})
}

// Done exposes the close signal for select-based waits (used by the ping loop).
func (c *Client) Done() <-chan struct{} {
	return c.done
}

// ---- guarded state accessors --------------------------------------------

// SetInfo stores value under name in the application metadata.
func (c *Client) SetInfo(name string, value any) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.info[name] = value
}

// InfoValue looks up a single metadata value; the boolean reports whether name
// was present.
func (c *Client) InfoValue(name string) (any, bool) {
	c.mu.RLock()
	defer c.mu.RUnlock()
	value, found := c.info[name]
	return value, found
}

// Info returns a copy of all metadata. A copy is returned so callers can range
// over it without holding the lock.
func (c *Client) Info() map[string]any { c.mu.RLock() out := make(map[string]any, len(c.info)) for k, v := range c.info { out[k] = v } c.mu.RUnlock() return out } // Match reports whether every key/value in filter is present and equal in this // client's info (the room peer filter from Node's Client.match). func (c *Client) Match(filter map[string]any) bool { c.mu.RLock() defer c.mu.RUnlock() if len(filter) > len(c.info) { return false } for k, want := range filter { got, ok := c.info[k] if !ok || got != want { return false } } return true } // SetStore sets a session flag. func (c *Client) SetStore(name string, v bool) { c.mu.Lock() c.store[name] = v c.mu.Unlock() } // store flag readers, named to match the original Client helpers. func (c *Client) PackWriteable() bool { return c.storeFlag(flagPackReceive) } func (c *Client) PackReadable() bool { return c.storeFlag(flagPackSending) } func (c *Client) PeerInfoNotifiable() bool { return c.storeFlag(flagNotifyPairInfo) } func (c *Client) RoomInfoNotifiable() bool { return c.storeFlag(flagNotifyRoomInfo) } func (c *Client) storeFlag(name string) bool { c.mu.RLock() v := c.store[name] c.mu.RUnlock() return v } // ResetStore restores the default session flags. func (c *Client) ResetStore() { c.mu.Lock() c.store[flagNotifyPairInfo] = true c.store[flagPackReceive] = true c.store[flagPackSending] = true c.store[flagNotifyRoomInfo] = true c.mu.Unlock() } // RequiredPair / SetRequiredPair toggle the "private" reachability mode. func (c *Client) RequiredPair() bool { c.mu.RLock() v := c.requiredPair c.mu.RUnlock() return v } func (c *Client) SetRequiredPair(v bool) { c.mu.Lock() c.requiredPair = v c.mu.Unlock() } // ---- room membership ----------------------------------------------------- func (c *Client) addRoom(id string) { c.mu.Lock() c.rooms[id] = struct{}{} c.mu.Unlock() } func (c *Client) removeRoom(id string) { c.mu.Lock() delete(c.rooms, id) c.mu.Unlock() } // InRoom reports membership. 
func (c *Client) InRoom(id string) bool { c.mu.RLock() _, ok := c.rooms[id] c.mu.RUnlock() return ok } // Rooms returns a snapshot of room ids this client belongs to. func (c *Client) Rooms() []string { c.mu.RLock() out := make([]string, 0, len(c.rooms)) for id := range c.rooms { out = append(out, id) } c.mu.RUnlock() return out } // ---- pairing ------------------------------------------------------------- // Pairing keeps two indexes so that a disconnect can clean up every edge that // touches a client in O(degree) instead of scanning all clients — and so that // stale ids never accumulate on long-lived clients under churn (the leak that // would otherwise grow without bound at scale): // // - pairs : peers THIS client paired toward (outgoing) // - pairedBy : peers that paired toward THIS client (incoming) // // AddPair/RemovePair take the other *Client and update both sides. The two locks // are taken in separate, non-nested critical sections, so concurrent A.AddPair(B) // and B.AddPair(A) cannot deadlock. // AddPair records that this client has paired toward other, updating both the // outgoing edge here and the incoming edge on other. func (c *Client) AddPair(other *Client) { c.mu.Lock() c.pairs[other.ID] = struct{}{} c.mu.Unlock() other.mu.Lock() other.pairedBy[c.ID] = struct{}{} other.mu.Unlock() } // RemovePair drops the pairing edge from this client to other (and the matching // incoming record on other). func (c *Client) RemovePair(other *Client) { c.mu.Lock() delete(c.pairs, other.ID) c.mu.Unlock() other.mu.Lock() delete(other.pairedBy, c.ID) other.mu.Unlock() } // ForgetPeer removes peerID from both this client's outgoing and incoming pairing // sets. Used during the other peer's disconnect cleanup. func (c *Client) ForgetPeer(peerID string) { c.mu.Lock() delete(c.pairs, peerID) delete(c.pairedBy, peerID) c.mu.Unlock() } // HasPair reports whether this client has an outgoing pairing edge toward other. 
func (c *Client) HasPair(other string) bool { c.mu.RLock() _, ok := c.pairs[other] c.mu.RUnlock() return ok } // Pairs returns a snapshot of this client's outgoing pairing edges. func (c *Client) Pairs() []string { c.mu.RLock() out := make([]string, 0, len(c.pairs)) for id := range c.pairs { out = append(out, id) } c.mu.RUnlock() return out } // PairedBy returns a snapshot of the peers that paired toward this client. func (c *Client) PairedBy() []string { c.mu.RLock() out := make([]string, 0, len(c.pairedBy)) for id := range c.pairedBy { out = append(out, id) } c.mu.RUnlock() return out } // ---- invite waiting rooms ----------------------------------------------- // // Tracking which rooms a client is awaiting an invite in lets disconnect clear // those waiting lists, so a room's waiting set cannot grow with dead ids. // AddWaitingRoom records that this client is awaiting an invite decision in room. func (c *Client) AddWaitingRoom(roomID string) { c.mu.Lock() c.waiting[roomID] = struct{}{} c.mu.Unlock() } // RemoveWaitingRoom clears a pending invite for room. func (c *Client) RemoveWaitingRoom(roomID string) { c.mu.Lock() delete(c.waiting, roomID) c.mu.Unlock() } // WaitingRooms returns a snapshot of the rooms this client is awaiting in. 
func (c *Client) WaitingRooms() []string { c.mu.RLock() out := make([]string, 0, len(c.waiting)) for id := range c.waiting { out = append(out, id) } c.mu.RUnlock() return out } // ---- virtual address (IPPressure) --------------------------------------- func (c *Client) APNumber() int { c.mu.RLock() defer c.mu.RUnlock() return c.apNumber } func (c *Client) SetAPNumber(v int) { c.mu.Lock() c.apNumber = v c.mu.Unlock() } func (c *Client) APShortCode() string { c.mu.RLock() defer c.mu.RUnlock() return c.apShortCode } func (c *Client) SetAPShortCode(v string) { c.mu.Lock() c.apShortCode = v c.mu.Unlock() } func (c *Client) APIP() string { c.mu.RLock() defer c.mu.RUnlock() return c.apIP } func (c *Client) SetAPIP(v string) { c.mu.Lock() c.apIP = v c.mu.Unlock() }