Skip to content

Commit

Permalink
[#73] Refactor Redis Store methods to return errors instead of bool (#89
Browse files Browse the repository at this point in the history
)
  • Loading branch information
massenz authored Mar 9, 2023
1 parent 352f509 commit 04de5ca
Show file tree
Hide file tree
Showing 10 changed files with 390 additions and 380 deletions.
6 changes: 3 additions & 3 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ GOOS ?= $(shell uname -s | tr "[:upper:]" "[:lower:]")
GOARCH ?= amd64
GOMOD := $(shell go list -m)

version := v0.10.1
version := v0.11.0
release := $(version)-g$(shell git rev-parse --short HEAD)
prog := sm-server
bin := out/bin/$(prog)-$(version)_$(GOOS)-$(GOARCH)
Expand Down Expand Up @@ -70,8 +70,8 @@ $(dockerbin): $(srcs)
-ldflags "-X $(GOMOD)/api.Release=$(release)" \
-o $(dockerbin) cmd/main.go

$(healthcheck): clients/grpc_health.go
GOOS=linux GOARCH=amd64 go build -o $(healthcheck) clients/grpc_health.go
$(healthcheck): grpc_health.go
GOOS=linux GOARCH=amd64 go build -o $(healthcheck) grpc_health.go

.PHONY: build
build: $(bin) ## Builds the Statemachine server binary
Expand Down
31 changes: 16 additions & 15 deletions grpc/grpc_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,8 +157,9 @@ func (s *grpcSubscriber) GetConfiguration(ctx context.Context, configId *wrapper
*protos.Configuration, error) {
cfgId := configId.Value
s.Logger.Trace("retrieving Configuration %s", cfgId)
cfg, found := s.Store.GetConfig(cfgId)
if !found {
cfg, err := s.Store.GetConfig(cfgId)
if err != nil {
s.Logger.Error("could not get configuration: %v", err)
return nil, status.Errorf(codes.NotFound, "configuration %s not found", cfgId)
}
return cfg, nil
Expand All @@ -168,8 +169,8 @@ func (s *grpcSubscriber) PutFiniteStateMachine(ctx context.Context,
request *protos.PutFsmRequest) (*protos.PutResponse, error) {
fsm := request.Fsm
// First check that the configuration for the FSM is valid
cfg, ok := s.Store.GetConfig(fsm.ConfigId)
if !ok {
cfg, err := s.Store.GetConfig(fsm.ConfigId)
if err != nil {
return nil, status.Error(codes.FailedPrecondition, storage.NotFoundError(
fsm.ConfigId).Error())
}
Expand Down Expand Up @@ -212,8 +213,8 @@ func (s *grpcSubscriber) GetFiniteStateMachine(ctx context.Context, in *protos.G
return nil, status.Error(codes.InvalidArgument, "ID must always be provided when looking up statemachine")
}
s.Logger.Debug("looking up FSM [%s] (Configuration: %s)", fsmId, cfg)
fsm, ok := s.Store.GetStateMachine(fsmId, cfg)
if !ok {
fsm, err := s.Store.GetStateMachine(fsmId, cfg)
if err != nil {
return nil, status.Error(codes.NotFound, storage.NotFoundError(fsmId).Error())
}
return fsm, nil
Expand All @@ -239,9 +240,9 @@ func (s *grpcSubscriber) GetEventOutcome(ctx context.Context, in *protos.EventRe
evtId := in.GetId()
cfg := in.GetConfig()
s.Logger.Debug("looking up EventOutcome %s (%s)", evtId, cfg)
outcome, ok := s.Store.GetOutcomeForEvent(evtId, cfg)
if !ok {
return nil, status.Error(codes.NotFound, fmt.Sprintf("outcome for event %s not found", evtId))
outcome, err := s.Store.GetOutcomeForEvent(evtId, cfg)
if err != nil {
return nil, status.Errorf(codes.NotFound, "cannot get outcome for event %s: %v", evtId, err)
}
return &protos.EventResponse{
EventId: evtId,
Expand All @@ -256,9 +257,9 @@ func (s *grpcSubscriber) StreamAllInstate(in *protos.GetFsmRequest, stream State
}
cfgName := in.GetConfig()
for _, id := range response.GetIds() {
fsm, found := s.Store.GetStateMachine(id, cfgName)
if !found {
return storage.NotFoundError(id)
fsm, err := s.Store.GetStateMachine(id, cfgName)
if err != nil {
return err
}
if err = stream.SendMsg(&protos.PutResponse{
Id: id,
Expand All @@ -280,9 +281,9 @@ func (s *grpcSubscriber) StreamAllConfigurations(in *wrapperspb.StringValue, str
return nil
}
for _, cfgId := range response.GetIds() {
cfg, found := s.Store.GetConfig(cfgId)
if !found {
return storage.NotFoundError(cfgId)
cfg, err := s.Store.GetConfig(cfgId)
if err != nil {
return err
}
if err = stream.SendMsg(cfg); err != nil {
return err
Expand Down
26 changes: 13 additions & 13 deletions grpc/grpc_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,11 @@ type Mockstore struct {
func (m *Mockstore) SetLogLevel(level slf4go.LogLevel) {
}

func (m *Mockstore) GetConfig(versionId string) (*protos.Configuration, bool) {
return nil, false
func (m *Mockstore) GetConfig(versionId string) (*protos.Configuration, storage.StoreErr) {
return nil, nil
}

func (m *Mockstore) PutConfig(cfg *protos.Configuration) error {
func (m *Mockstore) PutConfig(cfg *protos.Configuration) storage.StoreErr {
return NotImplemented
}

Expand All @@ -60,8 +60,8 @@ func (m *Mockstore) GetAllVersions(name string) []string {
return nil
}

func (m *Mockstore) GetStateMachine(id string, cfg string) (*protos.FiniteStateMachine, bool) {
return nil, false
func (m *Mockstore) GetStateMachine(id string, cfg string) (*protos.FiniteStateMachine, storage.StoreErr) {
return nil, NotImplemented
}

func (m *Mockstore) PutStateMachine(id string, fsm *protos.FiniteStateMachine) error {
Expand All @@ -76,8 +76,8 @@ func (m *Mockstore) UpdateState(cfgName string, id string, oldState string, newS
return NotImplemented
}

func (m *Mockstore) GetEvent(id string, cfg string) (*protos.Event, bool) {
return nil, false
func (m *Mockstore) GetEvent(id string, cfg string) (*protos.Event, storage.StoreErr) {
return nil, NotImplemented
}

func (m *Mockstore) PutEvent(event *protos.Event, cfg string, ttl time.Duration) error {
Expand All @@ -88,8 +88,8 @@ func (m *Mockstore) AddEventOutcome(eventId string, cfgName string, response *pr
return NotImplemented
}

func (m *Mockstore) GetOutcomeForEvent(eventId string, cfgName string) (*protos.EventOutcome, bool) {
return nil, false
func (m *Mockstore) GetOutcomeForEvent(eventId string, cfgName string) (*protos.EventOutcome, storage.StoreErr) {
return nil, NotImplemented
}

func (m *Mockstore) SetTimeout(duration time.Duration) {
Expand Down Expand Up @@ -295,14 +295,14 @@ var _ = Describe("the gRPC Server", func() {
}
})
It("should store valid configurations", func() {
_, ok := store.GetConfig(GetVersionId(cfg))
Ω(ok).To(BeFalse())
_, err := store.GetConfig(GetVersionId(cfg))
Ω(err).ToNot(BeNil())
response, err := client.PutConfiguration(bkgnd, cfg)
Ω(err).ToNot(HaveOccurred())
Ω(response).ToNot(BeNil())
Ω(response.Id).To(Equal(GetVersionId(cfg)))
found, ok := store.GetConfig(response.Id)
Ω(ok).Should(BeTrue())
found, err := store.GetConfig(response.Id)
Ω(err).Should(BeNil())
Ω(found).Should(Respect(cfg))
})
It("should fail for invalid configuration", func() {
Expand Down
File renamed without changes.
8 changes: 4 additions & 4 deletions pubsub/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,16 +69,16 @@ func (listener *EventsListener) ListenForMessages() {
fmt.Sprintf("could not store event: %v", err)))
continue
}
fsm, ok := listener.store.GetStateMachine(fsmId, config)
if !ok {
fsm, err := listener.store.GetStateMachine(fsmId, config)
if err != nil {
listener.PostNotificationAndReportOutcome(makeResponse(&request,
protos.EventOutcome_FsmNotFound,
fmt.Sprintf("statemachine [%s] could not be found", fsmId)))
continue
}
// TODO: cache the configuration locally: they are immutable anyway.
cfg, ok := listener.store.GetConfig(fsm.ConfigId)
if !ok {
cfg, err := listener.store.GetConfig(fsm.ConfigId)
if err != nil {
listener.PostNotificationAndReportOutcome(makeResponse(&request,
protos.EventOutcome_ConfigurationNotFound,
fmt.Sprintf("configuration [%s] could not be found", fsm.ConfigId)))
Expand Down
37 changes: 21 additions & 16 deletions pubsub/listener_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
package pubsub_test

import (
. "github.com/JiaYongfei/respect/gomega"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"

Expand Down Expand Up @@ -111,17 +112,17 @@ var _ = Describe("A Listener", func() {

Eventually(func(g Gomega) {
// Now we want to test that the state machine was updated
fsm, ok := store.GetStateMachine(requestId, "test")
g.Ω(ok).ToNot(BeFalse())
fsm, err := store.GetStateMachine(requestId, "test")
g.Ω(err).To(BeNil())
g.Ω(fsm.State).To(Equal("end"))
g.Ω(len(fsm.History)).To(Equal(1))
g.Ω(fsm.History[0].Details).To(Equal("more details"))
g.Ω(fsm.History[0].Transition.Event).To(Equal("move"))
}).Should(Succeed())
Eventually(func() bool {
_, found := store.GetEvent(event.EventId, "test")
return found
}).Should(BeTrue())
Eventually(func() storage.StoreErr {
_, err := store.GetEvent(event.EventId, "test")
return err
}).Should(BeNil())
})
It("sends notifications for missing state-machine", func() {
event := protos.Event{
Expand Down Expand Up @@ -215,18 +216,22 @@ var _ = Describe("A Listener", func() {
return nil
}
}).Should(BeNil())
Eventually(func() *protos.Event {
e, _ := store.GetEvent(event.EventId, request.Config)
return e
}, 100*time.Millisecond, 20*time.Millisecond).ShouldNot(BeNil())
Eventually(func() protos.EventOutcome_StatusCode {
e, ok := store.GetOutcomeForEvent(event.EventId, request.Config)
if ok {
return e.Code
Eventually(func(g Gomega) {
evt, err := store.GetEvent(event.EventId, request.Config)
Ω(err).ToNot(HaveOccurred())
if evt != nil {
Ω(evt).To(Respect(&event))
} else {
return protos.EventOutcome_GenericError
Fail("event is nil")
}
}, 100*time.Millisecond, 20*time.Millisecond).Should(Succeed())
Eventually(func(g Gomega) {
outcome, err := store.GetOutcomeForEvent(event.EventId, request.Config)
Ω(err).ToNot(HaveOccurred())
if outcome != nil {
Ω(outcome.Code).To(Equal(protos.EventOutcome_Ok))
}
}, 100*time.Millisecond, 20*time.Millisecond).Should(Equal(protos.EventOutcome_Ok))
}, 100*time.Millisecond, 20*time.Millisecond).Should(Succeed())
})
})
})
81 changes: 0 additions & 81 deletions storage/redis_sets_store.go

This file was deleted.

Loading

0 comments on commit 04de5ca

Please sign in to comment.