diff --git a/pkg/proxy/server.go b/pkg/proxy/server.go index bc71c3a1660..4e15df3d54f 100644 --- a/pkg/proxy/server.go +++ b/pkg/proxy/server.go @@ -59,18 +59,6 @@ type Server interface { // Close closes listener and transport. Close() error - // PauseAccept stops accepting new connections. - PauseAccept() - // UnpauseAccept removes pause operation on accepting new connections. - UnpauseAccept() - - // DelayAccept adds latency ± random variable to accepting - // new incoming connections. - DelayAccept(latency, rv time.Duration) - // UndelayAccept removes sending latencies. - UndelayAccept() - // LatencyAccept returns current latency on accepting - // new incoming connections. LatencyAccept() time.Duration // DelayTx adds latency ± random variable for "outgoing" traffic @@ -115,16 +103,6 @@ type Server interface { // UnblackholeRx removes blackhole operation on "receiving". UnblackholeRx() - // PauseTx stops "forwarding" packets; "outgoing" traffic blocks. - PauseTx() - // UnpauseTx removes "forwarding" pause operation. - UnpauseTx() - - // PauseRx stops "receiving" packets; "incoming" traffic blocks. - PauseRx() - // UnpauseRx removes "receiving" pause operation. - UnpauseRx() - // ResetListener closes and restarts listener. ResetListener() error } @@ -164,9 +142,6 @@ type server struct { listenerMu sync.RWMutex listener net.Listener - pauseAcceptMu sync.Mutex - pauseAcceptc chan struct{} - latencyAcceptMu sync.RWMutex latencyAccept time.Duration @@ -208,9 +183,8 @@ func NewServer(cfg ServerConfig) Server { donec: make(chan struct{}), errc: make(chan error, 16), - pauseAcceptc: make(chan struct{}), - pauseTxc: make(chan struct{}), - pauseRxc: make(chan struct{}), + pauseTxc: make(chan struct{}), + pauseRxc: make(chan struct{}), } _, fromPort, err := net.SplitHostPort(cfg.From.Host) @@ -233,7 +207,6 @@ func NewServer(cfg ServerConfig) Server { s.retryInterval = defaultRetryInterval } - close(s.pauseAcceptc) close(s.pauseTxc) close(s.pauseRxc) @@ -290,15 +263,6 @@ func (s *server) listenAndServe() { close(s.readyc) for { - s.pauseAcceptMu.Lock() - pausec := s.pauseAcceptc - s.pauseAcceptMu.Unlock() - select { - case <-pausec: - case <-s.donec: - return - } - s.latencyAcceptMu.RLock() lat := s.latencyAccept s.latencyAcceptMu.RUnlock() @@ -645,70 +609,6 @@ func (s *server) Close() (err error) { return err } -func (s *server) PauseAccept() { - s.pauseAcceptMu.Lock() - s.pauseAcceptc = make(chan struct{}) - s.pauseAcceptMu.Unlock() - - s.lg.Info( - "paused accept", - zap.String("from", s.From()), - zap.String("to", s.To()), - ) -} - -func (s *server) UnpauseAccept() { - s.pauseAcceptMu.Lock() - select { - case <-s.pauseAcceptc: // already unpaused - case <-s.donec: - s.pauseAcceptMu.Unlock() - return - default: - close(s.pauseAcceptc) - } - s.pauseAcceptMu.Unlock() - - s.lg.Info( - "unpaused accept", - zap.String("from", s.From()), - zap.String("to", s.To()), - ) -} - -func (s *server) DelayAccept(latency, rv time.Duration) { - if latency <= 0 { - return - } - d := computeLatency(latency, rv) - s.latencyAcceptMu.Lock() - s.latencyAccept = d - s.latencyAcceptMu.Unlock() - - s.lg.Info( - "set accept latency", - zap.Duration("latency", d), - zap.Duration("given-latency", latency), - zap.Duration("given-latency-random-variable", rv), - zap.String("from", s.From()), - zap.String("to", s.To()), - ) -} - -func (s *server) UndelayAccept() { - s.latencyAcceptMu.Lock() - d := s.latencyAccept - s.latencyAccept = 0 - s.latencyAcceptMu.Unlock() - - s.lg.Info( - "removed accept latency", - zap.Duration("latency", d), - zap.String("from", s.From()), - zap.String("to", s.To()), - ) -} - func (s *server) LatencyAccept() time.Duration { s.latencyAcceptMu.RLock() d := s.latencyAccept diff --git a/pkg/proxy/server_test.go b/pkg/proxy/server_test.go index d19c947c646..baabfebe488 100644 --- a/pkg/proxy/server_test.go +++ b/pkg/proxy/server_test.go @@ -175,114 +175,6 @@ func createTLSInfo(lg *zap.Logger, secure bool) transport.TLSInfo { return transport.TLSInfo{Logger: lg} } -func TestServer_Unix_Insecure_DelayAccept(t *testing.T) { testServerDelayAccept(t, false) } -func TestServer_Unix_Secure_DelayAccept(t *testing.T) { testServerDelayAccept(t, true) } -func testServerDelayAccept(t *testing.T, secure bool) { - lg := zaptest.NewLogger(t) - srcAddr, dstAddr := newUnixAddr(), newUnixAddr() - defer func() { - os.RemoveAll(srcAddr) - os.RemoveAll(dstAddr) - }() - tlsInfo := createTLSInfo(lg, secure) - scheme := "unix" - ln := listen(t, scheme, dstAddr, tlsInfo) - defer ln.Close() - - cfg := ServerConfig{ - Logger: lg, - From: url.URL{Scheme: scheme, Host: srcAddr}, - To: url.URL{Scheme: scheme, Host: dstAddr}, - } - if secure { - cfg.TLSInfo = tlsInfo - } - p := NewServer(cfg) - - waitForServer(t, p) - - defer p.Close() - - data := []byte("Hello World!") - - now := time.Now() - send(t, data, scheme, srcAddr, tlsInfo) - if d := receive(t, ln); !bytes.Equal(data, d) { - t.Fatalf("expected %q, got %q", string(data), string(d)) - } - took1 := time.Since(now) - t.Logf("took %v with no latency", took1) - - lat, rv := 700*time.Millisecond, 10*time.Millisecond - p.DelayAccept(lat, rv) - defer p.UndelayAccept() - if err := p.ResetListener(); err != nil { - t.Fatal(err) - } - time.Sleep(200 * time.Millisecond) - - now = time.Now() - send(t, data, scheme, srcAddr, tlsInfo) - if d := receive(t, ln); !bytes.Equal(data, d) { - t.Fatalf("expected %q, got %q", string(data), string(d)) - } - took2 := time.Since(now) - t.Logf("took %v with latency %v±%v", took2, lat, rv) - - if took1 >= took2 { - t.Fatalf("expected took1 %v < took2 %v", took1, took2) - } -} - -func TestServer_PauseTx(t *testing.T) { - lg := zaptest.NewLogger(t) - scheme := "unix" - srcAddr, dstAddr := newUnixAddr(), newUnixAddr() - defer func() { - os.RemoveAll(srcAddr) - os.RemoveAll(dstAddr) - }() - ln := listen(t, scheme, dstAddr, transport.TLSInfo{}) - defer ln.Close() - - p := NewServer(ServerConfig{ - Logger: lg, - From: url.URL{Scheme: scheme, Host: srcAddr}, - To: url.URL{Scheme: scheme, Host: dstAddr}, - }) - - waitForServer(t, p) - - defer p.Close() - - p.PauseTx() - - data := []byte("Hello World!") - send(t, data, scheme, srcAddr, transport.TLSInfo{}) - - recvc := make(chan []byte, 1) - go func() { - recvc <- receive(t, ln) - }() - - select { - case d := <-recvc: - t.Fatalf("received unexpected data %q during pause", string(d)) - case <-time.After(200 * time.Millisecond): - } - - p.UnpauseTx() - - select { - case d := <-recvc: - if !bytes.Equal(data, d) { - t.Fatalf("expected %q, got %q", string(data), string(d)) - } - case <-time.After(2 * time.Second): - t.Fatal("took too long to receive after unpause") - } -} - func TestServer_ModifyTx_corrupt(t *testing.T) { lg := zaptest.NewLogger(t) scheme := "unix"