diff --git a/pkg/proxy/server.go b/pkg/proxy/server.go index de41276394d..cd84e4e64b5 100644 --- a/pkg/proxy/server.go +++ b/pkg/proxy/server.go @@ -146,12 +146,6 @@ type server struct { modifyRxMu sync.RWMutex modifyRx func(data []byte) []byte - pauseTxMu sync.Mutex - pauseTxc chan struct{} - - pauseRxMu sync.Mutex - pauseRxc chan struct{} - latencyTxMu sync.RWMutex latencyTx time.Duration @@ -177,9 +171,6 @@ func NewServer(cfg ServerConfig) Server { readyc: make(chan struct{}), donec: make(chan struct{}), errc: make(chan error, 16), - - pauseTxc: make(chan struct{}), - pauseRxc: make(chan struct{}), } _, fromPort, err := net.SplitHostPort(cfg.From.Host) @@ -202,9 +193,6 @@ func NewServer(cfg ServerConfig) Server { s.retryInterval = defaultRetryInterval } - close(s.pauseTxc) - close(s.pauseRxc) - if strings.HasPrefix(s.from.Scheme, "http") { s.from.Scheme = "tcp" } @@ -443,27 +431,6 @@ func (s *server) ioCopy(dst io.Writer, src io.Reader, ptype proxyType) { panic("unknown proxy type") } - // pause before packet dropping, blocking, and forwarding - var pausec chan struct{} - switch ptype { - case proxyTx: - s.pauseTxMu.Lock() - pausec = s.pauseTxc - s.pauseTxMu.Unlock() - case proxyRx: - s.pauseRxMu.Lock() - pausec = s.pauseRxc - s.pauseRxMu.Unlock() - default: - panic("unknown proxy type") - } - select { - case <-pausec: - case <-s.donec: - return - } - - // pause first, and then drop packets if nr2 == 0 { continue } @@ -774,68 +741,6 @@ func (s *server) UnblackholeRx() { ) } -func (s *server) PauseTx() { - s.pauseTxMu.Lock() - s.pauseTxc = make(chan struct{}) - s.pauseTxMu.Unlock() - - s.lg.Info( - "paused tx", - zap.String("from", s.From()), - zap.String("to", s.To()), - ) -} - -func (s *server) UnpauseTx() { - s.pauseTxMu.Lock() - select { - case <-s.pauseTxc: // already unpaused - case <-s.donec: - s.pauseTxMu.Unlock() - return - default: - close(s.pauseTxc) - } - s.pauseTxMu.Unlock() - - s.lg.Info( - "unpaused tx", - zap.String("from", s.From()), - zap.String("to", s.To()), - ) -} - -func (s *server) PauseRx() { - s.pauseRxMu.Lock() - s.pauseRxc = make(chan struct{}) - s.pauseRxMu.Unlock() - - s.lg.Info( - "paused rx", - zap.String("from", s.To()), - zap.String("to", s.From()), - ) -} - -func (s *server) UnpauseRx() { - s.pauseRxMu.Lock() - select { - case <-s.pauseRxc: // already unpaused - case <-s.donec: - s.pauseRxMu.Unlock() - return - default: - close(s.pauseRxc) - } - s.pauseRxMu.Unlock() - - s.lg.Info( - "unpaused rx", - zap.String("from", s.To()), - zap.String("to", s.From()), - ) -} - func (s *server) ResetListener() error { s.listenerMu.Lock() defer s.listenerMu.Unlock()