Skip to content

Commit

Permalink
Remove PauseTx and PauseRx of the reverse proxy from the e2e test
Browse files Browse the repository at this point in the history
Part of the patches to fix #17737

During the development of #17938,
we agreed that during the transition to L7 forward proxy, unused
features and features targeting L4 reverse proxy will be dropped.

This feature falls under the unused feature.

Signed-off-by: Chun-Hung Tseng <[email protected]>
  • Loading branch information
henrybear327 committed Sep 25, 2024
1 parent 2840a56 commit ea9179a
Showing 1 changed file with 0 additions and 95 deletions.
95 changes: 0 additions & 95 deletions pkg/proxy/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,12 +130,6 @@ type server struct {
shouldDropRxMu sync.RWMutex
shouldDropRx bool

pauseTxMu sync.Mutex
pauseTxc chan struct{}

pauseRxMu sync.Mutex
pauseRxc chan struct{}

latencyTxMu sync.RWMutex
latencyTx time.Duration

Expand All @@ -161,9 +155,6 @@ func NewServer(cfg ServerConfig) Server {
readyc: make(chan struct{}),
donec: make(chan struct{}),
errc: make(chan error, 16),

pauseTxc: make(chan struct{}),
pauseRxc: make(chan struct{}),
}

_, fromPort, err := net.SplitHostPort(cfg.From.Host)
Expand All @@ -186,9 +177,6 @@ func NewServer(cfg ServerConfig) Server {
s.retryInterval = defaultRetryInterval
}

close(s.pauseTxc)
close(s.pauseRxc)

if strings.HasPrefix(s.from.Scheme, "http") {
s.from.Scheme = "tcp"
}
Expand Down Expand Up @@ -427,27 +415,6 @@ func (s *server) ioCopy(dst io.Writer, src io.Reader, ptype proxyType) {
panic("unknown proxy type")
}

// pause before packet dropping, blocking, and forwarding
var pausec chan struct{}
switch ptype {
case proxyTx:
s.pauseTxMu.Lock()
pausec = s.pauseTxc
s.pauseTxMu.Unlock()
case proxyRx:
s.pauseRxMu.Lock()
pausec = s.pauseRxc
s.pauseRxMu.Unlock()
default:
panic("unknown proxy type")
}
select {
case <-pausec:
case <-s.donec:
return
}

// pause first, and then drop packets
if nr2 == 0 {
continue
}
Expand Down Expand Up @@ -759,68 +726,6 @@ func (s *server) UnblackholeRx() {
)
}

func (s *server) PauseTx() {
s.pauseTxMu.Lock()
s.pauseTxc = make(chan struct{})
s.pauseTxMu.Unlock()

s.lg.Info(
"paused tx",
zap.String("from", s.From()),
zap.String("to", s.To()),
)
}

func (s *server) UnpauseTx() {
s.pauseTxMu.Lock()
select {
case <-s.pauseTxc: // already unpaused
case <-s.donec:
s.pauseTxMu.Unlock()
return
default:
close(s.pauseTxc)
}
s.pauseTxMu.Unlock()

s.lg.Info(
"unpaused tx",
zap.String("from", s.From()),
zap.String("to", s.To()),
)
}

func (s *server) PauseRx() {
s.pauseRxMu.Lock()
s.pauseRxc = make(chan struct{})
s.pauseRxMu.Unlock()

s.lg.Info(
"paused rx",
zap.String("from", s.To()),
zap.String("to", s.From()),
)
}

func (s *server) UnpauseRx() {
s.pauseRxMu.Lock()
select {
case <-s.pauseRxc: // already unpaused
case <-s.donec:
s.pauseRxMu.Unlock()
return
default:
close(s.pauseRxc)
}
s.pauseRxMu.Unlock()

s.lg.Info(
"unpaused rx",
zap.String("from", s.To()),
zap.String("to", s.From()),
)
}

func (s *server) ResetListener() error {
s.listenerMu.Lock()
defer s.listenerMu.Unlock()
Expand Down

0 comments on commit ea9179a

Please sign in to comment.