Skip to content

Commit

Permalink
thread safe
Browse files Browse the repository at this point in the history
  • Loading branch information
fanweixiao committed Feb 1, 2024
1 parent 084c7d9 commit 93cbe1a
Show file tree
Hide file tree
Showing 6 changed files with 44 additions and 4 deletions.
1 change: 1 addition & 0 deletions .env
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ YOMO_ZIPPER=127.0.0.1:9000
YOMO_SNDR_NAME=prscd-sender
YOMO_RCVR_NAME=prscd-receiver
#OTEL_EXPORTER_OTLP_ENDPOINT=http://127.0.0.1:4318
YOMO_LOG_LEVEL=warn

# Server TLS
CERT_FILE=./lo.yomo.dev.cert
Expand Down
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ dist: clean

.PHONY: dev
dev:
$(GO) run -race ./cmd/prscd
YOMO_LOG_LEVEL=warn $(GO) run -race ./cmd/prscd

.PHONY: test
test:
Expand Down
3 changes: 2 additions & 1 deletion chirp/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,10 @@ func (c *Channel) Dispatch(sig *psig.Signalling) {
// sig.Sid is sender's sid when sending message
log.Debug("[SND>]", "sid", sig.Sid, "sig", sig)
var sender = sig.Sid
// do not broadcast APP_ID and Sid to end user
// do not broadcast APP_ID, Sid and Mesh to end user
sig.AppID = ""
sig.Sid = ""
sig.MeshID = ""
resp, err := msgpack.Marshal(sig)
if err != nil {
log.Error("msgpack marshal: %+v", err)
Expand Down
29 changes: 29 additions & 0 deletions chirp/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package chirp

import (
"net"
"sync"

"github.com/gobwas/ws/wsutil"
"github.com/quic-go/quic-go"
Expand All @@ -13,6 +14,8 @@ type Connection interface {
RemoteAddr() string
// Write the data to the connection
Write(msg []byte) error
// RawWrite write the raw bytes to the connection, this is a low-level implementation
RawWrite(buf []byte) (int, error)
}

/*** WebSocket ***/
Expand All @@ -26,6 +29,7 @@ func NewWebSocketConnection(conn net.Conn) Connection {

// WebSocketConnection is a WebSocket connection
type WebSocketConnection struct {
mu sync.Mutex
underlyingConn net.Conn
}

Expand All @@ -36,9 +40,18 @@ func (c *WebSocketConnection) RemoteAddr() string {

// Write the data to the connection
func (c *WebSocketConnection) Write(msg []byte) error {
c.mu.Lock()
defer c.mu.Unlock()
return wsutil.WriteServerBinary(c.underlyingConn, msg)
}

// RawWrite write the raw bytes to the connection, this is a low-level implementation
func (c *WebSocketConnection) RawWrite(buf []byte) (int, error) {
c.mu.Lock()
defer c.mu.Unlock()
return c.underlyingConn.Write(buf)
}

/*** WebTransport ***/

// NewWebTransportConnection creates a new WebTransportConnection
Expand All @@ -50,6 +63,7 @@ func NewWebTransportConnection(conn quic.Connection) Connection {

// WebTransportConnection is a WebTransport connection
type WebTransportConnection struct {
mu sync.Mutex
underlyingConn quic.Connection
}

Expand All @@ -60,6 +74,9 @@ func (c *WebTransportConnection) RemoteAddr() string {

// Write the data to the connection
func (c *WebTransportConnection) Write(msg []byte) error {
c.mu.Lock()
defer c.mu.Unlock()

// add 0x00 to msg
buf := []byte{0x00}
buf = append(buf, msg...)
Expand All @@ -69,3 +86,15 @@ func (c *WebTransportConnection) Write(msg []byte) error {
}
return nil
}

// RawWrite write the raw bytes to the connection, this is a low-level implementation
func (c *WebTransportConnection) RawWrite(buf []byte) (int, error) {
c.mu.Lock()
defer c.mu.Unlock()

if err := c.underlyingConn.SendDatagram(buf); err != nil {
log.Error("SendMessage error", "remote", c.RemoteAddr(), "err", err)
return 0, err
}
return len(buf), nil
}
5 changes: 5 additions & 0 deletions chirp/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,11 @@ func (c *MockConnection) Write(msg []byte) error {
return nil
}

// RawWrite write the raw bytes to the connection, this is a low-level implementation
func (c *MockConnection) RawWrite(byf []byte) (int, error) {
return 0, nil
}

// SenderMock implement yomo.Source interface
type SenderMock struct{}

Expand Down
8 changes: 6 additions & 2 deletions websocket/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"crypto/tls"
"encoding/binary"
"errors"
"fmt"
"io"
"net"
"net/http"
Expand Down Expand Up @@ -171,7 +172,8 @@ func ListenAndServe(addr string, config *tls.Config) {
log.Debug("ticker done", "sid", peer.Sid)
return
case <-ticker.C:
c.Write(generatePingFrame())
// c.Write(generatePingFrame())
pconn.RawWrite(generatePingFrame())
}
}
}(conn)
Expand Down Expand Up @@ -253,6 +255,7 @@ func generatePingFrame() []byte {
tsbuf := make([]byte, 8)
binary.BigEndian.PutUint64(tsbuf, uint64(ts))
pf := ws.MustCompileFrame(ws.NewPingFrame(tsbuf))
log.Debug("PING Payload", "len", len(pf), "bytes", fmt.Sprintf("% X", pf))
return pf
}

Expand All @@ -268,7 +271,8 @@ func handlePongFrame(sid string, r io.Reader, header ws.Header) error {
// calculate the RTT and prints to stdout
appData := int64(binary.BigEndian.Uint64(buf))
now := time.Now().UnixMilli()
log.Inspect("\tPONG Payload", "sid", sid, "len", len(buf), "val", appData, "𝚫", now-appData)
// log.Inspect("\tPONG Payload", "sid", sid, "len", len(buf), "val", appData, "𝚫", now-appData)
log.Debug("[PONG]", "sid", sid, "len", len(buf), "buf", fmt.Sprintf("% X", buf), "val", appData, "𝚫", now-appData)
return nil
}

Expand Down

0 comments on commit 93cbe1a

Please sign in to comment.