diff --git a/pkg/proxy/server.go b/pkg/proxy/server.go index bc71c3a1660..cd84e4e64b5 100644 --- a/pkg/proxy/server.go +++ b/pkg/proxy/server.go @@ -59,20 +59,6 @@ type Server interface { // Close closes listener and transport. Close() error - // PauseAccept stops accepting new connections. - PauseAccept() - // UnpauseAccept removes pause operation on accepting new connections. - UnpauseAccept() - - // DelayAccept adds latency ± random variable to accepting - // new incoming connections. - DelayAccept(latency, rv time.Duration) - // UndelayAccept removes sending latencies. - UndelayAccept() - // LatencyAccept returns current latency on accepting - // new incoming connections. - LatencyAccept() time.Duration - // DelayTx adds latency ± random variable for "outgoing" traffic // in "sending" layer. DelayTx(latency, rv time.Duration) @@ -115,16 +101,6 @@ type Server interface { // UnblackholeRx removes blackhole operation on "receiving". UnblackholeRx() - // PauseTx stops "forwarding" packets; "outgoing" traffic blocks. - PauseTx() - // UnpauseTx removes "forwarding" pause operation. - UnpauseTx() - - // PauseRx stops "receiving" packets; "incoming" traffic blocks. - PauseRx() - // UnpauseRx removes "receiving" pause operation. - UnpauseRx() - // ResetListener closes and restarts listener. ResetListener() error } @@ -164,24 +140,12 @@ type server struct { listenerMu sync.RWMutex listener net.Listener - pauseAcceptMu sync.Mutex - pauseAcceptc chan struct{} - - latencyAcceptMu sync.RWMutex - latencyAccept time.Duration - modifyTxMu sync.RWMutex modifyTx func(data []byte) []byte modifyRxMu sync.RWMutex modifyRx func(data []byte) []byte - pauseTxMu sync.Mutex - pauseTxc chan struct{} - - pauseRxMu sync.Mutex - pauseRxc chan struct{} - latencyTxMu sync.RWMutex latencyTx time.Duration @@ -207,10 +171,6 @@ func NewServer(cfg ServerConfig) Server { readyc: make(chan struct{}), donec: make(chan struct{}), errc: make(chan error, 16), - - pauseAcceptc: make(chan struct{}), - pauseTxc: make(chan struct{}), - pauseRxc: make(chan struct{}), } _, fromPort, err := net.SplitHostPort(cfg.From.Host) @@ -233,10 +193,6 @@ func NewServer(cfg ServerConfig) Server { s.retryInterval = defaultRetryInterval } - close(s.pauseAcceptc) - close(s.pauseTxc) - close(s.pauseRxc) - if strings.HasPrefix(s.from.Scheme, "http") { s.from.Scheme = "tcp" } @@ -290,26 +246,6 @@ func (s *server) listenAndServe() { close(s.readyc) for { - s.pauseAcceptMu.Lock() - pausec := s.pauseAcceptc - s.pauseAcceptMu.Unlock() - select { - case <-pausec: - case <-s.donec: - return - } - - s.latencyAcceptMu.RLock() - lat := s.latencyAccept - s.latencyAcceptMu.RUnlock() - if lat > 0 { - select { - case <-time.After(lat): - case <-s.donec: - return - } - } - s.listenerMu.RLock() ln := s.listener s.listenerMu.RUnlock() @@ -495,27 +431,6 @@ func (s *server) ioCopy(dst io.Writer, src io.Reader, ptype proxyType) { panic("unknown proxy type") } - // pause before packet dropping, blocking, and forwarding - var pausec chan struct{} - switch ptype { - case proxyTx: - s.pauseTxMu.Lock() - pausec = s.pauseTxc - s.pauseTxMu.Unlock() - case proxyRx: - s.pauseRxMu.Lock() - pausec = s.pauseRxc - s.pauseRxMu.Unlock() - default: - panic("unknown proxy type") - } - select { - case <-pausec: - case <-s.donec: - return - } - - // pause first, and then drop packets if nr2 == 0 { continue } @@ -645,77 +560,6 @@ func (s *server) Close() (err error) { return err } -func (s *server) PauseAccept() { - s.pauseAcceptMu.Lock() - s.pauseAcceptc = make(chan struct{}) - s.pauseAcceptMu.Unlock() - - s.lg.Info( - "paused accept", - zap.String("from", s.From()), - zap.String("to", s.To()), - ) -} - -func (s *server) UnpauseAccept() { - s.pauseAcceptMu.Lock() - select { - case <-s.pauseAcceptc: // already unpaused - case <-s.donec: - s.pauseAcceptMu.Unlock() - return - default: - close(s.pauseAcceptc) - } - s.pauseAcceptMu.Unlock() - - s.lg.Info( - "unpaused accept", - zap.String("from", s.From()), - zap.String("to", s.To()), - ) -} - -func (s *server) DelayAccept(latency, rv time.Duration) { - if latency <= 0 { - return - } - d := computeLatency(latency, rv) - s.latencyAcceptMu.Lock() - s.latencyAccept = d - s.latencyAcceptMu.Unlock() - - s.lg.Info( - "set accept latency", - zap.Duration("latency", d), - zap.Duration("given-latency", latency), - zap.Duration("given-latency-random-variable", rv), - zap.String("from", s.From()), - zap.String("to", s.To()), - ) -} - -func (s *server) UndelayAccept() { - s.latencyAcceptMu.Lock() - d := s.latencyAccept - s.latencyAccept = 0 - s.latencyAcceptMu.Unlock() - - s.lg.Info( - "removed accept latency", - zap.Duration("latency", d), - zap.String("from", s.From()), - zap.String("to", s.To()), - ) -} - -func (s *server) LatencyAccept() time.Duration { - s.latencyAcceptMu.RLock() - d := s.latencyAccept - s.latencyAcceptMu.RUnlock() - return d -} - func (s *server) DelayTx(latency, rv time.Duration) { if latency <= 0 { return @@ -897,68 +741,6 @@ func (s *server) UnblackholeRx() { ) } -func (s *server) PauseTx() { - s.pauseTxMu.Lock() - s.pauseTxc = make(chan struct{}) - s.pauseTxMu.Unlock() - - s.lg.Info( - "paused tx", - zap.String("from", s.From()), - zap.String("to", s.To()), - ) -} - -func (s *server) UnpauseTx() { - s.pauseTxMu.Lock() - select { - case <-s.pauseTxc: // already unpaused - case <-s.donec: - s.pauseTxMu.Unlock() - return - default: - close(s.pauseTxc) - } - s.pauseTxMu.Unlock() - - s.lg.Info( - "unpaused tx", - zap.String("from", s.From()), - zap.String("to", s.To()), - ) -} - -func (s *server) PauseRx() { - s.pauseRxMu.Lock() - s.pauseRxc = make(chan struct{}) - s.pauseRxMu.Unlock() - - s.lg.Info( - "paused rx", - zap.String("from", s.To()), - zap.String("to", s.From()), - ) -} - -func (s *server) UnpauseRx() { - s.pauseRxMu.Lock() - select { - case <-s.pauseRxc: // already unpaused - case <-s.donec: - s.pauseRxMu.Unlock() - return - default: - close(s.pauseRxc) - } - s.pauseRxMu.Unlock() - - s.lg.Info( - "unpaused rx", - zap.String("from", s.To()), - zap.String("to", s.From()), - ) -} - func (s *server) ResetListener() error { s.listenerMu.Lock() defer s.listenerMu.Unlock() diff --git a/pkg/proxy/server_test.go b/pkg/proxy/server_test.go index d19c947c646..baabfebe488 100644 --- a/pkg/proxy/server_test.go +++ b/pkg/proxy/server_test.go @@ -175,114 +175,6 @@ func createTLSInfo(lg *zap.Logger, secure bool) transport.TLSInfo { return transport.TLSInfo{Logger: lg} } -func TestServer_Unix_Insecure_DelayAccept(t *testing.T) { testServerDelayAccept(t, false) } -func TestServer_Unix_Secure_DelayAccept(t *testing.T) { testServerDelayAccept(t, true) } -func testServerDelayAccept(t *testing.T, secure bool) { - lg := zaptest.NewLogger(t) - srcAddr, dstAddr := newUnixAddr(), newUnixAddr() - defer func() { - os.RemoveAll(srcAddr) - os.RemoveAll(dstAddr) - }() - tlsInfo := createTLSInfo(lg, secure) - scheme := "unix" - ln := listen(t, scheme, dstAddr, tlsInfo) - defer ln.Close() - - cfg := ServerConfig{ - Logger: lg, - From: url.URL{Scheme: scheme, Host: srcAddr}, - To: url.URL{Scheme: scheme, Host: dstAddr}, - } - if secure { - cfg.TLSInfo = tlsInfo - } - p := NewServer(cfg) - - waitForServer(t, p) - - defer p.Close() - - data := []byte("Hello World!") - - now := time.Now() - send(t, data, scheme, srcAddr, tlsInfo) - if d := receive(t, ln); !bytes.Equal(data, d) { - t.Fatalf("expected %q, got %q", string(data), string(d)) - } - took1 := time.Since(now) - t.Logf("took %v with no latency", took1) - - lat, rv := 700*time.Millisecond, 10*time.Millisecond - p.DelayAccept(lat, rv) - defer p.UndelayAccept() - if err := p.ResetListener(); err != nil { - t.Fatal(err) - } - time.Sleep(200 * time.Millisecond) - - now = time.Now() - send(t, data, scheme, srcAddr, tlsInfo) - if d := receive(t, ln); !bytes.Equal(data, d) { - t.Fatalf("expected %q, got %q", string(data), string(d)) - } - took2 := time.Since(now) - t.Logf("took %v with latency %v±%v", took2, lat, rv) - - if took1 >= took2 { - t.Fatalf("expected took1 %v < took2 %v", took1, took2) - } -} - -func TestServer_PauseTx(t *testing.T) { - lg := zaptest.NewLogger(t) - scheme := "unix" - srcAddr, dstAddr := newUnixAddr(), newUnixAddr() - defer func() { - os.RemoveAll(srcAddr) - os.RemoveAll(dstAddr) - }() - ln := listen(t, scheme, dstAddr, transport.TLSInfo{}) - defer ln.Close() - - p := NewServer(ServerConfig{ - Logger: lg, - From: url.URL{Scheme: scheme, Host: srcAddr}, - To: url.URL{Scheme: scheme, Host: dstAddr}, - }) - - waitForServer(t, p) - - defer p.Close() - - p.PauseTx() - - data := []byte("Hello World!") - send(t, data, scheme, srcAddr, transport.TLSInfo{}) - - recvc := make(chan []byte, 1) - go func() { - recvc <- receive(t, ln) - }() - - select { - case d := <-recvc: - t.Fatalf("received unexpected data %q during pause", string(d)) - case <-time.After(200 * time.Millisecond): - } - - p.UnpauseTx() - - select { - case d := <-recvc: - if !bytes.Equal(data, d) { - t.Fatalf("expected %q, got %q", string(data), string(d)) - } - case <-time.After(2 * time.Second): - t.Fatal("took too long to receive after unpause") - } -} - func TestServer_ModifyTx_corrupt(t *testing.T) { lg := zaptest.NewLogger(t) scheme := "unix"