diff --git a/.run/Run all tests.run.xml b/.run/Run all tests.run.xml
index 69bd431..1e9c0dc 100644
--- a/.run/Run all tests.run.xml
+++ b/.run/Run all tests.run.xml
@@ -4,7 +4,7 @@
-
+
diff --git a/Makefile b/Makefile
index 90dbe38..d32e114 100644
--- a/Makefile
+++ b/Makefile
@@ -6,11 +6,12 @@ GOOS ?= $(shell uname -s | tr "[:upper:]" "[:lower:]")
GOARCH ?= amd64
GOMOD := $(shell go list -m)
-version := v0.10.0
+version := v0.11.0
release := $(version)-g$(shell git rev-parse --short HEAD)
prog := sm-server
bin := out/bin/$(prog)-$(version)_$(GOOS)-$(GOARCH)
dockerbin := out/bin/$(prog)-$(version)_linux-amd64
+healthcheck := out/bin/grpc-health_linux-amd64
image := massenz/statemachine
compose := docker/compose.yaml
@@ -60,15 +61,18 @@ fmt: ## Formats the Go source code using 'go fmt'
.PHONY: build test container cov clean fmt
$(bin): cmd/main.go $(srcs)
@mkdir -p $(shell dirname $(bin))
- GOOS=$(GOOS); GOARCH=$(GOARCH); go build \
+ GOOS=$(GOOS) GOARCH=$(GOARCH) go build \
-ldflags "-X $(GOMOD)/api.Release=$(release)" \
-o $(bin) cmd/main.go
-$(dockerbin):
- GOOS=linux; GOARCH=amd64; go build \
+$(dockerbin): $(srcs)
+ GOOS=linux GOARCH=amd64 go build \
-ldflags "-X $(GOMOD)/api.Release=$(release)" \
-o $(dockerbin) cmd/main.go
+$(healthcheck): grpc_health.go
+ GOOS=linux GOARCH=amd64 go build -o $(healthcheck) grpc_health.go
+
.PHONY: build
build: $(bin) ## Builds the Statemachine server binary
@@ -83,8 +87,10 @@ cov: $(srcs) $(test_srcs) ## Runs the Test Coverage target and opens a browser
# Convenience targets to run locally containers and
# setup the test environments.
-container: $(dockerbin) ## Builds the container image
- docker build --build-arg appname=$(dockerbin) -f $(dockerfile) -t $(image):$(release) .
+container: $(dockerbin) $(healthcheck) ## Builds the container image
+ docker build --build-arg appname=$(dockerbin) \
+ --build-arg hc=$(healthcheck) \
+ -f $(dockerfile) -t $(image):$(release) .
.PHONY: start
start: ## Starts the Redis and LocalStack containers, and Creates the SQS Queues in LocalStack
diff --git a/docker/Dockerfile b/docker/Dockerfile
index a9efaa0..ebbf7a1 100644
--- a/docker/Dockerfile
+++ b/docker/Dockerfile
@@ -12,24 +12,28 @@ LABEL org.opencontainers.image.description="Statemachine gRPC Server"
LABEL org.opencontainers.image.licenses=Apache-2.0
ARG appname
+ARG hc
RUN apt-get update && apt-get install ca-certificates -y
RUN groupadd -r sm-bot && useradd -r -g sm-bot sm-bot
# Sensible defaults for the server, for reference
# we list all the environment variables used by the
# entrypoint script.
-#
ENV GRPC_PORT=7398 SERVER_PORT=7399 DEBUG="" \
EVENTS_Q="events" NOTIFICATIONS_Q="notifications" \
ACKS_Q="" ERRORS_ONLY=t \
- CLUSTER="" REDIS="redis:6379"
+ CLUSTER="" REDIS="redis:6379" \
+ TIMEOUT=500ms INSECURE=""
WORKDIR /app
RUN chown sm-bot:sm-bot /app
USER sm-bot
ADD $appname ./sm-server
+ADD $hc ./ping
ADD docker/entrypoint.sh ./
EXPOSE ${SERVER_PORT}
ENTRYPOINT ["./entrypoint.sh"]
+HEALTHCHECK --start-period=5s --interval=30s --timeout=200ms --retries=5 \
+ CMD ./ping -host localhost:$GRPC_PORT -timeout $TIMEOUT $INSECURE
diff --git a/docker/entrypoint.sh b/docker/entrypoint.sh
index 3419f8c..1280a55 100755
--- a/docker/entrypoint.sh
+++ b/docker/entrypoint.sh
@@ -40,7 +40,7 @@ then
retries="-max-retries ${RETRIES}"
fi
-cmd="./sm-server -grpc-port ${GRPC_PORT} -redis ${REDIS:-} ${DEBUG:-}
+cmd="./sm-server -grpc-port ${GRPC_PORT} -redis ${REDIS:-} ${DEBUG:-} ${INSECURE:-}
${endpoint} ${timeout} ${retries} ${events} ${notifications} $@"
echo $cmd
diff --git a/go.mod b/go.mod
index fa4f70f..1d1b1f3 100644
--- a/go.mod
+++ b/go.mod
@@ -9,11 +9,12 @@ require (
github.com/golang/protobuf v1.5.2
github.com/google/uuid v1.3.0
github.com/massenz/slf4go v0.3.2-g4eb5504
- github.com/massenz/statemachine-proto/golang v1.1.0-beta-g1fc5dd8
+ github.com/massenz/statemachine-proto/golang v1.2.0-g8dbe9c5
github.com/onsi/ginkgo v1.16.5
github.com/onsi/gomega v1.18.1
+ github.com/stretchr/testify v1.8.2
github.com/testcontainers/testcontainers-go v0.18.0
- google.golang.org/grpc v1.51.0
+ google.golang.org/grpc v1.53.0
google.golang.org/protobuf v1.28.1
)
@@ -21,8 +22,9 @@ require (
github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1 // indirect
github.com/Microsoft/go-winio v0.5.2 // indirect
github.com/cenkalti/backoff/v4 v4.2.0 // indirect
- github.com/cespare/xxhash/v2 v2.1.2 // indirect
+ github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/containerd/containerd v1.6.18 // indirect
+ github.com/davecgh/go-spew v1.1.1 // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
github.com/docker/distribution v2.8.1+incompatible // indirect
github.com/docker/docker v23.0.0+incompatible // indirect
@@ -43,11 +45,13 @@ require (
github.com/opencontainers/image-spec v1.1.0-rc2 // indirect
github.com/opencontainers/runc v1.1.3 // indirect
github.com/pkg/errors v0.9.1 // indirect
+ github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/sirupsen/logrus v1.9.0 // indirect
+ github.com/stretchr/objx v0.5.0 // indirect
golang.org/x/net v0.7.0 // indirect
golang.org/x/sys v0.5.0 // indirect
golang.org/x/text v0.7.0 // indirect
- google.golang.org/genproto v0.0.0-20220617124728-180714bec0ad // indirect
+ google.golang.org/genproto v0.0.0-20230110181048-76db0878b65f // indirect
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
diff --git a/go.sum b/go.sum
index 8474dc4..bad3954 100644
--- a/go.sum
+++ b/go.sum
@@ -1,5 +1,3 @@
-cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
-cloud.google.com/go v0.34.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1 h1:UQHMgLO+TxOElx5B5HZ4hJQsoJ/PvUvKRhJHDQXO8P8=
github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1/go.mod h1:xomTg63KZ2rFqZQzSB4Vz2SUXa1BpHTVz9L5PTmPC4E=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
@@ -8,27 +6,17 @@ github.com/JiaYongfei/respect v0.0.0-20211019032000-61a979c8e39a/go.mod h1:3hhXC
github.com/Microsoft/go-winio v0.5.2 h1:a9IhgEQBCUEk6QCdml9CiJGhAws+YwffDHEMp1VMrpA=
github.com/Microsoft/go-winio v0.5.2/go.mod h1:WpS1mjBmmwHBEWmogvA2mj8546UReBk4v8QkMxJ6pZY=
github.com/Microsoft/hcsshim v0.9.6 h1:VwnDOgLeoi2du6dAznfmspNqTiwczvjv4K7NxuY9jsY=
-github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY=
github.com/aws/aws-sdk-go v1.44.4 h1:ePN0CVJMdiz2vYUcJH96eyxRrtKGSDMgyhP6rah2OgE=
github.com/aws/aws-sdk-go v1.44.4/go.mod h1:y4AeaBuwd2Lk+GepC1E9v0qOiTws0MIWAX4oIKwKHZo=
github.com/cenkalti/backoff/v4 v4.2.0 h1:HN5dHm3WBOgndBH6E8V0q2jIYIR3s9yglV8k/+MN3u4=
github.com/cenkalti/backoff/v4 v4.2.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE=
-github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
-github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
-github.com/cespare/xxhash/v2 v2.1.2 h1:YRXhKfTDauu4ajMg1TPgFO5jnlC2HCbmLXMcTG5cbYE=
-github.com/cespare/xxhash/v2 v2.1.2/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
+github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44=
+github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/checkpoint-restore/go-criu/v5 v5.3.0/go.mod h1:E/eQpaFtUKGOOSEBZgmKAcn+zUUwWxqcaKZlF54wK8E=
github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI=
github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5PlCu98SY8svDHJxuZscDgtXS6KTTbou5AhLI=
github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU=
github.com/cilium/ebpf v0.7.0/go.mod h1:/oI2+1shJiTGAMgl6/RgJr36Eo1jzrRcAWbcXO2usCA=
-github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
-github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc=
-github.com/cncf/udpa/go v0.0.0-20201120205902-5459f2c99403/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk=
-github.com/cncf/udpa/go v0.0.0-20210930031921-04548b0d99d4/go.mod h1:6pvJx4me5XPnfI9Z40ddWsdw2W/uZgQLFXToKeRcDiI=
-github.com/cncf/xds/go v0.0.0-20210922020428-25de7278fc84/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs=
-github.com/cncf/xds/go v0.0.0-20211001041855-01bcc9b48dfe/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs=
-github.com/cncf/xds/go v0.0.0-20211011173535-cb28da3451f1/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs=
github.com/containerd/console v1.0.3/go.mod h1:7LqA/THxQ86k76b8c/EMSiaJ3h1eZkMkXar0TQ1gf3U=
github.com/containerd/containerd v1.6.18 h1:qZbsLvmyu+Vlty0/Ex5xc0z2YtKpIsb5n45mAMI+2Ns=
github.com/containerd/containerd v1.6.18/go.mod h1:1RdCUu95+gc2v9t3IL+zIlpClSmew7/0YS8O5eQZrOw=
@@ -52,18 +40,11 @@ github.com/docker/go-connections v0.4.0/go.mod h1:Gbd7IOopHjR8Iph03tsViu4nIes5Xh
github.com/docker/go-units v0.4.0/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk=
github.com/docker/go-units v0.5.0 h1:69rxXcBk27SvSaaxTtLh/8llcHD8vYHT7WSdRZ/jvr4=
github.com/docker/go-units v0.5.0/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk=
-github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
-github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
-github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98=
-github.com/envoyproxy/go-control-plane v0.9.9-0.20201210154907-fd9021fe5dad/go.mod h1:cXg6YxExXjJnVBQHBLXeUAgxn2UodCpnH306RInaBQk=
-github.com/envoyproxy/go-control-plane v0.10.2-0.20220325020618-49ff273808a1/go.mod h1:KJwIaB5Mv44NWtYuAOFCVOjcI94vtpEz2JU/D2v6IjE=
-github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c=
github.com/frankban/quicktest v1.11.3/go.mod h1:wRf/ReqHper53s+kmmSZizM8NamnL3IM0I9ntUbOk+k=
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ=
github.com/fsnotify/fsnotify v1.5.4 h1:jRbGcIw6P2Meqdwuo0H1p6JVLbL5DHKAKlYndzMwVZI=
github.com/fsnotify/fsnotify v1.5.4/go.mod h1:OVB6XrOHzAwXMpEM7uPOzcehqUV2UqJxmVXmkdnm1bU=
-github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04=
github.com/go-redis/redis/v8 v8.11.5 h1:AcZZR7igkdvfVmQTPnu9WE37LRrO/YrBH5zWyjDC0oI=
github.com/go-redis/redis/v8 v8.11.5/go.mod h1:gREzHqY1hg6oD9ngVRbLStwAWKhA0FEgq8Jd4h5lpwo=
github.com/go-task/slim-sprig v0.0.0-20210107165309-348f09dbbbc0/go.mod h1:fyg7847qk6SyHyPtNmDHnmrv/HOrqktSC+C9fM+CJOE=
@@ -71,36 +52,25 @@ github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5x
github.com/godbus/dbus/v5 v5.0.6/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA=
github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q=
github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q=
-github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q=
-github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A=
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
-github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
-github.com/golang/protobuf v1.3.3/go.mod h1:vzj43D7+SQXF/4pzW/hwtAqwc6iTitCiVSaWz5lYuqw=
github.com/golang/protobuf v1.4.0-rc.1/go.mod h1:ceaxUfeHdC40wWswd/P6IGgMaK3YpKi5j83Wpe3EHw8=
github.com/golang/protobuf v1.4.0-rc.1.0.20200221234624-67d41d38c208/go.mod h1:xKAWHe0F5eneWXFV3EuXVDTCmh+JuBKY0li0aMyXATA=
github.com/golang/protobuf v1.4.0-rc.2/go.mod h1:LlEzMj4AhA7rCAGe4KMBDvJI+AwstrUpVNzEA03Pprs=
github.com/golang/protobuf v1.4.0-rc.4.0.20200313231945-b860323f09d0/go.mod h1:WU3c8KckQ9AFe+yFwt9sWVRKCVIyN9cPHBJSNnbL67w=
github.com/golang/protobuf v1.4.0/go.mod h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvqG2KuDX0=
-github.com/golang/protobuf v1.4.1/go.mod h1:U8fpvMrcmy5pZrNK1lt4xCsGvpyWQ/VVv6QDs8UjoX8=
github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI=
-github.com/golang/protobuf v1.4.3/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI=
github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk=
github.com/golang/protobuf v1.5.2 h1:ROPKBNFfQgOUMifHyP+KYbvpjbdoFNs+aK7DXlji0Tw=
github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
-github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M=
github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
-github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
-github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38=
github.com/google/pprof v0.0.0-20210407192527-94a9f03dee38/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE=
-github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I=
github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
-github.com/grpc-ecosystem/grpc-gateway v1.16.0/go.mod h1:BDjrQk3hbvj6Nolgz8mAMFbcEtjT1g+wF4CSlocrBnw=
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
github.com/ianlancetaylor/demangle v0.0.0-20200824232613-28f6c0f3b639/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc=
github.com/jmespath/go-jmespath v0.4.0 h1:BEgLn5cpjn8UN1mAw4NjwDrS35OdebyEtFe+9YPoQUg=
@@ -121,8 +91,8 @@ github.com/magiconair/properties v1.8.7 h1:IeQXZAiQcpL9mgcAe1Nu6cX9LLw6ExEHKjN0V
github.com/magiconair/properties v1.8.7/go.mod h1:Dhd985XPs7jluiymwWYZ0G4Z61jb3vdS329zhj2hYo0=
github.com/massenz/slf4go v0.3.2-g4eb5504 h1:tRrxPOKcqNKQn25eS8Dy9bW3NMNPpuK4Sla9jKfWmSs=
github.com/massenz/slf4go v0.3.2-g4eb5504/go.mod h1:ZJjthXAnZMJGwXUz3Z3v5uyban00uAFFoDYODOoLFpw=
-github.com/massenz/statemachine-proto/golang v1.1.0-beta-g1fc5dd8 h1:Dp2yv070ogiHLwQU5LppXskUDnCoO8tDkqgszyZMNmk=
-github.com/massenz/statemachine-proto/golang v1.1.0-beta-g1fc5dd8/go.mod h1:g6CkyXxfs7XF8wv6OLdMZZDUu0fn4PY6HQQ2WDbW3GU=
+github.com/massenz/statemachine-proto/golang v1.2.0-g8dbe9c5 h1:0QLU3fwkZg2s17QsjrJ4RKdxmWbsvF7WPzBeEO1m32Y=
+github.com/massenz/statemachine-proto/golang v1.2.0-g8dbe9c5/go.mod h1:AYRhBXOvJkJDA0j6wce63gr0mQwX8Wfp3Qn9L/3cz28=
github.com/moby/patternmatcher v0.5.0 h1:YCZgJOeULcxLw1Q+sVR636pmS7sPEn1Qo2iAN6M7DBo=
github.com/moby/patternmatcher v0.5.0/go.mod h1:hDPoyOpDY7OrrMDLaYoY3hf52gNCR/YOUYxkhApJIxc=
github.com/moby/sys/mountinfo v0.5.0/go.mod h1:3bMD3Rg+zkqx8MRYPi7Pyb0Ie97QEBmdxbhnCLlSvSU=
@@ -162,8 +132,6 @@ github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
-github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
-github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ=
github.com/rogpeppe/go-internal v1.8.1 h1:geMPLpDpQOgVyCg5z5GoRwLHepNdb71NXb67XFkP+Eg=
github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
github.com/seccomp/libseccomp-golang v0.9.2-0.20220502022130-f33da4d89646/go.mod h1:JA8cRccbGaA1s33RQf7Y1+q9gHmZX1yB/z9WDN1C6fg=
@@ -173,10 +141,16 @@ github.com/sirupsen/logrus v1.8.1/go.mod h1:yWOB1SBYBC5VeMP7gHvWumXLIWorT60ONWic
github.com/sirupsen/logrus v1.9.0 h1:trlNQbNUG3OdDrDil03MCb1H2o9nJ1x4/5LYw7byDE0=
github.com/sirupsen/logrus v1.9.0/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
+github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
+github.com/stretchr/objx v0.5.0 h1:1zr/of2m5FGMsad5YfcqgdqdWrIhu+EBEJRhR1U7z/c=
+github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
-github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk=
+github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
+github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
+github.com/stretchr/testify v1.8.2 h1:+h33VjcLVPDHtOdpUCuF+7gSuG3yGIftsP1YvFihtJ8=
+github.com/stretchr/testify v1.8.2/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
github.com/syndtr/gocapability v0.0.0-20200815063812-42c35b437635/go.mod h1:hkRG7XYTFWNJGYcbNJQlaLq0fg1yr4J4t/NcTQtrfww=
github.com/testcontainers/testcontainers-go v0.18.0 h1:8RXrcIQv5xX/uBOSmZd297gzvA7F0yuRA37/918o7Yg=
github.com/testcontainers/testcontainers-go v0.18.0/go.mod h1:rLC7hR2SWRjJZZNrUYiTKvUXCziNxzZiYtz9icTWYNQ=
@@ -185,43 +159,26 @@ github.com/vishvananda/netlink v1.1.0/go.mod h1:cTgwzPIzzgDAYoQrMm0EdrjRUBkTqKYp
github.com/vishvananda/netns v0.0.0-20191106174202-0a2b9b5464df/go.mod h1:JP3t17pCcGlemwknint6hfoeCVQrEMVwxRLRjXpq+BU=
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
-go.opentelemetry.io/proto/otlp v0.7.0/go.mod h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqeYNgFYFoEGnI=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
-golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
-golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
-golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU=
-golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
-golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
-golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
-golang.org/x/net v0.0.0-20190108225652-1e06a53dbb7e/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
-golang.org/x/net v0.0.0-20190213061140-3a22650c66bd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
-golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20200520004742-59133d7f0dd7/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A=
-golang.org/x/net v0.0.0-20200822124328-c89045814202/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA=
golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
golang.org/x/net v0.0.0-20201224014010-6772e930b67b/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
-golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM=
golang.org/x/net v0.0.0-20210428140749-89ef3d95e781/go.mod h1:OJAsFXCWl8Ukc7SiCT/9KSuxbyM7479/AVlXFRxuMCk=
golang.org/x/net v0.0.0-20220127200216-cd36cc0744dd/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk=
golang.org/x/net v0.7.0 h1:rJrUqqhjsgNp7KqAIc25s9pZnjU7TUcSY7HcVZjdn1g=
golang.org/x/net v0.7.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs=
-golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
-golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
-golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
-golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
-golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
@@ -239,9 +196,7 @@ golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20210112080510-489259a85091/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210119212857-b64e53b001e4/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210124154548-22da62e12c0c/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
-golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
-golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210616094352-59db8d763f22/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210906170528-6f6e22806c34/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
@@ -257,17 +212,12 @@ golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuX
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
-golang.org/x/text v0.3.5/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
golang.org/x/text v0.7.0 h1:4BRB4x83lYWy72KwLD/qYDuTu7q9PjSagHvijDw7cLo=
golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8=
golang.org/x/time v0.0.0-20220210224613-90d013bbcef8 h1:vVKdlvoWBphwdxWKrFZEuM0kGgGLxUOYcY4U/2Vjg44=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
-golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
-golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY=
-golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
-golang.org/x/tools v0.0.0-20190524140312-2c0ae7006135/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q=
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE=
golang.org/x/tools v0.0.0-20201224043029-2b0845dc783e/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
@@ -277,36 +227,19 @@ golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8T
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
-google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM=
-google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4=
-google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc=
-google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55/go.mod h1:DMBHOl98Agz4BDEuKkezgsaosCRResVns1a3J2ZsMNc=
-google.golang.org/genproto v0.0.0-20200513103714-09dca8ec2884/go.mod h1:55QSHmfGQM9UVYDPBsyGGes0y52j32PQ3BqQfXhyH3c=
-google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013/go.mod h1:NbSheEEYHJ7i3ixzK3sjbqSGDJWnxyFXZblF3eUsNvo=
-google.golang.org/genproto v0.0.0-20220617124728-180714bec0ad h1:kqrS+lhvaMHCxul6sKQvKJ8nAAhlVItmZV822hYFH/U=
-google.golang.org/genproto v0.0.0-20220617124728-180714bec0ad/go.mod h1:KEWEmljWE5zPzLBa/oHl6DaEt9LmfH6WtH1OHIvleBA=
-google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c=
-google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg=
-google.golang.org/grpc v1.25.1/go.mod h1:c3i+UQWmh7LiEpx4sFZnkU36qjEYZ0imhYfXVyQciAY=
-google.golang.org/grpc v1.27.0/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk=
-google.golang.org/grpc v1.33.1/go.mod h1:fr5YgcSWrqhRRxogOsw7RzIpsmvOZ6IcH4kBYTpR3n0=
-google.golang.org/grpc v1.36.0/go.mod h1:qjiiYl8FncCW8feJPdyg3v6XW24KsRHe+dy9BAGRRjU=
-google.golang.org/grpc v1.47.0/go.mod h1:vN9eftEi1UMyUsIF80+uQXhHjbXYbm0uXoFCACuMGWk=
-google.golang.org/grpc v1.51.0 h1:E1eGv1FTqoLIdnBCZufiSHgKjlqG6fKFf6pPWtMTh8U=
-google.golang.org/grpc v1.51.0/go.mod h1:wgNDFcnuBGmxLKI/qn4T+m5BtEBYXJPvibbUPsAIPww=
+google.golang.org/genproto v0.0.0-20230110181048-76db0878b65f h1:BWUVssLB0HVOSY78gIdvk1dTVYtT1y8SBWtPYuTJ/6w=
+google.golang.org/genproto v0.0.0-20230110181048-76db0878b65f/go.mod h1:RGgjbofJ8xD9Sq1VVhDM1Vok1vRONV+rg+CjzG4SZKM=
+google.golang.org/grpc v1.53.0 h1:LAv2ds7cmFV/XTS3XG1NneeENYrXGmorPxsBbptIjNc=
+google.golang.org/grpc v1.53.0/go.mod h1:OnIrk0ipVdj4N5d9IUoFUx72/VlD7+jUsHwZgwSMQpw=
google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=
google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0=
google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM=
google.golang.org/protobuf v1.20.1-0.20200309200217-e05f789c0967/go.mod h1:A+miEFZTKqfCUM6K7xSMQL9OKL/b6hQv+e19PK+JZNE=
google.golang.org/protobuf v1.21.0/go.mod h1:47Nbq4nVaFHyn7ilMalzfO3qCViNmqZ2kzikPIcrTAo=
-google.golang.org/protobuf v1.22.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU=
google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU=
-google.golang.org/protobuf v1.23.1-0.20200526195155-81db48ad09cc/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU=
-google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlbajtzgsN7c=
google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
google.golang.org/protobuf v1.27.1/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
-google.golang.org/protobuf v1.28.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
google.golang.org/protobuf v1.28.1 h1:d0NfwRgPtno5B1Wa6L2DAG+KivqkdutMf1UhdNx175w=
google.golang.org/protobuf v1.28.1/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
@@ -315,7 +248,6 @@ gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMy
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ=
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
-gopkg.in/yaml.v2 v2.2.3/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
@@ -326,5 +258,3 @@ gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gotest.tools/v3 v3.4.0 h1:ZazjZUfuVeZGLAmlKKuyv3IKP5orXcwtOwDQH6YVr6o=
gotest.tools/v3 v3.4.0/go.mod h1:CtbdzLSsqVhDgMtKsx03ird5YTGB3ar27v0u/yKBW5g=
-honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
-honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
diff --git a/grpc/grpc_server.go b/grpc/grpc_server.go
index 7e673cd..810b909 100644
--- a/grpc/grpc_server.go
+++ b/grpc/grpc_server.go
@@ -21,6 +21,7 @@ import (
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/status"
+ "google.golang.org/protobuf/types/known/emptypb"
"google.golang.org/protobuf/types/known/wrapperspb"
"os"
"path/filepath"
@@ -59,6 +60,30 @@ type grpcSubscriber struct {
*Config
}
+// Health will return the status of the server and the underlying store
+func (s *grpcSubscriber) Health(context.Context, *emptypb.Empty) (*protos.HealthResponse, error) {
+ var response = &protos.HealthResponse{
+ State: protos.HealthResponse_READY,
+ Release: api.Release,
+ TlsEnabled: s.TlsEnabled,
+ }
+ if s.Store == nil {
+ s.Logger.Error("Redis store not initialized, cannot process requests")
+ return nil, status.Error(codes.Internal, "data store not configured")
+ }
+ if s.EventsChannel == nil {
+ s.Logger.Error("events channel not initialized, cannot process events")
+ response.State = protos.HealthResponse_NOT_READY
+ return response, nil
+ }
+ err := s.Store.Health()
+ if err != nil {
+ s.Logger.Error("Redis store not ready: %v", err)
+ response.State = protos.HealthResponse_NOT_READY
+ }
+ return response, nil
+}
+
func (s *grpcSubscriber) SendEvent(ctx context.Context, request *protos.EventRequest) (*protos.
EventResponse, error) {
if request.GetId() == "" {
@@ -89,11 +114,15 @@ func (s *grpcSubscriber) SendEvent(ctx context.Context, request *protos.EventReq
}
func (s *grpcSubscriber) PutConfiguration(ctx context.Context, cfg *protos.Configuration) (*protos.PutResponse, error) {
- // FIXME: use Context to set a timeout, etc.
if err := api.CheckValid(cfg); err != nil {
s.Logger.Error("invalid configuration: %v", err)
return nil, status.Errorf(codes.InvalidArgument, "invalid configuration: %v", err)
}
+ if deadline, ok := ctx.Deadline(); ok {
+ if deadline.Before(time.Now()) {
+ return nil, ctx.Err()
+ }
+ }
if err := s.Store.PutConfig(cfg); err != nil {
s.Logger.Error("could not store configuration: %v", err)
if strings.Contains(err.Error(), "already exists") {
@@ -101,6 +130,11 @@ func (s *grpcSubscriber) PutConfiguration(ctx context.Context, cfg *protos.Confi
}
return nil, status.Error(codes.Internal, err.Error())
}
+ if deadline, ok := ctx.Deadline(); ok {
+ if deadline.Before(time.Now()) {
+ return nil, ctx.Err()
+ }
+ }
s.Logger.Trace("configuration stored: %s", api.GetVersionId(cfg))
return &protos.PutResponse{
Id: api.GetVersionId(cfg),
@@ -123,8 +157,9 @@ func (s *grpcSubscriber) GetConfiguration(ctx context.Context, configId *wrapper
*protos.Configuration, error) {
cfgId := configId.Value
s.Logger.Trace("retrieving Configuration %s", cfgId)
- cfg, found := s.Store.GetConfig(cfgId)
- if !found {
+ cfg, err := s.Store.GetConfig(cfgId)
+ if err != nil {
+ s.Logger.Error("could not get configuration: %v", err)
return nil, status.Errorf(codes.NotFound, "configuration %s not found", cfgId)
}
return cfg, nil
@@ -134,11 +169,16 @@ func (s *grpcSubscriber) PutFiniteStateMachine(ctx context.Context,
request *protos.PutFsmRequest) (*protos.PutResponse, error) {
fsm := request.Fsm
// First check that the configuration for the FSM is valid
- cfg, ok := s.Store.GetConfig(fsm.ConfigId)
- if !ok {
+ cfg, err := s.Store.GetConfig(fsm.ConfigId)
+ if err != nil {
return nil, status.Error(codes.FailedPrecondition, storage.NotFoundError(
fsm.ConfigId).Error())
}
+ if deadline, ok := ctx.Deadline(); ok {
+ if deadline.Before(time.Now()) {
+ return nil, ctx.Err()
+ }
+ }
var id = request.Id
if id == "" {
id = uuid.NewString()
@@ -153,6 +193,8 @@ func (s *grpcSubscriber) PutFiniteStateMachine(ctx context.Context,
s.Logger.Error("could not store FSM [%v]: %v", fsm, err)
return nil, status.Error(codes.Internal, err.Error())
}
+ // we cannot interrupt here, even if deadline is passed, as it would leave the
+ // store in an inconsistent state.
if err := s.Store.UpdateState(cfg.Name, id, "", fsm.State); err != nil {
s.Logger.Error("could not store FSM in state set [%s]: %v", fsm.State, err)
return nil, status.Error(codes.Internal, err.Error())
@@ -171,8 +213,8 @@ func (s *grpcSubscriber) GetFiniteStateMachine(ctx context.Context, in *protos.G
return nil, status.Error(codes.InvalidArgument, "ID must always be provided when looking up statemachine")
}
s.Logger.Debug("looking up FSM [%s] (Configuration: %s)", fsmId, cfg)
- fsm, ok := s.Store.GetStateMachine(fsmId, cfg)
- if !ok {
+ fsm, err := s.Store.GetStateMachine(fsmId, cfg)
+ if err != nil {
return nil, status.Error(codes.NotFound, storage.NotFoundError(fsmId).Error())
}
return fsm, nil
@@ -180,8 +222,8 @@ func (s *grpcSubscriber) GetFiniteStateMachine(ctx context.Context, in *protos.G
func (s *grpcSubscriber) GetAllInState(ctx context.Context, in *protos.GetFsmRequest) (
*protos.ListResponse, error) {
- cfgName := in.GetConfig()
- if cfgName == "" {
+ cfg := in.GetConfig()
+ if cfg == "" {
return nil, status.Errorf(codes.InvalidArgument, "configuration must always be specified")
}
state := in.GetState()
@@ -189,18 +231,18 @@ func (s *grpcSubscriber) GetAllInState(ctx context.Context, in *protos.GetFsmReq
// TODO: implement table scanning
return nil, status.Errorf(codes.Unimplemented, "missing state, table scan not implemented")
}
- ids := s.Store.GetAllInState(cfgName, state)
+ ids := s.Store.GetAllInState(cfg, state)
return &protos.ListResponse{Ids: ids}, nil
}
func (s *grpcSubscriber) GetEventOutcome(ctx context.Context, in *protos.EventRequest) (
*protos.EventResponse, error) {
evtId := in.GetId()
- config := in.GetConfig()
- s.Logger.Debug("looking up EventOutcome %s (%s)", evtId, config)
- outcome, ok := s.Store.GetOutcomeForEvent(evtId, config)
- if !ok {
- return nil, status.Error(codes.NotFound, fmt.Sprintf("outcome for event %s not found", evtId))
+ cfg := in.GetConfig()
+ s.Logger.Debug("looking up EventOutcome %s (%s)", evtId, cfg)
+ outcome, err := s.Store.GetOutcomeForEvent(evtId, cfg)
+ if err != nil {
+ return nil, status.Errorf(codes.NotFound, "cannot get outcome for event %s: %v", evtId, err)
}
return &protos.EventResponse{
EventId: evtId,
@@ -215,9 +257,9 @@ func (s *grpcSubscriber) StreamAllInstate(in *protos.GetFsmRequest, stream State
}
cfgName := in.GetConfig()
for _, id := range response.GetIds() {
- fsm, found := s.Store.GetStateMachine(id, cfgName)
- if !found {
- return storage.NotFoundError(id)
+ fsm, err := s.Store.GetStateMachine(id, cfgName)
+ if err != nil {
+ return err
}
if err = stream.SendMsg(&protos.PutResponse{
Id: id,
@@ -239,9 +281,9 @@ func (s *grpcSubscriber) StreamAllConfigurations(in *wrapperspb.StringValue, str
return nil
}
for _, cfgId := range response.GetIds() {
- cfg, found := s.Store.GetConfig(cfgId)
- if !found {
- return storage.NotFoundError(cfgId)
+ cfg, err := s.Store.GetConfig(cfgId)
+ if err != nil {
+ return err
}
if err = stream.SendMsg(cfg); err != nil {
return err
diff --git a/grpc/grpc_server_test.go b/grpc/grpc_server_test.go
index 993bb40..f3a16a1 100644
--- a/grpc/grpc_server_test.go
+++ b/grpc/grpc_server_test.go
@@ -12,12 +12,14 @@ package grpc_test
import (
"context"
"fmt"
+ "google.golang.org/protobuf/types/known/emptypb"
"net"
"strings"
"time"
"github.com/go-redis/redis/v8"
slf4go "github.com/massenz/slf4go/logging"
+ "github.com/stretchr/testify/mock"
g "google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials/insecure"
@@ -33,6 +35,74 @@ import (
protos "github.com/massenz/statemachine-proto/golang/api"
)
+var NotImplemented = storage.NotImplementedError("mock")
+
+type Mockstore struct {
+ mock.Mock
+}
+
+func (m *Mockstore) SetLogLevel(level slf4go.LogLevel) {
+}
+
+func (m *Mockstore) GetConfig(versionId string) (*protos.Configuration, storage.StoreErr) {
+ return nil, nil
+}
+
+func (m *Mockstore) PutConfig(cfg *protos.Configuration) storage.StoreErr {
+ return NotImplemented
+}
+
+func (m *Mockstore) GetAllConfigs() []string {
+ return nil
+}
+
+func (m *Mockstore) GetAllVersions(name string) []string {
+ return nil
+}
+
+func (m *Mockstore) GetStateMachine(id string, cfg string) (*protos.FiniteStateMachine, storage.StoreErr) {
+ return nil, NotImplemented
+}
+
+func (m *Mockstore) PutStateMachine(id string, fsm *protos.FiniteStateMachine) error {
+ return NotImplemented
+}
+
+func (m *Mockstore) GetAllInState(cfg string, state string) []string {
+ return nil
+}
+
+func (m *Mockstore) UpdateState(cfgName string, id string, oldState string, newState string) error {
+ return NotImplemented
+}
+
+func (m *Mockstore) GetEvent(id string, cfg string) (*protos.Event, storage.StoreErr) {
+ return nil, NotImplemented
+}
+
+func (m *Mockstore) PutEvent(event *protos.Event, cfg string, ttl time.Duration) error {
+ return NotImplemented
+}
+
+func (m *Mockstore) AddEventOutcome(eventId string, cfgName string, response *protos.EventOutcome, ttl time.Duration) error {
+ return NotImplemented
+}
+
+func (m *Mockstore) GetOutcomeForEvent(eventId string, cfgName string) (*protos.EventOutcome, storage.StoreErr) {
+ return nil, NotImplemented
+}
+
+func (m *Mockstore) SetTimeout(duration time.Duration) {
+}
+
+func (m *Mockstore) GetTimeout() time.Duration {
+ return 0
+}
+
+func (m *Mockstore) Health() error {
+ return nil
+}
+
var bkgnd = context.Background()
var _ = Describe("the gRPC Server", func() {
When("processing events", func() {
@@ -44,7 +114,7 @@ var _ = Describe("the gRPC Server", func() {
BeforeEach(func() {
var err error
testCh = make(chan protos.EventRequest, 5)
- listener, err = net.Listen("tcp", "localhost:5763")
+ listener, err = net.Listen("tcp", ":0")
Ω(err).ShouldNot(HaveOccurred())
client = NewClient(listener.Addr().String(), false)
@@ -58,6 +128,7 @@ var _ = Describe("the gRPC Server", func() {
EventsChannel: testCh,
Logger: l,
ServerAddress: listener.Addr().String(),
+ Store: new(Mockstore),
})
Ω(err).ToNot(HaveOccurred())
Ω(server).ToNot(BeNil())
@@ -69,12 +140,20 @@ var _ = Describe("the gRPC Server", func() {
server.Stop()
}
})
+ It("should have a healthy status", func() {
+ Eventually(func(g Gomega) {
+ hr, err := client.Health(bkgnd, &emptypb.Empty{})
+ g.Ω(err).ToNot(HaveOccurred())
+ g.Ω(hr.State).Should(Equal(protos.HealthResponse_READY))
+ }, 100*time.Millisecond, 20*time.Millisecond).Should(Succeed())
+ })
+ const EventName = "test-event"
It("should succeed for well-formed events", func() {
response, err := client.SendEvent(bkgnd, &protos.EventRequest{
Event: &protos.Event{
EventId: "1",
Transition: &protos.Transition{
- Event: "test-vt",
+ Event: EventName,
},
Originator: "test",
},
@@ -88,7 +167,7 @@ var _ = Describe("the gRPC Server", func() {
select {
case evt := <-testCh:
Ω(evt.Event.EventId).To(Equal("1"))
- Ω(evt.Event.Transition.Event).To(Equal("test-vt"))
+ Ω(evt.Event.Transition.Event).To(Equal(EventName))
Ω(evt.Event.Originator).To(Equal("test"))
Ω(evt.Id).To(Equal("2"))
case <-time.After(10 * time.Millisecond):
@@ -100,7 +179,7 @@ var _ = Describe("the gRPC Server", func() {
response, err := client.SendEvent(bkgnd, &protos.EventRequest{
Event: &protos.Event{
Transition: &protos.Transition{
- Event: "test-vt",
+ Event: EventName,
},
Originator: "test",
},
@@ -114,7 +193,7 @@ var _ = Describe("the gRPC Server", func() {
select {
case evt := <-testCh:
Ω(evt.Event.EventId).Should(Equal(generatedId))
- Ω(evt.Event.Transition.Event).To(Equal("test-vt"))
+ Ω(evt.Event.Transition.Event).To(Equal(EventName))
case <-time.After(10 * time.Millisecond):
Fail("Timed out")
}
@@ -123,7 +202,7 @@ var _ = Describe("the gRPC Server", func() {
_, err := client.SendEvent(bkgnd, &protos.EventRequest{
Event: &protos.Event{
Transition: &protos.Transition{
- Event: "test-vt",
+ Event: EventName,
},
Originator: "test",
},
@@ -216,14 +295,14 @@ var _ = Describe("the gRPC Server", func() {
}
})
It("should store valid configurations", func() {
- _, ok := store.GetConfig(GetVersionId(cfg))
- Ω(ok).To(BeFalse())
+ _, err := store.GetConfig(GetVersionId(cfg))
+ Ω(err).ToNot(BeNil())
response, err := client.PutConfiguration(bkgnd, cfg)
Ω(err).ToNot(HaveOccurred())
Ω(response).ToNot(BeNil())
Ω(response.Id).To(Equal(GetVersionId(cfg)))
- found, ok := store.GetConfig(response.Id)
- Ω(ok).Should(BeTrue())
+ found, err := store.GetConfig(response.Id)
+ Ω(err).Should(BeNil())
Ω(found).Should(Respect(cfg))
})
It("should fail for invalid configuration", func() {
@@ -359,34 +438,35 @@ var _ = Describe("the gRPC Server", func() {
AssertStatusCode(codes.NotFound, err)
})
It("will find all FSMs by State", func() {
+ const ConfigName = "test.m"
for i := 1; i <= 5; i++ {
id := fmt.Sprintf("fsm-%d", i)
Ω(store.PutStateMachine(id,
&protos.FiniteStateMachine{
- ConfigId: "test.m:v1",
+ ConfigId: ConfigName + ":v1",
State: "start",
})).Should(Succeed())
- store.UpdateState("test.m", id, "", "start")
+ store.UpdateState(ConfigName, id, "", "start")
}
for i := 10; i < 13; i++ {
id := fmt.Sprintf("fsm-%d", i)
Ω(store.PutStateMachine(id,
&protos.FiniteStateMachine{
- ConfigId: "test.m:v1",
+ ConfigId: ConfigName + ":v1",
State: "stop",
})).Should(Succeed())
- store.UpdateState("test.m", id, "", "stop")
+ store.UpdateState(ConfigName, id, "", "stop")
}
items, err := client.GetAllInState(bkgnd, &protos.GetFsmRequest{
- Config: "test.m",
+ Config: ConfigName,
Query: &protos.GetFsmRequest_State{State: "start"},
})
Ω(err).ShouldNot(HaveOccurred())
Ω(len(items.GetIds())).Should(Equal(5))
Ω(items.GetIds()).Should(ContainElements("fsm-3", "fsm-5"))
items, err = client.GetAllInState(bkgnd, &protos.GetFsmRequest{
- Config: "test.m",
+ Config: ConfigName,
Query: &protos.GetFsmRequest_State{State: "stop"},
})
Ω(err).ShouldNot(HaveOccurred())
diff --git a/grpc_health.go b/grpc_health.go
new file mode 100644
index 0000000..b8f19f3
--- /dev/null
+++ b/grpc_health.go
@@ -0,0 +1,68 @@
+/*
+ * Copyright (c) 2023 AlertAvert.com. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Author: Marco Massenzio (marco@alertavert.com)
+ */
+
+package main
+
+import (
+ "context"
+ "crypto/tls"
+ "flag"
+ "fmt"
+ "github.com/golang/protobuf/jsonpb"
+ protos "github.com/massenz/statemachine-proto/golang/api"
+ "google.golang.org/grpc"
+ "google.golang.org/grpc/credentials"
+ "google.golang.org/grpc/credentials/insecure"
+ "google.golang.org/protobuf/types/known/emptypb"
+ "log"
+ "time"
+)
+
+// Most basic binary to run health checks on the server.
+// Used to assert readiness of the container/pod in Docker/Kubernetes.
+func main() {
+ var address = flag.String("host", "localhost:7398",
+ "The address (host:port) for the GRPC server")
+ var timeout = flag.Duration("timeout", 200*time.Millisecond,
+ "timeout expressed as a duration string (e.g., 200ms, 1s, etc.)")
+ var noTLS = flag.Bool("insecure", false,
+ "timeout expressed as a duration string (e.g., 200ms, 1s, etc.)")
+ flag.Parse()
+
+ var creds credentials.TransportCredentials
+ if *noTLS {
+ creds = insecure.NewCredentials()
+ } else {
+ config := &tls.Config{
+ InsecureSkipVerify: true,
+ }
+ creds = credentials.NewTLS(config)
+ }
+
+ cc, err := grpc.Dial(*address, grpc.WithTransportCredentials(creds))
+ defer cc.Close()
+ if err != nil {
+ log.Fatalf("cannot open connection to %s: %v", *address, err)
+ }
+
+ client := protos.NewStatemachineServiceClient(cc)
+ ctx, cancel := context.WithTimeout(context.Background(), *timeout)
+ defer cancel()
+
+ resp, err := client.Health(ctx, &emptypb.Empty{})
+ if err != nil {
+ log.Fatal("cannot connect to server:", err)
+ }
+ marshaler := &jsonpb.Marshaler{}
+ jsonString, err := marshaler.MarshalToString(resp)
+ if err != nil {
+ log.Fatal("Error while marshaling the message to JSON:", err)
+ }
+ fmt.Println(jsonString)
+}
diff --git a/pubsub/listener.go b/pubsub/listener.go
index a233a28..d9d9125 100644
--- a/pubsub/listener.go
+++ b/pubsub/listener.go
@@ -69,16 +69,16 @@ func (listener *EventsListener) ListenForMessages() {
fmt.Sprintf("could not store event: %v", err)))
continue
}
- fsm, ok := listener.store.GetStateMachine(fsmId, config)
- if !ok {
+ fsm, err := listener.store.GetStateMachine(fsmId, config)
+ if err != nil {
listener.PostNotificationAndReportOutcome(makeResponse(&request,
protos.EventOutcome_FsmNotFound,
fmt.Sprintf("statemachine [%s] could not be found", fsmId)))
continue
}
// TODO: cache the configuration locally: they are immutable anyway.
- cfg, ok := listener.store.GetConfig(fsm.ConfigId)
- if !ok {
+ cfg, err := listener.store.GetConfig(fsm.ConfigId)
+ if err != nil {
listener.PostNotificationAndReportOutcome(makeResponse(&request,
protos.EventOutcome_ConfigurationNotFound,
fmt.Sprintf("configuration [%s] could not be found", fsm.ConfigId)))
diff --git a/pubsub/listener_test.go b/pubsub/listener_test.go
index cb655a8..d547ccc 100644
--- a/pubsub/listener_test.go
+++ b/pubsub/listener_test.go
@@ -10,6 +10,7 @@
package pubsub_test
import (
+ . "github.com/JiaYongfei/respect/gomega"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
@@ -43,10 +44,11 @@ var _ = Describe("A Listener", func() {
// Set to DEBUG when diagnosing test failures
testListener.SetLogLevel(logging.NONE)
})
+ const eventId = "1234-abcdef"
It("can post error notifications", func() {
defer close(notificationsCh)
msg := protos.Event{
- EventId: "feed-beef",
+ EventId: eventId,
Originator: "me",
Transition: &protos.Transition{
Event: "test-me",
@@ -76,17 +78,18 @@ var _ = Describe("A Listener", func() {
})
It("can process well-formed events", func() {
event := protos.Event{
- EventId: "feed-beef",
+ EventId: eventId,
Originator: "me",
Transition: &protos.Transition{
Event: "move",
},
Details: "more details",
}
+ const requestId = "12345-faa44"
request := protos.EventRequest{
Event: &event,
Config: "test",
- Id: "12345-faa44",
+ Id: requestId,
}
Ω(store.PutConfig(&protos.Configuration{
Name: "test",
@@ -95,7 +98,7 @@ var _ = Describe("A Listener", func() {
Transitions: []*protos.Transition{{From: "start", To: "end", Event: "move"}},
StartingState: "start",
})).ToNot(HaveOccurred())
- Ω(store.PutStateMachine("12345-faa44", &protos.FiniteStateMachine{
+ Ω(store.PutStateMachine(requestId, &protos.FiniteStateMachine{
ConfigId: "test:v1",
State: "start",
History: nil,
@@ -109,21 +112,21 @@ var _ = Describe("A Listener", func() {
Eventually(func(g Gomega) {
// Now we want to test that the state machine was updated
- fsm, ok := store.GetStateMachine("12345-faa44", "test")
- g.Ω(ok).ToNot(BeFalse())
+ fsm, err := store.GetStateMachine(requestId, "test")
+ g.Ω(err).To(BeNil())
g.Ω(fsm.State).To(Equal("end"))
g.Ω(len(fsm.History)).To(Equal(1))
g.Ω(fsm.History[0].Details).To(Equal("more details"))
g.Ω(fsm.History[0].Transition.Event).To(Equal("move"))
}).Should(Succeed())
- Eventually(func() bool {
- _, found := store.GetEvent(event.EventId, "test")
- return found
- }).Should(BeTrue())
+ Eventually(func() storage.StoreErr {
+ _, err := store.GetEvent(event.EventId, "test")
+ return err
+ }).Should(BeNil())
})
It("sends notifications for missing state-machine", func() {
event := protos.Event{
- EventId: "feed-beef",
+ EventId: eventId,
Originator: "me",
Transition: &protos.Transition{
Event: "move",
@@ -152,7 +155,7 @@ var _ = Describe("A Listener", func() {
It("sends notifications for missing destinations", func() {
request := protos.EventRequest{
Event: &protos.Event{
- EventId: "feed-beef",
+ EventId: eventId,
},
}
go func() { testListener.ListenForMessages() }()
@@ -213,18 +216,22 @@ var _ = Describe("A Listener", func() {
return nil
}
}).Should(BeNil())
- Eventually(func() *protos.Event {
- e, _ := store.GetEvent(event.EventId, request.Config)
- return e
- }, 100*time.Millisecond, 20*time.Millisecond).ShouldNot(BeNil())
- Eventually(func() protos.EventOutcome_StatusCode {
- e, ok := store.GetOutcomeForEvent(event.EventId, request.Config)
- if ok {
- return e.Code
+ Eventually(func(g Gomega) {
+ evt, err := store.GetEvent(event.EventId, request.Config)
+ Ω(err).ToNot(HaveOccurred())
+ if evt != nil {
+ Ω(evt).To(Respect(&event))
} else {
- return protos.EventOutcome_GenericError
+ Fail("event is nil")
+ }
+ }, 100*time.Millisecond, 20*time.Millisecond).Should(Succeed())
+ Eventually(func(g Gomega) {
+ outcome, err := store.GetOutcomeForEvent(event.EventId, request.Config)
+ Ω(err).ToNot(HaveOccurred())
+ if outcome != nil {
+ Ω(outcome.Code).To(Equal(protos.EventOutcome_Ok))
}
- }, 100*time.Millisecond, 20*time.Millisecond).Should(Equal(protos.EventOutcome_Ok))
+ }, 100*time.Millisecond, 20*time.Millisecond).Should(Succeed())
})
})
})
diff --git a/pubsub/sqs_sub.go b/pubsub/sqs_sub.go
index 1a34be0..dbceb59 100644
--- a/pubsub/sqs_sub.go
+++ b/pubsub/sqs_sub.go
@@ -24,9 +24,6 @@ import (
protos "github.com/massenz/statemachine-proto/golang/api"
)
-// TODO: should we need to generalize and abstract the implementation of a Subscriber?
-// This would be necessary if we were to implement a different message broker (e.g., Kafka)
-
// getSqsClient connects to AWS and obtains an SQS client; passing `nil` as the `awsEndpointUrl` will
// connect by default to AWS; use a different (possibly local) URL for a LocalStack test deployment.
func getSqsClient(awsEndpointUrl *string) *sqs.SQS {
@@ -61,11 +58,12 @@ func NewSqsSubscriber(eventsChannel chan<- protos.EventRequest, sqsUrl *string)
return nil
}
return &SqsSubscriber{
- logger: log.NewLog("SQS-Sub"),
- client: client,
- events: eventsChannel,
- Timeout: DefaultVisibilityTimeout,
- PollingInterval: DefaultPollingInterval,
+ logger: log.NewLog("SQS-Sub"),
+ client: client,
+ events: eventsChannel,
+ Timeout: DefaultVisibilityTimeout,
+ PollingInterval: DefaultPollingInterval,
+ MessageRemoveRetries: DefaultRetries,
}
}
@@ -134,14 +132,14 @@ func (s *SqsSubscriber) ProcessMessage(msg *sqs.Message, queueUrl *string) {
var request protos.EventRequest
err := proto.UnmarshalText(*msg.Body, &request)
if err != nil {
- s.logger.Error("Message %v has invalid body: %s", msg.MessageId, err.Error())
+ s.logger.Error("message %v has invalid body: %s", msg.MessageId, err.Error())
// TODO: publish error to DLQ.
return
}
destId := request.GetId()
if destId == "" {
- errDetails := fmt.Sprintf("No Destination ID in %v", request.String())
+ errDetails := fmt.Sprintf("no Destination ID in %v", request.String())
s.logger.Error(errDetails)
// TODO: publish error to DLQ.
return
@@ -150,16 +148,19 @@ func (s *SqsSubscriber) ProcessMessage(msg *sqs.Message, queueUrl *string) {
api.UpdateEvent(request.Event)
s.events <- request
- s.logger.Debug("Removing message %v from SQS", *msg.MessageId)
- _, err = s.client.DeleteMessage(&sqs.DeleteMessageInput{
- QueueUrl: queueUrl,
- ReceiptHandle: msg.ReceiptHandle,
- })
- if err != nil {
- // FIXME: add retries
- errDetails := fmt.Sprintf("Failed to remove message %v from SQS", msg.MessageId)
- s.logger.Error("%s: %v", errDetails, err)
- // TODO: publish error to DLQ, should also retry removal here.
+ for i := 0; i < s.MessageRemoveRetries; i++ {
+ s.logger.Debug("removing message %v from SQS", *msg.MessageId)
+ _, err = s.client.DeleteMessage(&sqs.DeleteMessageInput{
+ QueueUrl: queueUrl,
+ ReceiptHandle: msg.ReceiptHandle,
+ })
+ if err != nil {
+ errDetails := fmt.Sprintf("failed to remove message %v from SQS (attempt: %d)",
+ msg.MessageId, i+1)
+ s.logger.Error("%s: %v", errDetails, err)
+ } else {
+ break
+ }
}
- s.logger.Trace("Message %v removed", msg.MessageId)
+ s.logger.Trace("message %v removed", msg.MessageId)
}
diff --git a/pubsub/types.go b/pubsub/types.go
index da69ecb..61651e0 100644
--- a/pubsub/types.go
+++ b/pubsub/types.go
@@ -25,6 +25,9 @@ const (
// message from the queue.
// See: https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-visibility-timeout.html
DefaultVisibilityTimeout = 5 * time.Second
+
+ // DefaultRetries is the number of times we will try to remove the message from the SQS queue
+ DefaultRetries = 3
)
// An EventsListener will process `EventRequests` in a separate goroutine.
@@ -59,9 +62,10 @@ type SqsPublisher struct {
// The subscriber will poll the queue for new messages,
// and will post them on the `events` channel from where an `EventsListener` will process them.
type SqsSubscriber struct {
- logger *log.Log
- client *sqs.SQS
- events chan<- protos.EventRequest
- Timeout time.Duration
- PollingInterval time.Duration
+ logger *log.Log
+ client *sqs.SQS
+ events chan<- protos.EventRequest
+ Timeout time.Duration
+ PollingInterval time.Duration
+ MessageRemoveRetries int
}
diff --git a/storage/redis_sets_store.go b/storage/redis_sets_store.go
deleted file mode 100644
index 98fde88..0000000
--- a/storage/redis_sets_store.go
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Copyright (c) 2022 AlertAvert.com. All rights reserved.
- *
- * Licensed under the Apache License, Version 2.0
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Author: Marco Massenzio (marco@alertavert.com)
- */
-
-package storage
-
-import (
- "context"
- "fmt"
-)
-
-func (csm *RedisStore) UpdateState(cfgName string, id string, oldState string, newState string) error {
- var key string
- var err error
- if oldState != "" {
- key = NewKeyForMachinesByState(cfgName, oldState)
- err = csm.client.SRem(context.Background(), key, id).Err()
- if err != nil {
- return fmt.Errorf(
- "cannot remove FSM [%s#%s] from state set `%s`: %s",
- cfgName, id, oldState, err)
- }
- }
- if newState != "" {
- key = NewKeyForMachinesByState(cfgName, newState)
- err = csm.client.SAdd(context.Background(), key, id).Err()
- if err != nil {
- return fmt.Errorf(
- "cannot add FSM [%s#%s] to state set `%s`: %s",
- cfgName, id, newState, err)
- }
- }
- return nil
-}
-
-func (csm *RedisStore) GetAllInState(cfg string, state string) []string {
- // TODO: enable splitting results with a (cursor, count)
- csm.logger.Debug("Looking up all FSMs [%s] in DB with state `%s`", cfg, state)
- key := NewKeyForMachinesByState(cfg, state)
- fsms, err := csm.client.SMembers(context.Background(), key).Result()
- if err != nil {
- csm.logger.Error("Could not retrieve FSMs for state `%s`: %s", state, err)
- return nil
- }
- csm.logger.Debug("Returning %d items", len(fsms))
- return fsms
-}
-
-func (csm *RedisStore) GetAllConfigs() []string {
- // TODO: enable splitting results with a (cursor, count)
- csm.logger.Debug("Looking up all configs in DB")
- configs, err := csm.client.SMembers(context.Background(), ConfigsPrefix).Result()
- if err != nil {
- csm.logger.Error("Could not retrieve configurations: %s", err)
- return nil
- }
- csm.logger.Debug("Returning %d items", len(configs))
- return configs
-}
-
-func (csm *RedisStore) GetAllVersions(name string) []string {
- csm.logger.Debug("Looking up all versions for Configurations `%s` in DB", name)
- configs, err := csm.client.SMembers(context.Background(), NewKeyForConfig(name)).Result()
- if err != nil {
- csm.logger.Error("Could not retrieve configurations: %s", err)
- return nil
- }
- csm.logger.Debug("Returning %d items", len(configs))
- return configs
-}
diff --git a/storage/redis_store.go b/storage/redis_store.go
index ccaa75b..0650398 100644
--- a/storage/redis_store.go
+++ b/storage/redis_store.go
@@ -26,11 +26,12 @@ import (
)
const (
- NeverExpire = 0
- DefaultRedisPort = "6379"
- DefaultRedisDb = 0
- DefaultMaxRetries = 3
- DefaultTimeout = 200 * time.Millisecond
+ NeverExpire = 0
+ DefaultRedisDb = 0
+ DefaultMaxRetries = 3
+ DefaultTimeout = 200 * time.Millisecond
+ ReturningItemsFmt = "Returning %d items"
+ NoConfigurationsFmt = "Could not retrieve configurations: %s"
)
type RedisStore struct {
@@ -40,141 +41,11 @@ type RedisStore struct {
MaxRetries int
}
-func (csm *RedisStore) GetConfig(id string) (*protos.Configuration, bool) {
- key := NewKeyForConfig(id)
- var cfg protos.Configuration
- err := csm.get(key, &cfg)
- if err != nil {
- csm.logger.Error("Error retrieving configuration `%s`: %s", id, err.Error())
- return nil, false
- }
- return &cfg, true
-}
-
-func (csm *RedisStore) GetEvent(id string, cfg string) (*protos.Event, bool) {
- key := NewKeyForEvent(id, cfg)
- var event protos.Event
- err := csm.get(key, &event)
- if err != nil {
- csm.logger.Error("Error retrieving event `%s`: %s", key, err.Error())
- return nil, false
- }
- return &event, true
-}
-
-func (csm *RedisStore) GetStateMachine(id string, cfg string) (*protos.FiniteStateMachine, bool) {
- key := NewKeyForMachine(id, cfg)
- var stateMachine protos.FiniteStateMachine
- err := csm.get(key, &stateMachine)
- if err != nil {
- csm.logger.Error("cannot access store for state machine `%s`: %s", key, err.Error())
- return nil, false
- }
- return &stateMachine, true
-}
-
-func (csm *RedisStore) PutConfig(cfg *protos.Configuration) error {
- if cfg == nil {
- return IllegalStoreError("nil config")
- }
- key := NewKeyForConfig(api.GetVersionId(cfg))
- if csm.client.Exists(context.Background(), key).Val() == 1 {
- return AlreadyExistsError(key)
- }
- // TODO: Find out whether the client allows to batch requests, instead of sending multiple server requests
- csm.client.SAdd(context.Background(), ConfigsPrefix, cfg.Name)
- csm.client.SAdd(context.Background(), NewKeyForConfig(cfg.Name), api.GetVersionId(cfg))
- return csm.put(key, cfg, NeverExpire)
-}
-
-func (csm *RedisStore) PutEvent(event *protos.Event, cfg string, ttl time.Duration) error {
- if event == nil {
- return IllegalStoreError("nil event")
- }
- key := NewKeyForEvent(event.EventId, cfg)
- return csm.put(key, event, ttl)
-}
-
-func (csm *RedisStore) PutStateMachine(id string, stateMachine *protos.FiniteStateMachine) error {
- if stateMachine == nil {
- return IllegalStoreError("nil statemachine")
- }
- configName := strings.Split(stateMachine.ConfigId, api.ConfigurationVersionSeparator)[0]
- key := NewKeyForMachine(id, configName)
- return csm.put(key, stateMachine, NeverExpire)
-}
-
-func (csm *RedisStore) AddEventOutcome(id string, cfg string, response *protos.EventOutcome, ttl time.Duration) error {
- if response == nil {
- return IllegalStoreError("nil response")
- }
- key := NewKeyForOutcome(id, cfg)
- return csm.put(key, response, ttl)
-}
-
-func (csm *RedisStore) GetOutcomeForEvent(id string, cfg string) (*protos.EventOutcome, bool) {
- key := NewKeyForOutcome(id, cfg)
- var outcome protos.EventOutcome
- err := csm.get(key, &outcome)
- if err != nil {
- csm.logger.Error("Error retrieving outcome for event `%s`: %s", key, err.Error())
- return nil, false
- }
- return &outcome, true
-}
-
-func (csm *RedisStore) SetTimeout(duration time.Duration) {
- csm.Timeout = duration
-}
-
-func (csm *RedisStore) GetTimeout() time.Duration {
- return csm.Timeout
-}
-
-func NewRedisStoreWithDefaults(address string) StoreManager {
- return NewRedisStore(address, false, DefaultRedisDb, DefaultTimeout, DefaultMaxRetries)
-}
+/////// Internal methods
-func NewRedisStore(address string, isCluster bool, db int, timeout time.Duration, maxRetries int) StoreManager {
- logger := slf4go.NewLog(fmt.Sprintf("redis://%s/%d", address, db))
-
- var tlsConfig *tls.Config
- var client redis.Cmdable
-
- if os.Getenv("REDIS_TLS") != "" {
- logger.Info("Using TLS for Redis connection")
- tlsConfig = &tls.Config{MinVersion: tls.VersionTLS12}
- }
-
- if isCluster {
- client = redis.NewClusterClient(&redis.ClusterOptions{
- TLSConfig: tlsConfig,
- Addrs: strings.Split(address, ","),
- })
- } else {
- client = redis.NewClient(&redis.Options{
- TLSConfig: tlsConfig,
- Addr: address,
- DB: db, // 0 means default DB
- })
- }
-
- return &RedisStore{
- logger: slf4go.NewLog(fmt.Sprintf("redis://%s/%d", address, db)),
- client: client,
- Timeout: timeout,
- MaxRetries: maxRetries,
- }
-}
-
-// SetLogLevel for RedisStore implements the Loggable interface
-func (csm *RedisStore) SetLogLevel(level slf4go.LogLevel) {
- csm.logger.Level = level
-}
-
-// `get` abstracts away the common functionality of looking for a key in Redis,
+// get abstracts away the common functionality of looking for a key in Redis,
// with a given timeout and a number of retries.
-func (csm *RedisStore) get(key string, value proto.Message) error {
+func (csm *RedisStore) get(key string, value proto.Message) StoreErr {
attemptsLeft := csm.MaxRetries
csm.logger.Trace("Looking up key `%s` (Max retries: %d)", key, attemptsLeft)
var cancel context.CancelFunc
@@ -193,7 +64,7 @@ func (csm *RedisStore) get(key string, value proto.Message) error {
// The key isn't there, no point in retrying
csm.logger.Debug("Key `%s` not found", key)
cancel()
- return err
+ return NotFoundError(key)
} else if err != nil {
if ctx.Err() == context.DeadlineExceeded {
// The error here may be recoverable, so we'll keep trying until we run out of attempts
@@ -209,7 +80,7 @@ func (csm *RedisStore) get(key string, value proto.Message) error {
// This is a different error, we'll just return it
csm.logger.Error(err.Error())
cancel()
- return err
+ return GenericStoreError(err.Error())
}
} else {
cancel()
@@ -218,7 +89,7 @@ func (csm *RedisStore) get(key string, value proto.Message) error {
}
}
-func (csm *RedisStore) put(key string, value proto.Message, ttl time.Duration) error {
+func (csm *RedisStore) put(key string, value proto.Message, ttl time.Duration) StoreErr {
attemptsLeft := csm.MaxRetries
csm.logger.Trace("Storing key `%s` (Max retries: %d)", key, attemptsLeft)
var cancel context.CancelFunc
@@ -235,48 +106,259 @@ func (csm *RedisStore) put(key string, value proto.Message, ttl time.Duration) e
data, err := proto.Marshal(value)
if err != nil {
csm.logger.Error("cannot convert proto to bytes: %q", err)
- return err
+ return InvalidDataError(err.Error())
}
cmd := csm.client.Set(ctx, key, data, ttl)
_, err = cmd.Result()
if err != nil {
if ctx.Err() == context.DeadlineExceeded {
// The error here may be recoverable, so we'll keep trying until we run out of attempts
- csm.logger.Error(err.Error())
if attemptsLeft == 0 {
- csm.logger.Error("max retries reached, giving up")
- return err
+ return TooManyAttempts("")
}
csm.logger.Debug("retrying after timeout, attempts left: %d", attemptsLeft)
csm.wait()
} else {
- return err
+ return GenericStoreError(err.Error())
}
} else {
- csm.logger.Debug("Stored key `%s`", key)
+ csm.logger.Debug("stored value for key `%s`", key)
return nil
}
}
}
-func (csm *RedisStore) Health() error {
+// wait is a helper function that sleeps for a random amount of time between 0 and half second.
+// Poor man's backoff.
+//
+// TODO: should use some form of exponential backoff
+// TODO: wait time should be configurable
+func (csm *RedisStore) wait() {
+ waitForMsec := rand.Intn(500)
+ time.Sleep(time.Duration(waitForMsec) * time.Millisecond)
+}
+
+/////// StoreManager implementation
+
+// Health checks that the server is ready to accept connections
+func (csm *RedisStore) Health() StoreErr {
ctx, cancel := context.WithTimeout(context.Background(), csm.Timeout)
defer cancel()
_, err := csm.client.Ping(ctx).Result()
if err != nil {
csm.logger.Error("Error pinging redis: %s", err.Error())
- return fmt.Errorf("redis health check failed: %w", err)
+ return GenericStoreError(err.Error())
}
return nil
}
-// wait is a helper function that sleeps for a random amount of time between 0 and half second.
-// Poor man's backoff.
+func (csm *RedisStore) SetTimeout(duration time.Duration) {
+ csm.Timeout = duration
+}
+
+func (csm *RedisStore) GetTimeout() time.Duration {
+ return csm.Timeout
+}
+
+// SetLogLevel for RedisStore implements the Loggable interface
+func (csm *RedisStore) SetLogLevel(level slf4go.LogLevel) {
+ csm.logger.Level = level
+}
+
+/////// ConfigStore implementation
+
+func (csm *RedisStore) GetConfig(id string) (*protos.Configuration, StoreErr) {
+ key := NewKeyForConfig(id)
+ var cfg protos.Configuration
+ err := csm.get(key, &cfg)
+ if err != nil {
+ csm.logger.Error("cannot retrieve configuration: %v", err)
+ return nil, err
+ }
+ return &cfg, nil
+}
+
+func (csm *RedisStore) PutConfig(cfg *protos.Configuration) StoreErr {
+ if cfg == nil {
+ return InvalidDataError("nil config")
+ }
+ key := NewKeyForConfig(api.GetVersionId(cfg))
+ if csm.client.Exists(context.Background(), key).Val() == 1 {
+ return AlreadyExistsError(key)
+ }
+ // TODO: Find out whether the client allows to batch requests, instead of sending multiple server requests
+ csm.client.SAdd(context.Background(), ConfigsPrefix, cfg.Name)
+ csm.client.SAdd(context.Background(), NewKeyForConfig(cfg.Name), api.GetVersionId(cfg))
+ return csm.put(key, cfg, NeverExpire)
+}
+
+func (csm *RedisStore) GetAllConfigs() []string {
+ // TODO: enable splitting results with a (cursor, count)
+ csm.logger.Debug("Looking up all configs in DB")
+ configs, err := csm.client.SMembers(context.Background(), ConfigsPrefix).Result()
+ if err != nil {
+ csm.logger.Error(NoConfigurationsFmt, err)
+ return nil
+ }
+ csm.logger.Debug(ReturningItemsFmt, len(configs))
+ return configs
+}
+
+func (csm *RedisStore) GetAllVersions(name string) []string {
+ csm.logger.Debug("Looking up all versions for Configurations `%s` in DB", name)
+ configs, err := csm.client.SMembers(context.Background(), NewKeyForConfig(name)).Result()
+ if err != nil {
+ csm.logger.Error(NoConfigurationsFmt, err)
+ return nil
+ }
+ csm.logger.Debug(ReturningItemsFmt, len(configs))
+ return configs
+}
+
+/////// FSMStore implementation
+
+func (csm *RedisStore) GetStateMachine(id string, cfg string) (*protos.FiniteStateMachine, StoreErr) {
+ key := NewKeyForMachine(id, cfg)
+ var stateMachine protos.FiniteStateMachine
+ err := csm.get(key, &stateMachine)
+ if err != nil {
+ csm.logger.Error("error getting FSM `%s`: %v", key, err)
+ return nil, err
+ }
+ return &stateMachine, nil
+}
+
+func (csm *RedisStore) PutStateMachine(id string, stateMachine *protos.FiniteStateMachine) StoreErr {
+ if stateMachine == nil {
+ return InvalidDataError("nil statemachine")
+ }
+ configName := strings.Split(stateMachine.ConfigId, api.ConfigurationVersionSeparator)[0]
+ key := NewKeyForMachine(id, configName)
+ return csm.put(key, stateMachine, NeverExpire)
+}
+
+func (csm *RedisStore) GetAllInState(cfg string, state string) []string {
+ // TODO: enable splitting results with a (cursor, count)
+ csm.logger.Debug("Looking up all FSMs [%s] in DB with state `%s`", cfg, state)
+ key := NewKeyForMachinesByState(cfg, state)
+ fsms, err := csm.client.SMembers(context.Background(), key).Result()
+ if err != nil {
+ const format = "Could not retrieve FSMs for state `%s`: %s"
+ csm.logger.Error(format, state, err)
+ return nil
+ }
+ csm.logger.Debug(ReturningItemsFmt, len(fsms))
+ return fsms
+}
+
+func (csm *RedisStore) UpdateState(cfgName string, id string, oldState string, newState string) StoreErr {
+ var key string
+ var err error
+ if oldState != "" {
+ key = NewKeyForMachinesByState(cfgName, oldState)
+ err = csm.client.SRem(context.Background(), key, id).Err()
+ if err != nil {
+ return fmt.Errorf(
+ "cannot remove FSM [%s#%s] from state set `%s`: %s",
+ cfgName, id, oldState, err)
+ }
+ }
+ if newState != "" {
+ key = NewKeyForMachinesByState(cfgName, newState)
+ err = csm.client.SAdd(context.Background(), key, id).Err()
+ if err != nil {
+ return fmt.Errorf(
+ "cannot add FSM [%s#%s] to state set `%s`: %s",
+ cfgName, id, newState, err)
+ }
+ }
+ return nil
+}
+
+/////// EventStore implementation
+
+func (csm *RedisStore) GetEvent(id string, cfg string) (*protos.Event, StoreErr) {
+ key := NewKeyForEvent(id, cfg)
+ var event protos.Event
+ err := csm.get(key, &event)
+ if err != nil {
+ csm.logger.Error("cannot retrieve event `%s`: %v", key, err)
+ return nil, err
+ }
+ return &event, nil
+}
+
+func (csm *RedisStore) PutEvent(event *protos.Event, cfg string, ttl time.Duration) StoreErr {
+ if event == nil {
+ return InvalidDataError("nil event")
+ }
+ key := NewKeyForEvent(event.EventId, cfg)
+ return csm.put(key, event, ttl)
+}
+
+func (csm *RedisStore) AddEventOutcome(id string, cfg string, response *protos.EventOutcome, ttl time.Duration) StoreErr {
+ if response == nil {
+ return InvalidDataError("nil response")
+ }
+ key := NewKeyForOutcome(id, cfg)
+ return csm.put(key, response, ttl)
+}
+
+func (csm *RedisStore) GetOutcomeForEvent(id string, cfg string) (*protos.EventOutcome, StoreErr) {
+ key := NewKeyForOutcome(id, cfg)
+ var outcome protos.EventOutcome
+ err := csm.get(key, &outcome)
+ if err != nil {
+ csm.logger.Error("cannot retrieve outcome for event `%s`: %v", key, err)
+ return nil, err
+ }
+ return &outcome, nil
+}
+
+/////// Constructor methods
+
+// NewRedisStoreWithDefaults creates a new StoreManager backed by a Redis server, with
+// all default settings, in a single node configuration.
+func NewRedisStoreWithDefaults(address string) StoreManager {
+ return NewRedisStore(address, false, DefaultRedisDb, DefaultTimeout, DefaultMaxRetries)
+}
+
+// NewRedisStore creates a new StoreManager backed by a Redis server, reachable at address, in
+// cluster configuration if isCluster is set to true.
+// The db value indicates which database to use.
//
-// TODO: should use some form of exponential backoff
-// TODO: wait time should be configurable
-func (csm *RedisStore) wait() {
- waitForMsec := rand.Intn(500)
- time.Sleep(time.Duration(waitForMsec) * time.Millisecond)
+// Some store queries (typically the get and put actions) will be retried up to maxRetries times,
+// if they time out after timeout expires.
+// Use the [Health] function to check whether the store is reachable.
+func NewRedisStore(address string, isCluster bool, db int, timeout time.Duration, maxRetries int) StoreManager {
+ logger := slf4go.NewLog(fmt.Sprintf("redis://%s/%d", address, db))
+
+ var tlsConfig *tls.Config
+ var client redis.Cmdable
+
+ if os.Getenv("REDIS_TLS") != "" {
+ logger.Info("Using TLS for Redis connection")
+ tlsConfig = &tls.Config{MinVersion: tls.VersionTLS12}
+ }
+
+ if isCluster {
+ client = redis.NewClusterClient(&redis.ClusterOptions{
+ TLSConfig: tlsConfig,
+ Addrs: strings.Split(address, ","),
+ })
+ } else {
+ client = redis.NewClient(&redis.Options{
+ TLSConfig: tlsConfig,
+ Addr: address,
+ DB: db, // 0 means default DB
+ })
+ }
+
+ return &RedisStore{
+ logger: slf4go.NewLog(fmt.Sprintf("redis://%s/%d", address, db)),
+ client: client,
+ Timeout: timeout,
+ MaxRetries: maxRetries,
+ }
}
diff --git a/storage/redis_store_test.go b/storage/redis_store_test.go
index be13b5d..dbffdaa 100644
--- a/storage/redis_store_test.go
+++ b/storage/redis_store_test.go
@@ -22,9 +22,48 @@ import (
protos "github.com/massenz/statemachine-proto/golang/api"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
+ "strings"
)
-var _ = Describe("RedisStore", func() {
+const (
+ cfgName = "orders"
+ fsmIdFmt = "fsm-%d"
+)
+
+// configId is a constant in all but name
+var configId = strings.Join([]string{cfgName, "v4"}, api.ConfigurationVersionSeparator)
+
+func setupStoreRedis() (storage.StoreManager, *redis.Client) {
+ store := storage.NewRedisStoreWithDefaults(container.Address)
+ Ω(store).ToNot(BeNil())
+ store.SetLogLevel(slf4go.NONE)
+
+ // This is used to go "behind the back" of our StoreManager and mess with it for testing
+ // purposes. Do NOT do this in your code.
+ rdb := redis.NewClient(&redis.Options{
+ Addr: container.Address,
+ DB: storage.DefaultRedisDb,
+ })
+ return store, rdb
+}
+
+func storeSomeFSMs(store storage.StoreManager, count int) {
+ for id := 1; id < count; id++ {
+ fsm := &protos.FiniteStateMachine{
+ ConfigId: configId,
+ State: "in_transit",
+ History: []*protos.Event{
+ {Transition: &protos.Transition{Event: "confirmed"}, Originator: "bot"},
+ {Transition: &protos.Transition{Event: "shipped"}, Originator: "bot"},
+ },
+ }
+ fsmId := fmt.Sprintf(fsmIdFmt, id)
+ Ω(store.PutStateMachine(fsmId, fsm)).ToNot(HaveOccurred())
+ Ω(store.UpdateState("orders", fsmId, "", fsm.State))
+ }
+}
+
+var _ = Describe("Redis Store", func() {
Context("for simple operations", func() {
var store storage.StoreManager
@@ -37,64 +76,52 @@ var _ = Describe("RedisStore", func() {
Version: "v3",
StartingState: "start",
}
- Expect(container).ToNot(BeNil())
- store = storage.NewRedisStoreWithDefaults(container.Address)
- Expect(store).ToNot(BeNil())
- // Mute unnecessary logging during tests; re-enable (
- // and set to DEBUG) when diagnosing failures.
- store.SetLogLevel(slf4go.NONE)
-
- // This is used to go "behind the back" of our StoreManager and mess with it for testing
- // purposes. Do NOT do this in your code.
- rdb = redis.NewClient(&redis.Options{
- Addr: container.Address,
- DB: storage.DefaultRedisDb,
- })
+ Ω(container).ToNot(BeNil())
+ store, rdb = setupStoreRedis()
}, 0.5)
AfterEach(func() {
// Cleaning up the DB to prevent "dirty" store to impact test results
rdb.FlushDB(context.Background())
}, 0.2)
It("is healthy", func() {
- Expect(store.Health()).To(Succeed())
+ Ω(store.Health()).To(Succeed())
})
It("can get a configuration back", func() {
id := api.GetVersionId(cfg)
val, _ := proto.Marshal(cfg)
res, err := rdb.Set(context.Background(), storage.NewKeyForConfig(id), val,
storage.NeverExpire).Result()
- Expect(err).ToNot(HaveOccurred())
- Expect(res).To(Equal("OK"))
+ Ω(err).ToNot(HaveOccurred())
+ Ω(res).To(Equal("OK"))
- data, ok := store.GetConfig(id)
- Expect(ok).To(BeTrue())
- Expect(data).ToNot(BeNil())
- Expect(api.GetVersionId(data)).To(Equal(api.GetVersionId(cfg)))
+ data, err := store.GetConfig(id)
+ Ω(err).To(BeNil())
+ Ω(data).ToNot(BeNil())
+ Ω(api.GetVersionId(data)).To(Equal(api.GetVersionId(cfg)))
})
It("will return orderly if the id does not exist", func() {
id := "fake"
- data, ok := store.GetConfig(id)
- Expect(ok).To(BeFalse())
- Expect(data).To(BeNil())
+ data, err := store.GetConfig(id)
+ Ω(err).ToNot(BeNil())
+ Ω(data).To(BeNil())
})
It("can save configurations", func() {
var found protos.Configuration
- Expect(store.PutConfig(cfg)).ToNot(HaveOccurred())
+ Ω(store.PutConfig(cfg)).ToNot(HaveOccurred())
val, err := rdb.Get(context.Background(),
storage.NewKeyForConfig(api.GetVersionId(cfg))).Bytes()
- Expect(err).ToNot(HaveOccurred())
+ Ω(err).ToNot(HaveOccurred())
- Expect(proto.Unmarshal(val, &found)).ToNot(HaveOccurred())
- Expect(&found).To(Respect(cfg))
+ Ω(proto.Unmarshal(val, &found)).ToNot(HaveOccurred())
+ Ω(&found).To(Respect(cfg))
})
It("will not save a duplicate configurations", func() {
- Expect(store.PutConfig(cfg)).ToNot(HaveOccurred())
- Expect(store.PutConfig(cfg)).To(HaveOccurred())
+ Ω(store.PutConfig(cfg)).ToNot(HaveOccurred())
+ Ω(store.PutConfig(cfg)).To(HaveOccurred())
})
It("should not fail for a non-existent FSM", func() {
- data, ok := store.GetStateMachine("fake", "bad-config")
- Expect(ok).To(BeFalse())
- Expect(data).To(BeNil())
+ _, err := store.GetStateMachine("fake", "bad-config")
+ Ω(err).ToNot(BeNil())
})
It("can get an FSM back", func() {
id := uuid.New().String()
@@ -108,88 +135,88 @@ var _ = Describe("RedisStore", func() {
key := storage.NewKeyForMachine(id, fsm.ConfigId)
res, err := rdb.Set(context.Background(), key, val, storage.NeverExpire).Result()
- Expect(err).ToNot(HaveOccurred())
- Expect(res).To(Equal("OK"))
+ Ω(err).ToNot(HaveOccurred())
+ Ω(res).To(Equal("OK"))
- data, ok := store.GetStateMachine(id, "cfg_id")
- Expect(ok).To(BeTrue())
- Expect(data).ToNot(BeNil())
- Expect(data).To(Respect(fsm))
+ data, err := store.GetStateMachine(id, "cfg_id")
+ Ω(err).To(BeNil())
+ Ω(data).ToNot(BeNil())
+ Ω(data).To(Respect(fsm))
})
It("can save an FSM", func() {
id := "99" // uuid.New().String()
var found protos.FiniteStateMachine
fsm := &protos.FiniteStateMachine{
- ConfigId: "orders:v4",
+ ConfigId: configId,
State: "in_transit",
History: []*protos.Event{
{Transition: &protos.Transition{Event: "confirmed"}, Originator: "bot"},
{Transition: &protos.Transition{Event: "shipped"}, Originator: "bot"},
},
}
- Expect(store.PutStateMachine(id, fsm)).ToNot(HaveOccurred())
- val, err := rdb.Get(context.Background(), storage.NewKeyForMachine(id, "orders")).Bytes()
- Expect(err).ToNot(HaveOccurred())
+ Ω(store.PutStateMachine(id, fsm)).ToNot(HaveOccurred())
+ val, err := rdb.Get(context.Background(), storage.NewKeyForMachine(id, cfgName)).Bytes()
+ Ω(err).ToNot(HaveOccurred())
- Expect(proto.Unmarshal(val, &found)).ToNot(HaveOccurred())
+ Ω(proto.Unmarshal(val, &found)).ToNot(HaveOccurred())
// NOTE: this fails, even though the Protobufs are actually identical:
- // Expect(found).To(Respect(*fsm))
+ // Ω(found).To(Respect(*fsm))
// it strangely fails on the History field, which is a slice, and actually matches.
- Expect(found.ConfigId).To(Equal(fsm.ConfigId))
- Expect(found.State).To(Equal(fsm.State))
- Expect(found.ConfigId).To(Equal(fsm.ConfigId))
- Expect(found.History).To(HaveLen(len(fsm.History)))
- Expect(found.History[0]).To(Respect(fsm.History[0]))
- Expect(found.History[1]).To(Respect(fsm.History[1]))
+ Ω(found.ConfigId).To(Equal(fsm.ConfigId))
+ Ω(found.State).To(Equal(fsm.State))
+ Ω(found.ConfigId).To(Equal(fsm.ConfigId))
+ Ω(found.History).To(HaveLen(len(fsm.History)))
+ Ω(found.History[0]).To(Respect(fsm.History[0]))
+ Ω(found.History[1]).To(Respect(fsm.History[1]))
})
It("can get events back", func() {
id := uuid.New().String()
ev := api.NewEvent("confirmed")
- key := storage.NewKeyForEvent(id, "orders")
+ key := storage.NewKeyForEvent(id, cfgName)
val, _ := proto.Marshal(ev)
_, err := rdb.Set(context.Background(), key, val, storage.NeverExpire).Result()
- Expect(err).ToNot(HaveOccurred())
+ Ω(err).ToNot(HaveOccurred())
- found, ok := store.GetEvent(id, "orders")
- Expect(ok).To(BeTrue())
- Expect(found).To(Respect(ev))
+ found, err := store.GetEvent(id, cfgName)
+ Ω(err).To(BeNil())
+ Ω(found).To(Respect(ev))
})
It("can save events", func() {
ev := api.NewEvent("confirmed")
id := ev.EventId
- Expect(store.PutEvent(ev, "orders", storage.NeverExpire)).ToNot(HaveOccurred())
- val, err := rdb.Get(context.Background(), storage.NewKeyForEvent(id, "orders")).Bytes()
- Expect(err).ToNot(HaveOccurred())
+ Ω(store.PutEvent(ev, cfgName, storage.NeverExpire)).ToNot(HaveOccurred())
+ val, err := rdb.Get(context.Background(), storage.NewKeyForEvent(id, cfgName)).Bytes()
+ Ω(err).ToNot(HaveOccurred())
var found protos.Event
- Expect(proto.Unmarshal(val, &found)).ToNot(HaveOccurred())
- Expect(&found).To(Respect(ev))
+ Ω(proto.Unmarshal(val, &found)).ToNot(HaveOccurred())
+ Ω(&found).To(Respect(ev))
})
It("will return an error for a non-existent event", func() {
- _, ok := store.GetEvent("fake", "orders")
- Expect(ok).To(BeFalse())
+ _, err := store.GetEvent("fake", cfgName)
+ Ω(err).To(HaveOccurred())
})
It("can save an event Outcome", func() {
id := uuid.New().String()
- cfg := "orders"
+ cfg := cfgName
response := &protos.EventOutcome{
Code: protos.EventOutcome_Ok,
Config: "test",
Id: "1234-feed-beef",
Details: "this was just a test",
}
- Expect(store.AddEventOutcome(id, cfg, response, storage.NeverExpire)).ToNot(HaveOccurred())
+ Ω(store.AddEventOutcome(id, cfg, response, storage.NeverExpire)).ToNot(HaveOccurred())
key := storage.NewKeyForOutcome(id, cfg)
val, err := rdb.Get(context.Background(), key).Bytes()
- Expect(err).ToNot(HaveOccurred())
+ Ω(err).ToNot(HaveOccurred())
var found protos.EventOutcome
- Expect(proto.Unmarshal(val, &found)).ToNot(HaveOccurred())
- Expect(&found).To(Respect(response))
+ Ω(proto.Unmarshal(val, &found)).ToNot(HaveOccurred())
+ Ω(&found).To(Respect(response))
})
It("can get an event Outcome", func() {
id := uuid.New().String()
- cfg := "orders"
+ cfg := cfgName
response := &protos.EventOutcome{
Code: protos.EventOutcome_Ok,
Details: "this was just a test",
@@ -198,22 +225,22 @@ var _ = Describe("RedisStore", func() {
key := storage.NewKeyForOutcome(id, cfg)
val, _ := proto.Marshal(response)
_, err := rdb.Set(context.Background(), key, val, storage.NeverExpire).Result()
- Expect(err).ToNot(HaveOccurred())
- found, ok := store.GetOutcomeForEvent(id, cfg)
- Expect(ok).To(BeTrue())
- Expect(found).To(Respect(response))
+ Ω(err).ToNot(HaveOccurred())
+ found, err := store.GetOutcomeForEvent(id, cfg)
+ Ω(err).ToNot(HaveOccurred())
+ Ω(found).To(Respect(response))
})
It("should gracefully handle a nil Configuration", func() {
- Expect(store.PutConfig(nil)).To(HaveOccurred())
+ Ω(store.PutConfig(nil)).To(HaveOccurred())
})
It("should gracefully handle a nil Statemachine", func() {
- Expect(store.PutStateMachine("fake", nil)).To(HaveOccurred())
+ Ω(store.PutStateMachine("fake", nil)).To(HaveOccurred())
})
It("should gracefully handle a nil Event", func() {
- Expect(store.PutEvent(nil, "orders", storage.NeverExpire)).To(HaveOccurred())
+ Ω(store.PutEvent(nil, cfgName, storage.NeverExpire)).To(HaveOccurred())
})
It("should gracefully handle a nil Outcome", func() {
- Expect(store.AddEventOutcome("fake", "test", nil,
+ Ω(store.AddEventOutcome("fake", "test", nil,
storage.NeverExpire)).To(HaveOccurred())
})
})
@@ -223,17 +250,7 @@ var _ = Describe("RedisStore", func() {
var rdb *redis.Client
BeforeEach(func() {
- Expect(container).ToNot(BeNil())
- store = storage.NewRedisStoreWithDefaults(container.Address)
- Expect(store).ToNot(BeNil())
- store.SetLogLevel(slf4go.NONE)
-
- // This is used to go "behind the back" of our StoreManager and mess with it for testing
- // purposes. Do NOT do this in your code.
- rdb = redis.NewClient(&redis.Options{
- Addr: container.Address,
- DB: storage.DefaultRedisDb,
- })
+ store, rdb = setupStoreRedis()
}, 0.5)
AfterEach(func() {
// Cleaning up the DB to prevent "dirty" store to impact test results
@@ -241,26 +258,26 @@ var _ = Describe("RedisStore", func() {
}, 0.2)
It("can get all configuration names", func() {
- for _, name := range []string{"orders", "devices", "users"} {
- Expect(store.PutConfig(&protos.Configuration{Name: name, Version: "v3", StartingState: "start"})).
+ for _, name := range []string{cfgName, "devices", "users"} {
+ Ω(store.PutConfig(&protos.Configuration{Name: name, Version: "v3", StartingState: "start"})).
ToNot(HaveOccurred())
}
configs := store.GetAllConfigs()
- Expect(len(configs)).To(Equal(3))
- Expect(configs).To(ContainElements("orders", "devices", "users"))
+ Ω(len(configs)).To(Equal(3))
+ Ω(configs).To(ContainElements(cfgName, "devices", "users"))
})
It("can get all versions of a configuration", func() {
for _, version := range []string{"v1alpha1", "v1beta", "v1"} {
- Expect(store.PutConfig(&protos.Configuration{Name: "orders", Version: version, StartingState: "start"})).
+ Ω(store.PutConfig(&protos.Configuration{Name: cfgName, Version: version, StartingState: "start"})).
ToNot(HaveOccurred())
}
- configs := store.GetAllVersions("orders")
- Expect(len(configs)).To(Equal(3))
- Expect(configs).To(ContainElements("orders:v1alpha1", "orders:v1beta", "orders:v1"))
+ configs := store.GetAllVersions(cfgName)
+ Ω(len(configs)).To(Equal(3))
+ Ω(configs).To(ContainElements("orders:v1alpha1", "orders:v1beta", "orders:v1"))
})
It("returns an empty slice for a non-existent config", func() {
configs := store.GetAllVersions("fake")
- Expect(len(configs)).To(Equal(0))
+ Ω(len(configs)).To(Equal(0))
})
})
When("querying for FSMs", func() {
@@ -268,74 +285,40 @@ var _ = Describe("RedisStore", func() {
var rdb *redis.Client
BeforeEach(func() {
- Expect(container).ToNot(BeNil())
- store = storage.NewRedisStoreWithDefaults(container.Address)
- Expect(store).ToNot(BeNil())
- store.SetLogLevel(slf4go.NONE)
-
- // This is used to go "behind the back" of our StoreManager and mess with it for testing
- // purposes. Do NOT do this in your code.
- rdb = redis.NewClient(&redis.Options{
- Addr: container.Address,
- DB: storage.DefaultRedisDb,
- })
+ store, rdb = setupStoreRedis()
}, 0.5)
AfterEach(func() {
// Cleaning up the DB to prevent "dirty" store to impact test results
rdb.FlushDB(context.Background())
}, 0.2)
It("finds them by state", func() {
+ storeSomeFSMs(store, 5)
+ res := store.GetAllInState(cfgName, "in_transit")
+ Ω(len(res)).To(Equal(4))
for id := 1; id < 5; id++ {
- fsm := &protos.FiniteStateMachine{
- ConfigId: "orders:v4",
- State: "in_transit",
- History: []*protos.Event{
- {Transition: &protos.Transition{Event: "confirmed"}, Originator: "bot"},
- {Transition: &protos.Transition{Event: "shipped"}, Originator: "bot"},
- },
- }
- fsmId := fmt.Sprintf("fsm-%d", id)
- Expect(store.PutStateMachine(fsmId, fsm)).ToNot(HaveOccurred())
- Expect(store.UpdateState("orders", fsmId, "", fsm.State))
- }
- res := store.GetAllInState("orders", "in_transit")
- Expect(len(res)).To(Equal(4))
- for id := 1; id < 5; id++ {
- Expect(res).To(ContainElement(fmt.Sprintf("fsm-%d", id)))
+ Ω(res).To(ContainElement(fmt.Sprintf(fsmIdFmt, id)))
}
})
When("transitioning state", func() {
BeforeEach(func() {
- for id := 1; id < 10; id++ {
- fsm := &protos.FiniteStateMachine{
- ConfigId: "orders:v4",
- State: "in_transit",
- History: []*protos.Event{
- {Transition: &protos.Transition{Event: "confirmed"}, Originator: "bot"},
- {Transition: &protos.Transition{Event: "shipped"}, Originator: "bot"},
- },
- }
- fsmId := fmt.Sprintf("fsm-%d", id)
- Expect(store.PutStateMachine(fsmId, fsm)).ToNot(HaveOccurred())
- Expect(store.UpdateState("orders", fsmId, "", fsm.State))
- }
+ storeSomeFSMs(store, 10)
})
It("finds them", func() {
for id := 3; id < 6; id++ {
- fsmId := fmt.Sprintf("fsm-%d", id)
- Expect(store.UpdateState("orders", fsmId, "in_transit", "shipped"))
+ fsmId := fmt.Sprintf(fsmIdFmt, id)
+ Ω(store.UpdateState(cfgName, fsmId, "in_transit", "shipped"))
}
- res := store.GetAllInState("orders", "shipped")
- Expect(len(res)).To(Equal(3))
+ res := store.GetAllInState(cfgName, "shipped")
+ Ω(len(res)).To(Equal(3))
for id := 3; id < 6; id++ {
- Expect(res).To(ContainElement(fmt.Sprintf("fsm-%d", id)))
+ Ω(res).To(ContainElement(fmt.Sprintf(fsmIdFmt, id)))
}
- res = store.GetAllInState("orders", "in_transit")
- Expect(len(res)).To(Equal(6))
+ res = store.GetAllInState(cfgName, "in_transit")
+ Ω(len(res)).To(Equal(6))
})
It("will remove with an empty newState", func() {
- Expect(store.UpdateState("orders", "fsm-1", "in_transit", "")).To(Succeed())
- res := store.GetAllInState("orders", "in_transit")
+ Ω(store.UpdateState(cfgName, "fsm-1", "in_transit", "")).To(Succeed())
+ res := store.GetAllInState(cfgName, "in_transit")
Ω(res).ToNot(ContainElement("fsm-1"))
})
})
diff --git a/storage/storage_suite_test.go b/storage/storage_suite_test.go
index 09a95dc..0735bc9 100644
--- a/storage/storage_suite_test.go
+++ b/storage/storage_suite_test.go
@@ -28,7 +28,8 @@ var container *internals.Container
var _ = BeforeSuite(func() {
var err error
container, err = internals.NewRedisContainer(context.Background())
- Expect(err).ToNot(HaveOccurred())
+ Ω(err).ToNot(HaveOccurred())
+ Ω(container).ToNot(BeNil())
// Note the timeout here is in seconds (and it's not a time.Duration either)
}, 5.0)
diff --git a/storage/types.go b/storage/types.go
index 258fa8a..afd1124 100644
--- a/storage/types.go
+++ b/storage/types.go
@@ -16,22 +16,26 @@ import (
"time"
)
-func Error(msg string) func(string) error {
+type StoreErr = error
+
+func Error(msg string) func(string) StoreErr {
return func(key string) error {
return fmt.Errorf(msg, key)
}
}
var (
- IllegalStoreError = Error("error storing invalid data: %v")
AlreadyExistsError = Error("key %s already exists")
+ GenericStoreError = Error("store error: %v")
+ InvalidDataError = Error("error storing invalid data: %v")
NotFoundError = Error("key %s not found")
NotImplementedError = Error("functionality %s has not been implemented yet")
+ TooManyAttempts = Error("retries exceeded")
)
-type ConfigurationStorageManager interface {
- GetConfig(versionId string) (*protos.Configuration, bool)
- PutConfig(cfg *protos.Configuration) error
+type ConfigStore interface {
+ GetConfig(versionId string) (*protos.Configuration, StoreErr)
+ PutConfig(cfg *protos.Configuration) StoreErr
// GetAllConfigs returns all the `Configurations` that exist in the server, regardless of
// the version, and whether are used or not by an FSM.
@@ -42,16 +46,16 @@ type ConfigurationStorageManager interface {
GetAllVersions(name string) []string
}
-type FiniteStateMachineStorageManager interface {
+type FSMStore interface {
// GetStateMachine will find the FSM with `id and that is configured via a `Configuration` whose
// `name` matches `cfg` (without the `version`).
- GetStateMachine(id string, cfg string) (*protos.FiniteStateMachine, bool)
+ GetStateMachine(id string, cfg string) (*protos.FiniteStateMachine, StoreErr)
// PutStateMachine creates or updates the FSM whose `id` is given.
// No further action is taken: no check that the referenced `Configuration` exists, and the
// `state` SETs are not updated either: it is the caller's responsibility to call the
// `UpdateState` method (possibly with an empty `oldState`, in the case of creation).
- PutStateMachine(id string, fsm *protos.FiniteStateMachine) error
+ PutStateMachine(id string, fsm *protos.FiniteStateMachine) StoreErr
// GetAllInState looks up all the FSMs that are currently in the given `state` and
// are configured with a `Configuration` whose name matches `cfg` (regardless of the
@@ -67,12 +71,12 @@ type FiniteStateMachineStorageManager interface {
// (or not, as the case may be).
//
// `oldState` may be empty in the case of a new FSM being created.
- UpdateState(cfgName string, id string, oldState string, newState string) error
+ UpdateState(cfgName string, id string, oldState string, newState string) StoreErr
}
-type EventStorageManager interface {
- GetEvent(id string, cfg string) (*protos.Event, bool)
- PutEvent(event *protos.Event, cfg string, ttl time.Duration) error
+type EventStore interface {
+ GetEvent(id string, cfg string) (*protos.Event, StoreErr)
+ PutEvent(event *protos.Event, cfg string, ttl time.Duration) StoreErr
// AddEventOutcome adds the outcome of an event to the storage, given the `eventId` and the
// "type" (`Configuration.Name`) of the FSM that received the event.
@@ -80,18 +84,18 @@ type EventStorageManager interface {
// Optionally, it will remove the outcome after a given `ttl` (time-to-live); use
// `NeverExpire` to keep the outcome forever.
AddEventOutcome(eventId string, cfgName string, response *protos.EventOutcome,
- ttl time.Duration) error
+ ttl time.Duration) StoreErr
// GetOutcomeForEvent returns the outcome of an event, given the `eventId` and the "type" of the
// FSM that received the event.
- GetOutcomeForEvent(eventId string, cfgName string) (*protos.EventOutcome, bool)
+ GetOutcomeForEvent(eventId string, cfgName string) (*protos.EventOutcome, StoreErr)
}
type StoreManager interface {
log.Loggable
- ConfigurationStorageManager
- FiniteStateMachineStorageManager
- EventStorageManager
+ ConfigStore
+ FSMStore
+ EventStore
SetTimeout(duration time.Duration)
GetTimeout() time.Duration
Health() error