diff --git a/pkg/proxy/server.go b/pkg/proxy/server.go index cd51be3a0d3e..d91b8b3be0a8 100644 --- a/pkg/proxy/server.go +++ b/pkg/proxy/server.go @@ -130,12 +130,6 @@ type server struct { shouldDropRxMu sync.RWMutex shouldDropRx bool - pauseTxMu sync.Mutex - pauseTxc chan struct{} - - pauseRxMu sync.Mutex - pauseRxc chan struct{} - latencyTxMu sync.RWMutex latencyTx time.Duration @@ -161,9 +155,6 @@ func NewServer(cfg ServerConfig) Server { readyc: make(chan struct{}), donec: make(chan struct{}), errc: make(chan error, 16), - - pauseTxc: make(chan struct{}), - pauseRxc: make(chan struct{}), } _, fromPort, err := net.SplitHostPort(cfg.From.Host) @@ -186,9 +177,6 @@ func NewServer(cfg ServerConfig) Server { s.retryInterval = defaultRetryInterval } - close(s.pauseTxc) - close(s.pauseRxc) - if strings.HasPrefix(s.from.Scheme, "http") { s.from.Scheme = "tcp" } @@ -427,27 +415,6 @@ func (s *server) ioCopy(dst io.Writer, src io.Reader, ptype proxyType) { panic("unknown proxy type") } - // pause before packet dropping, blocking, and forwarding - var pausec chan struct{} - switch ptype { - case proxyTx: - s.pauseTxMu.Lock() - pausec = s.pauseTxc - s.pauseTxMu.Unlock() - case proxyRx: - s.pauseRxMu.Lock() - pausec = s.pauseRxc - s.pauseRxMu.Unlock() - default: - panic("unknown proxy type") - } - select { - case <-pausec: - case <-s.donec: - return - } - - // pause first, and then drop packets if nr2 == 0 { continue } @@ -759,68 +726,6 @@ func (s *server) UnblackholeRx() { ) } -func (s *server) PauseTx() { - s.pauseTxMu.Lock() - s.pauseTxc = make(chan struct{}) - s.pauseTxMu.Unlock() - - s.lg.Info( - "paused tx", - zap.String("from", s.From()), - zap.String("to", s.To()), - ) -} - -func (s *server) UnpauseTx() { - s.pauseTxMu.Lock() - select { - case <-s.pauseTxc: // already unpaused - case <-s.donec: - s.pauseTxMu.Unlock() - return - default: - close(s.pauseTxc) - } - s.pauseTxMu.Unlock() - - s.lg.Info( - "unpaused tx", - zap.String("from", s.From()), - zap.String("to", s.To()), - ) -} - -func (s *server) PauseRx() { - s.pauseRxMu.Lock() - s.pauseRxc = make(chan struct{}) - s.pauseRxMu.Unlock() - - s.lg.Info( - "paused rx", - zap.String("from", s.To()), - zap.String("to", s.From()), - ) -} - -func (s *server) UnpauseRx() { - s.pauseRxMu.Lock() - select { - case <-s.pauseRxc: // already unpaused - case <-s.donec: - s.pauseRxMu.Unlock() - return - default: - close(s.pauseRxc) - } - s.pauseRxMu.Unlock() - - s.lg.Info( - "unpaused rx", - zap.String("from", s.To()), - zap.String("to", s.From()), - ) -} - func (s *server) ResetListener() error { s.listenerMu.Lock() defer s.listenerMu.Unlock()