-
Notifications
You must be signed in to change notification settings - Fork 0
/
transaction.go
164 lines (149 loc) · 3.32 KB
/
transaction.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
package stored
import (
"fmt"
"github.com/apple/foundationdb/bindings/go/src/fdb"
)
// transactionTask is one unit of work scheduled on a Transaction: a promise
// together with the options that control how execute treats its outcome.
type transactionTask struct {
// promise is the promise whose chain execute steps through.
promise *Promise
// onDone, when non-nil, is called by execute with the promise's error once
// the chain has finished; its return value replaces promise.err.
onDone func(err error) error
// check marks the promise as mandatory: if it fails with anything other
// than ErrSkip, execute stops and returns that error.
check bool
}
// Transaction joins several Promises so that they run inside a single
// FoundationDB transaction. execute advances the promises' chains in
// interleaved passes instead of finishing one promise before starting the next.
type Transaction struct {
// tasks holds the promises to run; execute appends follow-up ("after")
// promises to it while running.
tasks []transactionTask
// db is the database transact uses to open a transaction when no handle
// was supplied through initRead/initWrite.
db *fdb.Database
// writable is set by initWrite; Fail only cancels tr when it is true.
writable bool
// readTr is the read handle copied onto every promise by setTr.
readTr fdb.ReadTransaction
// tr is the read-write handle copied onto every promise by setTr; it is
// only assigned on the write paths (initWrite and the write branch of transact).
tr fdb.Transaction
// started is true once a handle was supplied (initRead/initWrite) or
// transact has begun opening its own transaction.
started bool
// finish is set by execute; once true, transact returns immediately.
finish bool
// err is the recorded outcome: transact stores an error here only when
// none is set yet, while Fail overwrites it unconditionally.
err error
}
// isReadOnly reports whether every scheduled promise is flagged readOnly.
// A transaction without tasks counts as read-only.
func (t *Transaction) isReadOnly() bool {
	tasks := t.tasks
	for i := range tasks {
		if p := tasks[i].promise; !p.readOnly {
			// A single writing promise makes the whole transaction writable.
			return false
		}
	}
	return true
}
// clear calls clear on the promise of every scheduled task. transact invokes
// it at the start of each run of the transaction function.
func (t *Transaction) clear() {
	tasks := t.tasks
	for i := range tasks {
		tasks[i].promise.clear()
	}
}
// initRead binds the Transaction to an already opened read transaction.
// The task list is reset to an empty slice and the Transaction is marked as
// started, so transact will call execute directly instead of opening its own
// transaction.
func (t *Transaction) initRead(tr fdb.ReadTransaction) {
	t.started = true
	t.readTr = tr
	t.tasks = make([]transactionTask, 0)
}
// initWrite binds the Transaction to an already opened read-write
// transaction. The same handle serves both reads and writes; the task list is
// reset to an empty slice and the Transaction is marked writable and started.
func (t *Transaction) initWrite(tr fdb.Transaction) {
	t.started, t.writable = true, true
	t.tr, t.readTr = tr, tr
	t.tasks = make([]transactionTask, 0)
}
// setTr copies the Transaction's current read and read-write handles onto
// the given promise.
func (t *Transaction) setTr(promise *Promise) {
	promise.readTr, promise.tr = t.readTr, t.tr
}
// transact runs the scheduled promises once.
//
// It does nothing when the transaction was already executed. If a handle was
// supplied beforehand (started is set), execute runs directly on it.
// Otherwise transact opens its own transaction on t.db: a snapshot read
// transaction when every promise is read-only, a read-write transaction
// otherwise. In that case all promises are marked confirmed afterwards,
// whether or not the run returned an error.
//
// The resulting error is stored in t.err unless an error is already recorded.
func (t *Transaction) transact() {
	if t.finish {
		// Already executed; keep whatever outcome was recorded.
		return
	}

	var runErr error
	if t.started {
		// A transaction handle was supplied from outside; just run on it.
		_, runErr = t.execute()
	} else {
		t.started = true
		if t.isReadOnly() {
			_, runErr = t.db.ReadTransact(func(rtr fdb.ReadTransaction) (interface{}, error) {
				// Reset the promises before each run of this function.
				t.clear()
				t.readTr = rtr.Snapshot()
				return t.execute()
			})
		} else {
			_, runErr = t.db.Transact(func(wtr fdb.Transaction) (interface{}, error) {
				// Reset the promises before each run of this function.
				t.clear()
				t.tr = wtr
				t.readTr = wtr
				return t.execute()
			})
		}
		t.confirm()
	}

	if t.err == nil {
		t.err = runErr
	}
}
// confirm sets the confirmed flag on the promise of every task.
func (t *Transaction) confirm() {
	tasks := t.tasks
	for i := range tasks {
		tasks[i].promise.confirmed = true
	}
}
// execute steps every task's promise chain until all of them are finished.
//
// Each pass over the tasks advances every unfinished chain by one step. A
// promise that fails with anything other than ErrSkip loses its follow-up
// ("after") promise, and if its task is marked check the error aborts the
// whole run. For a finished promise, a pending follow-up promise is scheduled
// as a new task and the task's onDone callback (if any) is invoked with the
// promise's error; the callback's result replaces promise.err.
//
// The first result is always nil. A non-nil error is returned when a checked
// promise fails or when an onDone callback returns an error.
func (t *Transaction) execute() (ret interface{}, err error) {
	chains := make([]Chain, len(t.tasks))
	for i, task := range t.tasks {
		t.setTr(task.promise)
		chains[i] = task.promise.chain
	}
	t.finish = true

	next := true
	for next {
		next = false
		// The range snapshot means tasks appended during this pass are only
		// visited on the following pass.
		for i, chain := range chains {
			task := t.tasks[i]
			promise := task.promise

			if chain != nil {
				chains[i] = chain()
				// Once an error happened on a promise its follow-up is dropped;
				// for a checked task the whole transaction fails.
				if promise.err != nil && promise.err != ErrSkip {
					promise.after = nil
					if task.check {
						err = promise.err
						// NOTE(review): debug output to stdout kept as-is; it is the
						// only visible use of the fmt import in this file.
						fmt.Println("PROMISE ERERRR", err)
						return nil, err
					}
				}
				next = true
				continue
			}

			// The promise is done: schedule its postponed follow-up, if any.
			if promise.after != nil {
				after := promise.after().self()
				promise.after = nil
				t.setTr(after)
				t.tasks = append(t.tasks, transactionTask{promise: after})
				chains = append(chains, after.chain)
				next = true
			}

			// NOTE(review): onDone runs on every pass in which this promise is
			// already finished, so it can be called more than once while other
			// chains still need passes — confirm callbacks tolerate that.
			if task.onDone != nil {
				promise.err = task.onDone(promise.err)
				if promise.err != nil {
					// Propagate the callback's error. Returning a nil error here
					// would report success to the caller, which contradicts the
					// intent of cancelling the transaction.
					return nil, promise.err
				}
			}
		}
	}
	return nil, nil
}
// Err runs the transaction if it has not been executed yet and returns the
// error recorded on it, or nil when none was recorded.
func (t *Transaction) Err() error {
t.transact()
return t.err
}
// Fail records err as the outcome of the transaction, replacing any error
// recorded earlier. When the Transaction is marked writable, the underlying
// read-write transaction is cancelled as well.
func (t *Transaction) Fail(err error) {
	t.err = err
	if !t.writable {
		return
	}
	t.tr.Cancel()
}