diff --git a/.vscode/settings.json b/.vscode/settings.json index 8a70abfd..edcd6e63 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -3,14 +3,14 @@ "**/.git": true, "**/.DS_Store": true, ".gitignore": true, - // "vendor/": true, + "vendor/": true, + ".github/": true, ".vscode": true, "build*": true, "LICENSE": true, "appveyor.yml": true, "Dockerfile": true }, - "go.formatTool": "gofmt", "go.formatFlags": [ "-s" diff --git a/internal/broker/conn.go b/internal/broker/conn.go index f8c90281..1df9c8a2 100644 --- a/internal/broker/conn.go +++ b/internal/broker/conn.go @@ -31,8 +31,11 @@ import ( "github.com/emitter-io/emitter/internal/provider/logging" "github.com/emitter-io/emitter/internal/security" "github.com/emitter-io/stats" + "github.com/kelindar/rate" ) +const defaultReadRate = 100000 + // Conn represents an incoming connection. type Conn struct { sync.Mutex @@ -45,10 +48,11 @@ type Conn struct { subs *message.Counters // The subscriptions for this connection. measurer stats.Measurer // The measurer to use for monitoring. links map[string]string // The map of all pre-authorized links. + limit *rate.Limiter // The read rate limiter. } // NewConn creates a new connection. 
-func (s *Service) newConn(t net.Conn) *Conn { +func (s *Service) newConn(t net.Conn, readRate int) *Conn { c := &Conn{ tracked: 0, luid: security.NewID(), @@ -61,7 +65,11 @@ func (s *Service) newConn(t net.Conn) *Conn { // Generate a globally unique id as well c.guid = c.luid.Unique(uint64(address.GetHardware()), "emitter") - //logging.LogTarget("conn", "created", c.guid) + if readRate == 0 { + readRate = defaultReadRate + } + + c.limit = rate.New(readRate, time.Second) // Increment the connection counter atomic.AddInt64(&s.connections, 1) @@ -103,13 +111,17 @@ func (c *Conn) track(contract contract.Contract) { func (c *Conn) Process() error { defer c.Close() reader := bufio.NewReaderSize(c.socket, 65536) - limit := c.service.Config.MaxMessageBytes() + maxSize := c.service.Config.MaxMessageBytes() for { // Set read/write deadlines so we can close dangling connections c.socket.SetDeadline(time.Now().Add(time.Second * 120)) + if c.limit.Limit() { + time.Sleep(50 * time.Millisecond) + continue + } // Decode an incoming MQTT packet - msg, err := mqtt.DecodePacket(reader, limit) + msg, err := mqtt.DecodePacket(reader, maxSize) if err != nil { return err } diff --git a/internal/broker/conn_test.go b/internal/broker/conn_test.go index 7809bcfd..19892075 100644 --- a/internal/broker/conn_test.go +++ b/internal/broker/conn_test.go @@ -34,7 +34,7 @@ func newTestConn() (pipe *netmock.Conn, conn *Conn) { } pipe = netmock.NewConn() - conn = s.newConn(pipe.Client) + conn = s.newConn(pipe.Client, 0) return } diff --git a/internal/broker/handlers_test.go b/internal/broker/handlers_test.go index 3b4d592f..1e48208b 100644 --- a/internal/broker/handlers_test.go +++ b/internal/broker/handlers_test.go @@ -70,7 +70,7 @@ func TestHandlers_onLink(t *testing.T) { s.Cipher, _ = s.License.Cipher() conn := netmock.NewConn() - nc := s.newConn(conn.Client) + nc := s.newConn(conn.Client, 0) resp, ok := nc.onLink([]byte(tc.packet)) assert.Equal(t, tc.success, ok) @@ -89,7 +89,7 @@ func 
TestHandlers_onMe(t *testing.T) { } conn := netmock.NewConn() - nc := s.newConn(conn.Client) + nc := s.newConn(conn.Client, 0) nc.links["0"] = "key/a/b/c/" resp, success := nc.onMe() meResp := resp.(*meResponse) @@ -208,7 +208,7 @@ func TestHandlers_onSubscribeUnsubscribe(t *testing.T) { } conn := netmock.NewConn() - nc := s.newConn(conn.Client) + nc := s.newConn(conn.Client, 0) s.Cipher, _ = s.License.Cipher() // Subscribe and check for error. @@ -333,7 +333,7 @@ func TestHandlers_onPublish(t *testing.T) { } conn := netmock.NewConn() - nc := s.newConn(conn.Client) + nc := s.newConn(conn.Client, 0) s.Cipher, _ = s.License.Cipher() err := nc.onPublish(&mqtt.Publish{ @@ -431,7 +431,7 @@ func TestHandlers_onPresence(t *testing.T) { } conn := netmock.NewConn() - nc := s.newConn(conn.Client) + nc := s.newConn(conn.Client, 0) s.Cipher, _ = s.License.Cipher() resp, success := nc.onPresence([]byte(tc.payload)) @@ -518,7 +518,7 @@ func TestHandlers_onKeygen(t *testing.T) { } conn := netmock.NewConn() - nc := s.newConn(conn.Client) + nc := s.newConn(conn.Client, 0) s.Cipher, _ = s.License.Cipher() //resp @@ -590,7 +590,7 @@ func TestHandlers_onEmitterRequest(t *testing.T) { measurer: stats.NewNoop(), } - nc := s.newConn(netmock.NewNoop()) + nc := s.newConn(netmock.NewNoop(), 0) ok := nc.onEmitterRequest(channel, []byte(tc.payload), 0) assert.Equal(t, tc.success, ok, tc.channel) }) @@ -640,7 +640,7 @@ func TestHandlers_lookupPresence(t *testing.T) { measurer: stats.NewNoop(), } - s.subscriptions.Subscribe(message.Ssid{1, 2, 3}, s.newConn(netmock.NewNoop())) + s.subscriptions.Subscribe(message.Ssid{1, 2, 3}, s.newConn(netmock.NewNoop(), 0)) presence := s.lookupPresence(message.Ssid{1, 2, 3}) assert.NotEmpty(t, presence) } diff --git a/internal/broker/service.go b/internal/broker/service.go index 6695a3bc..4652e566 100644 --- a/internal/broker/service.go +++ b/internal/broker/service.go @@ -223,7 +223,7 @@ func (s *Service) listen(addr *net.TCPAddr, conf *tls.Config) { // 
Create new listener logging.LogTarget("service", "starting the listener", addr) l, err := listener.New(addr.String(), listener.Config{ - WriteRate: s.Config.Limit.WriteRate, + FlushRate: s.Config.Limit.FlushRate, TLS: conf, }) if err != nil { @@ -291,7 +291,7 @@ func (s *Service) notifyUnsubscribe(conn *Conn, ssid message.Ssid, channel []byt // Occurs when a new client connection is accepted. func (s *Service) onAcceptConn(t net.Conn) { - conn := s.newConn(t) + conn := s.newConn(t, s.Config.Limit.ReadRate) go conn.Process() } diff --git a/internal/config/config.go b/internal/config/config.go index 6635c524..c7678921 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -160,12 +160,16 @@ type ClusterConfig struct { // LimitConfig represents various limit configurations - such as message size. type LimitConfig struct { - // Maximum message size allowed from/to the peer. Default if not specified is 64kB. + // Maximum message size allowed from/to the client. Default if not specified is 64kB. MessageSize int `json:"messageSize,omitempty"` + // The maximum messages per second allowed to be processed per client connection. This + // effectively restricts the QpS for an individual connection. + ReadRate int `json:"readRate,omitempty"` + // The maximum socket write rate per connection. This does not limit QpS but instead // can be used to scale throughput. Defaults to 60. - WriteRate int `json:"writeRate,omitempty"` + FlushRate int `json:"flushRate,omitempty"` } // LoadProvider loads a provider from the configuration or panics if the configuration is diff --git a/internal/network/listener/listener.go b/internal/network/listener/listener.go index f2606fcf..4d874ed0 100644 --- a/internal/network/listener/listener.go +++ b/internal/network/listener/listener.go @@ -69,7 +69,7 @@ var noTimeout time.Duration // Config represents the configuration of the listener. type Config struct { TLS *tls.Config // The TLS/SSL configuration. 
- WriteRate int // The maximum write rate (QPS) per connection. + FlushRate int // The maximum flush rate (flushes per second) per connection. } // New announces on the local network address laddr. The syntax of laddr is @@ -175,7 +175,7 @@ func (m *Listener) Serve() error { func (m *Listener) serve(c net.Conn, donec <-chan struct{}, wg *sync.WaitGroup) { defer wg.Done() - muc := newConn(c, m.config.WriteRate) + muc := newConn(c, m.config.FlushRate) if m.readTimeout > noTimeout { _ = c.SetReadDeadline(time.Now().Add(m.readTimeout)) }