Skip to content

Commit

Permalink
Added TCP support
Browse files Browse the repository at this point in the history
Signed-off-by: Chaitanya Munukutla <[email protected]>
  • Loading branch information
c16a committed Apr 23, 2024
1 parent 1485ce2 commit f8fce80
Show file tree
Hide file tree
Showing 10 changed files with 124 additions and 9 deletions.
4 changes: 3 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
# microq
A tiny Websocket-based event broker.
A tiny event broker.

## Features
- [x] Websocket support
- [x] TCP support
- [x] Grouped subscriptions
- [ ] Offline messages
- [ ] Clustering
Expand Down
5 changes: 2 additions & 3 deletions broker/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,8 @@ import (
"sync"
)

type WebSocketConnection interface {
WriteMessage(messageType int, data []byte) error
ReadMessage() (messageType int, p []byte, err error)
type GenericConnection interface {
WriteMessage(data []byte) error
}

type Broker struct {
Expand Down
11 changes: 7 additions & 4 deletions broker/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,18 @@ package broker

import (
"encoding/json"
"github.com/gorilla/websocket"
"sync"
)

type ConnectedClient struct {
id string
subscriptions []*Subscription
unsubscriptions []*Subscription
conn WebSocketConnection
conn GenericConnection
mutex sync.RWMutex
}

func NewConnectedClient(conn WebSocketConnection, id string) *ConnectedClient {
func NewConnectedClient(conn GenericConnection, id string) *ConnectedClient {
return &ConnectedClient{
id: id,
conn: conn,
Expand All @@ -24,11 +23,15 @@ func NewConnectedClient(conn WebSocketConnection, id string) *ConnectedClient {
}
}

func (client *ConnectedClient) SetId(id string) {
client.id = id
}

func (client *ConnectedClient) WriteDataMessage(data []byte) error {
client.mutex.Lock()
defer client.mutex.Unlock()

return client.conn.WriteMessage(websocket.TextMessage, data)
return client.conn.WriteMessage(data)
}

func (client *ConnectedClient) WriteInterface(v any) error {
Expand Down
16 changes: 16 additions & 0 deletions conn/tcp.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package conn

import "net"

type TCPConnection struct {
conn *net.TCPConn
}

func NewTCPConnection(conn *net.TCPConn) *TCPConnection {
return &TCPConnection{conn: conn}
}

func (tc *TCPConnection) WriteMessage(data []byte) error {
_, err := tc.conn.Write(data)
return err
}
15 changes: 15 additions & 0 deletions conn/ws.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package conn

import "github.com/gorilla/websocket"

type WebsocketConnection struct {
conn *websocket.Conn
}

func NewWebsocketConnection(conn *websocket.Conn) *WebsocketConnection {
return &WebsocketConnection{conn: conn}
}

func (wc *WebsocketConnection) WriteMessage(data []byte) error {
return wc.conn.WriteMessage(websocket.TextMessage, data)
}
8 changes: 8 additions & 0 deletions events/conn.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package events

const Conn = "conn"

type ConnEvent struct {
Kind string `json:"kind"`
ClientId string `json:"client_id"`
}
9 changes: 9 additions & 0 deletions events/connack.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package events

const ConnAck = "connack"

type ConnAckEvent struct {
Kind string `json:"kind"`
ClientId string `json:"client_id"`
Status bool `json:"status"`
}
2 changes: 2 additions & 0 deletions handlers/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ func HandleMessage(client *broker.ConnectedClient, broker *broker.Broker, sp sto
return handleSubscribe(message, client)
case events.Unsub:
return handleUnsubscribe(message, client)
case events.Conn:
return handleConn(message, client)
}
return nil
}
23 changes: 23 additions & 0 deletions handlers/conn.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package handlers

import (
"encoding/json"
"github.com/c16a/microq/broker"
"github.com/c16a/microq/events"
)

func handleConn(message []byte, client *broker.ConnectedClient) error {
var event events.ConnEvent
err := json.Unmarshal(message, &event)
if err != nil {
return err
}

client.SetId(event.ClientId)
subackEvent := &events.ConnAckEvent{
Kind: events.ConnAck,
Status: true,
ClientId: event.ClientId,
}
return client.WriteInterface(subackEvent)
}
40 changes: 39 additions & 1 deletion main.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
package main

import (
"bufio"
"github.com/c16a/microq/broker"
"github.com/c16a/microq/conn"
"github.com/c16a/microq/handlers"
"github.com/c16a/microq/storage"
"github.com/gorilla/websocket"
"log"
"net"
"net/http"
)

Expand All @@ -14,6 +17,40 @@ func main() {
defer storageProvider.Close()

b := broker.NewBroker()
go runWebSocketInterface(b, storageProvider)
log.Fatal(runTcpInterface(b, storageProvider))
}

func runTcpInterface(b *broker.Broker, storageProvider storage.Provider) error {
listener, err := net.ListenTCP("tcp", &net.TCPAddr{Port: 8081})
if err != nil {
return err
}

for {
// Handles one TCP client
c, err := listener.AcceptTCP()
if err != nil {
continue
}

scanner := bufio.NewScanner(c)

for scanner.Scan() {
line := scanner.Text()

tcpConn := conn.NewTCPConnection(c)
client := broker.NewConnectedClient(tcpConn, "unlabeled")

err = handlers.HandleMessage(client, b, storageProvider, []byte(line))
if err != nil {
continue
}
}
}
}

func runWebSocketInterface(b *broker.Broker, storageProvider storage.Provider) {
var upgrader = websocket.Upgrader{}
http.HandleFunc("/echo", echo(upgrader, b, storageProvider))

Expand All @@ -30,7 +67,8 @@ func echo(upgrader websocket.Upgrader, b *broker.Broker, sp storage.Provider) fu

clientId := request.Header.Get("Client-Id")

client := broker.NewConnectedClient(c, clientId)
wsConn := conn.NewWebsocketConnection(c)
client := broker.NewConnectedClient(wsConn, clientId)
b.Connect(clientId, client)

for {
Expand Down

0 comments on commit f8fce80

Please sign in to comment.