From 8bc346bc4c5e6b0781bf1fd119ee5042e6e29967 Mon Sep 17 00:00:00 2001 From: Marco Massenzio <1153951+massenz@users.noreply.github.com> Date: Sat, 25 Feb 2023 09:19:20 -0800 Subject: [PATCH] [#64, #67] Removes REST server & in-memory store (#85) --- .github/workflows/{build.yml => test.yml} | 4 +- .run/Run all tests.run.xml | 2 +- Makefile | 12 +- api/fsm.go | 4 +- cmd/main.go | 172 +++++---- server/configuration_handlers.go | 95 ----- server/configuration_handlers_test.go | 193 ---------- server/event_handlers.go | 62 ---- server/event_handlers_test.go | 173 --------- server/health_handler.go | 58 --- server/http_server.go | 99 ------ server/http_server_test.go | 37 -- server/server_suite_test.go | 22 -- server/statemachine_handlers.go | 180 ---------- server/statemachine_handlers_test.go | 413 ---------------------- server/types.go | 76 ---- storage/memory_store.go | 163 --------- storage/memory_store_test.go | 120 ------- 18 files changed, 109 insertions(+), 1776 deletions(-) rename .github/workflows/{build.yml => test.yml} (90%) delete mode 100644 server/configuration_handlers.go delete mode 100644 server/configuration_handlers_test.go delete mode 100644 server/event_handlers.go delete mode 100644 server/event_handlers_test.go delete mode 100644 server/health_handler.go delete mode 100644 server/http_server.go delete mode 100644 server/http_server_test.go delete mode 100644 server/server_suite_test.go delete mode 100644 server/statemachine_handlers.go delete mode 100644 server/statemachine_handlers_test.go delete mode 100644 server/types.go delete mode 100644 storage/memory_store.go delete mode 100644 storage/memory_store_test.go diff --git a/.github/workflows/build.yml b/.github/workflows/test.yml similarity index 90% rename from .github/workflows/build.yml rename to .github/workflows/test.yml index e90a88d..434796e 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/test.yml @@ -3,7 +3,7 @@ # Copyright (c) 2022 AlertAvert.com. All rights reserved. # Author: Marco Massenzio (marco@alertavert.com) # -name: Build & Test +name: Test on: push: @@ -28,4 +28,4 @@ jobs: run: | mkdir -p ${HOME}/.aws && cp data/credentials ${HOME}/.aws/ export AWS_REGION=us-west-2 - go test ./api ./grpc ./pubsub ./server ./storage + go test ./api ./grpc ./pubsub ./storage diff --git a/.run/Run all tests.run.xml b/.run/Run all tests.run.xml index 7946e6a..69bd431 100644 --- a/.run/Run all tests.run.xml +++ b/.run/Run all tests.run.xml @@ -2,7 +2,7 @@ - + diff --git a/Makefile b/Makefile index 26b61d6..90dbe38 100644 --- a/Makefile +++ b/Makefile @@ -21,7 +21,7 @@ dockerfile := docker/Dockerfile # Edit only the packages list, when adding new functionality, # the rest is deduced automatically. # -pkgs := ./api ./grpc ./pubsub ./server ./storage +pkgs := ./api ./grpc ./pubsub ./storage all_go := $(shell for d in $(pkgs); do find $$d -name "*.go"; done) test_srcs := $(shell for d in $(pkgs); do find $$d -name "*_test.go"; done) srcs := $(filter-out $(test_srcs),$(all_go)) @@ -61,16 +61,16 @@ fmt: ## Formats the Go source code using 'go fmt' $(bin): cmd/main.go $(srcs) @mkdir -p $(shell dirname $(bin)) GOOS=$(GOOS); GOARCH=$(GOARCH); go build \ - -ldflags "-X $(GOMOD)/server.Release=$(release)" \ + -ldflags "-X $(GOMOD)/api.Release=$(release)" \ -o $(bin) cmd/main.go $(dockerbin): GOOS=linux; GOARCH=amd64; go build \ - -ldflags "-X $(GOMOD)/server.Release=$(release)" \ + -ldflags "-X $(GOMOD)/api.Release=$(release)" \ -o $(dockerbin) cmd/main.go .PHONY: build -build: $(bin) ## Builds the server +build: $(bin) ## Builds the Statemachine server binary test: $(srcs) $(test_srcs) ## Runs all tests ginkgo $(pkgs) @@ -89,13 +89,13 @@ container: $(dockerbin) ## Builds the container image .PHONY: start start: ## Starts the Redis and LocalStack containers, and Creates the SQS Queues in LocalStack @RELEASE=$(release) BASEDIR=$(shell pwd) docker compose -f $(compose) --project-name sm up redis localstack -d - @sleep 1 + @sleep 3 @for queue in events notifications; do \ aws --no-cli-pager --endpoint-url=http://localhost:4566 \ --region us-west-2 \ sqs create-queue --queue-name $$queue; done >/dev/null # We need to wait for the SQS Queues to be up before starting the server. - @RELEASE=$(release) BASEDIR=$(shell pwd) docker compose -f $(compose) --project-name sm up server -d + #@RELEASE=$(release) BASEDIR=$(shell pwd) docker compose -f $(compose) --project-name sm up server -d .PHONY: stop stop: ## Stops the Redis and LocalStack containers diff --git a/api/fsm.go b/api/fsm.go index 19bd24e..49f9501 100644 --- a/api/fsm.go +++ b/api/fsm.go @@ -25,6 +25,8 @@ const ( ) var ( + Release string + MalformedConfigurationError = fmt.Errorf("this configuration cannot be parsed") MissingNameConfigurationError = fmt.Errorf( "configuration must always specify a name (and optionally a version)") @@ -43,7 +45,7 @@ var ( // Logger is made accessible so that its `Level` can be changed // or can be sent to a `NullLog` during testing. - Logger = log.NewLog("fsm") + Logger = log.NewLog("api") ) // ConfiguredStateMachine is the internal representation of an FSM, which diff --git a/cmd/main.go b/cmd/main.go index 7118226..64695e6 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -13,14 +13,20 @@ import ( "errors" "flag" "fmt" + g "google.golang.org/grpc" + "net" + "os" + "os/signal" + "sync" + "syscall" + + log "github.com/massenz/slf4go/logging" + protos "github.com/massenz/statemachine-proto/golang/api" + + "github.com/massenz/go-statemachine/api" "github.com/massenz/go-statemachine/grpc" "github.com/massenz/go-statemachine/pubsub" - "github.com/massenz/go-statemachine/server" "github.com/massenz/go-statemachine/storage" - log "github.com/massenz/slf4go/logging" - protos "github.com/massenz/statemachine-proto/golang/api" - "net" - "sync" ) func SetLogLevel(services []log.Loggable, level log.LogLevel) { @@ -32,27 +38,34 @@ func SetLogLevel(services []log.Loggable, level log.LogLevel) { } var ( - logger = log.NewLog("sm-server") - serverLogLevel log.LogLevel = log.INFO + logger = log.NewLog("sm-server") - host = "0.0.0.0" - - store storage.StoreManager - - sub *pubsub.SqsSubscriber - pub *pubsub.SqsPublisher = nil listener *pubsub.EventsListener - - // TODO: for now blocking channels; we will need to confirm - // whether we can support a fully concurrent system with a - // buffered channel + pub *pubsub.SqsPublisher = nil + sub *pubsub.SqsSubscriber + store storage.StoreManager + wg sync.WaitGroup + + // notificationsCh is the channel over which we send error notifications + // to publish on the appropriate queue. + // The Listener will produce error notifications, which will be consumed + // by the PubSub Publisher (if configured) which in turn will produce to + // the -notifications topic. + // + // Not configured by default, it is only used if a -notifications queue + // is defined. notificationsCh chan protos.EventResponse = nil - eventsCh = make(chan protos.EventRequest) - wg sync.WaitGroup + + // eventsCh is the channel over which the Listener receive Events to process. + // Both the gRPC server and the PubSub Subscriber (if configured) will produce + // events for this channel. + // + // Currently, this is a blocking channel (capacity for one item), but once we + // parallelize events processing we can make it deeper. + eventsCh = make(chan protos.EventRequest) ) func main() { - defer close(eventsCh) var awsEndpoint = flag.String("endpoint-url", "", "HTTP URL for AWS SQS to connect to; usually best left undefined, "+ @@ -61,17 +74,14 @@ func main() { "If set, connects to Redis with cluster-mode enabled") var debug = flag.Bool("debug", false, "Verbose logs; better to avoid on Production services") - var eventsTopic = flag.String("events", "", "Topi name to receive events from") + var eventsTopic = flag.String("events", "", "Topic name to receive events from") var grpcPort = flag.Int("grpc-port", 7398, "The port for the gRPC server") var noTls = flag.Bool("insecure", false, "If set, TLS will be disabled (NOT recommended)") - var localOnly = flag.Bool("local", false, - "If set, it only listens to incoming requests from the local host") var maxRetries = flag.Int("max-retries", storage.DefaultMaxRetries, "Max number of attempts for a recoverable error to be retried against the Redis cluster") var notificationsTopic = flag.String("notifications", "", "(optional) The name of the topic to publish events' outcomes to; if not "+ "specified, no outcomes will be published") - var port = flag.Int("http-port", 7399, "HTTP Server port for the REST API") var redisUrl = flag.String("redis", "", "For single node Redis instances: host:port "+ "for the Redis instance. For redis clusters: a comma-separated list of redis nodes. "+ "If using an ElastiCache Redis cluster with cluster mode enabled, this can also be the configuration endpoint.") @@ -82,15 +92,7 @@ func main() { " performance, do not use in production or on heavily loaded systems (will override the -debug option)") flag.Parse() - logger.Info("starting State Machine Server - Rel. %s", server.Release) - - if *localOnly { - logger.Info("listening on local interface only") - host = "localhost" - } else { - logger.Warn("listening on all interfaces") - } - addr := fmt.Sprintf("%s:%d", host, *port) + logger.Info("starting State Machine Server - Rel. %s", api.Release) if *redisUrl == "" { logger.Fatal(errors.New("in-memory store deprecated, a Redis server must be configured")) @@ -99,27 +101,32 @@ func main() { logger.Info("with timeout: %s, max-retries: %d", *timeout, *maxRetries) store = storage.NewRedisStore(*redisUrl, *cluster, 1, *timeout, *maxRetries) } - server.SetStore(store) - - if *eventsTopic == "" { - logger.Warn("no PubSub event topic configured, events can only be sent via gRPC calls") - } - logger.Info("connecting to SQS Topic: %s", *eventsTopic) - sub = pubsub.NewSqsSubscriber(eventsCh, awsEndpoint) - if sub == nil { - panic("Cannot create a valid SQS Subscriber") + done := make(chan interface{}) + if *eventsTopic != "" { + logger.Info("connecting to SQS Topic: %s", *eventsTopic) + sub = pubsub.NewSqsSubscriber(eventsCh, awsEndpoint) + if sub == nil { + logger.Fatal(errors.New("cannot create a valid SQS Subscriber")) + } + wg.Add(1) + go func() { + defer wg.Done() + logger.Info("subscribing to events on topic [%s]", *eventsTopic) + sub.Subscribe(*eventsTopic, done) + }() } if *notificationsTopic != "" { - logger.Info("notifications topic: %s", *notificationsTopic) + logger.Info("sending errors to DLQ topic [%s]", *notificationsTopic) notificationsCh = make(chan protos.EventResponse) defer close(notificationsCh) pub = pubsub.NewSqsPublisher(notificationsCh, awsEndpoint) if pub == nil { - panic("Cannot create a valid SQS Publisher") + logger.Fatal(errors.New("cannot create a valid SQS Publisher")) } go pub.Publish(*notificationsTopic) } + listener = pubsub.NewEventsListener(&pubsub.ListenerOptions{ EventsChannel: eventsCh, NotificationsChannel: notificationsCh, @@ -127,50 +134,59 @@ func main() { // TODO: workers pool not implemented yet. ListenersPoolSize: 0, }) - go sub.Subscribe(*eventsTopic, nil) - - // This should not be invoked until we have initialized all the services. - setLogLevel(*debug, *trace) - logger.Info("starting events listener") - go listener.ListenForMessages() + wg.Add(1) + go func() { + defer wg.Done() + listener.ListenForMessages() + }() logger.Info("gRPC server running at tcp://:%d", *grpcPort) - go startGrpcServer(*grpcPort, *noTls, eventsCh) + svr := startGrpcServer(*grpcPort, *noTls, eventsCh) - // TODO: REST Server is deprecated and will be removed soon. - scheme := "http" - logger.Info("HTTP server (REST API) running at %s://%s", scheme, addr) - srv := server.NewHTTPServer(addr, serverLogLevel) - logger.Fatal(srv.ListenAndServe()) + // This should not be invoked until we have initialized all the services. + setLogLevel(*debug, *trace) + logger.Info("statemachine server ready for processing events...") + RunUntilStopped(done, svr) + logger.Info("...done. Goodbye.") +} + +func RunUntilStopped(done chan interface{}, svr *g.Server) { + // Trap Ctrl-C and SIGTERM (Docker/Kubernetes) to shutdown gracefully + c := make(chan os.Signal, 1) + signal.Notify(c, os.Interrupt, syscall.SIGTERM) + + // Block until a signal is received. + _ = <-c + logger.Info("shutting down services...") + close(done) + close(eventsCh) + svr.GracefulStop() + logger.Info("waiting for services to exit...") + wg.Wait() } // setLogLevel sets the logging level for all the services' loggers, depending on // whether the -debug or -trace flag is enabled (if neither, we log at INFO level). // If both are set, then -trace takes priority. func setLogLevel(debug bool, trace bool) { - if debug { + var logLevel log.LogLevel = log.INFO + if debug && !trace { logger.Info("verbose logging enabled") - logger.Level = log.DEBUG - SetLogLevel([]log.Loggable{store, pub, sub, listener}, log.DEBUG) - serverLogLevel = log.DEBUG - } - - if trace { - logger.Warn("trace logging Enabled") - logger.Level = log.TRACE - server.EnableTracing() - SetLogLevel([]log.Loggable{store, pub, sub, listener}, log.TRACE) - serverLogLevel = log.TRACE + logLevel = log.DEBUG + } else if trace { + logger.Info("trace logging enabled") + logLevel = log.TRACE } + logger.Level = logLevel + SetLogLevel([]log.Loggable{store, pub, sub, listener}, logLevel) } // startGrpcServer will start a new gRPC server, bound to // the local `port` and will send any incoming // `EventRequest` to the receiving channel. // This MUST be run as a go-routine, which never returns -func startGrpcServer(port int, disableTls bool, events chan<- protos.EventRequest) { - defer wg.Done() +func startGrpcServer(port int, disableTls bool, events chan<- protos.EventRequest) *g.Server { l, err := net.Listen("tcp", fmt.Sprintf(":%d", port)) if err != nil { panic(err) @@ -183,10 +199,16 @@ func startGrpcServer(port int, disableTls bool, events chan<- protos.EventReques TlsEnabled: !disableTls, }) if err != nil { - log.RootLog.Fatal(err) - } - err = grpcServer.Serve(l) - if err != nil { - log.RootLog.Fatal(err) + logger.Fatal(err) } + wg.Add(1) + go func() { + defer wg.Done() + err = grpcServer.Serve(l) + if err != nil { + logger.Fatal(err) + } + logger.Info("gRPC server exited") + }() + return grpcServer } diff --git a/server/configuration_handlers.go b/server/configuration_handlers.go deleted file mode 100644 index 36daa5b..0000000 --- a/server/configuration_handlers.go +++ /dev/null @@ -1,95 +0,0 @@ -/* - * Copyright (c) 2022 AlertAvert.com. All rights reserved. - * - * Licensed under the Apache License, Version 2.0 - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Author: Marco Massenzio (marco@alertavert.com) - */ - -package server - -import ( - "encoding/json" - "fmt" - "github.com/gorilla/mux" - . "github.com/massenz/go-statemachine/api" - "github.com/massenz/statemachine-proto/golang/api" - pj "google.golang.org/protobuf/encoding/protojson" - "net/http" - "strings" -) - -func CreateConfigurationHandler(w http.ResponseWriter, r *http.Request) { - defer trace(r.RequestURI)() - defaultContent(w) - - var config api.Configuration - err := json.NewDecoder(r.Body).Decode(&config) - if err != nil { - http.Error(w, err.Error(), http.StatusBadRequest) - return - } - if config.Version == "" { - config.Version = "v1" - } - logger.Debug("Creating new configuration with Version ID: %s", GetVersionId(&config)) - - // TODO: Check this configuration does not already exist. - - err = CheckValid(&config) - if err != nil { - http.Error(w, err.Error(), http.StatusBadRequest) - return - } - logger.Debug("Configuration is valid (starting state: %s)", config.StartingState) - - err = storeManager.PutConfig(&config) - if err != nil { - if strings.Contains(err.Error(), "already exists") { - logger.Debug("attempts to POST an existing configuration id: %v", err) - http.Error(w, err.Error(), http.StatusConflict) - return - } - logger.Error("cannot store Configuration to store: %v", err) - http.Error(w, err.Error(), http.StatusInternalServerError) - return - } - logger.Debug("Configuration stored: %s", GetVersionId(&config)) - - w.Header().Add("Location", ConfigurationsEndpoint+"/"+GetVersionId(&config)) - w.WriteHeader(http.StatusCreated) - err = json.NewEncoder(w).Encode(&config) - if err != nil { - http.Error(w, err.Error(), http.StatusInternalServerError) - return - } - return -} - -func GetConfigurationHandler(w http.ResponseWriter, r *http.Request) { - defer trace(r.RequestURI)() - defaultContent(w) - - vars := mux.Vars(r) - if vars == nil { - logger.Error("Unexpected missing path parameter in Request URI: %s", - r.RequestURI) - http.Error(w, UnexpectedError.Error(), http.StatusInternalServerError) - return - } - - cfgId := vars["cfg_id"] - config, ok := storeManager.GetConfig(cfgId) - if !ok { - http.Error(w, fmt.Sprintf("Configuration [%s] not found", cfgId), http.StatusNotFound) - return - } - resp, err := pj.Marshal(config) - if err != nil { - http.Error(w, err.Error(), http.StatusInternalServerError) - return - } - w.Write(resp) - return -} diff --git a/server/configuration_handlers_test.go b/server/configuration_handlers_test.go deleted file mode 100644 index d8c9ccb..0000000 --- a/server/configuration_handlers_test.go +++ /dev/null @@ -1,193 +0,0 @@ -/* - * Copyright (c) 2022 AlertAvert.com. All rights reserved. - * - * Licensed under the Apache License, Version 2.0 - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Author: Marco Massenzio (marco@alertavert.com) - */ - -package server_test - -import ( - . "github.com/onsi/ginkgo" - . "github.com/onsi/gomega" - - "bytes" - "encoding/json" - slf4go "github.com/massenz/slf4go/logging" - "io" - "io/ioutil" - "net/http" - "net/http/httptest" - "strings" - - . "github.com/massenz/go-statemachine/api" - "github.com/massenz/go-statemachine/server" - "github.com/massenz/go-statemachine/storage" - protos "github.com/massenz/statemachine-proto/golang/api" -) - -const ( - OrdersConfig = "../data/orders.json" -) - -var _ = Describe("Configuration Handlers", func() { - var ( - req *http.Request - writer *httptest.ResponseRecorder - store storage.StoreManager - - // NOTE: we are using the Router here as we need to correctly also parse - // the URI for path args (just using the handler will not do that) - // The `router` can be safely set for all the test contexts, once and for all. - router = server.NewRouter() - ) - // Disabling verbose logging, as it pollutes test output; - // set it back to DEBUG when tests fail, and you need to - // diagnose the failure. - server.SetLogLevel(slf4go.NONE) - - Context("when creating configurations", func() { - BeforeEach(func() { - writer = httptest.NewRecorder() - // TODO: use a RedisStore instead, once we have TestContainers enabled. - store = storage.NewInMemoryStore() - store.SetLogLevel(slf4go.NONE) - server.SetStore(store) - }) - Context("with a valid JSON", func() { - BeforeEach(func() { - body := ReadTestdata() - req = httptest.NewRequest(http.MethodPost, - strings.Join([]string{server.ApiPrefix, server.ConfigurationsEndpoint}, "/"), body) - }) - - It("should succeed", func() { - router.ServeHTTP(writer, req) - Expect(writer.Code).To(Equal(http.StatusCreated)) - location := writer.Header().Get("Location") - Expect(strings.HasSuffix(location, "/test.orders:v2")).To(BeTrue()) - - var response protos.Configuration - Expect(json.Unmarshal(writer.Body.Bytes(), &response)).ToNot(HaveOccurred()) - Expect(response.Name).To(Equal("test.orders")) - Expect(response.States).To(Equal([]string{ - "start", - "pending", - "shipping", - "delivered", - "completed", - "closed", - })) - Expect(response.StartingState).To(Equal("start")) - }) - It("should fill the cache", func() { - router.ServeHTTP(writer, req) - Expect(writer.Code).To(Equal(http.StatusCreated)) - _, found := store.GetConfig("test.orders:v2") - Expect(found).To(BeTrue()) - }) - It("should fail for an existing version", func() { - router.ServeHTTP(writer, req) - Expect(writer.Code).To(Equal(http.StatusCreated)) - body := ReadTestdata() - - // POST the config again - writer2 := httptest.NewRecorder() - req2 := httptest.NewRequest(http.MethodPost, - strings.Join([]string{server.ApiPrefix, server.ConfigurationsEndpoint}, "/"), body) - router.ServeHTTP(writer2, req2) - Expect(writer2.Code).To(Equal(http.StatusConflict)) - }) - }) - - Context("with an invalid JSON", func() { - var body io.Reader - BeforeEach(func() { - req = httptest.NewRequest(http.MethodPost, - strings.Join([]string{server.ApiPrefix, server.ConfigurationsEndpoint}, "/"), body) - }) - It("without name, states or transitions, will fail", func() { - body = strings.NewReader(`{ - "version": "v1", - "starting_state": "source" - }`) - router.ServeHTTP(writer, req) - Expect(writer.Code).To(Equal(http.StatusBadRequest)) - }) - It("without states, will fail", func() { - body = strings.NewReader(`{ - "name": "fake", - "version": "v1", - "starting_state": "source" - "transitions": [ - {"from": "source", "to": "tested", "event": "test"}, - {"from": "tested", "to": "binary", "event": "build"} - ], - }`) - router.ServeHTTP(writer, req) - Expect(writer.Code).To(Equal(http.StatusBadRequest)) - }) - }) - }) - Context("when retrieving configurations", func() { - var spaceship = protos.Configuration{ - Name: "spaceship", - Version: "v1", - StartingState: "earth", - States: []string{"earth", "orbit", "mars"}, - Transitions: []*protos.Transition{ - {From: "earth", To: "orbit", Event: "launch"}, - {From: "orbit", To: "mars", Event: "land"}, - }, - } - var cfgId string - BeforeEach(func() { - writer = httptest.NewRecorder() - // We need an empty, clean store for each test to avoid cross-polluting it. - store = storage.NewInMemoryStore() - store.SetLogLevel(slf4go.NONE) - server.SetStore(store) - - Expect(store.PutConfig(&spaceship)).ToNot(HaveOccurred()) - cfgId = GetVersionId(&spaceship) - }) - It("with a valid ID should succeed", func() { - endpoint := strings.Join([]string{server.ApiPrefix, server.ConfigurationsEndpoint, - cfgId}, "/") - req = httptest.NewRequest(http.MethodGet, endpoint, nil) - var result protos.Configuration - router.ServeHTTP(writer, req) - Expect(writer.Code).To(Equal(http.StatusOK)) - Expect(json.NewDecoder(writer.Body).Decode(&result)).ToNot(HaveOccurred()) - Expect(GetVersionId(&result)).To(Equal(cfgId)) - Expect(result.States).To(Equal(spaceship.States)) - Expect(len(result.Transitions)).To(Equal(len(spaceship.Transitions))) - for n, t := range result.Transitions { - Expect(t.From).To(Equal(spaceship.Transitions[n].From)) - Expect(t.To).To(Equal(spaceship.Transitions[n].To)) - Expect(t.Event).To(Equal(spaceship.Transitions[n].Event)) - } - }) - It("with an invalid ID, it will return Not Found", func() { - endpoint := strings.Join([]string{server.ApiPrefix, server.ConfigurationsEndpoint, - "fake:v3"}, "/") - req = httptest.NewRequest(http.MethodGet, endpoint, nil) - router.ServeHTTP(writer, req) - Expect(writer.Code).To(Equal(http.StatusNotFound)) - }) - It("without ID, it will fail with a NOT ALLOWED error", func() { - endpoint := strings.Join([]string{server.ApiPrefix, server.ConfigurationsEndpoint}, "/") - req = httptest.NewRequest(http.MethodGet, endpoint, nil) - router.ServeHTTP(writer, req) - Expect(writer.Code).To(Equal(http.StatusMethodNotAllowed)) - }) - }) -}) - -func ReadTestdata() *bytes.Reader { - configJson, err := ioutil.ReadFile(OrdersConfig) - Expect(err).ToNot(HaveOccurred()) - return bytes.NewReader(configJson) -} diff --git a/server/event_handlers.go b/server/event_handlers.go deleted file mode 100644 index a44b1f0..0000000 --- a/server/event_handlers.go +++ /dev/null @@ -1,62 +0,0 @@ -/* - * Copyright (c) 2022 AlertAvert.com. All rights reserved. - * - * Licensed under the Apache License, Version 2.0 - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Author: Marco Massenzio (marco@alertavert.com) - */ - -package server - -import ( - "encoding/json" - "fmt" - "github.com/gorilla/mux" - "net/http" -) - -func GetEventHandler(w http.ResponseWriter, r *http.Request) { - defer trace(r.RequestURI)() - defaultContent(w) - - // We don't really need to check for the presence of the parameter, - // as the Mux router takes care of all the error handling for us. - vars := mux.Vars(r) - cfgName := vars["cfg_name"] - evtId := vars["evt_id"] - logger.Debug("Looking up Event: %s#%s", cfgName, evtId) - - event, ok := storeManager.GetEvent(evtId, cfgName) - if !ok { - http.Error(w, fmt.Sprintf("Event [%s] not found", evtId), http.StatusNotFound) - return - } - logger.Debug("Found Event: %s", event.String()) - - err := json.NewEncoder(w).Encode(&EventResponse{ID: evtId, Event: event}) - if err != nil { - http.Error(w, err.Error(), http.StatusInternalServerError) - } -} - -func GetOutcomeHandler(w http.ResponseWriter, r *http.Request) { - defer trace(r.RequestURI)() - defaultContent(w) - - vars := mux.Vars(r) - cfgName := vars["cfg_name"] - evtId := vars["evt_id"] - logger.Debug("Looking up Outcome for Event: %s#%s", cfgName, evtId) - - outcome, ok := storeManager.GetOutcomeForEvent(evtId, cfgName) - if !ok { - http.Error(w, fmt.Sprintf("Outcome for Event [%s] not found", evtId), http.StatusNotFound) - return - } - logger.Debug("Found Event Outcome: %s", outcome.String()) - err := json.NewEncoder(w).Encode(MakeOutcomeResponse(outcome)) - if err != nil { - http.Error(w, err.Error(), http.StatusInternalServerError) - } -} diff --git a/server/event_handlers_test.go b/server/event_handlers_test.go deleted file mode 100644 index f4b895b..0000000 --- a/server/event_handlers_test.go +++ /dev/null @@ -1,173 +0,0 @@ -/* - * Copyright (c) 2022 AlertAvert.com. All rights reserved. - * - * Licensed under the Apache License, Version 2.0 - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Author: Marco Massenzio (marco@alertavert.com) - */ - -package server_test - -import ( - . "github.com/JiaYongfei/respect/gomega" - "github.com/google/uuid" - . "github.com/onsi/ginkgo" - . "github.com/onsi/gomega" - - "encoding/json" - log "github.com/massenz/slf4go/logging" - "net/http" - "net/http/httptest" - "strings" - - . "github.com/massenz/go-statemachine/api" - "github.com/massenz/go-statemachine/server" - "github.com/massenz/go-statemachine/storage" - - protos "github.com/massenz/statemachine-proto/golang/api" -) - -var _ = Describe("Event Handlers", func() { - var ( - req *http.Request - writer *httptest.ResponseRecorder - store storage.StoreManager - - // NOTE: we are using the Router here as we need to correctly also parse - // the URI for path args (just using the router will not do that) - // The `router` can be safely set for all the test contexts, once and for all. - router = server.NewRouter() - ) - // Disabling verbose logging, as it pollutes test output; - // set it back to DEBUG when tests fail, and you need to - // diagnose the failure. - server.SetLogLevel(log.NONE) - - Context("when retrieving an Event", func() { - var id string - var evt *protos.Event - - BeforeEach(func() { - store = storage.NewInMemoryStore() - store.SetLogLevel(log.NONE) - server.SetStore(store) - - writer = httptest.NewRecorder() - evt = NewEvent("test") - id = evt.EventId - Expect(store.PutEvent(evt, "test-cfg", storage.NeverExpire)).ToNot(HaveOccurred()) - }) - It("can be retrieved with a valid ID", func() { - endpoint := strings.Join([]string{server.ApiPrefix, - server.EventsEndpoint, "test-cfg", id}, "/") - req = httptest.NewRequest(http.MethodGet, endpoint, nil) - router.ServeHTTP(writer, req) - Expect(writer.Code).To(Equal(http.StatusOK)) - - var result server.EventResponse - Expect(json.NewDecoder(writer.Body).Decode(&result)).ToNot(HaveOccurred()) - Expect(result.ID).To(Equal(id)) - Expect(result.Event).To(Respect(evt)) - }) - It("with an invalid ID will return Not Found", func() { - endpoint := strings.Join([]string{server.ApiPrefix, - server.EventsEndpoint, "test-cfg", uuid.NewString()}, "/") - req = httptest.NewRequest(http.MethodGet, endpoint, nil) - - router.ServeHTTP(writer, req) - Expect(writer.Code).To(Equal(http.StatusNotFound)) - }) - It("with a missing Config will (eventually) return Not Found", func() { - endpoint := strings.Join([]string{server.ApiPrefix, - server.EventsEndpoint, "", "12345"}, "/") - req = httptest.NewRequest(http.MethodGet, endpoint, nil) - router.ServeHTTP(writer, req) - // Note: this is done by the router, automatically, removing the redundant slash - Expect(writer.Code).To(Equal(http.StatusMovedPermanently)) - newLoc := strings.Join([]string{server.ApiPrefix, - server.EventsEndpoint, "12345"}, "/") - Expect(writer.Header().Get("Location")).To(Equal(newLoc)) - - req = httptest.NewRequest(http.MethodGet, newLoc, nil) - writer = httptest.NewRecorder() - router.ServeHTTP(writer, req) - // Note: this is done by the router, automatically, removing the redundant slash - Expect(writer.Code).To(Equal(http.StatusNotFound)) - }) - It("with gibberish data will still fail gracefully", func() { - endpoint := strings.Join([]string{server.ApiPrefix, - server.EventsEndpoint, "fake", id}, "/") - - req = httptest.NewRequest(http.MethodGet, endpoint, nil) - router.ServeHTTP(writer, req) - Expect(writer.Code).To(Equal(http.StatusNotFound)) - }) - }) - - Context("when retrieving an Event Outcome", func() { - var id string - var outcome *protos.EventOutcome - var cfgName = "test-cfg" - - BeforeEach(func() { - store = storage.NewInMemoryStore() - store.SetLogLevel(log.NONE) - server.SetStore(store) - - writer = httptest.NewRecorder() - id = uuid.NewString() - outcome = &protos.EventOutcome{ - Code: protos.EventOutcome_Ok, - Id: "fake-sm", - Details: "something happened", - } - Expect(store.AddEventOutcome(id, cfgName, outcome, - storage.NeverExpire)).ToNot(HaveOccurred()) - }) - It("can be retrieved with a valid ID", func() { - endpoint := strings.Join([]string{server.ApiPrefix, - server.EventsEndpoint, server.EventsOutcome, cfgName, id}, "/") - req = httptest.NewRequest(http.MethodGet, endpoint, nil) - router.ServeHTTP(writer, req) - Expect(writer.Code).To(Equal(http.StatusOK)) - - var result server.OutcomeResponse - Expect(json.NewDecoder(writer.Body).Decode(&result)).ToNot(HaveOccurred()) - Expect(result.StatusCode).To(Equal(outcome.Code.String())) - Expect(result.Message).To(Equal(outcome.Details)) - Expect(result.Destination).To(Equal(outcome.Id)) - }) - It("with an invalid ID will return Not Found", func() { - endpoint := strings.Join([]string{server.ApiPrefix, - server.EventsEndpoint, server.EventsOutcome, cfgName, uuid.NewString()}, "/") - req = httptest.NewRequest(http.MethodGet, endpoint, nil) - router.ServeHTTP(writer, req) - Expect(writer.Code).To(Equal(http.StatusNotFound)) - }) - It("with a missing Config will (eventually) return Not Found", func() { - endpoint := strings.Join([]string{server.ApiPrefix, - server.EventsEndpoint, server.EventsOutcome, "", "12345"}, "/") - req = httptest.NewRequest(http.MethodGet, endpoint, nil) - router.ServeHTTP(writer, req) - // Note: this is done by the router, automatically, removing the redundant slash - Expect(writer.Code).To(Equal(http.StatusMovedPermanently)) - newLoc := strings.Join([]string{server.ApiPrefix, - server.EventsEndpoint, server.EventsOutcome, "12345"}, "/") - Expect(writer.Header().Get("Location")).To(Equal(newLoc)) - - req = httptest.NewRequest(http.MethodGet, newLoc, nil) - writer = httptest.NewRecorder() - router.ServeHTTP(writer, req) - // Note: this is done by the router, automatically, removing the redundant slash - Expect(writer.Code).To(Equal(http.StatusNotFound)) - }) - It("with gibberish data will still fail gracefully", func() { - endpoint := strings.Join([]string{server.ApiPrefix, - server.EventsEndpoint, server.EventsOutcome, "fake", id}, "/") - req = httptest.NewRequest(http.MethodGet, endpoint, nil) - router.ServeHTTP(writer, req) - Expect(writer.Code).To(Equal(http.StatusNotFound)) - }) - }) -}) diff --git a/server/health_handler.go b/server/health_handler.go deleted file mode 100644 index 61aa7b2..0000000 --- a/server/health_handler.go +++ /dev/null @@ -1,58 +0,0 @@ -/* - * Copyright (c) 2022 AlertAvert.com. All rights reserved. - * - * Licensed under the Apache License, Version 2.0 - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Author: Marco Massenzio (marco@alertavert.com) - */ - -package server - -import ( - "encoding/json" - "fmt" - "net/http" -) - -// NOTE: We make the handlers "exportable" so they can be tested, do NOT call directly. - -type HealthResponse struct { - Status string `json:"status"` - Release string `json:"release"` -} - -func HealthHandler(w http.ResponseWriter, r *http.Request) { - // Standard preamble for all handlers, sets tracing (if enabled) and default content type. - defer trace(r.RequestURI)() - defaultContent(w) - - var response MessageResponse - res := HealthResponse{ - Status: "OK", - Release: Release, - } - var err error - if storeManager == nil { - err = fmt.Errorf("store manager is not initialized") - } else { - err = storeManager.Health() - } - if err != nil { - logger.Error("Health check failed: %s", err) - res.Status = "ERROR" - response = MessageResponse{ - Msg: res, - Error: fmt.Sprintf("error connecting to storage: %s", err), - } - } else { - response = MessageResponse{ - Msg: res, - } - } - err = json.NewEncoder(w).Encode(response) - if err != nil { - http.Error(w, err.Error(), http.StatusInternalServerError) - return - } -} diff --git a/server/http_server.go b/server/http_server.go deleted file mode 100644 index bd08eb0..0000000 --- a/server/http_server.go +++ /dev/null @@ -1,99 +0,0 @@ -/* - * Copyright (c) 2022 AlertAvert.com. All rights reserved. - * - * Licensed under the Apache License, Version 2.0 - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Author: Marco Massenzio (marco@alertavert.com) - */ - -package server - -import ( - "github.com/massenz/go-statemachine/storage" - log "github.com/massenz/slf4go/logging" - "net/http" - "strings" - "time" - - "github.com/gorilla/mux" -) - -const ( - ApiPrefix = "/api/v1" - HealthEndpoint = "/health" - ConfigurationsEndpoint = "configurations" - StatemachinesEndpoint = "statemachines" - EventsEndpoint = "events" - EventsOutcome = "outcome" -) - -var ( - // Release carries the version of the binary, as set by the build script - // See: https://blog.alexellis.io/inject-build-time-vars-golang/ - Release string - - shouldTrace bool - logger = log.NewLog("server") - storeManager storage.StoreManager -) - -func trace(endpoint string) func() { - if !shouldTrace { - return func() {} - } - start := time.Now() - logger.Trace("Handling: [%s]\n", endpoint) - return func() { logger.Trace("%s took %s\n", endpoint, time.Since(start)) } -} - -func defaultContent(w http.ResponseWriter) { - w.Header().Add(ContentType, ApplicationJson) -} - -func EnableTracing() { - shouldTrace = true - logger.Level = log.TRACE -} - -func SetLogLevel(level log.LogLevel) { - logger.Level = level -} - -// NewRouter returns a gorilla/mux Router for the server routes; exposed so -// that path params are testable. -func NewRouter() *mux.Router { - r := mux.NewRouter() - r.HandleFunc(HealthEndpoint, HealthHandler).Methods("GET") - - r.HandleFunc(strings.Join([]string{ApiPrefix, ConfigurationsEndpoint}, "/"), - CreateConfigurationHandler).Methods("POST") - r.HandleFunc(strings.Join([]string{ApiPrefix, ConfigurationsEndpoint, "{cfg_id}"}, "/"), - GetConfigurationHandler).Methods("GET") - - r.HandleFunc(strings.Join([]string{ApiPrefix, StatemachinesEndpoint}, "/"), - CreateStatemachineHandler).Methods("POST") - r.HandleFunc(strings.Join([]string{ApiPrefix, StatemachinesEndpoint, "{cfg_name}", "{sm_id}"}, "/"), - GetStatemachineHandler).Methods("GET") - r.HandleFunc(strings.Join([]string{ApiPrefix, StatemachinesEndpoint, "{cfg_name}", "{sm_id}"}, "/"), - ModifyStatemachineHandler).Methods("PUT") - - r.HandleFunc(strings.Join([]string{ApiPrefix, EventsEndpoint, "{cfg_name}", "{evt_id}"}, "/"), - GetEventHandler).Methods("GET") - r.HandleFunc(strings.Join([]string{ApiPrefix, EventsEndpoint, EventsOutcome, "{cfg_name}", "{evt_id}"}, "/"), - GetOutcomeHandler).Methods("GET") - - return r -} - -func NewHTTPServer(addr string, logLevel log.LogLevel) *http.Server { - logger.Level = logLevel - return &http.Server{ - Addr: addr, - Handler: NewRouter(), - } -} - -func SetStore(store storage.StoreManager) { - storeManager = store -} diff --git a/server/http_server_test.go b/server/http_server_test.go deleted file mode 100644 index e6d32f4..0000000 --- a/server/http_server_test.go +++ /dev/null @@ -1,37 +0,0 @@ -/* - * Copyright (c) 2022 AlertAvert.com. All rights reserved. - * - * Licensed under the Apache License, Version 2.0 - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Author: Marco Massenzio (marco@alertavert.com) - */ - -package server_test - -import ( - "github.com/massenz/go-statemachine/server" - . "github.com/onsi/ginkgo" - . "github.com/onsi/gomega" - "net/http" - "net/http/httptest" -) - -var _ = Describe("Server", func() { - var ( - req *http.Request - handler http.Handler - writer *httptest.ResponseRecorder - ) - Context("when started", func() { - BeforeEach(func() { - handler = http.HandlerFunc(server.HealthHandler) - req = httptest.NewRequest(http.MethodGet, "/health", nil) - writer = httptest.NewRecorder() - }) - It("is healthy", func() { - handler.ServeHTTP(writer, req) - Expect(writer.Code).To(Equal(http.StatusOK)) - }) - }) -}) diff --git a/server/server_suite_test.go b/server/server_suite_test.go deleted file mode 100644 index ac087fe..0000000 --- a/server/server_suite_test.go +++ /dev/null @@ -1,22 +0,0 @@ -/* - * Copyright (c) 2022 AlertAvert.com. All rights reserved. - * - * Licensed under the Apache License, Version 2.0 - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Author: Marco Massenzio (marco@alertavert.com) - */ - -package server_test - -import ( - "testing" - - . "github.com/onsi/ginkgo" - . "github.com/onsi/gomega" -) - -func TestServer(t *testing.T) { - RegisterFailHandler(Fail) - RunSpecs(t, "Server Suite") -} diff --git a/server/statemachine_handlers.go b/server/statemachine_handlers.go deleted file mode 100644 index 64969e1..0000000 --- a/server/statemachine_handlers.go +++ /dev/null @@ -1,180 +0,0 @@ -/* - * Copyright (c) 2022 AlertAvert.com. All rights reserved. - * - * Licensed under the Apache License, Version 2.0 - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Author: Marco Massenzio (marco@alertavert.com) - */ - -package server - -import ( - "encoding/json" - "fmt" - "github.com/google/uuid" - "github.com/gorilla/mux" - "github.com/massenz/go-statemachine/storage" - "net/http" - "strings" - - . "github.com/massenz/go-statemachine/api" - "github.com/massenz/statemachine-proto/golang/api" -) - -func CreateStatemachineHandler(w http.ResponseWriter, r *http.Request) { - defer trace(r.RequestURI)() - defaultContent(w) - - var request StateMachineRequest - err := json.NewDecoder(r.Body).Decode(&request) - if err != nil { - http.Error(w, err.Error(), http.StatusBadRequest) - return - } - if request.Configuration == "" { - http.Error(w, "must always specify a fully qualified configuration version", - http.StatusBadRequest) - return - } - if request.ID == "" { - request.ID = uuid.New().String() - } else { - logger.Debug("checking whether FSM [%s] already exists", request.ID) - configNameParts := strings.Split(request.Configuration, - storage.KeyPrefixComponentsSeparator) - if len(configNameParts) != 2 { - http.Error(w, fmt.Sprintf("config name is not properly formatted (name:version): %s", - request.Configuration), http.StatusBadRequest) - return - } - if _, found := storeManager.GetStateMachine(request.ID, configNameParts[0]); found { - logger.Debug("FSM already exists, returning a Conflict error") - http.Error(w, storage.AlreadyExistsError(request.ID).Error(), http.StatusConflict) - return - } - } - - logger.Debug("looking up Config [%s]", request.Configuration) - cfg, ok := storeManager.GetConfig(request.Configuration) - if !ok { - http.Error(w, "configuration not found", http.StatusNotFound) - return - } - logger.Debug("found configuration [%s]", cfg) - logger.Info("Creating a new statemachine [%s] (configuration [%s])", - request.ID, GetVersionId(cfg)) - fsm := &api.FiniteStateMachine{ - ConfigId: GetVersionId(cfg), - State: cfg.StartingState, - History: make([]*api.Event, 0), - } - err = storeManager.PutStateMachine(request.ID, fsm) - if err != nil { - http.Error(w, err.Error(), http.StatusInternalServerError) - return - } - - w.Header().Add("Location", strings.Join([]string{ApiPrefix, StatemachinesEndpoint, cfg.Name, - request.ID}, "/")) - w.WriteHeader(http.StatusCreated) - err = json.NewEncoder(w).Encode(&StateMachineResponse{ - ID: request.ID, - StateMachine: fsm, - }) - if err != nil { - http.Error(w, err.Error(), http.StatusInternalServerError) - } -} - -// ModifyStatemachineHandler handles PUT request and can be used to change an -// existing FSM to use a different configuration and, optionally, to change its state. -// Events history CANNOT be altered. -func ModifyStatemachineHandler(w http.ResponseWriter, r *http.Request) { - defer trace(r.RequestURI)() - defaultContent(w) - - vars := mux.Vars(r) - if vars == nil { - logger.Error("unexpected missing path parameters in Request URI: %s", - r.RequestURI) - http.Error(w, UnexpectedError.Error(), http.StatusMethodNotAllowed) - return - } - cfgName := vars["cfg_name"] - fsmId := vars["sm_id"] - - var request StateMachineChangeRequest - err := json.NewDecoder(r.Body).Decode(&request) - if err != nil { - http.Error(w, err.Error(), http.StatusBadRequest) - return - } - - fsm, found := storeManager.GetStateMachine(fsmId, cfgName) - if !found { - http.Error(w, storage.NotFoundError(fsmId).Error(), http.StatusNotFound) - return - } - - if request.ConfigurationVersion != "" { - // If the Configuration is specified in the request, we assume - // that the caller wanted to update it. - configId := strings.Join([]string{cfgName, request.ConfigurationVersion}, storage.KeyPrefixComponentsSeparator) - logger.Debug("looking up Config [%s]", configId) - cfg, ok := storeManager.GetConfig(configId) - if !ok { - http.Error(w, "configuration not found", http.StatusNotFound) - return - } - logger.Debug("found configuration [%s]", cfg) - fsm.ConfigId = GetVersionId(cfg) - } - if request.CurrentState != "" && request.CurrentState != fsm.State { - logger.Debug("changing FSM state from [%s] to [%s]", fsm.State, request.CurrentState) - fsm.State = request.CurrentState - } - err = storeManager.PutStateMachine(fsmId, fsm) - if err != nil { - http.Error(w, err.Error(), http.StatusInternalServerError) - return - } - err = json.NewEncoder(w).Encode(&StateMachineResponse{ - ID: fsmId, - StateMachine: fsm, - }) - if err != nil { - http.Error(w, err.Error(), http.StatusInternalServerError) - } -} - -func GetStatemachineHandler(w http.ResponseWriter, r *http.Request) { - defer trace(r.RequestURI)() - defaultContent(w) - - vars := mux.Vars(r) - if vars == nil { - logger.Error("unexpected missing path parameters in Request URI: %s", - r.RequestURI) - http.Error(w, UnexpectedError.Error(), http.StatusMethodNotAllowed) - return - } - cfgName := vars["cfg_name"] - smId := vars["sm_id"] - - logger.Debug("looking up FSM [%s]", storage.NewKeyForMachine(smId, cfgName)) - stateMachine, ok := storeManager.GetStateMachine(smId, cfgName) - if !ok { - http.Error(w, "FSM not found", http.StatusNotFound) - return - } - logger.Debug("found FSM in state '%s'", stateMachine.GetState()) - - err := json.NewEncoder(w).Encode(&StateMachineResponse{ - ID: smId, - StateMachine: stateMachine, - }) - if err != nil { - http.Error(w, err.Error(), http.StatusInternalServerError) - } -} diff --git a/server/statemachine_handlers_test.go b/server/statemachine_handlers_test.go deleted file mode 100644 index 7664cda..0000000 --- a/server/statemachine_handlers_test.go +++ /dev/null @@ -1,413 +0,0 @@ -/* - * Copyright (c) 2022 AlertAvert.com. All rights reserved. - * - * Licensed under the Apache License, Version 2.0 - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Author: Marco Massenzio (marco@alertavert.com) - */ - -package server_test - -import ( - . "github.com/onsi/ginkgo" - . "github.com/onsi/gomega" - - "bytes" - "encoding/json" - log "github.com/massenz/slf4go/logging" - "google.golang.org/protobuf/types/known/timestamppb" - "io" - "net/http" - "net/http/httptest" - "strings" - - . "github.com/massenz/go-statemachine/api" - "github.com/massenz/go-statemachine/server" - "github.com/massenz/go-statemachine/storage" - - "github.com/massenz/statemachine-proto/golang/api" -) - -// ReaderFromRequest takes a request object and creates a Reader to be used -// as the body of the request. -func ReaderFromRequest(request interface{}) io.Reader { - jsonBytes, err := json.Marshal(request) - Expect(err).ToNot(HaveOccurred()) - return bytes.NewBuffer(jsonBytes) -} - -var _ = Describe("Statemachine Handlers", func() { - var ( - req *http.Request - writer *httptest.ResponseRecorder - store storage.StoreManager - - // NOTE: we are using the Router here as we need to correctly also parse - // the URI for path args (just using the router will not do that) - // The `router` can be safely set for all the test contexts, once and for all. - router = server.NewRouter() - ) - // Disabling verbose logging, as it pollutes test output; - // set it back to DEBUG when tests fail, and you need to - // diagnose the failure. - server.SetLogLevel(log.NONE) - - Context("when creating state machines", func() { - BeforeEach(func() { - writer = httptest.NewRecorder() - store = storage.NewInMemoryStore() - server.SetStore(store) - }) - Context("with a valid request", func() { - var request *server.StateMachineRequest - BeforeEach(func() { - request = &server.StateMachineRequest{ - ID: "test-machine", - Configuration: "test-config:v1", - } - config := &api.Configuration{ - Name: "test-config", - Version: "v1", - States: nil, - Transitions: nil, - StartingState: "start", - } - Expect(store.PutConfig(config)).ToNot(HaveOccurred()) - req = httptest.NewRequest(http.MethodPost, - strings.Join([]string{server.ApiPrefix, server.StatemachinesEndpoint}, "/"), - ReaderFromRequest(request)) - }) - It("should succeed", func() { - router.ServeHTTP(writer, req) - Expect(writer.Code).To(Equal(http.StatusCreated)) - Expect(writer.Header().Get("Location")).To(Equal( - strings.Join([]string{server.ApiPrefix, server.StatemachinesEndpoint, - "test-config", "test-machine"}, "/"))) - response := server.StateMachineResponse{} - Expect(json.Unmarshal(writer.Body.Bytes(), &response)).ToNot(HaveOccurred()) - - Expect(response.ID).To(Equal("test-machine")) - Expect(response.StateMachine.ConfigId).To(Equal("test-config:v1")) - Expect(response.StateMachine.State).To(Equal("start")) - }) - It("should save it in the backing store", func() { - router.ServeHTTP(writer, req) - Expect(writer.Code).To(Equal(http.StatusCreated)) - _, found := store.GetStateMachine("test-machine", "test-config") - Expect(found).To(BeTrue()) - }) - It("should store the correct data", func() { - router.ServeHTTP(writer, req) - Expect(writer.Code).To(Equal(http.StatusCreated)) - fsm, found := store.GetStateMachine("test-machine", "test-config") - Expect(found).To(BeTrue()) - Expect(fsm).ToNot(BeNil()) - Expect(fsm.ConfigId).To(Equal("test-config:v1")) - Expect(fsm.State).To(Equal("start")) - }) - It("should return a Conflict error if the ID already exists", func() { - Expect(store.PutStateMachine(request.ID, &api.FiniteStateMachine{ - ConfigId: request.Configuration, - State: "start", - })).ToNot(HaveOccurred()) - router.ServeHTTP(writer, req) - Expect(writer.Code).To(Equal(http.StatusConflict)) - }) - }) - Context("without specifying an ID", func() { - BeforeEach(func() { - request := &server.StateMachineRequest{ - Configuration: "test-config:v1", - } - config := &api.Configuration{ - Name: "test-config", - Version: "v1", - States: nil, - Transitions: nil, - StartingState: "start", - } - Expect(store.PutConfig(config)).ToNot(HaveOccurred()) - req = httptest.NewRequest(http.MethodPost, - strings.Join([]string{server.ApiPrefix, server.StatemachinesEndpoint}, "/"), - ReaderFromRequest(request)) - }) - - It("should succeed with a newly assigned ID", func() { - router.ServeHTTP(writer, req) - Expect(writer.Code).To(Equal(http.StatusCreated)) - location := writer.Header().Get("Location") - Expect(location).ToNot(BeEmpty()) - response := server.StateMachineResponse{} - Expect(json.Unmarshal(writer.Body.Bytes(), &response)).ToNot(HaveOccurred()) - - Expect(response.ID).ToNot(BeEmpty()) - - Expect(strings.HasSuffix(location, response.ID)).To(BeTrue()) - _, found := store.GetStateMachine(response.ID, "test-config") - Expect(found).To(BeTrue()) - }) - - }) - Context("with a non-existent configuration", func() { - BeforeEach(func() { - request := &server.StateMachineRequest{ - Configuration: "test-config:v2", - ID: "1234", - } - req = httptest.NewRequest(http.MethodPost, - strings.Join([]string{server.ApiPrefix, server.StatemachinesEndpoint}, "/"), - ReaderFromRequest(request)) - }) - It("should fail", func() { - router.ServeHTTP(writer, req) - Expect(writer.Code).To(Equal(http.StatusNotFound)) - location := writer.Header().Get("Location") - Expect(location).To(BeEmpty()) - response := server.StateMachineResponse{} - Expect(json.Unmarshal(writer.Body.Bytes(), &response)).To(HaveOccurred()) - _, found := store.GetConfig("1234") - Expect(found).To(BeFalse()) - }) - }) - }) - - Context("when modifying state machines", func() { - var configName string - var fsmId string - - BeforeEach(func() { - writer = httptest.NewRecorder() - store = storage.NewInMemoryStore() - server.SetStore(store) - }) - Context("with a valid request", func() { - var request *server.StateMachineChangeRequest - BeforeEach(func() { - configName = "test.config" - fsmId = "12345-abcdef" - request = &server.StateMachineChangeRequest{ - ConfigurationVersion: "v2", - CurrentState: "new-state", - } - config := &api.Configuration{ - Name: configName, - Version: "v1", - States: nil, - Transitions: nil, - StartingState: "old-state", - } - Expect(store.PutConfig(config)).ToNot(HaveOccurred()) - fsm := &api.FiniteStateMachine{ - ConfigId: GetVersionId(config), - State: config.StartingState, - History: []*api.Event{ - {Transition: &api.Transition{Event: "order_placed"}, Originator: ""}, - {Transition: &api.Transition{Event: "checked_out"}, Originator: ""}, - }, - } - Expect(store.PutStateMachine(fsmId, fsm)).ToNot(HaveOccurred()) - config.Version = request.ConfigurationVersion - Expect(store.PutConfig(config)).ToNot(HaveOccurred()) - req = httptest.NewRequest(http.MethodPut, - strings.Join([]string{ - server.ApiPrefix, server.StatemachinesEndpoint, configName, fsmId, - }, "/"), - ReaderFromRequest(request)) - }) - It("should succeed", func() { - router.ServeHTTP(writer, req) - Expect(writer.Code).To(Equal(http.StatusOK)) - response := server.StateMachineResponse{} - Expect(json.Unmarshal(writer.Body.Bytes(), &response)).ToNot(HaveOccurred()) - - Expect(response.ID).To(Equal(fsmId)) - Expect(response.StateMachine.ConfigId).To(Equal(configName + ":" + request.ConfigurationVersion)) - Expect(response.StateMachine.State).To(Equal(request.CurrentState)) - }) - It("should save it in the backing store", func() { - router.ServeHTTP(writer, req) - Expect(writer.Code).To(Equal(http.StatusOK)) - fsm, found := store.GetStateMachine(fsmId, configName) - Expect(found).To(BeTrue()) - Expect(fsm.State).To(Equal(request.CurrentState)) - Expect(fsm.ConfigId).To(Equal(configName + ":" + request.ConfigurationVersion)) - Expect(len(fsm.History)).To(Equal(2)) - }) - It("should return a NotFound if the FSM does not exist", func() { - req = httptest.NewRequest(http.MethodPut, - strings.Join([]string{ - server.ApiPrefix, server.StatemachinesEndpoint, configName, "fake-fsm", - }, "/"), - ReaderFromRequest(request)) - router.ServeHTTP(writer, req) - Expect(writer.Code).To(Equal(http.StatusNotFound)) - - }) - It("should return a NotFound error if the Config does not exist", func() { - request = &server.StateMachineChangeRequest{ - ConfigurationVersion: "v8", - CurrentState: "fake-state", - } - req = httptest.NewRequest(http.MethodPut, - strings.Join([]string{ - server.ApiPrefix, server.StatemachinesEndpoint, configName, fsmId, - }, "/"), - ReaderFromRequest(request)) - router.ServeHTTP(writer, req) - Expect(writer.Code).To(Equal(http.StatusNotFound)) - }) - It("should fail gracefully with a malformed request", func() { - req = httptest.NewRequest(http.MethodPut, - strings.Join([]string{ - server.ApiPrefix, server.StatemachinesEndpoint, configName, fsmId, - }, "/"), - bytes.NewReader([]byte(`{"not": "my", "best": json}`))) - router.ServeHTTP(writer, req) - Expect(writer.Code).To(Equal(http.StatusBadRequest)) - }) - }) - }) - - Context("when retrieving a state machine", func() { - var id string - var fsm api.FiniteStateMachine - - BeforeEach(func() { - store = storage.NewInMemoryStore() - server.SetStore(store) - - writer = httptest.NewRecorder() - fsm = api.FiniteStateMachine{ - ConfigId: "order.card:v3", - State: "checkout", - History: []*api.Event{ - {Transition: &api.Transition{Event: "order_placed"}, Originator: ""}, - {Transition: &api.Transition{Event: "checked_out"}, Originator: ""}, - }, - } - id = "12345" - Expect(store.PutStateMachine(id, &fsm)).ToNot(HaveOccurred()) - }) - It("can be retrieved with a valid ID", func() { - store.SetLogLevel(log.NONE) - endpoint := strings.Join([]string{server.ApiPrefix, - server.StatemachinesEndpoint, "order.card", id}, "/") - req = httptest.NewRequest(http.MethodGet, endpoint, nil) - router.ServeHTTP(writer, req) - Expect(writer.Code).To(Equal(http.StatusOK)) - - var result server.StateMachineResponse - Expect(json.NewDecoder(writer.Body).Decode(&result)).ToNot(HaveOccurred()) - Expect(result.ID).To(Equal(id)) - sm := result.StateMachine - Expect(sm.ConfigId).To(Equal(fsm.ConfigId)) - Expect(sm.State).To(Equal(fsm.State)) - Expect(len(sm.History)).To(Equal(len(fsm.History))) - for n, t := range sm.History { - Expect(t.Transition.Event).To(Equal(fsm.History[n].Transition.Event)) - } - }) - It("with an invalid ID will return Not Found", func() { - endpoint := strings.Join([]string{server.ApiPrefix, - server.StatemachinesEndpoint, "foo"}, "/") - req = httptest.NewRequest(http.MethodGet, endpoint, nil) - - router.ServeHTTP(writer, req) - Expect(writer.Code).To(Equal(http.StatusNotFound)) - }) - It("with a missing ID will return Not Allowed", func() { - req = httptest.NewRequest(http.MethodGet, strings.Join([]string{server.ApiPrefix, - server.StatemachinesEndpoint}, "/"), nil) - router.ServeHTTP(writer, req) - Expect(writer.Code).To(Equal(http.StatusMethodNotAllowed)) - }) - It("with gibberish data will still fail gracefully", func() { - cfg := api.Configuration{} - Expect(store.PutConfig(&cfg)).ToNot(HaveOccurred()) - endpoint := strings.Join([]string{server.ApiPrefix, - server.StatemachinesEndpoint, "6789"}, "/") - req = httptest.NewRequest(http.MethodGet, endpoint, nil) - router.ServeHTTP(writer, req) - Expect(writer.Code).To(Equal(http.StatusNotFound)) - }) - }) - - Context("when the statemachine has events", func() { - var store storage.StoreManager - var fsmId = "12345" - var config *api.Configuration - - BeforeEach(func() { - writer = httptest.NewRecorder() - store = storage.NewInMemoryStore() - server.SetStore(store) - config = &api.Configuration{ - Name: "car", - Version: "v1", - States: []string{"stopped", "running", "slowing"}, - Transitions: []*api.Transition{ - {From: "stopped", To: "running", Event: "start"}, - {From: "running", To: "slowing", Event: "brake"}, - {From: "slowing", To: "running", Event: "accelerate"}, - {From: "slowing", To: "stopped", Event: "stop"}, - }, - StartingState: "stopped", - } - car, _ := NewStateMachine(config) - Expect(store.PutConfig(config)).To(Succeed()) - Expect(store.PutStateMachine(fsmId, car.FSM)).To(Succeed()) - }) - It("it should show them", func() { - found, _ := store.GetStateMachine(fsmId, "car") - car := ConfiguredStateMachine{ - Config: config, - FSM: found, - } - Expect(car.SendEvent(&api.Event{ - EventId: "1", - Timestamp: timestamppb.Now(), - Transition: &api.Transition{Event: "start"}, - Originator: "test", - Details: "this is a test", - })).To(Succeed()) - Expect(car.SendEvent(&api.Event{ - EventId: "2", - Timestamp: timestamppb.Now(), - Transition: &api.Transition{Event: "brake"}, - Originator: "test", - Details: "a test is this not", - })).To(Succeed()) - Expect(store.PutStateMachine(fsmId, car.FSM)).To(Succeed()) - - endpoint := strings.Join([]string{server.ApiPrefix, server.StatemachinesEndpoint, - config.Name, fsmId}, "/") - req = httptest.NewRequest(http.MethodGet, endpoint, nil) - router.ServeHTTP(writer, req) - Expect(writer.Code).To(Equal(http.StatusOK)) - - var result server.StateMachineResponse - Expect(json.NewDecoder(writer.Body).Decode(&result)).To(Succeed()) - - Expect(result.ID).To(Equal(fsmId)) - Expect(result.StateMachine).ToNot(BeNil()) - fsm := result.StateMachine - Expect(fsm.State).To(Equal("slowing")) - Expect(len(fsm.History)).To(Equal(2)) - var history []*api.Event - history = fsm.History - event := history[0] - Expect(event.EventId).To(Equal("1")) - Expect(event.Originator).To(Equal("test")) - Expect(event.Transition.Event).To(Equal("start")) - Expect(event.Transition.From).To(Equal("stopped")) - Expect(event.Transition.To).To(Equal("running")) - Expect(event.Details).To(Equal("this is a test")) - event = history[1] - Expect(event.EventId).To(Equal("2")) - Expect(event.Transition.Event).To(Equal("brake")) - Expect(event.Transition.To).To(Equal("slowing")) - Expect(event.Details).To(Equal("a test is this not")) - }) - }) -}) diff --git a/server/types.go b/server/types.go deleted file mode 100644 index 8c449c5..0000000 --- a/server/types.go +++ /dev/null @@ -1,76 +0,0 @@ -/* - * Copyright (c) 2022 AlertAvert.com. All rights reserved. - * - * Licensed under the Apache License, Version 2.0 - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Author: Marco Massenzio (marco@alertavert.com) - */ - -package server - -import ( - protos "github.com/massenz/statemachine-proto/golang/api" -) - -const ( - Authorization = "Authorization" - Bearer = "Bearer" - ContentType = "Content-Type" - ApplicationJson = "application/json" - AllContent = "*/*" - Html = "text/html" -) - -// MessageResponse is returned when a more appropriate response is not available. -type MessageResponse struct { - Msg interface{} `json:"message,omitempty"` - Error string `json:"error,omitempty"` -} - -// StateMachineRequest represents a request for a new FSM to be created, with an optional ID, -// and a reference to a fully qualified Configuration version. -// -// If the ID is not specified, a new UUID will be generated and returned. -// The Configuration is required and **must** match an existing Configuration full `name` and -// `version` (e.g., `orders:v2`) -type StateMachineRequest struct { - ID string `json:"id,omitempty"` - Configuration string `json:"configuration_version"` -} - -// StateMachineChangeRequest represents a request to modify (PUT)( an existing FSM. -// -// The ConfigurationVersion represents **only** the new `version` of the Configuration to be -// used (the `name` is passed in the API URI path). -// Both ConfigurationVersion and CurrentState are optional. -type StateMachineChangeRequest struct { - ConfigurationVersion string `json:"configuration_version,omitempty"` - CurrentState string `json:"current_state,omitempty"` -} - -// StateMachineResponse is returned when a new FSM is created, or as a response to a GET request -type StateMachineResponse struct { - ID string `json:"id"` - StateMachine *protos.FiniteStateMachine `json:"statemachine"` -} - -// EventResponse is returned as a response to a GET Event request -type EventResponse struct { - ID string `json:"id"` - Event *protos.Event `json:"event"` -} - -type OutcomeResponse struct { - StatusCode string `json:"status_code"` - Message string `json:"message"` - Destination string `json:"destination"` -} - -func MakeOutcomeResponse(outcome *protos.EventOutcome) *OutcomeResponse { - return &OutcomeResponse{ - StatusCode: outcome.Code.String(), - Message: outcome.Details, - Destination: outcome.Id, - } -} diff --git a/storage/memory_store.go b/storage/memory_store.go deleted file mode 100644 index b89ca35..0000000 --- a/storage/memory_store.go +++ /dev/null @@ -1,163 +0,0 @@ -/* - * Copyright (c) 2022 AlertAvert.com. All rights reserved. - * - * Licensed under the Apache License, Version 2.0 - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Author: Marco Massenzio (marco@alertavert.com) - */ - -package storage - -import ( - "github.com/golang/protobuf/proto" - "github.com/massenz/go-statemachine/api" - slf4go "github.com/massenz/slf4go/logging" - "strings" - "sync" - "time" - - protos "github.com/massenz/statemachine-proto/golang/api" -) - -type InMemoryStore struct { - logger *slf4go.Log - mux sync.RWMutex - backingStore map[string][]byte -} - -func NewInMemoryStore() StoreManager { - return &InMemoryStore{ - backingStore: make(map[string][]byte), - logger: slf4go.NewLog("InMemoryStore"), - } -} - -func (csm *InMemoryStore) get(key string, value proto.Message) bool { - csm.mux.RLock() - defer csm.mux.RUnlock() - - bytes, ok := csm.backingStore[key] - csm.logger.Trace("key %s - Found: %t", key, ok) - if ok { - err := proto.Unmarshal(bytes, value) - if err != nil { - csm.logger.Error(err.Error()) - return false - } - } - return ok -} - -func (csm *InMemoryStore) put(key string, value proto.Message) error { - csm.mux.Lock() - defer csm.mux.Unlock() - - val, err := proto.Marshal(value) - if err == nil { - csm.logger.Trace("Storing key %s [%T]", key, value) - csm.backingStore[key] = val - } - return err -} - -func (csm *InMemoryStore) GetEvent(id string, cfg string) (*protos.Event, bool) { - key := NewKeyForEvent(id, cfg) - event := &protos.Event{} - return event, csm.get(key, event) -} - -func (csm *InMemoryStore) PutEvent(event *protos.Event, cfg string, ttl time.Duration) error { - key := NewKeyForEvent(event.EventId, cfg) - return csm.put(key, event) -} - -func (csm *InMemoryStore) AddEventOutcome(id string, cfg string, response *protos.EventOutcome, ttl time.Duration) error { - key := NewKeyForOutcome(id, cfg) - return csm.put(key, response) -} - -func (csm *InMemoryStore) GetOutcomeForEvent(id string, cfg string) (*protos.EventOutcome, bool) { - key := NewKeyForOutcome(id, cfg) - var outcome protos.EventOutcome - return &outcome, csm.get(key, &outcome) -} - -func (csm *InMemoryStore) GetConfig(id string) (cfg *protos.Configuration, ok bool) { - key := NewKeyForConfig(id) - csm.logger.Debug("Fetching Configuration [%s]", key) - cfg = &protos.Configuration{} - return cfg, csm.get(key, cfg) -} - -func (csm *InMemoryStore) PutConfig(cfg *protos.Configuration) error { - key := NewKeyForConfig(api.GetVersionId(cfg)) - if _, found := csm.backingStore[key]; found { - return AlreadyExistsError(key) - } - csm.logger.Debug("Storing Configuration [%s] with key: %s", api.GetVersionId(cfg), key) - return csm.put(key, cfg) -} - -func (csm *InMemoryStore) GetStateMachine(id string, cfg string) (*protos.FiniteStateMachine, bool) { - key := NewKeyForMachine(id, cfg) - csm.logger.Debug("Getting StateMachine [%s]", key) - var machine protos.FiniteStateMachine - if csm.get(key, &machine) { - csm.logger.Debug("Found StateMachine [%s] in state: %s", key, machine.State) - return &machine, true - } - csm.logger.Debug("Not found for key %s", key) - return nil, false -} - -func (csm *InMemoryStore) PutStateMachine(id string, machine *protos.FiniteStateMachine) error { - if machine == nil { - return IllegalStoreError(id) - } - key := NewKeyForMachine(id, strings.Split(machine.ConfigId, api.ConfigurationVersionSeparator)[0]) - csm.logger.Debug("Storing StateMachine [%s] with key: %s", id, key) - return csm.put(key, machine) -} - -func (csm *InMemoryStore) SetLogLevel(level slf4go.LogLevel) { - csm.logger.Level = level -} - -// SetTimeout does not really make sense for an in-memory store, so this is a no-op -func (csm *InMemoryStore) SetTimeout(_ time.Duration) { - // do nothing -} - -// GetTimeout does not really make sense for an in-memory store, -// so this just returns a NeverExpire constant. -func (csm *InMemoryStore) GetTimeout() time.Duration { - return NeverExpire -} - -func (csm *InMemoryStore) Health() error { - return nil -} - -func (csm *InMemoryStore) GetAllInState(cfg string, state string) []string { - // TODO [#33] Ability to query for all machines in a given state - csm.logger.Error(NotImplementedError("GetAllInState").Error()) - return nil -} - -func (csm *InMemoryStore) GetAllConfigs() []string { - // TODO [#33] Ability to query for all machines in a given state - csm.logger.Error(NotImplementedError("GetAllConfigs").Error()) - return nil -} - -func (csm *InMemoryStore) GetAllVersions(name string) []string { - // TODO [#33] Ability to query for all machines in a given state - csm.logger.Error(NotImplementedError("GetAllVersions").Error()) - return nil -} -func (csm *InMemoryStore) UpdateState(cfgName string, id string, oldState string, newState string) error { - // TODO [#33] Ability to query for all machines in a given state - csm.logger.Error(NotImplementedError("GetAllVersions").Error()) - return nil -} diff --git a/storage/memory_store_test.go b/storage/memory_store_test.go deleted file mode 100644 index 9a9f955..0000000 --- a/storage/memory_store_test.go +++ /dev/null @@ -1,120 +0,0 @@ -/* - * Copyright (c) 2022 AlertAvert.com. All rights reserved. - * - * Licensed under the Apache License, Version 2.0 - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Author: Marco Massenzio (marco@alertavert.com) - */ - -package storage_test - -import ( - . "github.com/JiaYongfei/respect/gomega" - . "github.com/onsi/ginkgo" - . "github.com/onsi/gomega" - - "google.golang.org/protobuf/types/known/timestamppb" - - "github.com/massenz/go-statemachine/api" - "github.com/massenz/go-statemachine/storage" - protos "github.com/massenz/statemachine-proto/golang/api" -) - -var _ = Describe("InMemory Store", func() { - Context("for local testing", func() { - }) - Context("can be used to save and retrieve a Configuration", func() { - var store storage.StoreManager - var cfg *protos.Configuration - - BeforeEach(func() { - store = storage.NewInMemoryStore() - cfg = &protos.Configuration{ - Name: "my_conf", - Version: "v3", - StartingState: "start", - } - Expect(store.PutConfig(cfg)).ToNot(HaveOccurred()) - }) - It("can be created", func() { - Expect(store).ToNot(BeNil()) - }) - It("will give back the saved Configuration", func() { - found, ok := store.GetConfig(api.GetVersionId(cfg)) - Expect(ok).To(BeTrue()) - Expect(found).To(Respect(cfg)) - }) - It("will not allow to overwrite an existing config", func() { - Expect(store.PutConfig(&protos.Configuration{ - Name: "my_conf", - Version: "v3", - StartingState: "fake", - })).To(HaveOccurred()) - }) - It("will allow a different version", func() { - Expect(store.PutConfig(&protos.Configuration{ - Name: "my_conf", - Version: "v4", - StartingState: "fake", - })).ToNot(HaveOccurred()) - }) - }) - Context("can be used to save and retrieve a StateMachine", func() { - var store storage.StoreManager - var id string - var machine *protos.FiniteStateMachine - - BeforeEach(func() { - store = storage.NewInMemoryStore() - id = "1234" - machine = &protos.FiniteStateMachine{ - ConfigId: "test:v1", - State: "start", - History: nil, - } - Expect(store.PutStateMachine(id, machine)).ToNot(HaveOccurred()) - }) - It("will give it back unchanged", func() { - found, ok := store.GetStateMachine(id, "test") - Expect(ok).To(BeTrue()) - Expect(found).ToNot(BeNil()) - Expect(found.ConfigId).To(Equal(machine.ConfigId)) - Expect(found.History).To(Equal(machine.History)) - Expect(found.State).To(Equal(machine.State)) - }) - It("will return nil for a non-existent id", func() { - found, ok := store.GetStateMachine("fake", "test") - Expect(ok).To(BeFalse()) - Expect(found).To(BeNil()) - }) - It("will return an error for a nil FSM", func() { - machine.ConfigId = "missing" - Expect(store.PutStateMachine(id, nil)).To(HaveOccurred()) - }) - }) - Context("can be used to save and retrieve Events", func() { - var store = storage.NewInMemoryStore() - var id = "1234" - var event = &protos.Event{ - EventId: id, - Timestamp: timestamppb.Now(), - Transition: &protos.Transition{Event: "start"}, - Originator: "test", - Details: "some details", - } - BeforeEach(func() { - Expect(store.PutEvent(event, "test-cfg", storage.NeverExpire)).ToNot(HaveOccurred()) - }) - It("will give it back unchanged", func() { - found, ok := store.GetEvent(id, "test-cfg") - Expect(ok).To(BeTrue()) - Expect(found).ToNot(BeNil()) - Expect(found).To(Respect(event)) - }) - It("will return false for a non-existent id", func() { - _, ok := store.GetEvent("fake", "test-cfg") - Expect(ok).To(BeFalse()) - }) - }) -})