forked from zeromq/goczmq
-
Notifications
You must be signed in to change notification settings - Fork 0
/
beacon.go
162 lines (128 loc) · 3.68 KB
/
beacon.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
package goczmq
/*
#include "czmq.h"

// Beacon_new starts a new actor running the zbeacon task, passing NULL
// as the actor argument, and returns the actor handle.
zactor_t *Beacon_new () {
	zactor_t *beacon = zactor_new(zbeacon, NULL);
	return beacon;
}

// Beacon_publish sends a three-part "PUBLISH" command to the actor:
// the command name, `size` bytes starting at `data`, and `interval`.
// It returns the result of zsock_send.
int Beacon_publish(void *actor, void *data, int size, int interval) {
	// The "b" picture element consumes a (byte *, size_t) pair from the
	// variadic argument list, so the length is widened explicitly; an
	// unconverted int would be read back through va_arg with the wrong type.
	return zsock_send(actor, "sbi", "PUBLISH", (byte*)data, (size_t)size, interval);
}
*/
import "C"
import (
"strconv"
"unsafe"
)
// Beacon wraps the CZMQ beacon actor. It implements a
// peer-to-peer discovery service for local networks. Beacons
// can broadcast and receive UDPv4 service broadcasts.
//
// Values are created with NewBeacon and released with Destroy.
type Beacon struct {
// zactorT is the handle of the underlying CZMQ actor; every method
// passes it to the C library as the target of its command.
zactorT *C.struct__zactor_t
}
// NewBeacon returns a Beacon whose actor handle comes from the
// Beacon_new cgo helper.
func NewBeacon() *Beacon {
	return &Beacon{zactorT: C.Beacon_new()}
}
// Verbose sends the "VERBOSE" command to the beacon actor, which sets
// the beacon to log information to stdout. It returns ErrActorCmd when
// the command cannot be sent.
func (b *Beacon) Verbose() error {
	verbose := C.CString("VERBOSE")
	defer C.free(unsafe.Pointer(verbose))

	if C.zstr_send(unsafe.Pointer(b.zactorT), verbose) == -1 {
		return ErrActorCmd
	}
	return nil
}
// Configure accepts a port number and configures the beacon,
// returning an address.
//
// The request is sent to the actor as a two-frame message: the
// "CONFIGURE" command followed by the port rendered as a decimal
// string. The actor's reply is then read and returned as the address.
// ErrActorCmd is returned if either frame cannot be sent. If no reply
// is received the returned address is the empty string.
func (b *Beacon) Configure(port int) (string, error) {
	cmd := C.CString("CONFIGURE")
	defer C.free(unsafe.Pointer(cmd))

	cPort := C.CString(strconv.Itoa(port))
	defer C.free(unsafe.Pointer(cPort))

	rc := C.zstr_sendm(unsafe.Pointer(b.zactorT), cmd)
	if rc == -1 {
		return "", ErrActorCmd
	}

	rc = C.zstr_send(unsafe.Pointer(b.zactorT), cPort)
	if rc == -1 {
		return "", ErrActorCmd
	}

	// zstr_recv transfers ownership of the returned C string to the
	// caller. Copy it into Go memory first, then release the C copy so it
	// does not leak on every call. Both steps tolerate a NULL reply.
	cHostname := C.zstr_recv(unsafe.Pointer(b.zactorT))
	hostname := C.GoString(cHostname)
	C.zstr_free(&cHostname)

	return hostname, nil
}
// Publish publishes an announcement string at an interval.
//
// The actor receives a three-frame message: the "PUBLISH" command, the
// announcement, and the interval rendered as a decimal string. The
// announcement travels as a C string; use PublishBytes for raw bytes.
// ErrActorCmd is returned as soon as any frame fails to send.
func (b *Beacon) Publish(announcement string, interval int) error {
	actor := unsafe.Pointer(b.zactorT)

	command := C.CString("PUBLISH")
	defer C.free(unsafe.Pointer(command))

	payload := C.CString(announcement)
	defer C.free(unsafe.Pointer(payload))

	period := C.CString(strconv.Itoa(interval))
	defer C.free(unsafe.Pointer(period))

	if C.zstr_sendm(actor, command) == -1 {
		return ErrActorCmd
	}
	if C.zstr_sendm(actor, payload) == -1 {
		return ErrActorCmd
	}
	if C.zstr_send(actor, period) == -1 {
		return ErrActorCmd
	}
	return nil
}
// PublishBytes publishes an announcement byte slice at an interval.
//
// The bytes are handed to the Beacon_publish cgo helper together with
// their length and the interval. ErrActorCmd is returned when the
// helper reports failure (-1).
//
// An empty or nil announcement is passed as a nil pointer with length
// zero rather than indexing announcement[0], which would panic.
// NOTE(review): this relies on CZMQ accepting a NULL data pointer when
// the size is 0 — confirm against the zsock/zframe documentation.
func (b *Beacon) PublishBytes(announcement []byte, interval int) error {
	var data unsafe.Pointer
	if len(announcement) > 0 {
		data = unsafe.Pointer(&announcement[0])
	}

	rc := C.Beacon_publish(
		unsafe.Pointer(b.zactorT),
		data,
		C.int(len(announcement)),
		C.int(interval),
	)
	if rc == -1 {
		return ErrActorCmd
	}
	return nil
}
// Subscribe subscribes to beacons matching the filter.
//
// The actor receives a two-frame message: the "SUBSCRIBE" command
// followed by the filter string. ErrActorCmd is returned if either
// frame fails to send.
func (b *Beacon) Subscribe(filter string) error {
	actor := unsafe.Pointer(b.zactorT)

	command := C.CString("SUBSCRIBE")
	defer C.free(unsafe.Pointer(command))

	pattern := C.CString(filter)
	defer C.free(unsafe.Pointer(pattern))

	if C.zstr_sendm(actor, command) == -1 {
		return ErrActorCmd
	}
	if C.zstr_send(actor, pattern) == -1 {
		return ErrActorCmd
	}
	return nil
}
// Recv waits for the specific timeout in milliseconds to receive a beacon.
//
// On success it returns a two-element slice: the bytes of the first
// received frame (the address) followed by the bytes of the second
// frame (the beacon data). It returns nil if either frame is not
// received. Both frames are copied into Go memory and the C frames are
// destroyed before returning.
func (b *Beacon) Recv(timeout int) [][]byte {
	actor := unsafe.Pointer(b.zactorT)
	C.zsock_set_rcvtimeo(actor, C.int(timeout))

	addrFrame := C.zframe_recv(actor)
	if addrFrame == nil {
		return nil
	}
	defer C.zframe_destroy(&addrFrame)

	addrLen := C.int(C.zframe_size(addrFrame))
	addr := C.GoBytes(unsafe.Pointer(C.zframe_data(addrFrame)), addrLen)

	dataFrame := C.zframe_recv(actor)
	if dataFrame == nil {
		return nil
	}
	defer C.zframe_destroy(&dataFrame)

	dataLen := C.int(C.zframe_size(dataFrame))
	data := C.GoBytes(unsafe.Pointer(C.zframe_data(dataFrame)), dataLen)

	return [][]byte{addr, data}
}
// Destroy destroys the beacon by passing the address of the wrapped
// actor handle to zactor_destroy.
// NOTE(review): presumably CZMQ resets the handle to NULL through that
// pointer, which would make the Beacon unusable afterwards — confirm
// against the zactor documentation.
func (b *Beacon) Destroy() {
C.zactor_destroy(&b.zactorT)
}