From b5aafe721db6590178ebc80034817fad07b94dc4 Mon Sep 17 00:00:00 2001 From: Marco Massenzio Date: Mon, 29 Aug 2022 19:10:35 -0700 Subject: [PATCH 1/5] Fixed failure in GH Action --- .github/workflows/release.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/release.yaml b/.github/workflows/release.yaml index d7c73d2..097435a 100644 --- a/.github/workflows/release.yaml +++ b/.github/workflows/release.yaml @@ -53,7 +53,7 @@ jobs: docker login -u ${{ secrets.DOCKERHUB_USERNAME }} -p ${{ secrets.DOCKERHUB_PASSWORD }} docker push $IMAGE - - uses: ncipollo/release-action@v1.10 + - uses: ncipollo/release-action@v1.10.0 with: tag: ${{ env.TAG }} generateReleaseNotes: true From 5ebf98a84c5ba8d8d1c34e86c0708f5f3a72ef9a Mon Sep 17 00:00:00 2001 From: Marco Massenzio <1153951+massenz@users.noreply.github.com> Date: Wed, 31 Aug 2022 19:38:44 -0700 Subject: [PATCH 2/5] [Issue #22] - adds handling Events' metadata (#28) * Added Event Details to Protobuf handling * Added to gRPC API * Removed internal `pubsub.EventMessage` unnecessary type **TODO** Refactor SQS Event processing --- .run/Run gRPC Client.run.xml | 22 +-- api/fsm.go | 6 +- api/statemachine_test.go | 12 +- clients/grpc_client.go | 21 ++- cmd/main.go | 5 +- go.mod | 2 +- go.sum | 4 +- grpc/grpc_server.go | 32 ++-- grpc/grpc_server_test.go | 22 +-- pubsub/listener.go | 49 ++--- pubsub/listener_test.go | 133 +++++-------- pubsub/pubsub_suite_test.go | 14 +- pubsub/pubsub_test.go | 89 --------- pubsub/sqs_pub.go | 138 +++++++------- pubsub/sqs_pub_test.go | 54 +++--- pubsub/sqs_sub.go | 273 ++++++++++++++------------- pubsub/sqs_sub_test.go | 33 ++-- pubsub/types.go | 131 +++++-------- server/statemachine_handlers_test.go | 123 +++++------- 19 files changed, 488 insertions(+), 675 deletions(-) delete mode 100644 pubsub/pubsub_test.go diff --git a/.run/Run gRPC Client.run.xml b/.run/Run gRPC Client.run.xml index 3dc17cb..31107f6 100644 --- a/.run/Run gRPC Client.run.xml +++ b/.run/Run gRPC Client.run.xml @@ -1,30 +1,12 @@ - - - + - + \ No newline at end of file diff --git a/api/fsm.go b/api/fsm.go index 97be419..71722da 100644 --- a/api/fsm.go +++ b/api/fsm.go @@ -62,18 +62,14 @@ type ConfiguredStateMachine struct { } func NewStateMachine(configuration *protos.Configuration) (*ConfiguredStateMachine, error) { - if configuration.Name == "" { + if configuration.Name == "" || configuration.Version == "" { Logger.Error("Missing configuration name") return nil, MalformedConfigurationError } - if configuration.Version == "" { - configuration.Version = "v1" - } return &ConfiguredStateMachine{ FSM: &protos.FiniteStateMachine{ ConfigId: configuration.Name + ":" + configuration.Version, State: configuration.StartingState, - //History: make([]string, 0), }, Config: configuration, }, nil diff --git a/api/statemachine_test.go b/api/statemachine_test.go index 10afb85..c698ba7 100644 --- a/api/statemachine_test.go +++ b/api/statemachine_test.go @@ -110,7 +110,7 @@ var _ = Describe("FSM Protocol Buffers", func() { }) Describe("Finite State Machines", func() { - Context("with an unnamed configuration", func() { + Context("with a configuration", func() { var spaceship protos.Configuration BeforeEach(func() { @@ -125,18 +125,18 @@ var _ = Describe("FSM Protocol Buffers", func() { }) It("without name will fail", func() { + spaceship.Version = "v0.1" _, err := NewStateMachine(&spaceship) Expect(err).Should(HaveOccurred()) }) - It("will get a default version, if missing", func() { + It("will fail with a missing configuration version", func() { spaceship.Name = "mars_orbiter" - s, err := NewStateMachine(&spaceship) - Expect(err).ShouldNot(HaveOccurred()) - Expect(s.FSM.ConfigId).To(Equal("mars_orbiter:v1")) + _, err := NewStateMachine(&spaceship) + Expect(err).To(HaveOccurred()) }) It("will carry the configuration embedded", func() { spaceship.Name = "mars_orbiter" - spaceship.Version = "v3" + spaceship.Version = "v1.0.1" s, err := NewStateMachine(&spaceship) Expect(err).ToNot(HaveOccurred()) Expect(s).ToNot(BeNil()) diff --git a/clients/grpc_client.go b/clients/grpc_client.go index ef690fb..6e9b0d2 100644 --- a/clients/grpc_client.go +++ b/clients/grpc_client.go @@ -20,14 +20,23 @@ package main import ( "context" + "encoding/json" "flag" "fmt" "github.com/google/uuid" "github.com/massenz/statemachine-proto/golang/api" "google.golang.org/grpc" "google.golang.org/protobuf/types/known/timestamppb" + "time" ) +type OrderDetails struct { + OrderId string + CustomerId string + OrderDate time.Time + OrderTotal float64 +} + func main() { serverAddr := flag.String("addr", ":4567", "The address (host:port) for the GRPC server") fsmId := flag.String("dest", "", "The ID for the FSM to send an Event to") @@ -44,6 +53,15 @@ func main() { cc, _ := grpc.Dial(*serverAddr, clientOptions...) client := api.NewStatemachineServiceClient(cc) + // Fake order + order := OrderDetails{ + OrderId: uuid.New().String(), + CustomerId: uuid.New().String(), + OrderDate: time.Now(), + OrderTotal: 100.0, + } + details, _ := json.Marshal(order) + response, err := client.ConsumeEvent(context.Background(), &api.EventRequest{ Event: &api.Event{ @@ -52,7 +70,8 @@ func main() { Transition: &api.Transition{ Event: *event, }, - Originator: "gRPC Client", + Details: string(details), + Originator: "new gRPC Client with details", }, Dest: *fsmId, }) diff --git a/cmd/main.go b/cmd/main.go index 1bd6859..924aa14 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -30,6 +30,7 @@ import ( "github.com/massenz/go-statemachine/server" "github.com/massenz/go-statemachine/storage" log "github.com/massenz/slf4go/logging" + protos "github.com/massenz/statemachine-proto/golang/api" "net" "sync" ) @@ -60,7 +61,7 @@ var ( // whether we can support a fully concurrent system with a // buffered channel errorsCh chan pubsub.EventErrorMessage = nil - eventsCh = make(chan pubsub.EventMessage) + eventsCh = make(chan protos.EventRequest) wg sync.WaitGroup ) @@ -184,7 +185,7 @@ func setLogLevel(debug bool, trace bool) { // the local `port` and will send any incoming // `EventRequest` to the receiving channel. // This MUST be run as a go-routine, which never returns -func startGrpcServer(port int, events chan<- pubsub.EventMessage) { +func startGrpcServer(port int, events chan<- protos.EventRequest) { defer wg.Done() l, err := net.Listen("tcp", fmt.Sprintf(":%d", port)) if err != nil { diff --git a/go.mod b/go.mod index 445348f..125bed6 100644 --- a/go.mod +++ b/go.mod @@ -10,7 +10,7 @@ require ( github.com/google/uuid v1.3.0 github.com/gorilla/mux v1.8.0 github.com/massenz/slf4go v0.3.1-gb35df61 - github.com/massenz/statemachine-proto/golang v0.4.1-gdb3d0ac + github.com/massenz/statemachine-proto/golang v0.5.0-g73ba49a github.com/onsi/ginkgo v1.16.5 github.com/onsi/gomega v1.18.1 google.golang.org/grpc v1.49.0 diff --git a/go.sum b/go.sum index e8a4053..f3481b9 100644 --- a/go.sum +++ b/go.sum @@ -57,8 +57,8 @@ github.com/jmespath/go-jmespath/internal/testify v1.5.1 h1:shLQSRRSCCPj3f2gpwzGw github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfCI6z80xFu9LTZmf1ZRjMHUOPmWr69U= github.com/massenz/slf4go v0.3.1-gb35df61 h1:X/rcmd918F2nkkPbahMcQE0Qb8wc6xg37rpsfunjGMM= github.com/massenz/slf4go v0.3.1-gb35df61/go.mod h1:ZJjthXAnZMJGwXUz3Z3v5uyban00uAFFoDYODOoLFpw= -github.com/massenz/statemachine-proto/golang v0.4.1-gdb3d0ac h1:+oLEMwoMSk/BYkPtj9OfFI1JTVfBT2C+ESume7EXpsk= -github.com/massenz/statemachine-proto/golang v0.4.1-gdb3d0ac/go.mod h1:EkwQg7wD6c/cmXVxfqNaUOVSrBLlti+xYljIxaQNJqA= +github.com/massenz/statemachine-proto/golang v0.5.0-g73ba49a h1:YE5KMefF7skqZs7JFTQH+ghn5lLsd9uTW+MRwvv8aZw= +github.com/massenz/statemachine-proto/golang v0.5.0-g73ba49a/go.mod h1:EkwQg7wD6c/cmXVxfqNaUOVSrBLlti+xYljIxaQNJqA= github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A= github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE= github.com/nxadm/tail v1.4.8/go.mod h1:+ncqLTQzXmGhMZNUePPaPqPvBxHAIsmXswZKocGu+AU= diff --git a/grpc/grpc_server.go b/grpc/grpc_server.go index ec85a7c..1b05ecb 100644 --- a/grpc/grpc_server.go +++ b/grpc/grpc_server.go @@ -22,18 +22,18 @@ import ( "context" "fmt" "github.com/google/uuid" - "github.com/massenz/go-statemachine/api" - "github.com/massenz/go-statemachine/pubsub" - "github.com/massenz/go-statemachine/storage" "github.com/massenz/slf4go/logging" "google.golang.org/grpc" - "time" + "google.golang.org/protobuf/types/known/timestamppb" + + "github.com/massenz/go-statemachine/api" + "github.com/massenz/go-statemachine/storage" protos "github.com/massenz/statemachine-proto/golang/api" ) type Config struct { - EventsChannel chan<- pubsub.EventMessage + EventsChannel chan<- protos.EventRequest Store storage.StoreManager Logger *logging.Log } @@ -60,23 +60,17 @@ func (s *grpcSubscriber) ConsumeEvent(ctx context.Context, request *protos.Event if request.Event.Transition.Event == "" { return nil, api.MissingEventNameError } - - s.Logger.Trace("Received gRPC request: %v", request) - evt := pubsub.EventMessage{ - Sender: request.Event.Originator, - Destination: request.Dest, - EventId: request.Event.EventId, - EventName: request.Event.Transition.Event, - EventTimestamp: time.Now(), + if request.Event.EventId == "" { + request.Event.EventId = uuid.NewString() } - if evt.EventId == "" { - evt.EventId = uuid.NewString() + if request.Event.Timestamp == nil { + request.Event.Timestamp = timestamppb.Now() } - s.Logger.Trace("Sending Event to channel: %v", evt.EventId) - // TODO: use the context to set a timeout and cancel the request if the channel cannot accept + s.Logger.Trace("Sending Event to channel: %v", request.Event) + // TODO: use the context and cancel the request if the channel cannot accept // the event within the given timeout. - s.EventsChannel <- evt - return &protos.EventResponse{EventId: evt.EventId}, nil + s.EventsChannel <- *request + return &protos.EventResponse{EventId: request.Event.EventId}, nil } func (s *grpcSubscriber) PutConfiguration(ctx context.Context, cfg *protos.Configuration) (*protos.PutResponse, error) { diff --git a/grpc/grpc_server_test.go b/grpc/grpc_server_test.go index c8c5dfe..db46656 100644 --- a/grpc/grpc_server_test.go +++ b/grpc/grpc_server_test.go @@ -8,14 +8,12 @@ import ( "context" "fmt" "github.com/massenz/slf4go/logging" - g "google.golang.org/grpc" "net" "time" . "github.com/massenz/go-statemachine/api" "github.com/massenz/go-statemachine/grpc" - "github.com/massenz/go-statemachine/pubsub" "github.com/massenz/go-statemachine/storage" "github.com/massenz/statemachine-proto/golang/api" ) @@ -23,7 +21,7 @@ import ( var _ = Describe("GrpcServer", func() { Context("when configured", func() { - var testCh chan pubsub.EventMessage + var testCh chan api.EventRequest var listener net.Listener var client api.StatemachineServiceClient var done func() @@ -32,7 +30,7 @@ var _ = Describe("GrpcServer", func() { BeforeEach(func() { var err error - testCh = make(chan pubsub.EventMessage, 5) + testCh = make(chan api.EventRequest, 5) listener, err = net.Listen("tcp", ":0") Ω(err).ShouldNot(HaveOccurred()) @@ -77,10 +75,10 @@ var _ = Describe("GrpcServer", func() { done() select { case evt := <-testCh: - Ω(evt.EventId).To(Equal("1")) - Ω(evt.EventName).To(Equal("test-vt")) - Ω(evt.Sender).To(Equal("test")) - Ω(evt.Destination).To(Equal("2")) + Ω(evt.Event.EventId).To(Equal("1")) + Ω(evt.Event.Transition.Event).To(Equal("test-vt")) + Ω(evt.Event.Originator).To(Equal("test")) + Ω(evt.Dest).To(Equal("2")) case <-time.After(10 * time.Millisecond): Fail("Timed out") @@ -102,8 +100,8 @@ var _ = Describe("GrpcServer", func() { done() select { case evt := <-testCh: - Ω(evt.EventId).Should(Equal(generatedId)) - Ω(evt.EventName).To(Equal("test-vt")) + Ω(evt.Event.EventId).Should(Equal(generatedId)) + Ω(evt.Event.Transition.Event).To(Equal("test-vt")) case <-time.After(10 * time.Millisecond): Fail("Timed out") } @@ -121,7 +119,7 @@ var _ = Describe("GrpcServer", func() { done() select { case evt := <-testCh: - Fail(fmt.Sprintf("Unexpected event: %s", evt)) + Fail(fmt.Sprintf("Unexpected event: %s", evt.String())) case <-time.After(10 * time.Millisecond): Succeed() } @@ -140,7 +138,7 @@ var _ = Describe("GrpcServer", func() { done() select { case evt := <-testCh: - Fail(fmt.Sprintf("UnΩed event: %s", evt)) + Fail(fmt.Sprintf("UnΩed event: %s", evt.String())) case <-time.After(10 * time.Millisecond): Succeed() } diff --git a/pubsub/listener.go b/pubsub/listener.go index 04e2364..45e4cf9 100644 --- a/pubsub/listener.go +++ b/pubsub/listener.go @@ -20,11 +20,8 @@ package pubsub import ( "fmt" - log "github.com/massenz/slf4go/logging" - "google.golang.org/protobuf/types/known/timestamppb" - . "github.com/massenz/go-statemachine/api" - "github.com/massenz/statemachine-proto/golang/api" + log "github.com/massenz/slf4go/logging" ) func NewEventsListener(options *ListenerOptions) *EventsListener { @@ -52,22 +49,24 @@ func (listener *EventsListener) PostErrorNotification(error *EventErrorMessage) func (listener *EventsListener) ListenForMessages() { listener.logger.Info("Events message listener started") for event := range listener.events { - listener.logger.Debug("Received event %s", event) - if event.Destination == "" { - listener.PostErrorNotification(ErrorMessage(fmt.Errorf("no destination for event"), &event)) + listener.logger.Debug("Received event %s", event.Event.String()) + if event.Dest == "" { + listener.PostErrorNotification(ErrorMessage(fmt.Errorf("no destination for event"), + event.Event, "")) continue } - fsm, ok := listener.store.GetStateMachine(event.Destination) + fsm, ok := listener.store.GetStateMachine(event.Dest) if !ok { - listener.PostErrorNotification(ErrorMessage(fmt.Errorf("statemachine [%s] could not be found", - event.Destination), &event)) + listener.PostErrorNotification(ErrorMessage( + fmt.Errorf("statemachine [%s] could not be found", event.Dest), event.Event, "")) continue } // TODO: cache the configuration locally: they are immutable anyway. cfg, ok := listener.store.GetConfig(fsm.ConfigId) if !ok { - listener.PostErrorNotification(ErrorMessage(fmt.Errorf("configuration [%s] could not be found", - fsm.ConfigId), &event)) + listener.PostErrorNotification(ErrorMessage( + fmt.Errorf("configuration [%s] could not be found", + fsm.ConfigId), event.Event, "")) continue } @@ -75,29 +74,17 @@ func (listener *EventsListener) ListenForMessages() { Config: cfg, FSM: fsm, } - pbEvent := NewPBEvent(event) - if err := cfgFsm.SendEvent(pbEvent); err != nil { - listener.PostErrorNotification(ErrorMessageWithDetail(err, &event, fmt.Sprintf( - "FSM [%s] cannot process event `%s`", event.Destination, event.EventName))) + if err := cfgFsm.SendEvent(event.Event); err != nil { + listener.PostErrorNotification(ErrorMessage(err, event.Event, fmt.Sprintf( + "FSM [%s] cannot process event `%s`", event.Dest, event.Event.Transition.Event))) continue } - err := listener.store.PutStateMachine(event.Destination, fsm) + err := listener.store.PutStateMachine(event.Dest, fsm) if err != nil { - listener.PostErrorNotification(ErrorMessage(err, &event)) + listener.PostErrorNotification(ErrorMessage(err, event.Event, "could not save FSM")) continue } - listener.logger.Debug("Event %s transitioned FSM [%s] to state `%s`", - event.EventName, event.Destination, fsm.State) - } -} - -func NewPBEvent(message EventMessage) *api.Event { - return &api.Event{ - EventId: message.EventId, - Originator: message.Sender, - Timestamp: timestamppb.New(message.EventTimestamp), - Transition: &api.Transition{ - Event: message.EventName, - }, + listener.logger.Info("Event %s transitioned FSM [%s] to state `%s`", + event.Event.Transition.Event, event.Dest, fsm.State) } } diff --git a/pubsub/listener_test.go b/pubsub/listener_test.go index b04d3a4..73ebeb2 100644 --- a/pubsub/listener_test.go +++ b/pubsub/listener_test.go @@ -33,35 +33,15 @@ import ( ) var _ = Describe("A Listener", func() { - Context("from an EventMessage", func() { - It("can create a PB Event", func() { - msg := pubsub.EventMessage{ - Sender: "test-sender", - Destination: "test-destination", - EventId: "test-abed", - EventName: "an-event", - // 2022-05-09T22:52:39+0000 - EventTimestamp: time.Unix(1652161959, 0), - } - evt := pubsub.NewPBEvent(msg) - Expect(evt).ToNot(BeNil()) - Expect(evt.EventId).To(Equal(msg.EventId)) - Expect(evt.Transition).ToNot(BeNil()) - Expect(evt.Transition.Event).To(Equal(msg.EventName)) - Expect(evt.Timestamp.AsTime().Unix()).To(Equal(msg.EventTimestamp.Unix())) - Expect(evt.Originator).To(Equal(msg.Sender)) - }) - }) - Context("when store-backed", func() { var ( testListener *pubsub.EventsListener - eventsCh chan pubsub.EventMessage + eventsCh chan api.EventRequest notificationsCh chan pubsub.EventErrorMessage store storage.StoreManager ) BeforeEach(func() { - eventsCh = make(chan pubsub.EventMessage) + eventsCh = make(chan api.EventRequest) notificationsCh = make(chan pubsub.EventErrorMessage) store = storage.NewInMemoryStore() testListener = pubsub.NewEventsListener(&pubsub.ListenerOptions{ @@ -75,13 +55,16 @@ var _ = Describe("A Listener", func() { }) It("can post error notifications", func() { defer close(notificationsCh) - msg := pubsub.EventMessage{ - Sender: "me", - EventId: "feed-beef", - EventName: "test-me", + msg := api.Event{ + EventId: "feed-beef", + Originator: "me", + Transition: &api.Transition{ + Event: "test-me", + }, + Details: "more details", } - detail := "more details about the error" - notification := pubsub.ErrorMessageWithDetail(fmt.Errorf("this is a test"), &msg, detail) + detail := "some error" + notification := pubsub.ErrorMessage(fmt.Errorf("this is a test"), &msg, detail) go testListener.PostErrorNotification(notification) select { case n := <-notificationsCh: @@ -95,13 +78,19 @@ var _ = Describe("A Listener", func() { }) It("can receive events", func() { done := make(chan interface{}) - msg := pubsub.EventMessage{ - Sender: "1234", - EventId: "feed-dead-beef", - EventName: "move", - Destination: "99", + event := api.Event{ + EventId: "feed-beef", + Originator: "me", + Transition: &api.Transition{ + Event: "move", + }, + Details: "more details", } - Expect(store.PutStateMachine(msg.Destination, &api.FiniteStateMachine{ + request := api.EventRequest{ + Event: &event, + Dest: "test-fsm", + } + Expect(store.PutStateMachine(request.Dest, &api.FiniteStateMachine{ ConfigId: "test:v1", State: "start", History: nil, @@ -118,86 +107,66 @@ var _ = Describe("A Listener", func() { defer close(done) testListener.ListenForMessages() }() - eventsCh <- msg + eventsCh <- request close(eventsCh) select { case n := <-notificationsCh: Fail(fmt.Sprintf("unexpected error: %v", n.String())) case <-done: - fsm, ok := store.GetStateMachine(msg.Destination) + fsm, ok := store.GetStateMachine(request.Dest) Expect(ok).ToNot(BeFalse()) Expect(fsm.State).To(Equal("end")) + Expect(len(fsm.History)).To(Equal(1)) + Expect(fsm.History[0].Details).To(Equal("more details")) + Expect(fsm.History[0].Transition.Event).To(Equal("move")) case <-time.After(timeout): Fail("the listener did not exit when the events channel was closed") } }) - It("sends notifications for missing configurations", func() { - msg := pubsub.EventMessage{ - Sender: "1234", - EventId: "feed-beef", - EventName: "move", - Destination: "778899", - } - Expect(store.PutStateMachine(msg.Destination, &api.FiniteStateMachine{ - ConfigId: "test.v3", - State: "start", - History: nil, - })).ToNot(HaveOccurred()) - go func() { - testListener.ListenForMessages() - }() - eventsCh <- msg - close(eventsCh) - - select { - case n := <-notificationsCh: - Expect(n.Message).ToNot(BeNil()) - Expect(n.Message.EventId).To(Equal(msg.EventId)) - Expect(n.Error.Error()).To(Equal("configuration [test.v3] could not be found")) - case <-time.After(timeout): - Fail("the listener did not exit when the events channel was closed") + It("sends notifications for missing statemachine", func() { + event := api.Event{ + EventId: "feed-beef", + Originator: "me", + Transition: &api.Transition{ + Event: "move", + }, + Details: "more details", } - }) - It("sends notifications for missing FSM", func() { - msg := pubsub.EventMessage{ - Sender: "1234", - EventId: "feed-beef", - EventName: "failed", - Destination: "fake", + request := api.EventRequest{ + Event: &event, + Dest: "fake-fsm", } go func() { testListener.ListenForMessages() }() - eventsCh <- msg + eventsCh <- request close(eventsCh) select { case n := <-notificationsCh: Expect(n.Message).ToNot(BeNil()) - Expect(n.Message.EventId).To(Equal(msg.EventId)) - Expect(n.Error.Error()).To(Equal("statemachine [fake] could not be found")) - + Expect(n.Message.EventId).To(Equal(request.Event.EventId)) + Expect(n.Error.Error()).To(Equal("statemachine [fake-fsm] could not be found")) case <-time.After(timeout): - Fail("no error notification received") + Fail("the listener did not exit when the events channel was closed") } }) It("sends notifications for missing destinations", func() { - msg := pubsub.EventMessage{ - Sender: "1234", - EventId: "feed-beef", - EventName: "failed", + request := api.EventRequest{ + Event: &api.Event{ + EventId: "feed-beef", + }, + Dest: "", } - go func() { - testListener.ListenForMessages() - }() - eventsCh <- msg + go func() {testListener.ListenForMessages()}() + eventsCh <- request close(eventsCh) select { case n := <-notificationsCh: Expect(n.Message).ToNot(BeNil()) - Expect(n.Message.EventId).To(Equal(msg.EventId)) + Expect(n.Message.EventId).To(Equal(request.Event.EventId)) Expect(n.Error.Error()).To(Equal("no destination for event")) case <-time.After(timeout): Fail("no error notification received") diff --git a/pubsub/pubsub_suite_test.go b/pubsub/pubsub_suite_test.go index b76b97b..3dfd1df 100644 --- a/pubsub/pubsub_suite_test.go +++ b/pubsub/pubsub_suite_test.go @@ -21,6 +21,7 @@ package pubsub_test import ( "fmt" "github.com/massenz/go-statemachine/pubsub" + "github.com/massenz/statemachine-proto/golang/api" "os" "testing" "time" @@ -119,25 +120,26 @@ func getSqsMessage(queue string) *sqs.Message { // postSqsMessage mirrors the decoding of the SQS Message in the Subscriber and will // send it over the `queue`, so that we can test the Publisher can correctly receive it. -func postSqsMessage(queue string, msg *pubsub.EventMessage) error { +func postSqsMessage(queue string, msg *api.EventRequest) error { q := pubsub.GetQueueUrl(testSqsClient, queue) - testLog.Debug("Post Message -- Timestamp: %v", msg.EventTimestamp) + evt := msg.Event + testLog.Debug("Post Message -- Timestamp: %v", evt.Timestamp) _, err := testSqsClient.SendMessage(&sqs.SendMessageInput{ MessageAttributes: map[string]*sqs.MessageAttributeValue{ "DestinationId": { DataType: aws.String("String"), - StringValue: aws.String(msg.Destination), + StringValue: aws.String(msg.Dest), }, "EventId": { DataType: aws.String("String"), - StringValue: aws.String(msg.EventId), + StringValue: aws.String(evt.EventId), }, "Sender": { DataType: aws.String("String"), - StringValue: aws.String(msg.Sender), + StringValue: aws.String(evt.Originator), }, }, - MessageBody: aws.String(msg.EventName), + MessageBody: aws.String(evt.Transition.Event), QueueUrl: &q, }) return err diff --git a/pubsub/pubsub_test.go b/pubsub/pubsub_test.go deleted file mode 100644 index 3536b0f..0000000 --- a/pubsub/pubsub_test.go +++ /dev/null @@ -1,89 +0,0 @@ -/* - * Copyright (c) 2022 AlertAvert.com. All rights reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * Author: Marco Massenzio (marco@alertavert.com) - */ - -// This code uses the Respect library from https://github.com/JiaYongfei/respect/ -// licensed under the MIT License. -// See: https://github.com/JiaYongfei/respect/blob/main/LICENSE - -package pubsub_test - -import ( - "encoding/json" - "fmt" - . "github.com/JiaYongfei/respect/gomega" - . "github.com/onsi/ginkgo" - . "github.com/onsi/gomega" - "time" - - "github.com/massenz/go-statemachine/pubsub" -) - -var _ = Describe("PubSub types", func() { - - Context("when serializing messages", func() { - var ( - msg pubsub.EventMessage - errMsg pubsub.EventErrorMessage - ) - BeforeEach(func() { - msg = pubsub.EventMessage{ - Sender: "test-sender", - Destination: "test-dest", - EventId: "12345", - EventName: "an-event", - EventTimestamp: time.Now(), - } - errMsg = pubsub.EventErrorMessage{ - Error: *pubsub.NewEventProcessingError(fmt.Errorf("an error")), - ErrorDetail: "error detail", - Message: &msg, - } - - }) - It("should convert to and from JSON without loss of meaning", func() { - s := msg.String() - Expect(s).ToNot(Equal("")) - var newMsg pubsub.EventMessage - Expect(json.Unmarshal([]byte(s), &newMsg)).ToNot(HaveOccurred()) - Expect(newMsg).Should(Respect(msg)) - }) - It("should convert errors to and from JSON without loss of meaning", func() { - s := errMsg.String() - Expect(s).ToNot(Equal("")) - var newMsg pubsub.EventErrorMessage - Expect(json.Unmarshal([]byte(s), &newMsg)).ToNot(HaveOccurred()) - Expect(newMsg).Should(Respect(errMsg)) - }) - }) - Context("when serializing messages with empty fields", func() { - var msg pubsub.EventMessage - BeforeEach(func() { - msg = pubsub.EventMessage{ - EventName: "an-event", - } - }) - - It("should convert to and from JSON without loss of meaning", func() { - s := msg.String() - Expect(s).To(Equal(`{"event_name":"an-event","timestamp":"0001-01-01T00:00:00Z"}`)) - var newMsg pubsub.EventMessage - Expect(json.Unmarshal([]byte(s), &newMsg)).ToNot(HaveOccurred()) - Expect(newMsg).Should(Respect(msg)) - }) - }) -}) diff --git a/pubsub/sqs_pub.go b/pubsub/sqs_pub.go index fcc7ae8..6977b95 100644 --- a/pubsub/sqs_pub.go +++ b/pubsub/sqs_pub.go @@ -19,98 +19,102 @@ package pubsub import ( - "fmt" - "github.com/aws/aws-sdk-go/aws" - "github.com/aws/aws-sdk-go/service/sqs" - log "github.com/massenz/slf4go/logging" + "fmt" + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/service/sqs" + "github.com/golang/protobuf/proto" + log "github.com/massenz/slf4go/logging" ) type SqsPublisher struct { - logger *log.Log - client *sqs.SQS - errors <-chan EventErrorMessage + logger *log.Log + client *sqs.SQS + errors <-chan EventErrorMessage } // NewSqsPublisher will create a new `Publisher` to send error notifications to // an SQS `dead-letter queue`. func NewSqsPublisher(errorsChannel <-chan EventErrorMessage, sqsUrl *string) *SqsPublisher { - client := getSqsClient(sqsUrl) - if client == nil { - return nil - } - return &SqsPublisher{ - logger: log.NewLog("SQS-Pub"), - client: client, - errors: errorsChannel, - } + client := getSqsClient(sqsUrl) + if client == nil { + return nil + } + return &SqsPublisher{ + logger: log.NewLog("SQS-Pub"), + client: client, + errors: errorsChannel, + } } // SetLogLevel allows the SqsSubscriber to implement the log.Loggable interface func (s *SqsPublisher) SetLogLevel(level log.LogLevel) { - if s == nil { - fmt.Println("WARN: attempting to set log level on nil Publisher") - return - } - s.logger.Level = level + if s == nil { + fmt.Println("WARN: attempting to set log level on nil Publisher") + return + } + s.logger.Level = level } // GetQueueUrl retrieves from AWS SQS the URL for the queue, given the topic name func GetQueueUrl(client *sqs.SQS, topic string) string { - out, err := client.GetQueueUrl(&sqs.GetQueueUrlInput{ - QueueName: &topic, - }) - if err != nil || out.QueueUrl == nil { - // From the Google School: fail fast and noisily from an unrecoverable error - log.RootLog.Fatal(fmt.Errorf("cannot get SQS Queue URL for topic %s: %v", topic, err)) - } - return *out.QueueUrl + out, err := client.GetQueueUrl(&sqs.GetQueueUrlInput{ + QueueName: &topic, + }) + if err != nil || out.QueueUrl == nil { + // From the Google School: fail fast and noisily from an unrecoverable error + log.RootLog.Fatal(fmt.Errorf("cannot get SQS Queue URL for topic %s: %v", topic, err)) + } + return *out.QueueUrl } // Publish sends an error message to the DLQ `topic` func (s *SqsPublisher) Publish(topic string) { - queueUrl := GetQueueUrl(s.client, topic) - s.logger.Info("SQS Publisher started for queue: %s", queueUrl) + queueUrl := GetQueueUrl(s.client, topic) + s.logger.Info("SQS Publisher started for queue: %s", queueUrl) - for msg := range s.errors { - delay := int64(0) - s.logger.Debug("Publishing %s to %s", msg.String(), queueUrl) - msgResult, err := s.client.SendMessage(&sqs.SendMessageInput{ - DelaySeconds: &delay, - MessageAttributes: makeAttributes(msg), - MessageBody: aws.String(msg.Message.String()), - QueueUrl: &queueUrl, - }) - if err != nil { - s.logger.Error("Cannot publish msg (%s): %v", msg.String(), err) - continue - } - s.logger.Debug("Error msg sent to DLQ: %s", *msgResult.MessageId) - } - s.logger.Info("SQS Publisher exiting") + for msg := range s.errors { + delay := int64(0) + s.logger.Debug("Publishing %s to %s", msg.String(), queueUrl) + msgResult, err := s.client.SendMessage(&sqs.SendMessageInput{ + DelaySeconds: &delay, + MessageAttributes: makeAttributes(msg), + // Encodes the Event as a string, using Protobuf implementation. + // TODO: this whole thing will be replaced as we move away from EventErrorMessages + // and to native Protobuf error response. + MessageBody: aws.String(proto.MarshalTextString(msg.Message)), + QueueUrl: &queueUrl, + }) + if err != nil { + s.logger.Error("Cannot publish msg (%s): %v", msg.String(), err) + continue + } + s.logger.Debug("Error msg sent to DLQ: %s", *msgResult.MessageId) + } + s.logger.Info("SQS Publisher exiting") } // makeAttributes is necessary as we can't just copy the strings into the MessageAttributeValue // values, as empty strings will cause an error with SQS. // This function will only add those keys for which we have an actual value. func makeAttributes(msg EventErrorMessage) map[string]*sqs.MessageAttributeValue { - res := make(map[string]*sqs.MessageAttributeValue) - if msg.Error.Error() != "" { - res["Error"] = &sqs.MessageAttributeValue{ - DataType: aws.String("String"), - StringValue: aws.String(msg.Error.Error()), - } - } - if msg.Message != nil && msg.Message.EventId != "" { - res["EventId"] = &sqs.MessageAttributeValue{ - DataType: aws.String("String"), - StringValue: aws.String(msg.Message.EventId), - } - } - if msg.ErrorDetail != "" { - res["ErrorDetails"] = &sqs.MessageAttributeValue{ - DataType: aws.String("String"), - StringValue: aws.String(msg.ErrorDetail), - } - } - return res + res := make(map[string]*sqs.MessageAttributeValue) + if msg.Error.Error() != "" { + res["Error"] = &sqs.MessageAttributeValue{ + DataType: aws.String("String"), + StringValue: aws.String(msg.Error.Error()), + } + } + if msg.Message != nil && msg.Message.EventId != "" { + res["EventId"] = &sqs.MessageAttributeValue{ + DataType: aws.String("String"), + StringValue: aws.String(msg.Message.EventId), + } + } + if msg.ErrorDetail != "" { + res["ErrorDetails"] = &sqs.MessageAttributeValue{ + DataType: aws.String("String"), + StringValue: aws.String(msg.ErrorDetail), + } + } + return res } diff --git a/pubsub/sqs_pub_test.go b/pubsub/sqs_pub_test.go index 64d5b7b..6a142e8 100644 --- a/pubsub/sqs_pub_test.go +++ b/pubsub/sqs_pub_test.go @@ -19,19 +19,22 @@ package pubsub_test import ( - "encoding/json" - "fmt" . "github.com/JiaYongfei/respect/gomega" . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" + + "fmt" + "github.com/golang/protobuf/proto" + log "github.com/massenz/slf4go/logging" "time" + "github.com/massenz/go-statemachine/api" "github.com/massenz/go-statemachine/pubsub" - log "github.com/massenz/slf4go/logging" + + protos "github.com/massenz/statemachine-proto/golang/api" ) var _ = Describe("SQS Publisher", func() { - Context("when correctly initialized", func() { var ( testPublisher *pubsub.SqsPublisher @@ -45,15 +48,12 @@ var _ = Describe("SQS Publisher", func() { testPublisher.SetLogLevel(log.NONE) }) It("can publish error notifications", func() { - msg := pubsub.EventMessage{ - Sender: "me", - Destination: "some-fsm", - EventId: "feed-beef", - EventName: "test-me", - EventTimestamp: time.Now(), - } + msg := api.NewEvent("test-event") + msg.Originator = "me" + msg.EventId = "feed-beef" + msg.Details = `{"foo": "bar"}` detail := "more details about the error" - notification := pubsub.ErrorMessageWithDetail(fmt.Errorf("this is a test"), &msg, detail) + notification := pubsub.ErrorMessage(fmt.Errorf("this is a test"), msg, detail) done := make(chan interface{}) go func() { defer close(done) @@ -63,16 +63,16 @@ var _ = Describe("SQS Publisher", func() { notificationsCh <- *notification res := getSqsMessage(getQueueName(notificationsQueue)) Expect(res).ToNot(BeNil()) + body := *res.Body - var sentMsg pubsub.EventMessage - Expect(json.Unmarshal([]byte(body), &sentMsg)).ToNot(HaveOccurred()) - Expect(sentMsg).To(Respect(msg)) + var receivedEvt protos.Event + Expect(proto.UnmarshalText(body, &receivedEvt)).Should(Succeed()) + Expect(receivedEvt).To(Respect(*msg)) close(notificationsCh) select { case <-done: Succeed() - case <-time.After(timeout): Fail("timed out waiting for Publisher to exit") } @@ -102,27 +102,23 @@ var _ = Describe("SQS Publisher", func() { It("will send several messages within a reasonable timeframe", func() { go testPublisher.Publish(getQueueName(notificationsQueue)) - for i := range [10]int{} { - msg := pubsub.EventMessage{ - Sender: "someone", - Destination: fmt.Sprintf("dest-%d", i), - EventId: fmt.Sprintf("evt-%d", i), - EventName: "many-messages-test", - EventTimestamp: time.Now(), - } + for range [10]int{} { + evt := api.NewEvent("many-messages-test") detail := "more details about the error" - notificationsCh <- *pubsub.ErrorMessageWithDetail(fmt.Errorf("this is a test"), &msg, detail) + notificationsCh <- *pubsub.ErrorMessage(fmt.Errorf("this is a test"), evt, detail) } done := make(chan interface{}) go func() { + // This is necessary as we make assertions in this goroutine, + //and we want to make sure we can see the errors if they fail. + defer GinkgoRecover() defer close(done) for range [10]int{} { res := getSqsMessage(getQueueName(notificationsQueue)) Expect(res).ToNot(BeNil()) - body := *res.Body - var sentMsg pubsub.EventMessage - Expect(json.Unmarshal([]byte(body), &sentMsg)).ToNot(HaveOccurred()) - Expect(sentMsg.EventName).To(Equal("many-messages-test")) + var receivedEvt protos.Event + Expect(proto.UnmarshalText(*res.Body, &receivedEvt)).Should(Succeed()) + Expect(receivedEvt.Transition.Event).To(Equal("many-messages-test")) } }() close(notificationsCh) diff --git a/pubsub/sqs_sub.go b/pubsub/sqs_sub.go index fcc800d..3f52f7d 100644 --- a/pubsub/sqs_sub.go +++ b/pubsub/sqs_sub.go @@ -19,160 +19,179 @@ package pubsub import ( - "fmt" - "github.com/aws/aws-sdk-go/aws" - "github.com/aws/aws-sdk-go/aws/session" - "github.com/aws/aws-sdk-go/service/sqs" - log "github.com/massenz/slf4go/logging" - "os" - "strconv" - "time" + "fmt" + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/session" + "github.com/aws/aws-sdk-go/service/sqs" + "github.com/google/uuid" + log "github.com/massenz/slf4go/logging" + protos "github.com/massenz/statemachine-proto/golang/api" + "google.golang.org/protobuf/types/known/timestamppb" + "os" + "strconv" + "time" ) // TODO: should we need to generalize and abstract the implementation of a Subscriber? // This would be necessary if we were to implement a different message broker (e.g., Kafka) type SqsSubscriber struct { - logger *log.Log - client *sqs.SQS - events chan<- EventMessage - Timeout time.Duration - PollingInterval time.Duration + logger *log.Log + client *sqs.SQS + events chan<- protos.EventRequest + Timeout time.Duration + PollingInterval time.Duration } // getSqsClient connects to AWS and obtains an SQS client; passing `nil` as the `sqsUrl` will // connect by default to AWS; use a different (possibly local) URL for a LocalStack test deployment. func getSqsClient(sqsUrl *string) *sqs.SQS { - var sess *session.Session - if sqsUrl == nil { - sess = session.Must(session.NewSessionWithOptions(session.Options{ - SharedConfigState: session.SharedConfigEnable, - })) - } else { - region, found := os.LookupEnv("AWS_REGION") - if !found { - fmt.Printf("No AWS Region configured, cannot connect to SQS provider at %s\n", - *sqsUrl) - return nil - } - sess = session.Must(session.NewSessionWithOptions(session.Options{ - SharedConfigState: session.SharedConfigEnable, - Config: aws.Config{ - Endpoint: sqsUrl, - Region: ®ion, - }, - })) - } - return sqs.New(sess) + var sess *session.Session + if sqsUrl == nil { + sess = session.Must(session.NewSessionWithOptions(session.Options{ + SharedConfigState: session.SharedConfigEnable, + })) + } else { + region, found := os.LookupEnv("AWS_REGION") + if !found { + fmt.Printf("No AWS Region configured, cannot connect to SQS provider at %s\n", + *sqsUrl) + return nil + } + sess = session.Must(session.NewSessionWithOptions(session.Options{ + SharedConfigState: session.SharedConfigEnable, + Config: aws.Config{ + Endpoint: sqsUrl, + Region: ®ion, + }, + })) + } + return sqs.New(sess) } // NewSqsSubscriber will create a new `Subscriber` to listen to // incoming api.Event from a SQS `queue`. -func NewSqsSubscriber(eventsChannel chan<- EventMessage, sqsUrl *string) *SqsSubscriber { - client := getSqsClient(sqsUrl) - if client == nil { - return nil - } - return &SqsSubscriber{ - logger: log.NewLog("SQS-Sub"), - client: client, - events: eventsChannel, - Timeout: DefaultVisibilityTimeout, - PollingInterval: DefaultPollingInterval, - } +func NewSqsSubscriber(eventsChannel chan<- protos.EventRequest, sqsUrl *string) *SqsSubscriber { + client := getSqsClient(sqsUrl) + if client == nil { + return nil + } + return &SqsSubscriber{ + logger: log.NewLog("SQS-Sub"), + client: client, + events: eventsChannel, + Timeout: DefaultVisibilityTimeout, + PollingInterval: DefaultPollingInterval, + } } // SetLogLevel allows the SqsSubscriber to implement the log.Loggable interface func (s *SqsSubscriber) SetLogLevel(level log.LogLevel) { - s.logger.Level = level + s.logger.Level = level } // Subscribe runs until signaled on the Done channel and listens for incoming Events func (s *SqsSubscriber) Subscribe(topic string, done <-chan interface{}) { - queueUrl := GetQueueUrl(s.client, topic) - s.logger.Info("SQS Subscriber started for queue: %s", queueUrl) + queueUrl := GetQueueUrl(s.client, topic) + s.logger.Info("SQS Subscriber started for queue: %s", queueUrl) - timeout := int64(s.Timeout.Seconds()) - for { - select { - case <-done: - s.logger.Info("SQS Subscriber terminating") - return - default: - s.logger.Trace("...") - } - start := time.Now() - s.logger.Trace("Polling SQS at %v", start) - msgResult, err := s.client.ReceiveMessage(&sqs.ReceiveMessageInput{ - AttributeNames: []*string{ - aws.String(sqs.MessageSystemAttributeNameSentTimestamp), - }, - MessageAttributeNames: []*string{ - aws.String(sqs.QueueAttributeNameAll), - }, - QueueUrl: &queueUrl, - MaxNumberOfMessages: aws.Int64(10), - VisibilityTimeout: &timeout, - }) - if err == nil { - if len(msgResult.Messages) > 0 { - s.logger.Debug("Got %d messages", len(msgResult.Messages)) - } else { - s.logger.Trace("no messages in queue") - } - for _, msg := range msgResult.Messages { - s.logger.Trace("processing %v", msg.String()) - go s.ProcessMessage(msg, &queueUrl) - } - } else { - s.logger.Error(err.Error()) - } - timeLeft := s.PollingInterval - time.Since(start) - if timeLeft > 0 { - s.logger.Trace("sleeping for %v", timeLeft) - time.Sleep(timeLeft) - } - } + timeout := int64(s.Timeout.Seconds()) + for { + select { + case <-done: + s.logger.Info("SQS Subscriber terminating") + return + default: + } + start := time.Now() + s.logger.Trace("Polling SQS at %v", start) + msgResult, err := s.client.ReceiveMessage(&sqs.ReceiveMessageInput{ + AttributeNames: []*string{ + aws.String(sqs.MessageSystemAttributeNameSentTimestamp), + }, + MessageAttributeNames: []*string{ + aws.String(sqs.QueueAttributeNameAll), + }, + QueueUrl: &queueUrl, + MaxNumberOfMessages: aws.Int64(10), + VisibilityTimeout: &timeout, + }) + if err == nil { + if len(msgResult.Messages) > 0 { + s.logger.Debug("Got %d messages", len(msgResult.Messages)) + } else { + s.logger.Trace("no messages in queue") + } + for _, msg := range msgResult.Messages { + s.logger.Trace("processing %v", msg.String()) + go s.ProcessMessage(msg, &queueUrl) + } + } else { + s.logger.Error(err.Error()) + } + timeLeft := s.PollingInterval - time.Since(start) + if timeLeft > 0 { + s.logger.Trace("sleeping for %v", timeLeft) + time.Sleep(timeLeft) + } + } } func (s *SqsSubscriber) ProcessMessage(msg *sqs.Message, queueUrl *string) { - s.logger.Trace("Processing Message %v", msg.MessageId) - var event = EventMessage{} - event.Destination = *msg.MessageAttributes["DestinationId"].StringValue - event.EventName = *msg.Body + s.logger.Trace("Processing Message %v", msg.MessageId) - if event.Destination != "" && event.EventName != "" { - event.EventId = *msg.MessageAttributes["EventId"].StringValue - event.Sender = *msg.MessageAttributes["Sender"].StringValue - timestamp := msg.Attributes[sqs.MessageSystemAttributeNameSentTimestamp] - if timestamp == nil { - s.logger.Warn("No Timestamp in received event, using current time") - event.EventTimestamp = time.Now() - } else { - // An SQS Message timestamp is a Unix milliseconds from epoch. - // TODO: We may need some amount of error-checking here. - ts, _ := strconv.ParseInt(*timestamp, 10, 64) - event.EventTimestamp = time.UnixMilli(ts) - } - s.logger.Debug("Sent at: %s", event.EventTimestamp.String()) - s.events <- event - } else { - errDetails := fmt.Sprintf("No Destination ID or Event in %v", msg.String()) - s.logger.Error(errDetails) - ErrorMessage(fmt.Errorf(errDetails), &event) - // TODO: publish error to DLQ. - } + timestamp := msg.Attributes[sqs.MessageSystemAttributeNameSentTimestamp] + if timestamp == nil { + s.logger.Warn("No Timestamp in received event, using current time") + timestamp = aws.String(strconv.FormatInt(time.Now().Unix(), 10)) + } - s.logger.Debug("Removing message %v from SQS", *msg.MessageId) - _, err := s.client.DeleteMessage(&sqs.DeleteMessageInput{ - QueueUrl: queueUrl, - ReceiptHandle: msg.ReceiptHandle, - }) - if err != nil { - errDetails := fmt.Sprintf("Failed to remove message %v from SQS", msg.MessageId) - s.logger.Error("%s: %v", errDetails, err) - ErrorMessageWithDetail(err, &event, errDetails) - // TODO: publish error to DLQ, should also retry removal here. - } - s.logger.Trace("Message %v removed", msg.MessageId) + // The body of the message (the actual event) and the destination (the FSM ID) are mandatory. + destId, hasDest := msg.MessageAttributes["DestinationId"] + if msg.Body != nil && hasDest { + // The Event ID is optional and, if missing, will be generated here. + eventId, hasId := msg.MessageAttributes["EventId"] + if !hasId { + eventId = &sqs.MessageAttributeValue{ + DataType: aws.String("String"), + StringValue: aws.String(uuid.NewString()), + } + } + // An SQS Message timestamp is a Unix milliseconds from epoch. + // TODO: We may need some amount of error-checking here. + ts, _ := strconv.ParseInt(*timestamp, 10, 64) + var request = protos.EventRequest{ + Event: &protos.Event{ + EventId: *eventId.StringValue, + Timestamp: timestamppb.New(time.UnixMilli(ts)), + Transition: &protos.Transition{ + Event: *msg.Body, + }, + }, + Dest: *destId.StringValue, + } + if sender := msg.MessageAttributes["Sender"]; sender != nil { + request.Event.Originator = *sender.StringValue + } + if details := msg.MessageAttributes["Details"]; details != nil { + request.Event.Details = *details.StringValue + } + s.events <- request + } else { + errDetails := fmt.Sprintf("No Destination ID or Event in %v", msg.String()) + s.logger.Error(errDetails) + // TODO: publish error to DLQ. + } + + s.logger.Debug("Removing message %v from SQS", *msg.MessageId) + _, err := s.client.DeleteMessage(&sqs.DeleteMessageInput{ + QueueUrl: queueUrl, + ReceiptHandle: msg.ReceiptHandle, + }) + if err != nil { + errDetails := fmt.Sprintf("Failed to remove message %v from SQS", msg.MessageId) + s.logger.Error("%s: %v", errDetails, err) + // TODO: publish error to DLQ, should also retry removal here. + } + s.logger.Trace("Message %v removed", msg.MessageId) } diff --git a/pubsub/sqs_sub_test.go b/pubsub/sqs_sub_test.go index afbde9a..2b23a65 100644 --- a/pubsub/sqs_sub_test.go +++ b/pubsub/sqs_sub_test.go @@ -25,19 +25,21 @@ import ( . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" - "github.com/massenz/go-statemachine/pubsub" log "github.com/massenz/slf4go/logging" + + "github.com/massenz/go-statemachine/api" + "github.com/massenz/go-statemachine/pubsub" + protos "github.com/massenz/statemachine-proto/golang/api" ) var _ = Describe("SQS Subscriber", func() { - Context("when correctly initialized", func() { var ( testSubscriber *pubsub.SqsSubscriber - eventsCh chan pubsub.EventMessage + eventsCh chan protos.EventRequest ) BeforeEach(func() { - eventsCh = make(chan pubsub.EventMessage) + eventsCh = make(chan protos.EventRequest) testSubscriber = pubsub.NewSqsSubscriber(eventsCh, &sqsUrl) Expect(testSubscriber).ToNot(BeNil()) // Set to DEBUG when diagnosing failing tests @@ -47,13 +49,13 @@ var _ = Describe("SQS Subscriber", func() { testSubscriber.PollingInterval = d }) It("receives events", func() { - msg := pubsub.EventMessage{ - Sender: "me", - Destination: "some-fsm", - EventId: "feed-beef", - EventName: "test-me", + msg := protos.EventRequest{ + Event: api.NewEvent("test-event"), + Dest: "some-fsm", } - Expect(postSqsMessage(getQueueName(eventsQueue), &msg)).ToNot(HaveOccurred()) + msg.Event.EventId = "feed-beef" + msg.Event.Originator = "test-subscriber" + Expect(postSqsMessage(getQueueName(eventsQueue), &msg)).Should(Succeed()) done := make(chan interface{}) doneTesting := make(chan interface{}) go func() { @@ -62,11 +64,12 @@ var _ = Describe("SQS Subscriber", func() { }() select { - case event := <-eventsCh: - testLog.Debug("Received Event -- Timestamp: %v", event.EventTimestamp) - // Workaround as we can't set the time sent - msg.EventTimestamp = event.EventTimestamp - Expect(event).To(Respect(msg)) + case req := <-eventsCh: + testLog.Debug("Received Event -- Timestamp: %v", req.Event.Timestamp) + // We null the timestamp as we don't want to compare that with Respect + msg.Event.Timestamp = nil + req.Event.Timestamp = nil + Expect(req.Event).To(Respect(msg.Event)) close(doneTesting) case <-time.After(timeout): Fail("timed out waiting to receive a message") diff --git a/pubsub/types.go b/pubsub/types.go index b09f5e7..4d7221f 100644 --- a/pubsub/types.go +++ b/pubsub/types.go @@ -19,134 +19,101 @@ package pubsub import ( - "encoding/json" - "fmt" - "github.com/massenz/go-statemachine/storage" - log "github.com/massenz/slf4go/logging" - "time" + "encoding/json" + "fmt" + "github.com/massenz/go-statemachine/storage" + log "github.com/massenz/slf4go/logging" + protos "github.com/massenz/statemachine-proto/golang/api" + "time" ) -// EventMessage abstracts away the details of the actual structure of the events and the actual -// message broker implementation. It is the Internal Representation ( -// IR) for an event being originated by the `sender` and being sent to a `Destination` StateMachine. -type EventMessage struct { - Sender string `json:"sender,omitempty"` - Destination string `json:"destination,omitempty"` - EventId string `json:"event_id,omitempty"` - EventName string `json:"event_name,omitempty"` - EventTimestamp time.Time `json:"timestamp"` -} - -func (m *EventMessage) String() string { - s, err := json.Marshal(*m) - if err != nil { - return err.Error() - } - return string(s) -} - // EventProcessingError is used to encapsulate the error for the event processing. type EventProcessingError struct { - err error + err error } -func (epe *EventProcessingError) Error() string { - if epe.err != nil { - return epe.err.Error() - } - return "" +func (epe EventProcessingError) Error() string { + if epe.err != nil { + return epe.err.Error() + } + return "" } // MarshalJSON Amazingly enough, `json` does not know how to Marshal an error; MarshalJSON for the // EventProcessingError fills the gap, // so we can serialize an EventErrorMessage with the embedded error. func (epe EventProcessingError) MarshalJSON() ([]byte, error) { - return json.Marshal(epe.err.Error()) + return json.Marshal(epe.Error()) } // UnmarshalJSON is the inverse of MarshalJSON and reads in an error description. // By convention, if the passed in string is `null` this is a no-op. func (epe EventProcessingError) UnmarshalJSON(data []byte) error { - s := string(data) - if s != "null" { - epe.err = fmt.Errorf(s) - } - return nil + s := string(data) + if s != "null" { + epe.err = fmt.Errorf(s) + } + return nil } func NewEventProcessingError(err error) *EventProcessingError { - return &EventProcessingError{err: err} + return &EventProcessingError{err: err} } +// FIXME: this will soon be replaced instead by a Protobuf message. // An EventErrorMessage encapsulates an error occurred while processing the `Message` and is // returned over the `notifications` channel to a `Publisher` for eventual upstream processing. type EventErrorMessage struct { - Error EventProcessingError `json:"error,omitempty"` - ErrorDetail string `json:"detail,omitempty"` // optional - Message *EventMessage `json:"message,omitempty"` + Error EventProcessingError `json:"error,omitempty"` + ErrorDetail string `json:"detail,omitempty"` // optional + Message *protos.Event `json:"message,omitempty"` } func (m *EventErrorMessage) String() string { - // FIXME: this probably needs a better approach to omit JSON entirely as apparently - // `omitempty` does not work here (and json.Marshal() panics for nil elements). - if m.Message == nil { - m.Message = &EventMessage{} - } - if m.Error.err == nil { - m.Error.err = fmt.Errorf("no error") - } - s, err := json.Marshal(*m) - if err != nil { - return err.Error() - } - return string(s) + s, err := json.Marshal(*m) + if err != nil { + return err.Error() + } + return string(s) } -// ErrorMessageWithDetail creates a new EventErrorMessage from the given error and detail (optional, +// ErrorMessage creates a new EventErrorMessage from the given error and detail (optional, // can be nil) and an optional EventMessage (can be nil). // Modeled on the fmt.Error() function. -func ErrorMessageWithDetail(err error, msg *EventMessage, detail string) *EventErrorMessage { - ret := EventErrorMessage{ - Error: EventProcessingError{err: err}, - Message: msg, - } - if detail != "" { - ret.ErrorDetail = detail - } - return &ret -} - -// ErrorMessage creates a new EventErrorMessage from the given error and an EventMessage (can be nil). -func ErrorMessage(err error, msg *EventMessage) *EventErrorMessage { - return ErrorMessageWithDetail(err, msg, "") +func ErrorMessage(err error, msg *protos.Event, detail string) *EventErrorMessage { + return &EventErrorMessage{ + Error: EventProcessingError{err: err}, + Message: msg, + ErrorDetail: detail, + } } // Not really "variables" - but Go is too dumb to figure out they're actually constants. var ( - // We poll SQS every DefaultPollingInterval seconds - DefaultPollingInterval, _ = time.ParseDuration("5s") + // We poll SQS every DefaultPollingInterval seconds + DefaultPollingInterval, _ = time.ParseDuration("5s") - // DefaultVisibilityTimeout sets how long SQS will wait for the subscriber to remove the - // message from the queue. - // See: https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-visibility-timeout.html - DefaultVisibilityTimeout, _ = time.ParseDuration("5s") + // DefaultVisibilityTimeout sets how long SQS will wait for the subscriber to remove the + // message from the queue. + // See: https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-visibility-timeout.html + DefaultVisibilityTimeout, _ = time.ParseDuration("5s") ) // An EventsListener will process EventMessage in a separate goroutine. // The messages are posted on an `events` channel, and if any error is encountered, // error messages are posted on a `notifications` channel for further processing upstream. type EventsListener struct { - logger *log.Log - events <-chan EventMessage - notifications chan<- EventErrorMessage - store storage.StoreManager + logger *log.Log + events <-chan protos.EventRequest + notifications chan<- EventErrorMessage + store storage.StoreManager } // ListenerOptions are used to configure an EventsListener at creation and are used // to decouple the internals of the listener from its exposed configuration. type ListenerOptions struct { - EventsChannel <-chan EventMessage - NotificationsChannel chan<- EventErrorMessage - StatemachinesStore storage.StoreManager - ListenersPoolSize int8 + EventsChannel <-chan protos.EventRequest + NotificationsChannel chan<- EventErrorMessage + StatemachinesStore storage.StoreManager + ListenersPoolSize int8 } diff --git a/server/statemachine_handlers_test.go b/server/statemachine_handlers_test.go index 3b2e76a..5c8c44c 100644 --- a/server/statemachine_handlers_test.go +++ b/server/statemachine_handlers_test.go @@ -25,15 +25,16 @@ import ( "bytes" "encoding/json" log "github.com/massenz/slf4go/logging" + "google.golang.org/protobuf/types/known/timestamppb" "io" "net/http" "net/http/httptest" "strings" - "time" - "github.com/massenz/go-statemachine/pubsub" + . "github.com/massenz/go-statemachine/api" "github.com/massenz/go-statemachine/server" "github.com/massenz/go-statemachine/storage" + "github.com/massenz/statemachine-proto/golang/api" ) @@ -218,43 +219,29 @@ var _ = Describe("Handlers", func() { }) It("with a missing ID will return Not Allowed", func() { req = httptest.NewRequest(http.MethodGet, server.StatemachinesEndpoint, nil) - router.ServeHTTP(writer, req) Expect(writer.Code).To(Equal(http.StatusMethodNotAllowed)) }) - It("with gibberish data will still fail gracefully", func() { cfg := api.Configuration{} Expect(store.PutConfig(&cfg)).ToNot(HaveOccurred()) endpoint := strings.Join([]string{server.StatemachinesEndpoint, "6789"}, "/") req = httptest.NewRequest(http.MethodGet, endpoint, nil) - router.ServeHTTP(writer, req) Expect(writer.Code).To(Equal(http.StatusNotFound)) }) }) - Context("when receiving events", func() { - // We need a channel to send events to + Context("when the statemachine has events", func() { var store storage.StoreManager - var events chan pubsub.EventMessage - var listener *pubsub.EventsListener + var fsmId = "12345" + var config *api.Configuration BeforeEach(func() { - events = make(chan pubsub.EventMessage) writer = httptest.NewRecorder() store = storage.NewInMemoryStore() server.SetStore(store) - - listener = pubsub.NewEventsListener(&pubsub.ListenerOptions{ - EventsChannel: events, - NotificationsChannel: nil, - StatemachinesStore: store, - ListenersPoolSize: 0, - }) - listener.SetLogLevel(log.NONE) - - config := &api.Configuration{ + config = &api.Configuration{ Name: "car", Version: "v1", States: []string{"stopped", "running", "slowing"}, @@ -266,82 +253,60 @@ var _ = Describe("Handlers", func() { }, StartingState: "stopped", } - car := &api.FiniteStateMachine{ - ConfigId: "car:v1", - State: "stopped", - History: nil, - } - Expect(store.PutConfig(config)).ToNot(HaveOccurred()) - Expect(store.PutStateMachine("sm-123", car)).ToNot(HaveOccurred()) - + car, _ := NewStateMachine(config) + Expect(store.PutConfig(config)).To(Succeed()) + Expect(store.PutStateMachine(fsmId, car.FSM)).To(Succeed()) }) It("it should show them", func() { - done := make(chan interface{}) - go func() { - defer close(done) - go listener.ListenForMessages() - }() - - fsmId := "sm-123" - event := pubsub.EventMessage{ - Sender: "test-sender", - Destination: fsmId, - EventId: "1", - EventName: "start", - EventTimestamp: time.Time{}, + found, _ := store.GetStateMachine(fsmId) + car := ConfiguredStateMachine{ + Config: config, + FSM: found, } - events <- event - - event.EventId = "2" - event.EventName = "brake" - events <- event - - event.EventId = "3" - event.EventName = "stop" - events <- event - close(events) - - // Wait a bit for the events to be fully processed. - time.Sleep(5 * time.Millisecond) + Expect(car.SendEvent(&api.Event{ + EventId: "1", + Timestamp: timestamppb.Now(), + Transition: &api.Transition{Event: "start"}, + Originator: "test", + Details: "this is a test", + })).To(Succeed()) + Expect(car.SendEvent(&api.Event{ + EventId: "2", + Timestamp: timestamppb.Now(), + Transition: &api.Transition{Event: "brake"}, + Originator: "test", + Details: "a test is this not", + })).To(Succeed()) + Expect(store.PutStateMachine(fsmId, car.FSM)).To(Succeed()) endpoint := strings.Join([]string{server.StatemachinesEndpoint, fsmId}, "/") req = httptest.NewRequest(http.MethodGet, endpoint, nil) - router.ServeHTTP(writer, req) Expect(writer.Code).To(Equal(http.StatusOK)) var result server.StateMachineResponse - Expect(json.NewDecoder(writer.Body).Decode(&result)).ToNot(HaveOccurred()) + Expect(json.NewDecoder(writer.Body).Decode(&result)).To(Succeed()) Expect(result.ID).To(Equal(fsmId)) Expect(result.StateMachine).ToNot(BeNil()) fsm := result.StateMachine - Expect(fsm.State).To(Equal("stopped")) - Expect(len(fsm.History)).To(Equal(3)) + Expect(fsm.State).To(Equal("slowing")) + Expect(len(fsm.History)).To(Equal(2)) var history []*api.Event history = fsm.History - startEvent := history[0] - Expect(startEvent.EventId).To(Equal("1")) - Expect(startEvent.Originator).To(Equal("test-sender")) - Expect(startEvent.Transition.Event).To(Equal("start")) - Expect(startEvent.Transition.From).To(Equal("stopped")) - Expect(startEvent.Transition.To).To(Equal("running")) - brakeEvent := history[1] - Expect(brakeEvent.EventId).To(Equal("2")) - Expect(brakeEvent.Transition.Event).To(Equal("brake")) - Expect(brakeEvent.Transition.To).To(Equal("slowing")) - stopEvent := history[2] - Expect(stopEvent.EventId).To(Equal("3")) - Expect(stopEvent.Transition.Event).To(Equal("stop")) - Expect(stopEvent.Transition.To).To(Equal("stopped")) - - select { - case <-done: - Succeed() - case <-time.After(100 * time.Millisecond): - Fail("timed out waiting for Listener to exit") - } + event := history[0] + Expect(event.EventId).To(Equal("1")) + Expect(event.Originator).To(Equal("test")) + Expect(event.Transition.Event).To(Equal("start")) + Expect(event.Transition.From).To(Equal("stopped")) + Expect(event.Transition.To).To(Equal("running")) + Expect(event.Details).To(Equal("this is a test")) + event = history[1] + Expect(event.EventId).To(Equal("2")) + Expect(event.Transition.Event).To(Equal("brake")) + Expect(event.Transition.To).To(Equal("slowing")) + Expect(event.Details).To(Equal("a test is this not")) }) }) }) From e994c211bbec0a995b7bdf5a5eca985a4cfa4136 Mon Sep 17 00:00:00 2001 From: Marco Massenzio <1153951+massenz@users.noreply.github.com> Date: Thu, 1 Sep 2022 18:33:53 -0700 Subject: [PATCH 3/5] [Issue #22] Updated SQS Sub code to parse Protobuf (#29) --- .run/Run SQS Client.run.xml | 6 ++-- .run/Run Server.run.xml | 22 ++----------- .run/Run gRPC Client.run.xml | 2 +- Makefile | 9 ++++-- clients/grpc_client.go | 20 ++---------- clients/orders.go | 48 +++++++++++++++++++++++++++ clients/sqs_client.go | 43 ++++++++++++------------ pubsub/pubsub_suite_test.go | 20 ++---------- pubsub/sqs_sub.go | 63 +++++++++++++----------------------- 9 files changed, 110 insertions(+), 123 deletions(-) create mode 100644 clients/orders.go diff --git a/.run/Run SQS Client.run.xml b/.run/Run SQS Client.run.xml index b4114f5..bf5a56a 100644 --- a/.run/Run SQS Client.run.xml +++ b/.run/Run SQS Client.run.xml @@ -1,12 +1,12 @@ - + - + - + \ No newline at end of file diff --git a/.run/Run Server.run.xml b/.run/Run Server.run.xml index 0a93f97..27d2204 100644 --- a/.run/Run Server.run.xml +++ b/.run/Run Server.run.xml @@ -1,21 +1,3 @@ - - @@ -28,7 +10,7 @@ - + - + \ No newline at end of file diff --git a/.run/Run gRPC Client.run.xml b/.run/Run gRPC Client.run.xml index 31107f6..706624d 100644 --- a/.run/Run gRPC Client.run.xml +++ b/.run/Run gRPC Client.run.xml @@ -6,7 +6,7 @@ - + \ No newline at end of file diff --git a/Makefile b/Makefile index 6ae3217..43bf108 100644 --- a/Makefile +++ b/Makefile @@ -14,6 +14,8 @@ build: cmd/main.go go build -ldflags "-X main.Release=$(tag)" -o $(out) cmd/main.go @chmod +x $(out) +$(out): build + services: @docker-compose -f $(compose) up -d @@ -22,10 +24,10 @@ queues: aws --no-cli-pager --endpoint-url=http://localhost:4566 --region us-west-2 \ sqs create-queue --queue-name $$queue; done >/dev/null -test: build services queues +test: $(out) services queues ginkgo -p $(pkgs) -container: +container: $(out) docker build -f $(dockerfile) -t $(image):$(tag) . # Runs test coverage and displays the results in browser @@ -34,5 +36,6 @@ cov: build services queues @go tool cover -html=/tmp/cov.out clean: - @rm -f api/*.pb.go $(out) + @rm $(out) @docker-compose -f $(compose) down + @docker rmi $(image):$(tag) diff --git a/clients/grpc_client.go b/clients/grpc_client.go index 6e9b0d2..7eb2c20 100644 --- a/clients/grpc_client.go +++ b/clients/grpc_client.go @@ -20,23 +20,14 @@ package main import ( "context" - "encoding/json" "flag" "fmt" "github.com/google/uuid" "github.com/massenz/statemachine-proto/golang/api" "google.golang.org/grpc" "google.golang.org/protobuf/types/known/timestamppb" - "time" ) -type OrderDetails struct { - OrderId string - CustomerId string - OrderDate time.Time - OrderTotal float64 -} - func main() { serverAddr := flag.String("addr", ":4567", "The address (host:port) for the GRPC server") fsmId := flag.String("dest", "", "The ID for the FSM to send an Event to") @@ -54,14 +45,7 @@ func main() { client := api.NewStatemachineServiceClient(cc) // Fake order - order := OrderDetails{ - OrderId: uuid.New().String(), - CustomerId: uuid.New().String(), - OrderDate: time.Now(), - OrderTotal: 100.0, - } - details, _ := json.Marshal(order) - + order := NewOrderDetails(uuid.New().String(), "cust-1234", 123.55) response, err := client.ConsumeEvent(context.Background(), &api.EventRequest{ Event: &api.Event{ @@ -70,7 +54,7 @@ func main() { Transition: &api.Transition{ Event: *event, }, - Details: string(details), + Details: order.String(), Originator: "new gRPC Client with details", }, Dest: *fsmId, diff --git a/clients/orders.go b/clients/orders.go new file mode 100644 index 0000000..a7ea8e9 --- /dev/null +++ b/clients/orders.go @@ -0,0 +1,48 @@ +/* + * Copyright (c) 2022 AlertAvert.com. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * Author: Marco Massenzio (marco@alertavert.com) + */ + +package main + +import ( + "encoding/json" + "time" +) + +type OrderDetails struct { + OrderId string + CustomerId string + OrderDate time.Time + OrderTotal float64 +} + +func NewOrderDetails(orderId, customerId string, orderTotal float64) *OrderDetails { + return &OrderDetails{ + OrderId: orderId, + CustomerId: customerId, + OrderDate: time.Now(), + OrderTotal: orderTotal, + } +} + +func (o *OrderDetails) String() string { + res, error := json.Marshal(o) + if error != nil { + panic(error) + } + return string(res) +} diff --git a/clients/sqs_client.go b/clients/sqs_client.go index 47e86cd..db841a5 100644 --- a/clients/sqs_client.go +++ b/clients/sqs_client.go @@ -23,11 +23,15 @@ import ( "flag" "fmt" "github.com/aws/aws-sdk-go/aws" + "github.com/golang/protobuf/proto" "github.com/google/uuid" + "google.golang.org/protobuf/types/known/timestamppb" "os" "github.com/aws/aws-sdk-go/aws/session" "github.com/aws/aws-sdk-go/service/sqs" + + protos "github.com/massenz/statemachine-proto/golang/api" ) var CTX = context.TODO() @@ -55,12 +59,6 @@ func main() { event := flag.String("evt", "", "The Event for the FSM") flag.Parse() - if *fsmId == "" || *event == "" { - panic(fmt.Errorf("must specify both of -id and -evt")) - } - fmt.Printf("Publishing Event `%s` for FSM `%s` to SQS Topic: [%s]\n", - *event, *fsmId, *q) - queue := NewSqs(endpoint) queueUrl, err := queue.GetQueueUrl(&sqs.GetQueueUrlInput{ QueueName: q, @@ -68,22 +66,25 @@ func main() { if err != nil { panic(err) } - _, err = queue.SendMessage(&sqs.SendMessageInput{ - MessageAttributes: map[string]*sqs.MessageAttributeValue{ - "DestinationId": { - DataType: aws.String("String"), - StringValue: aws.String(*fsmId), - }, - "EventId": { - DataType: aws.String("String"), - StringValue: aws.String(uuid.NewString()), - }, - "Sender": { - DataType: aws.String("String"), - StringValue: aws.String("SQS Client"), - }, + + if *fsmId == "" || *event == "" { + panic(fmt.Errorf("must specify both of -id and -evt")) + } + fmt.Printf("Publishing Event `%s` for FSM `%s` to SQS Topic: [%s]\n", *event, *fsmId, *q) + order := NewOrderDetails(uuid.NewString(), "sqs-cust-1234", 99.99) + msg := &protos.EventRequest{ + Event: &protos.Event{ + EventId: uuid.NewString(), + Timestamp: timestamppb.Now(), + Transition: &protos.Transition{Event: *event}, + Originator: "New SQS Client with Details", + Details: order.String(), }, - MessageBody: aws.String(*event), + Dest: *fsmId, + } + + _, err = queue.SendMessage(&sqs.SendMessageInput{ + MessageBody: aws.String(proto.MarshalTextString(msg)), QueueUrl: queueUrl.QueueUrl, }) if err != nil { diff --git a/pubsub/pubsub_suite_test.go b/pubsub/pubsub_suite_test.go index 3dfd1df..531528d 100644 --- a/pubsub/pubsub_suite_test.go +++ b/pubsub/pubsub_suite_test.go @@ -20,6 +20,7 @@ package pubsub_test import ( "fmt" + "github.com/golang/protobuf/proto" "github.com/massenz/go-statemachine/pubsub" "github.com/massenz/statemachine-proto/golang/api" "os" @@ -122,24 +123,9 @@ func getSqsMessage(queue string) *sqs.Message { // send it over the `queue`, so that we can test the Publisher can correctly receive it. func postSqsMessage(queue string, msg *api.EventRequest) error { q := pubsub.GetQueueUrl(testSqsClient, queue) - evt := msg.Event - testLog.Debug("Post Message -- Timestamp: %v", evt.Timestamp) + testLog.Debug("Post Message -- Timestamp: %v", msg.Event.Timestamp) _, err := testSqsClient.SendMessage(&sqs.SendMessageInput{ - MessageAttributes: map[string]*sqs.MessageAttributeValue{ - "DestinationId": { - DataType: aws.String("String"), - StringValue: aws.String(msg.Dest), - }, - "EventId": { - DataType: aws.String("String"), - StringValue: aws.String(evt.EventId), - }, - "Sender": { - DataType: aws.String("String"), - StringValue: aws.String(evt.Originator), - }, - }, - MessageBody: aws.String(evt.Transition.Event), + MessageBody: aws.String(proto.MarshalTextString(msg)), QueueUrl: &q, }) return err diff --git a/pubsub/sqs_sub.go b/pubsub/sqs_sub.go index 3f52f7d..d152377 100644 --- a/pubsub/sqs_sub.go +++ b/pubsub/sqs_sub.go @@ -23,12 +23,11 @@ import ( "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/session" "github.com/aws/aws-sdk-go/service/sqs" + "github.com/golang/protobuf/proto" "github.com/google/uuid" log "github.com/massenz/slf4go/logging" protos "github.com/massenz/statemachine-proto/golang/api" - "google.golang.org/protobuf/types/known/timestamppb" "os" - "strconv" "time" ) @@ -140,51 +139,35 @@ func (s *SqsSubscriber) Subscribe(topic string, done <-chan interface{}) { func (s *SqsSubscriber) ProcessMessage(msg *sqs.Message, queueUrl *string) { s.logger.Trace("Processing Message %v", msg.MessageId) - timestamp := msg.Attributes[sqs.MessageSystemAttributeNameSentTimestamp] - if timestamp == nil { - s.logger.Warn("No Timestamp in received event, using current time") - timestamp = aws.String(strconv.FormatInt(time.Now().Unix(), 10)) + // The body of the message (the actual request) is mandatory. + if msg.Body == nil { + s.logger.Error("Message %v has no body", msg.MessageId) + // TODO: publish error to DLQ. + return + } + var request protos.EventRequest + err := proto.UnmarshalText(*msg.Body, &request) + if err != nil { + s.logger.Error("Message %v has invalid body: %s", msg.MessageId, err.Error()) + // TODO: publish error to DLQ. + return } - // The body of the message (the actual event) and the destination (the FSM ID) are mandatory. - destId, hasDest := msg.MessageAttributes["DestinationId"] - if msg.Body != nil && hasDest { - // The Event ID is optional and, if missing, will be generated here. - eventId, hasId := msg.MessageAttributes["EventId"] - if !hasId { - eventId = &sqs.MessageAttributeValue{ - DataType: aws.String("String"), - StringValue: aws.String(uuid.NewString()), - } - } - // An SQS Message timestamp is a Unix milliseconds from epoch. - // TODO: We may need some amount of error-checking here. - ts, _ := strconv.ParseInt(*timestamp, 10, 64) - var request = protos.EventRequest{ - Event: &protos.Event{ - EventId: *eventId.StringValue, - Timestamp: timestamppb.New(time.UnixMilli(ts)), - Transition: &protos.Transition{ - Event: *msg.Body, - }, - }, - Dest: *destId.StringValue, - } - if sender := msg.MessageAttributes["Sender"]; sender != nil { - request.Event.Originator = *sender.StringValue - } - if details := msg.MessageAttributes["Details"]; details != nil { - request.Event.Details = *details.StringValue - } - s.events <- request - } else { - errDetails := fmt.Sprintf("No Destination ID or Event in %v", msg.String()) + destId := request.Dest + if destId == "" { + errDetails := fmt.Sprintf("No Destination ID in %v", request.String()) s.logger.Error(errDetails) // TODO: publish error to DLQ. + return + } + // The Event ID is optional and, if missing, will be generated here. + if request.Event.EventId == "" { + request.Event.EventId = uuid.NewString() } + s.events <- request s.logger.Debug("Removing message %v from SQS", *msg.MessageId) - _, err := s.client.DeleteMessage(&sqs.DeleteMessageInput{ + _, err = s.client.DeleteMessage(&sqs.DeleteMessageInput{ QueueUrl: queueUrl, ReceiptHandle: msg.ReceiptHandle, }) From 373c49b4171c6699b0b3242929b7da6cf6fc956b Mon Sep 17 00:00:00 2001 From: Marco Massenzio Date: Thu, 1 Sep 2022 18:51:58 -0700 Subject: [PATCH 4/5] Preparing to release --- README.md | 60 +++++++++++++++++++++++++++---------------- build.settings | 2 +- clients/sqs_client.go | 16 ++++++++++++ 3 files changed, 55 insertions(+), 23 deletions(-) diff --git a/README.md b/README.md index 9ded513..373aee6 100644 --- a/README.md +++ b/README.md @@ -211,35 +211,51 @@ GET /api/v1/configurations/test.orders:v3 ### SQS Messages -> **NOTE**
-> **The format of the `Body` of the message will change, to include a `details` string** +#### EventRequest To send an Event to an FSM via an SQS Message we use the [following code](clients/sqs_client.go): ```golang - _, err = queue.SendMessage(&sqs.SendMessageInput{ - MessageAttributes: map[string]*sqs.MessageAttributeValue{ - "DestinationId": { - DataType: aws.String("String"), - StringValue: aws.String("6b5af0e8-9033-47e2-97db-337476f1402a"), - }, - "EventId": { - DataType: aws.String("String"), - StringValue: aws.String(uuid.NewString()), - }, - "Sender": { - DataType: aws.String("String"), - StringValue: aws.String("SQS Client"), - }, - }, - MessageBody: aws.String("backorder"), - QueueUrl: queueUrl.QueueUrl, - }) +// This is the object you want to send across as Event's metadata. +order := NewOrderDetails(uuid.NewString(), "sqs-cust-1234", 99.99) + +msg := &protos.EventRequest{ + Event: &protos.Event{ + // This is actually unnecessary; if no EventId is present, SM will + // generate one automatically and if the client does not need to store + // it somewhere else, it is safe to omit it. + EventId: uuid.NewString(), + + // This is also unnecessary, as SM will automatically generate a timestamp + // if one is not already present. + Timestamp: timestamppb.Now(), + + Transition: &protos.Transition{Event: "backorder"}, + Originator: "New SQS Client with Details", + + // Here you convert the Event metadata to a string by, e.g., JSON-serializing it. + Details: order.String(), + }, + + // This is the unique ID for the entity you are sending the event to; MUST + // match the `id` of an existing `statemachine` (see the REST API). + Dest: "6b5af0e8-9033-47e2-97db-337476f1402a", +} + +_, err = queue.SendMessage(&sqs.SendMessageInput{ + // Here we serialize the Protobuf using text serialization. + MessageBody: aws.String(proto.MarshalTextString(msg)), + QueueUrl: queueUrl.QueueUrl, +}) ``` -This will cause a `backorder` event to be sent to our FSM whose `id` matches the UUID in `dest`; if there are errors (eg, the FSM does not exist, or the event is not allowed for the machine's configuration and current state) errors may be optionally sent to the SQS queue configured via the `-errors` option (see [Running the Server](#running-the-server)): see the [`pubsub` code](pubsub/sqs_pub.go) code for details as to how we encode the error message as an SQS message. +This will cause a `backorder` event to be sent to our FSM whose `id` matches the UUID in `Dest`; if there are errors (eg, the FSM does not exist, or the event is not allowed for the machine's configuration and current state) errors may be optionally sent to the SQS queue configured via the `-errors` option (see [Running the Server](#running-the-server)): see the [`pubsub` code](pubsub/sqs_pub.go) code for details as to how we encode the error message as an SQS message. + +See [`EventRequest` in `statemachine-proto`](https://github.com/massenz/statemachine-proto/blob/main/api/statemachine.proto#L86) for details on the event being sent. + +#### SQS Error notifications -`TODO: add encoding description of the notification message` +`TODO:` Once we refactor `EventErrorMessage` we should update this section too. ### gRPC Methods diff --git a/build.settings b/build.settings index a41eaea..84f4649 100644 --- a/build.settings +++ b/build.settings @@ -1,3 +1,3 @@ # Build configuration -version = 0.5.0 +version = 0.5.1 diff --git a/clients/sqs_client.go b/clients/sqs_client.go index db841a5..d109c5a 100644 --- a/clients/sqs_client.go +++ b/clients/sqs_client.go @@ -52,6 +52,8 @@ func NewSqs(endpoint *string) *sqs.SQS { return sqs.New(sess) } +// main simulates a Client sending an SQS event message for an Order entity +// whose status is being tracked by `sm-server`. func main() { endpoint := flag.String("endpoint", "", "Use http://localhost:4566 to use LocalStack") q := flag.String("q", "", "The SQS Queue to send an Event to") @@ -71,15 +73,29 @@ func main() { panic(fmt.Errorf("must specify both of -id and -evt")) } fmt.Printf("Publishing Event `%s` for FSM `%s` to SQS Topic: [%s]\n", *event, *fsmId, *q) + + // This is the object you want to send across as Event's metadata. order := NewOrderDetails(uuid.NewString(), "sqs-cust-1234", 99.99) + msg := &protos.EventRequest{ Event: &protos.Event{ + // This is actually unnecessary; if no EventId is present, SM will + // generate one automatically and if the client does not need to store + // it somewhere else, it is safe to omit it. EventId: uuid.NewString(), + + // This is also unnecessary, as SM will automatically generate a timestamp + // if one is not already present. Timestamp: timestamppb.Now(), Transition: &protos.Transition{Event: *event}, Originator: "New SQS Client with Details", + + // Here you convert the Event metadata to a string by, e.g., JSON-serializing it. Details: order.String(), }, + + // This is the unique ID for the entity you are sending the event to; MUST + // match the `id` of an existing `statemachine` (see the REST API). Dest: *fsmId, } From 2f4b1cc652dd3a844a0831ac8ca314ece97d60ce Mon Sep 17 00:00:00 2001 From: Marco Massenzio <1153951+massenz@users.noreply.github.com> Date: Thu, 1 Sep 2022 19:15:32 -0700 Subject: [PATCH 5/5] Update Event enrichment with ts/id if missing (#30) --- api/fsm.go | 46 ++++++++----------------------------------- clients/sqs_client.go | 7 +++---- grpc/grpc_server.go | 16 ++++++--------- pubsub/sqs_sub.go | 12 +++++------ 4 files changed, 23 insertions(+), 58 deletions(-) diff --git a/api/fsm.go b/api/fsm.go index 71722da..8782836 100644 --- a/api/fsm.go +++ b/api/fsm.go @@ -185,42 +185,12 @@ func CheckValid(c *protos.Configuration) error { return nil } -//////// encoding interface ///////////// - -type MyConfig struct { - protos.Configuration -} - -type MyFSM struct { - protos.FiniteStateMachine -} - -// MarshalBinary is needed to encode the data before storing in Redis, -// and to retrieve it later. -// -// **NOTE** the receiver must be a concrete type (NOT a pointer) or the -// serialization to Redis will fail. -func (x MyConfig) MarshalBinary() ([]byte, error) { - return proto.Marshal(&x) -} - -// UnmarshalBinary is the dual of MarshalBinary and will parse the -// binary data into the receiver. -// See: https://pkg.go.dev/encoding -func (x *MyConfig) UnmarshalBinary(data []byte) error { - res := proto.Unmarshal(data, x) - return res -} - -// Identical implementation for the FiniteStateMachine, but necessary as -// we can't really define an ABC for both types, and using proto.Message wouldn't -// work either. - -func (x MyFSM) MarshalBinary() ([]byte, error) { - return proto.Marshal(&x) -} - -func (x *MyFSM) UnmarshalBinary(data []byte) error { - res := proto.Unmarshal(data, x) - return res +// UpdateEvent adds the ID and timestamp to the event, if not already set. +func UpdateEvent(event *protos.Event) { + if event.EventId == "" { + event.EventId = uuid.NewString() + } + if event.Timestamp == nil { + event.Timestamp = tspb.Now() + } } diff --git a/clients/sqs_client.go b/clients/sqs_client.go index d109c5a..e69d9b7 100644 --- a/clients/sqs_client.go +++ b/clients/sqs_client.go @@ -25,7 +25,6 @@ import ( "github.com/aws/aws-sdk-go/aws" "github.com/golang/protobuf/proto" "github.com/google/uuid" - "google.golang.org/protobuf/types/known/timestamppb" "os" "github.com/aws/aws-sdk-go/aws/session" @@ -82,16 +81,16 @@ func main() { // This is actually unnecessary; if no EventId is present, SM will // generate one automatically and if the client does not need to store // it somewhere else, it is safe to omit it. - EventId: uuid.NewString(), + //EventId: uuid.NewString(), // This is also unnecessary, as SM will automatically generate a timestamp // if one is not already present. - Timestamp: timestamppb.Now(), + //Timestamp: timestamppb.Now(), Transition: &protos.Transition{Event: *event}, Originator: "New SQS Client with Details", // Here you convert the Event metadata to a string by, e.g., JSON-serializing it. - Details: order.String(), + Details: order.String(), }, // This is the unique ID for the entity you are sending the event to; MUST diff --git a/grpc/grpc_server.go b/grpc/grpc_server.go index 1b05ecb..f03643d 100644 --- a/grpc/grpc_server.go +++ b/grpc/grpc_server.go @@ -22,12 +22,10 @@ import ( "context" "fmt" "github.com/google/uuid" - "github.com/massenz/slf4go/logging" - "google.golang.org/grpc" - "google.golang.org/protobuf/types/known/timestamppb" - "github.com/massenz/go-statemachine/api" "github.com/massenz/go-statemachine/storage" + "github.com/massenz/slf4go/logging" + "google.golang.org/grpc" protos "github.com/massenz/statemachine-proto/golang/api" ) @@ -60,12 +58,10 @@ func (s *grpcSubscriber) ConsumeEvent(ctx context.Context, request *protos.Event if request.Event.Transition.Event == "" { return nil, api.MissingEventNameError } - if request.Event.EventId == "" { - request.Event.EventId = uuid.NewString() - } - if request.Event.Timestamp == nil { - request.Event.Timestamp = timestamppb.Now() - } + + // If missing, add ID and timestamp. + api.UpdateEvent(request.Event) + s.Logger.Trace("Sending Event to channel: %v", request.Event) // TODO: use the context and cancel the request if the channel cannot accept // the event within the given timeout. diff --git a/pubsub/sqs_sub.go b/pubsub/sqs_sub.go index d152377..1d2ba9e 100644 --- a/pubsub/sqs_sub.go +++ b/pubsub/sqs_sub.go @@ -24,11 +24,13 @@ import ( "github.com/aws/aws-sdk-go/aws/session" "github.com/aws/aws-sdk-go/service/sqs" "github.com/golang/protobuf/proto" - "github.com/google/uuid" log "github.com/massenz/slf4go/logging" - protos "github.com/massenz/statemachine-proto/golang/api" "os" "time" + + "github.com/massenz/go-statemachine/api" + + protos "github.com/massenz/statemachine-proto/golang/api" ) // TODO: should we need to generalize and abstract the implementation of a Subscriber? @@ -160,10 +162,8 @@ func (s *SqsSubscriber) ProcessMessage(msg *sqs.Message, queueUrl *string) { // TODO: publish error to DLQ. return } - // The Event ID is optional and, if missing, will be generated here. - if request.Event.EventId == "" { - request.Event.EventId = uuid.NewString() - } + // The Event ID and timestamp are optional and, if missing, will be generated here. + api.UpdateEvent(request.Event) s.events <- request s.logger.Debug("Removing message %v from SQS", *msg.MessageId)