Skip to content

Commit

Permalink
Add support for topic patterns
Browse files Browse the repository at this point in the history
Signed-off-by: Chaitanya Munukutla <[email protected]>
  • Loading branch information
c16a committed Apr 6, 2024
1 parent c503016 commit 12423f2
Show file tree
Hide file tree
Showing 4 changed files with 129 additions and 19 deletions.
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@ A tiny Websocket-based event broker.
- [ ] Offline messages
- [ ] Clustering
- [x] QoS (0, 1)
- [x] Topc Patterns

### Topic patterns
Topic patterns are heavily inspired by [NATS's subject-based messaging](https://docs.nats.io/nats-concepts/subjects).

## Why does this exist?
Because.
35 changes: 16 additions & 19 deletions broker/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,30 +8,17 @@ import (

type ConnectedClient struct {
id string
subscriptions map[string]*Subscription
subscriptions []*Subscription
conn WebSocketConnection
mutex sync.RWMutex
}

type Subscription struct {
active bool
group string
}

func (s *Subscription) IsActive() bool {
return s.active
}

func (s *Subscription) GetGroup() string {
return s.group
}

func NewConnectedClient(conn WebSocketConnection, id string) *ConnectedClient {
return &ConnectedClient{
id: id,
conn: conn,
mutex: sync.RWMutex{},
subscriptions: make(map[string]*Subscription),
subscriptions: make([]*Subscription, 0),
}
}

Expand All @@ -51,24 +38,34 @@ func (client *ConnectedClient) GetSubscription(topic string) *Subscription {
client.mutex.RLock()
defer client.mutex.RUnlock()

return client.subscriptions[topic]
for _, sub := range client.subscriptions {
if sub.Matches(topic) {
return sub
}
}

return nil
}

func (client *ConnectedClient) SubscribeToTopic(topic string, group string) {
client.mutex.Lock()
defer client.mutex.Unlock()

subscription := &Subscription{active: true}
subscription := &Subscription{active: true, pattern: topic}
if len(group) > 0 {
subscription.group = group
}

client.subscriptions[topic] = subscription
client.subscriptions = append(client.subscriptions, subscription)
}

func (client *ConnectedClient) UnsubscribeFromTopic(topic string) {
client.mutex.Lock()
defer client.mutex.Unlock()

delete(client.subscriptions, topic)
for i, subscription := range client.subscriptions {
if subscription.Matches(topic) {
client.subscriptions = append(client.subscriptions[:i], client.subscriptions[i+1:]...)
}
}
}
53 changes: 53 additions & 0 deletions broker/subscription.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package broker

import "strings"

const (
PatternSeparator = "."
SingleTokenMatcher = "*"
MultiTokenMatcher = ">"
)

type Subscription struct {
active bool
group string
pattern string
}

func (s *Subscription) IsActive() bool {
return s.active
}

func (s *Subscription) GetGroup() string {
return s.group
}

func (s *Subscription) GetPattern() string {
return s.pattern
}

func (s *Subscription) Matches(topic string) bool {

patternTokens := strings.Split(s.pattern, PatternSeparator)
topicTokens := strings.Split(topic, PatternSeparator)

if len(patternTokens) > len(topicTokens) {
return false
}

for tIdx, topicToken := range topicTokens {
if tIdx > len(patternTokens)-1 {
return false
}
if patternTokens[tIdx] == MultiTokenMatcher {
break
}
if patternTokens[tIdx] == topicToken || patternTokens[tIdx] == SingleTokenMatcher {
continue
} else {
return false
}
}

return true
}
56 changes: 56 additions & 0 deletions broker/subscription_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package broker

import "testing"

var matchTests = []struct {
pattern string
topic string
expected bool
}{
{
pattern: "time.us.*",
topic: "time.us.east",
expected: true,
},
{
pattern: "time.us.*",
topic: "time.us.east.atlanta",
expected: false,
},
{
pattern: "time.us.>",
topic: "time.us.east.atlanta",
expected: true,
},
{
pattern: "time.*.east",
topic: "time.us.east",
expected: true,
},
{
pattern: "time.*.east",
topic: "time.eu.east",
expected: true,
},
{
pattern: ">",
topic: "time.eu.east",
expected: true,
},
}

func TestSubscription_Matches(t *testing.T) {
for _, tt := range matchTests {
t.Run(tt.pattern, func(_t *testing.T) {
subscription := &Subscription{
active: true,
group: "",
pattern: tt.pattern,
}
ok := subscription.Matches(tt.topic)
if ok != tt.expected {
_t.Fatal("doesn't match")
}
})
}
}

0 comments on commit 12423f2

Please sign in to comment.