From 76e75be4656473c63944e346a68df5b26558f9b8 Mon Sep 17 00:00:00 2001 From: Roman Dmytrenko Date: Thu, 19 Dec 2024 18:55:46 +0200 Subject: [PATCH] refactor(cache): use namespace.version as part of cache key As namespace has a version which updates on each configuration changes it makes sense to use it as part of the cache key. Changes focus on improving the evaluation response time. All other API calls is not in a critical path so there is very little benefits to cache them Signed-off-by: Roman Dmytrenko --- internal/common/store_mock.go | 2 +- .../evaluation/data/evaluation_store_mock.go | 2 +- internal/server/evaluation/data/server.go | 7 +- .../evaluation/evaluation_store_mock.go | 2 +- .../server/evaluation/legacy_evaluator.go | 4 +- internal/server/evaluation/server.go | 2 +- internal/storage/cache/cache.go | 312 ++++++++++++++---- internal/storage/cache/cache_test.go | 28 +- internal/storage/fs/snapshot.go | 2 +- internal/storage/fs/snapshot_test.go | 7 +- internal/storage/fs/store.go | 4 +- internal/storage/fs/store_test.go | 2 +- internal/storage/sql/common/evaluation.go | 4 +- internal/storage/sql/evaluation_test.go | 8 +- internal/storage/storage.go | 2 +- 15 files changed, 286 insertions(+), 102 deletions(-) diff --git a/internal/common/store_mock.go b/internal/common/store_mock.go index 724a870432..97e03957f7 100644 --- a/internal/common/store_mock.go +++ b/internal/common/store_mock.go @@ -246,7 +246,7 @@ func (m *StoreMock) GetEvaluationRules(ctx context.Context, flag storage.Resourc return args.Get(0).([]*storage.EvaluationRule), args.Error(1) } -func (m *StoreMock) GetEvaluationDistributions(ctx context.Context, rule storage.IDRequest) ([]*storage.EvaluationDistribution, error) { +func (m *StoreMock) GetEvaluationDistributions(ctx context.Context, r storage.ResourceRequest, rule storage.IDRequest) ([]*storage.EvaluationDistribution, error) { args := m.Called(ctx, rule) return args.Get(0).([]*storage.EvaluationDistribution), args.Error(1) } diff --git a/internal/server/evaluation/data/evaluation_store_mock.go b/internal/server/evaluation/data/evaluation_store_mock.go index 56efa3fa81..079930196e 100644 --- a/internal/server/evaluation/data/evaluation_store_mock.go +++ b/internal/server/evaluation/data/evaluation_store_mock.go @@ -38,7 +38,7 @@ func (e *evaluationStoreMock) GetEvaluationRules(ctx context.Context, flag stora return args.Get(0).([]*storage.EvaluationRule), args.Error(1) } -func (e *evaluationStoreMock) GetEvaluationDistributions(ctx context.Context, ruleID storage.IDRequest) ([]*storage.EvaluationDistribution, error) { +func (e *evaluationStoreMock) GetEvaluationDistributions(ctx context.Context, flag storage.ResourceRequest, ruleID storage.IDRequest) ([]*storage.EvaluationDistribution, error) { args := e.Called(ctx, ruleID) return args.Get(0).([]*storage.EvaluationDistribution), args.Error(1) } diff --git a/internal/server/evaluation/data/server.go b/internal/server/evaluation/data/server.go index ca72f00e2a..05c6d52fc1 100644 --- a/internal/server/evaluation/data/server.go +++ b/internal/server/evaluation/data/server.go @@ -2,8 +2,8 @@ package data import ( "context" - "crypto/sha1" //nolint:gosec - + //nolint:gosec + "crypto/sha1" "fmt" "github.com/blang/semver/v4" @@ -123,7 +123,6 @@ var ( ) func (srv *Server) EvaluationSnapshotNamespace(ctx context.Context, r *evaluation.EvaluationNamespaceSnapshotRequest) (*evaluation.EvaluationNamespaceSnapshot, error) { - var ( namespaceKey = r.Key reference = r.Reference @@ -264,7 +263,7 @@ func (srv *Server) EvaluationSnapshotNamespace(ctx context.Context, r *evaluatio rule.Segments = append(rule.Segments, ss) } - distributions, err := srv.store.GetEvaluationDistributions(ctx, storage.NewID(r.ID, storage.WithReference(reference))) + distributions, err := srv.store.GetEvaluationDistributions(ctx, storage.NewResource(f.NamespaceKey, f.Key), storage.NewID(r.ID, storage.WithReference(reference))) if err != nil { return nil, fmt.Errorf("getting distributions for rule %q: %w", r.ID, err) } diff --git a/internal/server/evaluation/evaluation_store_mock.go b/internal/server/evaluation/evaluation_store_mock.go index ab44ea49ac..3ea33d77de 100644 --- a/internal/server/evaluation/evaluation_store_mock.go +++ b/internal/server/evaluation/evaluation_store_mock.go @@ -28,7 +28,7 @@ func (e *evaluationStoreMock) GetEvaluationRules(ctx context.Context, flag stora return args.Get(0).([]*storage.EvaluationRule), args.Error(1) } -func (e *evaluationStoreMock) GetEvaluationDistributions(ctx context.Context, ruleID storage.IDRequest) ([]*storage.EvaluationDistribution, error) { +func (e *evaluationStoreMock) GetEvaluationDistributions(ctx context.Context, r storage.ResourceRequest, ruleID storage.IDRequest) ([]*storage.EvaluationDistribution, error) { args := e.Called(ctx, ruleID) return args.Get(0).([]*storage.EvaluationDistribution), args.Error(1) } diff --git a/internal/server/evaluation/legacy_evaluator.go b/internal/server/evaluation/legacy_evaluator.go index 5de7441829..3310e89026 100644 --- a/internal/server/evaluation/legacy_evaluator.go +++ b/internal/server/evaluation/legacy_evaluator.go @@ -164,7 +164,7 @@ func (e *Evaluator) Evaluate(ctx context.Context, flag *flipt.Flag, r *evaluatio resp.SegmentKeys = segmentKeys } - distributions, err := e.store.GetEvaluationDistributions(ctx, storage.NewID(rule.ID)) + distributions, err := e.store.GetEvaluationDistributions(ctx, storage.NewResource(r.NamespaceKey, r.FlagKey), storage.NewID(rule.ID)) if err != nil { resp.Reason = flipt.EvaluationReason_ERROR_EVALUATION_REASON return resp, err @@ -287,7 +287,7 @@ func (e *Evaluator) matchConstraints(evalCtx map[string]string, constraints []st } } - var matched = true + matched := true switch segmentMatchType { case flipt.MatchType_ALL_MATCH_TYPE: diff --git a/internal/server/evaluation/server.go b/internal/server/evaluation/server.go index 342f886afe..645a4e0cd0 100644 --- a/internal/server/evaluation/server.go +++ b/internal/server/evaluation/server.go @@ -14,7 +14,7 @@ import ( type Storer interface { GetFlag(ctx context.Context, flag storage.ResourceRequest) (*flipt.Flag, error) GetEvaluationRules(ctx context.Context, flag storage.ResourceRequest) ([]*storage.EvaluationRule, error) - GetEvaluationDistributions(ctx context.Context, ruleID storage.IDRequest) ([]*storage.EvaluationDistribution, error) + GetEvaluationDistributions(ctx context.Context, flag storage.ResourceRequest, ruleID storage.IDRequest) ([]*storage.EvaluationDistribution, error) GetEvaluationRollouts(ctx context.Context, flag storage.ResourceRequest) ([]*storage.EvaluationRollout, error) } diff --git a/internal/storage/cache/cache.go b/internal/storage/cache/cache.go index 809df738d1..7104d60bba 100644 --- a/internal/storage/cache/cache.go +++ b/internal/storage/cache/cache.go @@ -72,12 +72,16 @@ type Store struct { } const ( + // storage:namespaces + namespaceCacheKeyPrefix = "s:n" // storage:flags flagCacheKeyPrefix = "s:f" // storage:evaluationRules evaluationRulesCacheKeyPrefix = "s:er" // storage:evaluationRollouts evaluationRolloutsCacheKeyPrefix = "s:ero" + // storage:evaluationDistributions + evaluationDistributionsCacheKeyPrefix = "s:erd" ) func NewStore(store storage.Store, cacher cache.Cacher, logger *zap.Logger) *Store { @@ -100,110 +104,214 @@ func (s *Store) getProtobuf(ctx context.Context, key string, value proto.Message return get(ctx, s, unmarshalFunc[proto.Message](proto.Unmarshal), key, value) } -func (s *Store) CreateFlag(ctx context.Context, r *flipt.CreateFlagRequest) (*flipt.Flag, error) { - flag, err := s.Store.CreateFlag(ctx, r) - if err != nil { - return nil, err - } - - cacheKey := cacheKey(flagCacheKeyPrefix, storage.NewResource(r.NamespaceKey, r.Key)) - s.setProtobuf(ctx, cacheKey, flag) - return flag, nil +func (s *Store) UpdateNamespace(ctx context.Context, r *flipt.UpdateNamespaceRequest) (*flipt.Namespace, error) { + namespace, err := s.Store.UpdateNamespace(ctx, r) + s.cacheUpdateNamespacedVersion(ctx, storage.NewNamespace(r.GetKey())) + return namespace, err } -func (s *Store) GetFlag(ctx context.Context, r storage.ResourceRequest) (*flipt.Flag, error) { - var ( - f = &flipt.Flag{} - cacheKey = cacheKey(flagCacheKeyPrefix, r) - cacheHit = s.getProtobuf(ctx, cacheKey, f) - ) - - if cacheHit { - return f, nil +func (s *Store) DeleteNamespace(ctx context.Context, r *flipt.DeleteNamespaceRequest) error { + err := s.Store.DeleteNamespace(ctx, r) + if err != nil { + return err } - f, err := s.Store.GetFlag(ctx, r) + cacheNsKey := s.cacheNsKey(storage.NewNamespace(r.GetKey())) + err = s.cacher.Delete(ctx, cacheNsKey) if err != nil { - return nil, err + s.logger.Error("deleting from storage cache", zap.Error(err)) } - s.setProtobuf(ctx, cacheKey, f) - return f, nil + return nil +} + +func (s *Store) CreateFlag(ctx context.Context, r *flipt.CreateFlagRequest) (*flipt.Flag, error) { + v, err := s.Store.CreateFlag(ctx, r) + s.cacheUpdateNamespacedVersion(ctx, storage.NewNamespace(r.GetNamespaceKey())) + return v, err } func (s *Store) UpdateFlag(ctx context.Context, r *flipt.UpdateFlagRequest) (*flipt.Flag, error) { - // delete from cache as flag has changed - cacheKey := cacheKey(flagCacheKeyPrefix, storage.NewResource(r.NamespaceKey, r.Key)) - err := s.cacher.Delete(ctx, cacheKey) - if err != nil { - s.logger.Error("deleting from storage cache", zap.Error(err)) - } + v, err := s.Store.UpdateFlag(ctx, r) + s.cacheUpdateNamespacedVersion(ctx, storage.NewNamespace(r.GetNamespaceKey())) + return v, err +} - flag, err := s.Store.UpdateFlag(ctx, r) - if err != nil { - return nil, err - } +func (s *Store) DeleteFlag(ctx context.Context, r *flipt.DeleteFlagRequest) error { + err := s.Store.DeleteFlag(ctx, r) + s.cacheUpdateNamespacedVersion(ctx, storage.NewNamespace(r.GetNamespaceKey())) + return err +} - return flag, nil +func (s *Store) CreateVariant(ctx context.Context, r *flipt.CreateVariantRequest) (*flipt.Variant, error) { + v, err := s.Store.CreateVariant(ctx, r) + s.cacheUpdateNamespacedVersion(ctx, storage.NewNamespace(r.GetNamespaceKey())) + return v, err } -func (s *Store) DeleteFlag(ctx context.Context, r *flipt.DeleteFlagRequest) error { - cacheKey := cacheKey(flagCacheKeyPrefix, storage.NewResource(r.NamespaceKey, r.Key)) - err := s.cacher.Delete(ctx, cacheKey) - if err != nil { - s.logger.Error("deleting from storage cache", zap.Error(err)) - } +func (s *Store) UpdateVariant(ctx context.Context, r *flipt.UpdateVariantRequest) (*flipt.Variant, error) { + v, err := s.Store.UpdateVariant(ctx, r) + s.cacheUpdateNamespacedVersion(ctx, storage.NewNamespace(r.GetNamespaceKey())) + return v, err +} - return s.Store.DeleteFlag(ctx, r) +func (s *Store) DeleteVariant(ctx context.Context, r *flipt.DeleteVariantRequest) error { + err := s.Store.DeleteVariant(ctx, r) + s.cacheUpdateNamespacedVersion(ctx, storage.NewNamespace(r.GetNamespaceKey())) + return err } -func (s *Store) CreateVariant(ctx context.Context, r *flipt.CreateVariantRequest) (*flipt.Variant, error) { - // delete from cache as flag has changed - cacheKey := cacheKey(flagCacheKeyPrefix, storage.NewResource(r.NamespaceKey, r.FlagKey)) - err := s.cacher.Delete(ctx, cacheKey) - if err != nil { - s.logger.Error("deleting from storage cache", zap.Error(err)) - } +func (s *Store) CreateSegment(ctx context.Context, r *flipt.CreateSegmentRequest) (*flipt.Segment, error) { + v, err := s.Store.CreateSegment(ctx, r) + s.cacheUpdateNamespacedVersion(ctx, storage.NewNamespace(r.GetNamespaceKey())) + return v, err +} - variant, err := s.Store.CreateVariant(ctx, r) - if err != nil { - return nil, err - } +func (s *Store) UpdateSegment(ctx context.Context, r *flipt.UpdateSegmentRequest) (*flipt.Segment, error) { + v, err := s.Store.UpdateSegment(ctx, r) + s.cacheUpdateNamespacedVersion(ctx, storage.NewNamespace(r.GetNamespaceKey())) + return v, err +} - return variant, nil +func (s *Store) DeleteSegment(ctx context.Context, r *flipt.DeleteSegmentRequest) error { + err := s.Store.DeleteSegment(ctx, r) + s.cacheUpdateNamespacedVersion(ctx, storage.NewNamespace(r.GetNamespaceKey())) + return err } -func (s *Store) UpdateVariant(ctx context.Context, r *flipt.UpdateVariantRequest) (*flipt.Variant, error) { - // delete from cache as flag has changed - cacheKey := cacheKey(flagCacheKeyPrefix, storage.NewResource(r.NamespaceKey, r.FlagKey)) - err := s.cacher.Delete(ctx, cacheKey) - if err != nil { - s.logger.Error("deleting from storage cache", zap.Error(err)) +func (s *Store) CreateConstraint(ctx context.Context, r *flipt.CreateConstraintRequest) (*flipt.Constraint, error) { + v, err := s.Store.CreateConstraint(ctx, r) + s.cacheUpdateNamespacedVersion(ctx, storage.NewNamespace(r.GetNamespaceKey())) + return v, err +} + +func (s *Store) UpdateConstraint(ctx context.Context, r *flipt.UpdateConstraintRequest) (*flipt.Constraint, error) { + v, err := s.Store.UpdateConstraint(ctx, r) + s.cacheUpdateNamespacedVersion(ctx, storage.NewNamespace(r.GetNamespaceKey())) + return v, err +} + +func (s *Store) DeleteConstraint(ctx context.Context, r *flipt.DeleteConstraintRequest) error { + err := s.Store.DeleteConstraint(ctx, r) + s.cacheUpdateNamespacedVersion(ctx, storage.NewNamespace(r.GetNamespaceKey())) + return err +} + +func (s *Store) CreateRule(ctx context.Context, r *flipt.CreateRuleRequest) (*flipt.Rule, error) { + v, err := s.Store.CreateRule(ctx, r) + s.cacheUpdateNamespacedVersion(ctx, storage.NewNamespace(r.GetNamespaceKey())) + return v, err +} + +func (s *Store) UpdateRule(ctx context.Context, r *flipt.UpdateRuleRequest) (*flipt.Rule, error) { + v, err := s.Store.UpdateRule(ctx, r) + s.cacheUpdateNamespacedVersion(ctx, storage.NewNamespace(r.GetNamespaceKey())) + return v, err +} + +func (s *Store) DeleteRule(ctx context.Context, r *flipt.DeleteRuleRequest) error { + err := s.Store.DeleteRule(ctx, r) + s.cacheUpdateNamespacedVersion(ctx, storage.NewNamespace(r.GetNamespaceKey())) + return err +} + +func (s *Store) OrderRules(ctx context.Context, r *flipt.OrderRulesRequest) error { + err := s.Store.OrderRules(ctx, r) + s.cacheUpdateNamespacedVersion(ctx, storage.NewNamespace(r.GetNamespaceKey())) + return err +} + +func (s *Store) CreateDistribution(ctx context.Context, r *flipt.CreateDistributionRequest) (*flipt.Distribution, error) { + v, err := s.Store.CreateDistribution(ctx, r) + s.cacheUpdateNamespacedVersion(ctx, storage.NewNamespace(r.GetNamespaceKey())) + return v, err +} + +func (s *Store) UpdateDistribution(ctx context.Context, r *flipt.UpdateDistributionRequest) (*flipt.Distribution, error) { + v, err := s.Store.UpdateDistribution(ctx, r) + s.cacheUpdateNamespacedVersion(ctx, storage.NewNamespace(r.GetNamespaceKey())) + return v, err +} + +func (s *Store) DeleteDistribution(ctx context.Context, r *flipt.DeleteDistributionRequest) error { + err := s.Store.DeleteDistribution(ctx, r) + s.cacheUpdateNamespacedVersion(ctx, storage.NewNamespace(r.GetNamespaceKey())) + return err +} + +func (s *Store) CreateRollout(ctx context.Context, r *flipt.CreateRolloutRequest) (*flipt.Rollout, error) { + v, err := s.Store.CreateRollout(ctx, r) + s.cacheUpdateNamespacedVersion(ctx, storage.NewNamespace(r.GetNamespaceKey())) + return v, err +} + +func (s *Store) UpdateRollout(ctx context.Context, r *flipt.UpdateRolloutRequest) (*flipt.Rollout, error) { + v, err := s.Store.UpdateRollout(ctx, r) + s.cacheUpdateNamespacedVersion(ctx, storage.NewNamespace(r.GetNamespaceKey())) + return v, err +} + +func (s *Store) DeleteRollout(ctx context.Context, r *flipt.DeleteRolloutRequest) error { + err := s.Store.DeleteRollout(ctx, r) + s.cacheUpdateNamespacedVersion(ctx, storage.NewNamespace(r.GetNamespaceKey())) + return err +} + +func (s *Store) OrderRollouts(ctx context.Context, r *flipt.OrderRolloutsRequest) error { + err := s.Store.OrderRollouts(ctx, r) + s.cacheUpdateNamespacedVersion(ctx, storage.NewNamespace(r.GetNamespaceKey())) + return err +} + +func (s *Store) ListFlags(ctx context.Context, r *storage.ListRequest[storage.NamespaceRequest]) (storage.ResultSet[*flipt.Flag], error) { + ns := storage.NewResource(r.Predicate.Namespace(), "") + namespaceVersion, _ := s.GetVersion(ctx, ns.NamespaceRequest) + var ( + f = storage.ResultSet[*flipt.Flag]{} + cacheKey = cacheKey(flagCacheKeyPrefix, ns, namespaceVersion) + cacheHit = s.getJSON(ctx, cacheKey, f) + ) + + if cacheHit { + return f, nil } - variant, err := s.Store.UpdateVariant(ctx, r) + f, err := s.Store.ListFlags(ctx, r) if err != nil { - return nil, err + return f, err } - return variant, nil + s.setJSON(ctx, cacheKey, f) + return f, nil } -func (s *Store) DeleteVariant(ctx context.Context, r *flipt.DeleteVariantRequest) error { - // delete from cache as flag has changed - cacheKey := cacheKey(flagCacheKeyPrefix, storage.NewResource(r.NamespaceKey, r.FlagKey)) - err := s.cacher.Delete(ctx, cacheKey) +func (s *Store) GetFlag(ctx context.Context, r storage.ResourceRequest) (*flipt.Flag, error) { + namespaceVersion, _ := s.GetVersion(ctx, r.NamespaceRequest) + var ( + f = &flipt.Flag{} + cacheKey = cacheKey(flagCacheKeyPrefix, r, namespaceVersion) + cacheHit = s.getProtobuf(ctx, cacheKey, f) + ) + + if cacheHit { + return f, nil + } + + f, err := s.Store.GetFlag(ctx, r) if err != nil { - s.logger.Error("deleting from storage cache", zap.Error(err)) + return nil, err } - return s.Store.DeleteVariant(ctx, r) + s.setProtobuf(ctx, cacheKey, f) + return f, nil } func (s *Store) GetEvaluationRules(ctx context.Context, r storage.ResourceRequest) ([]*storage.EvaluationRule, error) { + namespaceVersion, _ := s.GetVersion(ctx, r.NamespaceRequest) + var ( rules []*storage.EvaluationRule - cacheKey = cacheKey(evaluationRulesCacheKeyPrefix, r) + cacheKey = cacheKey(evaluationRulesCacheKeyPrefix, r, namespaceVersion) cacheHit = s.getJSON(ctx, cacheKey, &rules) ) @@ -221,9 +329,11 @@ func (s *Store) GetEvaluationRules(ctx context.Context, r storage.ResourceReques } func (s *Store) GetEvaluationRollouts(ctx context.Context, r storage.ResourceRequest) ([]*storage.EvaluationRollout, error) { + namespaceVersion, _ := s.GetVersion(ctx, r.NamespaceRequest) + var ( rollouts []*storage.EvaluationRollout - cacheKey = cacheKey(evaluationRolloutsCacheKeyPrefix, r) + cacheKey = cacheKey(evaluationRolloutsCacheKeyPrefix, r, namespaceVersion) cacheHit = s.getJSON(ctx, cacheKey, &rollouts) ) @@ -240,7 +350,67 @@ func (s *Store) GetEvaluationRollouts(ctx context.Context, r storage.ResourceReq return rollouts, nil } -func cacheKey(prefix string, r storage.ResourceRequest) string { +func (s *Store) GetEvaluationDistributions(ctx context.Context, r storage.ResourceRequest, rule storage.IDRequest) ([]*storage.EvaluationDistribution, error) { + namespaceVersion, _ := s.GetVersion(ctx, r.NamespaceRequest) + + var ( + rollouts []*storage.EvaluationDistribution + cacheKey = cacheKey(evaluationDistributionsCacheKeyPrefix, r, namespaceVersion) + cacheHit = s.getJSON(ctx, cacheKey, &rollouts) + ) + + if cacheHit { + return rollouts, nil + } + + rollouts, err := s.Store.GetEvaluationDistributions(ctx, r, rule) + if err != nil { + return nil, err + } + + s.setJSON(ctx, cacheKey, rollouts) + return rollouts, nil +} + +func (s *Store) GetVersion(ctx context.Context, ns storage.NamespaceRequest) (string, error) { + cacheNsKey := s.cacheNsKey(ns) + version, hits, err := s.cacher.Get(ctx, cacheNsKey) + if err != nil { + s.logger.Error("failed to get version to storage cache", zap.Error(err)) + } + + if hits { + return string(version), nil + } + + originalVersion, err := s.Store.GetVersion(ctx, ns) + if err == nil { + err := s.cacher.Set(ctx, cacheNsKey, []byte(originalVersion)) + if err != nil { + s.logger.Error("failed to set version to storage cache", zap.Error(err)) + } + } + return originalVersion, err +} + +func (s *Store) cacheUpdateNamespacedVersion(ctx context.Context, r storage.NamespaceRequest) { + cacheNsKey := s.cacheNsKey(r) + version, err := s.Store.GetVersion(ctx, r) + if err != nil { + return + } + err = s.cacher.Set(ctx, cacheNsKey, []byte(version)) + if err != nil { + s.logger.Error("updating from storage cache", zap.Error(err)) + } +} + +func (*Store) cacheNsKey(r storage.NamespaceRequest) string { + cacheKey := fmt.Sprintf("%s:%s", namespaceCacheKeyPrefix, r.Namespace()) + return cacheKey +} + +func cacheKey(prefix string, r storage.ResourceRequest, version string) string { // :: - return fmt.Sprintf("%s:%s:%s", prefix, r.Namespace(), r.Key) + return fmt.Sprintf("%s:%s:%s:%s", prefix, r.Namespace(), version, r.Key) } diff --git a/internal/storage/cache/cache_test.go b/internal/storage/cache/cache_test.go index 4aa05cc19c..bc5d5b27ce 100644 --- a/internal/storage/cache/cache_test.go +++ b/internal/storage/cache/cache_test.go @@ -3,6 +3,7 @@ package cache import ( "context" "errors" + "fmt" "testing" "time" @@ -93,6 +94,10 @@ func TestGetEvaluationRules(t *testing.T) { store = &common.StoreMock{} ) + store.On("GetVersion", context.TODO(), storage.NewNamespace("ns")).Return( + "v-123", nil, + ) + store.On("GetEvaluationRules", context.TODO(), storage.NewResource("ns", "flag-1")).Return( expectedRules, nil, ) @@ -112,15 +117,21 @@ func TestGetEvaluationRules(t *testing.T) { // First call to get rules should call the store and cache the result _, err := cachedStore.GetEvaluationRules(context.TODO(), storage.NewResource("ns", "flag-1")) require.NoError(t, err) + fmt.Println(cacher.setItems) assert.NotEmpty(t, cacher.setItems) - assert.NotEmpty(t, cacher.setItems["s:er:ns:flag-1"]) - assert.Equal(t, 1, cacher.setCalled) + assert.NotEmpty(t, cacher.setItems["s:er:ns:v-123:flag-1"]) + assert.NotEmpty(t, cacher.setItems["s:n:ns"]) + assert.Equal(t, 2, cacher.setCalled) // Second call to get rules should hit the cache _, err = cachedStore.GetEvaluationRules(context.TODO(), storage.NewResource("ns", "flag-1")) require.NoError(t, err) assert.NotEmpty(t, cacher.getKeys) - _, ok := cacher.getKeys["s:er:ns:flag-1"] + fmt.Println(cacher.getKeys) + _, ok := cacher.getKeys["s:er:ns:v-123:flag-1"] + assert.True(t, ok) + + _, ok = cacher.getKeys["s:n:ns"] assert.True(t, ok) store.AssertNumberOfCalls(t, "GetEvaluationRules", 1) @@ -132,6 +143,10 @@ func TestGetEvaluationRollouts(t *testing.T) { store = &common.StoreMock{} ) + store.On("GetVersion", context.TODO(), storage.NewNamespace("ns")).Return( + "v-321", nil, + ) + store.On("GetEvaluationRollouts", context.TODO(), storage.NewResource("ns", "flag-1")).Return( expectedRollouts, nil, ) @@ -152,14 +167,15 @@ func TestGetEvaluationRollouts(t *testing.T) { _, err := cachedStore.GetEvaluationRollouts(context.TODO(), storage.NewResource("ns", "flag-1")) require.NoError(t, err) assert.NotEmpty(t, cacher.setItems) - assert.NotEmpty(t, cacher.setItems["s:ero:ns:flag-1"]) - assert.Equal(t, 1, cacher.setCalled) + assert.NotEmpty(t, cacher.setItems["s:ero:ns:v-321:flag-1"]) + assert.NotEmpty(t, cacher.setItems["s:n:ns"]) + assert.Equal(t, 2, cacher.setCalled) // Second call to get rollouts should hit the cache _, err = cachedStore.GetEvaluationRollouts(context.TODO(), storage.NewResource("ns", "flag-1")) require.NoError(t, err) assert.NotEmpty(t, cacher.getKeys) - _, ok := cacher.getKeys["s:ero:ns:flag-1"] + _, ok := cacher.getKeys["s:ero:ns:v-321:flag-1"] assert.True(t, ok) store.AssertNumberOfCalls(t, "GetEvaluationRollouts", 1) diff --git a/internal/storage/fs/snapshot.go b/internal/storage/fs/snapshot.go index 84ce0580fb..828d35e638 100644 --- a/internal/storage/fs/snapshot.go +++ b/internal/storage/fs/snapshot.go @@ -782,7 +782,7 @@ func (ss *Snapshot) GetEvaluationRules(ctx context.Context, flag storage.Resourc return rules, nil } -func (ss *Snapshot) GetEvaluationDistributions(ctx context.Context, rule storage.IDRequest) ([]*storage.EvaluationDistribution, error) { +func (ss *Snapshot) GetEvaluationDistributions(ctx context.Context, flag storage.ResourceRequest, rule storage.IDRequest) ([]*storage.EvaluationDistribution, error) { dists, ok := ss.evalDists[rule.ID] if !ok { return []*storage.EvaluationDistribution{}, nil diff --git a/internal/storage/fs/snapshot_test.go b/internal/storage/fs/snapshot_test.go index cbcdf3aa67..eacda5e325 100644 --- a/internal/storage/fs/snapshot_test.go +++ b/internal/storage/fs/snapshot_test.go @@ -62,7 +62,6 @@ func TestSnapshotFromFS_Invalid(t *testing.T) { } }) } - } func TestWalkDocuments(t *testing.T) { @@ -633,7 +632,7 @@ func (fis *FSIndexSuite) TestGetEvaluationDistributions() { require.NoError(t, err) assert.Len(t, rules.Results, 1) - dist, err := fis.store.GetEvaluationDistributions(context.TODO(), storage.NewID(rules.Results[0].Id)) + dist, err := fis.store.GetEvaluationDistributions(context.TODO(), storage.NewResource(tc.namespace, tc.flagKey), storage.NewID(rules.Results[0].Id)) require.NoError(t, err) @@ -1587,7 +1586,7 @@ func (fis *FSWithoutIndexSuite) TestGetEvaluationDistributions() { require.NoError(t, err) assert.Len(t, rules.Results, 1) - dist, err := fis.store.GetEvaluationDistributions(context.TODO(), storage.NewID(rules.Results[0].Id)) + dist, err := fis.store.GetEvaluationDistributions(context.TODO(), storage.NewResource(tc.namespace, tc.flagKey), storage.NewID(rules.Results[0].Id)) require.NoError(t, err) @@ -1780,7 +1779,7 @@ func TestFS_YAML_Stream(t *testing.T) { // 3 namespaces including default assert.Len(t, ns.Results, 3) - var namespaces = make([]string, 0, len(ns.Results)) + namespaces := make([]string, 0, len(ns.Results)) for _, n := range ns.Results { namespaces = append(namespaces, n.Key) diff --git a/internal/storage/fs/store.go b/internal/storage/fs/store.go index ee619127c7..56666391a5 100644 --- a/internal/storage/fs/store.go +++ b/internal/storage/fs/store.go @@ -154,9 +154,9 @@ func (s *Store) GetEvaluationRules(ctx context.Context, flag storage.ResourceReq }) } -func (s *Store) GetEvaluationDistributions(ctx context.Context, rule storage.IDRequest) (dists []*storage.EvaluationDistribution, err error) { +func (s *Store) GetEvaluationDistributions(ctx context.Context, r storage.ResourceRequest, rule storage.IDRequest) (dists []*storage.EvaluationDistribution, err error) { return dists, s.viewer.View(ctx, rule.Reference, func(ss storage.ReadOnlyStore) error { - dists, err = ss.GetEvaluationDistributions(ctx, rule) + dists, err = ss.GetEvaluationDistributions(ctx, r, rule) return err }) } diff --git a/internal/storage/fs/store_test.go b/internal/storage/fs/store_test.go index d2230b117e..24b8ea8462 100644 --- a/internal/storage/fs/store_test.go +++ b/internal/storage/fs/store_test.go @@ -203,7 +203,7 @@ func TestGetEvaluationDistributions(t *testing.T) { id := storage.NewID("id") storeMock.On("GetEvaluationDistributions", mock.Anything, id).Return([]*storage.EvaluationDistribution{}, nil) - _, err := ss.GetEvaluationDistributions(context.TODO(), id) + _, err := ss.GetEvaluationDistributions(context.TODO(), storage.NewResource("", "flag"), id) require.NoError(t, err) } diff --git a/internal/storage/sql/common/evaluation.go b/internal/storage/sql/common/evaluation.go index 28e58e55f1..a454f3cb4a 100644 --- a/internal/storage/sql/common/evaluation.go +++ b/internal/storage/sql/common/evaluation.go @@ -32,7 +32,7 @@ func (s *Store) GetEvaluationRules(ctx context.Context, flag storage.ResourceReq SegmentOperator flipt.SegmentOperator } - var rmMap = make(map[string]*RuleMeta) + rmMap := make(map[string]*RuleMeta) ruleIDs := make([]string, 0) for ruleMetaRows.Next() { @@ -197,7 +197,7 @@ func (s *Store) GetEvaluationRules(ctx context.Context, flag storage.ResourceReq return rules, nil } -func (s *Store) GetEvaluationDistributions(ctx context.Context, rule storage.IDRequest) (_ []*storage.EvaluationDistribution, err error) { +func (s *Store) GetEvaluationDistributions(ctx context.Context, r storage.ResourceRequest, rule storage.IDRequest) (_ []*storage.EvaluationDistribution, err error) { rows, err := s.builder.Select("d.id, d.rule_id, d.variant_id, d.rollout, v.\"key\", v.attachment"). From("distributions d"). Join("variants v ON (d.variant_id = v.id)"). diff --git a/internal/storage/sql/evaluation_test.go b/internal/storage/sql/evaluation_test.go index 7885356417..7da4ce8d6a 100644 --- a/internal/storage/sql/evaluation_test.go +++ b/internal/storage/sql/evaluation_test.go @@ -359,7 +359,7 @@ func (s *DBTestSuite) TestGetEvaluationDistributions() { require.NoError(t, err) - evaluationDistributions, err := s.store.GetEvaluationDistributions(context.TODO(), storage.NewID(rule.Id)) + evaluationDistributions, err := s.store.GetEvaluationDistributions(context.TODO(), storage.NewResource(flag.NamespaceKey, flag.Key), storage.NewID(rule.Id)) require.NoError(t, err) assert.Len(t, evaluationDistributions, 2) @@ -464,7 +464,7 @@ func (s *DBTestSuite) TestGetEvaluationDistributionsNamespace() { require.NoError(t, err) - evaluationDistributions, err := s.store.GetEvaluationDistributions(context.TODO(), storage.NewID(rule.Id)) + evaluationDistributions, err := s.store.GetEvaluationDistributions(context.TODO(), storage.NewResource(flag.NamespaceKey, flag.Key), storage.NewID(rule.Id)) require.NoError(t, err) assert.Len(t, evaluationDistributions, 2) @@ -561,7 +561,7 @@ func (s *DBTestSuite) TestGetEvaluationDistributions_MaintainOrder() { require.NoError(t, err) - evaluationDistributions, err := s.store.GetEvaluationDistributions(context.TODO(), storage.NewID(rule.Id)) + evaluationDistributions, err := s.store.GetEvaluationDistributions(context.TODO(), storage.NewResource(flag.NamespaceKey, flag.Key), storage.NewID(rule.Id)) require.NoError(t, err) assert.Len(t, evaluationDistributions, 2) @@ -603,7 +603,7 @@ func (s *DBTestSuite) TestGetEvaluationDistributions_MaintainOrder() { require.NoError(t, err) - evaluationDistributions, err = s.store.GetEvaluationDistributions(context.TODO(), storage.NewID(rule.Id)) + evaluationDistributions, err = s.store.GetEvaluationDistributions(context.TODO(), storage.NewResource(flag.NamespaceKey, flag.Key), storage.NewID(rule.Id)) require.NoError(t, err) assert.Len(t, evaluationDistributions, 2) diff --git a/internal/storage/storage.go b/internal/storage/storage.go index bab541d80a..42ea4abb74 100644 --- a/internal/storage/storage.go +++ b/internal/storage/storage.go @@ -194,7 +194,7 @@ type EvaluationStore interface { // GetEvaluationRules returns rules applicable to flagKey provided // Note: Rules MUST be returned in order by Rank GetEvaluationRules(ctx context.Context, flag ResourceRequest) ([]*EvaluationRule, error) - GetEvaluationDistributions(ctx context.Context, rule IDRequest) ([]*EvaluationDistribution, error) + GetEvaluationDistributions(ctx context.Context, flag ResourceRequest, rule IDRequest) ([]*EvaluationDistribution, error) // GetEvaluationRollouts returns rollouts applicable to namespaceKey + flagKey provided // Note: Rollouts MUST be returned in order by rank GetEvaluationRollouts(ctx context.Context, flag ResourceRequest) ([]*EvaluationRollout, error)