-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathel.go
173 lines (146 loc) · 3.79 KB
/
el.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
package camellia
import (
"github.com/arianxx/camellia-io/internal"
"time"
)
// EventLoop is the core of this io framework. It schedules the period tasks
// in the loop and dispatches the incoming events.
type EventLoop struct {
// Selector is the embedded poller: Tick polls it, Register adds descriptors
// to it and processAction removes them from it.
*internal.Selector
// events holds the user events whose Serving/Open/Data/Closed callbacks are
// invoked by Run and processAction.
events []*Event
// interval is the poll timeout Tick uses when no period task is registered.
interval time.Duration
// done is set by Done and makes Run leave its loop.
done bool
// triggerDataPtr is handed to the next triggered user callback;
// processAction resets it to nil after handling every action.
triggerDataPtr *interface{}
// periodTasks holds the tasks registered through AddPeriodTask.
periodTasks []*PeriodTask
}
// NewEventLoop returns an EventLoop that is ready to have events and period
// tasks added to it.
//
// The loop starts with a selector obtained from internal.New(1024), empty
// (non-nil) event and period-task lists, and a default poll interval of
// 100 milliseconds.
func NewEventLoop() *EventLoop {
	el := new(EventLoop)
	// NOTE(review): 1024 is presumably a size/capacity hint for the selector;
	// confirm against internal.New.
	el.Selector = internal.New(1024)
	el.events = make([]*Event, 0)
	el.periodTasks = make([]*PeriodTask, 0)
	el.interval = 100 * time.Millisecond
	return el
}
// AddEvent appends an event to the list of user events of the event loop.
// Its callbacks are invoked in registration order, after those of the events
// added before it.
func (el *EventLoop) AddEvent(e *Event) {
	registered := append(el.events, e)
	el.events = registered
}
// AddPeriodTask adds a period task to the event loop.
//
// The first trigger time of the task is initialised here (now + Interval).
// Run computes it again for every task present when it starts, so tasks added
// before Run behave as before. A task added while the loop is already running
// previously kept a zero trigger time, which made Tick compute a hugely
// negative poll timeout for it.
func (el *EventLoop) AddPeriodTask(t *PeriodTask) {
	// A nil task is still appended, as before; only the deadline
	// initialisation is skipped for it.
	if t != nil {
		t.setNextTriggerTime()
	}
	el.periodTasks = append(el.periodTasks, t)
}
// SetTriggerDataPtr is used to pass data from a library event (e.g. Socket)
// to the user-defined events. A pointer to a copy of data is stored and given
// as the second argument to the user callbacks triggered by the next action;
// processAction clears it again once that action has been handled.
func (el *EventLoop) SetTriggerDataPtr(data interface{}) {
	boxed := new(interface{})
	*boxed = data
	el.triggerDataPtr = boxed
}
// Run blocks the calling thread and serves until Done has been called.
//
// Before the loop starts, the Serving callback of every registered event is
// invoked (with a nil data pointer) and the first trigger time of every
// registered period task is computed. After that Tick is called repeatedly;
// the done flag is checked before each call.
func (el *EventLoop) Run() {
	// Work on the slice as it is right now, like a range loop would.
	registered := el.events
	for i := 0; i < len(registered); i++ {
		serving := registered[i].Serving
		if serving == nil {
			continue
		}
		serving(el, nil)
	}

	tasks := el.periodTasks
	for i := 0; i < len(tasks); i++ {
		tasks[i].setNextTriggerTime()
	}

	for {
		if el.done {
			return
		}
		el.Tick()
	}
}
// Tick runs one cycle of the event loop.
//
// It polls the selector, runs the EventProc of every key that Poll returns
// and hands the resulting Action to processAction. Afterwards the period task
// with the earliest trigger time is run, but only if that time has actually
// been reached, and its next trigger time is computed.
//
// The poll timeout is the time left until the nearest period task is due, or
// the loop interval when no period task is registered. It is never negative:
// an overdue task yields a timeout of 0, so Poll returns immediately instead
// of receiving a negative value (which epoll/kqueue-style pollers commonly
// treat as "block forever" - TODO confirm for internal.Selector.Poll).
func (el *EventLoop) Tick() {
	sleepTime := el.interval

	nearestTask := el.findNearestTask()
	if nearestTask != nil {
		sleepTime = nearestTask.nextTriggerTime.Sub(time.Now())
	}

	// Convert to whole milliseconds, rounding up so that the poll does not
	// time out before the deadline and spin with a 0ms timeout until it is due.
	timeoutMs := 0
	if sleepTime > 0 {
		timeoutMs = int((sleepTime + time.Millisecond - 1) / time.Millisecond)
	}

	// The remaining results of Poll are ignored on purpose: Tick has no way to
	// report an error, and a failed poll just leaves this cycle with no keys.
	keys, _, _ := el.Selector.Poll(timeoutMs)
	for _, k := range keys {
		// Register always stores an EventData next to the descriptor.
		ed := k.Data.(EventData)
		action := ed.e(el, ed.data)
		el.processAction(action, k.Fd)
	}

	// Poll may have returned early because of I/O; only run the task once its
	// trigger time has really arrived. Otherwise it stays scheduled and a later
	// Tick picks it up again.
	if nearestTask == nil || time.Now().Before(nearestTask.nextTriggerTime) {
		return
	}
	if nearestTask.Event != nil {
		nearestTask.Event(el, nil)
	}
	nearestTask.setNextTriggerTime()
}
// Register registers the descriptor fd with the internal selector for the
// events in mask. The callback e and its argument d are stored together as an
// EventData; Tick calls e(el, d) when the descriptor is returned by Poll.
// The error of the selector is returned unchanged.
func (el *EventLoop) Register(fd int, mask uint32, e EventProc, d interface{}) error {
	payload := EventData{e: e, data: d}
	return el.Selector.Register(fd, mask, payload)
}
// Done stops the event loop by setting the done flag. Run checks the flag
// before each call to Tick, so a Tick that is already in progress finishes
// first and Run returns afterwards.
func (el *EventLoop) Done() {
	el.done = true
}
// findNearestTask returns the period task with the earliest next trigger
// time, or nil when no period task is registered. If several tasks share the
// earliest time, the one registered first is returned.
func (el *EventLoop) findNearestTask() *PeriodTask {
	var nearest *PeriodTask
	for _, candidate := range el.periodTasks {
		// Keep the current choice unless the candidate is strictly earlier.
		if nearest != nil && !candidate.nextTriggerTime.Before(nearest.nextTriggerTime) {
			continue
		}
		nearest = candidate
	}
	return nearest
}
// processAction carries out the Action that an EventProc returned for the
// descriptor fd.
//
// The SHUTDOWN_* actions unregister fd from the selector for the matching
// direction(s); the TRIGGER_* actions call the matching callback of every
// registered event with the current trigger data pointer; CONTINUE does
// nothing. In all cases the trigger data pointer is reset to nil afterwards.
func (el *EventLoop) processAction(action Action, fd int) {
	// dispatch calls, for every registered event in order, the callback that
	// pick selects, skipping the events that left this callback unset.
	dispatch := func(pick func(ev *Event) TriggerProc) {
		for _, ev := range el.events {
			if cb := pick(ev); cb != nil {
				cb(el, el.triggerDataPtr)
			}
		}
	}

	// The results of Unregister are dropped on purpose: this method has no
	// way to report them to a caller.
	switch action {
	case CONTINUE:
		// Nothing to do besides clearing the trigger data below.
	case SHUTDOWN_RD:
		_, _ = el.Unregister(fd, internal.EV_READABLE)
	case SHUTDOWN_WR:
		_, _ = el.Unregister(fd, internal.EV_WRITABLE)
	case SHUTDOWN_RDWR:
		_, _ = el.Unregister(fd, internal.EV_READABLE)
		_, _ = el.Unregister(fd, internal.EV_WRITABLE)
	case TRIGGER_OPEN_EVENT:
		dispatch(func(ev *Event) TriggerProc { return ev.Open })
	case TRIGGER_DATA_EVENT:
		dispatch(func(ev *Event) TriggerProc { return ev.Data })
	case TRIGGER_CLOSE_EVENT:
		dispatch(func(ev *Event) TriggerProc { return ev.Closed })
	}

	el.triggerDataPtr = nil
}
// Action is the value an EventProc returns to tell the event loop what to do
// after the callback has run. It is interpreted by processAction.
type Action int
// The numeric values follow the declaration order (iota); do not reorder.
const (
// CONTINUE requests no further work.
CONTINUE Action = iota
// SHUTDOWN_RD unregisters the descriptor for readable events.
SHUTDOWN_RD
// SHUTDOWN_WR unregisters the descriptor for writable events.
SHUTDOWN_WR
// SHUTDOWN_RDWR unregisters the descriptor for readable and writable events.
SHUTDOWN_RDWR
// TRIGGER_OPEN_EVENT calls the Open callback of every registered event.
TRIGGER_OPEN_EVENT
// TRIGGER_DATA_EVENT calls the Data callback of every registered event.
TRIGGER_DATA_EVENT
// TRIGGER_CLOSE_EVENT calls the Closed callback of every registered event.
TRIGGER_CLOSE_EVENT
)
// Event groups the user callbacks the event loop can invoke. Every callback
// is optional; a nil callback is skipped.
type Event struct {
	// Serving is called by Run, with a nil data pointer, before the loop
	// starts ticking.
	Serving TriggerProc
	// Open is called when an EventProc returns TRIGGER_OPEN_EVENT.
	Open TriggerProc
	// Closed is called when an EventProc returns TRIGGER_CLOSE_EVENT.
	Closed TriggerProc
	// Data is called when an EventProc returns TRIGGER_DATA_EVENT.
	Data TriggerProc
}
// EventProc is the callback stored with a registered descriptor. Tick calls
// it with the event loop and the data that was given to Register, and passes
// the returned Action to processAction.
type EventProc func(el *EventLoop, data interface{}) Action
// EventData pairs an EventProc with its argument. Register stores one in the
// selector next to the descriptor and Tick reads it back from the polled key.
// Register builds it with a positional literal, so keep the field order.
type EventData struct {
// e is the callback Tick runs when the descriptor is returned by Poll.
e EventProc
// data is passed to e as its second argument.
data interface{}
}
// TriggerProc is the signature of the user callbacks (the fields of Event and
// PeriodTask.Event). dataPtr is nil for Serving and period-task calls; for
// Open/Data/Closed it is the pointer last stored by SetTriggerDataPtr, or nil
// if none was stored since the previous action.
type TriggerProc func(el *EventLoop, dataPtr *interface{})
// PeriodTask is a callback that the event loop runs repeatedly.
type PeriodTask struct {
// Interval is added to the current time to compute the next trigger time.
Interval time.Duration
// Event is the callback invoked by Tick; it receives the event loop and a
// nil data pointer.
Event TriggerProc
// nextTriggerTime is the instant of the next run; it is maintained by
// setNextTriggerTime.
nextTriggerTime time.Time
}
// setNextTriggerTime schedules the next run of the task one Interval after
// the current time.
func (t *PeriodTask) setNextTriggerTime() {
	now := time.Now()
	t.nextTriggerTime = now.Add(t.Interval)
}