From ea74d111384683401edea5d3eb2d2801a31e2d2e Mon Sep 17 00:00:00 2001 From: mzz2017 <2017@duck.com> Date: Fri, 21 Feb 2025 00:04:06 +0800 Subject: [PATCH 1/3] optimize: add cache for direct dialer --- dialer/v2ray/v2ray.go | 3 +- protocol/direct/dialer.go | 117 ++++++++++++++++++++++++++++++-------- 2 files changed, 94 insertions(+), 26 deletions(-) diff --git a/dialer/v2ray/v2ray.go b/dialer/v2ray/v2ray.go index 0e09cb6..e25e11a 100644 --- a/dialer/v2ray/v2ray.go +++ b/dialer/v2ray/v2ray.go @@ -12,7 +12,6 @@ import ( "github.com/daeuniverse/outbound/dialer" "github.com/daeuniverse/outbound/netproxy" "github.com/daeuniverse/outbound/protocol" - "github.com/daeuniverse/outbound/protocol/direct" "github.com/daeuniverse/outbound/protocol/http" "github.com/daeuniverse/outbound/transport/grpc" "github.com/daeuniverse/outbound/transport/httpupgrade" @@ -190,7 +189,7 @@ func (s *V2Ray) Dialer(option *dialer.ExtraOption, nextDialer netproxy.Dialer) ( "transport": []string{"1"}, }.Encode(), } - d, err = http.NewHTTPProxy(&u, direct.SymmetricDirect) + d, err = http.NewHTTPProxy(&u, d) if err != nil { return nil, nil, err } diff --git a/protocol/direct/dialer.go b/protocol/direct/dialer.go index 5b59c2a..dbc50a1 100644 --- a/protocol/direct/dialer.go +++ b/protocol/direct/dialer.go @@ -5,49 +5,89 @@ import ( "fmt" "net" "net/netip" + "strings" + "sync" "syscall" "github.com/daeuniverse/outbound/netproxy" ) -var SymmetricDirect = newDirectDialer(false) -var FullconeDirect = newDirectDialer(true) +var SymmetricDirect = NewDirectDialerLaddr(netip.Addr{}, Option{FullCone: false, WithCache: false}) +var FullconeDirect = NewDirectDialerLaddr(netip.Addr{}, Option{FullCone: true, WithCache: false}) +type Option struct { + FullCone bool + WithCache bool +} + +type addrCache struct { + lastAddr string + lastRemoteIp string +} type directDialer struct { - tcpLocalAddr *net.TCPAddr - udpLocalAddr *net.UDPAddr - fullCone bool + tcpDialer *net.Dialer + tcpDialerMptcp *net.Dialer + udpLocalAddr *net.UDPAddr + Option Option + + muCache sync.Mutex + cache addrCache } -func NewDirectDialerLaddr(fullCone bool, lAddr netip.Addr) netproxy.Dialer { +func NewDirectDialerLaddr(lAddr netip.Addr, option Option) netproxy.Dialer { var tcpLocalAddr *net.TCPAddr var udpLocalAddr *net.UDPAddr if lAddr.IsValid() { tcpLocalAddr = net.TCPAddrFromAddrPort(netip.AddrPortFrom(lAddr, 0)) udpLocalAddr = net.UDPAddrFromAddrPort(netip.AddrPortFrom(lAddr, 0)) } - return &directDialer{ - tcpLocalAddr: tcpLocalAddr, - udpLocalAddr: udpLocalAddr, - fullCone: fullCone, + tcpDialer := &net.Dialer{LocalAddr: tcpLocalAddr} + tcpDialerMptcp := &net.Dialer{LocalAddr: tcpLocalAddr} + tcpDialerMptcp.SetMultipathTCP(true) + d := &directDialer{ + tcpDialer: tcpDialer, + tcpDialerMptcp: tcpDialerMptcp, + udpLocalAddr: udpLocalAddr, + Option: option, } -} -func newDirectDialer(fullCone bool) netproxy.Dialer { - return &directDialer{ - tcpLocalAddr: nil, - udpLocalAddr: nil, - fullCone: fullCone, - } + return d } func (d *directDialer) dialUdp(ctx context.Context, addr string, mark int) (c netproxy.PacketConn, err error) { + var remoteAddr string + if d.Option.WithCache { + defer func() { + host, port, _ := net.SplitHostPort(addr) + // Check if the host is domain + if _, e := netip.ParseAddr(host); e == nil { + // addr is IP + return + } + + // addr is domain + var lastRemoteIp string + d.muCache.Lock() + if err != nil { + lastRemoteIp = d.cache.lastRemoteIp + } else if d.cache.lastAddr != host { + d.cache.lastAddr = host + d.cache.lastRemoteIp = remoteAddr + } + d.muCache.Unlock() + if err != nil && strings.Contains(err.Error(), "i/o timeout") && strings.Contains(err.Error(), "lookup") { + // Retry with last remote ip + c, err = d.dialUdp(ctx, net.JoinHostPort(lastRemoteIp, port), mark) + } + }() + } if mark == 0 { - if d.fullCone { + if d.Option.FullCone { conn, err := net.ListenUDP("udp", d.udpLocalAddr) if err != nil { return nil, err } + remoteAddr = "" return &directPacketConn{UDPConn: conn, FullCone: true, dialTgt: addr}, nil } else { dialer := net.Dialer{ @@ -57,12 +97,13 @@ func (d *directDialer) dialUdp(ctx context.Context, addr string, mark int) (c ne if err != nil { return nil, err } + remoteAddr = conn.RemoteAddr().String() return &directPacketConn{UDPConn: conn.(*net.UDPConn), FullCone: false, dialTgt: addr}, nil } } else { var conn *net.UDPConn - if d.fullCone { + if d.Option.FullCone { c := net.ListenConfig{ Control: func(network string, address string, c syscall.RawConn) error { return netproxy.SoMarkControl(c, mark) @@ -91,7 +132,8 @@ func (d *directDialer) dialUdp(ctx context.Context, addr string, mark int) (c ne } conn = c.(*net.UDPConn) } - return &directPacketConn{UDPConn: conn, FullCone: d.fullCone, dialTgt: addr, resolver: &net.Resolver{ + remoteAddr = conn.RemoteAddr().String() + return &directPacketConn{UDPConn: conn, FullCone: d.Option.FullCone, dialTgt: addr, resolver: &net.Resolver{ PreferGo: true, Dial: func(ctx context.Context, network, address string) (net.Conn, error) { d := net.Dialer{ @@ -105,11 +147,38 @@ func (d *directDialer) dialUdp(ctx context.Context, addr string, mark int) (c ne } } -func (d *directDialer) dialTcp(ctx context.Context, addr string, mark int, mptcp bool) (c netproxy.Conn, err error) { - dialer := &net.Dialer{ - LocalAddr: d.tcpLocalAddr, +func (d *directDialer) dialTcp(ctx context.Context, addr string, mark int, mptcp bool) (c net.Conn, err error) { + if d.Option.WithCache { + defer func() { + host, port, _ := net.SplitHostPort(addr) + // Check if the host is domain + if _, e := netip.ParseAddr(host); e == nil { + // addr is IP + return + } + + // addr is domain + var lastRemoteIp string + d.muCache.Lock() + if err != nil { + lastRemoteIp = d.cache.lastRemoteIp + } else if d.cache.lastAddr != host { + d.cache.lastAddr = host + d.cache.lastRemoteIp = c.RemoteAddr().String() + } + d.muCache.Unlock() + if err != nil && strings.Contains(err.Error(), "i/o timeout") && strings.Contains(err.Error(), "lookup") { + // Retry with last remote ip + c, err = d.dialTcp(ctx, net.JoinHostPort(lastRemoteIp, port), mark, mptcp) + } + }() + } + var dialer *net.Dialer + if mptcp { + dialer = d.tcpDialerMptcp + } else { + dialer = d.tcpDialer } - dialer.SetMultipathTCP(mptcp) if mark == 0 { return dialer.DialContext(ctx, "tcp", addr) } else { From 2cb3ac6672907e7ed9bb069e6893a6e68f80e058 Mon Sep 17 00:00:00 2001 From: mzz2017 <2017@duck.com> Date: Fri, 21 Feb 2025 00:08:50 +0800 Subject: [PATCH 2/3] update --- protocol/direct/dialer.go | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/protocol/direct/dialer.go b/protocol/direct/dialer.go index dbc50a1..3c8fab0 100644 --- a/protocol/direct/dialer.go +++ b/protocol/direct/dialer.go @@ -55,7 +55,7 @@ func NewDirectDialerLaddr(lAddr netip.Addr, option Option) netproxy.Dialer { } func (d *directDialer) dialUdp(ctx context.Context, addr string, mark int) (c netproxy.PacketConn, err error) { - var remoteAddr string + var remoteIp string if d.Option.WithCache { defer func() { host, port, _ := net.SplitHostPort(addr) @@ -70,9 +70,9 @@ func (d *directDialer) dialUdp(ctx context.Context, addr string, mark int) (c ne d.muCache.Lock() if err != nil { lastRemoteIp = d.cache.lastRemoteIp - } else if d.cache.lastAddr != host { + } else { d.cache.lastAddr = host - d.cache.lastRemoteIp = remoteAddr + d.cache.lastRemoteIp = remoteIp } d.muCache.Unlock() if err != nil && strings.Contains(err.Error(), "i/o timeout") && strings.Contains(err.Error(), "lookup") { @@ -87,7 +87,7 @@ func (d *directDialer) dialUdp(ctx context.Context, addr string, mark int) (c ne if err != nil { return nil, err } - remoteAddr = "" + remoteIp = "" return &directPacketConn{UDPConn: conn, FullCone: true, dialTgt: addr}, nil } else { dialer := net.Dialer{ @@ -97,7 +97,7 @@ func (d *directDialer) dialUdp(ctx context.Context, addr string, mark int) (c ne if err != nil { return nil, err } - remoteAddr = conn.RemoteAddr().String() + remoteIp = conn.RemoteAddr().(*net.UDPAddr).IP.String() return &directPacketConn{UDPConn: conn.(*net.UDPConn), FullCone: false, dialTgt: addr}, nil } @@ -119,6 +119,7 @@ func (d *directDialer) dialUdp(ctx context.Context, addr string, mark int) (c ne return nil, err } conn = _conn.(*net.UDPConn) + remoteIp = "" } else { dialer := net.Dialer{ Control: func(network, address string, c syscall.RawConn) error { @@ -131,8 +132,8 @@ func (d *directDialer) dialUdp(ctx context.Context, addr string, mark int) (c ne return nil, err } conn = c.(*net.UDPConn) + remoteIp = conn.RemoteAddr().(*net.UDPAddr).IP.String() } - remoteAddr = conn.RemoteAddr().String() return &directPacketConn{UDPConn: conn, FullCone: d.Option.FullCone, dialTgt: addr, resolver: &net.Resolver{ PreferGo: true, Dial: func(ctx context.Context, network, address string) (net.Conn, error) { @@ -162,9 +163,9 @@ func (d *directDialer) dialTcp(ctx context.Context, addr string, mark int, mptcp d.muCache.Lock() if err != nil { lastRemoteIp = d.cache.lastRemoteIp - } else if d.cache.lastAddr != host { + } else { d.cache.lastAddr = host - d.cache.lastRemoteIp = c.RemoteAddr().String() + d.cache.lastRemoteIp = c.RemoteAddr().(*net.TCPAddr).IP.String() } d.muCache.Unlock() if err != nil && strings.Contains(err.Error(), "i/o timeout") && strings.Contains(err.Error(), "lookup") { From 486f77c132eed365514671dffd5b218dd2004a50 Mon Sep 17 00:00:00 2001 From: mzz2017 <2017@duck.com> Date: Fri, 21 Feb 2025 00:17:49 +0800 Subject: [PATCH 3/3] extract func tryRetry to reuse --- protocol/direct/dialer.go | 78 +++++++++++++++++---------------------- 1 file changed, 34 insertions(+), 44 deletions(-) diff --git a/protocol/direct/dialer.go b/protocol/direct/dialer.go index 3c8fab0..9519b61 100644 --- a/protocol/direct/dialer.go +++ b/protocol/direct/dialer.go @@ -54,31 +54,39 @@ func NewDirectDialerLaddr(lAddr netip.Addr, option Option) netproxy.Dialer { return d } +func (d *directDialer) tryRetry(err error, addr string, remoteIp string, cb func(addr string)) { + host, port, _ := net.SplitHostPort(addr) + // Check if the host is domain + if _, e := netip.ParseAddr(host); e == nil { + // addr is IP + return + } + + // addr is domain + d.muCache.Lock() + if err == nil { + d.cache.lastAddr = host + d.cache.lastRemoteIp = remoteIp + d.muCache.Unlock() + } else { + if d.cache.lastAddr == host && strings.Contains(err.Error(), "i/o timeout") && strings.Contains(err.Error(), "lookup") { + lastRemoteIp := d.cache.lastRemoteIp + d.muCache.Unlock() + // Retry with last remote ip + cb(net.JoinHostPort(lastRemoteIp, port)) + } else { + d.muCache.Unlock() + } + } +} + func (d *directDialer) dialUdp(ctx context.Context, addr string, mark int) (c netproxy.PacketConn, err error) { var remoteIp string if d.Option.WithCache { - defer func() { - host, port, _ := net.SplitHostPort(addr) - // Check if the host is domain - if _, e := netip.ParseAddr(host); e == nil { - // addr is IP - return - } - - // addr is domain - var lastRemoteIp string - d.muCache.Lock() - if err != nil { - lastRemoteIp = d.cache.lastRemoteIp - } else { - d.cache.lastAddr = host - d.cache.lastRemoteIp = remoteIp - } - d.muCache.Unlock() - if err != nil && strings.Contains(err.Error(), "i/o timeout") && strings.Contains(err.Error(), "lookup") { - // Retry with last remote ip - c, err = d.dialUdp(ctx, net.JoinHostPort(lastRemoteIp, port), mark) - } + defer func() { // don't remove func wrapper for d.tryRetry + d.tryRetry(err, addr, remoteIp, func(addr string) { + c, err = d.dialUdp(ctx, addr, mark) + }) }() } if mark == 0 { @@ -150,28 +158,10 @@ func (d *directDialer) dialUdp(ctx context.Context, addr string, mark int) (c ne func (d *directDialer) dialTcp(ctx context.Context, addr string, mark int, mptcp bool) (c net.Conn, err error) { if d.Option.WithCache { - defer func() { - host, port, _ := net.SplitHostPort(addr) - // Check if the host is domain - if _, e := netip.ParseAddr(host); e == nil { - // addr is IP - return - } - - // addr is domain - var lastRemoteIp string - d.muCache.Lock() - if err != nil { - lastRemoteIp = d.cache.lastRemoteIp - } else { - d.cache.lastAddr = host - d.cache.lastRemoteIp = c.RemoteAddr().(*net.TCPAddr).IP.String() - } - d.muCache.Unlock() - if err != nil && strings.Contains(err.Error(), "i/o timeout") && strings.Contains(err.Error(), "lookup") { - // Retry with last remote ip - c, err = d.dialTcp(ctx, net.JoinHostPort(lastRemoteIp, port), mark, mptcp) - } + defer func() { // don't remove func wrapper for d.tryRetry + d.tryRetry(err, addr, c.RemoteAddr().(*net.TCPAddr).IP.String(), func(addr string) { + c, err = d.dialTcp(ctx, addr, mark, mptcp) + }) }() } var dialer *net.Dialer