From 93cbe1ae094a4792bc081b03d46bba91be493b55 Mon Sep 17 00:00:00 2001 From: "C.C" Date: Thu, 1 Feb 2024 09:52:40 +0800 Subject: [PATCH] thread safe --- .env | 1 + Makefile | 2 +- chirp/channel.go | 3 ++- chirp/connection.go | 29 +++++++++++++++++++++++++++++ chirp/node_test.go | 5 +++++ websocket/main.go | 8 ++++++-- 6 files changed, 44 insertions(+), 4 deletions(-) diff --git a/.env b/.env index 333c1f3..b0e94b2 100644 --- a/.env +++ b/.env @@ -16,6 +16,7 @@ YOMO_ZIPPER=127.0.0.1:9000 YOMO_SNDR_NAME=prscd-sender YOMO_RCVR_NAME=prscd-receiver #OTEL_EXPORTER_OTLP_ENDPOINT=http://127.0.0.1:4318 +YOMO_LOG_LEVEL=warn # Server TLS CERT_FILE=./lo.yomo.dev.cert diff --git a/Makefile b/Makefile index ebdc232..6419ba4 100644 --- a/Makefile +++ b/Makefile @@ -30,7 +30,7 @@ dist: clean .PHONY: dev dev: - $(GO) run -race ./cmd/prscd + YOMO_LOG_LEVEL=warn $(GO) run -race ./cmd/prscd .PHONY: test test: diff --git a/chirp/channel.go b/chirp/channel.go index 68d6a1d..ea98e51 100644 --- a/chirp/channel.go +++ b/chirp/channel.go @@ -43,9 +43,10 @@ func (c *Channel) Dispatch(sig *psig.Signalling) { // sig.Sid is sender's sid when sending message log.Debug("[SND>]", "sid", sig.Sid, "sig", sig) var sender = sig.Sid - // do not broadcast APP_ID and Sid to end user + // do not broadcast APP_ID, Sid and Mesh to end user sig.AppID = "" sig.Sid = "" + sig.MeshID = "" resp, err := msgpack.Marshal(sig) if err != nil { log.Error("msgpack marshal: %+v", err) diff --git a/chirp/connection.go b/chirp/connection.go index 2c3e1e0..1d1a7cf 100644 --- a/chirp/connection.go +++ b/chirp/connection.go @@ -2,6 +2,7 @@ package chirp import ( "net" + "sync" "github.com/gobwas/ws/wsutil" "github.com/quic-go/quic-go" @@ -13,6 +14,8 @@ type Connection interface { RemoteAddr() string // Write the data to the connection Write(msg []byte) error + // RawWrite write the raw bytes to the connection, this is a low-level implementation + RawWrite(buf []byte) (int, error) } /*** WebSocket ***/ @@ -26,6 +29,7 @@ func NewWebSocketConnection(conn net.Conn) Connection { // WebSocketConnection is a WebSocket connection type WebSocketConnection struct { + mu sync.Mutex underlyingConn net.Conn } @@ -36,9 +40,18 @@ func (c *WebSocketConnection) RemoteAddr() string { // Write the data to the connection func (c *WebSocketConnection) Write(msg []byte) error { + c.mu.Lock() + defer c.mu.Unlock() return wsutil.WriteServerBinary(c.underlyingConn, msg) } +// RawWrite write the raw bytes to the connection, this is a low-level implementation +func (c *WebSocketConnection) RawWrite(buf []byte) (int, error) { + c.mu.Lock() + defer c.mu.Unlock() + return c.underlyingConn.Write(buf) +} + /*** WebTransport ***/ // NewWebTransportConnection creates a new WebTransportConnection @@ -50,6 +63,7 @@ func NewWebTransportConnection(conn quic.Connection) Connection { // WebTransportConnection is a WebTransport connection type WebTransportConnection struct { + mu sync.Mutex underlyingConn quic.Connection } @@ -60,6 +74,9 @@ func (c *WebTransportConnection) RemoteAddr() string { // Write the data to the connection func (c *WebTransportConnection) Write(msg []byte) error { + c.mu.Lock() + defer c.mu.Unlock() + // add 0x00 to msg buf := []byte{0x00} buf = append(buf, msg...) @@ -69,3 +86,15 @@ func (c *WebTransportConnection) Write(msg []byte) error { } return nil } + +// RawWrite write the raw bytes to the connection, this is a low-level implementation +func (c *WebTransportConnection) RawWrite(buf []byte) (int, error) { + c.mu.Lock() + defer c.mu.Unlock() + + if err := c.underlyingConn.SendDatagram(buf); err != nil { + log.Error("SendMessage error", "remote", c.RemoteAddr(), "err", err) + return 0, err + } + return len(buf), nil +} diff --git a/chirp/node_test.go b/chirp/node_test.go index fd09167..354a7f2 100644 --- a/chirp/node_test.go +++ b/chirp/node_test.go @@ -30,6 +30,11 @@ func (c *MockConnection) Write(msg []byte) error { return nil } +// RawWrite write the raw bytes to the connection, this is a low-level implementation +func (c *MockConnection) RawWrite(byf []byte) (int, error) { + return 0, nil +} + // SenderMock implement yomo.Source interface type SenderMock struct{} diff --git a/websocket/main.go b/websocket/main.go index d72c642..f5a0a58 100644 --- a/websocket/main.go +++ b/websocket/main.go @@ -6,6 +6,7 @@ import ( "crypto/tls" "encoding/binary" "errors" + "fmt" "io" "net" "net/http" @@ -171,7 +172,8 @@ func ListenAndServe(addr string, config *tls.Config) { log.Debug("ticker done", "sid", peer.Sid) return case <-ticker.C: - c.Write(generatePingFrame()) + // c.Write(generatePingFrame()) + pconn.RawWrite(generatePingFrame()) } } }(conn) @@ -253,6 +255,7 @@ func generatePingFrame() []byte { tsbuf := make([]byte, 8) binary.BigEndian.PutUint64(tsbuf, uint64(ts)) pf := ws.MustCompileFrame(ws.NewPingFrame(tsbuf)) + log.Debug("PING Payload", "len", len(pf), "bytes", fmt.Sprintf("% X", pf)) return pf } @@ -268,7 +271,8 @@ func handlePongFrame(sid string, r io.Reader, header ws.Header) error { // calculate the RTT and prints to stdout appData := int64(binary.BigEndian.Uint64(buf)) now := time.Now().UnixMilli() - log.Inspect("\tPONG Payload", "sid", sid, "len", len(buf), "val", appData, "𝚫", now-appData) + // log.Inspect("\tPONG Payload", "sid", sid, "len", len(buf), "val", appData, "𝚫", now-appData) + log.Debug("[PONG]", "sid", sid, "len", len(buf), "buf", fmt.Sprintf("% X", buf), "val", appData, "𝚫", now-appData) return nil }