From bb0f70d658ff36e4c938205bf7cbc2d67c52bf10 Mon Sep 17 00:00:00 2001 From: Dominic Reed Date: Tue, 26 Aug 2025 13:38:03 -0700 Subject: [PATCH 1/3] fix racey logic --- service/authorization/v2/authorization.go | 9 +- service/authorization/v2/cache.go | 108 +++++------------- service/authorization/v2/cache_test.go | 16 +-- .../internal/access/v2/just_in_time_pdp.go | 16 +-- service/internal/access/v2/policy_store.go | 31 +++++ service/pkg/cache/cache.go | 4 + 6 files changed, 77 insertions(+), 107 deletions(-) diff --git a/service/authorization/v2/authorization.go b/service/authorization/v2/authorization.go index 0e633dde9e..d582999826 100644 --- a/service/authorization/v2/authorization.go +++ b/service/authorization/v2/authorization.go @@ -15,7 +15,6 @@ import ( otdf "github.com/opentdf/platform/sdk" "github.com/opentdf/platform/service/internal/access/v2" "github.com/opentdf/platform/service/logger" - "github.com/opentdf/platform/service/pkg/cache" "github.com/opentdf/platform/service/pkg/serviceregistry" "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/propagation" @@ -78,12 +77,6 @@ func NewRegistration() *serviceregistry.Service[authzV2Connect.AuthorizationServ return as, nil } - cacheClient, err := srp.NewCacheClient(cache.Options{}) - if err != nil || cacheClient == nil { - l.Error("failed to create platform cache client", slog.Any("error", err)) - panic(fmt.Errorf("failed to create platform cache client: %w", err)) - } - refreshInterval, err := time.ParseDuration(authZCfg.Cache.RefreshInterval) if err != nil { l.Error("failed to parse entitlement policy cache refresh interval", slog.Any("error", err)) @@ -91,7 +84,7 @@ func NewRegistration() *serviceregistry.Service[authzV2Connect.AuthorizationServ } retriever := access.NewEntitlementPolicyRetriever(as.sdk) - as.cache, err = NewEntitlementPolicyCache(context.Background(), l, retriever, cacheClient, refreshInterval) + as.cache, err = NewEntitlementPolicyCache(context.Background(), l, retriever, refreshInterval) if err != nil { l.Error("failed to create entitlement policy cache", slog.Any("error", err)) panic(fmt.Errorf("failed to create entitlement policy cache: %w", err)) diff --git a/service/authorization/v2/cache.go b/service/authorization/v2/cache.go index 27baebffdb..21fd83ed0c 100644 --- a/service/authorization/v2/cache.go +++ b/service/authorization/v2/cache.go @@ -3,14 +3,13 @@ package authorization import ( "context" "errors" - "fmt" "log/slog" + "sync" "time" "github.com/opentdf/platform/protocol/go/policy" "github.com/opentdf/platform/service/internal/access/v2" "github.com/opentdf/platform/service/logger" - "github.com/opentdf/platform/service/pkg/cache" ) const ( @@ -41,8 +40,9 @@ var ( // EntitlementPolicyCache caches attributes and subject mappings with periodic refresh type EntitlementPolicyCache struct { - logger *logger.Logger - cacheClient *cache.Cache + logger *logger.Logger + policy access.EntitlementPolicy + mu sync.RWMutex // SDK-connected retriever to fetch fresh data from policy services retriever *access.EntitlementPolicyRetriever @@ -56,13 +56,7 @@ type EntitlementPolicyCache struct { isCacheFilled bool } -// The EntitlementPolicy struct holds all the cached entitlement policy, as generics allow one -// data type per service cache instance. -type EntitlementPolicy struct { - Attributes []*policy.Attribute - SubjectMappings []*policy.SubjectMapping - RegisteredResources []*policy.RegisteredResource -} + // NewEntitlementPolicyCache holds a platform-provided cache client and manages a periodic refresh of // cached entitlement policy data, fetching fresh data from the policy services at configured interval. @@ -70,7 +64,6 @@ func NewEntitlementPolicyCache( ctx context.Context, l *logger.Logger, retriever *access.EntitlementPolicyRetriever, - cacheClient *cache.Cache, cacheRefreshInterval time.Duration, ) (*EntitlementPolicyCache, error) { if cacheRefreshInterval == 0 { @@ -82,7 +75,6 @@ func NewEntitlementPolicyCache( instance := &EntitlementPolicyCache{ logger: l, - cacheClient: cacheClient, retriever: retriever, configuredRefreshInterval: cacheRefreshInterval, stopRefresh: make(chan struct{}), @@ -181,23 +173,12 @@ func (c *EntitlementPolicyCache) Refresh(ctx context.Context) error { return err } - // If there is an error when Setting with fresh data, mark not filled so IsReady() will re-attempt refresh - err = c.cacheClient.Set(ctx, attributesCacheKey, attributes, authzCacheTags) - if err != nil { - c.isCacheFilled = false - return errors.Join(ErrFailedToSet, err) - } - - err = c.cacheClient.Set(ctx, subjectMappingsCacheKey, subjectMappings, authzCacheTags) - if err != nil { - c.isCacheFilled = false - return errors.Join(ErrFailedToSet, err) - } - - err = c.cacheClient.Set(ctx, registeredResourcesCacheKey, registeredResources, authzCacheTags) - if err != nil { - c.isCacheFilled = false - return errors.Join(ErrFailedToSet, err) + c.mu.Lock() + defer c.mu.Unlock() + c.policy = access.EntitlementPolicy{ + Attributes: attributes, + RegisteredResources: registeredResources, + SubjectMappings: subjectMappings, } c.logger.DebugContext(ctx, @@ -215,70 +196,41 @@ func (c *EntitlementPolicyCache) Refresh(ctx context.Context) error { // ListAllAttributes returns the cached attributes func (c *EntitlementPolicyCache) ListAllAttributes(ctx context.Context) ([]*policy.Attribute, error) { - var ( - attributes []*policy.Attribute - ok bool - ) + c.mu.RLock() + defer c.mu.RUnlock() - cached, err := c.cacheClient.Get(ctx, attributesCacheKey) - if err != nil { - if errors.Is(err, cache.ErrCacheMiss) { - return attributes, nil - } - return nil, fmt.Errorf("%w, attributes: %w", ErrFailedToGet, err) - } - - attributes, ok = cached.([]*policy.Attribute) - if !ok { - return nil, fmt.Errorf("%w: %T", ErrCachedTypeNotExpected, attributes) - } + var attributes []*policy.Attribute + attributes = c.policy.Attributes return attributes, nil } // ListAllSubjectMappings returns the cached subject mappings func (c *EntitlementPolicyCache) ListAllSubjectMappings(ctx context.Context) ([]*policy.SubjectMapping, error) { - var ( - subjectMappings []*policy.SubjectMapping - ok bool - ) - - cached, err := c.cacheClient.Get(ctx, subjectMappingsCacheKey) - if err != nil { - if errors.Is(err, cache.ErrCacheMiss) { - return subjectMappings, nil - } - return nil, fmt.Errorf("%w, subject mappings: %w", ErrFailedToGet, err) - } + c.mu.RLock() + defer c.mu.RUnlock() - subjectMappings, ok = cached.([]*policy.SubjectMapping) - if !ok { - return nil, fmt.Errorf("%w: %T", ErrCachedTypeNotExpected, subjectMappings) - } + var subjectMappings []*policy.SubjectMapping + subjectMappings = c.policy.SubjectMappings return subjectMappings, nil } // ListAllRegisteredResources returns the cached registered resources, or none in the event of a cache miss func (c *EntitlementPolicyCache) ListAllRegisteredResources(ctx context.Context) ([]*policy.RegisteredResource, error) { - var ( - registeredResources []*policy.RegisteredResource - ok bool - ) + c.mu.RLock() + defer c.mu.RUnlock() - cached, err := c.cacheClient.Get(ctx, registeredResourcesCacheKey) - if err != nil { - if errors.Is(err, cache.ErrCacheMiss) { - return registeredResources, nil - } - return nil, fmt.Errorf("%w, registered resources: %w", ErrFailedToGet, err) - } - - registeredResources, ok = cached.([]*policy.RegisteredResource) - if !ok { - return nil, fmt.Errorf("%w: %T", ErrCachedTypeNotExpected, registeredResources) - } + var registeredResources []*policy.RegisteredResource + registeredResources = c.policy.RegisteredResources return registeredResources, nil } +func (c *EntitlementPolicyCache) GetEntitlementPolicy(ctx context.Context) (access.EntitlementPolicy, error) { + c.mu.RLock() + defer c.mu.RUnlock() + + return c.policy, nil +} + // periodicRefresh refreshes the cache at the specified interval func (c *EntitlementPolicyCache) periodicRefresh(ctx context.Context) { waitTimeout := c.configuredRefreshInterval diff --git a/service/authorization/v2/cache_test.go b/service/authorization/v2/cache_test.go index 8b56994c36..fab0674b35 100644 --- a/service/authorization/v2/cache_test.go +++ b/service/authorization/v2/cache_test.go @@ -19,9 +19,8 @@ var ( func Test_NewEntitlementPolicyCache(t *testing.T) { ctx := t.Context() refreshInterval := 10 * time.Second - mockCache, _ := cache.TestCacheClient(mockCacheExpiry) - c, err := NewEntitlementPolicyCache(ctx, l, nil, mockCache, refreshInterval) + c, err := NewEntitlementPolicyCache(ctx, l, nil, refreshInterval) require.NoError(t, err) assert.NotNil(t, c) assert.Equal(t, refreshInterval, c.configuredRefreshInterval) @@ -31,13 +30,12 @@ func Test_NewEntitlementPolicyCache(t *testing.T) { func Test_EntitlementPolicyCache_RefreshInterval(t *testing.T) { var refreshInterval time.Duration ctx := t.Context() - mockCache, _ := cache.TestCacheClient(mockCacheExpiry) - _, err := NewEntitlementPolicyCache(ctx, l, nil, mockCache, refreshInterval) + _, err := NewEntitlementPolicyCache(ctx, l, nil, refreshInterval) require.ErrorIs(t, err, ErrCacheDisabled) refreshInterval = 10 * time.Second - c, err := NewEntitlementPolicyCache(ctx, l, nil, mockCache, refreshInterval) + c, err := NewEntitlementPolicyCache(ctx, l, nil, refreshInterval) require.NoError(t, err) assert.NotNil(t, c) } @@ -48,12 +46,11 @@ func Test_EntitlementPolicyCache_Enabled(t *testing.T) { err error ctx = t.Context() refreshInterval = 10 * time.Second - mockCache, _ = cache.TestCacheClient(mockCacheExpiry) ) assert.False(t, c.IsEnabled()) assert.False(t, c.IsReady(ctx)) - c, err = NewEntitlementPolicyCache(ctx, l, nil, mockCache, refreshInterval) + c, err = NewEntitlementPolicyCache(ctx, l, nil, refreshInterval) require.NoError(t, err) assert.NotNil(t, c) assert.True(t, c.IsEnabled()) @@ -63,9 +60,8 @@ func Test_EntitlementPolicyCache_Enabled(t *testing.T) { func Test_EntitlementPolicyCache_CacheMiss(t *testing.T) { ctx := t.Context() - mockCache, _ := cache.TestCacheClient(mockCacheExpiry) - c, err := NewEntitlementPolicyCache(ctx, l, nil, mockCache, 1*time.Hour) + c, err := NewEntitlementPolicyCache(ctx, l, nil, 1*time.Hour) require.NoError(t, err) // No errors, but empty lists on cache misses @@ -93,7 +89,7 @@ func Test_EntitlementPolicyCache_CacheHits(t *testing.T) { _ = mockCache.Set(ctx, subjectMappingsCacheKey, subjMappingsList, nil) _ = mockCache.Set(ctx, registeredResourcesCacheKey, resourcesList, nil) - c, err := NewEntitlementPolicyCache(ctx, l, nil, mockCache, 1*time.Hour) + c, err := NewEntitlementPolicyCache(ctx, l, nil, 1*time.Hour) require.NoError(t, err) // Allow for some concurrency overhead in cache library to prevent flakiness in tests diff --git a/service/internal/access/v2/just_in_time_pdp.go b/service/internal/access/v2/just_in_time_pdp.go index acfae78b0b..024f91fbf1 100644 --- a/service/internal/access/v2/just_in_time_pdp.go +++ b/service/internal/access/v2/just_in_time_pdp.go @@ -60,19 +60,13 @@ func NewJustInTimePDP( l.DebugContext(ctx, "no EntitlementPolicyStore provided or not yet ready, will retrieve directly from policy services") store = NewEntitlementPolicyRetriever(sdk) } - - allAttributes, err := store.ListAllAttributes(ctx) - if err != nil { - return nil, fmt.Errorf("failed to list cached attributes: %w", err) - } - allSubjectMappings, err := store.ListAllSubjectMappings(ctx) - if err != nil { - return nil, fmt.Errorf("failed to list cached subject mappings: %w", err) - } - allRegisteredResources, err := store.ListAllRegisteredResources(ctx) + entitlementPolicy, err := store.GetEntitlementPolicy(ctx) if err != nil { - return nil, fmt.Errorf("failed to fetch all registered resources: %w", err) + return nil, fmt.Errorf("failed to get entitlement policy from store: %w", err) } + allAttributes := entitlementPolicy.Attributes + allSubjectMappings := entitlementPolicy.SubjectMappings + allRegisteredResources := entitlementPolicy.RegisteredResources pdp, err := NewPolicyDecisionPoint(ctx, l, allAttributes, allSubjectMappings, allRegisteredResources) if err != nil { diff --git a/service/internal/access/v2/policy_store.go b/service/internal/access/v2/policy_store.go index 81181110d2..1738354f14 100644 --- a/service/internal/access/v2/policy_store.go +++ b/service/internal/access/v2/policy_store.go @@ -17,10 +17,19 @@ type EntitlementPolicyStore interface { ListAllAttributes(ctx context.Context) ([]*policy.Attribute, error) ListAllSubjectMappings(ctx context.Context) ([]*policy.SubjectMapping, error) ListAllRegisteredResources(ctx context.Context) ([]*policy.RegisteredResource, error) + GetEntitlementPolicy(ctx context.Context) (EntitlementPolicy, error) IsEnabled() bool IsReady(context.Context) bool } +// The EntitlementPolicy struct holds all the cached entitlement policy, as generics allow one +// data type per service cache instance. +type EntitlementPolicy struct { + Attributes []*policy.Attribute + SubjectMappings []*policy.SubjectMapping + RegisteredResources []*policy.RegisteredResource +} + var ( ErrFailedToFetchAttributes = errors.New("failed to fetch attributes from policy service") ErrFailedToFetchSubjectMappings = errors.New("failed to fetch subject mappings from policy service") @@ -126,3 +135,25 @@ func (p *EntitlementPolicyRetriever) ListAllRegisteredResources(ctx context.Cont return rrList, nil } + +func (p *EntitlementPolicyRetriever) GetEntitlementPolicy(ctx context.Context) (EntitlementPolicy, error) { + var ep EntitlementPolicy + var err error + + ep.Attributes, err = p.ListAllAttributes(ctx) + if err != nil { + return EntitlementPolicy{}, err + } + + ep.SubjectMappings, err = p.ListAllSubjectMappings(ctx) + if err != nil { + return EntitlementPolicy{}, err + } + + ep.RegisteredResources, err = p.ListAllRegisteredResources(ctx) + if err != nil { + return EntitlementPolicy{}, err + } + + return ep, nil +} diff --git a/service/pkg/cache/cache.go b/service/pkg/cache/cache.go index db32985b00..21eb583b3a 100644 --- a/service/pkg/cache/cache.go +++ b/service/pkg/cache/cache.go @@ -138,6 +138,10 @@ func (c *Cache) Delete(ctx context.Context, key string) error { return c.manager.cache.Delete(ctx, c.getKey(key)) } +func (c *Cache) Wait() { + c.manager.underlyingStore.Wait() +} + func (c *Cache) getKey(key string) string { return c.serviceName + ":" + key } From 0cd36589ac0dc1f7064a704b8c8948d0c4aa5078 Mon Sep 17 00:00:00 2001 From: Dominic Reed Date: Tue, 26 Aug 2025 13:52:24 -0700 Subject: [PATCH 2/3] lint fixes --- service/authorization/v2/cache.go | 13 ++++--------- 1 file changed, 4 insertions(+), 9 deletions(-) diff --git a/service/authorization/v2/cache.go b/service/authorization/v2/cache.go index 21fd83ed0c..20a4491dfd 100644 --- a/service/authorization/v2/cache.go +++ b/service/authorization/v2/cache.go @@ -19,9 +19,6 @@ const ( ) var ( - // Cache tags for authorization-related data set in the cache - authzCacheTags = []string{"authorization", "policy", "entitlements"} - // stopTimeout is the maximum time to wait for the periodic refresh goroutine to stop stopTimeout = 5 * time.Second @@ -56,8 +53,6 @@ type EntitlementPolicyCache struct { isCacheFilled bool } - - // NewEntitlementPolicyCache holds a platform-provided cache client and manages a periodic refresh of // cached entitlement policy data, fetching fresh data from the policy services at configured interval. func NewEntitlementPolicyCache( @@ -195,7 +190,7 @@ func (c *EntitlementPolicyCache) Refresh(ctx context.Context) error { } // ListAllAttributes returns the cached attributes -func (c *EntitlementPolicyCache) ListAllAttributes(ctx context.Context) ([]*policy.Attribute, error) { +func (c *EntitlementPolicyCache) ListAllAttributes(_ context.Context) ([]*policy.Attribute, error) { c.mu.RLock() defer c.mu.RUnlock() @@ -205,7 +200,7 @@ func (c *EntitlementPolicyCache) ListAllAttributes(ctx context.Context) ([]*poli } // ListAllSubjectMappings returns the cached subject mappings -func (c *EntitlementPolicyCache) ListAllSubjectMappings(ctx context.Context) ([]*policy.SubjectMapping, error) { +func (c *EntitlementPolicyCache) ListAllSubjectMappings(_ context.Context) ([]*policy.SubjectMapping, error) { c.mu.RLock() defer c.mu.RUnlock() @@ -215,7 +210,7 @@ func (c *EntitlementPolicyCache) ListAllSubjectMappings(ctx context.Context) ([] } // ListAllRegisteredResources returns the cached registered resources, or none in the event of a cache miss -func (c *EntitlementPolicyCache) ListAllRegisteredResources(ctx context.Context) ([]*policy.RegisteredResource, error) { +func (c *EntitlementPolicyCache) ListAllRegisteredResources(_ context.Context) ([]*policy.RegisteredResource, error) { c.mu.RLock() defer c.mu.RUnlock() @@ -224,7 +219,7 @@ func (c *EntitlementPolicyCache) ListAllRegisteredResources(ctx context.Context) return registeredResources, nil } -func (c *EntitlementPolicyCache) GetEntitlementPolicy(ctx context.Context) (access.EntitlementPolicy, error) { +func (c *EntitlementPolicyCache) GetEntitlementPolicy(_ context.Context) (access.EntitlementPolicy, error) { c.mu.RLock() defer c.mu.RUnlock() From 121515f1c3d8ae59af10fe17059781f3d1b7464f Mon Sep 17 00:00:00 2001 From: Dominic Reed Date: Tue, 26 Aug 2025 14:05:19 -0700 Subject: [PATCH 3/3] fix unit tests --- service/authorization/v2/cache.go | 6 ------ service/authorization/v2/cache_test.go | 17 +++++++---------- 2 files changed, 7 insertions(+), 16 deletions(-) diff --git a/service/authorization/v2/cache.go b/service/authorization/v2/cache.go index 20a4491dfd..0e5fd21f95 100644 --- a/service/authorization/v2/cache.go +++ b/service/authorization/v2/cache.go @@ -12,12 +12,6 @@ import ( "github.com/opentdf/platform/service/logger" ) -const ( - attributesCacheKey = "attributes_cache_key" - subjectMappingsCacheKey = "subject_mappings_cache_key" - registeredResourcesCacheKey = "registered_resources_cache_key" -) - var ( // stopTimeout is the maximum time to wait for the periodic refresh goroutine to stop stopTimeout = 5 * time.Second diff --git a/service/authorization/v2/cache_test.go b/service/authorization/v2/cache_test.go index fab0674b35..06a388c5de 100644 --- a/service/authorization/v2/cache_test.go +++ b/service/authorization/v2/cache_test.go @@ -5,16 +5,13 @@ import ( "time" "github.com/opentdf/platform/protocol/go/policy" + "github.com/opentdf/platform/service/internal/access/v2" "github.com/opentdf/platform/service/logger" - "github.com/opentdf/platform/service/pkg/cache" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) -var ( - mockCacheExpiry = 5 * time.Minute - l = logger.CreateTestLogger() -) +var l = logger.CreateTestLogger() func Test_NewEntitlementPolicyCache(t *testing.T) { ctx := t.Context() @@ -80,17 +77,17 @@ func Test_EntitlementPolicyCache_CacheMiss(t *testing.T) { func Test_EntitlementPolicyCache_CacheHits(t *testing.T) { ctx := t.Context() - mockCache, _ := cache.TestCacheClient(mockCacheExpiry) - attrsList := []*policy.Attribute{{Name: "attr1"}} subjMappingsList := []*policy.SubjectMapping{{Id: "id-123"}} resourcesList := []*policy.RegisteredResource{{Name: "res1"}} - _ = mockCache.Set(ctx, attributesCacheKey, attrsList, nil) - _ = mockCache.Set(ctx, subjectMappingsCacheKey, subjMappingsList, nil) - _ = mockCache.Set(ctx, registeredResourcesCacheKey, resourcesList, nil) c, err := NewEntitlementPolicyCache(ctx, l, nil, 1*time.Hour) require.NoError(t, err) + c.policy = access.EntitlementPolicy{ + Attributes: attrsList, + SubjectMappings: subjMappingsList, + RegisteredResources: resourcesList, + } // Allow for some concurrency overhead in cache library to prevent flakiness in tests time.Sleep(10 * time.Millisecond)