-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathsyncLoop.go
206 lines (176 loc) · 4.83 KB
/
syncLoop.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
package gotrix
import (
"context"
"errors"
"fmt"
"time"
"github.com/chanbakjsd/gotrix/api"
"github.com/chanbakjsd/gotrix/debug"
"github.com/chanbakjsd/gotrix/event"
"github.com/chanbakjsd/gotrix/matrix"
)
// SyncOptions contains options for the /sync endpoint that is used once the
// Client is opened.
type SyncOptions struct {
Filter event.GlobalFilter
Timeout time.Duration
MinBackoffTime time.Duration
MaxBackoffTime time.Duration
}
// DefaultSyncOptions is the default sync options instance used on every Client
// creation.
var DefaultSyncOptions = SyncOptions{
Filter: event.GlobalFilter{
Room: event.RoomFilter{
IncludeLeave: false,
State: event.StateFilter{
LazyLoadMembers: true,
},
Timeline: event.RoomEventFilter{
Limit: 50,
LazyLoadMembers: true,
},
},
},
Timeout: 5 * time.Second,
MinBackoffTime: 1 * time.Second,
MaxBackoffTime: 300 * time.Second,
}
// Next returns the current Next synchronization argument. Next can ONLY be
// called once the Client is closed, otherwise a panic will occur.
func (c *Client) Next() string {
select {
case <-c.closeDone:
return c.next
default:
panic("Next called on unclosed Client")
}
}
// Open starts the event loop of the client with a background context.
func (c *Client) Open() error {
return c.OpenWithNext("")
}
// syncOpts is the internal copy of the sync states.
type syncOpts struct {
SyncOptions
next string
filterID string
}
// OpenWithNext starts the event loop with the given next string that resumes the sync loop.
// If next is empty, then an initial sync will be done.
func (c *Client) OpenWithNext(next string) error {
ctx, cancel := context.WithCancel(context.Background())
c.closeDone = make(chan struct{})
c.cancelFunc = cancel
filterID, err := c.FilterAdd(c.SyncOpts.Filter)
if err != nil {
return err
}
go c.readLoop(ctx, syncOpts{
SyncOptions: c.SyncOpts,
next: next,
filterID: filterID,
})
return nil
}
// Close signals to the event loop to stop and wait for it to finish.
func (c *Client) Close() error {
c.cancelFunc()
<-c.closeDone
return nil
}
func (c *Client) handleWithRoomID(e []event.RawEvent, roomID matrix.RoomID, isHistorical bool) {
for _, v := range e {
v := v
concrete, err := event.Parse(v)
if w, ok := concrete.(event.RoomEvent); ok {
w.RoomInfo().RoomID = roomID
}
var unknownErr event.UnknownEventTypeError
// Print out warnings.
switch {
case errors.As(err, &unknownErr):
debug.Warn(fmt.Sprintf("unknown event type: %s", unknownErr.Found))
case err != nil:
debug.Warn(fmt.Errorf("error unmarshalling content: %w", err))
}
// Don't call handlers on historical events.
if isHistorical {
continue
}
c.Handler.HandleRaw(c, v)
if err != nil {
continue
}
c.Handler.Handle(c, concrete)
}
}
func (c *Client) readLoop(ctx context.Context, opts syncOpts) {
client := c.WithContext(ctx)
timeout := int(opts.Timeout / time.Millisecond)
next := opts.next
handle := func(e []event.RawEvent) {
c.handleWithRoomID(e, "", next == "")
}
var nextRetryTime time.Duration
timer := time.NewTimer(0)
defer timer.Stop()
defer close(c.closeDone)
<-timer.C
for {
// Fetch next set of events.
debug.Debug("Fetching new events. Next: " + next)
resp, err := client.Sync(api.SyncArg{
Filter: opts.filterID,
Since: next,
Timeout: timeout,
})
if err != nil {
if ctx.Err() != nil {
// The context has finished.
return
}
// Exponentially backoff with a cap of 5 minutes.
nextRetryTime *= 2
if nextRetryTime < opts.MinBackoffTime {
nextRetryTime = opts.MinBackoffTime
}
if nextRetryTime > opts.MaxBackoffTime {
nextRetryTime = opts.MaxBackoffTime
}
debug.Error(fmt.Errorf("error in event loop (retrying in %s): %w", nextRetryTime, err))
timer.Reset(nextRetryTime)
select {
case <-timer.C:
continue
case <-ctx.Done():
return
}
}
if err := c.State.AddEvents(resp); err != nil {
debug.Debug(fmt.Errorf("error adding sync events to state: %w", err))
}
handle(resp.Presence.Events)
handle(resp.AccountData.Events)
handle(resp.ToDevice.Events)
for k, v := range resp.Rooms.Joined {
c.handleWithRoomID(v.State.Events, k, next == "")
c.handleWithRoomID(v.Timeline.Events, k, next == "")
c.handleWithRoomID(v.Ephemeral.Events, k, next == "")
c.handleWithRoomID(v.AccountData.Events, k, next == "")
}
for k, v := range resp.Rooms.Invited {
events := make([]event.RawEvent, len(v.State.Events))
for k, v := range v.State.Events {
events[k] = event.RawEvent(v)
}
c.handleWithRoomID(events, k, next == "")
}
for k, v := range resp.Rooms.Left {
c.handleWithRoomID(v.State.Events, k, next == "")
c.handleWithRoomID(v.Timeline.Events, k, next == "")
c.handleWithRoomID(v.AccountData.Events, k, next == "")
}
next = resp.NextBatch
}
}