forked from influxdata/influxdb
-
Notifications
You must be signed in to change notification settings - Fork 0
/
udp.go
99 lines (80 loc) · 2.02 KB
/
udp.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
package influxdb
import (
"net"
"sync"
)
// UDPServer
type UDPServer struct {
server *Server
mu sync.Mutex
wg sync.WaitGroup
done chan struct{} // close notification
// The UDP address to listen on.
Addr *net.UDPAddr
// The name of the database to insert data into.
Database string
// The user authorized to insert the data.
User *User
}
// NewUDPServer returns an instance of UDPServer attached to a Server.
func NewUDPServer(server *Server) *UDPServer {
return &UDPServer{server: server}
}
// ListenAndServe opens a UDP socket to listen for messages.
func (s *UDPServer) ListenAndServe() error {
panic("not yet implemented: UDPServer.ListenAndServe()")
/* TEMPORARILY REMOVED FOR PROTOBUFS.
// Validate that server has a UDP address.
if s.Addr == nil {
return ErrBindAddressRequired
}
// Open UDP connection.
conn, err := net.ListenUDP("udp", s.Addr)
if err != nil {
return err
}
defer conn.Close()
// Read messages off the connection and handle them.
buffer := make([]byte, 2048)
for {
n, _, err := conn.ReadFromUDP(buffer)
if err != nil || n == 0 {
log.Error("UDP ReadFromUDP error: %s", err)
continue
}
// Create a JSON decoder.
dec := json.NewDecoder(bytes.NewBuffer(buffer[0:n]))
dec.UseNumber()
// Deserialize data into series.
var a []*serializedSeries
if err := dec.Decode(&a); err != nil {
log.Error("UDP json error: %s", err)
continue
}
// Write data points to the data store.
for _, ss := range a {
if len(ss.Points) == 0 {
continue
}
// Convert to the internal series format.
series, err := ss.series(SecondPrecision)
if err != nil {
log.Error("udp cannot convert received data: %s", err)
continue
}
// TODO: Authorization.
// Lookup database.
db := s.server.Database(s.Database)
if db == nil {
log.Error("udp: %s", ErrDatabaseNotFound)
continue
}
// Write series data to server.
if err := db.WriteSeries(series); err != nil {
log.Error("udp: write data error: %s", err)
continue
}
}
}
*/
}