forked from mymmrac/telego
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathhelpers.go
195 lines (159 loc) · 5.21 KB
/
helpers.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
package telego
import (
"errors"
"fmt"
"time"
"github.com/goccy/go-json"
"github.com/valyala/fasthttp"
"github.com/mymmrac/telego/telegoapi"
)
const (
updateChanBuffer = 100
defaultUpdateInterval = time.Second / 2 // 0.5s
retryTimeout = time.Second * 3 // 3s
)
const listeningForWebhookErrMsg = "Listening for webhook: %v"
// SetUpdateInterval sets interval of calling GetUpdates in UpdatesViaLongPulling method. Ensures that between two
// calls of GetUpdates will be at least specified time, but it could be longer.
func (b *Bot) SetUpdateInterval(interval time.Duration) { // TODO: Add bot option for this
b.updateInterval = interval
}
// UpdatesViaLongPulling receive updates in chan using GetUpdates method
// Note: After you done with getting updates you should call StopLongPulling method
func (b *Bot) UpdatesViaLongPulling(params *GetUpdatesParams) (chan Update, error) {
b.stop = make(chan struct{})
b.startedLongPulling = true // TODO: Add mutex
updatesChan := make(chan Update, updateChanBuffer)
if params == nil {
params = &GetUpdatesParams{}
}
go func() {
for {
select {
case <-b.stop:
close(updatesChan)
return
default:
// Continue getting updates
}
updates, err := b.GetUpdates(params)
if err != nil {
b.log.Errorf("Getting updates: %v", err)
b.log.Errorf("Retrying to get updates in %s", retryTimeout.String())
time.Sleep(retryTimeout)
continue
}
for _, update := range updates {
if update.UpdateID >= params.Offset {
params.Offset = update.UpdateID + 1
updatesChan <- update
}
}
time.Sleep(b.updateInterval)
}
}()
return updatesChan, nil
}
// IsRunningLongPulling tells if UpdatesViaLongPulling is running
func (b *Bot) IsRunningLongPulling() bool {
return b.startedLongPulling
}
// StopLongPulling stop reviving updates from UpdatesViaLongPulling method
func (b *Bot) StopLongPulling() { // TODO: [?] Graceful shutdown
if b.startedLongPulling {
b.startedLongPulling = false
close(b.stop)
}
}
// StartListeningForWebhook start server for listening for webhook
// Note: After you done with getting updates you should call StopWebhook method
func (b *Bot) StartListeningForWebhook(address string) {
b.startedWebhook = true // TODO: Add mutex
go func() {
err := b.server.ListenAndServe(address)
if err != nil {
b.log.Errorf(listeningForWebhookErrMsg, err)
}
}()
}
// StartListeningForWebhookTLS start server with TLS for listening for webhook
// Note: After you done with getting updates you should call StopWebhook method
func (b *Bot) StartListeningForWebhookTLS(address, certificateFile, keyFile string) {
b.startedWebhook = true
go func() {
err := b.server.ListenAndServeTLS(address, certificateFile, keyFile)
if err != nil {
b.log.Errorf(listeningForWebhookErrMsg, err)
}
}()
}
// StartListeningForWebhookTLSEmbed start server with TLS (embed) for listening for webhook
// Note: After you done with getting updates you should call StopWebhook method
func (b *Bot) StartListeningForWebhookTLSEmbed(address string, certificateData []byte, keyData []byte) {
b.startedWebhook = true
go func() {
err := b.server.ListenAndServeTLSEmbed(address, certificateData, keyData)
if err != nil {
b.log.Errorf(listeningForWebhookErrMsg, err)
}
}()
}
// IsRunningWebhook tells if StartListeningForWebhook... is running
func (b *Bot) IsRunningWebhook() bool {
return b.startedWebhook
}
// StopWebhook shutdown webhook server used in UpdatesViaWebhook method
// Note: Should be called only after both UpdatesViaWebhook and StartListeningForWebhook...
func (b *Bot) StopWebhook() error { // TODO: [?] Graceful shutdown
if b.startedWebhook {
b.startedWebhook = false
close(b.stop)
return b.server.Shutdown()
}
return nil
}
// UpdatesViaWebhook receive updates in chan from webhook
func (b *Bot) UpdatesViaWebhook(path string) (chan Update, error) {
if b.startedWebhook {
return nil, errors.New("calling UpdatesViaWebhook after starting webhook is not allowed")
}
updatesChan := make(chan Update, updateChanBuffer)
b.stop = make(chan struct{})
b.server.Handler = func(ctx *fasthttp.RequestCtx) {
if string(ctx.Path()) != path {
ctx.SetStatusCode(fasthttp.StatusNotFound)
b.log.Errorf("Unknown path was used in webhook: %q", ctx.Path())
return
}
if method := string(ctx.Method()); method != fasthttp.MethodPost {
err := fmt.Errorf("used invalid HTTP method: %q, required method: %q", method, fasthttp.MethodPost)
b.respondWithError(ctx, err)
b.log.Errorf("Webhook invalid HTTP method: %q", method)
return
}
var update Update
err := json.Unmarshal(ctx.PostBody(), &update)
if err != nil {
b.respondWithError(ctx, fmt.Errorf("decoding update: %w", err))
b.log.Errorf("Webhook decoding error: %v", err)
return
}
updatesChan <- update
ctx.SetStatusCode(fasthttp.StatusOK)
}
go func() {
<-b.stop
close(updatesChan)
}()
return updatesChan, nil
}
func (b *Bot) respondWithError(ctx *fasthttp.RequestCtx, err error) {
//nolint:errcheck
errMsg, _ := json.Marshal(map[string]string{"error": err.Error()})
ctx.SetStatusCode(fasthttp.StatusBadRequest)
ctx.SetContentType(telegoapi.ContentTypeJSON)
_, err = ctx.Write(errMsg)
if err != nil {
b.log.Error("Writing HTTP:", err)
}
}