diff --git a/REVIEW.md b/REVIEW.md index 1d244a7..4663022 100644 --- a/REVIEW.md +++ b/REVIEW.md @@ -10,11 +10,11 @@ **Karar gereken:** Actor modeli mi istiyorsun, yoksa bu yeterli mi? (Önerim: bu yeterli ve daha sade. Aktör'e geçmek büyük yeniden yazım; bu tasarım 1.0.0 yük profillerinde de ölçeklenir.) -## 2. P2P request/response zinciri (#23/#24) +## 2. P2P request/response zinciri (#23/#24) — ÇÖZÜLDÜ (#33) -**Durum:** `request/to` + `response/to` Node'a **sadık** portlandı. SDK request'i action 'R' ile gönderdiğinden sunucu anında `[null, id, 'E']` yanıtlıyor; eşin asıl cevabı `response/to` ile sonra geliyor ama o id artık SDK tarafında "tamamlanmış" sayılabilir. +**Önceki sorun:** SDK request'i action 'R' ile gönderdiğinden sunucu anında `[null, id, 'E']` yanıtlıyordu; eşin asıl cevabı `response/to` ile sonra geliyordu ama o id SDK tarafında zaten silinmiş oluyordu (cevap kayboluyordu / `mwse.request` patlıyordu). -**İncelenecek:** Bu, Node'da da var olan bir uyumsuzluk. Düzeltmek **ya SDK** (`request/to`'yu fire-and-forget gönder, cevabı ayrı bir id ile beklesin) **ya da** dispatcher'a relay-farkında bir istisna gerektirir — ikisi de "donmuş sözleşme"yi etkiler, o yüzden senin kararın. Feature-parity (1.0.0) işi. +**Çözüm (#33):** Dispatcher artık handler `nil` döndürdüğünde **yanıt göndermiyor** (`nil` = "ben yanıt vermeyeceğim / cevap out-of-band gelecek"). `request/to` nil döndürür → erken `[null,id,'E']` yok → asıl cevap `response/to` ile aynı id üzerinden gelir. SDK tarafında WOM yolu `EventPool.only()` ile `request()`'ten ayrıldı. Bu, donmuş sözleşmenin **şekillerini** değiştirmez; yalnızca SDK'nın kullanamadığı sahte bir `null`-`E` frame'ini kaldırır. Regression: `TestServerNoReplyOnNilResult`, `TestRequestResponseRoundTrip`. ## 3. `/api` kontrol düzlemi — ertelenen uçlar diff --git a/decisions.md b/decisions.md index eb219ca..b187d70 100644 --- a/decisions.md +++ b/decisions.md @@ -24,7 +24,7 @@ Kullanıcı talimatı: "tüm fonksiyonların aynı olması önemli değil; amaç ## Bilinçli sadakat (Node davranışı korundu) -- **`request/to` → 'E' yanıtı:** SDK request'i action 'R' ile gönderdiğinden generic dispatcher hemen `[null, id, 'E']` yanıtlar; eş cevabı `response/to` ile sonra gelir. Bu Node ile bire bir aynı (ve aynı uyumsuzluğu taşır). Dispatcher'ı özel-durumlamak tel sözleşmesini değiştirebileceğinden DOKUNULMADI. → `REVIEW.md`. +- ~~**`request/to` → 'E' yanıtı:**~~ **DÜZELTİLDİ (#33)** — aşağıdaki §"#33" bölümüne bakın. Eskiden generic dispatcher action 'R' için hemen `[null,id,'E']` yanıtlıyor, eşin asıl cevabını (response/to) eziyordu. Artık handler `nil` dönerse dispatcher yanıt göndermez. - **`auth/info` çift gönderim:** Hem pair hem roompair olan bir peer iki `pair/info` alır (Node ile aynı). - **Heartbeat:** 10sn `saQut` ping; pong "saQut" değilse bağlantı kapanır (Node ile aynı). @@ -48,6 +48,16 @@ Hedef: çok yüksek bağlantı sayısı + sürekli mesaj trafiği; limitleri/poo ### #27 paritesi — eklenen 13. **`ifexistsJoin`:** create-room'da isim çakışırsa hata yerine mevcut odaya katıl (tek tur "create or join"). +## #33 — EventPool WOM (askıda kalan promise) düzeltmesi + +**Kök neden:** İki yönlü hata. (a) Engine dispatcher, numeric-id + action 'R' olan HER frame'e anında `[result, id, 'E']` yanıtlıyordu — handler `nil` dönse bile `[null,id,'E']`. (b) SDK `EventPool.request()` WOM (fire-and-forget) paketler için bile bir waiter (promise) kaydediyordu. Sonuç: `request/to` gibi cevabı **out-of-band** (`response/to`) gelen paketlerde, sahte `[null,id,'E']` waiter'ı erken çözüp **siliyor**, gerçek cevap kayboluyordu; saf WOM paketlerde ise (ör. `pack/to`) gereksiz waiter kalıyordu. + +**Çözüm:** +14. **Engine:** dispatcher artık handler `nil` döndürürse yanıt göndermez (`server.go`). `nil` = "yanıtım yok / cevap out-of-band gelecek". Tüm gerçek request handler'ları non-nil döner (reply alır); yalnızca relay/WOM handler'ları (`pack/to` handshake'siz, `request/to`, `response/to`, `pack/room` handshake'siz) nil döner ve sessiz kalır. Bu, tel **şekillerini** değiştirmez; sadece SDK'nın kullanamadığı sahte frame'i kaldırır. +15. **SDK:** `EventPool.only(msg)` eklendi — WOM yolu, waiter bırakmaz. `Peer.send` (pack/to) ve `Room.send` handshake'siz dalı artık `only()` kullanır. `request()` yalnızca cevap bekleyen paketlere ayrıldı. Public API (peer.send/room.send imzaları) değişmedi. + +**Regression:** `internal/ws` `TestServerNoReplyOnNilResult`; `internal/services` `TestRequestResponseRoundTrip` (out-of-band cevap aynı id ile geri geliyor, erken yanıt yok). + ## Session varsayılanları `packrecaive`/`packsending`/`notifyPairInfo`/`notifyRoomInfo` varsayılanları **`NewClient` constructor'ında** set ediliyor (listener sırasından bağımsız her zaman mevcut). Session servisinin connect hook'u parite için yine de bunları yeniden uyguluyor. diff --git a/frontend/EventPool.ts b/frontend/EventPool.ts index 6c970e7..89799d2 100644 --- a/frontend/EventPool.ts +++ b/frontend/EventPool.ts @@ -13,6 +13,12 @@ export default class EventPool constructor(wsts:MWSE){ this.wsts = wsts; } + /** + * request sends a packet that expects a correlated reply and resolves with it. + * Use it ONLY for response-bearing packets. For fire-and-forget (WOM) packets + * use only(): registering a waiter for a packet the server never answers leaves + * a promise pending forever (issue #33). + */ public request(msg: Message) : Promise { return new Promise((ok,rej) => { @@ -28,6 +34,16 @@ export default class EventPool this.wsts.WSTSProtocol.SendRequest(msg, id); }) } + /** + * only is the WOM (without-me / fire-and-forget) path: it sends the packet and + * leaves NO pending waiter. The engine deliberately does not reply to these + * relays (it returns nil), so there is nothing to await. This is the separation + * issue #33 requires: request() = response-bearing, only() = WOM. + */ + public only(msg: Message) + { + this.wsts.WSTSProtocol.SendOnly(msg); + } public stream(msg: Message, callback: Function) { let id = ++this.count; diff --git a/frontend/Peer.ts b/frontend/Peer.ts index 87367bd..5a19c36 100644 --- a/frontend/Peer.ts +++ b/frontend/Peer.ts @@ -213,7 +213,10 @@ export default class Peer extends EventTarget if(!this.mwse.writable){ return console.warn("Socket is not writable"); } - await this.mwse.EventPooling.request({ + // WOM (fire-and-forget): a plain peer.send expects no reply, so use the + // only() path and leave no pending waiter. Using request() here would + // register a promise the engine never resolves (issue #33). + this.mwse.EventPooling.only({ type:'pack/to', pack, to: this.socketId diff --git a/frontend/Room.ts b/frontend/Room.ts index f0dcf22..20d2f24 100644 --- a/frontend/Room.ts +++ b/frontend/Room.ts @@ -135,7 +135,10 @@ export default class Room extends EventTarget throw new Error("Cant send message to room") } }else{ - await this.mwse.EventPooling.request({ + // WOM broadcast: fire-and-forget, no waiter (issue #33). When the caller + // wants delivery confirmation it passes handshake=true (branch above), + // which the engine answers with a {type} ack. + this.mwse.EventPooling.only({ type:'pack/room', pack, to: this.roomId, diff --git a/internal/services/datatransfer_test.go b/internal/services/datatransfer_test.go new file mode 100644 index 0000000..eacd87b --- /dev/null +++ b/internal/services/datatransfer_test.go @@ -0,0 +1,112 @@ +package services + +import ( + "encoding/json" + "strings" + "testing" + + "git.saqut.com/saqut/mwse/internal/testutil" +) + +// findReply scans the captured frames for a [payload, id] frame whose id slot is +// the given number. This is how a peer's response/to answer reaches the original +// requester: the numeric request id sits in the signal slot so the SDK's event +// pool resolves the matching pending promise. +func findReply(fc *testutil.FakeConn, id float64) (map[string]any, bool) { + for _, raw := range fc.Writes() { + var arr []any + if json.Unmarshal(raw, &arr) != nil || len(arr) < 2 { + continue + } + if n, ok := arr[1].(float64); ok && n == id { + payload, _ := arr[0].(map[string]any) + return payload, true + } + } + return nil, false +} + +// TestRequestResponseRoundTrip is the #30 (data tunneling) + #33 (WOM) core: a +// request/to is answered out-of-band by the peer's response/to carrying the same +// id. The request/to handler itself must return nil so the engine sends no +// premature reply that would clobber the pending request. +func TestRequestResponseRoundTrip(t *testing.T) { + hub := newHub() + a, fa := connect(hub, "a") + b, fb := connect(hub, "b") + + // a sends a request to b with request id 7. + if r := hub.Handle(a, msg("request/to", "to", "b", "pack", map[string]any{"q": "ping"})); r != nil { + t.Fatalf("request/to must return nil (answered out-of-band), got %v", r) + } + + // b receives the request signal, with a's id and the payload, but NO source IP. + req := waitSignal(t, fb, "request") + if req["from"] != "a" { + t.Fatalf("request from = %v, want a", req["from"]) + } + assertNoAddressLeak(t, req) + + // b answers with response/to using the original request id. + if r := hub.Handle(b, msg("response/to", "to", "a", "id", float64(7), "pack", map[string]any{"a": "pong"})); r != nil { + t.Fatalf("response/to must return nil, got %v", r) + } + + // a receives [ {from:"b", pack:{a:"pong"}}, 7 ] — resolving request id 7. + waitFor(t, func() bool { _, ok := findReply(fa, 7); return ok }) + ans, _ := findReply(fa, 7) + if ans["from"] != "b" { + t.Fatalf("answer from = %v, want b", ans["from"]) + } + pack := asMap(t, ans["pack"]) + if pack["a"] != "pong" { + t.Fatalf("answer pack = %v, want {a:pong}", pack) + } + _ = b +} + +// TestTunnelDoesNotLeakSourceAddress verifies the virtualization requirement of +// #30: a relayed pack carries only the logical sender id and the payload, never +// the sender's real IP or device type. +func TestTunnelDoesNotLeakSourceAddress(t *testing.T) { + hub := newHub() + a, _ := connect(hub, "a") + _, fb := connect(hub, "b") + + hub.Handle(a, msg("pack/to", "to", "b", "pack", map[string]any{"hi": true})) + got := waitSignal(t, fb, "pack") + assertNoAddressLeak(t, got) + if got["from"] != "a" { + t.Fatalf("pack from = %v, want a", got["from"]) + } +} + +// TestTunnelLargePayloadIntact verifies #30's large-payload requirement: a big +// pack is relayed byte-for-byte (the engine never truncates or mangles it). The +// transport's MaxMessageSize default (16 MiB) comfortably covers chunked file +// transfer frames. +func TestTunnelLargePayloadIntact(t *testing.T) { + hub := newHub() + a, _ := connect(hub, "a") + _, fb := connect(hub, "b") + + big := strings.Repeat("x", 1<<20) // 1 MiB chunk + hub.Handle(a, msg("pack/to", "to", "b", "pack", map[string]any{"chunk": big})) + + got := waitSignal(t, fb, "pack") + pack := asMap(t, got["pack"]) + if s, _ := pack["chunk"].(string); len(s) != len(big) { + t.Fatalf("relayed chunk length = %d, want %d", len(s), len(big)) + } +} + +// assertNoAddressLeak fails if a relayed payload exposes anything resembling the +// sender's real network identity. +func assertNoAddressLeak(t *testing.T, payload map[string]any) { + t.Helper() + for _, k := range []string{"ip", "address", "remoteAddr", "host", "device", "deviceType"} { + if _, ok := payload[k]; ok { + t.Fatalf("relayed payload leaked %q: %v", k, payload) + } + } +} diff --git a/internal/ws/server.go b/internal/ws/server.go index c66e609..0aff603 100644 --- a/internal/ws/server.go +++ b/internal/ws/server.go @@ -180,7 +180,16 @@ func (s *Server) dispatch(c *Client, data []byte) { result := s.hub.Handle(c, env.Message) - if flag, ok := env.WantsReply(); ok { + // Reply only when the handler actually produced a result. A nil result marks a + // handler that is either fire-and-forget (a WOM relay such as pack/to without a + // handshake) or one that will be answered out-of-band later: request/to is + // answered by the peer's response/to frame carrying the *same* request id. + // + // Sending a premature [null, id, "E"] in those cases would resolve and then + // delete the SDK's pending request before the real answer arrived — exactly the + // #33 "promise hangs / gets clobbered" bug. Returning nil from a handler is the + // engine's explicit "no reply from me" signal. See decisions.md. + if flag, ok := env.WantsReply(); ok && result != nil { c.Send(protocol.Reply(result, env.ID, flag)) return } diff --git a/internal/ws/ws_test.go b/internal/ws/ws_test.go index 53c956f..2fafba3 100644 --- a/internal/ws/ws_test.go +++ b/internal/ws/ws_test.go @@ -266,6 +266,27 @@ func TestPairReverseIndex(t *testing.T) { } } +// TestServerNoReplyOnNilResult is the #33 regression at the engine level: a +// handler that returns nil (a fire-and-forget / WOM relay, or one that answers +// out-of-band) must NOT cause a premature [null, id, "E"] reply. Such a reply +// would resolve and discard the SDK's pending request before the real answer. +func TestServerNoReplyOnNilResult(t *testing.T) { + hub := NewHub() + hub.Register("wom", func(c *Client, m protocol.Message) any { return nil }) + srv := NewServer(hub) + + fc := testutil.NewFakeConn() + go srv.handle(fc) + + fc.Push([]byte(`[{"type":"wom"}, 9, "R"]`)) + // Give a wrong reply a chance to appear. + time.Sleep(30 * time.Millisecond) + if fc.WriteCount() != 0 { + t.Fatalf("nil-returning handler produced %d reply frames, want 0", fc.WriteCount()) + } + fc.Close() +} + func TestServerHandleRepliesToRequest(t *testing.T) { hub := NewHub() hub.Register("ping", func(c *Client, m protocol.Message) any {