Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added TCP support #4

Merged
merged 2 commits into from
Apr 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
# microq
A tiny Websocket-based event broker.
A tiny event broker.

## Features
- [x] Websocket support
- [x] TCP support
- [x] Grouped subscriptions
- [ ] Offline messages
- [ ] Clustering
Expand Down
13 changes: 8 additions & 5 deletions broker/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,8 @@ import (
"sync"
)

type WebSocketConnection interface {
WriteMessage(messageType int, data []byte) error
ReadMessage() (messageType int, p []byte, err error)
type GenericConnection interface {
WriteMessage(data []byte) error
}

type Broker struct {
Expand All @@ -30,11 +29,15 @@ func (broker *Broker) Connect(clientId string, client *ConnectedClient) {
broker.clients[clientId] = client
}

func (broker *Broker) Disconnect(clientId string) {
func (broker *Broker) Disconnect(client *ConnectedClient) {
broker.mutex.Lock()
defer broker.mutex.Unlock()

delete(broker.clients, clientId)
for _, c := range broker.clients {
if client.GetId() == c.GetId() {
delete(broker.clients, client.GetId())
}
}
}

func (broker *Broker) Broadcast(event events.PubEvent) error {
Expand Down
23 changes: 19 additions & 4 deletions broker/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,18 @@ package broker

import (
"encoding/json"
"github.com/gorilla/websocket"
"sync"
)

type ConnectedClient struct {
id string
subscriptions []*Subscription
unsubscriptions []*Subscription
conn WebSocketConnection
conn GenericConnection
mutex sync.RWMutex
}

func NewConnectedClient(conn WebSocketConnection, id string) *ConnectedClient {
func NewConnectedClient(conn GenericConnection, id string) *ConnectedClient {
return &ConnectedClient{
id: id,
conn: conn,
Expand All @@ -24,11 +23,27 @@ func NewConnectedClient(conn WebSocketConnection, id string) *ConnectedClient {
}
}

func NewUnidentifiedClient(conn GenericConnection) *ConnectedClient {
return NewConnectedClient(conn, "")
}

func (client *ConnectedClient) IsIdentified() bool {
return client.id != ""
}

func (client *ConnectedClient) SetId(id string) {
client.id = id
}

func (client *ConnectedClient) GetId() string {
return client.id
}

func (client *ConnectedClient) WriteDataMessage(data []byte) error {
client.mutex.Lock()
defer client.mutex.Unlock()

return client.conn.WriteMessage(websocket.TextMessage, data)
return client.conn.WriteMessage(data)
}

func (client *ConnectedClient) WriteInterface(v any) error {
Expand Down
6 changes: 1 addition & 5 deletions broker/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,10 @@ import (
type TestingWebSocketConnection struct {
}

func (t *TestingWebSocketConnection) WriteMessage(messageType int, data []byte) error {
func (t *TestingWebSocketConnection) WriteMessage(data []byte) error {
return nil
}

func (t *TestingWebSocketConnection) ReadMessage() (messageType int, p []byte, err error) {
return 0, nil, nil
}

func TestConnectedClient_Subscribe_Unsubscribe(t *testing.T) {
client := NewConnectedClient(&TestingWebSocketConnection{}, "client-1")

Expand Down
16 changes: 16 additions & 0 deletions conn/tcp.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package conn

import "net"

type TCPConnection struct {
conn *net.TCPConn
}

func NewTCPConnection(conn *net.TCPConn) *TCPConnection {
return &TCPConnection{conn: conn}
}

func (tc *TCPConnection) WriteMessage(data []byte) error {
_, err := tc.conn.Write(data)
return err
}
15 changes: 15 additions & 0 deletions conn/ws.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package conn

import "github.com/gorilla/websocket"

type WebsocketConnection struct {
conn *websocket.Conn
}

func NewWebsocketConnection(conn *websocket.Conn) *WebsocketConnection {
return &WebsocketConnection{conn: conn}
}

func (wc *WebsocketConnection) WriteMessage(data []byte) error {
return wc.conn.WriteMessage(websocket.TextMessage, data)
}
8 changes: 8 additions & 0 deletions events/conn.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package events

const Conn = "conn"

type ConnEvent struct {
Kind string `json:"kind"`
ClientId string `json:"client_id"`
}
9 changes: 9 additions & 0 deletions events/connack.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package events

const ConnAck = "connack"

type ConnAckEvent struct {
Kind string `json:"kind"`
ClientId string `json:"client_id"`
Success bool `json:"success"`
}
1 change: 1 addition & 0 deletions events/puback.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,5 @@ const PubAck = "puback"
type PubAckEvent struct {
Kind string `json:"kind"`
PacketId string `json:"packet_id"`
Success bool `json:"success"`
}
1 change: 1 addition & 0 deletions events/pubcomp.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,5 @@ const PubComp = "pubcomp"
type PubCompEvent struct {
Kind string `json:"kind"`
PacketId string `json:"packet_id"`
Success bool `json:"success"`
}
2 changes: 2 additions & 0 deletions handlers/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ func HandleMessage(client *broker.ConnectedClient, broker *broker.Broker, sp sto
return handleSubscribe(message, client)
case events.Unsub:
return handleUnsubscribe(message, client)
case events.Conn:
return handleConn(message, client, broker)
}
return nil
}
24 changes: 24 additions & 0 deletions handlers/conn.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package handlers

import (
"encoding/json"
"github.com/c16a/microq/broker"
"github.com/c16a/microq/events"
)

func handleConn(message []byte, client *broker.ConnectedClient, b *broker.Broker) error {
var event events.ConnEvent
err := json.Unmarshal(message, &event)
if err != nil {
return err
}

client.SetId(event.ClientId)
b.Connect(event.ClientId, client)
subackEvent := &events.ConnAckEvent{
Kind: events.ConnAck,
Success: true,
ClientId: event.ClientId,
}
return client.WriteInterface(subackEvent)
}
10 changes: 10 additions & 0 deletions handlers/pub.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,23 @@ package handlers

import (
"encoding/json"
"errors"
"github.com/c16a/microq/broker"
"github.com/c16a/microq/events"
"github.com/c16a/microq/storage"
"github.com/google/uuid"
)

func handlePublish(message []byte, client *broker.ConnectedClient, broker *broker.Broker, sp storage.Provider) error {

if !client.IsIdentified() {
client.WriteInterface(&events.PubAckEvent{
Kind: events.PubAck,
Success: false,
})
return errors.New("unidentified client")
}

var event events.PubEvent
err := json.Unmarshal(message, &event)
if err != nil {
Expand Down
9 changes: 9 additions & 0 deletions handlers/pubrel.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,20 @@ package handlers

import (
"encoding/json"
"errors"
"github.com/c16a/microq/broker"
"github.com/c16a/microq/events"
)

func handlePubrel(message []byte, client *broker.ConnectedClient) error {
if !client.IsIdentified() {
client.WriteInterface(&events.PubCompEvent{
Kind: events.PubComp,
Success: false,
})
return errors.New("unidentified client")
}

var event events.PubRelEvent
err := json.Unmarshal(message, &event)
if err != nil {
Expand Down
9 changes: 9 additions & 0 deletions handlers/sub.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,20 @@ package handlers

import (
"encoding/json"
"errors"
"github.com/c16a/microq/broker"
"github.com/c16a/microq/events"
)

func handleSubscribe(message []byte, client *broker.ConnectedClient) error {
if !client.IsIdentified() {
client.WriteInterface(&events.SubAckEvent{
Kind: events.SubAck,
Success: false,
})
return errors.New("unidentified client")
}

var event events.SubEvent
err := json.Unmarshal(message, &event)
if err != nil {
Expand Down
9 changes: 9 additions & 0 deletions handlers/unsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,20 @@ package handlers

import (
"encoding/json"
"errors"
"github.com/c16a/microq/broker"
"github.com/c16a/microq/events"
)

func handleUnsubscribe(message []byte, client *broker.ConnectedClient) error {
if !client.IsIdentified() {
client.WriteInterface(&events.UnsubAckEvent{
Kind: events.PubAck,
Success: false,
})
return errors.New("unidentified client")
}

var event events.UnsubEvent
err := json.Unmarshal(message, &event)
if err != nil {
Expand Down
45 changes: 40 additions & 5 deletions main.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
package main

import (
"bufio"
"github.com/c16a/microq/broker"
"github.com/c16a/microq/conn"
"github.com/c16a/microq/handlers"
"github.com/c16a/microq/storage"
"github.com/gorilla/websocket"
"log"
"net"
"net/http"
)

Expand All @@ -14,6 +17,40 @@ func main() {
defer storageProvider.Close()

b := broker.NewBroker()
go runWebSocketInterface(b, storageProvider)
log.Fatal(runTcpInterface(b, storageProvider))
}

func runTcpInterface(b *broker.Broker, storageProvider storage.Provider) error {
listener, err := net.ListenTCP("tcp", &net.TCPAddr{Port: 8081})
if err != nil {
return err
}

for {
// Handles one TCP client
c, err := listener.AcceptTCP()
if err != nil {
continue
}

scanner := bufio.NewScanner(c)

for scanner.Scan() {
line := scanner.Text()

tcpConn := conn.NewTCPConnection(c)
client := broker.NewUnidentifiedClient(tcpConn)

err = handlers.HandleMessage(client, b, storageProvider, []byte(line))
if err != nil {
continue
}
}
}
}

func runWebSocketInterface(b *broker.Broker, storageProvider storage.Provider) {
var upgrader = websocket.Upgrader{}
http.HandleFunc("/echo", echo(upgrader, b, storageProvider))

Expand All @@ -28,16 +65,14 @@ func echo(upgrader websocket.Upgrader, b *broker.Broker, sp storage.Provider) fu
}
defer c.Close()

clientId := request.Header.Get("Client-Id")

client := broker.NewConnectedClient(c, clientId)
b.Connect(clientId, client)
wsConn := conn.NewWebsocketConnection(c)
client := broker.NewUnidentifiedClient(wsConn)

for {
_, message, err := c.ReadMessage()
if err != nil {
if websocket.IsCloseError(err, websocket.CloseNormalClosure, websocket.CloseAbnormalClosure) {
b.Disconnect(clientId)
b.Disconnect(client)
break
} else {
continue
Expand Down
Loading