From 5a0145a08aa895398420819e597a9701830095fe Mon Sep 17 00:00:00 2001 From: Daniel Date: Tue, 10 Oct 2023 16:32:36 +0200 Subject: [PATCH 1/6] Improve rate limiting over longer periods --- terminal/session.go | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/terminal/session.go b/terminal/session.go index 61dee28..1296ab3 100644 --- a/terminal/session.go +++ b/terminal/session.go @@ -15,6 +15,7 @@ const ( rateLimitMaxOpsPerSecond = 20 // TODO: Reduce to 10 after test phase. rateLimitMinSuspicion = 25 + rateLimitMinPermaSuspicion = rateLimitMinSuspicion * 10 rateLimitMaxSuspicionPerSecond = 2 // TODO: Reduce to 1 after test phase. // Make this big enough to trigger suspicion limit in first blast. @@ -31,7 +32,7 @@ type Session struct { // It is set when the Session is created and may be treated as a constant. started int64 - // opCount is the amount of operations started. + // opCount is the amount of operations started (and not rate limited by suspicion). opCount atomic.Int64 // suspicionScore holds a score of suspicious activity. @@ -102,6 +103,13 @@ func (s *Session) RateLimit() *Error { return ErrRateLimited } + + // Permanently rate limit if suspicion goes over the perma min limit and + // the suspicion score is 50% or greater of the operation count. + if score > rateLimitMinPermaSuspicion && + score*2 > s.opCount.Load() { + return ErrRateLimited + } } // Check the rate limit. From d208af14b0c351650b3957abb1df10c34935333d Mon Sep 17 00:00:00 2001 From: Daniel Date: Tue, 10 Oct 2023 16:33:03 +0200 Subject: [PATCH 2/6] Add support for binding Piers to specific addresses --- captain/public.go | 2 +- conf/networks.go | 83 ++++++++++++++++++++++++++------------- crew/op_connect.go | 13 +++++- patrol/http.go | 2 +- ships/http.go | 5 +-- ships/http_shared.go | 61 +++++++++++++++++----------- ships/http_shared_test.go | 8 ++-- ships/pier.go | 16 +++----- ships/tcp.go | 37 +++++++++++------ 9 files changed, 144 insertions(+), 83 deletions(-) diff --git a/captain/public.go b/captain/public.go index 137abec..a9593a8 100644 --- a/captain/public.go +++ b/captain/public.go @@ -69,7 +69,7 @@ func loadPublicIdentity() (err error) { publicIdentity.Hub.Info.IPv6 != nil, ) if cfgOptionBindToAdvertised() { - conf.SetConnectAddr(publicIdentity.Hub.Info.IPv4, publicIdentity.Hub.Info.IPv6) + conf.SetBindAddr(publicIdentity.Hub.Info.IPv4, publicIdentity.Hub.Info.IPv6) } // Set Home Hub before updating the hub on the map, as this would trigger a diff --git a/conf/networks.go b/conf/networks.go index b08551f..379395c 100644 --- a/conf/networks.go +++ b/conf/networks.go @@ -29,53 +29,82 @@ func HubHasIPv6() bool { } var ( - connectIPv4 net.IP - connectIPv6 net.IP - connectIPLock sync.Mutex + bindIPv4 net.IP + bindIPv6 net.IP + bindIPLock sync.Mutex ) -// SetConnectAddr sets the preferred connect (bind) addresses. -func SetConnectAddr(ip4, ip6 net.IP) { - connectIPLock.Lock() - defer connectIPLock.Unlock() +// SetBindAddr sets the preferred connect (bind) addresses. +func SetBindAddr(ip4, ip6 net.IP) { + bindIPLock.Lock() + defer bindIPLock.Unlock() - connectIPv4 = ip4 - connectIPv6 = ip6 + bindIPv4 = ip4 + bindIPv6 = ip6 } -// GetConnectAddr returns an address with the preferred connect (bind) -// addresses for the given dial network. -// The dial network must have a suffix specify the IP version. -func GetConnectAddr(dialNetwork string) net.Addr { - connectIPLock.Lock() - defer connectIPLock.Unlock() +// BindAddrIsSet returns whether any bind address is set. +func BindAddrIsSet() bool { + bindIPLock.Lock() + defer bindIPLock.Unlock() + + return bindIPv4 != nil || bindIPv6 != nil +} + +// GetBindAddr returns an address with the preferred binding address for the +// given dial network. +// The dial network must have a suffix specifying the IP version. +func GetBindAddr(dialNetwork string) net.Addr { + bindIPLock.Lock() + defer bindIPLock.Unlock() switch dialNetwork { case "ip4": - if connectIPv4 != nil { - return &net.IPAddr{IP: connectIPv4} + if bindIPv4 != nil { + return &net.IPAddr{IP: bindIPv4} } case "ip6": - if connectIPv6 != nil { - return &net.IPAddr{IP: connectIPv6} + if bindIPv6 != nil { + return &net.IPAddr{IP: bindIPv6} } case "tcp4": - if connectIPv4 != nil { - return &net.TCPAddr{IP: connectIPv4} + if bindIPv4 != nil { + return &net.TCPAddr{IP: bindIPv4} } case "tcp6": - if connectIPv6 != nil { - return &net.TCPAddr{IP: connectIPv6} + if bindIPv6 != nil { + return &net.TCPAddr{IP: bindIPv6} } case "udp4": - if connectIPv4 != nil { - return &net.UDPAddr{IP: connectIPv4} + if bindIPv4 != nil { + return &net.UDPAddr{IP: bindIPv4} } case "udp6": - if connectIPv6 != nil { - return &net.UDPAddr{IP: connectIPv6} + if bindIPv6 != nil { + return &net.UDPAddr{IP: bindIPv6} } } return nil } + +// GetBindIPs returns the preferred binding IPs. +// Returns a slice with a single nil IP if no preferred binding IPs are set. +func GetBindIPs() []net.IP { + bindIPLock.Lock() + defer bindIPLock.Unlock() + + switch { + case bindIPv4 == nil && bindIPv6 == nil: + // Match most common case first. + return []net.IP{nil} + case bindIPv4 != nil && bindIPv6 != nil: + return []net.IP{bindIPv4, bindIPv6} + case bindIPv4 != nil: + return []net.IP{bindIPv4} + case bindIPv6 != nil: + return []net.IP{bindIPv6} + } + + return []net.IP{nil} +} diff --git a/crew/op_connect.go b/crew/op_connect.go index 158c1b0..8d4baec 100644 --- a/crew/op_connect.go +++ b/crew/op_connect.go @@ -283,7 +283,7 @@ func (op *ConnectOp) setup(session *terminal.Session) { } dialer := &net.Dialer{ Timeout: 10 * time.Second, - LocalAddr: conf.GetConnectAddr(dialNet), + LocalAddr: conf.GetBindAddr(dialNet), FallbackDelay: -1, // Disables Fast Fallback from IPv6 to IPv4. KeepAlive: -1, // Disable keep-alive. } @@ -410,6 +410,8 @@ func (op *ConnectOp) connWriter(_ context.Context) error { }() defer func() { + // Signal that we are done with writing. + close(op.doneWriting) // Close connection. _ = op.conn.Close() }() @@ -522,7 +524,14 @@ func (op *ConnectOp) HandleStop(err *terminal.Error) (errorToSend *terminal.Erro // If the op was ended remotely, write all remaining received data. // If the op was ended locally, don't bother writing remaining data. if err.IsExternal() { - <-op.doneWriting + select { + case <-op.doneWriting: + default: + select { + case <-op.doneWriting: + case <-time.After(5 * time.Second): + } + } } } diff --git a/patrol/http.go b/patrol/http.go index d361c60..07420b9 100644 --- a/patrol/http.go +++ b/patrol/http.go @@ -148,7 +148,7 @@ func CheckHTTPSConnection(ctx context.Context, network, domain string) (statusCo } dialer := &net.Dialer{ Timeout: 15 * time.Second, - LocalAddr: conf.GetConnectAddr(network), + LocalAddr: conf.GetBindAddr(network), FallbackDelay: -1, // Disables Fast Fallback from IPv6 to IPv4. KeepAlive: -1, // Disable keep-alive. } diff --git a/ships/http.go b/ships/http.go index ced7c65..daad181 100644 --- a/ships/http.go +++ b/ships/http.go @@ -77,7 +77,7 @@ func launchHTTPShip(ctx context.Context, transport *hub.Transport, ip net.IP) (S } dialer := &net.Dialer{ Timeout: 30 * time.Second, - LocalAddr: conf.GetConnectAddr(dialNet), + LocalAddr: conf.GetBindAddr(dialNet), FallbackDelay: -1, // Disables Fast Fallback from IPv6 to IPv4. KeepAlive: -1, // Disable keep-alive. } @@ -209,11 +209,10 @@ func establishHTTPPier(transport *hub.Transport, dockingRequests chan Ship) (Pie pier.initBase() // Register handler. - listener, err := addHTTPHandler(transport.Port, path, pier.ServeHTTP) + err := addHTTPHandler(transport.Port, path, pier.ServeHTTP) if err != nil { return nil, fmt.Errorf("failed to add HTTP handler: %w", err) } - pier.listener = listener return pier, nil } diff --git a/ships/http_shared.go b/ships/http_shared.go index 7f64435..1ca2f5b 100644 --- a/ships/http_shared.go +++ b/ships/http_shared.go @@ -8,11 +8,13 @@ import ( "net/http" "sync" "time" + + "github.com/safing/portbase/log" + "github.com/safing/spn/conf" ) type sharedServer struct { - listener net.Listener - server *http.Server + server *http.Server handlers map[string]http.HandlerFunc handlersLock sync.RWMutex @@ -45,10 +47,10 @@ var ( sharedHTTPServersLock sync.Mutex ) -func addHTTPHandler(port uint16, path string, handler http.HandlerFunc) (ln net.Listener, err error) { +func addHTTPHandler(port uint16, path string, handler http.HandlerFunc) error { // Check params. if port == 0 { - return nil, errors.New("cannot listen on port 0") + return errors.New("cannot listen on port 0") } // Default to root path. @@ -69,12 +71,12 @@ func addHTTPHandler(port uint16, path string, handler http.HandlerFunc) (ln net. // Check if path is already registered. _, ok := shared.handlers[path] if ok { - return nil, errors.New("path already registered") + return errors.New("path already registered") } // Else, register handler at path. shared.handlers[path] = handler - return shared.listener, nil + return nil } // Shared server does not exist - create one. @@ -99,28 +101,41 @@ func addHTTPHandler(port uint16, path string, handler http.HandlerFunc) (ln net. } shared.server = server - // Start listener. - shared.listener, err = net.Listen("tcp", server.Addr) - if err != nil { - return nil, fmt.Errorf("failed to listen: %w", err) + // Start listeners. + bindIPs := conf.GetBindIPs() + listeners := make([]net.Listener, 0, len(bindIPs)) + for _, bindIP := range bindIPs { + listener, err := net.ListenTCP("tcp", &net.TCPAddr{ + IP: bindIP, + Port: int(port), + }) + if err != nil { + return fmt.Errorf("failed to listen: %w", err) + } + + listeners = append(listeners, listener) + log.Infof("spn/ships: http transport pier established on %s", listener.Addr()) } // Add shared http server to list. sharedHTTPServers[port] = shared - // Start server in service worker. - module.StartServiceWorker( - fmt.Sprintf("shared http server listener on port %d", port), 0, - func(ctx context.Context) error { - err := shared.server.Serve(shared.listener) - if !errors.Is(http.ErrServerClosed, err) { - return err - } - return nil - }, - ) - - return shared.listener, nil + // Start servers in service workers. + for _, listener := range listeners { + serviceListener := listener + module.StartServiceWorker( + fmt.Sprintf("shared http server listener on %s", listener.Addr()), 0, + func(ctx context.Context) error { + err := shared.server.Serve(serviceListener) + if !errors.Is(http.ErrServerClosed, err) { + return err + } + return nil + }, + ) + } + + return nil } func removeHTTPHandler(port uint16, path string) error { diff --git a/ships/http_shared_test.go b/ships/http_shared_test.go index d310179..e16ff53 100644 --- a/ships/http_shared_test.go +++ b/ships/http_shared_test.go @@ -10,13 +10,13 @@ func TestSharedHTTP(t *testing.T) { //nolint:paralleltest // Test checks global const testPort = 65100 // Register multiple handlers. - _, err := addHTTPHandler(testPort, "", ServeInfoPage) + err := addHTTPHandler(testPort, "", ServeInfoPage) assert.NoError(t, err, "should be able to share http listener") - _, err = addHTTPHandler(testPort, "/test", ServeInfoPage) + err = addHTTPHandler(testPort, "/test", ServeInfoPage) assert.NoError(t, err, "should be able to share http listener") - _, err = addHTTPHandler(testPort, "/test2", ServeInfoPage) + err = addHTTPHandler(testPort, "/test2", ServeInfoPage) assert.NoError(t, err, "should be able to share http listener") - _, err = addHTTPHandler(testPort, "/", ServeInfoPage) + err = addHTTPHandler(testPort, "/", ServeInfoPage) assert.Error(t, err, "should fail to register path twice") // Unregister diff --git a/ships/pier.go b/ships/pier.go index 818426b..a332bf6 100644 --- a/ships/pier.go +++ b/ships/pier.go @@ -17,9 +17,6 @@ type Pier interface { // Transport returns the transport used for this ship. Transport() *hub.Transport - // Addr returns the underlying network address used by the listener. - Addr() net.Addr - // Abolish closes the underlying listener and cleans up any related resources. Abolish() } @@ -50,8 +47,8 @@ func EstablishPier(transport *hub.Transport, dockingRequests chan Ship) (Pier, e type PierBase struct { // transport holds the transport definition of the pier. transport *hub.Transport - // listener is the actual underlying listener. - listener net.Listener + // listeners holds the actual underlying listeners. + listeners []net.Listener // dockingRequests is used to report new connections to the higher layer. dockingRequests chan Ship @@ -75,14 +72,11 @@ func (pier *PierBase) Transport() *hub.Transport { return pier.transport } -// Addr returns the underlying network address used by the listener. -func (pier *PierBase) Addr() net.Addr { - return pier.listener.Addr() -} - // Abolish closes the underlying listener and cleans up any related resources. func (pier *PierBase) Abolish() { if pier.abolishing.SetToIf(false, true) { - _ = pier.listener.Close() + for _, listener := range pier.listeners { + _ = listener.Close() + } } } diff --git a/ships/tcp.go b/ships/tcp.go index 546802d..9bf91c1 100644 --- a/ships/tcp.go +++ b/ships/tcp.go @@ -6,6 +6,7 @@ import ( "net" "time" + "github.com/safing/portbase/log" "github.com/safing/spn/conf" "github.com/safing/spn/hub" ) @@ -36,7 +37,7 @@ func launchTCPShip(ctx context.Context, transport *hub.Transport, ip net.IP) (Sh } dialer := &net.Dialer{ Timeout: 30 * time.Second, - LocalAddr: conf.GetConnectAddr(dialNet), + LocalAddr: conf.GetBindAddr(dialNet), FallbackDelay: -1, // Disables Fast Fallback from IPv6 to IPv4. KeepAlive: -1, // Disable keep-alive. } @@ -60,33 +61,47 @@ func launchTCPShip(ctx context.Context, transport *hub.Transport, ip net.IP) (Sh } func establishTCPPier(transport *hub.Transport, dockingRequests chan Ship) (Pier, error) { - listener, err := net.ListenTCP("tcp", &net.TCPAddr{ - Port: int(transport.Port), - }) - if err != nil { - return nil, err + // Start listeners. + bindIPs := conf.GetBindIPs() + listeners := make([]net.Listener, 0, len(bindIPs)) + for _, bindIP := range bindIPs { + listener, err := net.ListenTCP("tcp", &net.TCPAddr{ + IP: bindIP, + Port: int(transport.Port), + }) + if err != nil { + return nil, fmt.Errorf("failed to listen: %w", err) + } + + listeners = append(listeners, listener) + log.Infof("spn/ships: tcp transport pier established on %s", listener.Addr()) } // Create new pier. pier := &TCPPier{ PierBase: PierBase{ transport: transport, - listener: listener, + listeners: listeners, dockingRequests: dockingRequests, }, } pier.initBase() - // Start worker. - module.StartServiceWorker("accept TCP docking requests", 0, pier.dockingWorker) + // Start workers. + for _, listener := range pier.listeners { + serviceListener := listener + module.StartServiceWorker("accept TCP docking requests", 0, func(ctx context.Context) error { + return pier.dockingWorker(ctx, serviceListener) + }) + } return pier, nil } -func (pier *TCPPier) dockingWorker(ctx context.Context) error { +func (pier *TCPPier) dockingWorker(ctx context.Context, listener net.Listener) error { for { // Block until something happens. - conn, err := pier.listener.Accept() + conn, err := listener.Accept() // Check if we are done. select { From 0af6c91db4653960cbd906eb15f869e83d2a380e Mon Sep 17 00:00:00 2001 From: Daniel Date: Thu, 12 Oct 2023 16:45:46 +0200 Subject: [PATCH 3/6] Fix stopping TCP piers --- ships/http.go | 9 +-------- ships/tcp.go | 32 ++++++++++++++++++++------------ 2 files changed, 21 insertions(+), 20 deletions(-) diff --git a/ships/http.go b/ships/http.go index daad181..1603284 100644 --- a/ships/http.go +++ b/ships/http.go @@ -226,12 +226,5 @@ func (pier *HTTPPier) Abolish() { // Do not close the listener, as it is shared. // Instead, remove the HTTP handler and the shared server will shutdown itself when needed. - - // Default to root path. - path := pier.transport.Path - if path == "" { - path = "/" - } - - _ = removeHTTPHandler(pier.transport.Port, path) + _ = removeHTTPHandler(pier.transport.Port, pier.transport.Path) } diff --git a/ships/tcp.go b/ships/tcp.go index 9bf91c1..9d47bd9 100644 --- a/ships/tcp.go +++ b/ships/tcp.go @@ -19,6 +19,9 @@ type TCPShip struct { // TCPPier is a pier that uses TCP. type TCPPier struct { PierBase + + ctx context.Context + cancelCtx context.CancelFunc } func init() { @@ -78,12 +81,15 @@ func establishTCPPier(transport *hub.Transport, dockingRequests chan Ship) (Pier } // Create new pier. + pierCtx, cancelCtx := context.WithCancel(module.Ctx) pier := &TCPPier{ PierBase: PierBase{ transport: transport, listeners: listeners, dockingRequests: dockingRequests, }, + ctx: pierCtx, + cancelCtx: cancelCtx, } pier.initBase() @@ -98,20 +104,16 @@ func establishTCPPier(transport *hub.Transport, dockingRequests chan Ship) (Pier return pier, nil } -func (pier *TCPPier) dockingWorker(ctx context.Context, listener net.Listener) error { +func (pier *TCPPier) dockingWorker(_ context.Context, listener net.Listener) error { for { // Block until something happens. conn, err := listener.Accept() - // Check if we are done. - select { - case <-ctx.Done(): - return nil - default: - } - - // Check for error. - if err != nil { + // Check for errors. + switch { + case pier.ctx.Err() != nil: + return pier.ctx.Err() + case err != nil: return err } @@ -130,8 +132,14 @@ func (pier *TCPPier) dockingWorker(ctx context.Context, listener net.Listener) e // Submit new docking request. select { case pier.dockingRequests <- ship: - case <-ctx.Done(): - return nil + case <-pier.ctx.Done(): + return pier.ctx.Err() } } } + +// Abolish closes the underlying listener and cleans up any related resources. +func (pier *TCPPier) Abolish() { + pier.cancelCtx() + pier.PierBase.Abolish() +} From 5ef2b288629e1bd60135977c88a477c7dec0ae5c Mon Sep 17 00:00:00 2001 From: Daniel Date: Thu, 12 Oct 2023 16:46:32 +0200 Subject: [PATCH 4/6] Leave loading of geoip to navigator --- captain/intel.go | 15 --------------- captain/module.go | 5 +---- 2 files changed, 1 insertion(+), 19 deletions(-) diff --git a/captain/intel.go b/captain/intel.go index f16a207..621e619 100644 --- a/captain/intel.go +++ b/captain/intel.go @@ -85,21 +85,6 @@ func resetSPNIntel() { intelResource = nil } -var requiredResources = []string{ - "intel/geoip/geoipv4.mmdb.gz", - "intel/geoip/geoipv6.mmdb.gz", -} - -func loadRequiredResources() error { - for _, res := range requiredResources { - _, err := updates.GetFile(res) - if err != nil { - return fmt.Errorf("failed to get required resource %s: %w", res, err) - } - } - return nil -} - func setVirtualNetworkConfig(configs []*hub.VirtualNetworkConfig) { // Do nothing if not public Hub. if !conf.PublicHub() { diff --git a/captain/module.go b/captain/module.go index aad02d6..a74c53f 100644 --- a/captain/module.go +++ b/captain/module.go @@ -98,10 +98,7 @@ func start() error { } ships.EnableMasking(maskingBytes) - // Initialize intel and other required resources. - if err := loadRequiredResources(); err != nil { - return err - } + // Initialize intel. if err := registerIntelUpdateHook(); err != nil { return err } From 7eae35e4f4c9dc17a39177737e3d90957a45b477 Mon Sep 17 00:00:00 2001 From: Daniel Date: Thu, 12 Oct 2023 16:47:20 +0200 Subject: [PATCH 5/6] Close waiting connections when stopping docking request handler --- captain/module.go | 1 - captain/piers.go | 24 +++++++++++++++++------- 2 files changed, 17 insertions(+), 8 deletions(-) diff --git a/captain/module.go b/captain/module.go index a74c53f..e04db08 100644 --- a/captain/module.go +++ b/captain/module.go @@ -191,7 +191,6 @@ func stop() error { if conf.PublicHub() { publishShutdownStatus() stopPiers() - closePendingDockingRequests() } return nil diff --git a/captain/piers.go b/captain/piers.go index f97dc20..45e5e1e 100644 --- a/captain/piers.go +++ b/captain/piers.go @@ -57,13 +57,29 @@ func stopPiers() { } func dockingRequestHandler(ctx context.Context) error { + // Sink all waiting ships when this worker ends. + // But don't be destructive so the service worker could recover. + defer func() { + for { + select { + case ship := <-dockingRequests: + if ship != nil { + ship.Sink() + } + default: + return + } + } + }() + for { select { case <-ctx.Done(): return nil case ship := <-dockingRequests: + // Ignore nil ships. if ship == nil { - return errors.New("received nil ship") + continue } if err := checkDockingPermission(ctx, ship); err != nil { @@ -75,12 +91,6 @@ func dockingRequestHandler(ctx context.Context) error { } } -func closePendingDockingRequests() { - for ship := range dockingRequests { - ship.Sink() - } -} - func checkDockingPermission(ctx context.Context, ship ships.Ship) error { remoteIP, remotePort, err := netutils.IPPortFromAddr(ship.RemoteAddr()) if err != nil { From b1bc0cbd519eca85cdf74283c2648f16151a4bd3 Mon Sep 17 00:00:00 2001 From: Daniel Date: Thu, 12 Oct 2023 16:47:44 +0200 Subject: [PATCH 6/6] Return error when loading intel with test map to prevent looping --- captain/intel.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/captain/intel.go b/captain/intel.go index 621e619..ce7749a 100644 --- a/captain/intel.go +++ b/captain/intel.go @@ -50,7 +50,7 @@ func updateSPNIntel(ctx context.Context, _ interface{}) (err error) { // Only update SPN intel when using the matching map. if conf.MainMapName != intelResourceMapName { - return nil + return fmt.Errorf("intel resource not for map %q", conf.MainMapName) } // Check if there is something to do.