-
Notifications
You must be signed in to change notification settings - Fork 2
/
cleaner.go
105 lines (95 loc) · 2.74 KB
/
cleaner.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
package main
import (
"context"
"fmt"
"runtime/debug"
"sync"
"sync/atomic"
log "maunium.net/go/maulogger/v2"
"maunium.net/go/mautrix"
"maunium.net/go/mautrix/id"
)
type OKResponse struct {
Removed uint64 `json:"removed"`
Skipped uint64 `json:"skipped"`
Failed uint64 `json:"failed"`
}
func cleanRooms(ctx context.Context, client *mautrix.Client) (*OKResponse, error) {
reqLog := ctx.Value(logContextKey).(log.Logger)
reqLog.Infoln(client.UserID, "requested a room cleanup")
rooms, err := GetRoomList(ctx, client)
if err != nil {
return nil, fmt.Errorf("failed to get room list: %w", err)
}
reqLog.Debugln("Found", len(rooms), "rooms")
var resp OKResponse
var wg sync.WaitGroup
wg.Add(len(rooms))
queue := make(chan id.RoomID)
for i := 1; i <= cfg.ThreadCount; i++ {
threadContext := context.WithValue(ctx, logContextKey, reqLog.Sub(fmt.Sprintf("Thread-%d", i)))
go cleanRoomsThread(threadContext, client, queue, &wg, &resp)
}
for _, roomID := range rooms {
select {
case queue <- roomID:
case <-ctx.Done():
reqLog.Warnfln("Room cleanup for %s was canceled before it completed. Status: %+v", client.UserID, resp)
close(queue)
return &resp, ctx.Err()
}
}
wg.Wait()
close(queue)
reqLog.Infofln("Room cleanup for %s completed successfully. Status: %+v", client.UserID, resp)
return &resp, nil
}
func cleanRoomsThread(ctx context.Context, client *mautrix.Client, queue <-chan id.RoomID, wg *sync.WaitGroup, resp *OKResponse) {
reqLog := ctx.Value(logContextKey).(log.Logger)
defer func() {
err := recover()
if err != nil {
reqLog.Errorfln("Panic in room cleaning thread for %s: %v\n%s", client.UserID, err, debug.Stack())
}
}()
for {
select {
case roomID, ok := <-queue:
if !ok {
return
}
allowed, err := cleanRoom(ctx, client, roomID)
if err != nil {
reqLog.Warnfln("Failed to clean up %s: %v", roomID, err)
atomic.AddUint64(&resp.Failed, 1)
} else if allowed {
atomic.AddUint64(&resp.Removed, 1)
} else {
atomic.AddUint64(&resp.Skipped, 1)
}
wg.Done()
case <-ctx.Done():
return
}
}
}
func cleanRoom(ctx context.Context, client *mautrix.Client, roomID id.RoomID) (allowed bool, err error) {
reqLog := ctx.Value(logContextKey).(log.Logger)
defer func() {
panicErr := recover()
if panicErr != nil {
err = fmt.Errorf("panic while cleaning %s for %s: %v\n%s", roomID, client.UserID, panicErr, debug.Stack())
}
}()
usersToKick, permissionErr := IsAllowedToCleanRoom(ctx, client, roomID)
if permissionErr != nil {
reqLog.Debugfln("Skipping room %s as cleaning is not allowed: %v", roomID, permissionErr)
return
}
allowed = true
err = PushLeaveQueue(ctx, roomID, usersToKick)
if err == nil {
reqLog.Debugfln("Room %s queued for leaving", roomID)
}
return
}