Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding Support for Redis Cluster Client #46

Closed
wants to merge 23 commits into from
Closed
Show file tree
Hide file tree
Changes from 16 commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .run/Run SQS Client.run.xml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
<configuration default="false" name="Run SQS Client" type="GoApplicationRunConfiguration" factoryName="Go Application" editBeforeRun="true">
<module name="statemachine" />
<working_directory value="$PROJECT_DIR$" />
<parameters value="-endpoint http://localhost:4566 -q events -dest 8033049e-caae-4cb1-9fe4-bc02ff26bc26 -evt accept" />
<parameters value="-endpoint http://localhost:4566 -q events -dest test.orders#25 -evt ship" />
<kind value="FILE" />
<package value="github.com/massenz/go-statemachine/clients" />
<directory value="$PROJECT_DIR$" />
Expand Down
219 changes: 145 additions & 74 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -56,72 +56,12 @@ See [Sending Events](#sending-events) below for details on how to encode an SQS

The HTTP server exposes a REST API that allows to create (`POST`) and retrieve (`GET`) both `configurations` and `statemachines`, encoding their contents using JSON.

### State Machines

To create a `statemachine` simply requires indicating its configuration and an (optional) ID:

```
POST /api/v1/statemachines

{
"configuration_version": "devices:v3"
}
```

if the optional `id` is omitted, one will be generated and returned in the `Location` header (as well as in the body of the response):

```
Location: /api/v1/statemachines/6b5af0e8-9033-47e2-97db-337476f1402a

{
"id": "6b5af0e8-9033-47e2-97db-337476f1402a",
"statemachine": {
"config_id": "devices:v3",
"state": "started"
}
}
```

To obtain the current state of the FSM, simply use a `GET`:

```
GET /api/v1/statemachines/6b5af0e8-9033-47e2-97db-337476f1402a

200 OK

{
"id": "6b5af0e8-9033-47e2-97db-337476f1402a",
"statemachine": {
"config_id": "devices:v3",
"state": "backorderd",
"history": [
{
"event_id": "258",
"timestamp": {
"seconds": 1661733324,
"nanos": 461000000
},
"transition": {
"from": "started",
"to": "backorderd",
"event": "backorder"
},
"originator": "SimpleSender"
}
]
}
}
```

which shows that an event `backorder` was sent at `Sun Aug 28 2022 17:35:24 PDT` (the `timestamp` in seconds from epoch) transitioning our device order to a `backordered` state.

See [`sqs_client`](clients/sqs_client.go) for a fully worked out example as to how to send an SQS event.


### Configurations

Before creating an FSM, you need to define the associated configuration (trying to create an FSM with a `configuration_version` that does not match an existing `configuration` will result in a `404 NOT FOUND` error).

> `TODO`: we may eventually choose to return a more descriptive error code (e.g., either a `403 FORBIDDEN` or a `406 METHOD NOT ALLOWED` in future)

To create a new configuration use:

```
Expand Down Expand Up @@ -184,7 +124,7 @@ Location: /api/v1/configurations/test.orders:v3

Configurations are deemed to be immutable, so no `PUT` is offered, and also trying to re-create a configuration with the same `{name, version}` tuple will result in a `409 CONFLICT` error.

Similarly to FSMs, configurations can be retrieved using the `GET` and endpoint returned:
Configurations can be retrieved using the `GET` and endpoint returned:

```
GET /api/v1/configurations/test.orders:v3
Expand All @@ -204,6 +144,102 @@ GET /api/v1/configurations/test.orders:v3
}
```

### State Machines

To create a `statemachine` simply requires indicating its [configuration](#configurations) and an (optional) ID:

```
POST /api/v1/statemachines

{
"configuration_version": "devices:v3",
"id": "6b5af0e8-9033-47e2-97db-337476f1402a"
}
```

if the optional `id` is omitted, one will be generated and returned in the `Location` header (as well as in the body of the response):

```
Location: /api/v1/statemachines/devices/6b5af0e8-9033-47e2-97db-337476f1402a

{
"id": "6b5af0e8-9033-47e2-97db-337476f1402a",
"statemachine": {
"config_id": "devices:v3",
"state": "started"
}
}
```

> **Note**
> The "type" of FSM (in other words, its configuration - but **not** the version)
> is included in the FSM's URI.

To obtain the current state of the FSM, simply use a `GET`:

```
GET /api/v1/statemachines/devices/6b5af0e8-9033-47e2-97db-337476f1402a

200 OK

{
"id": "6b5af0e8-9033-47e2-97db-337476f1402a",
"statemachine": {
"config_id": "devices:v3",
"state": "backorderd",
"history": [
{
"event_id": "258",
"timestamp": {
"seconds": 1661733324,
"nanos": 461000000
},
"transition": {
"from": "started",
"to": "backorderd",
"event": "backorder"
},
"originator": "SimpleSender"
}
]
}
}
```

which shows that an event `backorder` was sent at `Sun Aug 28 2022 17:35:24 PDT` (the `timestamp` in seconds from epoch) transitioning our device order to a `backordered` state.

Again, the "type" of FSM **must** be specified in the URL (`/devices`).

See [`grpc_client`](clients/grpc_client.go) for a fully worked out example as to how to send events to an FSM.

### Event Outcomes

After [sending an event](#sending-events), the outcome of the event can be retrieved using the `event_id` (either specified, or auto-generated by the server):

```
GET /api/v1/events/outcome/orders/f8b6a19b-12c9-40b1-aa35-240cd829b014

{
"status_code": "Ok",
"message": "event [accept] transitioned FSM [25] to state [pending]",
"destination": "orders#6b5af0e8-9033-47e2-97db-337476f1402a"
}
```

if there was an error, it would be reported too, with the relevant message, if available:

```
GET /api/v1/events/outcome/test.orders/4018047a-50c1-45ea-b87e-e79b195568db

{
"status_code": "TransitionNotAllowed",
"message": "event [self-destroy] could not be processed: unexpected event transition",
"destination": "test.orders#6b5af0e8-9033-47e2-97db-337476f1402a"
}
```

Note that, as for FSMs, we need to qualify the `event_id` with the `configuration.name` in the URL (in this example `test.orders`).

## Sending Events

> Note that **it is not possible to send events via the REST API**: this is **by design** and not just a "missing feature"; please do not submit requests to add a `POST /api/v1/events` API: it's not going to happen.
Expand Down Expand Up @@ -238,7 +274,8 @@ msg := &protos.EventRequest{

// This is the unique ID for the entity you are sending the event to; MUST
// match the `id` of an existing `statemachine` (see the REST API).
Dest: "6b5af0e8-9033-47e2-97db-337476f1402a",
// NOTE -- the ID is prefixed by the configuration name.
Dest: "devices#6b5af0e8-9033-47e2-97db-337476f1402a",
}

_, err = queue.SendMessage(&sqs.SendMessageInput{
Expand All @@ -248,11 +285,43 @@ _, err = queue.SendMessage(&sqs.SendMessageInput{
})
```

This will cause a `backorder` event to be sent to our FSM whose `id` matches the UUID in `Dest`; if there are errors (eg, the FSM does not exist, or the event is not allowed for the machine's configuration and current state) errors may be optionally sent to the SQS queue configured via the `-errors` option (see [Running the Server](#running-the-server)): see the [`pubsub` code](pubsub/sqs_pub.go) code for details as to how we encode the error message as an SQS message.
This will cause a `backorder` event to be sent to our FSM whose `id` matches the UUID in `Dest`; if there are errors (eg, the FSM does not exist, or the event is not allowed for the machine's configuration and current state) errors may be optionally sent to the SQS queue configured via the `-notifications` option (see [Running the Server](#running-the-server)): see the [`pubsub` code](pubsub/sqs_pub.go) code for details as to how we encode the error message as an SQS message.

See [`EventRequest` in `statemachine-proto`](https://github.com/massenz/statemachine-proto/blob/main/api/statemachine.proto#L86) for details on the event being sent.

#### SQS Error notifications
To try this out, you can use the [`SQS Client`](clients/sqs_client.go) example:

```
└─( http POST :7399/api/v1/statemachines configuration_version=test.orders:v3 id=26
HTTP/1.1 201 Created
Location: /api/v1/statemachines/test.orders/26

{
"id": "26",
"statemachine": {
"config_id": "test.orders:v3",
"state": "start"
}
}

└─( SQS_Client -endpoint http://localhost:4566 -q events \
-dest test.orders#26 -evt ship

Publishing Event `ship` for FSM `test.orders#26` to SQS Topic: [events]
Sent event [724ea354-4739-4782-8785-6ce55b86a25d] to queue events

└─( http :7399/api/v1/events/outcome/test.orders/724ea354-4739-4782-8785-6ce55b86a25d
HTTP/1.1 200 OK

{
"destination": "test.orders#26",
"message": "event [ship] transitioned FSM [25] to state [shipped]",
"status_code": "Ok"
}
```


#### SQS Notifications

Event processing outcomes are returned in [`EventResponse` protocol buffers](https://github.com/massenz/statemachine-proto/blob/main/api/statemachine.proto#L112), which are then serialized inside the `body` of the SQS message; to retrieve the actual Go struct, you can use code such as this (see [test code](pubsub/sqs_pub_test.go#L148) for actual working code):

Expand All @@ -269,14 +338,14 @@ receivedEvt.EventId --> is the ID of the Event that failed
if receivedEvt.Outcome.Code == protos.EventOutcome_InternalError {
// whatever...
}
return fmt.Errorf("cannot process event to statemachine [%s]: %s,
return fmt.Errorf("cannot process event to statemachine [%s]: %s,
receivedEvt.Outcome.Dest, receivedEvt.Outcome.Details)

```

The possible error codes are (see the `.proto` definition for the up-to-date values):

```protobuf
```proto
enum StatusCode {
Ok = 0;
GenericError = 1;
Expand All @@ -297,7 +366,7 @@ Please refer to [gRPC documentation](https://grpc.io/docs/), the [example gRPC c

The TL;DR version of all the above is that code like this:

```golang
```go
response, err := client.ProcessEvent(context.Background(),
&api.EventRequest{
Event: &api.Event{
Expand Down Expand Up @@ -331,7 +400,7 @@ To install the CLI run this:

> **Beware** Gingko now is at `v2` and will install that one by default if you follow the instruction on the site: use instead the command above and run `go mod tidy` before running the tests/builds to download packages<br/>
> (see [this issue](https://github.com/onsi/ginkgo/issues/945) for more details)


**Protocol Buffers definitions**<br/>
They are kept in the [statemachine-proto](https://github.com/massenz/statemachine-proto) repository; nothing specific is needed to use them; however, if you want to review the messages and services definitions, you can see them there.
Expand Down Expand Up @@ -363,17 +432,15 @@ To create the necessary SQS Queues in AWS, please see the `aws` CLI command in `
The `sm-server` accepts a number of configuration options (some of them are **required**):

```
└─( build/bin/sm-server -help
└─( build/bin/sm-server -help

Usage of build/bin/sm-server:
-debug
Verbose logs; better to avoid on Production services
-endpoint-url string
HTTP URL for AWS SQS to connect to; usually best left undefined, unless required for local testing purposes (LocalStack uses http://localhost:4566)
-errors string
The name of the Dead-Letter Queue (DLQ) in SQS to post errors to; if not specified, the DLQ will not be used
-events string
If defined, it will attempt to connect to the given SQS Queue (ignores any value that is passed via the -kafka flag)
If defined, it will attempt to connect to the given SQS Queue to receive events from the Pub/Sub system
-grpc-port int
The port for the gRPC server (default 7398)
-http-port int
Expand All @@ -382,6 +449,8 @@ Usage of build/bin/sm-server:
If set, it only listens to incoming requests from the local host
-max-retries int
Max number of attempts for a recoverable error to be retried against the Redis cluster (default 3)
-notifications string
The name of the notification topic in SQS to publish events' outcomes to; if not specified, no outcomes will be published
-redis string
URI for the Redis cluster (host:port)
-timeout duration
Expand Down Expand Up @@ -454,3 +523,5 @@ where the `[profile]` matches the value in `AWS_PROFILE`.
# Contributing

Please follow the Go Style enshrined in `go fmt` before submitting PRs, refer to actual [Issues](#), and provide sufficient testing (ideally, ensuring your code coverage is better than 80%).

We prefer you submit a PR directly from cloning this repository and creating a feature branch, rather than from a fork.
2 changes: 1 addition & 1 deletion build.settings
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Build configuration

version = 0.6.0
version = 0.6.1

2 changes: 1 addition & 1 deletion clients/sqs_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,5 +106,5 @@ func main() {
if err != nil {
panic(err)
}
fmt.Println("Sent event to queue", *q)
fmt.Printf("Sent event [%s] to queue %s\n", msg.Event.EventId, *q)
}
8 changes: 6 additions & 2 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,11 @@ func main() {
var localOnly = flag.Bool("local", false,
"If set, it only listens to incoming requests from the local host")
var port = flag.Int("http-port", 7399, "HTTP Server port for the REST API")
var redisUrl = flag.String("redis", "", "URI for the Redis cluster (host:port)")
var redisUrl = flag.String("redis", "", "For single node redis instances: URI "+
"for the Redis instance (host:port). For redis clusters: a comma-separated list of redis nodes. "+
"If using an ElastiCache Redis cluster with cluster mode enabled, you can supply the configuration endpoint.")
var cluster = flag.Bool("cluster", false,
z-cran marked this conversation as resolved.
Show resolved Hide resolved
"Needs to be set if connecting to a Redis instance with cluster mode enabled")
var awsEndpoint = flag.String("endpoint-url", "",
"HTTP URL for AWS SQS to connect to; usually best left undefined, "+
"unless required for local testing purposes (LocalStack uses http://localhost:4566)")
Expand Down Expand Up @@ -108,7 +112,7 @@ func main() {
} else {
logger.Info("Connecting to Redis server at %s", *redisUrl)
logger.Info("with timeout: %s, max-retries: %d", *timeout, *maxRetries)
store = storage.NewRedisStore(*redisUrl, 1, *timeout, *maxRetries)
store = storage.NewRedisStore(*redisUrl, *cluster, 1, *timeout, *maxRetries)
}
server.SetStore(store)

Expand Down
2 changes: 1 addition & 1 deletion docker/entrypoint.sh
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ then
endpoint="--endpoint-url ${AWS_ENDPOINT}"
fi

cmd="./sm-server -http-port ${SERVER_PORT} ${endpoint:-} ${DEBUG} \
cmd="./sm-server -http-port ${SERVER_PORT} ${endpoint:-} ${CLUSTER} ${DEBUG} \
-redis ${REDIS}:${REDIS_PORT} -timeout ${TIMEOUT:-25ms} -max-retries ${RETRIES:-3} \
-events ${EVENTS_Q} -notifications ${ERRORS_Q} \
$@"
Expand Down
7 changes: 5 additions & 2 deletions pubsub/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,19 +90,22 @@ func (listener *EventsListener) ListenForMessages() {
fmt.Sprintf("configuration [%s] could not be found", fsm.ConfigId)))
continue
}
previousState := fsm.State
cfgFsm := ConfiguredStateMachine{
Config: cfg,
FSM: fsm,
}
listener.logger.Debug("Preparing to send event `%s` for FSM [%s] (current state: %s)",
request.Event.Transition.Event, smId, previousState)
if err := cfgFsm.SendEvent(request.Event); err != nil {
listener.PostNotificationAndReportOutcome(makeResponse(&request,
protos.EventOutcome_TransitionNotAllowed,
fmt.Sprintf("event [%s] could not be processed: %v",
request.GetEvent().GetTransition().GetEvent(), err)))
continue
}
listener.logger.Info("Event `%s` transitioned FSM [%s] to state `%s` - updating store",
request.Event.Transition.Event, smId, fsm.State)
listener.logger.Info("Event `%s` transitioned FSM [%s] to state `%s` from state `%s` - updating store",
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please use Debug not Info - this gets really noisy quickly (I know I used INFO before, my bad)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated

request.Event.Transition.Event, smId, fsm.State, previousState)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we add a log line before we try to "SendEvent" ? It seems like it can fail there and we wouldn't have the IDs logged (SM ID, event ID, previous state)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, adding as a debug entry

        listener.logger.Debug("Preparing to send event `%s` for FSM [%s] (current state: %s)",
            request.Event.Transition.Event, smId, previousState)

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not only we capture the error if SendEvent fails, but we also send a notification (L98)

err := listener.store.PutStateMachine(smId, fsm)
if err != nil {
listener.PostNotificationAndReportOutcome(makeResponse(&request,
Expand Down
Loading