Skip to content

Commit

Permalink
Fix message storage
Browse files Browse the repository at this point in the history
- add "deleted" field to avoid retrying the processing
- add DeleteMessage function to mark processed and deleted messages
- change Messages function to return all messages by msg
- fix and add unit tests
  • Loading branch information
coperius committed Aug 6, 2024
1 parent cac742e commit ddab9ac
Show file tree
Hide file tree
Showing 4 changed files with 175 additions and 56 deletions.
102 changes: 61 additions & 41 deletions app/events/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,62 +76,82 @@ func (a *admin) MsgHandler(update tbapi.Update) error {

// it would be nice to ban this user right away, but we don't have forwarded user ID here due to tg privacy limitation.
// it is empty in update.Message. to ban this user, we need to get the match on the message from the locator and ban from there.
info, ok := a.locator.Message(msgTxt)
infos, ok := a.locator.Messages(msgTxt)
if !ok {
return fmt.Errorf("not found %q in locator", shrink(msgTxt, 50))
}

log.Printf("[DEBUG] locator found message %s", info)
log.Printf("[DEBUG] locator found messages %s", infos)
errs := new(multierror.Error)

// check if the forwarded message will ban a super-user and ignore it
if info.UserName != "" && a.superUsers.IsSuper(info.UserName) {
return fmt.Errorf("forwarded message is about super-user %s (%d), ignored", info.UserName, info.UserID)
}
users := make(map[int64]string)

// remove user from the approved list and from storage
if err := a.bot.RemoveApprovedUser(info.UserID); err != nil {
errs = multierror.Append(errs, fmt.Errorf("failed to remove user %d from approved list: %w", info.UserID, err))
}
for _, info := range infos {
// check if the forwarded message will ban a super-user and ignore it
if info.UserName != "" && a.superUsers.IsSuper(info.UserName) {
errs = multierror.Append(errs, fmt.Errorf("forwarded message is about super-user %s (%d), ignored", info.UserName, info.UserID))
continue
}

// make a message with spam info and send to admin chat
spamInfo := []string{}
resp := a.bot.OnMessage(bot.Message{Text: update.Message.Text, From: bot.User{ID: info.UserID}})
spamInfoText := "**can't get spam info**"
for _, check := range resp.CheckResults {
spamInfo = append(spamInfo, "- "+escapeMarkDownV1Text(check.String()))
}
if len(spamInfo) > 0 {
spamInfoText = strings.Join(spamInfo, "\n")
}
newMsgText := fmt.Sprintf("**original detection results for %q (%d)**\n\n%s\n\n\n*the user banned and message deleted*",
escapeMarkDownV1Text(info.UserName), info.UserID, spamInfoText)
if err := send(tbapi.NewMessage(a.adminChatID, newMsgText), a.tbAPI); err != nil {
errs = multierror.Append(errs, fmt.Errorf("failed to send spap detection results to admin chat: %w", err))
}
users[info.UserID] = info.UserName

if a.dry {
return errs.ErrorOrNil()
}
if a.dry {
continue
}

// update spam samples
if err := a.bot.UpdateSpam(msgTxt); err != nil {
return fmt.Errorf("failed to update spam for %q: %w", msgTxt, err)
// delete message
if _, err := a.tbAPI.Request(tbapi.DeleteMessageConfig{ChatID: a.primChatID, MessageID: info.MsgID}); err != nil {
errs = multierror.Append(errs, fmt.Errorf("failed to delete message %d: %w", info.MsgID, err))
} else {
log.Printf("[INFO] message %d deleted", info.MsgID)
// delete from locator
if err := a.locator.DeleteMessage(a.primChatID, info.MsgID); err != nil {
errs = multierror.Append(errs, fmt.Errorf("failed to delete message %d from locator: %w", info.MsgID, err))
}
}
}

// delete message
if _, err := a.tbAPI.Request(tbapi.DeleteMessageConfig{ChatID: a.primChatID, MessageID: info.MsgID}); err != nil {
errs = multierror.Append(errs, fmt.Errorf("failed to delete message %d: %w", info.MsgID, err))
} else {
log.Printf("[INFO] message %d deleted", info.MsgID)
if !a.dry {
// update spam samples
if err := a.bot.UpdateSpam(msgTxt); err != nil {
errs = multierror.Append(errs, fmt.Errorf("failed to update spam for %q: %w", msgTxt, err))
}
}

// ban user
banReq := banRequest{duration: bot.PermanentBanDuration, userID: info.UserID, chatID: a.primChatID,
tbAPI: a.tbAPI, dry: a.dry, training: a.trainingMode, userName: update.Message.ForwardSenderName}
for userID, userName := range users {
// remove user from the approved list and from storage
if err := a.bot.RemoveApprovedUser(userID); err != nil {
errs = multierror.Append(errs, fmt.Errorf("failed to remove user %d from approved list: %w", userID, err))
}

// make a message with spam info and send to admin chat
spamInfo := []string{}
resp := a.bot.OnMessage(bot.Message{Text: update.Message.Text, From: bot.User{ID: userID}})
spamInfoText := "**can't get spam info**"
for _, check := range resp.CheckResults {
spamInfo = append(spamInfo, "- "+escapeMarkDownV1Text(check.String()))
}
if len(spamInfo) > 0 {
spamInfoText = strings.Join(spamInfo, "\n")
}
newMsgText := fmt.Sprintf("**original detection results for %q (%d)**\n\n%s\n\n\n*the user banned and message deleted*",
escapeMarkDownV1Text(userName), userID, spamInfoText)
if err := send(tbapi.NewMessage(a.adminChatID, newMsgText), a.tbAPI); err != nil {
errs = multierror.Append(errs, fmt.Errorf("failed to send spap detection results to admin chat: %w", err))
}

if a.dry {
continue
}

// ban user
banReq := banRequest{duration: bot.PermanentBanDuration, userID: userID, chatID: a.primChatID,
tbAPI: a.tbAPI, dry: a.dry, training: a.trainingMode, userName: update.Message.ForwardSenderName}

if err := banUserOrChannel(banReq); err != nil {
errs = multierror.Append(errs, fmt.Errorf("failed to ban user %d: %w", userID, err))
}

if err := banUserOrChannel(banReq); err != nil {
errs = multierror.Append(errs, fmt.Errorf("failed to ban user %d: %w", info.UserID, err))
}

return errs.ErrorOrNil()
Expand Down
3 changes: 2 additions & 1 deletion app/events/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,11 @@ func (f SpamLoggerFunc) Save(msg *bot.Message, response *bot.Response) {
type Locator interface {
AddMessage(msg string, chatID, userID int64, userName string, msgID int) error
AddSpam(userID int64, checks []spamcheck.Response) error
Message(msg string) (storage.MsgMeta, bool)
Messages(msg string) ([]storage.MsgMeta, bool)
Spam(userID int64) (storage.SpamData, bool)
MsgHash(msg string) string
UserNameByID(userID int64) string
DeleteMessage(chatID int64, msgID int) error
}

// Bot is an interface for bot events.
Expand Down
33 changes: 25 additions & 8 deletions app/storage/locator.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"encoding/json"
"fmt"
"log"
"strings"
"time"

"github.com/jmoiron/sqlx"
Expand Down Expand Up @@ -53,6 +54,7 @@ func NewLocator(ttl time.Duration, minSize int, db *sqlx.DB) (*Locator, error) {
user_id INTEGER,
user_name TEXT,
msg_id INTEGER,
deleted INT default 0,
PRIMARY KEY (chat_id, msg_id)
)`)
if err != nil {
Expand Down Expand Up @@ -135,17 +137,26 @@ func (l *Locator) AddSpam(userID int64, checks []spamcheck.Response) error {
return l.cleanupSpam()
}

// Message returns message MsgMeta for given msg
// this allows to match messages from admin chat (only text available) to the original message
func (l *Locator) Message(msg string) (MsgMeta, bool) {
var meta MsgMeta
// Messages returns messages MsgMeta for given msg
// this allows to match messages from admin chat (only text available) to the original messages
func (l *Locator) Messages(msg string) ([]MsgMeta, bool) {
var meta []MsgMeta
hash := l.MsgHash(msg)
err := l.db.Get(&meta, `SELECT time, chat_id, user_id, user_name, msg_id FROM messages WHERE hash = ?`, hash)
err := l.db.Select(&meta, `SELECT time, chat_id, user_id, user_name, msg_id FROM messages WHERE hash = ? and deleted == 0`, hash)
if err != nil {
log.Printf("[DEBUG] failed to find message by hash %q: %v", hash, err)
return MsgMeta{}, false
log.Printf("[DEBUG] failed to find messages by hash %q: %v", hash, err)
return []MsgMeta{}, false
}
return meta, true
return meta, len(meta) > 0
}

// DeleteMessage removes message by chatID and msgID
func (l *Locator) DeleteMessage(chatID int64, msgID int) error {
_, err := l.db.Exec(`UPDATE messages SET deleted = 1 WHERE chat_id = ? and msg_id = ?`, chatID, msgID)
if err != nil {
return fmt.Errorf("failed to set message as deleted: %w", err)
}
return nil
}

// UserNameByID returns username by user id. Returns empty string if not found
Expand All @@ -161,6 +172,11 @@ func (l *Locator) UserNameByID(userID int64) string {

// UserIDByName returns user id by username. Returns 0 if not found
func (l *Locator) UserIDByName(userName string) int64 {
// many users have empty usernames, so we need to ignore them
if strings.TrimSpace(userName) == "" {
log.Printf("[DEBUG] failed to find user id by empty name")
return 0
}
var userID int64
err := l.db.Get(&userID, `SELECT user_id FROM messages WHERE user_name = ? LIMIT 1`, userName)
if err != nil {
Expand Down Expand Up @@ -254,6 +270,7 @@ func migrateMessageTable(db *sqlx.DB) error {
user_id INTEGER,
user_name TEXT,
msg_id INTEGER,
deleted INT default 0,
PRIMARY KEY (chat_id, msg_id)
)`)
if err != nil {
Expand Down
93 changes: 87 additions & 6 deletions app/storage/locator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,10 @@ func TestLocator_AddAndRetrieveMessage(t *testing.T) {

require.NoError(t, locator.AddMessage(msg, chatID, userID, userName, msgID))

retrievedMsg, found := locator.Message(msg)
retrievedMsgs, found := locator.Messages(msg)
require.True(t, found)
assert.Equal(t, MsgMeta{Time: retrievedMsg.Time, ChatID: chatID, UserID: userID, UserName: userName, MsgID: msgID}, retrievedMsg)
require.Len(t, retrievedMsgs, 1)
assert.Equal(t, MsgMeta{Time: retrievedMsgs[0].Time, ChatID: chatID, UserID: userID, UserName: userName, MsgID: msgID}, retrievedMsgs[0])

res := locator.UserNameByID(userID)
assert.Equal(t, userName, res)
Expand All @@ -52,6 +53,39 @@ func TestLocator_AddAndRetrieveMessage(t *testing.T) {

id = locator.UserIDByName("user2")
assert.Equal(t, int64(0), id)

id = locator.UserIDByName("")
assert.Equal(t, int64(0), id)
}

func TestLocator_AddAndRetrieveMessageWithEmptyUsername(t *testing.T) {
locator := newTestLocator(t)

msg := "test message"
chatID := int64(123)
userID := int64(456)
userName := ""
msgID := 789

require.NoError(t, locator.AddMessage(msg, chatID, userID, userName, msgID))

retrievedMsgs, found := locator.Messages(msg)
require.True(t, found)
require.Len(t, retrievedMsgs, 1)
assert.Equal(t, MsgMeta{Time: retrievedMsgs[0].Time, ChatID: chatID, UserID: userID, UserName: userName, MsgID: msgID}, retrievedMsgs[0])

res := locator.UserNameByID(userID)
assert.Equal(t, userName, res)

res = locator.UserNameByID(123456)
assert.Equal(t, "", res)

id := locator.UserIDByName(userName)
assert.Equal(t, int64(0), id)

id = locator.UserIDByName("user2")
assert.Equal(t, int64(0), id)

}

func TestLocator_AddAndRetrieveManyMessage(t *testing.T) {
Expand All @@ -64,9 +98,56 @@ func TestLocator_AddAndRetrieveManyMessage(t *testing.T) {
}

for i := 0; i < 100; i++ {
retrievedMsg, found := locator.Message(fmt.Sprintf("test message %d", i))
retrievedMsgs, found := locator.Messages(fmt.Sprintf("test message %d", i))
require.True(t, found)
require.Len(t, retrievedMsgs, 1)
assert.Equal(t, MsgMeta{Time: retrievedMsgs[0].Time, ChatID: int64(1234), UserID: int64(i%10 + 1), UserName: "name" + strconv.Itoa(i%10+1), MsgID: i}, retrievedMsgs[0])
}
}

func TestLocator_AddAndRetrieveSameMessage(t *testing.T) {
locator := newTestLocator(t)

// add 100 messages for 10 users
for i := 0; i < 100; i++ {
userID := int64(i%10 + 1)
locator.AddMessage("test message", 1234, userID, "name"+strconv.Itoa(int(userID)), i)
}

retrievedMsgs, found := locator.Messages("test message")
require.True(t, found)
require.Len(t, retrievedMsgs, 100)
for i := 0; i < 100; i++ {
assert.Equal(t, MsgMeta{Time: retrievedMsgs[i].Time, ChatID: int64(1234), UserID: int64(i%10 + 1), UserName: "name" + strconv.Itoa(i%10+1), MsgID: i}, retrievedMsgs[i])
}
}

func TestLocator_AddAndDeleteAndRetrieveManyMessage(t *testing.T) {
locator := newTestLocator(t)

// add 100 messages for 10 users
for i := 0; i < 100; i++ {
userID := int64(i%10 + 1)
locator.AddMessage(fmt.Sprintf("test message %d", i), 1234, userID, "name"+strconv.Itoa(int(userID)), i)
}

//delete every 3rd message
for i := 0; i < 100; i++ {
if i%3 == 0 {
locator.DeleteMessage(1234, i)
}
}

for i := 0; i < 100; i++ {
retrievedMsgs, found := locator.Messages(fmt.Sprintf("test message %d", i))
if i%3 == 0 {
require.False(t, found)
require.Len(t, retrievedMsgs, 0)
continue
}
require.True(t, found)
assert.Equal(t, MsgMeta{Time: retrievedMsg.Time, ChatID: int64(1234), UserID: int64(i%10 + 1), UserName: "name" + strconv.Itoa(i%10+1), MsgID: i}, retrievedMsg)
require.Len(t, retrievedMsgs, 1)
assert.Equal(t, MsgMeta{Time: retrievedMsgs[0].Time, ChatID: int64(1234), UserID: int64(i%10 + 1), UserName: "name" + strconv.Itoa(i%10+1), MsgID: i}, retrievedMsgs[0])
}
}

Expand Down Expand Up @@ -94,7 +175,7 @@ func TestLocator_CleanupLogic(t *testing.T) {
"old_hash1", oldTime, int64(111), int64(222), "old_user", 333)
require.NoError(t, err)
_, err = locator.db.Exec(`INSERT INTO messages (hash, time, chat_id, user_id, user_name, msg_id) VALUES (?, ?, ?, ?, ?, ?)`,
"old_hash2", oldTime, int64(111), int64(222), "old_user", 333)
"old_hash2", oldTime, int64(111), int64(222), "old_user", 334)
require.NoError(t, err)

_, err = locator.db.Exec(`INSERT INTO spam (user_id, time, checks) VALUES (?, ?, ?)`,
Expand All @@ -119,7 +200,7 @@ func TestLocator_RetrieveNonExistentMessage(t *testing.T) {
locator := newTestLocator(t)

msg := "non_existent_message"
_, found := locator.Message(msg)
_, found := locator.Messages(msg)
assert.False(t, found, "expected to not find a non-existent message")

_, found = locator.Spam(1234)
Expand Down

0 comments on commit ddab9ac

Please sign in to comment.