Skip to content

Commit

Permalink
Fix unidentified client issue for TCP
Browse files Browse the repository at this point in the history
Signed-off-by: Chaitanya Munukutla <[email protected]>
  • Loading branch information
c16a committed Apr 28, 2024
1 parent dba5b0a commit 6a844f9
Show file tree
Hide file tree
Showing 5 changed files with 89 additions and 76 deletions.
1 change: 1 addition & 0 deletions conn/tcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ func NewTCPConnection(conn *net.TCPConn) *TCPConnection {
}

func (tc *TCPConnection) WriteMessage(data []byte) error {
data = append(data, '\n')
_, err := tc.conn.Write(data)
return err
}
4 changes: 2 additions & 2 deletions handlers/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@ func handleConn(message []byte, client *broker.ConnectedClient, b *broker.Broker

client.SetId(event.ClientId)
b.Connect(event.ClientId, client)
subackEvent := &events.ConnAckEvent{
connackEvent := &events.ConnAckEvent{
Kind: events.ConnAck,
Success: true,
ClientId: event.ClientId,
}
return client.WriteInterface(subackEvent)
return client.WriteInterface(connackEvent)
}
38 changes: 38 additions & 0 deletions interfaces/tcp.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package interfaces

import (
"bufio"
"github.com/c16a/microq/broker"
"github.com/c16a/microq/conn"
"github.com/c16a/microq/handlers"
"github.com/c16a/microq/storage"
"net"
)

func RunTcp(b *broker.Broker, storageProvider storage.Provider) error {
listener, err := net.ListenTCP("tcp", &net.TCPAddr{Port: 8081})
if err != nil {
return err
}

for {
c, err := listener.AcceptTCP()
if err != nil {
continue
}

scanner := bufio.NewScanner(c)

tcpConn := conn.NewTCPConnection(c)
client := broker.NewUnidentifiedClient(tcpConn)

for scanner.Scan() {
line := scanner.Text()

err = handlers.HandleMessage(client, b, storageProvider, []byte(line))
if err != nil {
continue
}
}
}
}
43 changes: 43 additions & 0 deletions interfaces/ws.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package interfaces

import (
"github.com/c16a/microq/broker"
"github.com/c16a/microq/conn"
"github.com/c16a/microq/handlers"
"github.com/c16a/microq/storage"
"github.com/gorilla/websocket"
"log"
"net/http"
)

func RunWs(b *broker.Broker, storageProvider storage.Provider) {
var upgrader = websocket.Upgrader{}
http.HandleFunc("/echo", func(w http.ResponseWriter, r *http.Request) {
c, err := upgrader.Upgrade(w, r, nil)
if err != nil {
return
}
defer c.Close()

wsConn := conn.NewWebsocketConnection(c)
client := broker.NewUnidentifiedClient(wsConn)

for {
_, message, err := c.ReadMessage()
if err != nil {
if websocket.IsCloseError(err, websocket.CloseNormalClosure, websocket.CloseAbnormalClosure) {
b.Disconnect(client)
break
} else {
continue
}
}
err = handlers.HandleMessage(client, b, storageProvider, message)
if err != nil {
continue
}
}
})

log.Fatal(http.ListenAndServe(":8080", nil))
}
79 changes: 5 additions & 74 deletions main.go
Original file line number Diff line number Diff line change
@@ -1,87 +1,18 @@
package main

import (
"bufio"
"github.com/c16a/microq/broker"
"github.com/c16a/microq/conn"
"github.com/c16a/microq/handlers"
"github.com/c16a/microq/interfaces"
"github.com/c16a/microq/storage"
"github.com/gorilla/websocket"
"log"
"net"
"net/http"
)

func main() {
var storageProvider = storage.NewBadgerProvider()
defer storageProvider.Close()

b := broker.NewBroker()
go runWebSocketInterface(b, storageProvider)
log.Fatal(runTcpInterface(b, storageProvider))
}

func runTcpInterface(b *broker.Broker, storageProvider storage.Provider) error {
listener, err := net.ListenTCP("tcp", &net.TCPAddr{Port: 8081})
if err != nil {
return err
}

for {
// Handles one TCP client
c, err := listener.AcceptTCP()
if err != nil {
continue
}

scanner := bufio.NewScanner(c)

for scanner.Scan() {
line := scanner.Text()

tcpConn := conn.NewTCPConnection(c)
client := broker.NewUnidentifiedClient(tcpConn)

err = handlers.HandleMessage(client, b, storageProvider, []byte(line))
if err != nil {
continue
}
}
}
}

func runWebSocketInterface(b *broker.Broker, storageProvider storage.Provider) {
var upgrader = websocket.Upgrader{}
http.HandleFunc("/echo", echo(upgrader, b, storageProvider))

log.Fatal(http.ListenAndServe(":8080", nil))
}

func echo(upgrader websocket.Upgrader, b *broker.Broker, sp storage.Provider) func(http.ResponseWriter, *http.Request) {
return func(writer http.ResponseWriter, request *http.Request) {
c, err := upgrader.Upgrade(writer, request, nil)
if err != nil {
return
}
defer c.Close()

wsConn := conn.NewWebsocketConnection(c)
client := broker.NewUnidentifiedClient(wsConn)
var storageProvider = storage.NewBadgerProvider()
defer storageProvider.Close()

for {
_, message, err := c.ReadMessage()
if err != nil {
if websocket.IsCloseError(err, websocket.CloseNormalClosure, websocket.CloseAbnormalClosure) {
b.Disconnect(client)
break
} else {
continue
}
}
err = handlers.HandleMessage(client, b, sp, message)
if err != nil {
continue
}
}
}
go interfaces.RunWs(b, storageProvider)
log.Fatal(interfaces.RunTcp(b, storageProvider))
}

0 comments on commit 6a844f9

Please sign in to comment.