This repository has been archived by the owner on Apr 3, 2022. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 5
/
socketmap.go
110 lines (90 loc) · 2.06 KB
/
socketmap.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
package main
import (
"errors"
"fmt"
"sync"
)
// SocketMap tracks sockets grouped by bin name and, within a bin, keyed by
// socket UUID.
type SocketMap interface {
// Add stores s under the given bin name and socket UUID.
Add(binName, socketUUID string, s Socket)
// Get looks up the socket stored under the given bin name and socket UUID.
// The boolean result indicates whether the lookup succeeded.
Get(binName, socketUUID string) (Socket, bool)
// Delete removes the socket stored under the given bin name and socket UUID,
// returning an error if the removal could not be completed.
Delete(binName, socketUUID string) error
// Send passes payload to the sockets stored under binName, returning an
// error if there are none to send to.
Send(binName string, payload []byte) error
}
// UnSub is the unsubscribe hook handed to NewSocketMap. In this file it is
// called from Delete with a single bin name once the last socket of that bin
// has been removed.
type UnSub interface {
// Unsubscribe is invoked with the names to unsubscribe from (here: bin
// names); a non-nil error is reported back to the caller of Delete.
Unsubscribe(...string) error
}
// NewSocketMap builds an empty SocketMap that reports emptied bins to
// unsubber. unsubber may be nil, in which case Delete skips the unsubscribe
// step. The underlying map is not allocated here; Add creates it on first use.
func NewSocketMap(unsubber UnSub) SocketMap {
	m := new(sm)
	m.unsubber = unsubber
	return m
}
// sm is the mutex-guarded SocketMap implementation returned by NewSocketMap.
type sm struct {
// lk is taken by every method before smap is read or written.
lk sync.Mutex
// unsubber is called by Delete when a bin becomes empty; it may be nil.
unsubber UnSub
// smap maps bin name -> socket UUID -> Socket. It stays nil until the first
// call to Add.
smap map[string]map[string]Socket
}
// Add stores s under binName and socketUUID, replacing any socket already
// stored under that pair. The outer map and the per-bin map are created on
// demand.
func (sm *sm) Add(binName, socketUUID string, s Socket) {
	sm.lk.Lock()
	defer sm.lk.Unlock()

	if sm.smap == nil {
		sm.smap = map[string]map[string]Socket{}
	}

	bin, known := sm.smap[binName]
	if !known {
		// First socket for this bin: allocate its inner map.
		bin = map[string]Socket{}
		sm.smap[binName] = bin
	}
	bin[socketUUID] = s
}
// Get returns the socket stored under binName and socketUUID.
//
// The boolean result is true only when a socket is actually stored under that
// exact pair. It is false when the map is still uninitialised, when binName is
// unknown, and when binName is known but holds no socket with socketUUID; in
// all of those cases the returned Socket is its zero value.
func (sm *sm) Get(binName, socketUUID string) (Socket, bool) {
	sm.lk.Lock()
	defer sm.lk.Unlock()

	// Reading from a nil map is safe in Go and yields the zero value, so the
	// chained lookup covers the "no map yet" and "unknown bin" cases without
	// separate checks. The comma-ok form reports whether the UUID is present
	// rather than assuming it is.
	s, ok := sm.smap[binName][socketUUID]
	return s, ok
}
// Delete removes the socket stored under binName and socketUUID.
//
// When the removed socket was the last one in its bin, the bin itself is
// dropped and, if an unsubber was supplied to NewSocketMap, its Unsubscribe
// method is called with binName. That call happens while the map lock is
// still held.
//
// An error is returned when the map has never been populated, when binName is
// unknown, when socketUUID is not in the bin, or when Unsubscribe fails. In
// the last case the socket and the bin have already been removed, and the
// returned error wraps the one from Unsubscribe.
func (sm *sm) Delete(binName, socketUUID string) error {
	sm.lk.Lock()
	defer sm.lk.Unlock()

	if sm.smap == nil {
		return errors.New("There are no known sockets.")
	}

	sockets, ok := sm.smap[binName]
	if !ok {
		return errors.New("No sockets match that bin name.")
	}
	if _, ok := sockets[socketUUID]; !ok {
		return errors.New("No matching socket for that bin name and uuid.")
	}

	delete(sockets, socketUUID)
	if len(sockets) > 0 {
		// Other sockets still use this bin; keep it.
		return nil
	}

	// Last socket gone: drop the bin, then let the unsubber know.
	delete(sm.smap, binName)
	if sm.unsubber == nil {
		return nil
	}
	if err := sm.unsubber.Unsubscribe(binName); err != nil {
		// fmt.Sprint puts no space next to string operands, which glued the
		// prefix, bin name and error text together; format explicitly and
		// wrap the cause so callers can still inspect it.
		return fmt.Errorf("Failure to UNSUBSCRIBE from %s: %w", binName, err)
	}
	return nil
}
// Send hands payload to every socket stored under binName.
//
// Each socket's Write is called on its own goroutine, so Send returns without
// waiting for the writes to finish, and whatever Write returns is discarded.
// All goroutines receive the same payload slice (it is not copied), so the
// caller should not modify payload after calling Send.
//
// An error is returned when the map has never been populated or when binName
// has no sockets stored.
func (sm *sm) Send(binName string, payload []byte) error {
	sm.lk.Lock()
	defer sm.lk.Unlock()

	if sm.smap == nil {
		return errors.New("There are no known sockets.")
	}

	sockets, ok := sm.smap[binName]
	if !ok {
		// fmt.Sprint adds no space next to string operands, so the bin name
		// used to be glued to the colon; format it explicitly.
		return fmt.Errorf("Got message for unknown channel: %s", binName)
	}

	for _, s := range sockets {
		// The goroutines are started under the lock, but the writes run
		// asynchronously and do not hold it.
		go func(s Socket, p []byte) {
			s.Write(p)
		}(s, payload)
	}
	return nil
}