From 6a4d71a1227cb3d791a71a5b8f6e69e92776b4e4 Mon Sep 17 00:00:00 2001 From: sawka Date: Sun, 21 Dec 2025 12:13:06 -0800 Subject: [PATCH 1/6] fix multi-level routing arch --- pkg/wshutil/wshrouter.go | 35 +++++++++++++++-------------------- 1 file changed, 15 insertions(+), 20 deletions(-) diff --git a/pkg/wshutil/wshrouter.go b/pkg/wshutil/wshrouter.go index acf5632c3b..ad4d989dae 100644 --- a/pkg/wshutil/wshrouter.go +++ b/pkg/wshutil/wshrouter.go @@ -169,20 +169,15 @@ func (router *WshRouter) getRouteInfo(rpcId string) *routeInfo { } func (router *WshRouter) handleAnnounceMessage(msg RpcMessage, input msgAndRoute) { - // if we have an upstream, send it there - // if we don't (we are the terminal router), then add it to our announced route map + if msg.Source != input.fromRouteId { + router.Lock.Lock() + router.AnnouncedRoutes[msg.Source] = input.fromRouteId + router.Lock.Unlock() + } upstream := router.GetUpstreamClient() if upstream != nil { upstream.SendRpcMessage(input.msgBytes, "announce-upstream") - return - } - if msg.Source == input.fromRouteId { - // not necessary to save the id mapping - return } - router.Lock.Lock() - defer router.Lock.Unlock() - router.AnnouncedRoutes[msg.Source] = input.fromRouteId } func (router *WshRouter) handleUnannounceMessage(msg RpcMessage) { @@ -204,21 +199,21 @@ func (router *WshRouter) sendRoutedMessage(msgBytes []byte, routeId string) bool rpc.SendRpcMessage(msgBytes, "route") return true } + localRouteId := router.getAnnouncedRoute(routeId) + if localRouteId != "" { + rpc := router.GetRpc(localRouteId) + if rpc != nil { + rpc.SendRpcMessage(msgBytes, "route-local") + return true + } + } upstream := router.GetUpstreamClient() if upstream != nil { upstream.SendRpcMessage(msgBytes, "route-upstream") return true - } else { - // we are the upstream, so consult our announced routes map - localRouteId := router.getAnnouncedRoute(routeId) - rpc := router.GetRpc(localRouteId) - if rpc == nil { - log.Printf("[router] no rpc for route id %q\n", routeId) - return false - } - rpc.SendRpcMessage(msgBytes, "route-local") - return true } + log.Printf("[router] no rpc for route id %q\n", routeId) + return false } func (router *WshRouter) runServer() { From c57392bd99fa75d616f2467226446aadc69888e0 Mon Sep 17 00:00:00 2001 From: sawka Date: Mon, 22 Dec 2025 13:00:19 -0800 Subject: [PATCH 2/6] respect timeout for sends --- pkg/wshutil/wshrpc.go | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/pkg/wshutil/wshrpc.go b/pkg/wshutil/wshrpc.go index 8a90a0790e..59ea49fab4 100644 --- a/pkg/wshutil/wshrpc.go +++ b/pkg/wshutil/wshrpc.go @@ -726,8 +726,13 @@ func (w *WshRpc) SendComplexRequest(command string, data any, opts *wshrpc.RpcOp return nil, err } handler.respCh = w.registerRpc(handler, command, opts.Route, handler.reqId) - w.OutputCh <- barr - return handler, nil + select { + case w.OutputCh <- barr: + return handler, nil + case <-handler.ctx.Done(): + handler.finalize() + return nil, fmt.Errorf("timeout sending request") + } } func (w *WshRpc) IsServerDone() bool { From 87084897b4a901fb50425d6d4245e4b1c5785154 Mon Sep 17 00:00:00 2001 From: sawka Date: Mon, 22 Dec 2025 13:00:55 -0800 Subject: [PATCH 3/6] unannounce should also be sent to the upstream --- pkg/wshutil/wshrouter.go | 21 +++++++++++++++++---- 1 file changed, 17 insertions(+), 4 deletions(-) diff --git a/pkg/wshutil/wshrouter.go b/pkg/wshutil/wshrouter.go index ad4d989dae..7fc2e96875 100644 --- a/pkg/wshutil/wshrouter.go +++ b/pkg/wshutil/wshrouter.go @@ -180,10 +180,15 @@ func (router *WshRouter) handleAnnounceMessage(msg RpcMessage, input msgAndRoute } } -func (router *WshRouter) handleUnannounceMessage(msg RpcMessage) { +func (router *WshRouter) handleUnannounceMessage(msg RpcMessage, input msgAndRoute) { router.Lock.Lock() - defer router.Lock.Unlock() delete(router.AnnouncedRoutes, msg.Source) + router.Lock.Unlock() + + upstream := router.GetUpstreamClient() + if upstream != nil { + upstream.SendRpcMessage(input.msgBytes, "unannounce-upstream") + } } func (router *WshRouter) getAnnouncedRoute(routeId string) string { @@ -231,7 +236,7 @@ func (router *WshRouter) runServer() { continue } if msg.Command == wshrpc.Command_RouteUnannounce { - router.handleUnannounceMessage(msg) + router.handleUnannounceMessage(msg, input) continue } if msg.Command != "" { @@ -348,7 +353,6 @@ func (router *WshRouter) RegisterRoute(routeId string, rpc AbstractRpcClient, sh func (router *WshRouter) UnregisterRoute(routeId string) { log.Printf("[router] unregistering wsh route %q\n", routeId) router.Lock.Lock() - defer router.Lock.Unlock() delete(router.RouteMap, routeId) // clear out announced routes for routeId, localRouteId := range router.AnnouncedRoutes { @@ -356,6 +360,15 @@ func (router *WshRouter) UnregisterRoute(routeId string) { delete(router.AnnouncedRoutes, routeId) } } + upstream := router.UpstreamClient + router.Lock.Unlock() + + if upstream != nil { + unannounceMsg := RpcMessage{Command: wshrpc.Command_RouteUnannounce, Source: routeId} + unannounceBytes, _ := json.Marshal(unannounceMsg) + upstream.SendRpcMessage(unannounceBytes, "route-unannounce") + } + go func() { defer func() { panichandler.PanicHandler("WshRouter:unregisterRoute:routegone", recover()) From 01c3c4c4b4e2b94b06e0ae3e1011f741238a19fd Mon Sep 17 00:00:00 2001 From: sawka Date: Mon, 22 Dec 2025 13:15:57 -0800 Subject: [PATCH 4/6] responses now obey timeout context, and sendcancel takes a context... --- pkg/web/web.go | 7 +++++-- pkg/wshrpc/wshclient/wshclientutil.go | 6 +++--- pkg/wshutil/wshrpc.go | 30 ++++++++++++++++++++------- 3 files changed, 31 insertions(+), 12 deletions(-) diff --git a/pkg/web/web.go b/pkg/web/web.go index c5571c694b..28bcccd5a3 100644 --- a/pkg/web/web.go +++ b/pkg/web/web.go @@ -5,6 +5,7 @@ package web import ( "bytes" + "context" "encoding/base64" "encoding/json" "fmt" @@ -254,7 +255,7 @@ func handleRemoteStreamFile(w http.ResponseWriter, req *http.Request, conn strin return handleRemoteStreamFileFromCh(w, req, path, rtnCh, rpcOpts.StreamCancelFn, no404) } -func handleRemoteStreamFileFromCh(w http.ResponseWriter, req *http.Request, path string, rtnCh <-chan wshrpc.RespOrErrorUnion[wshrpc.FileData], streamCancelFn func(), no404 bool) error { +func handleRemoteStreamFileFromCh(w http.ResponseWriter, req *http.Request, path string, rtnCh <-chan wshrpc.RespOrErrorUnion[wshrpc.FileData], streamCancelFn func(context.Context) error, no404 bool) error { firstPk := true var fileInfo *wshrpc.FileInfo loopDone := false @@ -270,7 +271,9 @@ func handleRemoteStreamFileFromCh(w http.ResponseWriter, req *http.Request, path select { case <-ctx.Done(): if streamCancelFn != nil { - streamCancelFn() + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + streamCancelFn(ctx) } return ctx.Err() case respUnion, ok := <-rtnCh: diff --git a/pkg/wshrpc/wshclient/wshclientutil.go b/pkg/wshrpc/wshclient/wshclientutil.go index 327466b9ff..52d311c0a9 100644 --- a/pkg/wshrpc/wshclient/wshclientutil.go +++ b/pkg/wshrpc/wshclient/wshclientutil.go @@ -4,6 +4,7 @@ package wshclient import ( + "context" "errors" "github.com/wavetermdev/waveterm/pkg/panichandler" @@ -62,9 +63,8 @@ func sendRpcRequestResponseStreamHelper[T any](w *wshutil.WshRpc, command string rtnErr(respChan, err) return respChan } - opts.StreamCancelFn = func() { - // TODO coordinate the cancel with the for loop below - reqHandler.SendCancel() + opts.StreamCancelFn = func(ctx context.Context) error { + return reqHandler.SendCancel(ctx) } go func() { defer func() { diff --git a/pkg/wshutil/wshrpc.go b/pkg/wshutil/wshrpc.go index 59ea49fab4..4a71f020b3 100644 --- a/pkg/wshutil/wshrpc.go +++ b/pkg/wshutil/wshrpc.go @@ -502,7 +502,7 @@ func (handler *RpcRequestHandler) Context() context.Context { return handler.ctx } -func (handler *RpcRequestHandler) SendCancel() { +func (handler *RpcRequestHandler) SendCancel(ctx context.Context) error { defer func() { panichandler.PanicHandler("SendCancel", recover()) }() @@ -512,8 +512,14 @@ func (handler *RpcRequestHandler) SendCancel() { AuthToken: handler.w.GetAuthToken(), } barr, _ := json.Marshal(msg) // will never fail - handler.w.OutputCh <- barr - handler.finalize() + select { + case handler.w.OutputCh <- barr: + handler.finalize() + return nil + case <-ctx.Done(): + handler.finalize() + return fmt.Errorf("timeout sending cancel") + } } func (handler *RpcRequestHandler) ResponseDone() bool { @@ -609,7 +615,10 @@ func (handler *RpcResponseHandler) SendMessage(msg string) { AuthToken: handler.w.GetAuthToken(), } msgBytes, _ := json.Marshal(rpcMsg) // will never fail - handler.w.OutputCh <- msgBytes + select { + case handler.w.OutputCh <- msgBytes: + case <-handler.ctx.Done(): + } } func (handler *RpcResponseHandler) SendResponse(data any, done bool) error { @@ -635,8 +644,12 @@ func (handler *RpcResponseHandler) SendResponse(data any, done bool) error { if err != nil { return err } - handler.w.OutputCh <- barr - return nil + select { + case handler.w.OutputCh <- barr: + return nil + case <-handler.ctx.Done(): + return fmt.Errorf("timeout sending response") + } } func (handler *RpcResponseHandler) SendResponseError(err error) { @@ -653,7 +666,10 @@ func (handler *RpcResponseHandler) SendResponseError(err error) { AuthToken: handler.w.GetAuthToken(), } barr, _ := json.Marshal(msg) // will never fail - handler.w.OutputCh <- barr + select { + case handler.w.OutputCh <- barr: + case <-handler.ctx.Done(): + } } func (handler *RpcResponseHandler) IsCanceled() bool { From 27fbdd6f9d84e1646aab497bcc59f995d78951dc Mon Sep 17 00:00:00 2001 From: sawka Date: Mon, 22 Dec 2025 14:42:56 -0800 Subject: [PATCH 5/6] fix small bug with cleanup/close when reqid is empty --- pkg/wshutil/wshadapter.go | 2 +- pkg/wshutil/wshrpc.go | 20 +++++++++++++------- 2 files changed, 14 insertions(+), 8 deletions(-) diff --git a/pkg/wshutil/wshadapter.go b/pkg/wshutil/wshadapter.go index f1760381fc..22667dbfe2 100644 --- a/pkg/wshutil/wshadapter.go +++ b/pkg/wshutil/wshadapter.go @@ -96,7 +96,7 @@ func serverImplAdapter(impl any) func(*RpcResponseHandler) bool { } rmethod := findCmdMethod(impl, cmd) if rmethod == nil { - if !handler.NeedsResponse() { + if !handler.NeedsResponse() && cmd != wshrpc.Command_Message { // we also send an out of band message here since this is likely unexpected and will require debugging handler.SendMessage(fmt.Sprintf("command %q method %q not found", handler.GetCommand(), methodDecl.MethodName)) } diff --git a/pkg/wshutil/wshrpc.go b/pkg/wshutil/wshrpc.go index 4a71f020b3..ebfca5cc9f 100644 --- a/pkg/wshutil/wshrpc.go +++ b/pkg/wshutil/wshrpc.go @@ -313,7 +313,9 @@ func (w *WshRpc) handleRequestInternal(req *RpcMessage, pprofCtx context.Context } respHandler.contextCancelFn.Store(&cancelFn) respHandler.ctx = withRespHandler(ctx, respHandler) - w.registerResponseHandler(req.ReqId, respHandler) + if req.ReqId != "" { + w.registerResponseHandler(req.ReqId, respHandler) + } isAsync := false defer func() { panicErr := panichandler.PanicHandler("handleRequest", recover()) @@ -613,6 +615,7 @@ func (handler *RpcResponseHandler) SendMessage(msg string) { Message: msg, }, AuthToken: handler.w.GetAuthToken(), + Route: handler.source, // send back to source } msgBytes, _ := json.Marshal(rpcMsg) // will never fail select { @@ -625,15 +628,15 @@ func (handler *RpcResponseHandler) SendResponse(data any, done bool) error { defer func() { panichandler.PanicHandler("SendResponse", recover()) }() - if handler.reqId == "" { - return nil // no response expected - } if handler.done.Load() { return fmt.Errorf("request already done, cannot send additional response") } if done { defer handler.close() } + if handler.reqId == "" { + return nil + } msg := &RpcMessage{ ResId: handler.reqId, Data: data, @@ -656,10 +659,13 @@ func (handler *RpcResponseHandler) SendResponseError(err error) { defer func() { panichandler.PanicHandler("SendResponseError", recover()) }() - if handler.reqId == "" || handler.done.Load() { + if handler.done.Load() { return } defer handler.close() + if handler.reqId == "" { + return + } msg := &RpcMessage{ ResId: handler.reqId, Error: err.Error(), @@ -691,11 +697,11 @@ func (handler *RpcResponseHandler) Finalize() { if handler.reqId != "" { handler.w.unregisterResponseHandler(handler.reqId) } - if handler.reqId == "" || handler.done.Load() { + if handler.done.Load() { return } + // SendResponse with done=true will call close() via defer, even when reqId is empty handler.SendResponse(nil, true) - handler.close() } func (handler *RpcResponseHandler) IsDone() bool { From c5aa204c1cd4ce7993b967488ea22dcc1b26ebfe Mon Sep 17 00:00:00 2001 From: sawka Date: Mon, 22 Dec 2025 15:00:18 -0800 Subject: [PATCH 6/6] fix variable aliasing bug --- pkg/wshutil/wshrouter.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/wshutil/wshrouter.go b/pkg/wshutil/wshrouter.go index 7fc2e96875..80c6a038a6 100644 --- a/pkg/wshutil/wshrouter.go +++ b/pkg/wshutil/wshrouter.go @@ -355,9 +355,9 @@ func (router *WshRouter) UnregisterRoute(routeId string) { router.Lock.Lock() delete(router.RouteMap, routeId) // clear out announced routes - for routeId, localRouteId := range router.AnnouncedRoutes { + for announcedRouteId, localRouteId := range router.AnnouncedRoutes { if localRouteId == routeId { - delete(router.AnnouncedRoutes, routeId) + delete(router.AnnouncedRoutes, announcedRouteId) } } upstream := router.UpstreamClient