Skip to content

Commit

Permalink
Fix code smells identified by SonarQube (#88)
Browse files Browse the repository at this point in the history
  • Loading branch information
massenz authored Mar 4, 2023
1 parent c22de89 commit 352f509
Show file tree
Hide file tree
Showing 8 changed files with 167 additions and 166 deletions.
18 changes: 17 additions & 1 deletion grpc/grpc_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,18 +114,27 @@ func (s *grpcSubscriber) SendEvent(ctx context.Context, request *protos.EventReq
}

func (s *grpcSubscriber) PutConfiguration(ctx context.Context, cfg *protos.Configuration) (*protos.PutResponse, error) {
// FIXME: use Context to set a timeout, etc.
if err := api.CheckValid(cfg); err != nil {
s.Logger.Error("invalid configuration: %v", err)
return nil, status.Errorf(codes.InvalidArgument, "invalid configuration: %v", err)
}
if deadline, ok := ctx.Deadline(); ok {
if deadline.Before(time.Now()) {
return nil, ctx.Err()
}
}
if err := s.Store.PutConfig(cfg); err != nil {
s.Logger.Error("could not store configuration: %v", err)
if strings.Contains(err.Error(), "already exists") {
return nil, status.Errorf(codes.AlreadyExists, "cannot store configuration: %v", err)
}
return nil, status.Error(codes.Internal, err.Error())
}
if deadline, ok := ctx.Deadline(); ok {
if deadline.Before(time.Now()) {
return nil, ctx.Err()
}
}
s.Logger.Trace("configuration stored: %s", api.GetVersionId(cfg))
return &protos.PutResponse{
Id: api.GetVersionId(cfg),
Expand Down Expand Up @@ -164,6 +173,11 @@ func (s *grpcSubscriber) PutFiniteStateMachine(ctx context.Context,
return nil, status.Error(codes.FailedPrecondition, storage.NotFoundError(
fsm.ConfigId).Error())
}
if deadline, ok := ctx.Deadline(); ok {
if deadline.Before(time.Now()) {
return nil, ctx.Err()
}
}
var id = request.Id
if id == "" {
id = uuid.NewString()
Expand All @@ -178,6 +192,8 @@ func (s *grpcSubscriber) PutFiniteStateMachine(ctx context.Context,
s.Logger.Error("could not store FSM [%v]: %v", fsm, err)
return nil, status.Error(codes.Internal, err.Error())
}
// we cannot interrupt here, even if deadline is passed, as it would leave the
// store in an inconsistent state.
if err := s.Store.UpdateState(cfg.Name, id, "", fsm.State); err != nil {
s.Logger.Error("could not store FSM in state set [%s]: %v", fsm.State, err)
return nil, status.Error(codes.Internal, err.Error())
Expand Down
69 changes: 28 additions & 41 deletions grpc/grpc_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,83 +35,68 @@ import (
protos "github.com/massenz/statemachine-proto/golang/api"
)

var NotImplemented = storage.NotImplementedError("mock")

type Mockstore struct {
mock.Mock
}

func (m *Mockstore) SetLogLevel(level slf4go.LogLevel) {
//TODO implement me
panic("implement me")
}

func (m *Mockstore) GetConfig(versionId string) (*protos.Configuration, bool) {
//TODO implement me
panic("implement me")
return nil, false
}

func (m *Mockstore) PutConfig(cfg *protos.Configuration) error {
//TODO implement me
panic("implement me")
return NotImplemented
}

func (m *Mockstore) GetAllConfigs() []string {
//TODO implement me
panic("implement me")
return nil
}

func (m *Mockstore) GetAllVersions(name string) []string {
//TODO implement me
panic("implement me")
return nil
}

func (m *Mockstore) GetStateMachine(id string, cfg string) (*protos.FiniteStateMachine, bool) {
//TODO implement me
panic("implement me")
return nil, false
}

func (m *Mockstore) PutStateMachine(id string, fsm *protos.FiniteStateMachine) error {
//TODO implement me
panic("implement me")
return NotImplemented
}

func (m *Mockstore) GetAllInState(cfg string, state string) []string {
//TODO implement me
panic("implement me")
return nil
}

func (m *Mockstore) UpdateState(cfgName string, id string, oldState string, newState string) error {
//TODO implement me
panic("implement me")
return NotImplemented
}

func (m *Mockstore) GetEvent(id string, cfg string) (*protos.Event, bool) {
//TODO implement me
panic("implement me")
return nil, false
}

func (m *Mockstore) PutEvent(event *protos.Event, cfg string, ttl time.Duration) error {
//TODO implement me
panic("implement me")
return NotImplemented
}

func (m *Mockstore) AddEventOutcome(eventId string, cfgName string, response *protos.EventOutcome, ttl time.Duration) error {
//TODO implement me
panic("implement me")
return NotImplemented
}

func (m *Mockstore) GetOutcomeForEvent(eventId string, cfgName string) (*protos.EventOutcome, bool) {
//TODO implement me
panic("implement me")
return nil, false
}

func (m *Mockstore) SetTimeout(duration time.Duration) {
//TODO implement me
panic("implement me")
}

func (m *Mockstore) GetTimeout() time.Duration {
//TODO implement me
panic("implement me")
return 0
}

func (m *Mockstore) Health() error {
Expand Down Expand Up @@ -162,12 +147,13 @@ var _ = Describe("the gRPC Server", func() {
g.Ω(hr.State).Should(Equal(protos.HealthResponse_READY))
}, 100*time.Millisecond, 20*time.Millisecond).Should(Succeed())
})
const EventName = "test-event"
It("should succeed for well-formed events", func() {
response, err := client.SendEvent(bkgnd, &protos.EventRequest{
Event: &protos.Event{
EventId: "1",
Transition: &protos.Transition{
Event: "test-vt",
Event: EventName,
},
Originator: "test",
},
Expand All @@ -181,7 +167,7 @@ var _ = Describe("the gRPC Server", func() {
select {
case evt := <-testCh:
Ω(evt.Event.EventId).To(Equal("1"))
Ω(evt.Event.Transition.Event).To(Equal("test-vt"))
Ω(evt.Event.Transition.Event).To(Equal(EventName))
Ω(evt.Event.Originator).To(Equal("test"))
Ω(evt.Id).To(Equal("2"))
case <-time.After(10 * time.Millisecond):
Expand All @@ -193,7 +179,7 @@ var _ = Describe("the gRPC Server", func() {
response, err := client.SendEvent(bkgnd, &protos.EventRequest{
Event: &protos.Event{
Transition: &protos.Transition{
Event: "test-vt",
Event: EventName,
},
Originator: "test",
},
Expand All @@ -207,7 +193,7 @@ var _ = Describe("the gRPC Server", func() {
select {
case evt := <-testCh:
Ω(evt.Event.EventId).Should(Equal(generatedId))
Ω(evt.Event.Transition.Event).To(Equal("test-vt"))
Ω(evt.Event.Transition.Event).To(Equal(EventName))
case <-time.After(10 * time.Millisecond):
Fail("Timed out")
}
Expand All @@ -216,7 +202,7 @@ var _ = Describe("the gRPC Server", func() {
_, err := client.SendEvent(bkgnd, &protos.EventRequest{
Event: &protos.Event{
Transition: &protos.Transition{
Event: "test-vt",
Event: EventName,
},
Originator: "test",
},
Expand Down Expand Up @@ -452,34 +438,35 @@ var _ = Describe("the gRPC Server", func() {
AssertStatusCode(codes.NotFound, err)
})
It("will find all FSMs by State", func() {
const ConfigName = "test.m"
for i := 1; i <= 5; i++ {
id := fmt.Sprintf("fsm-%d", i)
Ω(store.PutStateMachine(id,
&protos.FiniteStateMachine{
ConfigId: "test.m:v1",
ConfigId: ConfigName + ":v1",
State: "start",
})).Should(Succeed())
store.UpdateState("test.m", id, "", "start")
store.UpdateState(ConfigName, id, "", "start")
}
for i := 10; i < 13; i++ {
id := fmt.Sprintf("fsm-%d", i)
Ω(store.PutStateMachine(id,
&protos.FiniteStateMachine{
ConfigId: "test.m:v1",
ConfigId: ConfigName + ":v1",
State: "stop",
})).Should(Succeed())
store.UpdateState("test.m", id, "", "stop")
store.UpdateState(ConfigName, id, "", "stop")

}
items, err := client.GetAllInState(bkgnd, &protos.GetFsmRequest{
Config: "test.m",
Config: ConfigName,
Query: &protos.GetFsmRequest_State{State: "start"},
})
Ω(err).ShouldNot(HaveOccurred())
Ω(len(items.GetIds())).Should(Equal(5))
Ω(items.GetIds()).Should(ContainElements("fsm-3", "fsm-5"))
items, err = client.GetAllInState(bkgnd, &protos.GetFsmRequest{
Config: "test.m",
Config: ConfigName,
Query: &protos.GetFsmRequest_State{State: "stop"},
})
Ω(err).ShouldNot(HaveOccurred())
Expand Down
16 changes: 9 additions & 7 deletions pubsub/listener_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,11 @@ var _ = Describe("A Listener", func() {
// Set to DEBUG when diagnosing test failures
testListener.SetLogLevel(logging.NONE)
})
const eventId = "1234-abcdef"
It("can post error notifications", func() {
defer close(notificationsCh)
msg := protos.Event{
EventId: "feed-beef",
EventId: eventId,
Originator: "me",
Transition: &protos.Transition{
Event: "test-me",
Expand Down Expand Up @@ -76,17 +77,18 @@ var _ = Describe("A Listener", func() {
})
It("can process well-formed events", func() {
event := protos.Event{
EventId: "feed-beef",
EventId: eventId,
Originator: "me",
Transition: &protos.Transition{
Event: "move",
},
Details: "more details",
}
const requestId = "12345-faa44"
request := protos.EventRequest{
Event: &event,
Config: "test",
Id: "12345-faa44",
Id: requestId,
}
Ω(store.PutConfig(&protos.Configuration{
Name: "test",
Expand All @@ -95,7 +97,7 @@ var _ = Describe("A Listener", func() {
Transitions: []*protos.Transition{{From: "start", To: "end", Event: "move"}},
StartingState: "start",
})).ToNot(HaveOccurred())
Ω(store.PutStateMachine("12345-faa44", &protos.FiniteStateMachine{
Ω(store.PutStateMachine(requestId, &protos.FiniteStateMachine{
ConfigId: "test:v1",
State: "start",
History: nil,
Expand All @@ -109,7 +111,7 @@ var _ = Describe("A Listener", func() {

Eventually(func(g Gomega) {
// Now we want to test that the state machine was updated
fsm, ok := store.GetStateMachine("12345-faa44", "test")
fsm, ok := store.GetStateMachine(requestId, "test")
g.Ω(ok).ToNot(BeFalse())
g.Ω(fsm.State).To(Equal("end"))
g.Ω(len(fsm.History)).To(Equal(1))
Expand All @@ -123,7 +125,7 @@ var _ = Describe("A Listener", func() {
})
It("sends notifications for missing state-machine", func() {
event := protos.Event{
EventId: "feed-beef",
EventId: eventId,
Originator: "me",
Transition: &protos.Transition{
Event: "move",
Expand Down Expand Up @@ -152,7 +154,7 @@ var _ = Describe("A Listener", func() {
It("sends notifications for missing destinations", func() {
request := protos.EventRequest{
Event: &protos.Event{
EventId: "feed-beef",
EventId: eventId,
},
}
go func() { testListener.ListenForMessages() }()
Expand Down
43 changes: 22 additions & 21 deletions pubsub/sqs_sub.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,6 @@ import (
protos "github.com/massenz/statemachine-proto/golang/api"
)

// TODO: should we need to generalize and abstract the implementation of a Subscriber?
// This would be necessary if we were to implement a different message broker (e.g., Kafka)

// getSqsClient connects to AWS and obtains an SQS client; passing `nil` as the `awsEndpointUrl` will
// connect by default to AWS; use a different (possibly local) URL for a LocalStack test deployment.
func getSqsClient(awsEndpointUrl *string) *sqs.SQS {
Expand Down Expand Up @@ -61,11 +58,12 @@ func NewSqsSubscriber(eventsChannel chan<- protos.EventRequest, sqsUrl *string)
return nil
}
return &SqsSubscriber{
logger: log.NewLog("SQS-Sub"),
client: client,
events: eventsChannel,
Timeout: DefaultVisibilityTimeout,
PollingInterval: DefaultPollingInterval,
logger: log.NewLog("SQS-Sub"),
client: client,
events: eventsChannel,
Timeout: DefaultVisibilityTimeout,
PollingInterval: DefaultPollingInterval,
MessageRemoveRetries: DefaultRetries,
}
}

Expand Down Expand Up @@ -134,14 +132,14 @@ func (s *SqsSubscriber) ProcessMessage(msg *sqs.Message, queueUrl *string) {
var request protos.EventRequest
err := proto.UnmarshalText(*msg.Body, &request)
if err != nil {
s.logger.Error("Message %v has invalid body: %s", msg.MessageId, err.Error())
s.logger.Error("message %v has invalid body: %s", msg.MessageId, err.Error())
// TODO: publish error to DLQ.
return
}

destId := request.GetId()
if destId == "" {
errDetails := fmt.Sprintf("No Destination ID in %v", request.String())
errDetails := fmt.Sprintf("no Destination ID in %v", request.String())
s.logger.Error(errDetails)
// TODO: publish error to DLQ.
return
Expand All @@ -150,16 +148,19 @@ func (s *SqsSubscriber) ProcessMessage(msg *sqs.Message, queueUrl *string) {
api.UpdateEvent(request.Event)
s.events <- request

s.logger.Debug("Removing message %v from SQS", *msg.MessageId)
_, err = s.client.DeleteMessage(&sqs.DeleteMessageInput{
QueueUrl: queueUrl,
ReceiptHandle: msg.ReceiptHandle,
})
if err != nil {
// FIXME: add retries
errDetails := fmt.Sprintf("Failed to remove message %v from SQS", msg.MessageId)
s.logger.Error("%s: %v", errDetails, err)
// TODO: publish error to DLQ, should also retry removal here.
for i := 0; i < s.MessageRemoveRetries; i++ {
s.logger.Debug("removing message %v from SQS", *msg.MessageId)
_, err = s.client.DeleteMessage(&sqs.DeleteMessageInput{
QueueUrl: queueUrl,
ReceiptHandle: msg.ReceiptHandle,
})
if err != nil {
errDetails := fmt.Sprintf("failed to remove message %v from SQS (attempt: %d)",
msg.MessageId, i+1)
s.logger.Error("%s: %v", errDetails, err)
} else {
break
}
}
s.logger.Trace("Message %v removed", msg.MessageId)
s.logger.Trace("message %v removed", msg.MessageId)
}
Loading

0 comments on commit 352f509

Please sign in to comment.