-
Notifications
You must be signed in to change notification settings - Fork 0
/
with_rollback.go
56 lines (53 loc) · 1.65 KB
/
with_rollback.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
package opinionatedsagas
import (
"context"
"errors"
events "github.com/markusylisiurunen/go-opinionatedevents"
)
func withRollback(
publisher *events.Publisher, limit int,
) events.OnMessageMiddleware {
doRollback := func(ctx context.Context, delivery events.Delivery) error {
// parse the message payload
msg := newTaskMessage(&noopTask{})
if err := delivery.GetMessage().GetPayload(msg); err != nil {
return err
}
// publish the rollback message
rollback, ok := msg.rollback()
if !ok {
// the rollback stack is empty, processing is done
err := errors.New("processing the task failed, nothing to roll back")
return events.Fatal(err)
}
opinionatedRollback, err := rollback.toOpinionatedMessage()
if err != nil {
return err
}
if err := publisher.Publish(ctx, opinionatedRollback); err != nil {
return err
}
err = errors.New("processing the task failed, rolling back")
return events.Fatal(err)
}
return func(next events.OnMessageHandler) events.OnMessageHandler {
return func(ctx context.Context, delivery events.Delivery) error {
if limit != 0 && delivery.GetAttempt() > limit {
// processing the message has already been attempted `limit` many times
return doRollback(ctx, delivery)
}
// try to process the message
result := next(ctx, delivery)
if result != nil && events.IsFatal(result) {
// fatal error, should roll back immediately
return doRollback(ctx, delivery)
}
if result != nil && limit > 0 && delivery.GetAttempt() >= limit {
// retriable error, but has failed too many times
return doRollback(ctx, delivery)
}
// otherwise, don't roll back
return result
}
}
}