diff --git a/internal/common/store_mock.go b/internal/common/store_mock.go index 724a870432..97e03957f7 100644 --- a/internal/common/store_mock.go +++ b/internal/common/store_mock.go @@ -246,7 +246,7 @@ func (m *StoreMock) GetEvaluationRules(ctx context.Context, flag storage.Resourc return args.Get(0).([]*storage.EvaluationRule), args.Error(1) } -func (m *StoreMock) GetEvaluationDistributions(ctx context.Context, rule storage.IDRequest) ([]*storage.EvaluationDistribution, error) { +func (m *StoreMock) GetEvaluationDistributions(ctx context.Context, r storage.ResourceRequest, rule storage.IDRequest) ([]*storage.EvaluationDistribution, error) { args := m.Called(ctx, rule) return args.Get(0).([]*storage.EvaluationDistribution), args.Error(1) } diff --git a/internal/server/evaluation/data/evaluation_store_mock.go b/internal/server/evaluation/data/evaluation_store_mock.go index 56efa3fa81..079930196e 100644 --- a/internal/server/evaluation/data/evaluation_store_mock.go +++ b/internal/server/evaluation/data/evaluation_store_mock.go @@ -38,7 +38,7 @@ func (e *evaluationStoreMock) GetEvaluationRules(ctx context.Context, flag stora return args.Get(0).([]*storage.EvaluationRule), args.Error(1) } -func (e *evaluationStoreMock) GetEvaluationDistributions(ctx context.Context, ruleID storage.IDRequest) ([]*storage.EvaluationDistribution, error) { +func (e *evaluationStoreMock) GetEvaluationDistributions(ctx context.Context, flag storage.ResourceRequest, ruleID storage.IDRequest) ([]*storage.EvaluationDistribution, error) { args := e.Called(ctx, ruleID) return args.Get(0).([]*storage.EvaluationDistribution), args.Error(1) } diff --git a/internal/server/evaluation/data/server.go b/internal/server/evaluation/data/server.go index ca72f00e2a..05c6d52fc1 100644 --- a/internal/server/evaluation/data/server.go +++ b/internal/server/evaluation/data/server.go @@ -2,8 +2,8 @@ package data import ( "context" - "crypto/sha1" //nolint:gosec - + //nolint:gosec + "crypto/sha1" "fmt" "github.com/blang/semver/v4" @@ -123,7 +123,6 @@ var ( ) func (srv *Server) EvaluationSnapshotNamespace(ctx context.Context, r *evaluation.EvaluationNamespaceSnapshotRequest) (*evaluation.EvaluationNamespaceSnapshot, error) { - var ( namespaceKey = r.Key reference = r.Reference @@ -264,7 +263,7 @@ func (srv *Server) EvaluationSnapshotNamespace(ctx context.Context, r *evaluatio rule.Segments = append(rule.Segments, ss) } - distributions, err := srv.store.GetEvaluationDistributions(ctx, storage.NewID(r.ID, storage.WithReference(reference))) + distributions, err := srv.store.GetEvaluationDistributions(ctx, storage.NewResource(f.NamespaceKey, f.Key), storage.NewID(r.ID, storage.WithReference(reference))) if err != nil { return nil, fmt.Errorf("getting distributions for rule %q: %w", r.ID, err) } diff --git a/internal/server/evaluation/evaluation_store_mock.go b/internal/server/evaluation/evaluation_store_mock.go index ab44ea49ac..3ea33d77de 100644 --- a/internal/server/evaluation/evaluation_store_mock.go +++ b/internal/server/evaluation/evaluation_store_mock.go @@ -28,7 +28,7 @@ func (e *evaluationStoreMock) GetEvaluationRules(ctx context.Context, flag stora return args.Get(0).([]*storage.EvaluationRule), args.Error(1) } -func (e *evaluationStoreMock) GetEvaluationDistributions(ctx context.Context, ruleID storage.IDRequest) ([]*storage.EvaluationDistribution, error) { +func (e *evaluationStoreMock) GetEvaluationDistributions(ctx context.Context, r storage.ResourceRequest, ruleID storage.IDRequest) ([]*storage.EvaluationDistribution, error) { args := e.Called(ctx, ruleID) return args.Get(0).([]*storage.EvaluationDistribution), args.Error(1) } diff --git a/internal/server/evaluation/legacy_evaluator.go b/internal/server/evaluation/legacy_evaluator.go index 5de7441829..3310e89026 100644 --- a/internal/server/evaluation/legacy_evaluator.go +++ b/internal/server/evaluation/legacy_evaluator.go @@ -164,7 +164,7 @@ func (e *Evaluator) Evaluate(ctx context.Context, flag *flipt.Flag, r *evaluatio resp.SegmentKeys = segmentKeys } - distributions, err := e.store.GetEvaluationDistributions(ctx, storage.NewID(rule.ID)) + distributions, err := e.store.GetEvaluationDistributions(ctx, storage.NewResource(r.NamespaceKey, r.FlagKey), storage.NewID(rule.ID)) if err != nil { resp.Reason = flipt.EvaluationReason_ERROR_EVALUATION_REASON return resp, err @@ -287,7 +287,7 @@ func (e *Evaluator) matchConstraints(evalCtx map[string]string, constraints []st } } - var matched = true + matched := true switch segmentMatchType { case flipt.MatchType_ALL_MATCH_TYPE: diff --git a/internal/server/evaluation/server.go b/internal/server/evaluation/server.go index 342f886afe..645a4e0cd0 100644 --- a/internal/server/evaluation/server.go +++ b/internal/server/evaluation/server.go @@ -14,7 +14,7 @@ import ( type Storer interface { GetFlag(ctx context.Context, flag storage.ResourceRequest) (*flipt.Flag, error) GetEvaluationRules(ctx context.Context, flag storage.ResourceRequest) ([]*storage.EvaluationRule, error) - GetEvaluationDistributions(ctx context.Context, ruleID storage.IDRequest) ([]*storage.EvaluationDistribution, error) + GetEvaluationDistributions(ctx context.Context, flag storage.ResourceRequest, ruleID storage.IDRequest) ([]*storage.EvaluationDistribution, error) GetEvaluationRollouts(ctx context.Context, flag storage.ResourceRequest) ([]*storage.EvaluationRollout, error) } diff --git a/internal/storage/cache/cache.go b/internal/storage/cache/cache.go index 809df738d1..9dec685622 100644 --- a/internal/storage/cache/cache.go +++ b/internal/storage/cache/cache.go @@ -72,12 +72,16 @@ type Store struct { } const ( + // storage:namespaces + namespaceCacheKeyPrefix = "s:n" // storage:flags flagCacheKeyPrefix = "s:f" // storage:evaluationRules evaluationRulesCacheKeyPrefix = "s:er" // storage:evaluationRollouts evaluationRolloutsCacheKeyPrefix = "s:ero" + // storage:evaluationDistributions + evaluationDistributionsCacheKeyPrefix = "s:erd" ) func NewStore(store storage.Store, cacher cache.Cacher, logger *zap.Logger) *Store { @@ -100,110 +104,214 @@ func (s *Store) getProtobuf(ctx context.Context, key string, value proto.Message return get(ctx, s, unmarshalFunc[proto.Message](proto.Unmarshal), key, value) } -func (s *Store) CreateFlag(ctx context.Context, r *flipt.CreateFlagRequest) (*flipt.Flag, error) { - flag, err := s.Store.CreateFlag(ctx, r) - if err != nil { - return nil, err - } - - cacheKey := cacheKey(flagCacheKeyPrefix, storage.NewResource(r.NamespaceKey, r.Key)) - s.setProtobuf(ctx, cacheKey, flag) - return flag, nil +func (s *Store) UpdateNamespace(ctx context.Context, r *flipt.UpdateNamespaceRequest) (*flipt.Namespace, error) { + namespace, err := s.Store.UpdateNamespace(ctx, r) + s.cacheUpdateNamespacedVersion(ctx, storage.NewNamespace(r.GetKey())) + return namespace, err } -func (s *Store) GetFlag(ctx context.Context, r storage.ResourceRequest) (*flipt.Flag, error) { - var ( - f = &flipt.Flag{} - cacheKey = cacheKey(flagCacheKeyPrefix, r) - cacheHit = s.getProtobuf(ctx, cacheKey, f) - ) - - if cacheHit { - return f, nil +func (s *Store) DeleteNamespace(ctx context.Context, r *flipt.DeleteNamespaceRequest) error { + err := s.Store.DeleteNamespace(ctx, r) + if err != nil { + return err } - f, err := s.Store.GetFlag(ctx, r) + cacheNsKey := s.cacheNsKey(storage.NewNamespace(r.GetKey())) + err = s.cacher.Delete(ctx, cacheNsKey) if err != nil { - return nil, err + s.logger.Error("deleting from storage cache", zap.Error(err)) } - s.setProtobuf(ctx, cacheKey, f) - return f, nil + return nil +} + +func (s *Store) CreateFlag(ctx context.Context, r *flipt.CreateFlagRequest) (*flipt.Flag, error) { + v, err := s.Store.CreateFlag(ctx, r) + s.cacheUpdateNamespacedVersion(ctx, storage.NewNamespace(r.GetNamespaceKey())) + return v, err } func (s *Store) UpdateFlag(ctx context.Context, r *flipt.UpdateFlagRequest) (*flipt.Flag, error) { - // delete from cache as flag has changed - cacheKey := cacheKey(flagCacheKeyPrefix, storage.NewResource(r.NamespaceKey, r.Key)) - err := s.cacher.Delete(ctx, cacheKey) - if err != nil { - s.logger.Error("deleting from storage cache", zap.Error(err)) - } + v, err := s.Store.UpdateFlag(ctx, r) + s.cacheUpdateNamespacedVersion(ctx, storage.NewNamespace(r.GetNamespaceKey())) + return v, err +} - flag, err := s.Store.UpdateFlag(ctx, r) - if err != nil { - return nil, err - } +func (s *Store) DeleteFlag(ctx context.Context, r *flipt.DeleteFlagRequest) error { + err := s.Store.DeleteFlag(ctx, r) + s.cacheUpdateNamespacedVersion(ctx, storage.NewNamespace(r.GetNamespaceKey())) + return err +} - return flag, nil +func (s *Store) CreateVariant(ctx context.Context, r *flipt.CreateVariantRequest) (*flipt.Variant, error) { + v, err := s.Store.CreateVariant(ctx, r) + s.cacheUpdateNamespacedVersion(ctx, storage.NewNamespace(r.GetNamespaceKey())) + return v, err } -func (s *Store) DeleteFlag(ctx context.Context, r *flipt.DeleteFlagRequest) error { - cacheKey := cacheKey(flagCacheKeyPrefix, storage.NewResource(r.NamespaceKey, r.Key)) - err := s.cacher.Delete(ctx, cacheKey) - if err != nil { - s.logger.Error("deleting from storage cache", zap.Error(err)) - } +func (s *Store) UpdateVariant(ctx context.Context, r *flipt.UpdateVariantRequest) (*flipt.Variant, error) { + v, err := s.Store.UpdateVariant(ctx, r) + s.cacheUpdateNamespacedVersion(ctx, storage.NewNamespace(r.GetNamespaceKey())) + return v, err +} - return s.Store.DeleteFlag(ctx, r) +func (s *Store) DeleteVariant(ctx context.Context, r *flipt.DeleteVariantRequest) error { + err := s.Store.DeleteVariant(ctx, r) + s.cacheUpdateNamespacedVersion(ctx, storage.NewNamespace(r.GetNamespaceKey())) + return err } -func (s *Store) CreateVariant(ctx context.Context, r *flipt.CreateVariantRequest) (*flipt.Variant, error) { - // delete from cache as flag has changed - cacheKey := cacheKey(flagCacheKeyPrefix, storage.NewResource(r.NamespaceKey, r.FlagKey)) - err := s.cacher.Delete(ctx, cacheKey) - if err != nil { - s.logger.Error("deleting from storage cache", zap.Error(err)) - } +func (s *Store) CreateSegment(ctx context.Context, r *flipt.CreateSegmentRequest) (*flipt.Segment, error) { + v, err := s.Store.CreateSegment(ctx, r) + s.cacheUpdateNamespacedVersion(ctx, storage.NewNamespace(r.GetNamespaceKey())) + return v, err +} - variant, err := s.Store.CreateVariant(ctx, r) - if err != nil { - return nil, err - } +func (s *Store) UpdateSegment(ctx context.Context, r *flipt.UpdateSegmentRequest) (*flipt.Segment, error) { + v, err := s.Store.UpdateSegment(ctx, r) + s.cacheUpdateNamespacedVersion(ctx, storage.NewNamespace(r.GetNamespaceKey())) + return v, err +} - return variant, nil +func (s *Store) DeleteSegment(ctx context.Context, r *flipt.DeleteSegmentRequest) error { + err := s.Store.DeleteSegment(ctx, r) + s.cacheUpdateNamespacedVersion(ctx, storage.NewNamespace(r.GetNamespaceKey())) + return err } -func (s *Store) UpdateVariant(ctx context.Context, r *flipt.UpdateVariantRequest) (*flipt.Variant, error) { - // delete from cache as flag has changed - cacheKey := cacheKey(flagCacheKeyPrefix, storage.NewResource(r.NamespaceKey, r.FlagKey)) - err := s.cacher.Delete(ctx, cacheKey) - if err != nil { - s.logger.Error("deleting from storage cache", zap.Error(err)) +func (s *Store) CreateConstraint(ctx context.Context, r *flipt.CreateConstraintRequest) (*flipt.Constraint, error) { + v, err := s.Store.CreateConstraint(ctx, r) + s.cacheUpdateNamespacedVersion(ctx, storage.NewNamespace(r.GetNamespaceKey())) + return v, err +} + +func (s *Store) UpdateConstraint(ctx context.Context, r *flipt.UpdateConstraintRequest) (*flipt.Constraint, error) { + v, err := s.Store.UpdateConstraint(ctx, r) + s.cacheUpdateNamespacedVersion(ctx, storage.NewNamespace(r.GetNamespaceKey())) + return v, err +} + +func (s *Store) DeleteConstraint(ctx context.Context, r *flipt.DeleteConstraintRequest) error { + err := s.Store.DeleteConstraint(ctx, r) + s.cacheUpdateNamespacedVersion(ctx, storage.NewNamespace(r.GetNamespaceKey())) + return err +} + +func (s *Store) CreateRule(ctx context.Context, r *flipt.CreateRuleRequest) (*flipt.Rule, error) { + v, err := s.Store.CreateRule(ctx, r) + s.cacheUpdateNamespacedVersion(ctx, storage.NewNamespace(r.GetNamespaceKey())) + return v, err +} + +func (s *Store) UpdateRule(ctx context.Context, r *flipt.UpdateRuleRequest) (*flipt.Rule, error) { + v, err := s.Store.UpdateRule(ctx, r) + s.cacheUpdateNamespacedVersion(ctx, storage.NewNamespace(r.GetNamespaceKey())) + return v, err +} + +func (s *Store) DeleteRule(ctx context.Context, r *flipt.DeleteRuleRequest) error { + err := s.Store.DeleteRule(ctx, r) + s.cacheUpdateNamespacedVersion(ctx, storage.NewNamespace(r.GetNamespaceKey())) + return err +} + +func (s *Store) OrderRules(ctx context.Context, r *flipt.OrderRulesRequest) error { + err := s.Store.OrderRules(ctx, r) + s.cacheUpdateNamespacedVersion(ctx, storage.NewNamespace(r.GetNamespaceKey())) + return err +} + +func (s *Store) CreateDistribution(ctx context.Context, r *flipt.CreateDistributionRequest) (*flipt.Distribution, error) { + v, err := s.Store.CreateDistribution(ctx, r) + s.cacheUpdateNamespacedVersion(ctx, storage.NewNamespace(r.GetNamespaceKey())) + return v, err +} + +func (s *Store) UpdateDistribution(ctx context.Context, r *flipt.UpdateDistributionRequest) (*flipt.Distribution, error) { + v, err := s.Store.UpdateDistribution(ctx, r) + s.cacheUpdateNamespacedVersion(ctx, storage.NewNamespace(r.GetNamespaceKey())) + return v, err +} + +func (s *Store) DeleteDistribution(ctx context.Context, r *flipt.DeleteDistributionRequest) error { + err := s.Store.DeleteDistribution(ctx, r) + s.cacheUpdateNamespacedVersion(ctx, storage.NewNamespace(r.GetNamespaceKey())) + return err +} + +func (s *Store) CreateRollout(ctx context.Context, r *flipt.CreateRolloutRequest) (*flipt.Rollout, error) { + v, err := s.Store.CreateRollout(ctx, r) + s.cacheUpdateNamespacedVersion(ctx, storage.NewNamespace(r.GetNamespaceKey())) + return v, err +} + +func (s *Store) UpdateRollout(ctx context.Context, r *flipt.UpdateRolloutRequest) (*flipt.Rollout, error) { + v, err := s.Store.UpdateRollout(ctx, r) + s.cacheUpdateNamespacedVersion(ctx, storage.NewNamespace(r.GetNamespaceKey())) + return v, err +} + +func (s *Store) DeleteRollout(ctx context.Context, r *flipt.DeleteRolloutRequest) error { + err := s.Store.DeleteRollout(ctx, r) + s.cacheUpdateNamespacedVersion(ctx, storage.NewNamespace(r.GetNamespaceKey())) + return err +} + +func (s *Store) OrderRollouts(ctx context.Context, r *flipt.OrderRolloutsRequest) error { + err := s.Store.OrderRollouts(ctx, r) + s.cacheUpdateNamespacedVersion(ctx, storage.NewNamespace(r.GetNamespaceKey())) + return err +} + +func (s *Store) ListFlags(ctx context.Context, r *storage.ListRequest[storage.NamespaceRequest]) (storage.ResultSet[*flipt.Flag], error) { + ns := storage.NewResource(r.Predicate.Namespace(), "") + namespaceVersion, _ := s.GetVersion(ctx, ns.NamespaceRequest) + var ( + f = storage.ResultSet[*flipt.Flag]{} + cacheKey = cacheKey(flagCacheKeyPrefix, ns, namespaceVersion) + cacheHit = s.getJSON(ctx, cacheKey, f) + ) + + if cacheHit { + return f, nil } - variant, err := s.Store.UpdateVariant(ctx, r) + f, err := s.Store.ListFlags(ctx, r) if err != nil { - return nil, err + return f, err } - return variant, nil + s.setJSON(ctx, cacheKey, f) + return f, nil } -func (s *Store) DeleteVariant(ctx context.Context, r *flipt.DeleteVariantRequest) error { - // delete from cache as flag has changed - cacheKey := cacheKey(flagCacheKeyPrefix, storage.NewResource(r.NamespaceKey, r.FlagKey)) - err := s.cacher.Delete(ctx, cacheKey) +func (s *Store) GetFlag(ctx context.Context, r storage.ResourceRequest) (*flipt.Flag, error) { + namespaceVersion, _ := s.GetVersion(ctx, r.NamespaceRequest) + var ( + f = &flipt.Flag{} + cacheKey = cacheKey(flagCacheKeyPrefix, r, namespaceVersion) + cacheHit = s.getProtobuf(ctx, cacheKey, f) + ) + + if cacheHit { + return f, nil + } + + f, err := s.Store.GetFlag(ctx, r) if err != nil { - s.logger.Error("deleting from storage cache", zap.Error(err)) + return nil, err } - return s.Store.DeleteVariant(ctx, r) + s.setProtobuf(ctx, cacheKey, f) + return f, nil } func (s *Store) GetEvaluationRules(ctx context.Context, r storage.ResourceRequest) ([]*storage.EvaluationRule, error) { + namespaceVersion, _ := s.GetVersion(ctx, r.NamespaceRequest) + var ( rules []*storage.EvaluationRule - cacheKey = cacheKey(evaluationRulesCacheKeyPrefix, r) + cacheKey = cacheKey(evaluationRulesCacheKeyPrefix, r, namespaceVersion) cacheHit = s.getJSON(ctx, cacheKey, &rules) ) @@ -221,9 +329,11 @@ func (s *Store) GetEvaluationRules(ctx context.Context, r storage.ResourceReques } func (s *Store) GetEvaluationRollouts(ctx context.Context, r storage.ResourceRequest) ([]*storage.EvaluationRollout, error) { + namespaceVersion, _ := s.GetVersion(ctx, r.NamespaceRequest) + var ( rollouts []*storage.EvaluationRollout - cacheKey = cacheKey(evaluationRolloutsCacheKeyPrefix, r) + cacheKey = cacheKey(evaluationRolloutsCacheKeyPrefix, r, namespaceVersion) cacheHit = s.getJSON(ctx, cacheKey, &rollouts) ) @@ -240,7 +350,68 @@ func (s *Store) GetEvaluationRollouts(ctx context.Context, r storage.ResourceReq return rollouts, nil } -func cacheKey(prefix string, r storage.ResourceRequest) string { +func (s *Store) GetEvaluationDistributions(ctx context.Context, r storage.ResourceRequest, rule storage.IDRequest) ([]*storage.EvaluationDistribution, error) { + namespaceVersion, _ := s.GetVersion(ctx, r.NamespaceRequest) + + var ( + rollouts []*storage.EvaluationDistribution + cacheKey = cacheKey(evaluationDistributionsCacheKeyPrefix, r, namespaceVersion) + cacheHit = s.getJSON(ctx, cacheKey, &rollouts) + ) + + if cacheHit { + return rollouts, nil + } + + rollouts, err := s.Store.GetEvaluationDistributions(ctx, r, rule) + if err != nil { + return nil, err + } + + s.setJSON(ctx, cacheKey, rollouts) + return rollouts, nil +} + +func (s *Store) GetVersion(ctx context.Context, ns storage.NamespaceRequest) (string, error) { + cacheNsKey := s.cacheNsKey(ns) + version, hits, err := s.cacher.Get(ctx, cacheNsKey) + if err != nil { + s.logger.Error("failed to get version to storage cache", zap.Error(err)) + } + + if hits { + return string(version), nil + } + + originalVersion, err := s.Store.GetVersion(ctx, ns) + if err == nil { + err := s.cacher.Set(ctx, cacheNsKey, []byte(originalVersion)) + if err != nil { + s.logger.Error("failed to set version to storage cache", zap.Error(err)) + } + } + return originalVersion, err +} + +func (s *Store) cacheUpdateNamespacedVersion(ctx context.Context, r storage.NamespaceRequest) { + cacheNsKey := s.cacheNsKey(r) + version, err := s.Store.GetVersion(ctx, r) + if err != nil { + s.logger.Error("getting from storage cache", zap.Error(err)) + return + } + err = s.cacher.Set(ctx, cacheNsKey, []byte(version)) + if err != nil { + s.logger.Error("updating from storage cache", zap.Error(err)) + } +} + +func (*Store) cacheNsKey(r storage.NamespaceRequest) string { + cacheKey := fmt.Sprintf("%s:%s", namespaceCacheKeyPrefix, r.Namespace()) + return cacheKey +} + +func cacheKey(prefix string, r storage.ResourceRequest, version string) string { // :: - return fmt.Sprintf("%s:%s:%s", prefix, r.Namespace(), r.Key) + return fmt.Sprintf("%s:%s:%s:%s", prefix, r.Namespace(), version, r.Key) } diff --git a/internal/storage/cache/cache_test.go b/internal/storage/cache/cache_test.go index 4aa05cc19c..e288fdd45c 100644 --- a/internal/storage/cache/cache_test.go +++ b/internal/storage/cache/cache_test.go @@ -93,6 +93,10 @@ func TestGetEvaluationRules(t *testing.T) { store = &common.StoreMock{} ) + store.On("GetVersion", context.TODO(), storage.NewNamespace("ns")).Return( + "v-123", nil, + ) + store.On("GetEvaluationRules", context.TODO(), storage.NewResource("ns", "flag-1")).Return( expectedRules, nil, ) @@ -113,14 +117,18 @@ func TestGetEvaluationRules(t *testing.T) { _, err := cachedStore.GetEvaluationRules(context.TODO(), storage.NewResource("ns", "flag-1")) require.NoError(t, err) assert.NotEmpty(t, cacher.setItems) - assert.NotEmpty(t, cacher.setItems["s:er:ns:flag-1"]) - assert.Equal(t, 1, cacher.setCalled) + assert.NotEmpty(t, cacher.setItems["s:er:ns:v-123:flag-1"]) + assert.NotEmpty(t, cacher.setItems["s:n:ns"]) + assert.Equal(t, 2, cacher.setCalled) // Second call to get rules should hit the cache _, err = cachedStore.GetEvaluationRules(context.TODO(), storage.NewResource("ns", "flag-1")) require.NoError(t, err) assert.NotEmpty(t, cacher.getKeys) - _, ok := cacher.getKeys["s:er:ns:flag-1"] + _, ok := cacher.getKeys["s:er:ns:v-123:flag-1"] + assert.True(t, ok) + + _, ok = cacher.getKeys["s:n:ns"] assert.True(t, ok) store.AssertNumberOfCalls(t, "GetEvaluationRules", 1) @@ -132,6 +140,10 @@ func TestGetEvaluationRollouts(t *testing.T) { store = &common.StoreMock{} ) + store.On("GetVersion", context.TODO(), storage.NewNamespace("ns")).Return( + "v-321", nil, + ) + store.On("GetEvaluationRollouts", context.TODO(), storage.NewResource("ns", "flag-1")).Return( expectedRollouts, nil, ) @@ -152,14 +164,15 @@ func TestGetEvaluationRollouts(t *testing.T) { _, err := cachedStore.GetEvaluationRollouts(context.TODO(), storage.NewResource("ns", "flag-1")) require.NoError(t, err) assert.NotEmpty(t, cacher.setItems) - assert.NotEmpty(t, cacher.setItems["s:ero:ns:flag-1"]) - assert.Equal(t, 1, cacher.setCalled) + assert.NotEmpty(t, cacher.setItems["s:ero:ns:v-321:flag-1"]) + assert.NotEmpty(t, cacher.setItems["s:n:ns"]) + assert.Equal(t, 2, cacher.setCalled) // Second call to get rollouts should hit the cache _, err = cachedStore.GetEvaluationRollouts(context.TODO(), storage.NewResource("ns", "flag-1")) require.NoError(t, err) assert.NotEmpty(t, cacher.getKeys) - _, ok := cacher.getKeys["s:ero:ns:flag-1"] + _, ok := cacher.getKeys["s:ero:ns:v-321:flag-1"] assert.True(t, ok) store.AssertNumberOfCalls(t, "GetEvaluationRollouts", 1) @@ -171,6 +184,9 @@ func TestGetFlag(t *testing.T) { store = &common.StoreMock{} ) + store.On("GetVersion", context.TODO(), storage.NewNamespace("ns")).Return( + "v-321", nil, + ) store.On("GetFlag", context.TODO(), storage.NewResource("ns", "flag-1")).Return( expectedFlag, nil, ) @@ -191,14 +207,15 @@ func TestGetFlag(t *testing.T) { _, err := cachedStore.GetFlag(context.TODO(), storage.NewResource("ns", "flag-1")) require.NoError(t, err) assert.NotEmpty(t, cacher.setItems) - assert.NotEmpty(t, cacher.setItems["s:f:ns:flag-1"]) - assert.Equal(t, 1, cacher.setCalled) + assert.NotEmpty(t, cacher.setItems["s:f:ns:v-321:flag-1"]) + assert.NotEmpty(t, cacher.setItems["s:n:ns"]) + assert.Equal(t, 2, cacher.setCalled) // Second call to get flag should hit the cache _, err = cachedStore.GetFlag(context.TODO(), storage.NewResource("ns", "flag-1")) require.NoError(t, err) assert.NotEmpty(t, cacher.getKeys) - _, ok := cacher.getKeys["s:f:ns:flag-1"] + _, ok := cacher.getKeys["s:f:ns:v-321:flag-1"] assert.True(t, ok) store.AssertNumberOfCalls(t, "GetFlag", 1) @@ -210,6 +227,9 @@ func TestCreateFlag(t *testing.T) { store = &common.StoreMock{} ) + store.On("GetVersion", context.TODO(), storage.NewNamespace("ns")).Return( + "v-321", nil, + ) store.On("CreateFlag", context.TODO(), mock.Anything).Return(expectedFlag, nil) var ( @@ -228,7 +248,7 @@ func TestCreateFlag(t *testing.T) { _, err := cachedStore.CreateFlag(context.TODO(), &flipt.CreateFlagRequest{NamespaceKey: "ns", Key: "flag-1"}) require.NoError(t, err) assert.NotEmpty(t, cacher.setItems) - _, ok := cacher.setItems["s:f:ns:flag-1"] + _, ok := cacher.setItems["s:n:ns"] assert.True(t, ok) assert.Equal(t, 1, cacher.setCalled) @@ -240,6 +260,9 @@ func TestUpdateFlag(t *testing.T) { expectedFlag = &flipt.Flag{NamespaceKey: "ns", Key: "flag-1"} store = &common.StoreMock{} ) + store.On("GetVersion", context.TODO(), storage.NewNamespace("ns")).Return( + "v-321", nil, + ) store.On("UpdateFlag", context.TODO(), mock.Anything).Return(expectedFlag, nil) @@ -258,10 +281,10 @@ func TestUpdateFlag(t *testing.T) { // Update flag should call the store and delete the cache _, err := cachedStore.UpdateFlag(context.TODO(), &flipt.UpdateFlagRequest{NamespaceKey: "ns", Key: "flag-1"}) require.NoError(t, err) - assert.NotEmpty(t, cacher.deleteKeys) - _, ok := cacher.deleteKeys["s:f:ns:flag-1"] + assert.NotEmpty(t, cacher.setItems) + _, ok := cacher.setItems["s:n:ns"] assert.True(t, ok) - assert.Equal(t, 1, cacher.deleteCalled) + assert.Equal(t, 1, cacher.setCalled) store.AssertNumberOfCalls(t, "UpdateFlag", 1) } @@ -269,6 +292,9 @@ func TestUpdateFlag(t *testing.T) { func TestDeleteFlag(t *testing.T) { store := &common.StoreMock{} + store.On("GetVersion", context.TODO(), storage.NewNamespace("ns")).Return( + "v-321", nil, + ) store.On("DeleteFlag", context.TODO(), mock.Anything).Return(nil) var ( @@ -286,10 +312,10 @@ func TestDeleteFlag(t *testing.T) { // Delete flag should call the store and delete the cache err := cachedStore.DeleteFlag(context.TODO(), &flipt.DeleteFlagRequest{NamespaceKey: "ns", Key: "flag-1"}) require.NoError(t, err) - assert.NotEmpty(t, cacher.deleteKeys) - _, ok := cacher.deleteKeys["s:f:ns:flag-1"] + assert.NotEmpty(t, cacher.setItems) + _, ok := cacher.setItems["s:n:ns"] assert.True(t, ok) - assert.Equal(t, 1, cacher.deleteCalled) + assert.Equal(t, 1, cacher.setCalled) store.AssertNumberOfCalls(t, "DeleteFlag", 1) } @@ -300,6 +326,9 @@ func TestCreateVariant(t *testing.T) { store = &common.StoreMock{} ) + store.On("GetVersion", context.TODO(), storage.NewNamespace("ns")).Return( + "v-321", nil, + ) store.On("CreateVariant", context.TODO(), mock.Anything).Return(expectedVariant, nil) var ( @@ -317,10 +346,10 @@ func TestCreateVariant(t *testing.T) { // Create variant should call the store and delete the cache _, err := cachedStore.CreateVariant(context.TODO(), &flipt.CreateVariantRequest{NamespaceKey: "ns", FlagKey: "flag-1", Key: "variant-1"}) require.NoError(t, err) - assert.NotEmpty(t, cacher.deleteKeys) - _, ok := cacher.deleteKeys["s:f:ns:flag-1"] + assert.NotEmpty(t, cacher.setItems) + _, ok := cacher.setItems["s:n:ns"] assert.True(t, ok) - assert.Equal(t, 1, cacher.deleteCalled) + assert.Equal(t, 1, cacher.setCalled) store.AssertNumberOfCalls(t, "CreateVariant", 1) } @@ -331,6 +360,9 @@ func TestUpdateVariant(t *testing.T) { store = &common.StoreMock{} ) + store.On("GetVersion", context.TODO(), storage.NewNamespace("ns")).Return( + "v-321", nil, + ) store.On("UpdateVariant", context.TODO(), mock.Anything).Return(expectedVariant, nil) var ( @@ -348,10 +380,10 @@ func TestUpdateVariant(t *testing.T) { // Update variant should call the store and delete the cache _, err := cachedStore.UpdateVariant(context.TODO(), &flipt.UpdateVariantRequest{NamespaceKey: "ns", FlagKey: "flag-1", Key: "variant-1"}) require.NoError(t, err) - assert.NotEmpty(t, cacher.deleteKeys) - _, ok := cacher.deleteKeys["s:f:ns:flag-1"] + assert.NotEmpty(t, cacher.setItems) + _, ok := cacher.setItems["s:n:ns"] assert.True(t, ok) - assert.Equal(t, 1, cacher.deleteCalled) + assert.Equal(t, 1, cacher.setCalled) store.AssertNumberOfCalls(t, "UpdateVariant", 1) } @@ -359,6 +391,9 @@ func TestUpdateVariant(t *testing.T) { func TestDeleteVariant(t *testing.T) { store := &common.StoreMock{} + store.On("GetVersion", context.TODO(), storage.NewNamespace("ns")).Return( + "v-321", nil, + ) store.On("DeleteVariant", context.TODO(), mock.Anything).Return(nil) var ( @@ -376,10 +411,10 @@ func TestDeleteVariant(t *testing.T) { // Delete variant should call the store and delete the cache err := cachedStore.DeleteVariant(context.TODO(), &flipt.DeleteVariantRequest{NamespaceKey: "ns", FlagKey: "flag-1", Id: "variant-1"}) require.NoError(t, err) - assert.NotEmpty(t, cacher.deleteKeys) - _, ok := cacher.deleteKeys["s:f:ns:flag-1"] + assert.NotEmpty(t, cacher.setItems) + _, ok := cacher.setItems["s:n:ns"] assert.True(t, ok) - assert.Equal(t, 1, cacher.deleteCalled) + assert.Equal(t, 1, cacher.setCalled) store.AssertNumberOfCalls(t, "DeleteVariant", 1) } diff --git a/internal/storage/fs/snapshot.go b/internal/storage/fs/snapshot.go index 84ce0580fb..828d35e638 100644 --- a/internal/storage/fs/snapshot.go +++ b/internal/storage/fs/snapshot.go @@ -782,7 +782,7 @@ func (ss *Snapshot) GetEvaluationRules(ctx context.Context, flag storage.Resourc return rules, nil } -func (ss *Snapshot) GetEvaluationDistributions(ctx context.Context, rule storage.IDRequest) ([]*storage.EvaluationDistribution, error) { +func (ss *Snapshot) GetEvaluationDistributions(ctx context.Context, flag storage.ResourceRequest, rule storage.IDRequest) ([]*storage.EvaluationDistribution, error) { dists, ok := ss.evalDists[rule.ID] if !ok { return []*storage.EvaluationDistribution{}, nil diff --git a/internal/storage/fs/snapshot_test.go b/internal/storage/fs/snapshot_test.go index cbcdf3aa67..eacda5e325 100644 --- a/internal/storage/fs/snapshot_test.go +++ b/internal/storage/fs/snapshot_test.go @@ -62,7 +62,6 @@ func TestSnapshotFromFS_Invalid(t *testing.T) { } }) } - } func TestWalkDocuments(t *testing.T) { @@ -633,7 +632,7 @@ func (fis *FSIndexSuite) TestGetEvaluationDistributions() { require.NoError(t, err) assert.Len(t, rules.Results, 1) - dist, err := fis.store.GetEvaluationDistributions(context.TODO(), storage.NewID(rules.Results[0].Id)) + dist, err := fis.store.GetEvaluationDistributions(context.TODO(), storage.NewResource(tc.namespace, tc.flagKey), storage.NewID(rules.Results[0].Id)) require.NoError(t, err) @@ -1587,7 +1586,7 @@ func (fis *FSWithoutIndexSuite) TestGetEvaluationDistributions() { require.NoError(t, err) assert.Len(t, rules.Results, 1) - dist, err := fis.store.GetEvaluationDistributions(context.TODO(), storage.NewID(rules.Results[0].Id)) + dist, err := fis.store.GetEvaluationDistributions(context.TODO(), storage.NewResource(tc.namespace, tc.flagKey), storage.NewID(rules.Results[0].Id)) require.NoError(t, err) @@ -1780,7 +1779,7 @@ func TestFS_YAML_Stream(t *testing.T) { // 3 namespaces including default assert.Len(t, ns.Results, 3) - var namespaces = make([]string, 0, len(ns.Results)) + namespaces := make([]string, 0, len(ns.Results)) for _, n := range ns.Results { namespaces = append(namespaces, n.Key) diff --git a/internal/storage/fs/store.go b/internal/storage/fs/store.go index ee619127c7..56666391a5 100644 --- a/internal/storage/fs/store.go +++ b/internal/storage/fs/store.go @@ -154,9 +154,9 @@ func (s *Store) GetEvaluationRules(ctx context.Context, flag storage.ResourceReq }) } -func (s *Store) GetEvaluationDistributions(ctx context.Context, rule storage.IDRequest) (dists []*storage.EvaluationDistribution, err error) { +func (s *Store) GetEvaluationDistributions(ctx context.Context, r storage.ResourceRequest, rule storage.IDRequest) (dists []*storage.EvaluationDistribution, err error) { return dists, s.viewer.View(ctx, rule.Reference, func(ss storage.ReadOnlyStore) error { - dists, err = ss.GetEvaluationDistributions(ctx, rule) + dists, err = ss.GetEvaluationDistributions(ctx, r, rule) return err }) } diff --git a/internal/storage/fs/store_test.go b/internal/storage/fs/store_test.go index d2230b117e..24b8ea8462 100644 --- a/internal/storage/fs/store_test.go +++ b/internal/storage/fs/store_test.go @@ -203,7 +203,7 @@ func TestGetEvaluationDistributions(t *testing.T) { id := storage.NewID("id") storeMock.On("GetEvaluationDistributions", mock.Anything, id).Return([]*storage.EvaluationDistribution{}, nil) - _, err := ss.GetEvaluationDistributions(context.TODO(), id) + _, err := ss.GetEvaluationDistributions(context.TODO(), storage.NewResource("", "flag"), id) require.NoError(t, err) } diff --git a/internal/storage/sql/common/evaluation.go b/internal/storage/sql/common/evaluation.go index 28e58e55f1..a454f3cb4a 100644 --- a/internal/storage/sql/common/evaluation.go +++ b/internal/storage/sql/common/evaluation.go @@ -32,7 +32,7 @@ func (s *Store) GetEvaluationRules(ctx context.Context, flag storage.ResourceReq SegmentOperator flipt.SegmentOperator } - var rmMap = make(map[string]*RuleMeta) + rmMap := make(map[string]*RuleMeta) ruleIDs := make([]string, 0) for ruleMetaRows.Next() { @@ -197,7 +197,7 @@ func (s *Store) GetEvaluationRules(ctx context.Context, flag storage.ResourceReq return rules, nil } -func (s *Store) GetEvaluationDistributions(ctx context.Context, rule storage.IDRequest) (_ []*storage.EvaluationDistribution, err error) { +func (s *Store) GetEvaluationDistributions(ctx context.Context, r storage.ResourceRequest, rule storage.IDRequest) (_ []*storage.EvaluationDistribution, err error) { rows, err := s.builder.Select("d.id, d.rule_id, d.variant_id, d.rollout, v.\"key\", v.attachment"). From("distributions d"). Join("variants v ON (d.variant_id = v.id)"). diff --git a/internal/storage/sql/evaluation_test.go b/internal/storage/sql/evaluation_test.go index 7885356417..7da4ce8d6a 100644 --- a/internal/storage/sql/evaluation_test.go +++ b/internal/storage/sql/evaluation_test.go @@ -359,7 +359,7 @@ func (s *DBTestSuite) TestGetEvaluationDistributions() { require.NoError(t, err) - evaluationDistributions, err := s.store.GetEvaluationDistributions(context.TODO(), storage.NewID(rule.Id)) + evaluationDistributions, err := s.store.GetEvaluationDistributions(context.TODO(), storage.NewResource(flag.NamespaceKey, flag.Key), storage.NewID(rule.Id)) require.NoError(t, err) assert.Len(t, evaluationDistributions, 2) @@ -464,7 +464,7 @@ func (s *DBTestSuite) TestGetEvaluationDistributionsNamespace() { require.NoError(t, err) - evaluationDistributions, err := s.store.GetEvaluationDistributions(context.TODO(), storage.NewID(rule.Id)) + evaluationDistributions, err := s.store.GetEvaluationDistributions(context.TODO(), storage.NewResource(flag.NamespaceKey, flag.Key), storage.NewID(rule.Id)) require.NoError(t, err) assert.Len(t, evaluationDistributions, 2) @@ -561,7 +561,7 @@ func (s *DBTestSuite) TestGetEvaluationDistributions_MaintainOrder() { require.NoError(t, err) - evaluationDistributions, err := s.store.GetEvaluationDistributions(context.TODO(), storage.NewID(rule.Id)) + evaluationDistributions, err := s.store.GetEvaluationDistributions(context.TODO(), storage.NewResource(flag.NamespaceKey, flag.Key), storage.NewID(rule.Id)) require.NoError(t, err) assert.Len(t, evaluationDistributions, 2) @@ -603,7 +603,7 @@ func (s *DBTestSuite) TestGetEvaluationDistributions_MaintainOrder() { require.NoError(t, err) - evaluationDistributions, err = s.store.GetEvaluationDistributions(context.TODO(), storage.NewID(rule.Id)) + evaluationDistributions, err = s.store.GetEvaluationDistributions(context.TODO(), storage.NewResource(flag.NamespaceKey, flag.Key), storage.NewID(rule.Id)) require.NoError(t, err) assert.Len(t, evaluationDistributions, 2) diff --git a/internal/storage/storage.go b/internal/storage/storage.go index bab541d80a..42ea4abb74 100644 --- a/internal/storage/storage.go +++ b/internal/storage/storage.go @@ -194,7 +194,7 @@ type EvaluationStore interface { // GetEvaluationRules returns rules applicable to flagKey provided // Note: Rules MUST be returned in order by Rank GetEvaluationRules(ctx context.Context, flag ResourceRequest) ([]*EvaluationRule, error) - GetEvaluationDistributions(ctx context.Context, rule IDRequest) ([]*EvaluationDistribution, error) + GetEvaluationDistributions(ctx context.Context, flag ResourceRequest, rule IDRequest) ([]*EvaluationDistribution, error) // GetEvaluationRollouts returns rollouts applicable to namespaceKey + flagKey provided // Note: Rollouts MUST be returned in order by rank GetEvaluationRollouts(ctx context.Context, flag ResourceRequest) ([]*EvaluationRollout, error)