From dfa02b2071088b22ab666b9997b5bbb3086ed935 Mon Sep 17 00:00:00 2001 From: zengl Date: Mon, 2 Sep 2024 11:28:55 +0800 Subject: [PATCH] initial commit --- Makefile | 7 ++ go.mod | 14 +++ main.go | 69 ++++++++++++++ server/common.go | 82 +++++++++++++++++ server/tcpserver.go | 219 ++++++++++++++++++++++++++++++++++++++++++++ server/udpserver.go | 203 ++++++++++++++++++++++++++++++++++++++++ 6 files changed, 594 insertions(+) create mode 100644 Makefile create mode 100644 go.mod create mode 100644 main.go create mode 100644 server/common.go create mode 100644 server/tcpserver.go create mode 100644 server/udpserver.go diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..24a6431 --- /dev/null +++ b/Makefile @@ -0,0 +1,7 @@ + +LD_FLAGS = "-s -w" + +echoserver: + go build -trimpath -ldflags ${LD_FLAGS} -v -o echoserver ./main.go + +.PHONY: echoserver diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..e9bff5d --- /dev/null +++ b/go.mod @@ -0,0 +1,14 @@ +module github.com/craftleon/echoserver + +go 1.20 + +require ( + github.com/urfave/cli/v2 v2.27.2 + gvisor.dev/gvisor v0.0.0-20240702234927-b488752cba57 +) + +require ( + github.com/cpuguy83/go-md2man/v2 v2.0.4 // indirect + github.com/russross/blackfriday/v2 v2.1.0 // indirect + github.com/xrash/smetrics v0.0.0-20240312152122-5f08fbb34913 // indirect +) diff --git a/main.go b/main.go new file mode 100644 index 0000000..de93318 --- /dev/null +++ b/main.go @@ -0,0 +1,69 @@ +package main + +import ( + "os" + "os/signal" + "path/filepath" + "syscall" + + "github.com/urfave/cli/v2" + + "echoserver/server" +) + +func main() { + app := cli.NewApp() + app.Name = "echoserver" + app.Usage = "return layer4 5-element-tuple of the incoming packet" + + runCmd := &cli.Command{ + Name: "run", + Usage: "create and run server process for NHP protocol", + Flags: []cli.Flag{ + &cli.StringFlag{Name: "a", Value: "", Usage: "listening interface address for the server, leave it empty for all interfaces"}, + &cli.IntFlag{Name: "p", Value: 57575, Usage: "listening port for the server, default 57575"}, + }, + Action: func(c *cli.Context) error { + return runApp(c.String("a"), c.Int("p")) + }, + } + + app.Commands = []*cli.Command{ + runCmd, + } + + if err := app.Run(os.Args); err != nil { + panic(err) + } +} + +func runApp(addr string, port int) error { + exeFilePath, err := os.Executable() + if err != nil { + return err + } + exeDirPath := filepath.Dir(exeFilePath) + server.ExeDirPath = exeDirPath + + ts := server.TcpServer{} + us := server.UdpServer{} + err = ts.Start(addr, port) + if err != nil { + return err + } + err = us.Start(addr, port) + if err != nil { + return err + } + + // react to terminate signals + termCh := make(chan os.Signal, 1) + signal.Notify(termCh, syscall.SIGTERM, os.Interrupt) + + // block until terminated + <-termCh + ts.Stop() + us.Stop() + + return nil +} diff --git a/server/common.go b/server/common.go new file mode 100644 index 0000000..b858f0e --- /dev/null +++ b/server/common.go @@ -0,0 +1,82 @@ +package server + +import ( + "fmt" + "net" + "sync" + "sync/atomic" + "time" +) + +const ( + CONNECTION_IDLE_TIMEOUTMS = 30000 +) + +var ( + ExeDirPath string + ListenIp string + ListenPort int +) + +type ConnType int32 + +const ( + TCP_CONN = iota + UDP_CONN +) + +type ConnData struct { + // atomic data, keep 64bit(8-bytes) alignment for 32-bit system compatibility + InitTime int64 // local connection setup time. immutable after created + LastRemoteSendTime int64 + LastLocalSendTime int64 + LastLocalRecvTime int64 + + sync.Mutex + sync.WaitGroup + + LocalAddr net.Addr + RemoteAddr net.Addr + + connType ConnType + closed atomic.Bool + + idleTimeoutMs int + nativeConn net.Conn + + echoQueue chan *TupleData +} + +func (conn *ConnData) Close() { + if conn.closed.Load() { + return + } + conn.nativeConn.Close() + conn.closed.Store(true) +} + +type TupleData struct { + SrcIp string + DstIp string + SrcPort int + DstPort int + Timestamp time.Time + msg string +} + +func (td *TupleData) String() string { + return fmt.Sprintf("%s %s:%d -> %s:%d \"%s\"", td.Timestamp.Format("2006-01-02 15:04:05"), td.SrcIp, td.SrcPort, td.DstIp, td.DstPort, td.msg) +} + +// need external connection +func GetLocalOutboundAddress() net.IP { + con, err := net.Dial("udp", "8.8.8.8:80") + if err != nil { + return nil + } + defer con.Close() + + addr := con.LocalAddr().(*net.UDPAddr) + + return addr.IP +} diff --git a/server/tcpserver.go b/server/tcpserver.go new file mode 100644 index 0000000..1f1c53d --- /dev/null +++ b/server/tcpserver.go @@ -0,0 +1,219 @@ +package server + +import ( + "fmt" + "net" + "sync" + "sync/atomic" + "time" +) + +type TcpServer struct { + stats struct { + totalRecvBytes uint64 + totalSendBytes uint64 + } + + listenAddr *net.TCPAddr + listenConn *net.TCPListener + + wg sync.WaitGroup + running atomic.Bool + + // connection and remote transaction management + + remoteConnectionMapMutex sync.Mutex + remoteConnectionMap map[string]*ConnData // indexed by remote UDP address + + // signals + signals struct { + stop chan struct{} + } +} + +func (s *TcpServer) Start(addr string, port int) (err error) { + if port < 5000 || port > 65535 { + return fmt.Errorf("listening port must be in the range of 5000-65535") + } + + ListenPort = port + + var netIP net.IP + if len(addr) > 0 { + netIP = net.ParseIP(addr) + if netIP == nil { + return fmt.Errorf("listening ip address is incorrect") + } + ListenIp = addr + } else { + netIP = net.IPv4zero // will both listen on ipv4 0.0.0.0:port and ipv6 [::]:port + // retrieve local ip + ListenIp = GetLocalOutboundAddress().String() + } + + s.listenConn, err = net.ListenTCP("tcp", &net.TCPAddr{ + IP: netIP, + Port: port, + }) + if err != nil { + return fmt.Errorf("listen error %v", err) + } + + // retrieve local port + laddr := s.listenConn.Addr() + s.listenAddr, err = net.ResolveTCPAddr(laddr.Network(), laddr.String()) + if err != nil { + return fmt.Errorf("resolve TCPAddr error %v", err) + } + + s.signals.stop = make(chan struct{}) + + // start server routines + s.wg.Add(1) + go s.recvPacketRoutine() + + s.running.Store(true) + + fmt.Printf("TCP echo service started on %s:%d\n", ListenIp, ListenPort) + return nil +} + +func (s *TcpServer) Stop() { + if !s.running.Load() { + // already stopped, do nothing + return + } + s.running.Store(false) + + close(s.signals.stop) + s.listenConn.Close() + s.wg.Wait() +} + +func (s *TcpServer) recvPacketRoutine() { + defer s.wg.Done() + + for { + select { + case <-s.signals.stop: + return + + default: + } + + s.listenConn.SetDeadline(time.Now().Add(5 * time.Second)) + tcpConn, err := s.listenConn.Accept() + if err != nil { + // accept failure + continue + } + + remoteAddr := tcpConn.RemoteAddr().(*net.TCPAddr) + addrStr := remoteAddr.String() + // allocate a new packet buffer for every read + pkt := make([]byte, 4096) + + // tcp recv, blocking until packet arrives or conn.Close() + tcpConn.SetReadDeadline(time.Now().Add(5 * time.Second)) + n, err := tcpConn.Read(pkt[:]) + if err != nil { + pkt = nil + //log.Error("Read from UDP error: %v\n", err) + if n == 0 { + // listenConn closed + return + } + continue + } + + recvTime := time.Now() + + // add total recv bytes + atomic.AddUint64(&s.stats.totalRecvBytes, uint64(n)) + + //log.Trace("receive udp packet (%s -> %s): %+v", addrStr, s.listenAddr.String(), pkt.Packet) + //log.Info("Receive [%s] packet (%s -> %s), %d bytes", msgType, addrStr, s.listenAddr.String(), n) + + s.remoteConnectionMapMutex.Lock() + conn, found := s.remoteConnectionMap[addrStr] + s.remoteConnectionMapMutex.Unlock() + + tuple := new(TupleData) + tuple.msg = string(pkt) + tuple.SrcIp = remoteAddr.IP.String() + tuple.SrcPort = remoteAddr.Port + tuple.DstIp = ListenIp + tuple.DstPort = ListenPort + tuple.Timestamp = recvTime + + if found { + // existing connection + atomic.StoreInt64(&conn.LastLocalRecvTime, recvTime.UnixMicro()) + conn.echoQueue <- tuple + + } else { + // create new connection + conn = &ConnData{ + InitTime: recvTime.UnixMicro(), + LocalAddr: s.listenAddr, + RemoteAddr: remoteAddr, + nativeConn: tcpConn, + connType: TCP_CONN, + idleTimeoutMs: CONNECTION_IDLE_TIMEOUTMS, + } + conn.echoQueue = make(chan *TupleData) + // setup new routine for connection + s.remoteConnectionMapMutex.Lock() + s.remoteConnectionMap[addrStr] = conn + s.remoteConnectionMapMutex.Unlock() + + conn.echoQueue <- tuple + + //log.Info("Accept new UDP connection from %s to %s", addrStr, s.listenAddr.String()) + + // launch connection routine + s.wg.Add(1) + go s.connectionRoutine(conn) + } + } +} + +func (s *TcpServer) connectionRoutine(conn *ConnData) { + addrStr := conn.RemoteAddr.String() + defer s.wg.Done() + //defer log.Debug("Connection routine: %s stopped", addrStr) + + //log.Debug("Connection routine: %s started", addrStr) + + // stop receiving packets and clean up + defer func() { + // remove conn from remoteConnectionMap + s.remoteConnectionMapMutex.Lock() + delete(s.remoteConnectionMap, addrStr) + s.remoteConnectionMapMutex.Unlock() + conn.Close() + }() + + for { + select { + case <-s.signals.stop: + return + + case <-time.After(time.Duration(conn.idleTimeoutMs) * time.Millisecond): + // timeout, quit routine + //log.Debug("Connection routine idle timeout") + return + + case tuple, ok := <-conn.echoQueue: + if !ok { + return + } + if tuple == nil { + continue + } + //log.Debug("Received udp packet len [%d] from addr: %s\n", len(pkt.Packet), addrStr) + conn.nativeConn.SetWriteDeadline(time.Now().Add(500 * time.Millisecond)) + conn.nativeConn.Write([]byte(tuple.String())) + } + } +} diff --git a/server/udpserver.go b/server/udpserver.go new file mode 100644 index 0000000..3cfde66 --- /dev/null +++ b/server/udpserver.go @@ -0,0 +1,203 @@ +package server + +import ( + "fmt" + "net" + "sync" + "sync/atomic" + "time" +) + +type UdpServer struct { + stats struct { + totalRecvBytes uint64 + totalSendBytes uint64 + } + + listenAddr *net.UDPAddr + listenConn *net.UDPConn + + wg sync.WaitGroup + running atomic.Bool + + // connection and remote transaction management + + remoteConnectionMapMutex sync.Mutex + remoteConnectionMap map[string]*ConnData // indexed by remote UDP address + + // signals + signals struct { + stop chan struct{} + } +} + +func (s *UdpServer) Start(addr string, port int) (err error) { + if port < 5000 || port > 65535 { + return fmt.Errorf("listening port must be in the range of 5000-65535") + } + + var netIP net.IP + if len(addr) > 0 { + netIP = net.ParseIP(addr) + if netIP == nil { + return fmt.Errorf("listening ip address is incorrect") + } + } else { + netIP = net.IPv4zero // will both listen on ipv4 0.0.0.0:port and ipv6 [::]:port + } + + s.listenConn, err = net.ListenUDP("udp", &net.UDPAddr{ + IP: netIP, + Port: port, + }) + if err != nil { + return fmt.Errorf("listen error %v", err) + } + + // retrieve local port + laddr := s.listenConn.LocalAddr() + s.listenAddr, err = net.ResolveUDPAddr(laddr.Network(), laddr.String()) + if err != nil { + return fmt.Errorf("resolve UDPAddr error %v", err) + } + + s.signals.stop = make(chan struct{}) + + // start server routines + s.wg.Add(1) + go s.recvPacketRoutine() + + s.running.Store(true) + fmt.Printf("UDP echo service started on %s:%d\n", ListenIp, ListenPort) + return nil +} + +func (s *UdpServer) Stop() { + if !s.running.Load() { + // already stopped, do nothing + return + } + s.running.Store(false) + + close(s.signals.stop) + s.listenConn.Close() + s.wg.Wait() +} + +func (s *UdpServer) recvPacketRoutine() { + defer s.wg.Done() + + for { + select { + case <-s.signals.stop: + return + + default: + } + + // allocate a new packet buffer for every read + pkt := make([]byte, 4096) + + // udp recv, blocking until packet arrives or conn.Close() + n, remoteAddr, err := s.listenConn.ReadFromUDP(pkt[:]) + if err != nil { + pkt = nil + //log.Error("Read from UDP error: %v\n", err) + if n == 0 { + // listenConn closed + return + } + continue + } + addrStr := remoteAddr.String() + + // add total recv bytes + atomic.AddUint64(&s.stats.totalRecvBytes, uint64(n)) + + recvTime := time.Now() + //log.Trace("receive udp packet (%s -> %s): %+v", addrStr, s.listenAddr.String(), pkt.Packet) + //log.Info("Receive [%s] packet (%s -> %s), %d bytes", msgType, addrStr, s.listenAddr.String(), n) + + s.remoteConnectionMapMutex.Lock() + conn, found := s.remoteConnectionMap[addrStr] + s.remoteConnectionMapMutex.Unlock() + + tuple := new(TupleData) + tuple.msg = string(pkt) + tuple.SrcIp = remoteAddr.IP.String() + tuple.SrcPort = remoteAddr.Port + tuple.DstIp = ListenIp + tuple.DstPort = ListenPort + tuple.Timestamp = recvTime + + if found { + // existing connection + atomic.StoreInt64(&conn.LastLocalRecvTime, recvTime.UnixMicro()) + conn.echoQueue <- tuple + + } else { + // create new connection + conn = &ConnData{ + InitTime: recvTime.UnixMicro(), + LocalAddr: s.listenAddr, + RemoteAddr: remoteAddr, + nativeConn: s.listenConn, + connType: UDP_CONN, + idleTimeoutMs: CONNECTION_IDLE_TIMEOUTMS, + } + conn.echoQueue = make(chan *TupleData) + // setup new routine for connection + s.remoteConnectionMapMutex.Lock() + s.remoteConnectionMap[addrStr] = conn + s.remoteConnectionMapMutex.Unlock() + + conn.echoQueue <- tuple + + //log.Info("Accept new UDP connection from %s to %s", addrStr, s.listenAddr.String()) + + // launch connection routine + s.wg.Add(1) + go s.connectionRoutine(conn) + } + } +} + +func (s *UdpServer) connectionRoutine(conn *ConnData) { + addrStr := conn.RemoteAddr.String() + defer s.wg.Done() + //defer log.Debug("Connection routine: %s stopped", addrStr) + + //log.Debug("Connection routine: %s started", addrStr) + + // stop receiving packets and clean up + defer func() { + // remove conn from remoteConnectionMap + s.remoteConnectionMapMutex.Lock() + delete(s.remoteConnectionMap, addrStr) + s.remoteConnectionMapMutex.Unlock() + conn.Close() + }() + + for { + select { + case <-s.signals.stop: + return + + case <-time.After(time.Duration(conn.idleTimeoutMs) * time.Millisecond): + // timeout, quit routine + //log.Debug("Connection routine idle timeout") + return + + case tuple, ok := <-conn.echoQueue: + if !ok { + return + } + if tuple == nil { + continue + } + //log.Debug("Received udp packet len [%d] from addr: %s\n", len(pkt.Packet), addrStr) + conn.nativeConn.SetWriteDeadline(time.Now().Add(500 * time.Millisecond)) + conn.nativeConn.Write([]byte(tuple.String())) + } + } +}