#33: EventPool WOM — askıda kalan promise düzeltmesi

Engine: dispatcher handler nil dönerse yanıt göndermez (nil = yanıt yok /
cevap out-of-band gelecek). Bu, request/to'nun erken [null,id,'E'] ile
ezilmesini önler; gerçek cevap response/to ile aynı id üzerinden gelir.

SDK: EventPool.only() WOM yolu eklendi (waiter bırakmaz); Peer.send ve
Room.send handshake'siz dalı request() yerine only() kullanır. Public API
değişmedi.

Test: TestServerNoReplyOnNilResult, TestRequestResponseRoundTrip,
TestTunnelDoesNotLeakSourceAddress, TestTunnelLargePayloadIntact. go test -race yeşil.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
abdussamedulutas 2026-06-17 08:11:29 +03:00
parent f079ef5325
commit 91ebbeffb2
8 changed files with 181 additions and 7 deletions

View File

@ -10,11 +10,11 @@
**Karar gereken:** Actor modeli mi istiyorsun, yoksa bu yeterli mi? (Önerim: bu yeterli ve daha sade. Aktör'e geçmek büyük yeniden yazım; bu tasarım 1.0.0 yük profillerinde de ölçeklenir.) **Karar gereken:** Actor modeli mi istiyorsun, yoksa bu yeterli mi? (Önerim: bu yeterli ve daha sade. Aktör'e geçmek büyük yeniden yazım; bu tasarım 1.0.0 yük profillerinde de ölçeklenir.)
## 2. P2P request/response zinciri (#23/#24) ## 2. P2P request/response zinciri (#23/#24) — ÇÖZÜLDÜ (#33)
**Durum:** `request/to` + `response/to` Node'a **sadık** portlandı. SDK request'i action 'R' ile gönderdiğinden sunucu anında `[null, id, 'E']` yanıtlıyor; eşin asıl cevabı `response/to` ile sonra geliyor ama o id artık SDK tarafında "tamamlanmış" sayılabilir. **Önceki sorun:** SDK request'i action 'R' ile gönderdiğinden sunucu anında `[null, id, 'E']` yanıtlıyordu; eşin asıl cevabı `response/to` ile sonra geliyordu ama o id SDK tarafında zaten silinmiş oluyordu (cevap kayboluyordu / `mwse.request` patlıyordu).
**İncelenecek:** Bu, Node'da da var olan bir uyumsuzluk. Düzeltmek **ya SDK** (`request/to`'yu fire-and-forget gönder, cevabı ayrı bir id ile beklesin) **ya da** dispatcher'a relay-farkında bir istisna gerektirir — ikisi de "donmuş sözleşme"yi etkiler, o yüzden senin kararın. Feature-parity (1.0.0) işi. **Çözüm (#33):** Dispatcher artık handler `nil` döndürdüğünde **yanıt göndermiyor** (`nil` = "ben yanıt vermeyeceğim / cevap out-of-band gelecek"). `request/to` nil döndürür → erken `[null,id,'E']` yok → asıl cevap `response/to` ile aynı id üzerinden gelir. SDK tarafında WOM yolu `EventPool.only()` ile `request()`'ten ayrıldı. Bu, donmuş sözleşmenin **şekillerini** değiştirmez; yalnızca SDK'nın kullanamadığı sahte bir `null`-`E` frame'ini kaldırır. Regression: `TestServerNoReplyOnNilResult`, `TestRequestResponseRoundTrip`.
## 3. `/api` kontrol düzlemi — ertelenen uçlar ## 3. `/api` kontrol düzlemi — ertelenen uçlar

View File

@ -24,7 +24,7 @@ Kullanıcı talimatı: "tüm fonksiyonların aynı olması önemli değil; amaç
## Bilinçli sadakat (Node davranışı korundu) ## Bilinçli sadakat (Node davranışı korundu)
- **`request/to` → 'E' yanıtı:** SDK request'i action 'R' ile gönderdiğinden generic dispatcher hemen `[null, id, 'E']` yanıtlar; eş cevabı `response/to` ile sonra gelir. Bu Node ile bire bir aynı (ve aynı uyumsuzluğu taşır). Dispatcher'ı özel-durumlamak tel sözleşmesini değiştirebileceğinden DOKUNULMADI. → `REVIEW.md`. - ~~**`request/to` → 'E' yanıtı:**~~ **DÜZELTİLDİ (#33)** — aşağıdaki §"#33" bölümüne bakın. Eskiden generic dispatcher action 'R' için hemen `[null,id,'E']` yanıtlıyor, eşin asıl cevabını (response/to) eziyordu. Artık handler `nil` dönerse dispatcher yanıt göndermez.
- **`auth/info` çift gönderim:** Hem pair hem roompair olan bir peer iki `pair/info` alır (Node ile aynı). - **`auth/info` çift gönderim:** Hem pair hem roompair olan bir peer iki `pair/info` alır (Node ile aynı).
- **Heartbeat:** 10sn `saQut` ping; pong "saQut" değilse bağlantı kapanır (Node ile aynı). - **Heartbeat:** 10sn `saQut` ping; pong "saQut" değilse bağlantı kapanır (Node ile aynı).
@ -48,6 +48,16 @@ Hedef: çok yüksek bağlantı sayısı + sürekli mesaj trafiği; limitleri/poo
### #27 paritesi — eklenen ### #27 paritesi — eklenen
13. **`ifexistsJoin`:** create-room'da isim çakışırsa hata yerine mevcut odaya katıl (tek tur "create or join"). 13. **`ifexistsJoin`:** create-room'da isim çakışırsa hata yerine mevcut odaya katıl (tek tur "create or join").
## #33 — EventPool WOM (askıda kalan promise) düzeltmesi
**Kök neden:** İki yönlü hata. (a) Engine dispatcher, numeric-id + action 'R' olan HER frame'e anında `[result, id, 'E']` yanıtlıyordu — handler `nil` dönse bile `[null,id,'E']`. (b) SDK `EventPool.request()` WOM (fire-and-forget) paketler için bile bir waiter (promise) kaydediyordu. Sonuç: `request/to` gibi cevabı **out-of-band** (`response/to`) gelen paketlerde, sahte `[null,id,'E']` waiter'ı erken çözüp **siliyor**, gerçek cevap kayboluyordu; saf WOM paketlerde ise (ör. `pack/to`) gereksiz waiter kalıyordu.
**Çözüm:**
14. **Engine:** dispatcher artık handler `nil` döndürürse yanıt göndermez (`server.go`). `nil` = "yanıtım yok / cevap out-of-band gelecek". Tüm gerçek request handler'ları non-nil döner (reply alır); yalnızca relay/WOM handler'ları (`pack/to` handshake'siz, `request/to`, `response/to`, `pack/room` handshake'siz) nil döner ve sessiz kalır. Bu, tel **şekillerini** değiştirmez; sadece SDK'nın kullanamadığı sahte frame'i kaldırır.
15. **SDK:** `EventPool.only(msg)` eklendi — WOM yolu, waiter bırakmaz. `Peer.send` (pack/to) ve `Room.send` handshake'siz dalı artık `only()` kullanır. `request()` yalnızca cevap bekleyen paketlere ayrıldı. Public API (peer.send/room.send imzaları) değişmedi.
**Regression:** `internal/ws` `TestServerNoReplyOnNilResult`; `internal/services` `TestRequestResponseRoundTrip` (out-of-band cevap aynı id ile geri geliyor, erken yanıt yok).
## Session varsayılanları ## Session varsayılanları
`packrecaive`/`packsending`/`notifyPairInfo`/`notifyRoomInfo` varsayılanları **`NewClient` constructor'ında** set ediliyor (listener sırasından bağımsız her zaman mevcut). Session servisinin connect hook'u parite için yine de bunları yeniden uyguluyor. `packrecaive`/`packsending`/`notifyPairInfo`/`notifyRoomInfo` varsayılanları **`NewClient` constructor'ında** set ediliyor (listener sırasından bağımsız her zaman mevcut). Session servisinin connect hook'u parite için yine de bunları yeniden uyguluyor.

View File

@ -13,6 +13,12 @@ export default class EventPool
constructor(wsts:MWSE){ constructor(wsts:MWSE){
this.wsts = wsts; this.wsts = wsts;
} }
/**
* request sends a packet that expects a correlated reply and resolves with it.
* Use it ONLY for response-bearing packets. For fire-and-forget (WOM) packets
* use only(): registering a waiter for a packet the server never answers leaves
* a promise pending forever (issue #33).
*/
public request(msg: Message) : Promise<any> public request(msg: Message) : Promise<any>
{ {
return new Promise((ok,rej) => { return new Promise((ok,rej) => {
@ -28,6 +34,16 @@ export default class EventPool
this.wsts.WSTSProtocol.SendRequest(msg, id); this.wsts.WSTSProtocol.SendRequest(msg, id);
}) })
} }
/**
* only is the WOM (without-me / fire-and-forget) path: it sends the packet and
* leaves NO pending waiter. The engine deliberately does not reply to these
* relays (it returns nil), so there is nothing to await. This is the separation
* issue #33 requires: request() = response-bearing, only() = WOM.
*/
public only(msg: Message)
{
this.wsts.WSTSProtocol.SendOnly(msg);
}
public stream(msg: Message, callback: Function) public stream(msg: Message, callback: Function)
{ {
let id = ++this.count; let id = ++this.count;

View File

@ -213,7 +213,10 @@ export default class Peer extends EventTarget
if(!this.mwse.writable){ if(!this.mwse.writable){
return console.warn("Socket is not writable"); return console.warn("Socket is not writable");
} }
await this.mwse.EventPooling.request({ // WOM (fire-and-forget): a plain peer.send expects no reply, so use the
// only() path and leave no pending waiter. Using request() here would
// register a promise the engine never resolves (issue #33).
this.mwse.EventPooling.only({
type:'pack/to', type:'pack/to',
pack, pack,
to: this.socketId to: this.socketId

View File

@ -135,7 +135,10 @@ export default class Room extends EventTarget
throw new Error("Cant send message to room") throw new Error("Cant send message to room")
} }
}else{ }else{
await this.mwse.EventPooling.request({ // WOM broadcast: fire-and-forget, no waiter (issue #33). When the caller
// wants delivery confirmation it passes handshake=true (branch above),
// which the engine answers with a {type} ack.
this.mwse.EventPooling.only({
type:'pack/room', type:'pack/room',
pack, pack,
to: this.roomId, to: this.roomId,

View File

@ -0,0 +1,112 @@
package services
import (
"encoding/json"
"strings"
"testing"
"git.saqut.com/saqut/mwse/internal/testutil"
)
// findReply scans the captured frames for a [payload, id] frame whose id slot is
// the given number. This is how a peer's response/to answer reaches the original
// requester: the numeric request id sits in the signal slot so the SDK's event
// pool resolves the matching pending promise.
func findReply(fc *testutil.FakeConn, id float64) (map[string]any, bool) {
for _, raw := range fc.Writes() {
var arr []any
if json.Unmarshal(raw, &arr) != nil || len(arr) < 2 {
continue
}
if n, ok := arr[1].(float64); ok && n == id {
payload, _ := arr[0].(map[string]any)
return payload, true
}
}
return nil, false
}
// TestRequestResponseRoundTrip is the #30 (data tunneling) + #33 (WOM) core: a
// request/to is answered out-of-band by the peer's response/to carrying the same
// id. The request/to handler itself must return nil so the engine sends no
// premature reply that would clobber the pending request.
func TestRequestResponseRoundTrip(t *testing.T) {
hub := newHub()
a, fa := connect(hub, "a")
b, fb := connect(hub, "b")
// a sends a request to b with request id 7.
if r := hub.Handle(a, msg("request/to", "to", "b", "pack", map[string]any{"q": "ping"})); r != nil {
t.Fatalf("request/to must return nil (answered out-of-band), got %v", r)
}
// b receives the request signal, with a's id and the payload, but NO source IP.
req := waitSignal(t, fb, "request")
if req["from"] != "a" {
t.Fatalf("request from = %v, want a", req["from"])
}
assertNoAddressLeak(t, req)
// b answers with response/to using the original request id.
if r := hub.Handle(b, msg("response/to", "to", "a", "id", float64(7), "pack", map[string]any{"a": "pong"})); r != nil {
t.Fatalf("response/to must return nil, got %v", r)
}
// a receives [ {from:"b", pack:{a:"pong"}}, 7 ] — resolving request id 7.
waitFor(t, func() bool { _, ok := findReply(fa, 7); return ok })
ans, _ := findReply(fa, 7)
if ans["from"] != "b" {
t.Fatalf("answer from = %v, want b", ans["from"])
}
pack := asMap(t, ans["pack"])
if pack["a"] != "pong" {
t.Fatalf("answer pack = %v, want {a:pong}", pack)
}
_ = b
}
// TestTunnelDoesNotLeakSourceAddress verifies the virtualization requirement of
// #30: a relayed pack carries only the logical sender id and the payload, never
// the sender's real IP or device type.
func TestTunnelDoesNotLeakSourceAddress(t *testing.T) {
hub := newHub()
a, _ := connect(hub, "a")
_, fb := connect(hub, "b")
hub.Handle(a, msg("pack/to", "to", "b", "pack", map[string]any{"hi": true}))
got := waitSignal(t, fb, "pack")
assertNoAddressLeak(t, got)
if got["from"] != "a" {
t.Fatalf("pack from = %v, want a", got["from"])
}
}
// TestTunnelLargePayloadIntact verifies #30's large-payload requirement: a big
// pack is relayed byte-for-byte (the engine never truncates or mangles it). The
// transport's MaxMessageSize default (16 MiB) comfortably covers chunked file
// transfer frames.
func TestTunnelLargePayloadIntact(t *testing.T) {
hub := newHub()
a, _ := connect(hub, "a")
_, fb := connect(hub, "b")
big := strings.Repeat("x", 1<<20) // 1 MiB chunk
hub.Handle(a, msg("pack/to", "to", "b", "pack", map[string]any{"chunk": big}))
got := waitSignal(t, fb, "pack")
pack := asMap(t, got["pack"])
if s, _ := pack["chunk"].(string); len(s) != len(big) {
t.Fatalf("relayed chunk length = %d, want %d", len(s), len(big))
}
}
// assertNoAddressLeak fails if a relayed payload exposes anything resembling the
// sender's real network identity.
func assertNoAddressLeak(t *testing.T, payload map[string]any) {
t.Helper()
for _, k := range []string{"ip", "address", "remoteAddr", "host", "device", "deviceType"} {
if _, ok := payload[k]; ok {
t.Fatalf("relayed payload leaked %q: %v", k, payload)
}
}
}

View File

@ -180,7 +180,16 @@ func (s *Server) dispatch(c *Client, data []byte) {
result := s.hub.Handle(c, env.Message) result := s.hub.Handle(c, env.Message)
if flag, ok := env.WantsReply(); ok { // Reply only when the handler actually produced a result. A nil result marks a
// handler that is either fire-and-forget (a WOM relay such as pack/to without a
// handshake) or one that will be answered out-of-band later: request/to is
// answered by the peer's response/to frame carrying the *same* request id.
//
// Sending a premature [null, id, "E"] in those cases would resolve and then
// delete the SDK's pending request before the real answer arrived — exactly the
// #33 "promise hangs / gets clobbered" bug. Returning nil from a handler is the
// engine's explicit "no reply from me" signal. See decisions.md.
if flag, ok := env.WantsReply(); ok && result != nil {
c.Send(protocol.Reply(result, env.ID, flag)) c.Send(protocol.Reply(result, env.ID, flag))
return return
} }

View File

@ -266,6 +266,27 @@ func TestPairReverseIndex(t *testing.T) {
} }
} }
// TestServerNoReplyOnNilResult is the #33 regression at the engine level: a
// handler that returns nil (a fire-and-forget / WOM relay, or one that answers
// out-of-band) must NOT cause a premature [null, id, "E"] reply. Such a reply
// would resolve and discard the SDK's pending request before the real answer.
func TestServerNoReplyOnNilResult(t *testing.T) {
hub := NewHub()
hub.Register("wom", func(c *Client, m protocol.Message) any { return nil })
srv := NewServer(hub)
fc := testutil.NewFakeConn()
go srv.handle(fc)
fc.Push([]byte(`[{"type":"wom"}, 9, "R"]`))
// Give a wrong reply a chance to appear.
time.Sleep(30 * time.Millisecond)
if fc.WriteCount() != 0 {
t.Fatalf("nil-returning handler produced %d reply frames, want 0", fc.WriteCount())
}
fc.Close()
}
func TestServerHandleRepliesToRequest(t *testing.T) { func TestServerHandleRepliesToRequest(t *testing.T) {
hub := NewHub() hub := NewHub()
hub.Register("ping", func(c *Client, m protocol.Message) any { hub.Register("ping", func(c *Client, m protocol.Message) any {