#33: EventPool WOM — askıda kalan promise düzeltmesi
Engine: dispatcher handler nil dönerse yanıt göndermez (nil = yanıt yok / cevap out-of-band gelecek). Bu, request/to'nun erken [null,id,'E'] ile ezilmesini önler; gerçek cevap response/to ile aynı id üzerinden gelir. SDK: EventPool.only() WOM yolu eklendi (waiter bırakmaz); Peer.send ve Room.send handshake'siz dalı request() yerine only() kullanır. Public API değişmedi. Test: TestServerNoReplyOnNilResult, TestRequestResponseRoundTrip, TestTunnelDoesNotLeakSourceAddress, TestTunnelLargePayloadIntact. go test -race yeşil. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
parent
f079ef5325
commit
91ebbeffb2
|
|
@ -10,11 +10,11 @@
|
||||||
|
|
||||||
**Karar gereken:** Actor modeli mi istiyorsun, yoksa bu yeterli mi? (Önerim: bu yeterli ve daha sade. Aktör'e geçmek büyük yeniden yazım; bu tasarım 1.0.0 yük profillerinde de ölçeklenir.)
|
**Karar gereken:** Actor modeli mi istiyorsun, yoksa bu yeterli mi? (Önerim: bu yeterli ve daha sade. Aktör'e geçmek büyük yeniden yazım; bu tasarım 1.0.0 yük profillerinde de ölçeklenir.)
|
||||||
|
|
||||||
## 2. P2P request/response zinciri (#23/#24)
|
## 2. P2P request/response zinciri (#23/#24) — ÇÖZÜLDÜ (#33)
|
||||||
|
|
||||||
**Durum:** `request/to` + `response/to` Node'a **sadık** portlandı. SDK request'i action 'R' ile gönderdiğinden sunucu anında `[null, id, 'E']` yanıtlıyor; eşin asıl cevabı `response/to` ile sonra geliyor ama o id artık SDK tarafında "tamamlanmış" sayılabilir.
|
**Önceki sorun:** SDK request'i action 'R' ile gönderdiğinden sunucu anında `[null, id, 'E']` yanıtlıyordu; eşin asıl cevabı `response/to` ile sonra geliyordu ama o id SDK tarafında zaten silinmiş oluyordu (cevap kayboluyordu / `mwse.request` patlıyordu).
|
||||||
|
|
||||||
**İncelenecek:** Bu, Node'da da var olan bir uyumsuzluk. Düzeltmek **ya SDK** (`request/to`'yu fire-and-forget gönder, cevabı ayrı bir id ile beklesin) **ya da** dispatcher'a relay-farkında bir istisna gerektirir — ikisi de "donmuş sözleşme"yi etkiler, o yüzden senin kararın. Feature-parity (1.0.0) işi.
|
**Çözüm (#33):** Dispatcher artık handler `nil` döndürdüğünde **yanıt göndermiyor** (`nil` = "ben yanıt vermeyeceğim / cevap out-of-band gelecek"). `request/to` nil döndürür → erken `[null,id,'E']` yok → asıl cevap `response/to` ile aynı id üzerinden gelir. SDK tarafında WOM yolu `EventPool.only()` ile `request()`'ten ayrıldı. Bu, donmuş sözleşmenin **şekillerini** değiştirmez; yalnızca SDK'nın kullanamadığı sahte bir `null`-`E` frame'ini kaldırır. Regression: `TestServerNoReplyOnNilResult`, `TestRequestResponseRoundTrip`.
|
||||||
|
|
||||||
## 3. `/api` kontrol düzlemi — ertelenen uçlar
|
## 3. `/api` kontrol düzlemi — ertelenen uçlar
|
||||||
|
|
||||||
|
|
|
||||||
12
decisions.md
12
decisions.md
|
|
@ -24,7 +24,7 @@ Kullanıcı talimatı: "tüm fonksiyonların aynı olması önemli değil; amaç
|
||||||
|
|
||||||
## Bilinçli sadakat (Node davranışı korundu)
|
## Bilinçli sadakat (Node davranışı korundu)
|
||||||
|
|
||||||
- **`request/to` → 'E' yanıtı:** SDK request'i action 'R' ile gönderdiğinden generic dispatcher hemen `[null, id, 'E']` yanıtlar; eş cevabı `response/to` ile sonra gelir. Bu Node ile bire bir aynı (ve aynı uyumsuzluğu taşır). Dispatcher'ı özel-durumlamak tel sözleşmesini değiştirebileceğinden DOKUNULMADI. → `REVIEW.md`.
|
- ~~**`request/to` → 'E' yanıtı:**~~ **DÜZELTİLDİ (#33)** — aşağıdaki §"#33" bölümüne bakın. Eskiden generic dispatcher action 'R' için hemen `[null,id,'E']` yanıtlıyor, eşin asıl cevabını (response/to) eziyordu. Artık handler `nil` dönerse dispatcher yanıt göndermez.
|
||||||
- **`auth/info` çift gönderim:** Hem pair hem roompair olan bir peer iki `pair/info` alır (Node ile aynı).
|
- **`auth/info` çift gönderim:** Hem pair hem roompair olan bir peer iki `pair/info` alır (Node ile aynı).
|
||||||
- **Heartbeat:** 10sn `saQut` ping; pong "saQut" değilse bağlantı kapanır (Node ile aynı).
|
- **Heartbeat:** 10sn `saQut` ping; pong "saQut" değilse bağlantı kapanır (Node ile aynı).
|
||||||
|
|
||||||
|
|
@ -48,6 +48,16 @@ Hedef: çok yüksek bağlantı sayısı + sürekli mesaj trafiği; limitleri/poo
|
||||||
### #27 paritesi — eklenen
|
### #27 paritesi — eklenen
|
||||||
13. **`ifexistsJoin`:** create-room'da isim çakışırsa hata yerine mevcut odaya katıl (tek tur "create or join").
|
13. **`ifexistsJoin`:** create-room'da isim çakışırsa hata yerine mevcut odaya katıl (tek tur "create or join").
|
||||||
|
|
||||||
|
## #33 — EventPool WOM (askıda kalan promise) düzeltmesi
|
||||||
|
|
||||||
|
**Kök neden:** İki yönlü hata. (a) Engine dispatcher, numeric-id + action 'R' olan HER frame'e anında `[result, id, 'E']` yanıtlıyordu — handler `nil` dönse bile `[null,id,'E']`. (b) SDK `EventPool.request()` WOM (fire-and-forget) paketler için bile bir waiter (promise) kaydediyordu. Sonuç: `request/to` gibi cevabı **out-of-band** (`response/to`) gelen paketlerde, sahte `[null,id,'E']` waiter'ı erken çözüp **siliyor**, gerçek cevap kayboluyordu; saf WOM paketlerde ise (ör. `pack/to`) gereksiz waiter kalıyordu.
|
||||||
|
|
||||||
|
**Çözüm:**
|
||||||
|
14. **Engine:** dispatcher artık handler `nil` döndürürse yanıt göndermez (`server.go`). `nil` = "yanıtım yok / cevap out-of-band gelecek". Tüm gerçek request handler'ları non-nil döner (reply alır); yalnızca relay/WOM handler'ları (`pack/to` handshake'siz, `request/to`, `response/to`, `pack/room` handshake'siz) nil döner ve sessiz kalır. Bu, tel **şekillerini** değiştirmez; sadece SDK'nın kullanamadığı sahte frame'i kaldırır.
|
||||||
|
15. **SDK:** `EventPool.only(msg)` eklendi — WOM yolu, waiter bırakmaz. `Peer.send` (pack/to) ve `Room.send` handshake'siz dalı artık `only()` kullanır. `request()` yalnızca cevap bekleyen paketlere ayrıldı. Public API (peer.send/room.send imzaları) değişmedi.
|
||||||
|
|
||||||
|
**Regression:** `internal/ws` `TestServerNoReplyOnNilResult`; `internal/services` `TestRequestResponseRoundTrip` (out-of-band cevap aynı id ile geri geliyor, erken yanıt yok).
|
||||||
|
|
||||||
## Session varsayılanları
|
## Session varsayılanları
|
||||||
|
|
||||||
`packrecaive`/`packsending`/`notifyPairInfo`/`notifyRoomInfo` varsayılanları **`NewClient` constructor'ında** set ediliyor (listener sırasından bağımsız her zaman mevcut). Session servisinin connect hook'u parite için yine de bunları yeniden uyguluyor.
|
`packrecaive`/`packsending`/`notifyPairInfo`/`notifyRoomInfo` varsayılanları **`NewClient` constructor'ında** set ediliyor (listener sırasından bağımsız her zaman mevcut). Session servisinin connect hook'u parite için yine de bunları yeniden uyguluyor.
|
||||||
|
|
|
||||||
|
|
@ -13,6 +13,12 @@ export default class EventPool
|
||||||
constructor(wsts:MWSE){
|
constructor(wsts:MWSE){
|
||||||
this.wsts = wsts;
|
this.wsts = wsts;
|
||||||
}
|
}
|
||||||
|
/**
|
||||||
|
* request sends a packet that expects a correlated reply and resolves with it.
|
||||||
|
* Use it ONLY for response-bearing packets. For fire-and-forget (WOM) packets
|
||||||
|
* use only(): registering a waiter for a packet the server never answers leaves
|
||||||
|
* a promise pending forever (issue #33).
|
||||||
|
*/
|
||||||
public request(msg: Message) : Promise<any>
|
public request(msg: Message) : Promise<any>
|
||||||
{
|
{
|
||||||
return new Promise((ok,rej) => {
|
return new Promise((ok,rej) => {
|
||||||
|
|
@ -28,6 +34,16 @@ export default class EventPool
|
||||||
this.wsts.WSTSProtocol.SendRequest(msg, id);
|
this.wsts.WSTSProtocol.SendRequest(msg, id);
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
/**
|
||||||
|
* only is the WOM (without-me / fire-and-forget) path: it sends the packet and
|
||||||
|
* leaves NO pending waiter. The engine deliberately does not reply to these
|
||||||
|
* relays (it returns nil), so there is nothing to await. This is the separation
|
||||||
|
* issue #33 requires: request() = response-bearing, only() = WOM.
|
||||||
|
*/
|
||||||
|
public only(msg: Message)
|
||||||
|
{
|
||||||
|
this.wsts.WSTSProtocol.SendOnly(msg);
|
||||||
|
}
|
||||||
public stream(msg: Message, callback: Function)
|
public stream(msg: Message, callback: Function)
|
||||||
{
|
{
|
||||||
let id = ++this.count;
|
let id = ++this.count;
|
||||||
|
|
|
||||||
|
|
@ -213,7 +213,10 @@ export default class Peer extends EventTarget
|
||||||
if(!this.mwse.writable){
|
if(!this.mwse.writable){
|
||||||
return console.warn("Socket is not writable");
|
return console.warn("Socket is not writable");
|
||||||
}
|
}
|
||||||
await this.mwse.EventPooling.request({
|
// WOM (fire-and-forget): a plain peer.send expects no reply, so use the
|
||||||
|
// only() path and leave no pending waiter. Using request() here would
|
||||||
|
// register a promise the engine never resolves (issue #33).
|
||||||
|
this.mwse.EventPooling.only({
|
||||||
type:'pack/to',
|
type:'pack/to',
|
||||||
pack,
|
pack,
|
||||||
to: this.socketId
|
to: this.socketId
|
||||||
|
|
|
||||||
|
|
@ -135,7 +135,10 @@ export default class Room extends EventTarget
|
||||||
throw new Error("Cant send message to room")
|
throw new Error("Cant send message to room")
|
||||||
}
|
}
|
||||||
}else{
|
}else{
|
||||||
await this.mwse.EventPooling.request({
|
// WOM broadcast: fire-and-forget, no waiter (issue #33). When the caller
|
||||||
|
// wants delivery confirmation it passes handshake=true (branch above),
|
||||||
|
// which the engine answers with a {type} ack.
|
||||||
|
this.mwse.EventPooling.only({
|
||||||
type:'pack/room',
|
type:'pack/room',
|
||||||
pack,
|
pack,
|
||||||
to: this.roomId,
|
to: this.roomId,
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,112 @@
|
||||||
|
package services
|
||||||
|
|
||||||
|
import (
|
||||||
|
"encoding/json"
|
||||||
|
"strings"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"git.saqut.com/saqut/mwse/internal/testutil"
|
||||||
|
)
|
||||||
|
|
||||||
|
// findReply scans the captured frames for a [payload, id] frame whose id slot is
|
||||||
|
// the given number. This is how a peer's response/to answer reaches the original
|
||||||
|
// requester: the numeric request id sits in the signal slot so the SDK's event
|
||||||
|
// pool resolves the matching pending promise.
|
||||||
|
func findReply(fc *testutil.FakeConn, id float64) (map[string]any, bool) {
|
||||||
|
for _, raw := range fc.Writes() {
|
||||||
|
var arr []any
|
||||||
|
if json.Unmarshal(raw, &arr) != nil || len(arr) < 2 {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if n, ok := arr[1].(float64); ok && n == id {
|
||||||
|
payload, _ := arr[0].(map[string]any)
|
||||||
|
return payload, true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil, false
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestRequestResponseRoundTrip is the #30 (data tunneling) + #33 (WOM) core: a
|
||||||
|
// request/to is answered out-of-band by the peer's response/to carrying the same
|
||||||
|
// id. The request/to handler itself must return nil so the engine sends no
|
||||||
|
// premature reply that would clobber the pending request.
|
||||||
|
func TestRequestResponseRoundTrip(t *testing.T) {
|
||||||
|
hub := newHub()
|
||||||
|
a, fa := connect(hub, "a")
|
||||||
|
b, fb := connect(hub, "b")
|
||||||
|
|
||||||
|
// a sends a request to b with request id 7.
|
||||||
|
if r := hub.Handle(a, msg("request/to", "to", "b", "pack", map[string]any{"q": "ping"})); r != nil {
|
||||||
|
t.Fatalf("request/to must return nil (answered out-of-band), got %v", r)
|
||||||
|
}
|
||||||
|
|
||||||
|
// b receives the request signal, with a's id and the payload, but NO source IP.
|
||||||
|
req := waitSignal(t, fb, "request")
|
||||||
|
if req["from"] != "a" {
|
||||||
|
t.Fatalf("request from = %v, want a", req["from"])
|
||||||
|
}
|
||||||
|
assertNoAddressLeak(t, req)
|
||||||
|
|
||||||
|
// b answers with response/to using the original request id.
|
||||||
|
if r := hub.Handle(b, msg("response/to", "to", "a", "id", float64(7), "pack", map[string]any{"a": "pong"})); r != nil {
|
||||||
|
t.Fatalf("response/to must return nil, got %v", r)
|
||||||
|
}
|
||||||
|
|
||||||
|
// a receives [ {from:"b", pack:{a:"pong"}}, 7 ] — resolving request id 7.
|
||||||
|
waitFor(t, func() bool { _, ok := findReply(fa, 7); return ok })
|
||||||
|
ans, _ := findReply(fa, 7)
|
||||||
|
if ans["from"] != "b" {
|
||||||
|
t.Fatalf("answer from = %v, want b", ans["from"])
|
||||||
|
}
|
||||||
|
pack := asMap(t, ans["pack"])
|
||||||
|
if pack["a"] != "pong" {
|
||||||
|
t.Fatalf("answer pack = %v, want {a:pong}", pack)
|
||||||
|
}
|
||||||
|
_ = b
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestTunnelDoesNotLeakSourceAddress verifies the virtualization requirement of
|
||||||
|
// #30: a relayed pack carries only the logical sender id and the payload, never
|
||||||
|
// the sender's real IP or device type.
|
||||||
|
func TestTunnelDoesNotLeakSourceAddress(t *testing.T) {
|
||||||
|
hub := newHub()
|
||||||
|
a, _ := connect(hub, "a")
|
||||||
|
_, fb := connect(hub, "b")
|
||||||
|
|
||||||
|
hub.Handle(a, msg("pack/to", "to", "b", "pack", map[string]any{"hi": true}))
|
||||||
|
got := waitSignal(t, fb, "pack")
|
||||||
|
assertNoAddressLeak(t, got)
|
||||||
|
if got["from"] != "a" {
|
||||||
|
t.Fatalf("pack from = %v, want a", got["from"])
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestTunnelLargePayloadIntact verifies #30's large-payload requirement: a big
|
||||||
|
// pack is relayed byte-for-byte (the engine never truncates or mangles it). The
|
||||||
|
// transport's MaxMessageSize default (16 MiB) comfortably covers chunked file
|
||||||
|
// transfer frames.
|
||||||
|
func TestTunnelLargePayloadIntact(t *testing.T) {
|
||||||
|
hub := newHub()
|
||||||
|
a, _ := connect(hub, "a")
|
||||||
|
_, fb := connect(hub, "b")
|
||||||
|
|
||||||
|
big := strings.Repeat("x", 1<<20) // 1 MiB chunk
|
||||||
|
hub.Handle(a, msg("pack/to", "to", "b", "pack", map[string]any{"chunk": big}))
|
||||||
|
|
||||||
|
got := waitSignal(t, fb, "pack")
|
||||||
|
pack := asMap(t, got["pack"])
|
||||||
|
if s, _ := pack["chunk"].(string); len(s) != len(big) {
|
||||||
|
t.Fatalf("relayed chunk length = %d, want %d", len(s), len(big))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// assertNoAddressLeak fails if a relayed payload exposes anything resembling the
|
||||||
|
// sender's real network identity.
|
||||||
|
func assertNoAddressLeak(t *testing.T, payload map[string]any) {
|
||||||
|
t.Helper()
|
||||||
|
for _, k := range []string{"ip", "address", "remoteAddr", "host", "device", "deviceType"} {
|
||||||
|
if _, ok := payload[k]; ok {
|
||||||
|
t.Fatalf("relayed payload leaked %q: %v", k, payload)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -180,7 +180,16 @@ func (s *Server) dispatch(c *Client, data []byte) {
|
||||||
|
|
||||||
result := s.hub.Handle(c, env.Message)
|
result := s.hub.Handle(c, env.Message)
|
||||||
|
|
||||||
if flag, ok := env.WantsReply(); ok {
|
// Reply only when the handler actually produced a result. A nil result marks a
|
||||||
|
// handler that is either fire-and-forget (a WOM relay such as pack/to without a
|
||||||
|
// handshake) or one that will be answered out-of-band later: request/to is
|
||||||
|
// answered by the peer's response/to frame carrying the *same* request id.
|
||||||
|
//
|
||||||
|
// Sending a premature [null, id, "E"] in those cases would resolve and then
|
||||||
|
// delete the SDK's pending request before the real answer arrived — exactly the
|
||||||
|
// #33 "promise hangs / gets clobbered" bug. Returning nil from a handler is the
|
||||||
|
// engine's explicit "no reply from me" signal. See decisions.md.
|
||||||
|
if flag, ok := env.WantsReply(); ok && result != nil {
|
||||||
c.Send(protocol.Reply(result, env.ID, flag))
|
c.Send(protocol.Reply(result, env.ID, flag))
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -266,6 +266,27 @@ func TestPairReverseIndex(t *testing.T) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TestServerNoReplyOnNilResult is the #33 regression at the engine level: a
|
||||||
|
// handler that returns nil (a fire-and-forget / WOM relay, or one that answers
|
||||||
|
// out-of-band) must NOT cause a premature [null, id, "E"] reply. Such a reply
|
||||||
|
// would resolve and discard the SDK's pending request before the real answer.
|
||||||
|
func TestServerNoReplyOnNilResult(t *testing.T) {
|
||||||
|
hub := NewHub()
|
||||||
|
hub.Register("wom", func(c *Client, m protocol.Message) any { return nil })
|
||||||
|
srv := NewServer(hub)
|
||||||
|
|
||||||
|
fc := testutil.NewFakeConn()
|
||||||
|
go srv.handle(fc)
|
||||||
|
|
||||||
|
fc.Push([]byte(`[{"type":"wom"}, 9, "R"]`))
|
||||||
|
// Give a wrong reply a chance to appear.
|
||||||
|
time.Sleep(30 * time.Millisecond)
|
||||||
|
if fc.WriteCount() != 0 {
|
||||||
|
t.Fatalf("nil-returning handler produced %d reply frames, want 0", fc.WriteCount())
|
||||||
|
}
|
||||||
|
fc.Close()
|
||||||
|
}
|
||||||
|
|
||||||
func TestServerHandleRepliesToRequest(t *testing.T) {
|
func TestServerHandleRepliesToRequest(t *testing.T) {
|
||||||
hub := NewHub()
|
hub := NewHub()
|
||||||
hub.Register("ping", func(c *Client, m protocol.Message) any {
|
hub.Register("ping", func(c *Client, m protocol.Message) any {
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue