Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions pkg/web/web.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package web

import (
"bytes"
"context"
"encoding/base64"
"encoding/json"
"fmt"
Expand Down Expand Up @@ -254,7 +255,7 @@ func handleRemoteStreamFile(w http.ResponseWriter, req *http.Request, conn strin
return handleRemoteStreamFileFromCh(w, req, path, rtnCh, rpcOpts.StreamCancelFn, no404)
}

func handleRemoteStreamFileFromCh(w http.ResponseWriter, req *http.Request, path string, rtnCh <-chan wshrpc.RespOrErrorUnion[wshrpc.FileData], streamCancelFn func(), no404 bool) error {
func handleRemoteStreamFileFromCh(w http.ResponseWriter, req *http.Request, path string, rtnCh <-chan wshrpc.RespOrErrorUnion[wshrpc.FileData], streamCancelFn func(context.Context) error, no404 bool) error {
firstPk := true
var fileInfo *wshrpc.FileInfo
loopDone := false
Expand All @@ -270,7 +271,9 @@ func handleRemoteStreamFileFromCh(w http.ResponseWriter, req *http.Request, path
select {
case <-ctx.Done():
if streamCancelFn != nil {
streamCancelFn()
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
streamCancelFn(ctx)
}
return ctx.Err()
case respUnion, ok := <-rtnCh:
Expand Down
6 changes: 3 additions & 3 deletions pkg/wshrpc/wshclient/wshclientutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package wshclient

import (
"context"
"errors"

"github.com/wavetermdev/waveterm/pkg/panichandler"
Expand Down Expand Up @@ -62,9 +63,8 @@
rtnErr(respChan, err)
return respChan
}
opts.StreamCancelFn = func() {
// TODO coordinate the cancel with the for loop below
reqHandler.SendCancel()
opts.StreamCancelFn = func(ctx context.Context) error {

Check failure on line 66 in pkg/wshrpc/wshclient/wshclientutil.go

View workflow job for this annotation

GitHub Actions / Analyze (javascript-typescript)

cannot use func(ctx context.Context) error {…} (value of type func(ctx context.Context) error) as func() value in assignment

Check failure on line 66 in pkg/wshrpc/wshclient/wshclientutil.go

View workflow job for this annotation

GitHub Actions / Build for TestDriver.ai

cannot use func(ctx context.Context) error {…} (value of type func(ctx context.Context) error) as func() value in assignment

Check failure on line 66 in pkg/wshrpc/wshclient/wshclientutil.go

View workflow job for this annotation

GitHub Actions / Analyze (go)

cannot use func(ctx context.Context) error {…} (value of type func(ctx context.Context) error) as func() value in assignment
return reqHandler.SendCancel(ctx)
}
Comment on lines +66 to 68
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Find RpcOpts type definition and StreamCancelFn signature
rg -n "type RpcOpts|StreamCancelFn" --type=go -A 5 -B 2

Repository: wavetermdev/waveterm

Length of output: 2005


Type mismatch: StreamCancelFn signature incompatible with RpcOpts field definition.

The code assigns func(ctx context.Context) error to opts.StreamCancelFn, but the field in RpcOpts (pkg/wshrpc/wshrpctypes.go:376) is defined as func() with no parameters and no return value. This type mismatch will prevent compilation. The RpcOpts.StreamCancelFn field definition must be updated to func(context.Context) error to match the new signature.

🧰 Tools
🪛 golangci-lint (2.5.0)

[error] 66-66: : # github.com/wavetermdev/waveterm/pkg/wshrpc/wshclient
pkg/wshrpc/wshclient/wshclientutil.go:66:24: cannot use func(ctx context.Context) error {…} (value of type func(ctx context.Context) error) as func() value in assignment

(typecheck)

🤖 Prompt for AI Agents
In pkg/wshrpc/wshclient/wshclientutil.go around lines 66-68, opts.StreamCancelFn
is assigned a func(ctx context.Context) error but RpcOpts.StreamCancelFn is
declared as func() in pkg/wshrpc/wshrpctypes.go (around line 376); update the
RpcOpts.StreamCancelFn field type to func(context.Context) error to match the
new signature, then update any other callers/assignments of StreamCancelFn to
accept and return the context/error signature (or wrap them where appropriate),
and run go build/go test to ensure all usages compile.

go func() {
defer func() {
Expand Down
2 changes: 1 addition & 1 deletion pkg/wshutil/wshadapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ func serverImplAdapter(impl any) func(*RpcResponseHandler) bool {
}
rmethod := findCmdMethod(impl, cmd)
if rmethod == nil {
if !handler.NeedsResponse() {
if !handler.NeedsResponse() && cmd != wshrpc.Command_Message {
// we also send an out of band message here since this is likely unexpected and will require debugging
handler.SendMessage(fmt.Sprintf("command %q method %q not found", handler.GetCommand(), methodDecl.MethodName))
}
Expand Down
60 changes: 34 additions & 26 deletions pkg/wshutil/wshrouter.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,26 +169,26 @@ func (router *WshRouter) getRouteInfo(rpcId string) *routeInfo {
}

func (router *WshRouter) handleAnnounceMessage(msg RpcMessage, input msgAndRoute) {
// if we have an upstream, send it there
// if we don't (we are the terminal router), then add it to our announced route map
if msg.Source != input.fromRouteId {
router.Lock.Lock()
router.AnnouncedRoutes[msg.Source] = input.fromRouteId
router.Lock.Unlock()
}
upstream := router.GetUpstreamClient()
if upstream != nil {
upstream.SendRpcMessage(input.msgBytes, "announce-upstream")
return
}
if msg.Source == input.fromRouteId {
// not necessary to save the id mapping
return
}
router.Lock.Lock()
defer router.Lock.Unlock()
router.AnnouncedRoutes[msg.Source] = input.fromRouteId
}

func (router *WshRouter) handleUnannounceMessage(msg RpcMessage) {
func (router *WshRouter) handleUnannounceMessage(msg RpcMessage, input msgAndRoute) {
router.Lock.Lock()
defer router.Lock.Unlock()
delete(router.AnnouncedRoutes, msg.Source)
router.Lock.Unlock()

upstream := router.GetUpstreamClient()
if upstream != nil {
upstream.SendRpcMessage(input.msgBytes, "unannounce-upstream")
}
}

func (router *WshRouter) getAnnouncedRoute(routeId string) string {
Expand All @@ -204,21 +204,21 @@ func (router *WshRouter) sendRoutedMessage(msgBytes []byte, routeId string) bool
rpc.SendRpcMessage(msgBytes, "route")
return true
}
localRouteId := router.getAnnouncedRoute(routeId)
if localRouteId != "" {
rpc := router.GetRpc(localRouteId)
if rpc != nil {
rpc.SendRpcMessage(msgBytes, "route-local")
return true
}
}
upstream := router.GetUpstreamClient()
if upstream != nil {
upstream.SendRpcMessage(msgBytes, "route-upstream")
return true
} else {
// we are the upstream, so consult our announced routes map
localRouteId := router.getAnnouncedRoute(routeId)
rpc := router.GetRpc(localRouteId)
if rpc == nil {
log.Printf("[router] no rpc for route id %q\n", routeId)
return false
}
rpc.SendRpcMessage(msgBytes, "route-local")
return true
}
log.Printf("[router] no rpc for route id %q\n", routeId)
return false
}

func (router *WshRouter) runServer() {
Expand All @@ -236,7 +236,7 @@ func (router *WshRouter) runServer() {
continue
}
if msg.Command == wshrpc.Command_RouteUnannounce {
router.handleUnannounceMessage(msg)
router.handleUnannounceMessage(msg, input)
continue
}
if msg.Command != "" {
Expand Down Expand Up @@ -353,14 +353,22 @@ func (router *WshRouter) RegisterRoute(routeId string, rpc AbstractRpcClient, sh
func (router *WshRouter) UnregisterRoute(routeId string) {
log.Printf("[router] unregistering wsh route %q\n", routeId)
router.Lock.Lock()
defer router.Lock.Unlock()
delete(router.RouteMap, routeId)
// clear out announced routes
for routeId, localRouteId := range router.AnnouncedRoutes {
for announcedRouteId, localRouteId := range router.AnnouncedRoutes {
if localRouteId == routeId {
delete(router.AnnouncedRoutes, routeId)
delete(router.AnnouncedRoutes, announcedRouteId)
}
}
upstream := router.UpstreamClient
router.Lock.Unlock()

if upstream != nil {
unannounceMsg := RpcMessage{Command: wshrpc.Command_RouteUnannounce, Source: routeId}
unannounceBytes, _ := json.Marshal(unannounceMsg)
upstream.SendRpcMessage(unannounceBytes, "route-unannounce")
}

go func() {
defer func() {
panichandler.PanicHandler("WshRouter:unregisterRoute:routegone", recover())
Expand Down
59 changes: 43 additions & 16 deletions pkg/wshutil/wshrpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -313,7 +313,9 @@ func (w *WshRpc) handleRequestInternal(req *RpcMessage, pprofCtx context.Context
}
respHandler.contextCancelFn.Store(&cancelFn)
respHandler.ctx = withRespHandler(ctx, respHandler)
w.registerResponseHandler(req.ReqId, respHandler)
if req.ReqId != "" {
w.registerResponseHandler(req.ReqId, respHandler)
}
isAsync := false
defer func() {
panicErr := panichandler.PanicHandler("handleRequest", recover())
Expand Down Expand Up @@ -502,7 +504,7 @@ func (handler *RpcRequestHandler) Context() context.Context {
return handler.ctx
}

func (handler *RpcRequestHandler) SendCancel() {
func (handler *RpcRequestHandler) SendCancel(ctx context.Context) error {
defer func() {
panichandler.PanicHandler("SendCancel", recover())
}()
Expand All @@ -512,8 +514,14 @@ func (handler *RpcRequestHandler) SendCancel() {
AuthToken: handler.w.GetAuthToken(),
}
barr, _ := json.Marshal(msg) // will never fail
handler.w.OutputCh <- barr
handler.finalize()
select {
case handler.w.OutputCh <- barr:
handler.finalize()
return nil
case <-ctx.Done():
handler.finalize()
return fmt.Errorf("timeout sending cancel")
}
}

func (handler *RpcRequestHandler) ResponseDone() bool {
Expand Down Expand Up @@ -607,24 +615,28 @@ func (handler *RpcResponseHandler) SendMessage(msg string) {
Message: msg,
},
AuthToken: handler.w.GetAuthToken(),
Route: handler.source, // send back to source
}
msgBytes, _ := json.Marshal(rpcMsg) // will never fail
handler.w.OutputCh <- msgBytes
select {
case handler.w.OutputCh <- msgBytes:
case <-handler.ctx.Done():
}
}

func (handler *RpcResponseHandler) SendResponse(data any, done bool) error {
defer func() {
panichandler.PanicHandler("SendResponse", recover())
}()
if handler.reqId == "" {
return nil // no response expected
}
if handler.done.Load() {
return fmt.Errorf("request already done, cannot send additional response")
}
if done {
defer handler.close()
}
if handler.reqId == "" {
return nil
}
msg := &RpcMessage{
ResId: handler.reqId,
Data: data,
Expand All @@ -635,25 +647,35 @@ func (handler *RpcResponseHandler) SendResponse(data any, done bool) error {
if err != nil {
return err
}
handler.w.OutputCh <- barr
return nil
select {
case handler.w.OutputCh <- barr:
return nil
case <-handler.ctx.Done():
return fmt.Errorf("timeout sending response")
}
}

func (handler *RpcResponseHandler) SendResponseError(err error) {
defer func() {
panichandler.PanicHandler("SendResponseError", recover())
}()
if handler.reqId == "" || handler.done.Load() {
if handler.done.Load() {
return
}
defer handler.close()
if handler.reqId == "" {
return
}
msg := &RpcMessage{
ResId: handler.reqId,
Error: err.Error(),
AuthToken: handler.w.GetAuthToken(),
}
barr, _ := json.Marshal(msg) // will never fail
handler.w.OutputCh <- barr
select {
case handler.w.OutputCh <- barr:
case <-handler.ctx.Done():
}
}

func (handler *RpcResponseHandler) IsCanceled() bool {
Expand All @@ -675,11 +697,11 @@ func (handler *RpcResponseHandler) Finalize() {
if handler.reqId != "" {
handler.w.unregisterResponseHandler(handler.reqId)
}
if handler.reqId == "" || handler.done.Load() {
if handler.done.Load() {
return
}
// SendResponse with done=true will call close() via defer, even when reqId is empty
handler.SendResponse(nil, true)
handler.close()
}

func (handler *RpcResponseHandler) IsDone() bool {
Expand Down Expand Up @@ -726,8 +748,13 @@ func (w *WshRpc) SendComplexRequest(command string, data any, opts *wshrpc.RpcOp
return nil, err
}
handler.respCh = w.registerRpc(handler, command, opts.Route, handler.reqId)
w.OutputCh <- barr
return handler, nil
select {
case w.OutputCh <- barr:
return handler, nil
case <-handler.ctx.Done():
handler.finalize()
return nil, fmt.Errorf("timeout sending request")
}
}

func (w *WshRpc) IsServerDone() bool {
Expand Down
Loading