Skip to content

Commit

Permalink
initial commit
Browse files Browse the repository at this point in the history
  • Loading branch information
craftleon committed Sep 2, 2024
0 parents commit dfa02b2
Show file tree
Hide file tree
Showing 6 changed files with 594 additions and 0 deletions.
7 changes: 7 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@

LD_FLAGS = "-s -w"

echoserver:
go build -trimpath -ldflags ${LD_FLAGS} -v -o echoserver ./main.go

.PHONY: echoserver
14 changes: 14 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
module github.com/craftleon/echoserver

go 1.20

require (
github.com/urfave/cli/v2 v2.27.2
gvisor.dev/gvisor v0.0.0-20240702234927-b488752cba57
)

require (
github.com/cpuguy83/go-md2man/v2 v2.0.4 // indirect
github.com/russross/blackfriday/v2 v2.1.0 // indirect
github.com/xrash/smetrics v0.0.0-20240312152122-5f08fbb34913 // indirect
)
69 changes: 69 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package main

import (
"os"
"os/signal"
"path/filepath"
"syscall"

"github.com/urfave/cli/v2"

"echoserver/server"
)

func main() {
app := cli.NewApp()
app.Name = "echoserver"
app.Usage = "return layer4 5-element-tuple of the incoming packet"

runCmd := &cli.Command{
Name: "run",
Usage: "create and run server process for NHP protocol",
Flags: []cli.Flag{
&cli.StringFlag{Name: "a", Value: "", Usage: "listening interface address for the server, leave it empty for all interfaces"},
&cli.IntFlag{Name: "p", Value: 57575, Usage: "listening port for the server, default 57575"},
},
Action: func(c *cli.Context) error {
return runApp(c.String("a"), c.Int("p"))
},
}

app.Commands = []*cli.Command{
runCmd,
}

if err := app.Run(os.Args); err != nil {
panic(err)
}
}

func runApp(addr string, port int) error {
exeFilePath, err := os.Executable()
if err != nil {
return err
}
exeDirPath := filepath.Dir(exeFilePath)
server.ExeDirPath = exeDirPath

ts := server.TcpServer{}
us := server.UdpServer{}
err = ts.Start(addr, port)
if err != nil {
return err
}
err = us.Start(addr, port)
if err != nil {
return err
}

// react to terminate signals
termCh := make(chan os.Signal, 1)
signal.Notify(termCh, syscall.SIGTERM, os.Interrupt)

// block until terminated
<-termCh
ts.Stop()
us.Stop()

return nil
}
82 changes: 82 additions & 0 deletions server/common.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
package server

import (
"fmt"
"net"
"sync"
"sync/atomic"
"time"
)

const (
CONNECTION_IDLE_TIMEOUTMS = 30000
)

var (
ExeDirPath string
ListenIp string
ListenPort int
)

type ConnType int32

const (
TCP_CONN = iota
UDP_CONN
)

type ConnData struct {
// atomic data, keep 64bit(8-bytes) alignment for 32-bit system compatibility
InitTime int64 // local connection setup time. immutable after created
LastRemoteSendTime int64
LastLocalSendTime int64
LastLocalRecvTime int64

sync.Mutex
sync.WaitGroup

LocalAddr net.Addr
RemoteAddr net.Addr

connType ConnType
closed atomic.Bool

idleTimeoutMs int
nativeConn net.Conn

echoQueue chan *TupleData
}

func (conn *ConnData) Close() {
if conn.closed.Load() {
return
}
conn.nativeConn.Close()
conn.closed.Store(true)
}

type TupleData struct {
SrcIp string
DstIp string
SrcPort int
DstPort int
Timestamp time.Time
msg string
}

func (td *TupleData) String() string {
return fmt.Sprintf("%s %s:%d -> %s:%d \"%s\"", td.Timestamp.Format("2006-01-02 15:04:05"), td.SrcIp, td.SrcPort, td.DstIp, td.DstPort, td.msg)
}

// need external connection
func GetLocalOutboundAddress() net.IP {
con, err := net.Dial("udp", "8.8.8.8:80")
if err != nil {
return nil
}
defer con.Close()

addr := con.LocalAddr().(*net.UDPAddr)

return addr.IP
}
219 changes: 219 additions & 0 deletions server/tcpserver.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,219 @@
package server

import (
"fmt"
"net"
"sync"
"sync/atomic"
"time"
)

type TcpServer struct {
stats struct {
totalRecvBytes uint64
totalSendBytes uint64
}

listenAddr *net.TCPAddr
listenConn *net.TCPListener

wg sync.WaitGroup
running atomic.Bool

// connection and remote transaction management

remoteConnectionMapMutex sync.Mutex
remoteConnectionMap map[string]*ConnData // indexed by remote UDP address

// signals
signals struct {
stop chan struct{}
}
}

func (s *TcpServer) Start(addr string, port int) (err error) {
if port < 5000 || port > 65535 {
return fmt.Errorf("listening port must be in the range of 5000-65535")
}

ListenPort = port

var netIP net.IP
if len(addr) > 0 {
netIP = net.ParseIP(addr)
if netIP == nil {
return fmt.Errorf("listening ip address is incorrect")
}
ListenIp = addr
} else {
netIP = net.IPv4zero // will both listen on ipv4 0.0.0.0:port and ipv6 [::]:port
// retrieve local ip
ListenIp = GetLocalOutboundAddress().String()
}

s.listenConn, err = net.ListenTCP("tcp", &net.TCPAddr{
IP: netIP,
Port: port,
})
if err != nil {
return fmt.Errorf("listen error %v", err)
}

// retrieve local port
laddr := s.listenConn.Addr()
s.listenAddr, err = net.ResolveTCPAddr(laddr.Network(), laddr.String())
if err != nil {
return fmt.Errorf("resolve TCPAddr error %v", err)
}

s.signals.stop = make(chan struct{})

// start server routines
s.wg.Add(1)
go s.recvPacketRoutine()

s.running.Store(true)

fmt.Printf("TCP echo service started on %s:%d\n", ListenIp, ListenPort)
return nil
}

func (s *TcpServer) Stop() {
if !s.running.Load() {
// already stopped, do nothing
return
}
s.running.Store(false)

close(s.signals.stop)
s.listenConn.Close()
s.wg.Wait()
}

func (s *TcpServer) recvPacketRoutine() {
defer s.wg.Done()

for {
select {
case <-s.signals.stop:
return

default:
}

s.listenConn.SetDeadline(time.Now().Add(5 * time.Second))
tcpConn, err := s.listenConn.Accept()
if err != nil {
// accept failure
continue
}

remoteAddr := tcpConn.RemoteAddr().(*net.TCPAddr)
addrStr := remoteAddr.String()
// allocate a new packet buffer for every read
pkt := make([]byte, 4096)

// tcp recv, blocking until packet arrives or conn.Close()
tcpConn.SetReadDeadline(time.Now().Add(5 * time.Second))
n, err := tcpConn.Read(pkt[:])
if err != nil {
pkt = nil
//log.Error("Read from UDP error: %v\n", err)
if n == 0 {
// listenConn closed
return
}
continue
}

recvTime := time.Now()

// add total recv bytes
atomic.AddUint64(&s.stats.totalRecvBytes, uint64(n))

//log.Trace("receive udp packet (%s -> %s): %+v", addrStr, s.listenAddr.String(), pkt.Packet)
//log.Info("Receive [%s] packet (%s -> %s), %d bytes", msgType, addrStr, s.listenAddr.String(), n)

s.remoteConnectionMapMutex.Lock()
conn, found := s.remoteConnectionMap[addrStr]
s.remoteConnectionMapMutex.Unlock()

tuple := new(TupleData)
tuple.msg = string(pkt)
tuple.SrcIp = remoteAddr.IP.String()
tuple.SrcPort = remoteAddr.Port
tuple.DstIp = ListenIp
tuple.DstPort = ListenPort
tuple.Timestamp = recvTime

if found {
// existing connection
atomic.StoreInt64(&conn.LastLocalRecvTime, recvTime.UnixMicro())
conn.echoQueue <- tuple

} else {
// create new connection
conn = &ConnData{
InitTime: recvTime.UnixMicro(),
LocalAddr: s.listenAddr,
RemoteAddr: remoteAddr,
nativeConn: tcpConn,
connType: TCP_CONN,
idleTimeoutMs: CONNECTION_IDLE_TIMEOUTMS,
}
conn.echoQueue = make(chan *TupleData)
// setup new routine for connection
s.remoteConnectionMapMutex.Lock()
s.remoteConnectionMap[addrStr] = conn
s.remoteConnectionMapMutex.Unlock()

conn.echoQueue <- tuple

//log.Info("Accept new UDP connection from %s to %s", addrStr, s.listenAddr.String())

// launch connection routine
s.wg.Add(1)
go s.connectionRoutine(conn)
}
}
}

func (s *TcpServer) connectionRoutine(conn *ConnData) {
addrStr := conn.RemoteAddr.String()
defer s.wg.Done()
//defer log.Debug("Connection routine: %s stopped", addrStr)

//log.Debug("Connection routine: %s started", addrStr)

// stop receiving packets and clean up
defer func() {
// remove conn from remoteConnectionMap
s.remoteConnectionMapMutex.Lock()
delete(s.remoteConnectionMap, addrStr)
s.remoteConnectionMapMutex.Unlock()
conn.Close()
}()

for {
select {
case <-s.signals.stop:
return

case <-time.After(time.Duration(conn.idleTimeoutMs) * time.Millisecond):
// timeout, quit routine
//log.Debug("Connection routine idle timeout")
return

case tuple, ok := <-conn.echoQueue:
if !ok {
return
}
if tuple == nil {
continue
}
//log.Debug("Received udp packet len [%d] from addr: %s\n", len(pkt.Packet), addrStr)
conn.nativeConn.SetWriteDeadline(time.Now().Add(500 * time.Millisecond))
conn.nativeConn.Write([]byte(tuple.String()))
}
}
}
Loading

0 comments on commit dfa02b2

Please sign in to comment.