-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathsender_thread.go
67 lines (55 loc) · 1.14 KB
/
sender_thread.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at https://mozilla.org/MPL/2.0/. */
package notify
import (
"sync"
"time"
"github.com/raohwork/notify/model"
"github.com/raohwork/notify/types"
)
// data is one job handed to a worker over thread.ch: the item to send, the
// driver to send it with, and a channel on which the worker sends itself once
// the job has been processed (see thread.mainloop).
type data struct {
// i is the item passed to thread.run.
i *model.Item
// drv is the driver passed to thread.run together with i.
drv types.Driver
// ch receives the worker's own *thread after run returns; presumably this
// tells the dispatcher the worker is free again — confirm against the sender.
ch chan *thread
}
// thread is a single sender worker. Its mainloop receives jobs from ch, runs
// each one, and marks wg as done when it returns.
type thread struct {
// ch is the job queue consumed by mainloop; mainloop returns when it is closed.
ch chan *data
// SenderOptions is embedded. NOTE(review): run reads t.Scheduler, t.MaxTries
// and t.Update, which presumably are promoted from here — confirm.
SenderOptions
// wg has Done called on it once when mainloop finishes.
wg *sync.WaitGroup
// id is the key this worker passes to job.lset.
id uint16
// job is the tracker whose entry for id is reset to "" at the end of every run.
job *jobCtrl
}
// mainloop is the worker loop. It processes every job received on t.ch and,
// after each one, sends the worker itself on that job's reply channel. It
// returns once t.ch has been closed and drained.
//
// wg.Done is deferred rather than called after the loop so the WaitGroup is
// released on every exit path (including runtime.Goexit or a panic unwinding
// through run), not only when the channel is closed normally. Otherwise a
// caller blocked in wg.Wait could hang forever.
func (t *thread) mainloop() {
	defer t.wg.Done()

	for d := range t.ch {
		t.run(d.i, d.drv)
		// Report back on the job's reply channel now that run has finished.
		d.ch <- t
	}
}
// run makes one delivery attempt for item i through drv and records the
// outcome with t.Update.
//
// Retry bookkeeping is settled before sending: the scheduler supplies the
// next attempt time and may ask to stop, in which case Tried is pinned to
// MaxTries instead of being incremented. The recorded state is SUCCESS when
// Send returns no error, FAILED when it errors and Tried has reached MaxTries,
// and PENDING otherwise. If Send fails without returning any response bytes,
// the error text is stored as the response.
func (t *thread) run(i *model.Item, drv types.Driver) {
	// Reset this worker's entry in the job tracker on the way out.
	defer func() { t.job.lset(t.id, "") }()

	startedAt := time.Now()
	next, stop := t.Scheduler(drv.Type(), i.ID, startedAt, i.Tried)

	i.NextAt = next.Unix()
	if stop {
		// The scheduler asked to give up: mark all tries as used.
		i.Tried = t.MaxTries
	} else {
		i.Tried++
	}
	// Decided before sending, so the outcome of Send cannot affect it.
	exhausted := i.Tried >= t.MaxTries

	resp, err := drv.Send(i.Endpoint, i.Content)

	state := types.PENDING
	switch {
	case err == nil:
		state = types.SUCCESS
	case exhausted:
		state = types.FAILED
	}

	// TODO: log error
	if err != nil && len(resp) == 0 {
		// No response body came back; keep the error text instead.
		resp = []byte(err.Error())
	}

	t.Update(i.ID, i.Tried, i.NextAt, state, resp)
}