Skip to content

Commit

Permalink
Support for switching between channel and thread modes
Browse files Browse the repository at this point in the history
  • Loading branch information
venkytv committed May 13, 2023
1 parent 0a6f4be commit e99e386
Show file tree
Hide file tree
Showing 5 changed files with 138 additions and 27 deletions.
10 changes: 6 additions & 4 deletions backend/slack_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,9 @@ func (s *SlackBackend) Read() {
// Found message in bot-generated message cache
logrus.Debugf("Ignoring my message: %#v", ev)
break
} else {
// Cache message for 1 minute
s.msgCache.Set(ev.Timestamp, nil)
}
}

Expand Down Expand Up @@ -221,10 +224,9 @@ func (s SlackBackend) Post() {
logrus.Error("PostMessage error: ", err)
}

// Cache bot messages for a minute
err = s.msgCache.Set(timestamp, nil)
if err != nil {
logrus.Error("Error caching message timestamp: ", err)
if msg.NeedThreadId {
logrus.Debugf("Returning thread ID %s on channel", timestamp)
msg.ThreadIdChan <- timestamp
}
}
}
Expand Down
10 changes: 9 additions & 1 deletion conversation/conversation.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,20 @@ import (
"github.com/sirupsen/logrus"
)

// Conversation types
const (
ConversationTypeThreaded = iota
ConversationTypeChannel
)

type Conversation struct {
conversationType int
threadId string
channelId string
channelName string
manager *Manager
engine engine.Enginer
engineName string
engineQueues engine.EngineQueues
prefixUsername bool
directMessagesOnly bool
Expand All @@ -37,7 +45,7 @@ func (c *Conversation) Start(ctx context.Context) {
ChannelName: c.channelName,
ThreadId: c.threadId,
}
c.manager.Post(m)
c.manager.Post(c, m)
} else {
logrus.Debug("Done with conversation")
return
Expand Down
139 changes: 118 additions & 21 deletions conversation/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,12 @@ package conversation

import (
"context"
"fmt"
"path/filepath"
"regexp"
"strings"
"sync"
"time"

"github.com/duh-uh/teabot/backend"
"github.com/duh-uh/teabot/engine"
Expand All @@ -25,6 +27,8 @@ type Manager struct {
convLock *sync.RWMutex
channelConversations map[string]map[string]*Conversation
channelConvLock *sync.RWMutex

commandRegex *regexp.Regexp
}

func NewManager(ctx context.Context, cfg *cli.Context, backend backend.Backender, backendQueues backend.BackendQueues) *Manager {
Expand All @@ -41,6 +45,8 @@ func NewManager(ctx context.Context, cfg *cli.Context, backend backend.Backender
convLock: &sync.RWMutex{},
channelConversations: make(map[string]map[string]*Conversation),
channelConvLock: &sync.RWMutex{},

commandRegex: regexp.MustCompile(fmt.Sprintf(`\b%s(.+)\b`, globals.BotUrlScheme)),
}

engine.ConfigInit()
Expand Down Expand Up @@ -158,8 +164,26 @@ func (cm *Manager) getEngineEnvironment(m *message.Message, env map[string]strin
return envmap
}

func (cm *Manager) cleanupConversation(c *Conversation) {
// Remove conversation from manager
if c.conversationType == ConversationTypeThreaded {
cm.convLock.Lock()
delete(cm.conversations, c.threadId)
globals.NumThreadedConversations.Dec()
globals.NumConversations.Dec()
cm.convLock.Unlock()
} else {
cm.channelConvLock.Lock()
delete(cm.channelConversations[c.channelId], c.engineName)
globals.NumChannelConversations.Dec()
globals.NumConversations.Dec()
cm.channelConvLock.Unlock()
}
}

func (cm *Manager) addThreadedConversation(ctx context.Context, c *Conversation, threadId string) {
c.threadId = threadId
c.conversationType = ConversationTypeThreaded

cm.convLock.Lock()
if _, exists := cm.conversations[threadId]; !exists {
Expand All @@ -170,40 +194,30 @@ func (cm *Manager) addThreadedConversation(ctx context.Context, c *Conversation,

go func() {
c.Start(ctx)

// Remove conversation from manager
cm.convLock.Lock()
delete(cm.conversations, threadId)
globals.NumThreadedConversations.Dec()
globals.NumConversations.Dec()
cm.convLock.Unlock()
cm.cleanupConversation(c)
}()
} else {
cm.convLock.Unlock()
logrus.Infof("Race detected: conversation: %#v, thread: %s", c, threadId)
}
}

func (cm *Manager) addChannelConversation(ctx context.Context, c *Conversation, channelId string, convId string) bool {
func (cm *Manager) addChannelConversation(ctx context.Context, c *Conversation, channelId string) bool {
c.conversationType = ConversationTypeChannel

cm.channelConvLock.Lock()
if _, exists := cm.channelConversations[channelId][convId]; !exists {
if _, exists := cm.channelConversations[channelId][c.engineName]; !exists {
if _, exists := cm.channelConversations[channelId]; !exists {
cm.channelConversations[channelId] = map[string]*Conversation{}
}
cm.channelConversations[channelId][convId] = c
cm.channelConversations[channelId][c.engineName] = c
globals.NumChannelConversations.Inc()
globals.NumConversations.Inc()
cm.channelConvLock.Unlock()

go func() {
c.Start(ctx)

// Remove conversation from manager
cm.channelConvLock.Lock()
delete(cm.channelConversations[channelId], convId)
globals.NumChannelConversations.Dec()
globals.NumConversations.Dec()
cm.channelConvLock.Unlock()
cm.cleanupConversation(c)
}()

return true
Expand Down Expand Up @@ -289,6 +303,7 @@ func (cm *Manager) GetConversations(ctx context.Context, m *message.Message) []*
channelName: m.ChannelName,
manager: cm,
engine: e,
engineName: config.Name,
engineQueues: engqs,
prefixUsername: config.PrefixUsername,
directMessagesOnly: config.DirectMessagesOnly,
Expand All @@ -299,12 +314,12 @@ func (cm *Manager) GetConversations(ctx context.Context, m *message.Message) []*
conversations = append(conversations, &c)
logrus.Debugf("New threaded conversation with %s: %+v", config.Name, c)
} else {
if cm.addChannelConversation(ctx, &c, m.ChannelId, config.Name) {
if cm.addChannelConversation(ctx, &c, m.ChannelId) {
conversations = append(conversations, &c)
logrus.Debugf("New channel conversation with %s: %+v", config.Name, c)
logrus.Debugf("New channel conversation with %s: %+v", c.engineName, c)
} else {
logrus.Debugf("Ignoring trigger as bot already active: %s: channel='%s' msg='%s' trigger='%s'",
config.Name, c.channelName, m.Text, re.String())
c.engineName, c.channelName, m.Text, re.String())
}
}

Expand All @@ -316,7 +331,89 @@ func (cm *Manager) GetConversations(ctx context.Context, m *message.Message) []*
return conversations
}

func (cm *Manager) Post(m *message.Message) {
// Conversation commands
const (
_ = iota
ConversationCommandSwitchChannel
ConversationCommandSwitchThread
)

func (cm *Manager) Post(c *Conversation, m *message.Message) {
logrus.Debugf("Posting message to backend: %#v", m)

command := 0
if strings.Contains(m.Text, globals.BotUrlScheme) {
matches := cm.commandRegex.FindStringSubmatch(m.Text)
if len(matches) > 0 {
switch matches[1] {
case "switch/channel":
command = ConversationCommandSwitchChannel
case "switch/thread":
command = ConversationCommandSwitchThread
}

if command != 0 {
logrus.Debugf("Matched command: %s", matches[0])

// Remove command from message text
m.Text = strings.TrimSpace(strings.Replace(m.Text, matches[0], "", 1))
if len(m.Text) == 0 {
m.Text = "_..._"
}
} else {
logrus.Debugf("Ignoring unknown command in message: %s", m.Text)
}
}
}

if command == ConversationCommandSwitchThread {
// Need the new thread ID
m.NeedThreadId = true
m.ThreadIdChan = make(chan string, 1)
}

// Send message to backend
cm.backendQueues.RespQ <- m

if m.NeedThreadId {
// Wait for thread ID
select {
case m.ThreadId = <-m.ThreadIdChan:
logrus.Debugf("Got thread ID: %s", m.ThreadId)
case <-time.After(5 * time.Second):
logrus.Warnf("Timeout waiting for thread ID: bot=%s channel=%s",
c.engineName, c.channelName)
return
}

}

switch command {
case ConversationCommandSwitchChannel:
// Switch to channel conversation
logrus.Debugf("Switching to channel conversation for %s", c.engineName)
cm.convLock.Lock()
cm.channelConvLock.Lock()
delete(cm.conversations, m.ThreadId)
cm.channelConversations[m.ChannelId][c.engineName] = c
c.threadId = ""
globals.NumThreadedConversations.Dec()
globals.NumChannelConversations.Inc()
cm.channelConvLock.Unlock()
cm.convLock.Unlock()

case ConversationCommandSwitchThread:
// Switch to threaded conversation
logrus.Debugf("Switching to threaded conversation for %s: channel=%s thread=%s",
c.engineName, m.ChannelId, m.ThreadId)
cm.convLock.Lock()
cm.channelConvLock.Lock()
delete(cm.channelConversations[m.ChannelId], c.engineName)
cm.conversations[m.ThreadId] = c
c.threadId = m.ThreadId
globals.NumChannelConversations.Dec()
globals.NumThreadedConversations.Inc()
cm.channelConvLock.Unlock()
cm.convLock.Unlock()
}
}
3 changes: 2 additions & 1 deletion globals/globals.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@ import (
)

const (
BotName = "teabot"
BotName = "botters"
BotUrlScheme = BotName + "://"
)

var (
Expand Down
3 changes: 3 additions & 0 deletions message/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,7 @@ type Message struct {
InThread bool
DirectMessage bool
Locale string

NeedThreadId bool
ThreadIdChan chan string
}

0 comments on commit e99e386

Please sign in to comment.