diff --git a/internal/bridge/bridge.go b/internal/bridge/bridge.go new file mode 100644 index 0000000..e4624ea --- /dev/null +++ b/internal/bridge/bridge.go @@ -0,0 +1,155 @@ +// Package bridge implements the 3rd-party server bridge (#46): it lets an external +// application server talk to MWSE over plain HTTPS (get/post) without speaking +// WebSocket, and lets MWSE delegate connection approval to that application +// server. +// +// Three pieces, each independently testable: +// +// - Inbox : a bounded queue of client->application messages the app drains +// by polling an HTTP endpoint. +// - HTTPApprover : asks the application "Connect?" for each new client and accepts +// only on an explicit approval (fail-closed). +// - HTTPTrigger : pushes a suit-notification reply (#44) to the application, +// so the app is told the moment a reply arrives instead of polling. +package bridge + +import ( + "bytes" + "context" + "encoding/json" + "net/http" + "sync" + "time" + + "git.saqut.com/saqut/mwse/internal/notify" +) + +// Message is one client->application message held in the inbox. +type Message struct { + From string `json:"from"` + Pack any `json:"pack"` + At time.Time `json:"at"` +} + +// Inbox is a bounded FIFO of messages awaiting collection by the application +// server. It is bounded so a never-polling application cannot make it grow without +// limit (oldest messages are dropped first). +type Inbox struct { + mu sync.Mutex + q []Message + max int +} + +// NewInbox returns an inbox holding up to max messages (<=0 uses 10000). +func NewInbox(max int) *Inbox { + if max <= 0 { + max = 10000 + } + return &Inbox{max: max} +} + +// Push appends a message from a client, dropping the oldest if the cap is hit. +func (i *Inbox) Push(from string, pack any) { + i.mu.Lock() + defer i.mu.Unlock() + if len(i.q) >= i.max { + drop := len(i.q) - i.max + 1 + i.q = i.q[drop:] + } + i.q = append(i.q, Message{From: from, Pack: pack, At: time.Now()}) +} + +// Drain returns and clears all queued messages. +func (i *Inbox) Drain() []Message { + i.mu.Lock() + defer i.mu.Unlock() + if len(i.q) == 0 { + return nil + } + out := i.q + i.q = nil + return out +} + +// Len reports how many messages are queued. +func (i *Inbox) Len() int { + i.mu.Lock() + defer i.mu.Unlock() + return len(i.q) +} + +// HTTPApprover delegates connection approval to an application server. For each +// new client it POSTs {id, meta} to URL and accepts only if the response is 200 +// with a JSON body {"approve": true}. Any error (unreachable app, non-200, +// malformed body) denies the connection — fail-closed, matching "approves or it +// is rejected". +type HTTPApprover struct { + URL string + Client *http.Client +} + +// NewHTTPApprover builds an approver with a sensible request timeout. +func NewHTTPApprover(url string, timeout time.Duration) *HTTPApprover { + if timeout <= 0 { + timeout = 3 * time.Second + } + return &HTTPApprover{URL: url, Client: &http.Client{Timeout: timeout}} +} + +// Approve implements ws.Approver. +func (a *HTTPApprover) Approve(id string, meta map[string]any) bool { + body, _ := json.Marshal(map[string]any{"id": id, "meta": meta}) + req, err := http.NewRequestWithContext(context.Background(), http.MethodPost, a.URL, bytes.NewReader(body)) + if err != nil { + return false + } + req.Header.Set("Content-Type", "application/json") + resp, err := a.Client.Do(req) + if err != nil { + return false + } + defer resp.Body.Close() + if resp.StatusCode != http.StatusOK { + return false + } + var out struct { + Approve bool `json:"approve"` + } + if json.NewDecoder(resp.Body).Decode(&out) != nil { + return false + } + return out.Approve +} + +// HTTPTrigger pushes suit-notification replies (#44) to the application server. +// It structurally satisfies services.NotifyTrigger. +type HTTPTrigger struct { + URL string + Client *http.Client +} + +// NewHTTPTrigger builds a trigger with a sensible request timeout. +func NewHTTPTrigger(url string, timeout time.Duration) *HTTPTrigger { + if timeout <= 0 { + timeout = 3 * time.Second + } + return &HTTPTrigger{URL: url, Client: &http.Client{Timeout: timeout}} +} + +// NotifyReplied posts the reply to the application server (best effort). +func (t *HTTPTrigger) NotifyReplied(n notify.Notification) { + body, _ := json.Marshal(map[string]any{ + "trace": n.Trace, + "from": n.From, + "to": n.To, + "reply": n.Reply, + }) + req, err := http.NewRequestWithContext(context.Background(), http.MethodPost, t.URL, bytes.NewReader(body)) + if err != nil { + return + } + req.Header.Set("Content-Type", "application/json") + if resp, err := t.Client.Do(req); err == nil { + resp.Body.Close() + } +} diff --git a/internal/bridge/bridge_test.go b/internal/bridge/bridge_test.go new file mode 100644 index 0000000..8fd792e --- /dev/null +++ b/internal/bridge/bridge_test.go @@ -0,0 +1,155 @@ +package bridge + +import ( + "encoding/json" + "net/http" + "net/http/httptest" + "testing" + "time" + + "git.saqut.com/saqut/mwse/internal/notify" +) + +// ---- Inbox --------------------------------------------------------------- + +func TestInboxPushDrain(t *testing.T) { + inbox := NewInbox(0) + inbox.Push("a", "hello") + inbox.Push("b", 42) + + msgs := inbox.Drain() + if len(msgs) != 2 { + t.Fatalf("drain = %d, want 2", len(msgs)) + } + if msgs[0].From != "a" || msgs[1].From != "b" { + t.Fatalf("from order wrong: %v", msgs) + } + + // Drain again should return nil (empty). + if got := inbox.Drain(); got != nil { + t.Fatalf("second drain = %v, want nil", got) + } +} + +func TestInboxCapDropsOldest(t *testing.T) { + inbox := NewInbox(3) + for i := range 5 { + inbox.Push("x", i) + } + if inbox.Len() != 3 { + t.Fatalf("len = %d, want 3 (capped)", inbox.Len()) + } + msgs := inbox.Drain() + // Should contain the last 3 items: 2, 3, 4. + if msgs[0].Pack.(int) != 2 { + t.Fatalf("oldest surviving item = %v, want 2", msgs[0].Pack) + } +} + +func TestInboxLen(t *testing.T) { + inbox := NewInbox(0) + if inbox.Len() != 0 { + t.Fatalf("initial len = %d, want 0", inbox.Len()) + } + inbox.Push("a", nil) + if inbox.Len() != 1 { + t.Fatalf("after push len = %d, want 1", inbox.Len()) + } +} + +// ---- HTTPApprover -------------------------------------------------------- + +func TestHTTPApproverApproves(t *testing.T) { + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + var body map[string]any + json.NewDecoder(r.Body).Decode(&body) + if body["id"] == nil { + http.Error(w, "missing id", 400) + return + } + json.NewEncoder(w).Encode(map[string]any{"approve": true}) + })) + defer srv.Close() + + a := NewHTTPApprover(srv.URL, 2*time.Second) + if !a.Approve("client-1", map[string]any{"ip": "1.2.3.4"}) { + t.Fatal("expected approve=true") + } +} + +func TestHTTPApproverRejects(t *testing.T) { + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + json.NewEncoder(w).Encode(map[string]any{"approve": false}) + })) + defer srv.Close() + + a := NewHTTPApprover(srv.URL, 2*time.Second) + if a.Approve("client-2", nil) { + t.Fatal("expected approve=false") + } +} + +func TestHTTPApproverFailClosed(t *testing.T) { + // Unreachable URL must deny (fail-closed). + a := NewHTTPApprover("http://127.0.0.1:1", 100*time.Millisecond) + if a.Approve("x", nil) { + t.Fatal("expected fail-closed denial for unreachable server") + } +} + +func TestHTTPApproverNon200Denies(t *testing.T) { + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + http.Error(w, "forbidden", http.StatusForbidden) + })) + defer srv.Close() + + a := NewHTTPApprover(srv.URL, 2*time.Second) + if a.Approve("c", nil) { + t.Fatal("expected denial on non-200") + } +} + +// ---- HTTPTrigger --------------------------------------------------------- + +func TestHTTPTriggerPosts(t *testing.T) { + received := make(chan map[string]any, 1) + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + var body map[string]any + json.NewDecoder(r.Body).Decode(&body) + received <- body + })) + defer srv.Close() + + trigger := NewHTTPTrigger(srv.URL, 2*time.Second) + n := notify.Notification{ + Trace: "tr1", + From: "alice", + To: "bob", + Reply: map[string]any{"ok": true}, + } + trigger.NotifyReplied(n) + + select { + case body := <-received: + if body["trace"] != "tr1" || body["from"] != "alice" { + t.Fatalf("received body = %v", body) + } + case <-time.After(2 * time.Second): + t.Fatal("trigger post not received within 2s") + } +} + +func TestHTTPTriggerBestEffortOnError(t *testing.T) { + // An unreachable trigger must not panic or block. + trigger := NewHTTPTrigger("http://127.0.0.1:1", 100*time.Millisecond) + done := make(chan struct{}) + go func() { + trigger.NotifyReplied(notify.Notification{Trace: "x"}) + close(done) + }() + select { + case <-done: + case <-time.After(2 * time.Second): + t.Fatal("trigger blocked on error instead of returning quickly") + } +} diff --git a/internal/httpserver/api.go b/internal/httpserver/api.go index 11fc10b..1ea6c2a 100644 --- a/internal/httpserver/api.go +++ b/internal/httpserver/api.go @@ -5,6 +5,7 @@ import ( "net/http" "sync" + "git.saqut.com/saqut/mwse/internal/bridge" "git.saqut.com/saqut/mwse/internal/ws" ) @@ -56,7 +57,10 @@ func (s *apiKeyStore) auth(next func(http.ResponseWriter, *http.Request, string) // endpoints and the core server-initiated messaging endpoints from api.js. The // server-as-room-participant (join/leave) and webhook endpoints are intentionally // deferred to feature-parity work (see REVIEW.md). -func registerAPI(mux *http.ServeMux, hub *ws.Hub) { +// +// When bridgeInbox is non-nil, the bridge drain endpoint is also registered: +// - POST /api/bridge/inbox — drain all queued client→app messages (#46) +func registerAPI(mux *http.ServeMux, hub *ws.Hub, bridgeInbox *bridge.Inbox) { keys := newAPIKeyStore() mux.HandleFunc("POST /api/auth/key", func(w http.ResponseWriter, r *http.Request) { @@ -187,6 +191,19 @@ func registerAPI(mux *http.ServeMux, hub *ws.Hub) { room.Publish() writeJSON(w, http.StatusOK, map[string]any{"status": "success", "room": room.ToJSON(false)}) })) + + // Bridge endpoints (#46) — only registered when the inbox is configured. + if bridgeInbox != nil { + // POST /api/bridge/inbox drains all queued client→app messages atomically. + // The application server polls this to receive messages sent via bridge/send. + mux.HandleFunc("POST /api/bridge/inbox", keys.auth(func(w http.ResponseWriter, r *http.Request, domain string) { + msgs := bridgeInbox.Drain() + if msgs == nil { + msgs = []bridge.Message{} + } + writeJSON(w, http.StatusOK, map[string]any{"status": "success", "messages": msgs}) + })) + } } func orDefault(v, def string) string { diff --git a/internal/httpserver/httpserver.go b/internal/httpserver/httpserver.go index b3d0362..86b981d 100644 --- a/internal/httpserver/httpserver.go +++ b/internal/httpserver/httpserver.go @@ -12,15 +12,30 @@ import ( "github.com/gorilla/websocket" + "git.saqut.com/saqut/mwse/internal/bridge" "git.saqut.com/saqut/mwse/internal/config" "git.saqut.com/saqut/mwse/internal/ws" ) +// ServerOptions holds optional wiring for httpserver.New. +type ServerOptions struct { + // BridgeInbox, when non-nil, enables POST /api/bridge/inbox so an application + // server can drain client messages routed via bridge/send (#46). + BridgeInbox *bridge.Inbox + // Approver, when non-nil, gates each incoming WebSocket connection by asking + // the approver before the HTTP upgrade (#46). + Approver ws.Approver +} + // New builds the *http.Server. WebSocket upgrades are detected on ANY path and // routed to the engine (the SDK derives its endpoint from wherever the script was // served, so the upgrade may arrive at "/" or "/script/"). All other requests go // through the static/API mux. -func New(hub *ws.Hub, cfg config.Config) *http.Server { +func New(hub *ws.Hub, cfg config.Config, srvOpts ...ServerOptions) *http.Server { + var so ServerOptions + if len(srvOpts) > 0 { + so = srvOpts[0] + } wsServer := ws.NewServer(hub, ws.Options{ OutboundBuffer: cfg.Conn.OutboundBuffer, MaxMessageSize: cfg.Conn.MaxMessageSize, @@ -29,10 +44,11 @@ func New(hub *ws.Hub, cfg config.Config) *http.Server { PingInterval: cfg.Conn.PingInterval, PongWait: cfg.Conn.PongWait, WriteWait: cfg.Conn.WriteWait, + Approver: so.Approver, }) mux := http.NewServeMux() - registerAPI(mux, hub) + registerAPI(mux, hub, so.BridgeInbox) registerStatic(mux, cfg) root := func(w http.ResponseWriter, r *http.Request) { diff --git a/internal/services/bridge.go b/internal/services/bridge.go new file mode 100644 index 0000000..bdc1544 --- /dev/null +++ b/internal/services/bridge.go @@ -0,0 +1,21 @@ +package services + +import ( + "git.saqut.com/saqut/mwse/internal/bridge" + "git.saqut.com/saqut/mwse/internal/protocol" + "git.saqut.com/saqut/mwse/internal/ws" +) + +// registerBridge wires the bridge/send handler that routes client messages into +// the application server's inbox (#46). The inbox is then drained by the +// application server via POST /api/bridge/inbox. +func registerBridge(hub *ws.Hub, inbox *bridge.Inbox) { + hub.Register("bridge/send", func(c *ws.Client, m protocol.Message) any { + pack := m.Get("pack") + if pack == nil { + return fail("PACK_REQUIRED") + } + inbox.Push(c.ID, pack) + return success() + }) +} diff --git a/internal/services/bridge_test.go b/internal/services/bridge_test.go new file mode 100644 index 0000000..8abb20d --- /dev/null +++ b/internal/services/bridge_test.go @@ -0,0 +1,61 @@ +package services + +import ( + "testing" + + "git.saqut.com/saqut/mwse/internal/bridge" + "git.saqut.com/saqut/mwse/internal/ws" +) + +// TestBridgeSendQueuesInInbox verifies that a bridge/send message from a client +// is queued into the inbox and that a missing pack is rejected (#46). +func TestBridgeSendQueuesInInbox(t *testing.T) { + hub := ws.NewHub() + inbox := bridge.NewInbox(0) + Register(hub, WithBridgeInbox(inbox)) + + a, _ := connect(hub, "alice") + + // A well-formed bridge/send should be queued. + res := asMap(t, hub.Handle(a, msg("bridge/send", "pack", map[string]any{"hello": "world"}))) + if res["status"] != "success" { + t.Fatalf("bridge/send = %v", res) + } + if inbox.Len() != 1 { + t.Fatalf("inbox len = %d, want 1", inbox.Len()) + } + msgs := inbox.Drain() + if msgs[0].From != "alice" { + t.Fatalf("inbox.From = %q, want alice", msgs[0].From) + } + if pack, ok := msgs[0].Pack.(map[string]any); !ok || pack["hello"] != "world" { + t.Fatalf("inbox.Pack = %v, want hello:world", msgs[0].Pack) + } +} + +func TestBridgeSendRejectsEmptyPack(t *testing.T) { + hub := ws.NewHub() + inbox := bridge.NewInbox(0) + Register(hub, WithBridgeInbox(inbox)) + a, _ := connect(hub, "bob") + + res := asMap(t, hub.Handle(a, msg("bridge/send"))) + if res["message"] != "PACK_REQUIRED" { + t.Fatalf("expected PACK_REQUIRED, got %v", res) + } + if inbox.Len() != 0 { + t.Fatal("inbox should be empty after rejected send") + } +} + +func TestBridgeSendNotRegisteredWithoutOption(t *testing.T) { + // Without WithBridgeInbox, bridge/send should not be registered. + hub := ws.NewHub() + Register(hub) // no bridge option + a, _ := connect(hub, "charlie") + + res := asMap(t, hub.Handle(a, msg("bridge/send", "pack", "x"))) + if res["message"] != "UNKNOWN_TYPE" { + t.Fatalf("expected UNKNOWN_TYPE, got %v", res) + } +} diff --git a/internal/services/services.go b/internal/services/services.go index c3c2dd4..6f0737f 100644 --- a/internal/services/services.go +++ b/internal/services/services.go @@ -14,6 +14,7 @@ import ( "crypto/sha256" "encoding/hex" + "git.saqut.com/saqut/mwse/internal/bridge" "git.saqut.com/saqut/mwse/internal/datastore" "git.saqut.com/saqut/mwse/internal/notify" "git.saqut.com/saqut/mwse/internal/protocol" @@ -35,6 +36,7 @@ func (noopTrigger) NotifyReplied(notify.Notification) {} // options collects the externally-wired integrations a deployment may supply. type options struct { notifyTrigger NotifyTrigger + bridgeInbox *bridge.Inbox // nil = bridge/send not registered } // Option configures Register. @@ -49,12 +51,25 @@ func WithNotifyTrigger(t NotifyTrigger) Option { } } -// Registry exposes the long-lived stores the services own so the caller (main) -// can manage their lifecycle — for example start the notify/datastore janitors -// that reclaim expired entries. +// WithBridgeInbox enables the bridge/send handler (#46). Clients can then +// route messages into the inbox, which the application server drains via +// POST /api/bridge/inbox. +func WithBridgeInbox(inbox *bridge.Inbox) Option { + return func(o *options) { + if inbox != nil { + o.bridgeInbox = inbox + } + } +} + +// Registry exposes the long-lived stores and optional subsystems so the caller +// (main) can manage their lifecycle. type Registry struct { Notify *notify.Store Data *datastore.Store + // Bridge is the inbox registered via WithBridgeInbox; nil when bridge is not + // configured. The HTTP layer drains it via POST /api/bridge/inbox. + Bridge *bridge.Inbox } // Register wires every service onto the hub and returns a Registry of the stores @@ -75,10 +90,15 @@ func Register(hub *ws.Hub, opts ...Option) *Registry { registerIPPressure(hub, nil) registerSession(hub) - return &Registry{ + reg := &Registry{ Notify: registerNotify(hub, o.notifyTrigger), Data: registerDatastore(hub), } + if o.bridgeInbox != nil { + registerBridge(hub, o.bridgeInbox) + reg.Bridge = o.bridgeInbox + } + return reg } // ---- small response helpers --------------------------------------------- diff --git a/internal/ws/server.go b/internal/ws/server.go index 0aff603..690e626 100644 --- a/internal/ws/server.go +++ b/internal/ws/server.go @@ -23,6 +23,15 @@ const ( pingPayload = "saQut" ) +// Approver decides whether to accept an incoming WebSocket connection. It is +// called before the HTTP upgrade with the pre-assigned client id and a metadata +// map derived from the request (ip, userAgent). Returning false causes a 403 +// Forbidden response; the upgrade never happens. When nil, all connections are +// accepted (the original open-door behaviour). +type Approver interface { + Approve(id string, meta map[string]any) bool +} + // Options tunes the transport for scale. Zero fields fall back to defaults. type Options struct { OutboundBuffer int // per-connection send queue depth @@ -32,6 +41,8 @@ type Options struct { PingInterval time.Duration // heartbeat period PongWait time.Duration // max wait for a pong before dropping WriteWait time.Duration // per-write socket deadline + // Approver is an optional connection gate. Nil accepts all connections. + Approver Approver } // DefaultOptions returns the built-in tuning. @@ -108,19 +119,34 @@ func NewServer(hub *Hub, opts ...Options) *Server { // ServeHTTP implements http.Handler: it upgrades the request and hands the // connection to the lifecycle. It is the WebSocket endpoint. +// +// When an Approver is configured it is consulted before the upgrade so the HTTP +// response can still carry a 403; once the upgrade succeeds the protocol is +// WebSocket and HTTP-level rejection is no longer possible. func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) { + id := newUUID() + if s.opts.Approver != nil { + meta := map[string]any{ + "ip": r.RemoteAddr, + "userAgent": r.UserAgent(), + } + if !s.opts.Approver.Approve(id, meta) { + http.Error(w, "Connection rejected", http.StatusForbidden) + return + } + } conn, err := s.upgrader.Upgrade(w, r, nil) if err != nil { log.Printf("ws: upgrade failed: %v", err) return } - s.handle(conn) + s.handleWithID(conn, id) } -// handle drives one connection from accept to disconnect. It is generic over the -// Conn interface so tests can feed it a scripted in-memory connection. -func (s *Server) handle(conn Conn) { - client := newClient(conn, newUUID(), s.opts.OutboundBuffer, s.opts.WriteWait) +// handleWithID drives one connection from accept to disconnect. It is generic over +// the Conn interface so tests can feed it a scripted in-memory connection. +func (s *Server) handleWithID(conn Conn, id string) { + client := newClient(conn, id, s.opts.OutboundBuffer, s.opts.WriteWait) // Connect: register, start the writer, fire connect listeners (id, private // room, session defaults). Must happen before the read loop so the client can diff --git a/internal/ws/ws_test.go b/internal/ws/ws_test.go index 2fafba3..9d56182 100644 --- a/internal/ws/ws_test.go +++ b/internal/ws/ws_test.go @@ -276,7 +276,7 @@ func TestServerNoReplyOnNilResult(t *testing.T) { srv := NewServer(hub) fc := testutil.NewFakeConn() - go srv.handle(fc) + go srv.handleWithID(fc, newUUID()) fc.Push([]byte(`[{"type":"wom"}, 9, "R"]`)) // Give a wrong reply a chance to appear. @@ -295,7 +295,7 @@ func TestServerHandleRepliesToRequest(t *testing.T) { srv := NewServer(hub) fc := testutil.NewFakeConn() - go srv.handle(fc) + go srv.handleWithID(fc, newUUID()) fc.Push([]byte(`[{"type":"ping"}, 5, "R"]`)) waitFor(t, func() bool { return fc.WriteCount() >= 1 }) diff --git a/main.go b/main.go index 059f923..a999d9a 100644 --- a/main.go +++ b/main.go @@ -14,6 +14,7 @@ import ( "syscall" "time" + "git.saqut.com/saqut/mwse/internal/bridge" "git.saqut.com/saqut/mwse/internal/config" "git.saqut.com/saqut/mwse/internal/httpserver" "git.saqut.com/saqut/mwse/internal/services" @@ -24,7 +25,34 @@ func main() { cfg := config.Load() hub := ws.NewHub() - reg := services.Register(hub) + + // Bridge (3rd-party server integration, #46) — optional; activated by env vars. + // BRIDGE_APPROVE_URL: before each WebSocket upgrade, MWSE POSTs the candidate + // client id + metadata there; a non-200 or {"approve":false} body rejects it. + // BRIDGE_TRIGGER_URL: when a suit notification is replied to, MWSE POSTs the + // reply there so the app is notified immediately instead of polling. + var srvOpts httpserver.ServerOptions + var svcOpts []services.Option + if approveURL := os.Getenv("BRIDGE_APPROVE_URL"); approveURL != "" { + srvOpts.Approver = bridge.NewHTTPApprover(approveURL, 0) + log.Printf("bridge: connection approval delegated to %s", approveURL) + } + if triggerURL := os.Getenv("BRIDGE_TRIGGER_URL"); triggerURL != "" { + svcOpts = append(svcOpts, services.WithNotifyTrigger(bridge.NewHTTPTrigger(triggerURL, 0))) + log.Printf("bridge: suit-reply trigger wired to %s", triggerURL) + } + // The inbox is always created so bridge/send and POST /api/bridge/inbox are + // available once either bridge env var is set, or if the inbox is otherwise + // useful. When neither env var is set, the services option is simply not added + // and the bridge/send handler is not registered. + if srvOpts.Approver != nil || os.Getenv("BRIDGE_INBOX") != "" { + inbox := bridge.NewInbox(0) + svcOpts = append(svcOpts, services.WithBridgeInbox(inbox)) + srvOpts.BridgeInbox = inbox + log.Printf("bridge: inbox enabled (POST /api/bridge/inbox)") + } + + reg := services.Register(hub, svcOpts...) // The notify and datastore stores hold entries with a TTL; their janitors // reclaim expired ones so memory cannot grow without bound. They are stopped @@ -34,7 +62,7 @@ func main() { defer stopNotify() defer stopData() - srv := httpserver.New(hub, cfg) + srv := httpserver.New(hub, cfg, srvOpts) // Run the listener in the background so main can wait for a shutdown signal. serverErr := make(chan error, 1)