From 11a033be4d6e163cbd60d5d0519f63d86ca1f12b Mon Sep 17 00:00:00 2001 From: Marco Massenzio Date: Sat, 25 Feb 2023 22:43:48 -0800 Subject: [PATCH 1/4] Fixed GH action --- .github/workflows/release.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/release.yaml b/.github/workflows/release.yaml index 4492e8f..8e55330 100644 --- a/.github/workflows/release.yaml +++ b/.github/workflows/release.yaml @@ -25,7 +25,7 @@ jobs: - name: Create release tag run: | - TAG=$(./get-tag) + TAG=$(make version) echo TAG=${TAG} >> $GITHUB_ENV git config user.name "$AUTHOR" git config user.email "<$EMAIL>" From c22de89478cee64257f7c02fea7f990e8b758d11 Mon Sep 17 00:00:00 2001 From: Marco Massenzio <1153951+massenz@users.noreply.github.com> Date: Thu, 2 Mar 2023 22:51:31 -0800 Subject: [PATCH 2/4] [#76] Add health check for server (#87) * Added HEALTHCHECK directive & binary for container health status check --- .run/Run all tests.run.xml | 2 +- Makefile | 18 ++++--- clients/grpc_health.go | 68 +++++++++++++++++++++++++ docker/Dockerfile | 8 ++- docker/entrypoint.sh | 2 +- go.mod | 12 +++-- go.sum | 100 ++++++------------------------------- grpc/grpc_server.go | 37 +++++++++++--- grpc/grpc_server_test.go | 95 ++++++++++++++++++++++++++++++++++- 9 files changed, 236 insertions(+), 106 deletions(-) create mode 100644 clients/grpc_health.go diff --git a/.run/Run all tests.run.xml b/.run/Run all tests.run.xml index 69bd431..1e9c0dc 100644 --- a/.run/Run all tests.run.xml +++ b/.run/Run all tests.run.xml @@ -4,7 +4,7 @@ - + diff --git a/Makefile b/Makefile index 90dbe38..6522237 100644 --- a/Makefile +++ b/Makefile @@ -6,11 +6,12 @@ GOOS ?= $(shell uname -s | tr "[:upper:]" "[:lower:]") GOARCH ?= amd64 GOMOD := $(shell go list -m) -version := v0.10.0 +version := v0.10.1 release := $(version)-g$(shell git rev-parse --short HEAD) prog := sm-server bin := out/bin/$(prog)-$(version)_$(GOOS)-$(GOARCH) dockerbin := out/bin/$(prog)-$(version)_linux-amd64 +healthcheck := out/bin/grpc-health_linux-amd64 image := massenz/statemachine compose := docker/compose.yaml @@ -60,15 +61,18 @@ fmt: ## Formats the Go source code using 'go fmt' .PHONY: build test container cov clean fmt $(bin): cmd/main.go $(srcs) @mkdir -p $(shell dirname $(bin)) - GOOS=$(GOOS); GOARCH=$(GOARCH); go build \ + GOOS=$(GOOS) GOARCH=$(GOARCH) go build \ -ldflags "-X $(GOMOD)/api.Release=$(release)" \ -o $(bin) cmd/main.go -$(dockerbin): - GOOS=linux; GOARCH=amd64; go build \ +$(dockerbin): $(srcs) + GOOS=linux GOARCH=amd64 go build \ -ldflags "-X $(GOMOD)/api.Release=$(release)" \ -o $(dockerbin) cmd/main.go +$(healthcheck): clients/grpc_health.go + GOOS=linux GOARCH=amd64 go build -o $(healthcheck) clients/grpc_health.go + .PHONY: build build: $(bin) ## Builds the Statemachine server binary @@ -83,8 +87,10 @@ cov: $(srcs) $(test_srcs) ## Runs the Test Coverage target and opens a browser # Convenience targets to run locally containers and # setup the test environments. -container: $(dockerbin) ## Builds the container image - docker build --build-arg appname=$(dockerbin) -f $(dockerfile) -t $(image):$(release) . +container: $(dockerbin) $(healthcheck) ## Builds the container image + docker build --build-arg appname=$(dockerbin) \ + --build-arg hc=$(healthcheck) \ + -f $(dockerfile) -t $(image):$(release) . .PHONY: start start: ## Starts the Redis and LocalStack containers, and Creates the SQS Queues in LocalStack diff --git a/clients/grpc_health.go b/clients/grpc_health.go new file mode 100644 index 0000000..b8f19f3 --- /dev/null +++ b/clients/grpc_health.go @@ -0,0 +1,68 @@ +/* + * Copyright (c) 2023 AlertAvert.com. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Author: Marco Massenzio (marco@alertavert.com) + */ + +package main + +import ( + "context" + "crypto/tls" + "flag" + "fmt" + "github.com/golang/protobuf/jsonpb" + protos "github.com/massenz/statemachine-proto/golang/api" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials" + "google.golang.org/grpc/credentials/insecure" + "google.golang.org/protobuf/types/known/emptypb" + "log" + "time" +) + +// Most basic binary to run health checks on the server. +// Used to assert readiness of the container/pod in Docker/Kubernetes. +func main() { + var address = flag.String("host", "localhost:7398", + "The address (host:port) for the GRPC server") + var timeout = flag.Duration("timeout", 200*time.Millisecond, + "timeout expressed as a duration string (e.g., 200ms, 1s, etc.)") + var noTLS = flag.Bool("insecure", false, + "timeout expressed as a duration string (e.g., 200ms, 1s, etc.)") + flag.Parse() + + var creds credentials.TransportCredentials + if *noTLS { + creds = insecure.NewCredentials() + } else { + config := &tls.Config{ + InsecureSkipVerify: true, + } + creds = credentials.NewTLS(config) + } + + cc, err := grpc.Dial(*address, grpc.WithTransportCredentials(creds)) + defer cc.Close() + if err != nil { + log.Fatalf("cannot open connection to %s: %v", *address, err) + } + + client := protos.NewStatemachineServiceClient(cc) + ctx, cancel := context.WithTimeout(context.Background(), *timeout) + defer cancel() + + resp, err := client.Health(ctx, &emptypb.Empty{}) + if err != nil { + log.Fatal("cannot connect to server:", err) + } + marshaler := &jsonpb.Marshaler{} + jsonString, err := marshaler.MarshalToString(resp) + if err != nil { + log.Fatal("Error while marshaling the message to JSON:", err) + } + fmt.Println(jsonString) +} diff --git a/docker/Dockerfile b/docker/Dockerfile index 5f41287..cb3e4ab 100644 --- a/docker/Dockerfile +++ b/docker/Dockerfile @@ -12,6 +12,7 @@ LABEL org.opencontainers.image.description="Statemachine gRPC Server" LABEL org.opencontainers.image.licenses=Apache-2.0 ARG appname +ARG hc #RUN apt-get update && apt-get install ca-certificates -y RUN groupadd -r sm-bot && useradd -r -g sm-bot sm-bot @@ -19,18 +20,21 @@ RUN groupadd -r sm-bot && useradd -r -g sm-bot sm-bot # Sensible defaults for the server, for reference # we list all the environment variables used by the # entrypoint script. -# ENV GRPC_PORT=7398 SERVER_PORT=7399 DEBUG="" \ EVENTS_Q="events" NOTIFICATIONS_Q="notifications" \ ACKS_Q="" ERRORS_ONLY=t \ - CLUSTER="" REDIS="redis:6379" + CLUSTER="" REDIS="redis:6379" \ + TIMEOUT=500ms INSECURE="" WORKDIR /app RUN chown sm-bot:sm-bot /app USER sm-bot ADD $appname ./sm-server +ADD $hc ./ping ADD docker/entrypoint.sh ./ EXPOSE ${SERVER_PORT} ENTRYPOINT ["./entrypoint.sh"] +HEALTHCHECK --start-period=5s --interval=30s --timeout=200ms --retries=5 \ + CMD ./ping -host localhost:$GRPC_PORT -timeout $TIMEOUT $INSECURE diff --git a/docker/entrypoint.sh b/docker/entrypoint.sh index 3419f8c..1280a55 100755 --- a/docker/entrypoint.sh +++ b/docker/entrypoint.sh @@ -40,7 +40,7 @@ then retries="-max-retries ${RETRIES}" fi -cmd="./sm-server -grpc-port ${GRPC_PORT} -redis ${REDIS:-} ${DEBUG:-} +cmd="./sm-server -grpc-port ${GRPC_PORT} -redis ${REDIS:-} ${DEBUG:-} ${INSECURE:-} ${endpoint} ${timeout} ${retries} ${events} ${notifications} $@" echo $cmd diff --git a/go.mod b/go.mod index fa4f70f..1d1b1f3 100644 --- a/go.mod +++ b/go.mod @@ -9,11 +9,12 @@ require ( github.com/golang/protobuf v1.5.2 github.com/google/uuid v1.3.0 github.com/massenz/slf4go v0.3.2-g4eb5504 - github.com/massenz/statemachine-proto/golang v1.1.0-beta-g1fc5dd8 + github.com/massenz/statemachine-proto/golang v1.2.0-g8dbe9c5 github.com/onsi/ginkgo v1.16.5 github.com/onsi/gomega v1.18.1 + github.com/stretchr/testify v1.8.2 github.com/testcontainers/testcontainers-go v0.18.0 - google.golang.org/grpc v1.51.0 + google.golang.org/grpc v1.53.0 google.golang.org/protobuf v1.28.1 ) @@ -21,8 +22,9 @@ require ( github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1 // indirect github.com/Microsoft/go-winio v0.5.2 // indirect github.com/cenkalti/backoff/v4 v4.2.0 // indirect - github.com/cespare/xxhash/v2 v2.1.2 // indirect + github.com/cespare/xxhash/v2 v2.2.0 // indirect github.com/containerd/containerd v1.6.18 // indirect + github.com/davecgh/go-spew v1.1.1 // indirect github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect github.com/docker/distribution v2.8.1+incompatible // indirect github.com/docker/docker v23.0.0+incompatible // indirect @@ -43,11 +45,13 @@ require ( github.com/opencontainers/image-spec v1.1.0-rc2 // indirect github.com/opencontainers/runc v1.1.3 // indirect github.com/pkg/errors v0.9.1 // indirect + github.com/pmezard/go-difflib v1.0.0 // indirect github.com/sirupsen/logrus v1.9.0 // indirect + github.com/stretchr/objx v0.5.0 // indirect golang.org/x/net v0.7.0 // indirect golang.org/x/sys v0.5.0 // indirect golang.org/x/text v0.7.0 // indirect - google.golang.org/genproto v0.0.0-20220617124728-180714bec0ad // indirect + google.golang.org/genproto v0.0.0-20230110181048-76db0878b65f // indirect gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect diff --git a/go.sum b/go.sum index 8474dc4..bad3954 100644 --- a/go.sum +++ b/go.sum @@ -1,5 +1,3 @@ -cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= -cloud.google.com/go v0.34.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1 h1:UQHMgLO+TxOElx5B5HZ4hJQsoJ/PvUvKRhJHDQXO8P8= github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1/go.mod h1:xomTg63KZ2rFqZQzSB4Vz2SUXa1BpHTVz9L5PTmPC4E= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= @@ -8,27 +6,17 @@ github.com/JiaYongfei/respect v0.0.0-20211019032000-61a979c8e39a/go.mod h1:3hhXC github.com/Microsoft/go-winio v0.5.2 h1:a9IhgEQBCUEk6QCdml9CiJGhAws+YwffDHEMp1VMrpA= github.com/Microsoft/go-winio v0.5.2/go.mod h1:WpS1mjBmmwHBEWmogvA2mj8546UReBk4v8QkMxJ6pZY= github.com/Microsoft/hcsshim v0.9.6 h1:VwnDOgLeoi2du6dAznfmspNqTiwczvjv4K7NxuY9jsY= -github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY= github.com/aws/aws-sdk-go v1.44.4 h1:ePN0CVJMdiz2vYUcJH96eyxRrtKGSDMgyhP6rah2OgE= github.com/aws/aws-sdk-go v1.44.4/go.mod h1:y4AeaBuwd2Lk+GepC1E9v0qOiTws0MIWAX4oIKwKHZo= github.com/cenkalti/backoff/v4 v4.2.0 h1:HN5dHm3WBOgndBH6E8V0q2jIYIR3s9yglV8k/+MN3u4= github.com/cenkalti/backoff/v4 v4.2.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE= -github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= -github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= -github.com/cespare/xxhash/v2 v2.1.2 h1:YRXhKfTDauu4ajMg1TPgFO5jnlC2HCbmLXMcTG5cbYE= -github.com/cespare/xxhash/v2 v2.1.2/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= +github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44= +github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/checkpoint-restore/go-criu/v5 v5.3.0/go.mod h1:E/eQpaFtUKGOOSEBZgmKAcn+zUUwWxqcaKZlF54wK8E= github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI= github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5PlCu98SY8svDHJxuZscDgtXS6KTTbou5AhLI= github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU= github.com/cilium/ebpf v0.7.0/go.mod h1:/oI2+1shJiTGAMgl6/RgJr36Eo1jzrRcAWbcXO2usCA= -github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= -github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc= -github.com/cncf/udpa/go v0.0.0-20201120205902-5459f2c99403/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk= -github.com/cncf/udpa/go v0.0.0-20210930031921-04548b0d99d4/go.mod h1:6pvJx4me5XPnfI9Z40ddWsdw2W/uZgQLFXToKeRcDiI= -github.com/cncf/xds/go v0.0.0-20210922020428-25de7278fc84/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs= -github.com/cncf/xds/go v0.0.0-20211001041855-01bcc9b48dfe/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs= -github.com/cncf/xds/go v0.0.0-20211011173535-cb28da3451f1/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs= github.com/containerd/console v1.0.3/go.mod h1:7LqA/THxQ86k76b8c/EMSiaJ3h1eZkMkXar0TQ1gf3U= github.com/containerd/containerd v1.6.18 h1:qZbsLvmyu+Vlty0/Ex5xc0z2YtKpIsb5n45mAMI+2Ns= github.com/containerd/containerd v1.6.18/go.mod h1:1RdCUu95+gc2v9t3IL+zIlpClSmew7/0YS8O5eQZrOw= @@ -52,18 +40,11 @@ github.com/docker/go-connections v0.4.0/go.mod h1:Gbd7IOopHjR8Iph03tsViu4nIes5Xh github.com/docker/go-units v0.4.0/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk= github.com/docker/go-units v0.5.0 h1:69rxXcBk27SvSaaxTtLh/8llcHD8vYHT7WSdRZ/jvr4= github.com/docker/go-units v0.5.0/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk= -github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= -github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= -github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98= -github.com/envoyproxy/go-control-plane v0.9.9-0.20201210154907-fd9021fe5dad/go.mod h1:cXg6YxExXjJnVBQHBLXeUAgxn2UodCpnH306RInaBQk= -github.com/envoyproxy/go-control-plane v0.10.2-0.20220325020618-49ff273808a1/go.mod h1:KJwIaB5Mv44NWtYuAOFCVOjcI94vtpEz2JU/D2v6IjE= -github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= github.com/frankban/quicktest v1.11.3/go.mod h1:wRf/ReqHper53s+kmmSZizM8NamnL3IM0I9ntUbOk+k= github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ= github.com/fsnotify/fsnotify v1.5.4 h1:jRbGcIw6P2Meqdwuo0H1p6JVLbL5DHKAKlYndzMwVZI= github.com/fsnotify/fsnotify v1.5.4/go.mod h1:OVB6XrOHzAwXMpEM7uPOzcehqUV2UqJxmVXmkdnm1bU= -github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04= github.com/go-redis/redis/v8 v8.11.5 h1:AcZZR7igkdvfVmQTPnu9WE37LRrO/YrBH5zWyjDC0oI= github.com/go-redis/redis/v8 v8.11.5/go.mod h1:gREzHqY1hg6oD9ngVRbLStwAWKhA0FEgq8Jd4h5lpwo= github.com/go-task/slim-sprig v0.0.0-20210107165309-348f09dbbbc0/go.mod h1:fyg7847qk6SyHyPtNmDHnmrv/HOrqktSC+C9fM+CJOE= @@ -71,36 +52,25 @@ github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5x github.com/godbus/dbus/v5 v5.0.6/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA= github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= -github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= -github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A= github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= -github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= -github.com/golang/protobuf v1.3.3/go.mod h1:vzj43D7+SQXF/4pzW/hwtAqwc6iTitCiVSaWz5lYuqw= github.com/golang/protobuf v1.4.0-rc.1/go.mod h1:ceaxUfeHdC40wWswd/P6IGgMaK3YpKi5j83Wpe3EHw8= github.com/golang/protobuf v1.4.0-rc.1.0.20200221234624-67d41d38c208/go.mod h1:xKAWHe0F5eneWXFV3EuXVDTCmh+JuBKY0li0aMyXATA= github.com/golang/protobuf v1.4.0-rc.2/go.mod h1:LlEzMj4AhA7rCAGe4KMBDvJI+AwstrUpVNzEA03Pprs= github.com/golang/protobuf v1.4.0-rc.4.0.20200313231945-b860323f09d0/go.mod h1:WU3c8KckQ9AFe+yFwt9sWVRKCVIyN9cPHBJSNnbL67w= github.com/golang/protobuf v1.4.0/go.mod h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvqG2KuDX0= -github.com/golang/protobuf v1.4.1/go.mod h1:U8fpvMrcmy5pZrNK1lt4xCsGvpyWQ/VVv6QDs8UjoX8= github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI= -github.com/golang/protobuf v1.4.3/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI= github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= github.com/golang/protobuf v1.5.2 h1:ROPKBNFfQgOUMifHyP+KYbvpjbdoFNs+aK7DXlji0Tw= github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= -github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M= github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= -github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= -github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38= github.com/google/pprof v0.0.0-20210407192527-94a9f03dee38/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE= -github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I= github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= -github.com/grpc-ecosystem/grpc-gateway v1.16.0/go.mod h1:BDjrQk3hbvj6Nolgz8mAMFbcEtjT1g+wF4CSlocrBnw= github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= github.com/ianlancetaylor/demangle v0.0.0-20200824232613-28f6c0f3b639/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc= github.com/jmespath/go-jmespath v0.4.0 h1:BEgLn5cpjn8UN1mAw4NjwDrS35OdebyEtFe+9YPoQUg= @@ -121,8 +91,8 @@ github.com/magiconair/properties v1.8.7 h1:IeQXZAiQcpL9mgcAe1Nu6cX9LLw6ExEHKjN0V github.com/magiconair/properties v1.8.7/go.mod h1:Dhd985XPs7jluiymwWYZ0G4Z61jb3vdS329zhj2hYo0= github.com/massenz/slf4go v0.3.2-g4eb5504 h1:tRrxPOKcqNKQn25eS8Dy9bW3NMNPpuK4Sla9jKfWmSs= github.com/massenz/slf4go v0.3.2-g4eb5504/go.mod h1:ZJjthXAnZMJGwXUz3Z3v5uyban00uAFFoDYODOoLFpw= -github.com/massenz/statemachine-proto/golang v1.1.0-beta-g1fc5dd8 h1:Dp2yv070ogiHLwQU5LppXskUDnCoO8tDkqgszyZMNmk= -github.com/massenz/statemachine-proto/golang v1.1.0-beta-g1fc5dd8/go.mod h1:g6CkyXxfs7XF8wv6OLdMZZDUu0fn4PY6HQQ2WDbW3GU= +github.com/massenz/statemachine-proto/golang v1.2.0-g8dbe9c5 h1:0QLU3fwkZg2s17QsjrJ4RKdxmWbsvF7WPzBeEO1m32Y= +github.com/massenz/statemachine-proto/golang v1.2.0-g8dbe9c5/go.mod h1:AYRhBXOvJkJDA0j6wce63gr0mQwX8Wfp3Qn9L/3cz28= github.com/moby/patternmatcher v0.5.0 h1:YCZgJOeULcxLw1Q+sVR636pmS7sPEn1Qo2iAN6M7DBo= github.com/moby/patternmatcher v0.5.0/go.mod h1:hDPoyOpDY7OrrMDLaYoY3hf52gNCR/YOUYxkhApJIxc= github.com/moby/sys/mountinfo v0.5.0/go.mod h1:3bMD3Rg+zkqx8MRYPi7Pyb0Ie97QEBmdxbhnCLlSvSU= @@ -162,8 +132,6 @@ github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= -github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= -github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ= github.com/rogpeppe/go-internal v1.8.1 h1:geMPLpDpQOgVyCg5z5GoRwLHepNdb71NXb67XFkP+Eg= github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= github.com/seccomp/libseccomp-golang v0.9.2-0.20220502022130-f33da4d89646/go.mod h1:JA8cRccbGaA1s33RQf7Y1+q9gHmZX1yB/z9WDN1C6fg= @@ -173,10 +141,16 @@ github.com/sirupsen/logrus v1.8.1/go.mod h1:yWOB1SBYBC5VeMP7gHvWumXLIWorT60ONWic github.com/sirupsen/logrus v1.9.0 h1:trlNQbNUG3OdDrDil03MCb1H2o9nJ1x4/5LYw7byDE0= github.com/sirupsen/logrus v1.9.0/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= +github.com/stretchr/objx v0.5.0 h1:1zr/of2m5FGMsad5YfcqgdqdWrIhu+EBEJRhR1U7z/c= +github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= -github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk= +github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= +github.com/stretchr/testify v1.8.2 h1:+h33VjcLVPDHtOdpUCuF+7gSuG3yGIftsP1YvFihtJ8= +github.com/stretchr/testify v1.8.2/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= github.com/syndtr/gocapability v0.0.0-20200815063812-42c35b437635/go.mod h1:hkRG7XYTFWNJGYcbNJQlaLq0fg1yr4J4t/NcTQtrfww= github.com/testcontainers/testcontainers-go v0.18.0 h1:8RXrcIQv5xX/uBOSmZd297gzvA7F0yuRA37/918o7Yg= github.com/testcontainers/testcontainers-go v0.18.0/go.mod h1:rLC7hR2SWRjJZZNrUYiTKvUXCziNxzZiYtz9icTWYNQ= @@ -185,43 +159,26 @@ github.com/vishvananda/netlink v1.1.0/go.mod h1:cTgwzPIzzgDAYoQrMm0EdrjRUBkTqKYp github.com/vishvananda/netns v0.0.0-20191106174202-0a2b9b5464df/go.mod h1:JP3t17pCcGlemwknint6hfoeCVQrEMVwxRLRjXpq+BU= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= -go.opentelemetry.io/proto/otlp v0.7.0/go.mod h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqeYNgFYFoEGnI= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= -golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= -golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= -golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU= -golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= -golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= -golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= -golang.org/x/net v0.0.0-20190108225652-1e06a53dbb7e/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= -golang.org/x/net v0.0.0-20190213061140-3a22650c66bd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= -golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20200520004742-59133d7f0dd7/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A= -golang.org/x/net v0.0.0-20200822124328-c89045814202/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA= golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= golang.org/x/net v0.0.0-20201224014010-6772e930b67b/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= -golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM= golang.org/x/net v0.0.0-20210428140749-89ef3d95e781/go.mod h1:OJAsFXCWl8Ukc7SiCT/9KSuxbyM7479/AVlXFRxuMCk= golang.org/x/net v0.0.0-20220127200216-cd36cc0744dd/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk= golang.org/x/net v0.7.0 h1:rJrUqqhjsgNp7KqAIc25s9pZnjU7TUcSY7HcVZjdn1g= golang.org/x/net v0.7.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= -golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= -golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= @@ -239,9 +196,7 @@ golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20210112080510-489259a85091/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210119212857-b64e53b001e4/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210124154548-22da62e12c0c/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210616094352-59db8d763f22/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210906170528-6f6e22806c34/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= @@ -257,17 +212,12 @@ golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuX golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= -golang.org/x/text v0.3.5/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= golang.org/x/text v0.7.0 h1:4BRB4x83lYWy72KwLD/qYDuTu7q9PjSagHvijDw7cLo= golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= golang.org/x/time v0.0.0-20220210224613-90d013bbcef8 h1:vVKdlvoWBphwdxWKrFZEuM0kGgGLxUOYcY4U/2Vjg44= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= -golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= -golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY= -golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= -golang.org/x/tools v0.0.0-20190524140312-2c0ae7006135/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q= golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE= golang.org/x/tools v0.0.0-20201224043029-2b0845dc783e/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= @@ -277,36 +227,19 @@ golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8T golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= -google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM= -google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= -google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc= -google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55/go.mod h1:DMBHOl98Agz4BDEuKkezgsaosCRResVns1a3J2ZsMNc= -google.golang.org/genproto v0.0.0-20200513103714-09dca8ec2884/go.mod h1:55QSHmfGQM9UVYDPBsyGGes0y52j32PQ3BqQfXhyH3c= -google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013/go.mod h1:NbSheEEYHJ7i3ixzK3sjbqSGDJWnxyFXZblF3eUsNvo= -google.golang.org/genproto v0.0.0-20220617124728-180714bec0ad h1:kqrS+lhvaMHCxul6sKQvKJ8nAAhlVItmZV822hYFH/U= -google.golang.org/genproto v0.0.0-20220617124728-180714bec0ad/go.mod h1:KEWEmljWE5zPzLBa/oHl6DaEt9LmfH6WtH1OHIvleBA= -google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= -google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg= -google.golang.org/grpc v1.25.1/go.mod h1:c3i+UQWmh7LiEpx4sFZnkU36qjEYZ0imhYfXVyQciAY= -google.golang.org/grpc v1.27.0/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk= -google.golang.org/grpc v1.33.1/go.mod h1:fr5YgcSWrqhRRxogOsw7RzIpsmvOZ6IcH4kBYTpR3n0= -google.golang.org/grpc v1.36.0/go.mod h1:qjiiYl8FncCW8feJPdyg3v6XW24KsRHe+dy9BAGRRjU= -google.golang.org/grpc v1.47.0/go.mod h1:vN9eftEi1UMyUsIF80+uQXhHjbXYbm0uXoFCACuMGWk= -google.golang.org/grpc v1.51.0 h1:E1eGv1FTqoLIdnBCZufiSHgKjlqG6fKFf6pPWtMTh8U= -google.golang.org/grpc v1.51.0/go.mod h1:wgNDFcnuBGmxLKI/qn4T+m5BtEBYXJPvibbUPsAIPww= +google.golang.org/genproto v0.0.0-20230110181048-76db0878b65f h1:BWUVssLB0HVOSY78gIdvk1dTVYtT1y8SBWtPYuTJ/6w= +google.golang.org/genproto v0.0.0-20230110181048-76db0878b65f/go.mod h1:RGgjbofJ8xD9Sq1VVhDM1Vok1vRONV+rg+CjzG4SZKM= +google.golang.org/grpc v1.53.0 h1:LAv2ds7cmFV/XTS3XG1NneeENYrXGmorPxsBbptIjNc= +google.golang.org/grpc v1.53.0/go.mod h1:OnIrk0ipVdj4N5d9IUoFUx72/VlD7+jUsHwZgwSMQpw= google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0= google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM= google.golang.org/protobuf v1.20.1-0.20200309200217-e05f789c0967/go.mod h1:A+miEFZTKqfCUM6K7xSMQL9OKL/b6hQv+e19PK+JZNE= google.golang.org/protobuf v1.21.0/go.mod h1:47Nbq4nVaFHyn7ilMalzfO3qCViNmqZ2kzikPIcrTAo= -google.golang.org/protobuf v1.22.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= -google.golang.org/protobuf v1.23.1-0.20200526195155-81db48ad09cc/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= -google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlbajtzgsN7c= google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= google.golang.org/protobuf v1.27.1/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= -google.golang.org/protobuf v1.28.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= google.golang.org/protobuf v1.28.1 h1:d0NfwRgPtno5B1Wa6L2DAG+KivqkdutMf1UhdNx175w= google.golang.org/protobuf v1.28.1/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= @@ -315,7 +248,6 @@ gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMy gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ= gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= -gopkg.in/yaml.v2 v2.2.3/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= @@ -326,5 +258,3 @@ gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gotest.tools/v3 v3.4.0 h1:ZazjZUfuVeZGLAmlKKuyv3IKP5orXcwtOwDQH6YVr6o= gotest.tools/v3 v3.4.0/go.mod h1:CtbdzLSsqVhDgMtKsx03ird5YTGB3ar27v0u/yKBW5g= -honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= -honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= diff --git a/grpc/grpc_server.go b/grpc/grpc_server.go index 7e673cd..0d1ea72 100644 --- a/grpc/grpc_server.go +++ b/grpc/grpc_server.go @@ -21,6 +21,7 @@ import ( "google.golang.org/grpc/credentials" "google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/status" + "google.golang.org/protobuf/types/known/emptypb" "google.golang.org/protobuf/types/known/wrapperspb" "os" "path/filepath" @@ -59,6 +60,30 @@ type grpcSubscriber struct { *Config } +// Health will return the status of the server and the underlying store +func (s *grpcSubscriber) Health(context.Context, *emptypb.Empty) (*protos.HealthResponse, error) { + var response = &protos.HealthResponse{ + State: protos.HealthResponse_READY, + Release: api.Release, + TlsEnabled: s.TlsEnabled, + } + if s.Store == nil { + s.Logger.Error("Redis store not initialized, cannot process requests") + return nil, status.Error(codes.Internal, "data store not configured") + } + if s.EventsChannel == nil { + s.Logger.Error("events channel not initialized, cannot process events") + response.State = protos.HealthResponse_NOT_READY + return response, nil + } + err := s.Store.Health() + if err != nil { + s.Logger.Error("Redis store not ready: %v", err) + response.State = protos.HealthResponse_NOT_READY + } + return response, nil +} + func (s *grpcSubscriber) SendEvent(ctx context.Context, request *protos.EventRequest) (*protos. EventResponse, error) { if request.GetId() == "" { @@ -180,8 +205,8 @@ func (s *grpcSubscriber) GetFiniteStateMachine(ctx context.Context, in *protos.G func (s *grpcSubscriber) GetAllInState(ctx context.Context, in *protos.GetFsmRequest) ( *protos.ListResponse, error) { - cfgName := in.GetConfig() - if cfgName == "" { + cfg := in.GetConfig() + if cfg == "" { return nil, status.Errorf(codes.InvalidArgument, "configuration must always be specified") } state := in.GetState() @@ -189,16 +214,16 @@ func (s *grpcSubscriber) GetAllInState(ctx context.Context, in *protos.GetFsmReq // TODO: implement table scanning return nil, status.Errorf(codes.Unimplemented, "missing state, table scan not implemented") } - ids := s.Store.GetAllInState(cfgName, state) + ids := s.Store.GetAllInState(cfg, state) return &protos.ListResponse{Ids: ids}, nil } func (s *grpcSubscriber) GetEventOutcome(ctx context.Context, in *protos.EventRequest) ( *protos.EventResponse, error) { evtId := in.GetId() - config := in.GetConfig() - s.Logger.Debug("looking up EventOutcome %s (%s)", evtId, config) - outcome, ok := s.Store.GetOutcomeForEvent(evtId, config) + cfg := in.GetConfig() + s.Logger.Debug("looking up EventOutcome %s (%s)", evtId, cfg) + outcome, ok := s.Store.GetOutcomeForEvent(evtId, cfg) if !ok { return nil, status.Error(codes.NotFound, fmt.Sprintf("outcome for event %s not found", evtId)) } diff --git a/grpc/grpc_server_test.go b/grpc/grpc_server_test.go index 993bb40..7585573 100644 --- a/grpc/grpc_server_test.go +++ b/grpc/grpc_server_test.go @@ -12,12 +12,14 @@ package grpc_test import ( "context" "fmt" + "google.golang.org/protobuf/types/known/emptypb" "net" "strings" "time" "github.com/go-redis/redis/v8" slf4go "github.com/massenz/slf4go/logging" + "github.com/stretchr/testify/mock" g "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/credentials/insecure" @@ -33,6 +35,89 @@ import ( protos "github.com/massenz/statemachine-proto/golang/api" ) +type Mockstore struct { + mock.Mock +} + +func (m *Mockstore) SetLogLevel(level slf4go.LogLevel) { + //TODO implement me + panic("implement me") +} + +func (m *Mockstore) GetConfig(versionId string) (*protos.Configuration, bool) { + //TODO implement me + panic("implement me") +} + +func (m *Mockstore) PutConfig(cfg *protos.Configuration) error { + //TODO implement me + panic("implement me") +} + +func (m *Mockstore) GetAllConfigs() []string { + //TODO implement me + panic("implement me") +} + +func (m *Mockstore) GetAllVersions(name string) []string { + //TODO implement me + panic("implement me") +} + +func (m *Mockstore) GetStateMachine(id string, cfg string) (*protos.FiniteStateMachine, bool) { + //TODO implement me + panic("implement me") +} + +func (m *Mockstore) PutStateMachine(id string, fsm *protos.FiniteStateMachine) error { + //TODO implement me + panic("implement me") +} + +func (m *Mockstore) GetAllInState(cfg string, state string) []string { + //TODO implement me + panic("implement me") +} + +func (m *Mockstore) UpdateState(cfgName string, id string, oldState string, newState string) error { + //TODO implement me + panic("implement me") +} + +func (m *Mockstore) GetEvent(id string, cfg string) (*protos.Event, bool) { + //TODO implement me + panic("implement me") +} + +func (m *Mockstore) PutEvent(event *protos.Event, cfg string, ttl time.Duration) error { + //TODO implement me + panic("implement me") +} + +func (m *Mockstore) AddEventOutcome(eventId string, cfgName string, response *protos.EventOutcome, ttl time.Duration) error { + //TODO implement me + panic("implement me") +} + +func (m *Mockstore) GetOutcomeForEvent(eventId string, cfgName string) (*protos.EventOutcome, bool) { + //TODO implement me + panic("implement me") +} + +func (m *Mockstore) SetTimeout(duration time.Duration) { + //TODO implement me + panic("implement me") +} + +func (m *Mockstore) GetTimeout() time.Duration { + //TODO implement me + panic("implement me") +} + +func (m *Mockstore) Health() error { + return nil +} + var bkgnd = context.Background() var _ = Describe("the gRPC Server", func() { When("processing events", func() { @@ -44,7 +129,7 @@ var _ = Describe("the gRPC Server", func() { BeforeEach(func() { var err error testCh = make(chan protos.EventRequest, 5) - listener, err = net.Listen("tcp", "localhost:5763") + listener, err = net.Listen("tcp", ":0") Ω(err).ShouldNot(HaveOccurred()) client = NewClient(listener.Addr().String(), false) @@ -58,6 +143,7 @@ var _ = Describe("the gRPC Server", func() { EventsChannel: testCh, Logger: l, ServerAddress: listener.Addr().String(), + Store: new(Mockstore), }) Ω(err).ToNot(HaveOccurred()) Ω(server).ToNot(BeNil()) @@ -69,6 +155,13 @@ var _ = Describe("the gRPC Server", func() { server.Stop() } }) + It("should have a healthy status", func() { + Eventually(func(g Gomega) { + hr, err := client.Health(bkgnd, &emptypb.Empty{}) + g.Ω(err).ToNot(HaveOccurred()) + g.Ω(hr.State).Should(Equal(protos.HealthResponse_READY)) + }, 100*time.Millisecond, 20*time.Millisecond).Should(Succeed()) + }) It("should succeed for well-formed events", func() { response, err := client.SendEvent(bkgnd, &protos.EventRequest{ Event: &protos.Event{ From 352f509794793bd1fc1514eeaca04f58c918f52e Mon Sep 17 00:00:00 2001 From: Marco Massenzio <1153951+massenz@users.noreply.github.com> Date: Fri, 3 Mar 2023 22:48:07 -0800 Subject: [PATCH 3/4] Fix code smells identified by SonarQube (#88) --- grpc/grpc_server.go | 18 +++- grpc/grpc_server_test.go | 69 +++++++-------- pubsub/listener_test.go | 16 ++-- pubsub/sqs_sub.go | 43 +++++----- pubsub/types.go | 14 ++-- storage/redis_sets_store.go | 18 ++-- storage/redis_store_test.go | 152 +++++++++++++++------------------- storage/storage_suite_test.go | 3 +- 8 files changed, 167 insertions(+), 166 deletions(-) diff --git a/grpc/grpc_server.go b/grpc/grpc_server.go index 0d1ea72..129fa28 100644 --- a/grpc/grpc_server.go +++ b/grpc/grpc_server.go @@ -114,11 +114,15 @@ func (s *grpcSubscriber) SendEvent(ctx context.Context, request *protos.EventReq } func (s *grpcSubscriber) PutConfiguration(ctx context.Context, cfg *protos.Configuration) (*protos.PutResponse, error) { - // FIXME: use Context to set a timeout, etc. if err := api.CheckValid(cfg); err != nil { s.Logger.Error("invalid configuration: %v", err) return nil, status.Errorf(codes.InvalidArgument, "invalid configuration: %v", err) } + if deadline, ok := ctx.Deadline(); ok { + if deadline.Before(time.Now()) { + return nil, ctx.Err() + } + } if err := s.Store.PutConfig(cfg); err != nil { s.Logger.Error("could not store configuration: %v", err) if strings.Contains(err.Error(), "already exists") { @@ -126,6 +130,11 @@ func (s *grpcSubscriber) PutConfiguration(ctx context.Context, cfg *protos.Confi } return nil, status.Error(codes.Internal, err.Error()) } + if deadline, ok := ctx.Deadline(); ok { + if deadline.Before(time.Now()) { + return nil, ctx.Err() + } + } s.Logger.Trace("configuration stored: %s", api.GetVersionId(cfg)) return &protos.PutResponse{ Id: api.GetVersionId(cfg), @@ -164,6 +173,11 @@ func (s *grpcSubscriber) PutFiniteStateMachine(ctx context.Context, return nil, status.Error(codes.FailedPrecondition, storage.NotFoundError( fsm.ConfigId).Error()) } + if deadline, ok := ctx.Deadline(); ok { + if deadline.Before(time.Now()) { + return nil, ctx.Err() + } + } var id = request.Id if id == "" { id = uuid.NewString() @@ -178,6 +192,8 @@ func (s *grpcSubscriber) PutFiniteStateMachine(ctx context.Context, s.Logger.Error("could not store FSM [%v]: %v", fsm, err) return nil, status.Error(codes.Internal, err.Error()) } + // we cannot interrupt here, even if deadline is passed, as it would leave the + // store in an inconsistent state. if err := s.Store.UpdateState(cfg.Name, id, "", fsm.State); err != nil { s.Logger.Error("could not store FSM in state set [%s]: %v", fsm.State, err) return nil, status.Error(codes.Internal, err.Error()) diff --git a/grpc/grpc_server_test.go b/grpc/grpc_server_test.go index 7585573..3d8f8b7 100644 --- a/grpc/grpc_server_test.go +++ b/grpc/grpc_server_test.go @@ -35,83 +35,68 @@ import ( protos "github.com/massenz/statemachine-proto/golang/api" ) +var NotImplemented = storage.NotImplementedError("mock") + type Mockstore struct { mock.Mock } func (m *Mockstore) SetLogLevel(level slf4go.LogLevel) { - //TODO implement me - panic("implement me") } func (m *Mockstore) GetConfig(versionId string) (*protos.Configuration, bool) { - //TODO implement me - panic("implement me") + return nil, false } func (m *Mockstore) PutConfig(cfg *protos.Configuration) error { - //TODO implement me - panic("implement me") + return NotImplemented } func (m *Mockstore) GetAllConfigs() []string { - //TODO implement me - panic("implement me") + return nil } func (m *Mockstore) GetAllVersions(name string) []string { - //TODO implement me - panic("implement me") + return nil } func (m *Mockstore) GetStateMachine(id string, cfg string) (*protos.FiniteStateMachine, bool) { - //TODO implement me - panic("implement me") + return nil, false } func (m *Mockstore) PutStateMachine(id string, fsm *protos.FiniteStateMachine) error { - //TODO implement me - panic("implement me") + return NotImplemented } func (m *Mockstore) GetAllInState(cfg string, state string) []string { - //TODO implement me - panic("implement me") + return nil } func (m *Mockstore) UpdateState(cfgName string, id string, oldState string, newState string) error { - //TODO implement me - panic("implement me") + return NotImplemented } func (m *Mockstore) GetEvent(id string, cfg string) (*protos.Event, bool) { - //TODO implement me - panic("implement me") + return nil, false } func (m *Mockstore) PutEvent(event *protos.Event, cfg string, ttl time.Duration) error { - //TODO implement me - panic("implement me") + return NotImplemented } func (m *Mockstore) AddEventOutcome(eventId string, cfgName string, response *protos.EventOutcome, ttl time.Duration) error { - //TODO implement me - panic("implement me") + return NotImplemented } func (m *Mockstore) GetOutcomeForEvent(eventId string, cfgName string) (*protos.EventOutcome, bool) { - //TODO implement me - panic("implement me") + return nil, false } func (m *Mockstore) SetTimeout(duration time.Duration) { - //TODO implement me - panic("implement me") } func (m *Mockstore) GetTimeout() time.Duration { - //TODO implement me - panic("implement me") + return 0 } func (m *Mockstore) Health() error { @@ -162,12 +147,13 @@ var _ = Describe("the gRPC Server", func() { g.Ω(hr.State).Should(Equal(protos.HealthResponse_READY)) }, 100*time.Millisecond, 20*time.Millisecond).Should(Succeed()) }) + const EventName = "test-event" It("should succeed for well-formed events", func() { response, err := client.SendEvent(bkgnd, &protos.EventRequest{ Event: &protos.Event{ EventId: "1", Transition: &protos.Transition{ - Event: "test-vt", + Event: EventName, }, Originator: "test", }, @@ -181,7 +167,7 @@ var _ = Describe("the gRPC Server", func() { select { case evt := <-testCh: Ω(evt.Event.EventId).To(Equal("1")) - Ω(evt.Event.Transition.Event).To(Equal("test-vt")) + Ω(evt.Event.Transition.Event).To(Equal(EventName)) Ω(evt.Event.Originator).To(Equal("test")) Ω(evt.Id).To(Equal("2")) case <-time.After(10 * time.Millisecond): @@ -193,7 +179,7 @@ var _ = Describe("the gRPC Server", func() { response, err := client.SendEvent(bkgnd, &protos.EventRequest{ Event: &protos.Event{ Transition: &protos.Transition{ - Event: "test-vt", + Event: EventName, }, Originator: "test", }, @@ -207,7 +193,7 @@ var _ = Describe("the gRPC Server", func() { select { case evt := <-testCh: Ω(evt.Event.EventId).Should(Equal(generatedId)) - Ω(evt.Event.Transition.Event).To(Equal("test-vt")) + Ω(evt.Event.Transition.Event).To(Equal(EventName)) case <-time.After(10 * time.Millisecond): Fail("Timed out") } @@ -216,7 +202,7 @@ var _ = Describe("the gRPC Server", func() { _, err := client.SendEvent(bkgnd, &protos.EventRequest{ Event: &protos.Event{ Transition: &protos.Transition{ - Event: "test-vt", + Event: EventName, }, Originator: "test", }, @@ -452,34 +438,35 @@ var _ = Describe("the gRPC Server", func() { AssertStatusCode(codes.NotFound, err) }) It("will find all FSMs by State", func() { + const ConfigName = "test.m" for i := 1; i <= 5; i++ { id := fmt.Sprintf("fsm-%d", i) Ω(store.PutStateMachine(id, &protos.FiniteStateMachine{ - ConfigId: "test.m:v1", + ConfigId: ConfigName + ":v1", State: "start", })).Should(Succeed()) - store.UpdateState("test.m", id, "", "start") + store.UpdateState(ConfigName, id, "", "start") } for i := 10; i < 13; i++ { id := fmt.Sprintf("fsm-%d", i) Ω(store.PutStateMachine(id, &protos.FiniteStateMachine{ - ConfigId: "test.m:v1", + ConfigId: ConfigName + ":v1", State: "stop", })).Should(Succeed()) - store.UpdateState("test.m", id, "", "stop") + store.UpdateState(ConfigName, id, "", "stop") } items, err := client.GetAllInState(bkgnd, &protos.GetFsmRequest{ - Config: "test.m", + Config: ConfigName, Query: &protos.GetFsmRequest_State{State: "start"}, }) Ω(err).ShouldNot(HaveOccurred()) Ω(len(items.GetIds())).Should(Equal(5)) Ω(items.GetIds()).Should(ContainElements("fsm-3", "fsm-5")) items, err = client.GetAllInState(bkgnd, &protos.GetFsmRequest{ - Config: "test.m", + Config: ConfigName, Query: &protos.GetFsmRequest_State{State: "stop"}, }) Ω(err).ShouldNot(HaveOccurred()) diff --git a/pubsub/listener_test.go b/pubsub/listener_test.go index cb655a8..1f326ea 100644 --- a/pubsub/listener_test.go +++ b/pubsub/listener_test.go @@ -43,10 +43,11 @@ var _ = Describe("A Listener", func() { // Set to DEBUG when diagnosing test failures testListener.SetLogLevel(logging.NONE) }) + const eventId = "1234-abcdef" It("can post error notifications", func() { defer close(notificationsCh) msg := protos.Event{ - EventId: "feed-beef", + EventId: eventId, Originator: "me", Transition: &protos.Transition{ Event: "test-me", @@ -76,17 +77,18 @@ var _ = Describe("A Listener", func() { }) It("can process well-formed events", func() { event := protos.Event{ - EventId: "feed-beef", + EventId: eventId, Originator: "me", Transition: &protos.Transition{ Event: "move", }, Details: "more details", } + const requestId = "12345-faa44" request := protos.EventRequest{ Event: &event, Config: "test", - Id: "12345-faa44", + Id: requestId, } Ω(store.PutConfig(&protos.Configuration{ Name: "test", @@ -95,7 +97,7 @@ var _ = Describe("A Listener", func() { Transitions: []*protos.Transition{{From: "start", To: "end", Event: "move"}}, StartingState: "start", })).ToNot(HaveOccurred()) - Ω(store.PutStateMachine("12345-faa44", &protos.FiniteStateMachine{ + Ω(store.PutStateMachine(requestId, &protos.FiniteStateMachine{ ConfigId: "test:v1", State: "start", History: nil, @@ -109,7 +111,7 @@ var _ = Describe("A Listener", func() { Eventually(func(g Gomega) { // Now we want to test that the state machine was updated - fsm, ok := store.GetStateMachine("12345-faa44", "test") + fsm, ok := store.GetStateMachine(requestId, "test") g.Ω(ok).ToNot(BeFalse()) g.Ω(fsm.State).To(Equal("end")) g.Ω(len(fsm.History)).To(Equal(1)) @@ -123,7 +125,7 @@ var _ = Describe("A Listener", func() { }) It("sends notifications for missing state-machine", func() { event := protos.Event{ - EventId: "feed-beef", + EventId: eventId, Originator: "me", Transition: &protos.Transition{ Event: "move", @@ -152,7 +154,7 @@ var _ = Describe("A Listener", func() { It("sends notifications for missing destinations", func() { request := protos.EventRequest{ Event: &protos.Event{ - EventId: "feed-beef", + EventId: eventId, }, } go func() { testListener.ListenForMessages() }() diff --git a/pubsub/sqs_sub.go b/pubsub/sqs_sub.go index 1a34be0..dbceb59 100644 --- a/pubsub/sqs_sub.go +++ b/pubsub/sqs_sub.go @@ -24,9 +24,6 @@ import ( protos "github.com/massenz/statemachine-proto/golang/api" ) -// TODO: should we need to generalize and abstract the implementation of a Subscriber? -// This would be necessary if we were to implement a different message broker (e.g., Kafka) - // getSqsClient connects to AWS and obtains an SQS client; passing `nil` as the `awsEndpointUrl` will // connect by default to AWS; use a different (possibly local) URL for a LocalStack test deployment. func getSqsClient(awsEndpointUrl *string) *sqs.SQS { @@ -61,11 +58,12 @@ func NewSqsSubscriber(eventsChannel chan<- protos.EventRequest, sqsUrl *string) return nil } return &SqsSubscriber{ - logger: log.NewLog("SQS-Sub"), - client: client, - events: eventsChannel, - Timeout: DefaultVisibilityTimeout, - PollingInterval: DefaultPollingInterval, + logger: log.NewLog("SQS-Sub"), + client: client, + events: eventsChannel, + Timeout: DefaultVisibilityTimeout, + PollingInterval: DefaultPollingInterval, + MessageRemoveRetries: DefaultRetries, } } @@ -134,14 +132,14 @@ func (s *SqsSubscriber) ProcessMessage(msg *sqs.Message, queueUrl *string) { var request protos.EventRequest err := proto.UnmarshalText(*msg.Body, &request) if err != nil { - s.logger.Error("Message %v has invalid body: %s", msg.MessageId, err.Error()) + s.logger.Error("message %v has invalid body: %s", msg.MessageId, err.Error()) // TODO: publish error to DLQ. return } destId := request.GetId() if destId == "" { - errDetails := fmt.Sprintf("No Destination ID in %v", request.String()) + errDetails := fmt.Sprintf("no Destination ID in %v", request.String()) s.logger.Error(errDetails) // TODO: publish error to DLQ. return @@ -150,16 +148,19 @@ func (s *SqsSubscriber) ProcessMessage(msg *sqs.Message, queueUrl *string) { api.UpdateEvent(request.Event) s.events <- request - s.logger.Debug("Removing message %v from SQS", *msg.MessageId) - _, err = s.client.DeleteMessage(&sqs.DeleteMessageInput{ - QueueUrl: queueUrl, - ReceiptHandle: msg.ReceiptHandle, - }) - if err != nil { - // FIXME: add retries - errDetails := fmt.Sprintf("Failed to remove message %v from SQS", msg.MessageId) - s.logger.Error("%s: %v", errDetails, err) - // TODO: publish error to DLQ, should also retry removal here. + for i := 0; i < s.MessageRemoveRetries; i++ { + s.logger.Debug("removing message %v from SQS", *msg.MessageId) + _, err = s.client.DeleteMessage(&sqs.DeleteMessageInput{ + QueueUrl: queueUrl, + ReceiptHandle: msg.ReceiptHandle, + }) + if err != nil { + errDetails := fmt.Sprintf("failed to remove message %v from SQS (attempt: %d)", + msg.MessageId, i+1) + s.logger.Error("%s: %v", errDetails, err) + } else { + break + } } - s.logger.Trace("Message %v removed", msg.MessageId) + s.logger.Trace("message %v removed", msg.MessageId) } diff --git a/pubsub/types.go b/pubsub/types.go index da69ecb..61651e0 100644 --- a/pubsub/types.go +++ b/pubsub/types.go @@ -25,6 +25,9 @@ const ( // message from the queue. // See: https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-visibility-timeout.html DefaultVisibilityTimeout = 5 * time.Second + + // DefaultRetries is the number of times we will try to remove the message from the SQS queue + DefaultRetries = 3 ) // An EventsListener will process `EventRequests` in a separate goroutine. @@ -59,9 +62,10 @@ type SqsPublisher struct { // The subscriber will poll the queue for new messages, // and will post them on the `events` channel from where an `EventsListener` will process them. type SqsSubscriber struct { - logger *log.Log - client *sqs.SQS - events chan<- protos.EventRequest - Timeout time.Duration - PollingInterval time.Duration + logger *log.Log + client *sqs.SQS + events chan<- protos.EventRequest + Timeout time.Duration + PollingInterval time.Duration + MessageRemoveRetries int } diff --git a/storage/redis_sets_store.go b/storage/redis_sets_store.go index 98fde88..4f0df67 100644 --- a/storage/redis_sets_store.go +++ b/storage/redis_sets_store.go @@ -14,6 +14,11 @@ import ( "fmt" ) +const ( + ReturningItemsFmt = "Returning %d items" + NoConfigurationsFmt = "Could not retrieve configurations: %s" +) + func (csm *RedisStore) UpdateState(cfgName string, id string, oldState string, newState string) error { var key string var err error @@ -44,10 +49,11 @@ func (csm *RedisStore) GetAllInState(cfg string, state string) []string { key := NewKeyForMachinesByState(cfg, state) fsms, err := csm.client.SMembers(context.Background(), key).Result() if err != nil { - csm.logger.Error("Could not retrieve FSMs for state `%s`: %s", state, err) + const format = "Could not retrieve FSMs for state `%s`: %s" + csm.logger.Error(format, state, err) return nil } - csm.logger.Debug("Returning %d items", len(fsms)) + csm.logger.Debug(ReturningItemsFmt, len(fsms)) return fsms } @@ -56,10 +62,10 @@ func (csm *RedisStore) GetAllConfigs() []string { csm.logger.Debug("Looking up all configs in DB") configs, err := csm.client.SMembers(context.Background(), ConfigsPrefix).Result() if err != nil { - csm.logger.Error("Could not retrieve configurations: %s", err) + csm.logger.Error(NoConfigurationsFmt, err) return nil } - csm.logger.Debug("Returning %d items", len(configs)) + csm.logger.Debug(ReturningItemsFmt, len(configs)) return configs } @@ -67,9 +73,9 @@ func (csm *RedisStore) GetAllVersions(name string) []string { csm.logger.Debug("Looking up all versions for Configurations `%s` in DB", name) configs, err := csm.client.SMembers(context.Background(), NewKeyForConfig(name)).Result() if err != nil { - csm.logger.Error("Could not retrieve configurations: %s", err) + csm.logger.Error(NoConfigurationsFmt, err) return nil } - csm.logger.Debug("Returning %d items", len(configs)) + csm.logger.Debug(ReturningItemsFmt, len(configs)) return configs } diff --git a/storage/redis_store_test.go b/storage/redis_store_test.go index be13b5d..18714e5 100644 --- a/storage/redis_store_test.go +++ b/storage/redis_store_test.go @@ -22,9 +22,48 @@ import ( protos "github.com/massenz/statemachine-proto/golang/api" . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" + "strings" ) -var _ = Describe("RedisStore", func() { +const ( + cfgName = "orders" + fsmIdFmt = "fsm-%d" +) + +// configId is a constant in all but name +var configId = strings.Join([]string{cfgName, "v4"}, api.ConfigurationVersionSeparator) + +func setupStoreRedis() (storage.StoreManager, *redis.Client) { + store := storage.NewRedisStoreWithDefaults(container.Address) + Expect(store).ToNot(BeNil()) + store.SetLogLevel(slf4go.NONE) + + // This is used to go "behind the back" of our StoreManager and mess with it for testing + // purposes. Do NOT do this in your code. + rdb := redis.NewClient(&redis.Options{ + Addr: container.Address, + DB: storage.DefaultRedisDb, + }) + return store, rdb +} + +func storeSomeFSMs(store storage.StoreManager, count int) { + for id := 1; id < count; id++ { + fsm := &protos.FiniteStateMachine{ + ConfigId: configId, + State: "in_transit", + History: []*protos.Event{ + {Transition: &protos.Transition{Event: "confirmed"}, Originator: "bot"}, + {Transition: &protos.Transition{Event: "shipped"}, Originator: "bot"}, + }, + } + fsmId := fmt.Sprintf(fsmIdFmt, id) + Expect(store.PutStateMachine(fsmId, fsm)).ToNot(HaveOccurred()) + Expect(store.UpdateState("orders", fsmId, "", fsm.State)) + } +} + +var _ = Describe("Redis Store", func() { Context("for simple operations", func() { var store storage.StoreManager @@ -38,18 +77,7 @@ var _ = Describe("RedisStore", func() { StartingState: "start", } Expect(container).ToNot(BeNil()) - store = storage.NewRedisStoreWithDefaults(container.Address) - Expect(store).ToNot(BeNil()) - // Mute unnecessary logging during tests; re-enable ( - // and set to DEBUG) when diagnosing failures. - store.SetLogLevel(slf4go.NONE) - - // This is used to go "behind the back" of our StoreManager and mess with it for testing - // purposes. Do NOT do this in your code. - rdb = redis.NewClient(&redis.Options{ - Addr: container.Address, - DB: storage.DefaultRedisDb, - }) + store, rdb = setupStoreRedis() }, 0.5) AfterEach(func() { // Cleaning up the DB to prevent "dirty" store to impact test results @@ -120,7 +148,7 @@ var _ = Describe("RedisStore", func() { id := "99" // uuid.New().String() var found protos.FiniteStateMachine fsm := &protos.FiniteStateMachine{ - ConfigId: "orders:v4", + ConfigId: configId, State: "in_transit", History: []*protos.Event{ {Transition: &protos.Transition{Event: "confirmed"}, Originator: "bot"}, @@ -128,7 +156,7 @@ var _ = Describe("RedisStore", func() { }, } Expect(store.PutStateMachine(id, fsm)).ToNot(HaveOccurred()) - val, err := rdb.Get(context.Background(), storage.NewKeyForMachine(id, "orders")).Bytes() + val, err := rdb.Get(context.Background(), storage.NewKeyForMachine(id, cfgName)).Bytes() Expect(err).ToNot(HaveOccurred()) Expect(proto.Unmarshal(val, &found)).ToNot(HaveOccurred()) @@ -145,20 +173,20 @@ var _ = Describe("RedisStore", func() { It("can get events back", func() { id := uuid.New().String() ev := api.NewEvent("confirmed") - key := storage.NewKeyForEvent(id, "orders") + key := storage.NewKeyForEvent(id, cfgName) val, _ := proto.Marshal(ev) _, err := rdb.Set(context.Background(), key, val, storage.NeverExpire).Result() Expect(err).ToNot(HaveOccurred()) - found, ok := store.GetEvent(id, "orders") + found, ok := store.GetEvent(id, cfgName) Expect(ok).To(BeTrue()) Expect(found).To(Respect(ev)) }) It("can save events", func() { ev := api.NewEvent("confirmed") id := ev.EventId - Expect(store.PutEvent(ev, "orders", storage.NeverExpire)).ToNot(HaveOccurred()) - val, err := rdb.Get(context.Background(), storage.NewKeyForEvent(id, "orders")).Bytes() + Expect(store.PutEvent(ev, cfgName, storage.NeverExpire)).ToNot(HaveOccurred()) + val, err := rdb.Get(context.Background(), storage.NewKeyForEvent(id, cfgName)).Bytes() Expect(err).ToNot(HaveOccurred()) var found protos.Event @@ -166,12 +194,12 @@ var _ = Describe("RedisStore", func() { Expect(&found).To(Respect(ev)) }) It("will return an error for a non-existent event", func() { - _, ok := store.GetEvent("fake", "orders") + _, ok := store.GetEvent("fake", cfgName) Expect(ok).To(BeFalse()) }) It("can save an event Outcome", func() { id := uuid.New().String() - cfg := "orders" + cfg := cfgName response := &protos.EventOutcome{ Code: protos.EventOutcome_Ok, Config: "test", @@ -189,7 +217,7 @@ var _ = Describe("RedisStore", func() { }) It("can get an event Outcome", func() { id := uuid.New().String() - cfg := "orders" + cfg := cfgName response := &protos.EventOutcome{ Code: protos.EventOutcome_Ok, Details: "this was just a test", @@ -210,7 +238,7 @@ var _ = Describe("RedisStore", func() { Expect(store.PutStateMachine("fake", nil)).To(HaveOccurred()) }) It("should gracefully handle a nil Event", func() { - Expect(store.PutEvent(nil, "orders", storage.NeverExpire)).To(HaveOccurred()) + Expect(store.PutEvent(nil, cfgName, storage.NeverExpire)).To(HaveOccurred()) }) It("should gracefully handle a nil Outcome", func() { Expect(store.AddEventOutcome("fake", "test", nil, @@ -223,17 +251,7 @@ var _ = Describe("RedisStore", func() { var rdb *redis.Client BeforeEach(func() { - Expect(container).ToNot(BeNil()) - store = storage.NewRedisStoreWithDefaults(container.Address) - Expect(store).ToNot(BeNil()) - store.SetLogLevel(slf4go.NONE) - - // This is used to go "behind the back" of our StoreManager and mess with it for testing - // purposes. Do NOT do this in your code. - rdb = redis.NewClient(&redis.Options{ - Addr: container.Address, - DB: storage.DefaultRedisDb, - }) + store, rdb = setupStoreRedis() }, 0.5) AfterEach(func() { // Cleaning up the DB to prevent "dirty" store to impact test results @@ -241,20 +259,20 @@ var _ = Describe("RedisStore", func() { }, 0.2) It("can get all configuration names", func() { - for _, name := range []string{"orders", "devices", "users"} { + for _, name := range []string{cfgName, "devices", "users"} { Expect(store.PutConfig(&protos.Configuration{Name: name, Version: "v3", StartingState: "start"})). ToNot(HaveOccurred()) } configs := store.GetAllConfigs() Expect(len(configs)).To(Equal(3)) - Expect(configs).To(ContainElements("orders", "devices", "users")) + Expect(configs).To(ContainElements(cfgName, "devices", "users")) }) It("can get all versions of a configuration", func() { for _, version := range []string{"v1alpha1", "v1beta", "v1"} { - Expect(store.PutConfig(&protos.Configuration{Name: "orders", Version: version, StartingState: "start"})). + Expect(store.PutConfig(&protos.Configuration{Name: cfgName, Version: version, StartingState: "start"})). ToNot(HaveOccurred()) } - configs := store.GetAllVersions("orders") + configs := store.GetAllVersions(cfgName) Expect(len(configs)).To(Equal(3)) Expect(configs).To(ContainElements("orders:v1alpha1", "orders:v1beta", "orders:v1")) }) @@ -268,74 +286,40 @@ var _ = Describe("RedisStore", func() { var rdb *redis.Client BeforeEach(func() { - Expect(container).ToNot(BeNil()) - store = storage.NewRedisStoreWithDefaults(container.Address) - Expect(store).ToNot(BeNil()) - store.SetLogLevel(slf4go.NONE) - - // This is used to go "behind the back" of our StoreManager and mess with it for testing - // purposes. Do NOT do this in your code. - rdb = redis.NewClient(&redis.Options{ - Addr: container.Address, - DB: storage.DefaultRedisDb, - }) + store, rdb = setupStoreRedis() }, 0.5) AfterEach(func() { // Cleaning up the DB to prevent "dirty" store to impact test results rdb.FlushDB(context.Background()) }, 0.2) It("finds them by state", func() { - for id := 1; id < 5; id++ { - fsm := &protos.FiniteStateMachine{ - ConfigId: "orders:v4", - State: "in_transit", - History: []*protos.Event{ - {Transition: &protos.Transition{Event: "confirmed"}, Originator: "bot"}, - {Transition: &protos.Transition{Event: "shipped"}, Originator: "bot"}, - }, - } - fsmId := fmt.Sprintf("fsm-%d", id) - Expect(store.PutStateMachine(fsmId, fsm)).ToNot(HaveOccurred()) - Expect(store.UpdateState("orders", fsmId, "", fsm.State)) - } - res := store.GetAllInState("orders", "in_transit") + storeSomeFSMs(store, 5) + res := store.GetAllInState(cfgName, "in_transit") Expect(len(res)).To(Equal(4)) for id := 1; id < 5; id++ { - Expect(res).To(ContainElement(fmt.Sprintf("fsm-%d", id))) + Expect(res).To(ContainElement(fmt.Sprintf(fsmIdFmt, id))) } }) When("transitioning state", func() { BeforeEach(func() { - for id := 1; id < 10; id++ { - fsm := &protos.FiniteStateMachine{ - ConfigId: "orders:v4", - State: "in_transit", - History: []*protos.Event{ - {Transition: &protos.Transition{Event: "confirmed"}, Originator: "bot"}, - {Transition: &protos.Transition{Event: "shipped"}, Originator: "bot"}, - }, - } - fsmId := fmt.Sprintf("fsm-%d", id) - Expect(store.PutStateMachine(fsmId, fsm)).ToNot(HaveOccurred()) - Expect(store.UpdateState("orders", fsmId, "", fsm.State)) - } + storeSomeFSMs(store, 10) }) It("finds them", func() { for id := 3; id < 6; id++ { - fsmId := fmt.Sprintf("fsm-%d", id) - Expect(store.UpdateState("orders", fsmId, "in_transit", "shipped")) + fsmId := fmt.Sprintf(fsmIdFmt, id) + Expect(store.UpdateState(cfgName, fsmId, "in_transit", "shipped")) } - res := store.GetAllInState("orders", "shipped") + res := store.GetAllInState(cfgName, "shipped") Expect(len(res)).To(Equal(3)) for id := 3; id < 6; id++ { - Expect(res).To(ContainElement(fmt.Sprintf("fsm-%d", id))) + Expect(res).To(ContainElement(fmt.Sprintf(fsmIdFmt, id))) } - res = store.GetAllInState("orders", "in_transit") + res = store.GetAllInState(cfgName, "in_transit") Expect(len(res)).To(Equal(6)) }) It("will remove with an empty newState", func() { - Expect(store.UpdateState("orders", "fsm-1", "in_transit", "")).To(Succeed()) - res := store.GetAllInState("orders", "in_transit") + Expect(store.UpdateState(cfgName, "fsm-1", "in_transit", "")).To(Succeed()) + res := store.GetAllInState(cfgName, "in_transit") Ω(res).ToNot(ContainElement("fsm-1")) }) }) diff --git a/storage/storage_suite_test.go b/storage/storage_suite_test.go index 09a95dc..0735bc9 100644 --- a/storage/storage_suite_test.go +++ b/storage/storage_suite_test.go @@ -28,7 +28,8 @@ var container *internals.Container var _ = BeforeSuite(func() { var err error container, err = internals.NewRedisContainer(context.Background()) - Expect(err).ToNot(HaveOccurred()) + Ω(err).ToNot(HaveOccurred()) + Ω(container).ToNot(BeNil()) // Note the timeout here is in seconds (and it's not a time.Duration either) }, 5.0) From 04de5ca1e8cc3b1a92f3963fe3c2610c70acef14 Mon Sep 17 00:00:00 2001 From: Marco Massenzio <1153951+massenz@users.noreply.github.com> Date: Wed, 8 Mar 2023 22:39:43 -0800 Subject: [PATCH 4/4] [#73] Refactor Redis Store methods to return errors instead of bool (#89) --- Makefile | 6 +- grpc/grpc_server.go | 31 +- grpc/grpc_server_test.go | 26 +- clients/grpc_health.go => grpc_health.go | 0 pubsub/listener.go | 8 +- pubsub/listener_test.go | 37 ++- storage/redis_sets_store.go | 81 ----- storage/redis_store.go | 394 ++++++++++++++--------- storage/redis_store_test.go | 149 +++++---- storage/types.go | 38 ++- 10 files changed, 390 insertions(+), 380 deletions(-) rename clients/grpc_health.go => grpc_health.go (100%) delete mode 100644 storage/redis_sets_store.go diff --git a/Makefile b/Makefile index 6522237..d32e114 100644 --- a/Makefile +++ b/Makefile @@ -6,7 +6,7 @@ GOOS ?= $(shell uname -s | tr "[:upper:]" "[:lower:]") GOARCH ?= amd64 GOMOD := $(shell go list -m) -version := v0.10.1 +version := v0.11.0 release := $(version)-g$(shell git rev-parse --short HEAD) prog := sm-server bin := out/bin/$(prog)-$(version)_$(GOOS)-$(GOARCH) @@ -70,8 +70,8 @@ $(dockerbin): $(srcs) -ldflags "-X $(GOMOD)/api.Release=$(release)" \ -o $(dockerbin) cmd/main.go -$(healthcheck): clients/grpc_health.go - GOOS=linux GOARCH=amd64 go build -o $(healthcheck) clients/grpc_health.go +$(healthcheck): grpc_health.go + GOOS=linux GOARCH=amd64 go build -o $(healthcheck) grpc_health.go .PHONY: build build: $(bin) ## Builds the Statemachine server binary diff --git a/grpc/grpc_server.go b/grpc/grpc_server.go index 129fa28..810b909 100644 --- a/grpc/grpc_server.go +++ b/grpc/grpc_server.go @@ -157,8 +157,9 @@ func (s *grpcSubscriber) GetConfiguration(ctx context.Context, configId *wrapper *protos.Configuration, error) { cfgId := configId.Value s.Logger.Trace("retrieving Configuration %s", cfgId) - cfg, found := s.Store.GetConfig(cfgId) - if !found { + cfg, err := s.Store.GetConfig(cfgId) + if err != nil { + s.Logger.Error("could not get configuration: %v", err) return nil, status.Errorf(codes.NotFound, "configuration %s not found", cfgId) } return cfg, nil @@ -168,8 +169,8 @@ func (s *grpcSubscriber) PutFiniteStateMachine(ctx context.Context, request *protos.PutFsmRequest) (*protos.PutResponse, error) { fsm := request.Fsm // First check that the configuration for the FSM is valid - cfg, ok := s.Store.GetConfig(fsm.ConfigId) - if !ok { + cfg, err := s.Store.GetConfig(fsm.ConfigId) + if err != nil { return nil, status.Error(codes.FailedPrecondition, storage.NotFoundError( fsm.ConfigId).Error()) } @@ -212,8 +213,8 @@ func (s *grpcSubscriber) GetFiniteStateMachine(ctx context.Context, in *protos.G return nil, status.Error(codes.InvalidArgument, "ID must always be provided when looking up statemachine") } s.Logger.Debug("looking up FSM [%s] (Configuration: %s)", fsmId, cfg) - fsm, ok := s.Store.GetStateMachine(fsmId, cfg) - if !ok { + fsm, err := s.Store.GetStateMachine(fsmId, cfg) + if err != nil { return nil, status.Error(codes.NotFound, storage.NotFoundError(fsmId).Error()) } return fsm, nil @@ -239,9 +240,9 @@ func (s *grpcSubscriber) GetEventOutcome(ctx context.Context, in *protos.EventRe evtId := in.GetId() cfg := in.GetConfig() s.Logger.Debug("looking up EventOutcome %s (%s)", evtId, cfg) - outcome, ok := s.Store.GetOutcomeForEvent(evtId, cfg) - if !ok { - return nil, status.Error(codes.NotFound, fmt.Sprintf("outcome for event %s not found", evtId)) + outcome, err := s.Store.GetOutcomeForEvent(evtId, cfg) + if err != nil { + return nil, status.Errorf(codes.NotFound, "cannot get outcome for event %s: %v", evtId, err) } return &protos.EventResponse{ EventId: evtId, @@ -256,9 +257,9 @@ func (s *grpcSubscriber) StreamAllInstate(in *protos.GetFsmRequest, stream State } cfgName := in.GetConfig() for _, id := range response.GetIds() { - fsm, found := s.Store.GetStateMachine(id, cfgName) - if !found { - return storage.NotFoundError(id) + fsm, err := s.Store.GetStateMachine(id, cfgName) + if err != nil { + return err } if err = stream.SendMsg(&protos.PutResponse{ Id: id, @@ -280,9 +281,9 @@ func (s *grpcSubscriber) StreamAllConfigurations(in *wrapperspb.StringValue, str return nil } for _, cfgId := range response.GetIds() { - cfg, found := s.Store.GetConfig(cfgId) - if !found { - return storage.NotFoundError(cfgId) + cfg, err := s.Store.GetConfig(cfgId) + if err != nil { + return err } if err = stream.SendMsg(cfg); err != nil { return err diff --git a/grpc/grpc_server_test.go b/grpc/grpc_server_test.go index 3d8f8b7..f3a16a1 100644 --- a/grpc/grpc_server_test.go +++ b/grpc/grpc_server_test.go @@ -44,11 +44,11 @@ type Mockstore struct { func (m *Mockstore) SetLogLevel(level slf4go.LogLevel) { } -func (m *Mockstore) GetConfig(versionId string) (*protos.Configuration, bool) { - return nil, false +func (m *Mockstore) GetConfig(versionId string) (*protos.Configuration, storage.StoreErr) { + return nil, nil } -func (m *Mockstore) PutConfig(cfg *protos.Configuration) error { +func (m *Mockstore) PutConfig(cfg *protos.Configuration) storage.StoreErr { return NotImplemented } @@ -60,8 +60,8 @@ func (m *Mockstore) GetAllVersions(name string) []string { return nil } -func (m *Mockstore) GetStateMachine(id string, cfg string) (*protos.FiniteStateMachine, bool) { - return nil, false +func (m *Mockstore) GetStateMachine(id string, cfg string) (*protos.FiniteStateMachine, storage.StoreErr) { + return nil, NotImplemented } func (m *Mockstore) PutStateMachine(id string, fsm *protos.FiniteStateMachine) error { @@ -76,8 +76,8 @@ func (m *Mockstore) UpdateState(cfgName string, id string, oldState string, newS return NotImplemented } -func (m *Mockstore) GetEvent(id string, cfg string) (*protos.Event, bool) { - return nil, false +func (m *Mockstore) GetEvent(id string, cfg string) (*protos.Event, storage.StoreErr) { + return nil, NotImplemented } func (m *Mockstore) PutEvent(event *protos.Event, cfg string, ttl time.Duration) error { @@ -88,8 +88,8 @@ func (m *Mockstore) AddEventOutcome(eventId string, cfgName string, response *pr return NotImplemented } -func (m *Mockstore) GetOutcomeForEvent(eventId string, cfgName string) (*protos.EventOutcome, bool) { - return nil, false +func (m *Mockstore) GetOutcomeForEvent(eventId string, cfgName string) (*protos.EventOutcome, storage.StoreErr) { + return nil, NotImplemented } func (m *Mockstore) SetTimeout(duration time.Duration) { @@ -295,14 +295,14 @@ var _ = Describe("the gRPC Server", func() { } }) It("should store valid configurations", func() { - _, ok := store.GetConfig(GetVersionId(cfg)) - Ω(ok).To(BeFalse()) + _, err := store.GetConfig(GetVersionId(cfg)) + Ω(err).ToNot(BeNil()) response, err := client.PutConfiguration(bkgnd, cfg) Ω(err).ToNot(HaveOccurred()) Ω(response).ToNot(BeNil()) Ω(response.Id).To(Equal(GetVersionId(cfg))) - found, ok := store.GetConfig(response.Id) - Ω(ok).Should(BeTrue()) + found, err := store.GetConfig(response.Id) + Ω(err).Should(BeNil()) Ω(found).Should(Respect(cfg)) }) It("should fail for invalid configuration", func() { diff --git a/clients/grpc_health.go b/grpc_health.go similarity index 100% rename from clients/grpc_health.go rename to grpc_health.go diff --git a/pubsub/listener.go b/pubsub/listener.go index a233a28..d9d9125 100644 --- a/pubsub/listener.go +++ b/pubsub/listener.go @@ -69,16 +69,16 @@ func (listener *EventsListener) ListenForMessages() { fmt.Sprintf("could not store event: %v", err))) continue } - fsm, ok := listener.store.GetStateMachine(fsmId, config) - if !ok { + fsm, err := listener.store.GetStateMachine(fsmId, config) + if err != nil { listener.PostNotificationAndReportOutcome(makeResponse(&request, protos.EventOutcome_FsmNotFound, fmt.Sprintf("statemachine [%s] could not be found", fsmId))) continue } // TODO: cache the configuration locally: they are immutable anyway. - cfg, ok := listener.store.GetConfig(fsm.ConfigId) - if !ok { + cfg, err := listener.store.GetConfig(fsm.ConfigId) + if err != nil { listener.PostNotificationAndReportOutcome(makeResponse(&request, protos.EventOutcome_ConfigurationNotFound, fmt.Sprintf("configuration [%s] could not be found", fsm.ConfigId))) diff --git a/pubsub/listener_test.go b/pubsub/listener_test.go index 1f326ea..d547ccc 100644 --- a/pubsub/listener_test.go +++ b/pubsub/listener_test.go @@ -10,6 +10,7 @@ package pubsub_test import ( + . "github.com/JiaYongfei/respect/gomega" . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" @@ -111,17 +112,17 @@ var _ = Describe("A Listener", func() { Eventually(func(g Gomega) { // Now we want to test that the state machine was updated - fsm, ok := store.GetStateMachine(requestId, "test") - g.Ω(ok).ToNot(BeFalse()) + fsm, err := store.GetStateMachine(requestId, "test") + g.Ω(err).To(BeNil()) g.Ω(fsm.State).To(Equal("end")) g.Ω(len(fsm.History)).To(Equal(1)) g.Ω(fsm.History[0].Details).To(Equal("more details")) g.Ω(fsm.History[0].Transition.Event).To(Equal("move")) }).Should(Succeed()) - Eventually(func() bool { - _, found := store.GetEvent(event.EventId, "test") - return found - }).Should(BeTrue()) + Eventually(func() storage.StoreErr { + _, err := store.GetEvent(event.EventId, "test") + return err + }).Should(BeNil()) }) It("sends notifications for missing state-machine", func() { event := protos.Event{ @@ -215,18 +216,22 @@ var _ = Describe("A Listener", func() { return nil } }).Should(BeNil()) - Eventually(func() *protos.Event { - e, _ := store.GetEvent(event.EventId, request.Config) - return e - }, 100*time.Millisecond, 20*time.Millisecond).ShouldNot(BeNil()) - Eventually(func() protos.EventOutcome_StatusCode { - e, ok := store.GetOutcomeForEvent(event.EventId, request.Config) - if ok { - return e.Code + Eventually(func(g Gomega) { + evt, err := store.GetEvent(event.EventId, request.Config) + Ω(err).ToNot(HaveOccurred()) + if evt != nil { + Ω(evt).To(Respect(&event)) } else { - return protos.EventOutcome_GenericError + Fail("event is nil") + } + }, 100*time.Millisecond, 20*time.Millisecond).Should(Succeed()) + Eventually(func(g Gomega) { + outcome, err := store.GetOutcomeForEvent(event.EventId, request.Config) + Ω(err).ToNot(HaveOccurred()) + if outcome != nil { + Ω(outcome.Code).To(Equal(protos.EventOutcome_Ok)) } - }, 100*time.Millisecond, 20*time.Millisecond).Should(Equal(protos.EventOutcome_Ok)) + }, 100*time.Millisecond, 20*time.Millisecond).Should(Succeed()) }) }) }) diff --git a/storage/redis_sets_store.go b/storage/redis_sets_store.go deleted file mode 100644 index 4f0df67..0000000 --- a/storage/redis_sets_store.go +++ /dev/null @@ -1,81 +0,0 @@ -/* - * Copyright (c) 2022 AlertAvert.com. All rights reserved. - * - * Licensed under the Apache License, Version 2.0 - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Author: Marco Massenzio (marco@alertavert.com) - */ - -package storage - -import ( - "context" - "fmt" -) - -const ( - ReturningItemsFmt = "Returning %d items" - NoConfigurationsFmt = "Could not retrieve configurations: %s" -) - -func (csm *RedisStore) UpdateState(cfgName string, id string, oldState string, newState string) error { - var key string - var err error - if oldState != "" { - key = NewKeyForMachinesByState(cfgName, oldState) - err = csm.client.SRem(context.Background(), key, id).Err() - if err != nil { - return fmt.Errorf( - "cannot remove FSM [%s#%s] from state set `%s`: %s", - cfgName, id, oldState, err) - } - } - if newState != "" { - key = NewKeyForMachinesByState(cfgName, newState) - err = csm.client.SAdd(context.Background(), key, id).Err() - if err != nil { - return fmt.Errorf( - "cannot add FSM [%s#%s] to state set `%s`: %s", - cfgName, id, newState, err) - } - } - return nil -} - -func (csm *RedisStore) GetAllInState(cfg string, state string) []string { - // TODO: enable splitting results with a (cursor, count) - csm.logger.Debug("Looking up all FSMs [%s] in DB with state `%s`", cfg, state) - key := NewKeyForMachinesByState(cfg, state) - fsms, err := csm.client.SMembers(context.Background(), key).Result() - if err != nil { - const format = "Could not retrieve FSMs for state `%s`: %s" - csm.logger.Error(format, state, err) - return nil - } - csm.logger.Debug(ReturningItemsFmt, len(fsms)) - return fsms -} - -func (csm *RedisStore) GetAllConfigs() []string { - // TODO: enable splitting results with a (cursor, count) - csm.logger.Debug("Looking up all configs in DB") - configs, err := csm.client.SMembers(context.Background(), ConfigsPrefix).Result() - if err != nil { - csm.logger.Error(NoConfigurationsFmt, err) - return nil - } - csm.logger.Debug(ReturningItemsFmt, len(configs)) - return configs -} - -func (csm *RedisStore) GetAllVersions(name string) []string { - csm.logger.Debug("Looking up all versions for Configurations `%s` in DB", name) - configs, err := csm.client.SMembers(context.Background(), NewKeyForConfig(name)).Result() - if err != nil { - csm.logger.Error(NoConfigurationsFmt, err) - return nil - } - csm.logger.Debug(ReturningItemsFmt, len(configs)) - return configs -} diff --git a/storage/redis_store.go b/storage/redis_store.go index ccaa75b..0650398 100644 --- a/storage/redis_store.go +++ b/storage/redis_store.go @@ -26,11 +26,12 @@ import ( ) const ( - NeverExpire = 0 - DefaultRedisPort = "6379" - DefaultRedisDb = 0 - DefaultMaxRetries = 3 - DefaultTimeout = 200 * time.Millisecond + NeverExpire = 0 + DefaultRedisDb = 0 + DefaultMaxRetries = 3 + DefaultTimeout = 200 * time.Millisecond + ReturningItemsFmt = "Returning %d items" + NoConfigurationsFmt = "Could not retrieve configurations: %s" ) type RedisStore struct { @@ -40,141 +41,11 @@ type RedisStore struct { MaxRetries int } -func (csm *RedisStore) GetConfig(id string) (*protos.Configuration, bool) { - key := NewKeyForConfig(id) - var cfg protos.Configuration - err := csm.get(key, &cfg) - if err != nil { - csm.logger.Error("Error retrieving configuration `%s`: %s", id, err.Error()) - return nil, false - } - return &cfg, true -} - -func (csm *RedisStore) GetEvent(id string, cfg string) (*protos.Event, bool) { - key := NewKeyForEvent(id, cfg) - var event protos.Event - err := csm.get(key, &event) - if err != nil { - csm.logger.Error("Error retrieving event `%s`: %s", key, err.Error()) - return nil, false - } - return &event, true -} - -func (csm *RedisStore) GetStateMachine(id string, cfg string) (*protos.FiniteStateMachine, bool) { - key := NewKeyForMachine(id, cfg) - var stateMachine protos.FiniteStateMachine - err := csm.get(key, &stateMachine) - if err != nil { - csm.logger.Error("cannot access store for state machine `%s`: %s", key, err.Error()) - return nil, false - } - return &stateMachine, true -} - -func (csm *RedisStore) PutConfig(cfg *protos.Configuration) error { - if cfg == nil { - return IllegalStoreError("nil config") - } - key := NewKeyForConfig(api.GetVersionId(cfg)) - if csm.client.Exists(context.Background(), key).Val() == 1 { - return AlreadyExistsError(key) - } - // TODO: Find out whether the client allows to batch requests, instead of sending multiple server requests - csm.client.SAdd(context.Background(), ConfigsPrefix, cfg.Name) - csm.client.SAdd(context.Background(), NewKeyForConfig(cfg.Name), api.GetVersionId(cfg)) - return csm.put(key, cfg, NeverExpire) -} - -func (csm *RedisStore) PutEvent(event *protos.Event, cfg string, ttl time.Duration) error { - if event == nil { - return IllegalStoreError("nil event") - } - key := NewKeyForEvent(event.EventId, cfg) - return csm.put(key, event, ttl) -} - -func (csm *RedisStore) PutStateMachine(id string, stateMachine *protos.FiniteStateMachine) error { - if stateMachine == nil { - return IllegalStoreError("nil statemachine") - } - configName := strings.Split(stateMachine.ConfigId, api.ConfigurationVersionSeparator)[0] - key := NewKeyForMachine(id, configName) - return csm.put(key, stateMachine, NeverExpire) -} - -func (csm *RedisStore) AddEventOutcome(id string, cfg string, response *protos.EventOutcome, ttl time.Duration) error { - if response == nil { - return IllegalStoreError("nil response") - } - key := NewKeyForOutcome(id, cfg) - return csm.put(key, response, ttl) -} - -func (csm *RedisStore) GetOutcomeForEvent(id string, cfg string) (*protos.EventOutcome, bool) { - key := NewKeyForOutcome(id, cfg) - var outcome protos.EventOutcome - err := csm.get(key, &outcome) - if err != nil { - csm.logger.Error("Error retrieving outcome for event `%s`: %s", key, err.Error()) - return nil, false - } - return &outcome, true -} - -func (csm *RedisStore) SetTimeout(duration time.Duration) { - csm.Timeout = duration -} - -func (csm *RedisStore) GetTimeout() time.Duration { - return csm.Timeout -} - -func NewRedisStoreWithDefaults(address string) StoreManager { - return NewRedisStore(address, false, DefaultRedisDb, DefaultTimeout, DefaultMaxRetries) -} +/////// Internal methods -func NewRedisStore(address string, isCluster bool, db int, timeout time.Duration, maxRetries int) StoreManager { - logger := slf4go.NewLog(fmt.Sprintf("redis://%s/%d", address, db)) - - var tlsConfig *tls.Config - var client redis.Cmdable - - if os.Getenv("REDIS_TLS") != "" { - logger.Info("Using TLS for Redis connection") - tlsConfig = &tls.Config{MinVersion: tls.VersionTLS12} - } - - if isCluster { - client = redis.NewClusterClient(&redis.ClusterOptions{ - TLSConfig: tlsConfig, - Addrs: strings.Split(address, ","), - }) - } else { - client = redis.NewClient(&redis.Options{ - TLSConfig: tlsConfig, - Addr: address, - DB: db, // 0 means default DB - }) - } - - return &RedisStore{ - logger: slf4go.NewLog(fmt.Sprintf("redis://%s/%d", address, db)), - client: client, - Timeout: timeout, - MaxRetries: maxRetries, - } -} - -// SetLogLevel for RedisStore implements the Loggable interface -func (csm *RedisStore) SetLogLevel(level slf4go.LogLevel) { - csm.logger.Level = level -} - -// `get` abstracts away the common functionality of looking for a key in Redis, +// get abstracts away the common functionality of looking for a key in Redis, // with a given timeout and a number of retries. -func (csm *RedisStore) get(key string, value proto.Message) error { +func (csm *RedisStore) get(key string, value proto.Message) StoreErr { attemptsLeft := csm.MaxRetries csm.logger.Trace("Looking up key `%s` (Max retries: %d)", key, attemptsLeft) var cancel context.CancelFunc @@ -193,7 +64,7 @@ func (csm *RedisStore) get(key string, value proto.Message) error { // The key isn't there, no point in retrying csm.logger.Debug("Key `%s` not found", key) cancel() - return err + return NotFoundError(key) } else if err != nil { if ctx.Err() == context.DeadlineExceeded { // The error here may be recoverable, so we'll keep trying until we run out of attempts @@ -209,7 +80,7 @@ func (csm *RedisStore) get(key string, value proto.Message) error { // This is a different error, we'll just return it csm.logger.Error(err.Error()) cancel() - return err + return GenericStoreError(err.Error()) } } else { cancel() @@ -218,7 +89,7 @@ func (csm *RedisStore) get(key string, value proto.Message) error { } } -func (csm *RedisStore) put(key string, value proto.Message, ttl time.Duration) error { +func (csm *RedisStore) put(key string, value proto.Message, ttl time.Duration) StoreErr { attemptsLeft := csm.MaxRetries csm.logger.Trace("Storing key `%s` (Max retries: %d)", key, attemptsLeft) var cancel context.CancelFunc @@ -235,48 +106,259 @@ func (csm *RedisStore) put(key string, value proto.Message, ttl time.Duration) e data, err := proto.Marshal(value) if err != nil { csm.logger.Error("cannot convert proto to bytes: %q", err) - return err + return InvalidDataError(err.Error()) } cmd := csm.client.Set(ctx, key, data, ttl) _, err = cmd.Result() if err != nil { if ctx.Err() == context.DeadlineExceeded { // The error here may be recoverable, so we'll keep trying until we run out of attempts - csm.logger.Error(err.Error()) if attemptsLeft == 0 { - csm.logger.Error("max retries reached, giving up") - return err + return TooManyAttempts("") } csm.logger.Debug("retrying after timeout, attempts left: %d", attemptsLeft) csm.wait() } else { - return err + return GenericStoreError(err.Error()) } } else { - csm.logger.Debug("Stored key `%s`", key) + csm.logger.Debug("stored value for key `%s`", key) return nil } } } -func (csm *RedisStore) Health() error { +// wait is a helper function that sleeps for a random amount of time between 0 and half second. +// Poor man's backoff. +// +// TODO: should use some form of exponential backoff +// TODO: wait time should be configurable +func (csm *RedisStore) wait() { + waitForMsec := rand.Intn(500) + time.Sleep(time.Duration(waitForMsec) * time.Millisecond) +} + +/////// StoreManager implementation + +// Health checks that the server is ready to accept connections +func (csm *RedisStore) Health() StoreErr { ctx, cancel := context.WithTimeout(context.Background(), csm.Timeout) defer cancel() _, err := csm.client.Ping(ctx).Result() if err != nil { csm.logger.Error("Error pinging redis: %s", err.Error()) - return fmt.Errorf("redis health check failed: %w", err) + return GenericStoreError(err.Error()) } return nil } -// wait is a helper function that sleeps for a random amount of time between 0 and half second. -// Poor man's backoff. +func (csm *RedisStore) SetTimeout(duration time.Duration) { + csm.Timeout = duration +} + +func (csm *RedisStore) GetTimeout() time.Duration { + return csm.Timeout +} + +// SetLogLevel for RedisStore implements the Loggable interface +func (csm *RedisStore) SetLogLevel(level slf4go.LogLevel) { + csm.logger.Level = level +} + +/////// ConfigStore implementation + +func (csm *RedisStore) GetConfig(id string) (*protos.Configuration, StoreErr) { + key := NewKeyForConfig(id) + var cfg protos.Configuration + err := csm.get(key, &cfg) + if err != nil { + csm.logger.Error("cannot retrieve configuration: %v", err) + return nil, err + } + return &cfg, nil +} + +func (csm *RedisStore) PutConfig(cfg *protos.Configuration) StoreErr { + if cfg == nil { + return InvalidDataError("nil config") + } + key := NewKeyForConfig(api.GetVersionId(cfg)) + if csm.client.Exists(context.Background(), key).Val() == 1 { + return AlreadyExistsError(key) + } + // TODO: Find out whether the client allows to batch requests, instead of sending multiple server requests + csm.client.SAdd(context.Background(), ConfigsPrefix, cfg.Name) + csm.client.SAdd(context.Background(), NewKeyForConfig(cfg.Name), api.GetVersionId(cfg)) + return csm.put(key, cfg, NeverExpire) +} + +func (csm *RedisStore) GetAllConfigs() []string { + // TODO: enable splitting results with a (cursor, count) + csm.logger.Debug("Looking up all configs in DB") + configs, err := csm.client.SMembers(context.Background(), ConfigsPrefix).Result() + if err != nil { + csm.logger.Error(NoConfigurationsFmt, err) + return nil + } + csm.logger.Debug(ReturningItemsFmt, len(configs)) + return configs +} + +func (csm *RedisStore) GetAllVersions(name string) []string { + csm.logger.Debug("Looking up all versions for Configurations `%s` in DB", name) + configs, err := csm.client.SMembers(context.Background(), NewKeyForConfig(name)).Result() + if err != nil { + csm.logger.Error(NoConfigurationsFmt, err) + return nil + } + csm.logger.Debug(ReturningItemsFmt, len(configs)) + return configs +} + +/////// FSMStore implementation + +func (csm *RedisStore) GetStateMachine(id string, cfg string) (*protos.FiniteStateMachine, StoreErr) { + key := NewKeyForMachine(id, cfg) + var stateMachine protos.FiniteStateMachine + err := csm.get(key, &stateMachine) + if err != nil { + csm.logger.Error("error getting FSM `%s`: %v", key, err) + return nil, err + } + return &stateMachine, nil +} + +func (csm *RedisStore) PutStateMachine(id string, stateMachine *protos.FiniteStateMachine) StoreErr { + if stateMachine == nil { + return InvalidDataError("nil statemachine") + } + configName := strings.Split(stateMachine.ConfigId, api.ConfigurationVersionSeparator)[0] + key := NewKeyForMachine(id, configName) + return csm.put(key, stateMachine, NeverExpire) +} + +func (csm *RedisStore) GetAllInState(cfg string, state string) []string { + // TODO: enable splitting results with a (cursor, count) + csm.logger.Debug("Looking up all FSMs [%s] in DB with state `%s`", cfg, state) + key := NewKeyForMachinesByState(cfg, state) + fsms, err := csm.client.SMembers(context.Background(), key).Result() + if err != nil { + const format = "Could not retrieve FSMs for state `%s`: %s" + csm.logger.Error(format, state, err) + return nil + } + csm.logger.Debug(ReturningItemsFmt, len(fsms)) + return fsms +} + +func (csm *RedisStore) UpdateState(cfgName string, id string, oldState string, newState string) StoreErr { + var key string + var err error + if oldState != "" { + key = NewKeyForMachinesByState(cfgName, oldState) + err = csm.client.SRem(context.Background(), key, id).Err() + if err != nil { + return fmt.Errorf( + "cannot remove FSM [%s#%s] from state set `%s`: %s", + cfgName, id, oldState, err) + } + } + if newState != "" { + key = NewKeyForMachinesByState(cfgName, newState) + err = csm.client.SAdd(context.Background(), key, id).Err() + if err != nil { + return fmt.Errorf( + "cannot add FSM [%s#%s] to state set `%s`: %s", + cfgName, id, newState, err) + } + } + return nil +} + +/////// EventStore implementation + +func (csm *RedisStore) GetEvent(id string, cfg string) (*protos.Event, StoreErr) { + key := NewKeyForEvent(id, cfg) + var event protos.Event + err := csm.get(key, &event) + if err != nil { + csm.logger.Error("cannot retrieve event `%s`: %v", key, err) + return nil, err + } + return &event, nil +} + +func (csm *RedisStore) PutEvent(event *protos.Event, cfg string, ttl time.Duration) StoreErr { + if event == nil { + return InvalidDataError("nil event") + } + key := NewKeyForEvent(event.EventId, cfg) + return csm.put(key, event, ttl) +} + +func (csm *RedisStore) AddEventOutcome(id string, cfg string, response *protos.EventOutcome, ttl time.Duration) StoreErr { + if response == nil { + return InvalidDataError("nil response") + } + key := NewKeyForOutcome(id, cfg) + return csm.put(key, response, ttl) +} + +func (csm *RedisStore) GetOutcomeForEvent(id string, cfg string) (*protos.EventOutcome, StoreErr) { + key := NewKeyForOutcome(id, cfg) + var outcome protos.EventOutcome + err := csm.get(key, &outcome) + if err != nil { + csm.logger.Error("cannot retrieve outcome for event `%s`: %v", key, err) + return nil, err + } + return &outcome, nil +} + +/////// Constructor methods + +// NewRedisStoreWithDefaults creates a new StoreManager backed by a Redis server, with +// all default settings, in a single node configuration. +func NewRedisStoreWithDefaults(address string) StoreManager { + return NewRedisStore(address, false, DefaultRedisDb, DefaultTimeout, DefaultMaxRetries) +} + +// NewRedisStore creates a new StoreManager backed by a Redis server, reachable at address, in +// cluster configuration if isCluster is set to true. +// The db value indicates which database to use. // -// TODO: should use some form of exponential backoff -// TODO: wait time should be configurable -func (csm *RedisStore) wait() { - waitForMsec := rand.Intn(500) - time.Sleep(time.Duration(waitForMsec) * time.Millisecond) +// Some store queries (typically the get and put actions) will be retried up to maxRetries times, +// if they time out after timeout expires. +// Use the [Health] function to check whether the store is reachable. +func NewRedisStore(address string, isCluster bool, db int, timeout time.Duration, maxRetries int) StoreManager { + logger := slf4go.NewLog(fmt.Sprintf("redis://%s/%d", address, db)) + + var tlsConfig *tls.Config + var client redis.Cmdable + + if os.Getenv("REDIS_TLS") != "" { + logger.Info("Using TLS for Redis connection") + tlsConfig = &tls.Config{MinVersion: tls.VersionTLS12} + } + + if isCluster { + client = redis.NewClusterClient(&redis.ClusterOptions{ + TLSConfig: tlsConfig, + Addrs: strings.Split(address, ","), + }) + } else { + client = redis.NewClient(&redis.Options{ + TLSConfig: tlsConfig, + Addr: address, + DB: db, // 0 means default DB + }) + } + + return &RedisStore{ + logger: slf4go.NewLog(fmt.Sprintf("redis://%s/%d", address, db)), + client: client, + Timeout: timeout, + MaxRetries: maxRetries, + } } diff --git a/storage/redis_store_test.go b/storage/redis_store_test.go index 18714e5..dbffdaa 100644 --- a/storage/redis_store_test.go +++ b/storage/redis_store_test.go @@ -35,7 +35,7 @@ var configId = strings.Join([]string{cfgName, "v4"}, api.ConfigurationVersionSep func setupStoreRedis() (storage.StoreManager, *redis.Client) { store := storage.NewRedisStoreWithDefaults(container.Address) - Expect(store).ToNot(BeNil()) + Ω(store).ToNot(BeNil()) store.SetLogLevel(slf4go.NONE) // This is used to go "behind the back" of our StoreManager and mess with it for testing @@ -58,8 +58,8 @@ func storeSomeFSMs(store storage.StoreManager, count int) { }, } fsmId := fmt.Sprintf(fsmIdFmt, id) - Expect(store.PutStateMachine(fsmId, fsm)).ToNot(HaveOccurred()) - Expect(store.UpdateState("orders", fsmId, "", fsm.State)) + Ω(store.PutStateMachine(fsmId, fsm)).ToNot(HaveOccurred()) + Ω(store.UpdateState("orders", fsmId, "", fsm.State)) } } @@ -76,7 +76,7 @@ var _ = Describe("Redis Store", func() { Version: "v3", StartingState: "start", } - Expect(container).ToNot(BeNil()) + Ω(container).ToNot(BeNil()) store, rdb = setupStoreRedis() }, 0.5) AfterEach(func() { @@ -84,45 +84,44 @@ var _ = Describe("Redis Store", func() { rdb.FlushDB(context.Background()) }, 0.2) It("is healthy", func() { - Expect(store.Health()).To(Succeed()) + Ω(store.Health()).To(Succeed()) }) It("can get a configuration back", func() { id := api.GetVersionId(cfg) val, _ := proto.Marshal(cfg) res, err := rdb.Set(context.Background(), storage.NewKeyForConfig(id), val, storage.NeverExpire).Result() - Expect(err).ToNot(HaveOccurred()) - Expect(res).To(Equal("OK")) + Ω(err).ToNot(HaveOccurred()) + Ω(res).To(Equal("OK")) - data, ok := store.GetConfig(id) - Expect(ok).To(BeTrue()) - Expect(data).ToNot(BeNil()) - Expect(api.GetVersionId(data)).To(Equal(api.GetVersionId(cfg))) + data, err := store.GetConfig(id) + Ω(err).To(BeNil()) + Ω(data).ToNot(BeNil()) + Ω(api.GetVersionId(data)).To(Equal(api.GetVersionId(cfg))) }) It("will return orderly if the id does not exist", func() { id := "fake" - data, ok := store.GetConfig(id) - Expect(ok).To(BeFalse()) - Expect(data).To(BeNil()) + data, err := store.GetConfig(id) + Ω(err).ToNot(BeNil()) + Ω(data).To(BeNil()) }) It("can save configurations", func() { var found protos.Configuration - Expect(store.PutConfig(cfg)).ToNot(HaveOccurred()) + Ω(store.PutConfig(cfg)).ToNot(HaveOccurred()) val, err := rdb.Get(context.Background(), storage.NewKeyForConfig(api.GetVersionId(cfg))).Bytes() - Expect(err).ToNot(HaveOccurred()) + Ω(err).ToNot(HaveOccurred()) - Expect(proto.Unmarshal(val, &found)).ToNot(HaveOccurred()) - Expect(&found).To(Respect(cfg)) + Ω(proto.Unmarshal(val, &found)).ToNot(HaveOccurred()) + Ω(&found).To(Respect(cfg)) }) It("will not save a duplicate configurations", func() { - Expect(store.PutConfig(cfg)).ToNot(HaveOccurred()) - Expect(store.PutConfig(cfg)).To(HaveOccurred()) + Ω(store.PutConfig(cfg)).ToNot(HaveOccurred()) + Ω(store.PutConfig(cfg)).To(HaveOccurred()) }) It("should not fail for a non-existent FSM", func() { - data, ok := store.GetStateMachine("fake", "bad-config") - Expect(ok).To(BeFalse()) - Expect(data).To(BeNil()) + _, err := store.GetStateMachine("fake", "bad-config") + Ω(err).ToNot(BeNil()) }) It("can get an FSM back", func() { id := uuid.New().String() @@ -136,13 +135,13 @@ var _ = Describe("Redis Store", func() { key := storage.NewKeyForMachine(id, fsm.ConfigId) res, err := rdb.Set(context.Background(), key, val, storage.NeverExpire).Result() - Expect(err).ToNot(HaveOccurred()) - Expect(res).To(Equal("OK")) + Ω(err).ToNot(HaveOccurred()) + Ω(res).To(Equal("OK")) - data, ok := store.GetStateMachine(id, "cfg_id") - Expect(ok).To(BeTrue()) - Expect(data).ToNot(BeNil()) - Expect(data).To(Respect(fsm)) + data, err := store.GetStateMachine(id, "cfg_id") + Ω(err).To(BeNil()) + Ω(data).ToNot(BeNil()) + Ω(data).To(Respect(fsm)) }) It("can save an FSM", func() { id := "99" // uuid.New().String() @@ -155,20 +154,20 @@ var _ = Describe("Redis Store", func() { {Transition: &protos.Transition{Event: "shipped"}, Originator: "bot"}, }, } - Expect(store.PutStateMachine(id, fsm)).ToNot(HaveOccurred()) + Ω(store.PutStateMachine(id, fsm)).ToNot(HaveOccurred()) val, err := rdb.Get(context.Background(), storage.NewKeyForMachine(id, cfgName)).Bytes() - Expect(err).ToNot(HaveOccurred()) + Ω(err).ToNot(HaveOccurred()) - Expect(proto.Unmarshal(val, &found)).ToNot(HaveOccurred()) + Ω(proto.Unmarshal(val, &found)).ToNot(HaveOccurred()) // NOTE: this fails, even though the Protobufs are actually identical: - // Expect(found).To(Respect(*fsm)) + // Ω(found).To(Respect(*fsm)) // it strangely fails on the History field, which is a slice, and actually matches. - Expect(found.ConfigId).To(Equal(fsm.ConfigId)) - Expect(found.State).To(Equal(fsm.State)) - Expect(found.ConfigId).To(Equal(fsm.ConfigId)) - Expect(found.History).To(HaveLen(len(fsm.History))) - Expect(found.History[0]).To(Respect(fsm.History[0])) - Expect(found.History[1]).To(Respect(fsm.History[1])) + Ω(found.ConfigId).To(Equal(fsm.ConfigId)) + Ω(found.State).To(Equal(fsm.State)) + Ω(found.ConfigId).To(Equal(fsm.ConfigId)) + Ω(found.History).To(HaveLen(len(fsm.History))) + Ω(found.History[0]).To(Respect(fsm.History[0])) + Ω(found.History[1]).To(Respect(fsm.History[1])) }) It("can get events back", func() { id := uuid.New().String() @@ -176,26 +175,26 @@ var _ = Describe("Redis Store", func() { key := storage.NewKeyForEvent(id, cfgName) val, _ := proto.Marshal(ev) _, err := rdb.Set(context.Background(), key, val, storage.NeverExpire).Result() - Expect(err).ToNot(HaveOccurred()) + Ω(err).ToNot(HaveOccurred()) - found, ok := store.GetEvent(id, cfgName) - Expect(ok).To(BeTrue()) - Expect(found).To(Respect(ev)) + found, err := store.GetEvent(id, cfgName) + Ω(err).To(BeNil()) + Ω(found).To(Respect(ev)) }) It("can save events", func() { ev := api.NewEvent("confirmed") id := ev.EventId - Expect(store.PutEvent(ev, cfgName, storage.NeverExpire)).ToNot(HaveOccurred()) + Ω(store.PutEvent(ev, cfgName, storage.NeverExpire)).ToNot(HaveOccurred()) val, err := rdb.Get(context.Background(), storage.NewKeyForEvent(id, cfgName)).Bytes() - Expect(err).ToNot(HaveOccurred()) + Ω(err).ToNot(HaveOccurred()) var found protos.Event - Expect(proto.Unmarshal(val, &found)).ToNot(HaveOccurred()) - Expect(&found).To(Respect(ev)) + Ω(proto.Unmarshal(val, &found)).ToNot(HaveOccurred()) + Ω(&found).To(Respect(ev)) }) It("will return an error for a non-existent event", func() { - _, ok := store.GetEvent("fake", cfgName) - Expect(ok).To(BeFalse()) + _, err := store.GetEvent("fake", cfgName) + Ω(err).To(HaveOccurred()) }) It("can save an event Outcome", func() { id := uuid.New().String() @@ -206,14 +205,14 @@ var _ = Describe("Redis Store", func() { Id: "1234-feed-beef", Details: "this was just a test", } - Expect(store.AddEventOutcome(id, cfg, response, storage.NeverExpire)).ToNot(HaveOccurred()) + Ω(store.AddEventOutcome(id, cfg, response, storage.NeverExpire)).ToNot(HaveOccurred()) key := storage.NewKeyForOutcome(id, cfg) val, err := rdb.Get(context.Background(), key).Bytes() - Expect(err).ToNot(HaveOccurred()) + Ω(err).ToNot(HaveOccurred()) var found protos.EventOutcome - Expect(proto.Unmarshal(val, &found)).ToNot(HaveOccurred()) - Expect(&found).To(Respect(response)) + Ω(proto.Unmarshal(val, &found)).ToNot(HaveOccurred()) + Ω(&found).To(Respect(response)) }) It("can get an event Outcome", func() { id := uuid.New().String() @@ -226,22 +225,22 @@ var _ = Describe("Redis Store", func() { key := storage.NewKeyForOutcome(id, cfg) val, _ := proto.Marshal(response) _, err := rdb.Set(context.Background(), key, val, storage.NeverExpire).Result() - Expect(err).ToNot(HaveOccurred()) - found, ok := store.GetOutcomeForEvent(id, cfg) - Expect(ok).To(BeTrue()) - Expect(found).To(Respect(response)) + Ω(err).ToNot(HaveOccurred()) + found, err := store.GetOutcomeForEvent(id, cfg) + Ω(err).ToNot(HaveOccurred()) + Ω(found).To(Respect(response)) }) It("should gracefully handle a nil Configuration", func() { - Expect(store.PutConfig(nil)).To(HaveOccurred()) + Ω(store.PutConfig(nil)).To(HaveOccurred()) }) It("should gracefully handle a nil Statemachine", func() { - Expect(store.PutStateMachine("fake", nil)).To(HaveOccurred()) + Ω(store.PutStateMachine("fake", nil)).To(HaveOccurred()) }) It("should gracefully handle a nil Event", func() { - Expect(store.PutEvent(nil, cfgName, storage.NeverExpire)).To(HaveOccurred()) + Ω(store.PutEvent(nil, cfgName, storage.NeverExpire)).To(HaveOccurred()) }) It("should gracefully handle a nil Outcome", func() { - Expect(store.AddEventOutcome("fake", "test", nil, + Ω(store.AddEventOutcome("fake", "test", nil, storage.NeverExpire)).To(HaveOccurred()) }) }) @@ -260,25 +259,25 @@ var _ = Describe("Redis Store", func() { It("can get all configuration names", func() { for _, name := range []string{cfgName, "devices", "users"} { - Expect(store.PutConfig(&protos.Configuration{Name: name, Version: "v3", StartingState: "start"})). + Ω(store.PutConfig(&protos.Configuration{Name: name, Version: "v3", StartingState: "start"})). ToNot(HaveOccurred()) } configs := store.GetAllConfigs() - Expect(len(configs)).To(Equal(3)) - Expect(configs).To(ContainElements(cfgName, "devices", "users")) + Ω(len(configs)).To(Equal(3)) + Ω(configs).To(ContainElements(cfgName, "devices", "users")) }) It("can get all versions of a configuration", func() { for _, version := range []string{"v1alpha1", "v1beta", "v1"} { - Expect(store.PutConfig(&protos.Configuration{Name: cfgName, Version: version, StartingState: "start"})). + Ω(store.PutConfig(&protos.Configuration{Name: cfgName, Version: version, StartingState: "start"})). ToNot(HaveOccurred()) } configs := store.GetAllVersions(cfgName) - Expect(len(configs)).To(Equal(3)) - Expect(configs).To(ContainElements("orders:v1alpha1", "orders:v1beta", "orders:v1")) + Ω(len(configs)).To(Equal(3)) + Ω(configs).To(ContainElements("orders:v1alpha1", "orders:v1beta", "orders:v1")) }) It("returns an empty slice for a non-existent config", func() { configs := store.GetAllVersions("fake") - Expect(len(configs)).To(Equal(0)) + Ω(len(configs)).To(Equal(0)) }) }) When("querying for FSMs", func() { @@ -295,9 +294,9 @@ var _ = Describe("Redis Store", func() { It("finds them by state", func() { storeSomeFSMs(store, 5) res := store.GetAllInState(cfgName, "in_transit") - Expect(len(res)).To(Equal(4)) + Ω(len(res)).To(Equal(4)) for id := 1; id < 5; id++ { - Expect(res).To(ContainElement(fmt.Sprintf(fsmIdFmt, id))) + Ω(res).To(ContainElement(fmt.Sprintf(fsmIdFmt, id))) } }) When("transitioning state", func() { @@ -307,18 +306,18 @@ var _ = Describe("Redis Store", func() { It("finds them", func() { for id := 3; id < 6; id++ { fsmId := fmt.Sprintf(fsmIdFmt, id) - Expect(store.UpdateState(cfgName, fsmId, "in_transit", "shipped")) + Ω(store.UpdateState(cfgName, fsmId, "in_transit", "shipped")) } res := store.GetAllInState(cfgName, "shipped") - Expect(len(res)).To(Equal(3)) + Ω(len(res)).To(Equal(3)) for id := 3; id < 6; id++ { - Expect(res).To(ContainElement(fmt.Sprintf(fsmIdFmt, id))) + Ω(res).To(ContainElement(fmt.Sprintf(fsmIdFmt, id))) } res = store.GetAllInState(cfgName, "in_transit") - Expect(len(res)).To(Equal(6)) + Ω(len(res)).To(Equal(6)) }) It("will remove with an empty newState", func() { - Expect(store.UpdateState(cfgName, "fsm-1", "in_transit", "")).To(Succeed()) + Ω(store.UpdateState(cfgName, "fsm-1", "in_transit", "")).To(Succeed()) res := store.GetAllInState(cfgName, "in_transit") Ω(res).ToNot(ContainElement("fsm-1")) }) diff --git a/storage/types.go b/storage/types.go index 258fa8a..afd1124 100644 --- a/storage/types.go +++ b/storage/types.go @@ -16,22 +16,26 @@ import ( "time" ) -func Error(msg string) func(string) error { +type StoreErr = error + +func Error(msg string) func(string) StoreErr { return func(key string) error { return fmt.Errorf(msg, key) } } var ( - IllegalStoreError = Error("error storing invalid data: %v") AlreadyExistsError = Error("key %s already exists") + GenericStoreError = Error("store error: %v") + InvalidDataError = Error("error storing invalid data: %v") NotFoundError = Error("key %s not found") NotImplementedError = Error("functionality %s has not been implemented yet") + TooManyAttempts = Error("retries exceeded") ) -type ConfigurationStorageManager interface { - GetConfig(versionId string) (*protos.Configuration, bool) - PutConfig(cfg *protos.Configuration) error +type ConfigStore interface { + GetConfig(versionId string) (*protos.Configuration, StoreErr) + PutConfig(cfg *protos.Configuration) StoreErr // GetAllConfigs returns all the `Configurations` that exist in the server, regardless of // the version, and whether are used or not by an FSM. @@ -42,16 +46,16 @@ type ConfigurationStorageManager interface { GetAllVersions(name string) []string } -type FiniteStateMachineStorageManager interface { +type FSMStore interface { // GetStateMachine will find the FSM with `id and that is configured via a `Configuration` whose // `name` matches `cfg` (without the `version`). - GetStateMachine(id string, cfg string) (*protos.FiniteStateMachine, bool) + GetStateMachine(id string, cfg string) (*protos.FiniteStateMachine, StoreErr) // PutStateMachine creates or updates the FSM whose `id` is given. // No further action is taken: no check that the referenced `Configuration` exists, and the // `state` SETs are not updated either: it is the caller's responsibility to call the // `UpdateState` method (possibly with an empty `oldState`, in the case of creation). - PutStateMachine(id string, fsm *protos.FiniteStateMachine) error + PutStateMachine(id string, fsm *protos.FiniteStateMachine) StoreErr // GetAllInState looks up all the FSMs that are currently in the given `state` and // are configured with a `Configuration` whose name matches `cfg` (regardless of the @@ -67,12 +71,12 @@ type FiniteStateMachineStorageManager interface { // (or not, as the case may be). // // `oldState` may be empty in the case of a new FSM being created. - UpdateState(cfgName string, id string, oldState string, newState string) error + UpdateState(cfgName string, id string, oldState string, newState string) StoreErr } -type EventStorageManager interface { - GetEvent(id string, cfg string) (*protos.Event, bool) - PutEvent(event *protos.Event, cfg string, ttl time.Duration) error +type EventStore interface { + GetEvent(id string, cfg string) (*protos.Event, StoreErr) + PutEvent(event *protos.Event, cfg string, ttl time.Duration) StoreErr // AddEventOutcome adds the outcome of an event to the storage, given the `eventId` and the // "type" (`Configuration.Name`) of the FSM that received the event. @@ -80,18 +84,18 @@ type EventStorageManager interface { // Optionally, it will remove the outcome after a given `ttl` (time-to-live); use // `NeverExpire` to keep the outcome forever. AddEventOutcome(eventId string, cfgName string, response *protos.EventOutcome, - ttl time.Duration) error + ttl time.Duration) StoreErr // GetOutcomeForEvent returns the outcome of an event, given the `eventId` and the "type" of the // FSM that received the event. - GetOutcomeForEvent(eventId string, cfgName string) (*protos.EventOutcome, bool) + GetOutcomeForEvent(eventId string, cfgName string) (*protos.EventOutcome, StoreErr) } type StoreManager interface { log.Loggable - ConfigurationStorageManager - FiniteStateMachineStorageManager - EventStorageManager + ConfigStore + FSMStore + EventStore SetTimeout(duration time.Duration) GetTimeout() time.Duration Health() error