-
Notifications
You must be signed in to change notification settings - Fork 1
/
txWorker.go
125 lines (105 loc) · 2.56 KB
/
txWorker.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
package main
import (
"github.com/streadway/amqp"
)
var (
loggableCmds = make(chan command)
logChannel *amqp.Channel
)
func txWorker(unprocessedTxs <-chan string) {
// create a single channel for logging so we don't flood RMQ
// with new channels for each log item.
var err error
logChannel, err = rmqConn.Channel()
failOnError(err, "Failed to open a logging channel")
defer logChannel.Close()
go sendCmdToAudit()
for {
select {
case <-done:
consoleLog.Notice(" [x] Finished processing transactions")
cleanUpTxs(unprocessedTxs)
return
default:
processTxs(unprocessedTxs)
}
}
}
func processTxs(unprocessedTxs <-chan string) {
// If a TX is aborted anwhere in its processing it will bubble up
// to here and catchAbortedTx() will run before processTxs() closes.
// Control returns to txWorker() where the select{} repeats and the
// next transaction is grabbed.
defer catchAbortedTx()
cmd := parseCommand(<-unprocessedTxs)
if !*noAudit {
loggableCmds <- cmd
}
cmd.Execute()
}
func abortTx(msg string) {
panic(msg)
}
func abortTxOnError(err error, msg string) {
if err != nil {
panic(msg)
}
}
func catchAbortedTx() {
if r := recover(); r != nil {
consoleLog.Error(" [x] Aborted transaction:", r)
}
}
func abortTxIfNoAccount(userID string) {
if _, found := accountStore[userID]; !found {
panic("Cannot perform this command on users without an account")
}
}
func cleanUpTxs(unprocessedTxs <-chan string) {
// TODO: Put these back in redis? Just warn for now.
if len(unprocessedTxs) > 0 {
for i := 0; i < len(unprocessedTxs); i++ {
consoleLog.Warning("Unprocessed transaction", <-unprocessedTxs)
}
}
}
func sendCmdToAudit() {
if *noAudit {
consoleLog.Warning("Not sending to audit log")
return
}
for {
select {
case <-done:
consoleLog.Notice(" [x] Finished sending commands to log")
cleanUpCmdLog()
return
case cmd := <-loggableCmds:
header := amqp.Table{
"name": cmd.Name(),
"serviceID": redisBaseKey,
}
ae := cmd.ToAuditEvent()
err := logChannel.Publish(
"", // exchange
auditEventQ, // routing key
false, // mandatory
false, // immediate
amqp.Publishing{
Headers: header,
ContentType: "text/plain",
Body: []byte(ae.ToCSV()),
})
failOnError(err, "Failed to publish a message")
consoleLog.Debug("Sent to audit:", cmd.Name())
}
}
}
func cleanUpCmdLog() {
if len(loggableCmds) > 0 {
for i := 0; i < len(loggableCmds); i++ {
cmd := <-loggableCmds
consoleLog.Warning("Unlogged command", cmd.Name())
}
}
}