Skip to content

Commit

Permalink
[#64, #67] Removes REST server & in-memory store (#85)
Browse files Browse the repository at this point in the history
  • Loading branch information
massenz authored Feb 25, 2023
1 parent 4e6a2b7 commit 8bc346b
Show file tree
Hide file tree
Showing 18 changed files with 109 additions and 1,776 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/build.yml → .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
# Copyright (c) 2022 AlertAvert.com. All rights reserved.
# Author: Marco Massenzio ([email protected])
#
name: Build & Test
name: Test

on:
push:
Expand All @@ -28,4 +28,4 @@ jobs:
run: |
mkdir -p ${HOME}/.aws && cp data/credentials ${HOME}/.aws/
export AWS_REGION=us-west-2
go test ./api ./grpc ./pubsub ./server ./storage
go test ./api ./grpc ./pubsub ./storage
2 changes: 1 addition & 1 deletion .run/Run all tests.run.xml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
<configuration default="false" name="Run all tests" type="GoTestRunConfiguration" factoryName="Go Test" singleton="false">
<module name="statemachine" />
<working_directory value="$PROJECT_DIR$" />
<parameters value="./api ./grpc ./pubsub ./storage ./server" />
<parameters value="./api ./grpc ./pubsub ./storage" />
<kind value="DIRECTORY" />
<package value="github.com/massenz/go-statemachine" />
<directory value="$PROJECT_DIR$" />
Expand Down
12 changes: 6 additions & 6 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ dockerfile := docker/Dockerfile
# Edit only the packages list, when adding new functionality,
# the rest is deduced automatically.
#
pkgs := ./api ./grpc ./pubsub ./server ./storage
pkgs := ./api ./grpc ./pubsub ./storage
all_go := $(shell for d in $(pkgs); do find $$d -name "*.go"; done)
test_srcs := $(shell for d in $(pkgs); do find $$d -name "*_test.go"; done)
srcs := $(filter-out $(test_srcs),$(all_go))
Expand Down Expand Up @@ -61,16 +61,16 @@ fmt: ## Formats the Go source code using 'go fmt'
$(bin): cmd/main.go $(srcs)
@mkdir -p $(shell dirname $(bin))
GOOS=$(GOOS); GOARCH=$(GOARCH); go build \
-ldflags "-X $(GOMOD)/server.Release=$(release)" \
-ldflags "-X $(GOMOD)/api.Release=$(release)" \
-o $(bin) cmd/main.go

$(dockerbin):
GOOS=linux; GOARCH=amd64; go build \
-ldflags "-X $(GOMOD)/server.Release=$(release)" \
-ldflags "-X $(GOMOD)/api.Release=$(release)" \
-o $(dockerbin) cmd/main.go

.PHONY: build
build: $(bin) ## Builds the server
build: $(bin) ## Builds the Statemachine server binary

test: $(srcs) $(test_srcs) ## Runs all tests
ginkgo $(pkgs)
Expand All @@ -89,13 +89,13 @@ container: $(dockerbin) ## Builds the container image
.PHONY: start
start: ## Starts the Redis and LocalStack containers, and Creates the SQS Queues in LocalStack
@RELEASE=$(release) BASEDIR=$(shell pwd) docker compose -f $(compose) --project-name sm up redis localstack -d
@sleep 1
@sleep 3
@for queue in events notifications; do \
aws --no-cli-pager --endpoint-url=http://localhost:4566 \
--region us-west-2 \
sqs create-queue --queue-name $$queue; done >/dev/null
# We need to wait for the SQS Queues to be up before starting the server.
@RELEASE=$(release) BASEDIR=$(shell pwd) docker compose -f $(compose) --project-name sm up server -d
#@RELEASE=$(release) BASEDIR=$(shell pwd) docker compose -f $(compose) --project-name sm up server -d

.PHONY: stop
stop: ## Stops the Redis and LocalStack containers
Expand Down
4 changes: 3 additions & 1 deletion api/fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ const (
)

var (
Release string

MalformedConfigurationError = fmt.Errorf("this configuration cannot be parsed")
MissingNameConfigurationError = fmt.Errorf(
"configuration must always specify a name (and optionally a version)")
Expand All @@ -43,7 +45,7 @@ var (

// Logger is made accessible so that its `Level` can be changed
// or can be sent to a `NullLog` during testing.
Logger = log.NewLog("fsm")
Logger = log.NewLog("api")
)

// ConfiguredStateMachine is the internal representation of an FSM, which
Expand Down
172 changes: 97 additions & 75 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,20 @@ import (
"errors"
"flag"
"fmt"
g "google.golang.org/grpc"
"net"
"os"
"os/signal"
"sync"
"syscall"

log "github.com/massenz/slf4go/logging"
protos "github.com/massenz/statemachine-proto/golang/api"

"github.com/massenz/go-statemachine/api"
"github.com/massenz/go-statemachine/grpc"
"github.com/massenz/go-statemachine/pubsub"
"github.com/massenz/go-statemachine/server"
"github.com/massenz/go-statemachine/storage"
log "github.com/massenz/slf4go/logging"
protos "github.com/massenz/statemachine-proto/golang/api"
"net"
"sync"
)

func SetLogLevel(services []log.Loggable, level log.LogLevel) {
Expand All @@ -32,27 +38,34 @@ func SetLogLevel(services []log.Loggable, level log.LogLevel) {
}

var (
logger = log.NewLog("sm-server")
serverLogLevel log.LogLevel = log.INFO
logger = log.NewLog("sm-server")

host = "0.0.0.0"

store storage.StoreManager

sub *pubsub.SqsSubscriber
pub *pubsub.SqsPublisher = nil
listener *pubsub.EventsListener

// TODO: for now blocking channels; we will need to confirm
// whether we can support a fully concurrent system with a
// buffered channel
pub *pubsub.SqsPublisher = nil
sub *pubsub.SqsSubscriber
store storage.StoreManager
wg sync.WaitGroup

// notificationsCh is the channel over which we send error notifications
// to publish on the appropriate queue.
// The Listener will produce error notifications, which will be consumed
// by the PubSub Publisher (if configured) which in turn will produce to
// the -notifications topic.
//
// Not configured by default, it is only used if a -notifications queue
// is defined.
notificationsCh chan protos.EventResponse = nil
eventsCh = make(chan protos.EventRequest)
wg sync.WaitGroup

// eventsCh is the channel over which the Listener receive Events to process.
// Both the gRPC server and the PubSub Subscriber (if configured) will produce
// events for this channel.
//
// Currently, this is a blocking channel (capacity for one item), but once we
// parallelize events processing we can make it deeper.
eventsCh = make(chan protos.EventRequest)
)

func main() {
defer close(eventsCh)

var awsEndpoint = flag.String("endpoint-url", "",
"HTTP URL for AWS SQS to connect to; usually best left undefined, "+
Expand All @@ -61,17 +74,14 @@ func main() {
"If set, connects to Redis with cluster-mode enabled")
var debug = flag.Bool("debug", false,
"Verbose logs; better to avoid on Production services")
var eventsTopic = flag.String("events", "", "Topi name to receive events from")
var eventsTopic = flag.String("events", "", "Topic name to receive events from")
var grpcPort = flag.Int("grpc-port", 7398, "The port for the gRPC server")
var noTls = flag.Bool("insecure", false, "If set, TLS will be disabled (NOT recommended)")
var localOnly = flag.Bool("local", false,
"If set, it only listens to incoming requests from the local host")
var maxRetries = flag.Int("max-retries", storage.DefaultMaxRetries,
"Max number of attempts for a recoverable error to be retried against the Redis cluster")
var notificationsTopic = flag.String("notifications", "",
"(optional) The name of the topic to publish events' outcomes to; if not "+
"specified, no outcomes will be published")
var port = flag.Int("http-port", 7399, "HTTP Server port for the REST API")
var redisUrl = flag.String("redis", "", "For single node Redis instances: host:port "+
"for the Redis instance. For redis clusters: a comma-separated list of redis nodes. "+
"If using an ElastiCache Redis cluster with cluster mode enabled, this can also be the configuration endpoint.")
Expand All @@ -82,15 +92,7 @@ func main() {
" performance, do not use in production or on heavily loaded systems (will override the -debug option)")
flag.Parse()

logger.Info("starting State Machine Server - Rel. %s", server.Release)

if *localOnly {
logger.Info("listening on local interface only")
host = "localhost"
} else {
logger.Warn("listening on all interfaces")
}
addr := fmt.Sprintf("%s:%d", host, *port)
logger.Info("starting State Machine Server - Rel. %s", api.Release)

if *redisUrl == "" {
logger.Fatal(errors.New("in-memory store deprecated, a Redis server must be configured"))
Expand All @@ -99,78 +101,92 @@ func main() {
logger.Info("with timeout: %s, max-retries: %d", *timeout, *maxRetries)
store = storage.NewRedisStore(*redisUrl, *cluster, 1, *timeout, *maxRetries)
}
server.SetStore(store)

if *eventsTopic == "" {
logger.Warn("no PubSub event topic configured, events can only be sent via gRPC calls")
}
logger.Info("connecting to SQS Topic: %s", *eventsTopic)
sub = pubsub.NewSqsSubscriber(eventsCh, awsEndpoint)
if sub == nil {
panic("Cannot create a valid SQS Subscriber")
done := make(chan interface{})
if *eventsTopic != "" {
logger.Info("connecting to SQS Topic: %s", *eventsTopic)
sub = pubsub.NewSqsSubscriber(eventsCh, awsEndpoint)
if sub == nil {
logger.Fatal(errors.New("cannot create a valid SQS Subscriber"))
}
wg.Add(1)
go func() {
defer wg.Done()
logger.Info("subscribing to events on topic [%s]", *eventsTopic)
sub.Subscribe(*eventsTopic, done)
}()
}

if *notificationsTopic != "" {
logger.Info("notifications topic: %s", *notificationsTopic)
logger.Info("sending errors to DLQ topic [%s]", *notificationsTopic)
notificationsCh = make(chan protos.EventResponse)
defer close(notificationsCh)
pub = pubsub.NewSqsPublisher(notificationsCh, awsEndpoint)
if pub == nil {
panic("Cannot create a valid SQS Publisher")
logger.Fatal(errors.New("cannot create a valid SQS Publisher"))
}
go pub.Publish(*notificationsTopic)
}

listener = pubsub.NewEventsListener(&pubsub.ListenerOptions{
EventsChannel: eventsCh,
NotificationsChannel: notificationsCh,
StatemachinesStore: store,
// TODO: workers pool not implemented yet.
ListenersPoolSize: 0,
})
go sub.Subscribe(*eventsTopic, nil)

// This should not be invoked until we have initialized all the services.
setLogLevel(*debug, *trace)

logger.Info("starting events listener")
go listener.ListenForMessages()
wg.Add(1)
go func() {
defer wg.Done()
listener.ListenForMessages()
}()

logger.Info("gRPC server running at tcp://:%d", *grpcPort)
go startGrpcServer(*grpcPort, *noTls, eventsCh)
svr := startGrpcServer(*grpcPort, *noTls, eventsCh)

// TODO: REST Server is deprecated and will be removed soon.
scheme := "http"
logger.Info("HTTP server (REST API) running at %s://%s", scheme, addr)
srv := server.NewHTTPServer(addr, serverLogLevel)
logger.Fatal(srv.ListenAndServe())
// This should not be invoked until we have initialized all the services.
setLogLevel(*debug, *trace)
logger.Info("statemachine server ready for processing events...")
RunUntilStopped(done, svr)
logger.Info("...done. Goodbye.")
}

func RunUntilStopped(done chan interface{}, svr *g.Server) {
// Trap Ctrl-C and SIGTERM (Docker/Kubernetes) to shutdown gracefully
c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt, syscall.SIGTERM)

// Block until a signal is received.
_ = <-c
logger.Info("shutting down services...")
close(done)
close(eventsCh)
svr.GracefulStop()
logger.Info("waiting for services to exit...")
wg.Wait()
}

// setLogLevel sets the logging level for all the services' loggers, depending on
// whether the -debug or -trace flag is enabled (if neither, we log at INFO level).
// If both are set, then -trace takes priority.
func setLogLevel(debug bool, trace bool) {
if debug {
var logLevel log.LogLevel = log.INFO
if debug && !trace {
logger.Info("verbose logging enabled")
logger.Level = log.DEBUG
SetLogLevel([]log.Loggable{store, pub, sub, listener}, log.DEBUG)
serverLogLevel = log.DEBUG
}

if trace {
logger.Warn("trace logging Enabled")
logger.Level = log.TRACE
server.EnableTracing()
SetLogLevel([]log.Loggable{store, pub, sub, listener}, log.TRACE)
serverLogLevel = log.TRACE
logLevel = log.DEBUG
} else if trace {
logger.Info("trace logging enabled")
logLevel = log.TRACE
}
logger.Level = logLevel
SetLogLevel([]log.Loggable{store, pub, sub, listener}, logLevel)
}

// startGrpcServer will start a new gRPC server, bound to
// the local `port` and will send any incoming
// `EventRequest` to the receiving channel.
// This MUST be run as a go-routine, which never returns
func startGrpcServer(port int, disableTls bool, events chan<- protos.EventRequest) {
defer wg.Done()
func startGrpcServer(port int, disableTls bool, events chan<- protos.EventRequest) *g.Server {
l, err := net.Listen("tcp", fmt.Sprintf(":%d", port))
if err != nil {
panic(err)
Expand All @@ -183,10 +199,16 @@ func startGrpcServer(port int, disableTls bool, events chan<- protos.EventReques
TlsEnabled: !disableTls,
})
if err != nil {
log.RootLog.Fatal(err)
}
err = grpcServer.Serve(l)
if err != nil {
log.RootLog.Fatal(err)
logger.Fatal(err)
}
wg.Add(1)
go func() {
defer wg.Done()
err = grpcServer.Serve(l)
if err != nil {
logger.Fatal(err)
}
logger.Info("gRPC server exited")
}()
return grpcServer
}
Loading

0 comments on commit 8bc346b

Please sign in to comment.