Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
95 changes: 95 additions & 0 deletions pkg/kubernetes/provider_watch_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
package kubernetes

import (
"os"
"reflect"
"sync"
"testing"
"time"

"github.com/containers/kubernetes-mcp-server/internal/test"
"github.com/containers/kubernetes-mcp-server/pkg/config"
"github.com/stretchr/testify/suite"
)

type ProviderWatchTargetsTestSuite struct {
suite.Suite
mockServer *test.MockServer
}

func (s *ProviderWatchTargetsTestSuite) SetupTest() {
s.mockServer = test.NewMockServer()

s.T().Setenv("CLUSTER_STATE_POLL_INTERVAL_MS", "100")
s.T().Setenv("CLUSTER_STATE_DEBOUNCE_WINDOW_MS", "50")
}

func (s *ProviderWatchTargetsTestSuite) TearDownTest() {
s.mockServer.Close()
}

// WaitForWatchTargets sets up a WatchTargets callback, executes the provided function, and waits for the callback to be invoked.
func (s *ProviderWatchTargetsTestSuite) WaitForWatchTargets(timeout time.Duration, provider Provider, fn func()) {
callbackInvoked := make(chan struct{})
var once sync.Once
provider.WatchTargets(func() error {
once.Do(func() {
close(callbackInvoked)
})
return nil
})
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is no guarantee here that the watcher has successfully started listening for events. I feel like this might introduce a race?

Copy link
Member Author

@manusa manusa Dec 2, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a utility callback for the tests (not production code).

It's used to wait for the callback to be invoked and then proceed with the assertions.
If the watchers haven't started (or any other issue) the callback will never be invoked and the tests will fail after the provided timeout.

I haven't seen any concurrency issues so far.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK, thinking it better, I understand that maybe you're referring that the fn() is invoked without guarantees of watchers having started.

For the watcher.Kubeconfig the code is synchronous and shouldn't be an issue, the watch is started by the time the method returns.

For watcher.ClusterState the code is not synchronous, but shouldn't be an issue either.

In all cases, the code executed in the enclosed fn() is what unblocks the callbackInvoked select.
If there's a race condition the test simply won't pass.

fn()
select {
case <-callbackInvoked:
// Callback was invoked
case <-time.After(timeout):
s.Fail("Timeout waiting for callback to be invoked")
}
}

// WriteKubeconfig appends a newline to the kubeconfig file to trigger the file watcher.
func (s *ProviderWatchTargetsTestSuite) WriteKubeconfig(k *Kubernetes) {
f, err := os.OpenFile(k.ToRawKubeConfigLoader().ConfigAccess().GetExplicitFile(), os.O_APPEND|os.O_WRONLY, 0644)
s.Require().NoError(err, "Expected no error opening kubeconfig file")
_, err = f.WriteString("\n")
s.Require().NoError(err, "Expected no error writing to kubeconfig file")
s.Require().NoError(f.Close(), "Expected no error closing kubeconfig file")
}

func (s *ProviderWatchTargetsTestSuite) TestKubeconfigCacheInvalidation() {
s.mockServer.Handle(&test.DiscoveryClientHandler{})
staticConfig := &config.StaticConfig{KubeConfig: test.KubeconfigFile(s.T(), s.mockServer.Kubeconfig())}

testCases := []func() (Provider, error){
func() (Provider, error) { return newKubeConfigClusterProvider(staticConfig) },
func() (Provider, error) {
return newSingleClusterProvider(config.ClusterProviderDisabled)(staticConfig)
},
}
for _, tc := range testCases {
provider, err := tc()
s.Require().NoError(err, "Expected no error from provider creation")

s.Run("With provider "+reflect.TypeOf(provider).String(), func() {
k, err := provider.GetDerivedKubernetes(s.T().Context(), provider.GetDefaultTarget())
s.Require().NoError(err, "Expected no error from GetDerivedKubernetes")

s.Run("given a fresh cache", func() {
_, err := k.AccessControlClientset().DiscoveryClient().ServerGroups()
s.Require().NoError(err, "Expected no error from AccessControlClientset")
s.Require().True(k.AccessControlClientset().DiscoveryClient().Fresh())
})

s.Run("invalidates caches (fresh==false) when kubeconfig is changed", func() {
s.WaitForWatchTargets(5*time.Second, provider, func() {
s.WriteKubeconfig(k)
})
s.Require().False(k.AccessControlClientset().DiscoveryClient().Fresh())
})
})
}
}

func TestProviderWatchTargetsTestSuite(t *testing.T) {
suite.Run(t, new(ProviderWatchTargetsTestSuite))
}