-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Refactored startup flags and Dockerfile accordingly (#56)
- Loading branch information
Showing
4 changed files
with
82 additions
and
60 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,21 +1,12 @@ | ||
<!-- | ||
~ Copyright (c) 2022 AlertAvert.com. All rights reserved. | ||
~ | ||
~ Licensed under the Apache License, Version 2.0 | ||
~ http://www.apache.org/licenses/LICENSE-2.0 | ||
~ | ||
~ Author: Marco Massenzio ([email protected]) | ||
--> | ||
|
||
<component name="ProjectRunConfigurationManager"> | ||
<configuration default="false" name="Run SQS Client" type="GoApplicationRunConfiguration" factoryName="Go Application" editBeforeRun="true"> | ||
<module name="statemachine" /> | ||
<working_directory value="$PROJECT_DIR$" /> | ||
<parameters value="-endpoint http://localhost:4566 -q events -dest test.orders#25 -evt ship" /> | ||
<parameters value="-endpoint http://localhost:4566 -q events -dest test.orders#2 -evt accept" /> | ||
<kind value="FILE" /> | ||
<package value="github.com/massenz/go-statemachine/clients" /> | ||
<directory value="$PROJECT_DIR$" /> | ||
<filePath value="$PROJECT_DIR$/clients/sqs_client.go|$PROJECT_DIR$/clients/orders.go" /> | ||
<method v="2" /> | ||
</configuration> | ||
</component> | ||
</component> |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -7,10 +7,6 @@ | |
* Author: Marco Massenzio ([email protected]) | ||
*/ | ||
|
||
// CLI to process Kubernetes Specs with a JSON configuration. | ||
// | ||
// Created by M. Massenzio, 2021-02-20 | ||
|
||
package main | ||
|
||
import ( | ||
|
@@ -58,56 +54,54 @@ var ( | |
func main() { | ||
defer close(eventsCh) | ||
|
||
var acksTopic = flag.String("acks", "", | ||
"(Requires `-notifications`) The name of the topic in SQS to publish Ok outcomes to; "+ | ||
"unless the -notify-errors-only flag is set") | ||
var awsEndpoint = flag.String("endpoint-url", "", | ||
"HTTP URL for AWS SQS to connect to; usually best left undefined, "+ | ||
"unless required for local testing purposes (LocalStack uses http://localhost:4566)") | ||
var cluster = flag.Bool("cluster", false, | ||
"If set, connects to Redis with cluster-mode enabled") | ||
var debug = flag.Bool("debug", false, | ||
"Verbose logs; better to avoid on Production services") | ||
var trace = flag.Bool("trace", false, | ||
"Enables trace logs for every API request and Pub/Sub event; it may impact performance, "+ | ||
"do not use in production or on heavily loaded systems ("+ | ||
"will override the -debug option)") | ||
var eventsTopic = flag.String("events", "", "Topi name to receive events from") | ||
var grpcPort = flag.Int("grpc-port", 7398, "The port for the gRPC server") | ||
var localOnly = flag.Bool("local", false, | ||
"If set, it only listens to incoming requests from the local host") | ||
var port = flag.Int("http-port", 7399, "HTTP Server port for the REST API") | ||
var redisUrl = flag.String("redis", "", "For single node redis instances: URI "+ | ||
"for the Redis instance (host:port). For redis clusters: a comma-separated list of redis nodes. "+ | ||
"If using an ElastiCache Redis cluster with cluster mode enabled, you can supply the configuration endpoint.") | ||
var cluster = flag.Bool("cluster", false, | ||
"If set, allows connecting to a Redis instance with cluster-mode enabled") | ||
var awsEndpoint = flag.String("endpoint-url", "", | ||
"HTTP URL for AWS SQS to connect to; usually best left undefined, "+ | ||
"unless required for local testing purposes (LocalStack uses http://localhost:4566)") | ||
var eventsTopic = flag.String("events", "", "If defined, it will attempt to connect "+ | ||
"to the given SQS Queue to receive events from the Pub/Sub system") | ||
var maxRetries = flag.Int("max-retries", storage.DefaultMaxRetries, | ||
"Max number of attempts for a recoverable error to be retried against the Redis cluster") | ||
var notificationsTopic = flag.String("notifications", "", | ||
"The name of the notification topic in SQS to publish events' outcomes to; if not "+ | ||
"(optional) The name of the topic to publish events' outcomes to; if not "+ | ||
"specified, no outcomes will be published") | ||
var acksTopic = flag.String("acks", "", | ||
"(Requires `notifications`) The name of the acks topic in SQS to publish events' "+ | ||
"outcomes to; if specified, Ok outcomes will be published to the acks topic and other "+ | ||
"(error) outcomes to the notification topic") | ||
var notifyErrorsOnly = flag.Bool("notify-errors-only", false, | ||
"If set, only errors will be sent to notification topics") | ||
var grpcPort = flag.Int("grpc-port", 7398, "The port for the gRPC server") | ||
var maxRetries = flag.Int("max-retries", storage.DefaultMaxRetries, | ||
"Max number of attempts for a recoverable error to be retried against the Redis cluster") | ||
"If set, only errors will be sent to notification topic (cannot be used with -acks)") | ||
var port = flag.Int("http-port", 7399, "HTTP Server port for the REST API") | ||
var redisUrl = flag.String("redis", "", "For single node redis instances: URI "+ | ||
"for the Redis instance (host:port). For redis clusters: a comma-separated list of redis nodes. "+ | ||
"If using an ElastiCache Redis cluster with cluster mode enabled, "+ | ||
"this can also be the configuration endpoint.") | ||
var timeout = flag.Duration("timeout", storage.DefaultTimeout, | ||
"Timeout for Redis (as a Duration string, e.g. 1s, 20ms, etc.)") | ||
var trace = flag.Bool("trace", false, | ||
"Extremely verbose logs for every API request and Pub/Sub event; it may impact"+ | ||
" performance, do not use in production or on heavily loaded systems (will override the -debug option)") | ||
flag.Parse() | ||
|
||
logger.Info("Starting State Machine Server - Rel. %s", server.Release) | ||
logger.Info("starting State Machine Server - Rel. %s", server.Release) | ||
|
||
if *localOnly { | ||
logger.Info("Listening on local interface only") | ||
logger.Info("listening on local interface only") | ||
host = "localhost" | ||
} else { | ||
logger.Warn("Listening on all interfaces") | ||
logger.Warn("listening on all interfaces") | ||
} | ||
addr := fmt.Sprintf("%s:%d", host, *port) | ||
|
||
if *redisUrl == "" { | ||
logger.Warn("in-memory storage configured, all data will NOT survive a server restart") | ||
store = storage.NewInMemoryStore() | ||
} else { | ||
logger.Info("Connecting to Redis server at %s", *redisUrl) | ||
logger.Info("connecting to Redis server at %s", *redisUrl) | ||
logger.Info("with timeout: %s, max-retries: %d", *timeout, *maxRetries) | ||
store = storage.NewRedisStore(*redisUrl, *cluster, 1, *timeout, *maxRetries) | ||
} | ||
|
@@ -122,16 +116,19 @@ func main() { | |
if *acksTopic != "" && *notifyErrorsOnly { | ||
logger.Fatal(fmt.Errorf("cannot set an acks topic while disabling errors notifications")) | ||
} | ||
logger.Info("Connecting to SQS Topic: %s", *eventsTopic) | ||
logger.Info("connecting to SQS Topic: %s", *eventsTopic) | ||
sub = pubsub.NewSqsSubscriber(eventsCh, awsEndpoint) | ||
if sub == nil { | ||
panic("Cannot create a valid SQS Subscriber") | ||
} | ||
|
||
if *notificationsTopic != "" { | ||
logger.Info("Configuring Topic: %s", *notificationsTopic) | ||
logger.Info("notifications topic: %s", *notificationsTopic) | ||
if *notifyErrorsOnly { | ||
logger.Info("only errors will be published to the notifications topic") | ||
} | ||
if *acksTopic != "" { | ||
logger.Info("Configuring Topic: %s", *acksTopic) | ||
logger.Info("acks topic: %s", *acksTopic) | ||
} | ||
notificationsCh = make(chan protos.EventResponse) | ||
defer close(notificationsCh) | ||
|
@@ -153,15 +150,15 @@ func main() { | |
// This should not be invoked until we have initialized all the services. | ||
setLogLevel(*debug, *trace) | ||
|
||
logger.Info("Starting Events Listener") | ||
logger.Info("starting events listener") | ||
go listener.ListenForMessages() | ||
|
||
logger.Info("gRPC Server running at tcp://:%d", *grpcPort) | ||
logger.Info("gRPC server running at tcp://:%d", *grpcPort) | ||
go startGrpcServer(*grpcPort, eventsCh) | ||
|
||
// TODO: configure & start server using TLS, if configured to do so. | ||
scheme := "http" | ||
logger.Info("HTTP Server (REST API) running at %s://%s", scheme, addr) | ||
logger.Info("HTTP server (REST API) running at %s://%s", scheme, addr) | ||
srv := server.NewHTTPServer(addr, serverLogLevel) | ||
logger.Fatal(srv.ListenAndServe()) | ||
} | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters