-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathview.go
122 lines (106 loc) · 2.31 KB
/
view.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
package gocqrs
import (
"log"
"time"
)
const (
RebuiltOpt = "rebuild"
PurgeOpt = "purge"
)
type View struct {
Name string `json:"view"`
All bool `json:"all"`
CatchUp bool `json:"catchup"`
V Viewer
Store EventStore
wakeUP chan bool
running bool
every time.Duration
}
type Viewer interface {
Init(params map[string]string, dev bool)
Purge() error
Status() (uint64, error)
Stream() string
Rebuild() error
Apply(event Event) error
}
func NewView(name string, i Viewer) *View {
var v View
v.Name = name
v.wakeUP = make(chan bool, 2)
v.V = i
// set default wakeup time every 400ms
v.every = time.Duration(time.Millisecond * 400)
return &v
}
func (v *View) awake() {
for {
time.Sleep(v.every)
v.wakeUP <- true
}
}
func (v *View) Run(action string, params map[string]string, dev bool) error {
var err error
switch action {
case RebuiltOpt:
err := v.V.Purge()
if err != nil {
panic("Failed to purge view:" + err.Error())
}
// Rebuild view
// init view
v.V.Init(params, dev)
case PurgeOpt:
v.V.Init(params, dev)
log.Println("Purging view...")
err := v.V.Purge()
if err != nil {
panic("Failed to purge view:" + err.Error())
}
return nil
default:
// init view
v.V.Init(params, dev)
}
go v.awake()
mainStream := v.V.Stream()
for {
v.running = true
curVersion, err := v.V.Status()
if err != nil {
log.Println("View not started, no status available")
}
esVersion, err := v.Store.Version(mainStream)
if err != nil {
log.Println("Failed to get current stream version, sleeping for few seconds")
time.Sleep(time.Second * 5)
continue
}
if esVersion > curVersion || curVersion == 0 {
log.Println("current Version:", curVersion, " evento version:", esVersion)
log.Println(mainStream)
log.Println("Catching up from version ", curVersion, " to version ", esVersion)
var events chan Event
if curVersion == 0 {
events = v.Store.Scan(mainStream, curVersion, esVersion)
} else {
events = v.Store.Scan(mainStream, curVersion+1, esVersion)
}
for e := range events {
err := v.V.Apply(e)
log.Println("Applying..", e.EventVersion)
log.Println(e)
if err != nil {
log.Println(err)
log.Fatal("Failed to apply event :", e)
}
}
}
v.running = false
select {
case <-v.wakeUP:
}
}
return err
}