Skip to content

Commit

Permalink
Release 0.8.0
Browse files Browse the repository at this point in the history
  • Loading branch information
massenz committed Dec 28, 2022
2 parents 60ee5c0 + 0d167f5 commit cbdfaf2
Show file tree
Hide file tree
Showing 25 changed files with 1,008 additions and 342 deletions.
12 changes: 2 additions & 10 deletions .run/Run All Tests.run.xml → .run/Run all tests.run.xml
Original file line number Diff line number Diff line change
@@ -1,21 +1,13 @@
<!--
~ Copyright (c) 2022 AlertAvert.com. All rights reserved.
~
~ Licensed under the Apache License, Version 2.0
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Author: Marco Massenzio ([email protected])
-->

<component name="ProjectRunConfigurationManager">
<configuration default="false" name="Run all tests" type="GoTestRunConfiguration" factoryName="Go Test" singleton="false">
<module name="statemachine" />
<working_directory value="$PROJECT_DIR$" />
<parameters value="./api ./grpc ./pubsub ./storage ./server" />
<kind value="DIRECTORY" />
<package value="github.com/massenz/go-statemachine" />
<directory value="$PROJECT_DIR$" />
<filePath value="$PROJECT_DIR$" />
<framework value="gotest" />
<method v="2" />
</configuration>
</component>
</component>
4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,10 @@ help: ## Display this help.
@awk 'BEGIN {FS = ":.*##"; printf "\nUsage:\n make \033[36m<target>\033[0m\n"} /^[a-zA-Z_0-9-]+:.*?##/ { printf " \033[36m%-15s\033[0m %s\n", $$1, $$2 } /^##@/ { printf "\n\033[1m%s\033[0m\n", substr($$0, 5) } ' $(MAKEFILE_LIST)

.PHONY: clean
img=$(shell docker images -q --filter=reference=$(image))
clean: ## Cleans up the binary, container image and other data
@rm -f $(out)
@docker-compose -f $(compose) down
@docker rmi $(shell docker images -q --filter=reference=$(image))
@[ ! -z $(img) ] && docker rmi $(img) || true

.PHONY: build test container cov clean fmt
fmt: ## Formats the Go source code using 'go fmt'
Expand Down
2 changes: 1 addition & 1 deletion build.settings
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
# Build configuration

version = 0.7.1
version = 0.8.0
Binary file added docs/images/datamodel.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
7 changes: 3 additions & 4 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,11 @@ require (
github.com/google/uuid v1.3.0
github.com/gorilla/mux v1.8.0
github.com/massenz/slf4go v0.3.2-g4eb5504
github.com/massenz/statemachine-proto/golang v0.6.0-ga901a76
github.com/massenz/statemachine-proto/golang v1.1.0-beta-g1fc5dd8
github.com/onsi/ginkgo v1.16.5
github.com/onsi/gomega v1.18.1
github.com/testcontainers/testcontainers-go v0.16.0
google.golang.org/grpc v1.49.0
google.golang.org/grpc v1.51.0
google.golang.org/protobuf v1.28.1
)

Expand Down Expand Up @@ -132,7 +132,7 @@ require (
golang.org/x/sync v0.1.0 // indirect
golang.org/x/sys v0.1.0 // indirect
golang.org/x/term v0.0.0-20220526004731-065cf7ba2467 // indirect
golang.org/x/text v0.3.7 // indirect
golang.org/x/text v0.4.0 // indirect
golang.org/x/time v0.0.0-20220210224613-90d013bbcef8 // indirect
google.golang.org/appengine v1.6.7 // indirect
google.golang.org/genproto v0.0.0-20220617124728-180714bec0ad // indirect
Expand All @@ -153,7 +153,6 @@ replace (
github.com/docker/cli => github.com/docker/cli v20.10.3-0.20221013132413-1d6c6e2367e2+incompatible // 22.06 master branch
github.com/docker/docker => github.com/docker/docker v20.10.3-0.20221013203545-33ab36d6b304+incompatible // 22.06 branch
github.com/moby/buildkit => github.com/moby/buildkit v0.10.1-0.20220816171719-55ba9d14360a // same as buildx

github.com/opencontainers/runc => github.com/opencontainers/runc v1.1.2 // Can be removed on next bump of containerd to > 1.6.4

// For k8s dependencies, we use a replace directive, to prevent them being
Expand Down
11 changes: 6 additions & 5 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -380,8 +380,8 @@ github.com/mailru/easyjson v0.0.0-20190614124828-94de47d64c63/go.mod h1:C1wdFJiN
github.com/mailru/easyjson v0.0.0-20190626092158-b2ccc519800e/go.mod h1:C1wdFJiN94OJF2b5HbByQZoLdCWB1Yqtg26g4irojpc=
github.com/massenz/slf4go v0.3.2-g4eb5504 h1:tRrxPOKcqNKQn25eS8Dy9bW3NMNPpuK4Sla9jKfWmSs=
github.com/massenz/slf4go v0.3.2-g4eb5504/go.mod h1:ZJjthXAnZMJGwXUz3Z3v5uyban00uAFFoDYODOoLFpw=
github.com/massenz/statemachine-proto/golang v0.6.0-ga901a76 h1:tik7Xn5GL+w9U5RTJZ3mieoP2sun6RDM+cUBi7WGrUU=
github.com/massenz/statemachine-proto/golang v0.6.0-ga901a76/go.mod h1:EkwQg7wD6c/cmXVxfqNaUOVSrBLlti+xYljIxaQNJqA=
github.com/massenz/statemachine-proto/golang v1.1.0-beta-g1fc5dd8 h1:Dp2yv070ogiHLwQU5LppXskUDnCoO8tDkqgszyZMNmk=
github.com/massenz/statemachine-proto/golang v1.1.0-beta-g1fc5dd8/go.mod h1:g6CkyXxfs7XF8wv6OLdMZZDUu0fn4PY6HQQ2WDbW3GU=
github.com/mattn/go-colorable v0.1.2/go.mod h1:U0ppj6V5qS13XJ6of8GYAs25YV2eR4EVcfRqFIhoBtE=
github.com/mattn/go-colorable v0.1.12 h1:jF+Du6AlPIjs2BiUiQlKOX0rt3SujHxPnksPKZbaA40=
github.com/mattn/go-colorable v0.1.12/go.mod h1:u5H1YNBxpqRaxsYJYSkiCWKzEfiAb1Gb520KVy5xxl4=
Expand Down Expand Up @@ -820,8 +820,9 @@ golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.5/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.7 h1:olpwvP2KacW1ZWvsR7uQhoyTYvKAupfQrRGBFM352Gk=
golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
golang.org/x/text v0.4.0 h1:BrVqGRd7+k1DiOgtnFvAkoQEWQvBc25ouMJM6429SFg=
golang.org/x/text v0.4.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8=
golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
Expand Down Expand Up @@ -954,8 +955,8 @@ google.golang.org/grpc v1.36.0/go.mod h1:qjiiYl8FncCW8feJPdyg3v6XW24KsRHe+dy9BAG
google.golang.org/grpc v1.43.0/go.mod h1:k+4IHHFw41K8+bbowsex27ge2rCb65oeWqe4jJ590SU=
google.golang.org/grpc v1.44.0/go.mod h1:k+4IHHFw41K8+bbowsex27ge2rCb65oeWqe4jJ590SU=
google.golang.org/grpc v1.47.0/go.mod h1:vN9eftEi1UMyUsIF80+uQXhHjbXYbm0uXoFCACuMGWk=
google.golang.org/grpc v1.49.0 h1:WTLtQzmQori5FUH25Pq4WT22oCsv8USpQ+F6rqtsmxw=
google.golang.org/grpc v1.49.0/go.mod h1:ZgQEeidpAuNRZ8iRrlBKXZQP1ghovWIVhdJRyCDK+GI=
google.golang.org/grpc v1.51.0 h1:E1eGv1FTqoLIdnBCZufiSHgKjlqG6fKFf6pPWtMTh8U=
google.golang.org/grpc v1.51.0/go.mod h1:wgNDFcnuBGmxLKI/qn4T+m5BtEBYXJPvibbUPsAIPww=
google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=
google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0=
google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM=
Expand Down
149 changes: 112 additions & 37 deletions grpc/grpc_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/wrapperspb"
"strings"
"time"

Expand All @@ -32,6 +33,9 @@ type Config struct {
Timeout time.Duration
}

type StatemachineStream = protos.StatemachineService_StreamAllInstateServer
type ConfigurationStream = protos.StatemachineService_StreamAllConfigurationsServer

var _ protos.StatemachineServiceServer = (*grpcSubscriber)(nil)

const (
Expand All @@ -43,9 +47,9 @@ type grpcSubscriber struct {
*Config
}

func (s *grpcSubscriber) ProcessEvent(ctx context.Context, request *protos.EventRequest) (*protos.
func (s *grpcSubscriber) SendEvent(ctx context.Context, request *protos.EventRequest) (*protos.
EventResponse, error) {
if request.Dest == "" {
if request.GetId() == "" {
return nil, status.Error(codes.FailedPrecondition, api.MissingDestinationError.Error())
}
if request.GetEvent() == nil || request.Event.GetTransition() == nil ||
Expand Down Expand Up @@ -87,31 +91,46 @@ func (s *grpcSubscriber) PutConfiguration(ctx context.Context, cfg *protos.Confi
}
s.Logger.Trace("configuration stored: %s", api.GetVersionId(cfg))
return &protos.PutResponse{
Id: api.GetVersionId(cfg),
Config: cfg,
Id: api.GetVersionId(cfg),
// Note: this is the magic incantation to use a `one_of` field in Protobuf.
EntityResponse: &protos.PutResponse_Config{Config: cfg},
}, nil
}
func (s *grpcSubscriber) GetAllConfigurations(ctx context.Context, req *wrapperspb.StringValue) (
*protos.ListResponse, error) {
cfgName := req.Value
if cfgName == "" {
s.Logger.Trace("looking up all available configurations on server")
return &protos.ListResponse{Ids: s.Store.GetAllConfigs()}, nil
}
s.Logger.Trace("looking up all version for configuration %s", cfgName)
return &protos.ListResponse{Ids: s.Store.GetAllVersions(cfgName)}, nil
}

func (s *grpcSubscriber) GetConfiguration(ctx context.Context, request *protos.GetRequest) (
func (s *grpcSubscriber) GetConfiguration(ctx context.Context, configId *wrapperspb.StringValue) (
*protos.Configuration, error) {
s.Logger.Trace("retrieving Configuration %s", request.GetId())
cfg, found := s.Store.GetConfig(request.GetId())
cfgId := configId.Value
s.Logger.Trace("retrieving Configuration %s", cfgId)
cfg, found := s.Store.GetConfig(cfgId)
if !found {
return nil, status.Errorf(codes.NotFound, "configuration %s not found", request.GetId())
return nil, status.Errorf(codes.NotFound, "configuration %s not found", cfgId)
}
return cfg, nil
}

func (s *grpcSubscriber) PutFiniteStateMachine(ctx context.Context,
fsm *protos.FiniteStateMachine) (*protos.PutResponse, error) {
request *protos.PutFsmRequest) (*protos.PutResponse, error) {
fsm := request.Fsm
// First check that the configuration for the FSM is valid
cfg, ok := s.Store.GetConfig(fsm.ConfigId)
if !ok {
return nil, status.Error(codes.FailedPrecondition, storage.NotFoundError(
fsm.ConfigId).Error())
}
// FIXME: we need to allow clients to specify the ID of the FSM to create
id := uuid.NewString()
var id = request.Id
if id == "" {
id = uuid.NewString()
}
// If the State of the FSM is not specified,
// we set it to the initial state of the configuration.
if fsm.State == "" {
Expand All @@ -122,38 +141,52 @@ func (s *grpcSubscriber) PutFiniteStateMachine(ctx context.Context,
s.Logger.Error("could not store FSM [%v]: %v", fsm, err)
return nil, status.Error(codes.Internal, err.Error())
}
return &protos.PutResponse{Id: id, Fsm: fsm}, nil
if err := s.Store.UpdateState(cfg.Name, id, "", fsm.State); err != nil {
s.Logger.Error("could not store FSM in state set [%s]: %v", fsm.State, err)
return nil, status.Error(codes.Internal, err.Error())
}
return &protos.PutResponse{Id: id, EntityResponse: &protos.PutResponse_Fsm{Fsm: fsm}}, nil
}

func (s *grpcSubscriber) GetFiniteStateMachine(ctx context.Context, request *protos.GetRequest) (
func (s *grpcSubscriber) GetFiniteStateMachine(ctx context.Context, in *protos.GetFsmRequest) (
*protos.FiniteStateMachine, error) {
// TODO: use Context to set a timeout, and then pass it on to the Store.
// This may require a pretty large refactoring of the store interface.
s.Logger.Debug("looking up FSM %s", request.GetId())
// The ID in the request contains the FSM ID,
// prefixed by the Config Name (which defines the "type" of FSM)
splitId := strings.Split(request.GetId(), storage.KeyPrefixIDSeparator)
if len(splitId) != 2 {
return nil, status.Errorf(codes.InvalidArgument, "invalid FSM ID: %s", request.GetId())
}
fsm, ok := s.Store.GetStateMachine(splitId[1], splitId[0])
cfg := in.GetConfig()
if cfg == "" {
return nil, status.Error(codes.InvalidArgument, "configuration name must always be provided when looking up statemachine")
}
fsmId := in.GetId()
if fsmId == "" {
return nil, status.Error(codes.InvalidArgument, "ID must always be provided when looking up statemachine")
}
s.Logger.Debug("looking up FSM [%s] (Configuration: %s)", fsmId, cfg)
fsm, ok := s.Store.GetStateMachine(fsmId, cfg)
if !ok {
return nil, status.Error(codes.NotFound, storage.NotFoundError(request.GetId()).Error())
return nil, status.Error(codes.NotFound, storage.NotFoundError(fsmId).Error())
}
return fsm, nil
}

func (s *grpcSubscriber) GetEventOutcome(ctx context.Context, request *protos.GetRequest) (
*protos.EventResponse, error) {

s.Logger.Debug("looking up EventOutcome %s", request.GetId())
dest := strings.Split(request.GetId(), storage.KeyPrefixIDSeparator)
if len(dest) != 2 {
return nil, status.Error(codes.InvalidArgument,
fmt.Sprintf("invalid destination [%s] expected: <type>#<id>", request.GetId()))
func (s *grpcSubscriber) GetAllInState(ctx context.Context, in *protos.GetFsmRequest) (
*protos.ListResponse, error) {
cfgName := in.GetConfig()
if cfgName == "" {
return nil, status.Errorf(codes.InvalidArgument, "configuration must always be specified")
}
state := in.GetState()
if state == "" {
// TODO: implement table scanning
return nil, status.Errorf(codes.Unimplemented, "missing state, table scan not implemented")
}
smType, evtId := dest[0], dest[1]
outcome, ok := s.Store.GetOutcomeForEvent(evtId, smType)
ids := s.Store.GetAllInState(cfgName, state)
return &protos.ListResponse{Ids: ids}, nil
}

func (s *grpcSubscriber) GetEventOutcome(ctx context.Context, in *protos.EventRequest) (
*protos.EventResponse, error) {
evtId := in.GetId()
config := in.GetConfig()
s.Logger.Debug("looking up EventOutcome %s (%s)", evtId, config)
outcome, ok := s.Store.GetOutcomeForEvent(evtId, config)
if !ok {
return nil, status.Error(codes.NotFound, fmt.Sprintf("outcome for event %s not found", evtId))
}
Expand All @@ -163,14 +196,56 @@ func (s *grpcSubscriber) GetEventOutcome(ctx context.Context, request *protos.Ge
}, nil
}

func (s *grpcSubscriber) StreamAllInstate(in *protos.GetFsmRequest, stream StatemachineStream) error {
response, err := s.GetAllInState(context.Background(), in)
if err != nil {
return err
}
cfgName := in.GetConfig()
for _, id := range response.GetIds() {
fsm, found := s.Store.GetStateMachine(id, cfgName)
if !found {
return storage.NotFoundError(id)
}
if err = stream.SendMsg(&protos.PutResponse{
Id: id,
EntityResponse: &protos.PutResponse_Fsm{Fsm: fsm},
}); err != nil {
s.Logger.Error("could not stream response back: %s", err)
return err
}
}
return nil
}

func (s *grpcSubscriber) StreamAllConfigurations(in *wrapperspb.StringValue, stream ConfigurationStream) error {
if in.GetValue() == "" {
return status.Errorf(codes.InvalidArgument, "must specify the Configuration name")
}
response, err := s.GetAllConfigurations(context.Background(), in)
if err != nil {
return nil
}
for _, cfgId := range response.GetIds() {
cfg, found := s.Store.GetConfig(cfgId)
if !found {
return storage.NotFoundError(cfgId)
}
if err = stream.SendMsg(cfg); err != nil {
return err
}
}
return nil
}

// NewGrpcServer creates a new gRPC server to handle incoming events and other API calls.
// The `Config` can be used to configure the backing store, a timeout and the logger.
func NewGrpcServer(config *Config) (*grpc.Server, error) {
// Unless explicitly configured, we use for the server the same timeout as for the Redis store
if config.Timeout == 0 {
config.Timeout = DefaultTimeout
}
gsrv := grpc.NewServer()
protos.RegisterStatemachineServiceServer(gsrv, &grpcSubscriber{Config: config})
return gsrv, nil
server := grpc.NewServer()
protos.RegisterStatemachineServiceServer(server, &grpcSubscriber{Config: config})
return server, nil
}
Loading

0 comments on commit cbdfaf2

Please sign in to comment.