Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove DelayAccept of the reverse proxy from the e2e test #18639

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
104 changes: 2 additions & 102 deletions pkg/proxy/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,18 +59,6 @@ type Server interface {
// Close closes listener and transport.
Close() error

// PauseAccept stops accepting new connections.
PauseAccept()
// UnpauseAccept removes pause operation on accepting new connections.
UnpauseAccept()

// DelayAccept adds latency ± random variable to accepting
// new incoming connections.
DelayAccept(latency, rv time.Duration)
// UndelayAccept removes sending latencies.
UndelayAccept()
// LatencyAccept returns current latency on accepting
// new incoming connections.
LatencyAccept() time.Duration

// DelayTx adds latency ± random variable for "outgoing" traffic
Expand Down Expand Up @@ -115,16 +103,6 @@ type Server interface {
// UnblackholeRx removes blackhole operation on "receiving".
UnblackholeRx()

// PauseTx stops "forwarding" packets; "outgoing" traffic blocks.
PauseTx()
// UnpauseTx removes "forwarding" pause operation.
UnpauseTx()

// PauseRx stops "receiving" packets; "incoming" traffic blocks.
PauseRx()
// UnpauseRx removes "receiving" pause operation.
UnpauseRx()

// ResetListener closes and restarts listener.
ResetListener() error
}
Expand Down Expand Up @@ -164,9 +142,6 @@ type server struct {
listenerMu sync.RWMutex
listener net.Listener

pauseAcceptMu sync.Mutex
pauseAcceptc chan struct{}

latencyAcceptMu sync.RWMutex
latencyAccept time.Duration

Expand Down Expand Up @@ -208,9 +183,8 @@ func NewServer(cfg ServerConfig) Server {
donec: make(chan struct{}),
errc: make(chan error, 16),

pauseAcceptc: make(chan struct{}),
pauseTxc: make(chan struct{}),
pauseRxc: make(chan struct{}),
pauseTxc: make(chan struct{}),
pauseRxc: make(chan struct{}),
}

_, fromPort, err := net.SplitHostPort(cfg.From.Host)
Expand All @@ -233,7 +207,6 @@ func NewServer(cfg ServerConfig) Server {
s.retryInterval = defaultRetryInterval
}

close(s.pauseAcceptc)
close(s.pauseTxc)
close(s.pauseRxc)

Expand Down Expand Up @@ -290,15 +263,6 @@ func (s *server) listenAndServe() {
close(s.readyc)

for {
s.pauseAcceptMu.Lock()
pausec := s.pauseAcceptc
s.pauseAcceptMu.Unlock()
select {
case <-pausec:
case <-s.donec:
return
}

s.latencyAcceptMu.RLock()
lat := s.latencyAccept
s.latencyAcceptMu.RUnlock()
Expand Down Expand Up @@ -645,70 +609,6 @@ func (s *server) Close() (err error) {
return err
}

func (s *server) PauseAccept() {
s.pauseAcceptMu.Lock()
s.pauseAcceptc = make(chan struct{})
s.pauseAcceptMu.Unlock()

s.lg.Info(
"paused accept",
zap.String("from", s.From()),
zap.String("to", s.To()),
)
}

func (s *server) UnpauseAccept() {
s.pauseAcceptMu.Lock()
select {
case <-s.pauseAcceptc: // already unpaused
case <-s.donec:
s.pauseAcceptMu.Unlock()
return
default:
close(s.pauseAcceptc)
}
s.pauseAcceptMu.Unlock()

s.lg.Info(
"unpaused accept",
zap.String("from", s.From()),
zap.String("to", s.To()),
)
}

func (s *server) DelayAccept(latency, rv time.Duration) {
if latency <= 0 {
return
}
d := computeLatency(latency, rv)
s.latencyAcceptMu.Lock()
s.latencyAccept = d
s.latencyAcceptMu.Unlock()

s.lg.Info(
"set accept latency",
zap.Duration("latency", d),
zap.Duration("given-latency", latency),
zap.Duration("given-latency-random-variable", rv),
zap.String("from", s.From()),
zap.String("to", s.To()),
)
}

func (s *server) UndelayAccept() {
s.latencyAcceptMu.Lock()
d := s.latencyAccept
s.latencyAccept = 0
s.latencyAcceptMu.Unlock()

s.lg.Info(
"removed accept latency",
zap.Duration("latency", d),
zap.String("from", s.From()),
zap.String("to", s.To()),
)
}

func (s *server) LatencyAccept() time.Duration {
s.latencyAcceptMu.RLock()
d := s.latencyAccept
Expand Down
108 changes: 0 additions & 108 deletions pkg/proxy/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,114 +175,6 @@ func createTLSInfo(lg *zap.Logger, secure bool) transport.TLSInfo {
return transport.TLSInfo{Logger: lg}
}

func TestServer_Unix_Insecure_DelayAccept(t *testing.T) { testServerDelayAccept(t, false) }
func TestServer_Unix_Secure_DelayAccept(t *testing.T) { testServerDelayAccept(t, true) }
func testServerDelayAccept(t *testing.T, secure bool) {
lg := zaptest.NewLogger(t)
srcAddr, dstAddr := newUnixAddr(), newUnixAddr()
defer func() {
os.RemoveAll(srcAddr)
os.RemoveAll(dstAddr)
}()
tlsInfo := createTLSInfo(lg, secure)
scheme := "unix"
ln := listen(t, scheme, dstAddr, tlsInfo)
defer ln.Close()

cfg := ServerConfig{
Logger: lg,
From: url.URL{Scheme: scheme, Host: srcAddr},
To: url.URL{Scheme: scheme, Host: dstAddr},
}
if secure {
cfg.TLSInfo = tlsInfo
}
p := NewServer(cfg)

waitForServer(t, p)

defer p.Close()

data := []byte("Hello World!")

now := time.Now()
send(t, data, scheme, srcAddr, tlsInfo)
if d := receive(t, ln); !bytes.Equal(data, d) {
t.Fatalf("expected %q, got %q", string(data), string(d))
}
took1 := time.Since(now)
t.Logf("took %v with no latency", took1)

lat, rv := 700*time.Millisecond, 10*time.Millisecond
p.DelayAccept(lat, rv)
defer p.UndelayAccept()
if err := p.ResetListener(); err != nil {
t.Fatal(err)
}
time.Sleep(200 * time.Millisecond)

now = time.Now()
send(t, data, scheme, srcAddr, tlsInfo)
if d := receive(t, ln); !bytes.Equal(data, d) {
t.Fatalf("expected %q, got %q", string(data), string(d))
}
took2 := time.Since(now)
t.Logf("took %v with latency %v±%v", took2, lat, rv)

if took1 >= took2 {
t.Fatalf("expected took1 %v < took2 %v", took1, took2)
}
}

func TestServer_PauseTx(t *testing.T) {
lg := zaptest.NewLogger(t)
scheme := "unix"
srcAddr, dstAddr := newUnixAddr(), newUnixAddr()
defer func() {
os.RemoveAll(srcAddr)
os.RemoveAll(dstAddr)
}()
ln := listen(t, scheme, dstAddr, transport.TLSInfo{})
defer ln.Close()

p := NewServer(ServerConfig{
Logger: lg,
From: url.URL{Scheme: scheme, Host: srcAddr},
To: url.URL{Scheme: scheme, Host: dstAddr},
})

waitForServer(t, p)

defer p.Close()

p.PauseTx()

data := []byte("Hello World!")
send(t, data, scheme, srcAddr, transport.TLSInfo{})

recvc := make(chan []byte, 1)
go func() {
recvc <- receive(t, ln)
}()

select {
case d := <-recvc:
t.Fatalf("received unexpected data %q during pause", string(d))
case <-time.After(200 * time.Millisecond):
}

p.UnpauseTx()

select {
case d := <-recvc:
if !bytes.Equal(data, d) {
t.Fatalf("expected %q, got %q", string(data), string(d))
}
case <-time.After(2 * time.Second):
t.Fatal("took too long to receive after unpause")
}
}

func TestServer_ModifyTx_corrupt(t *testing.T) {
lg := zaptest.NewLogger(t)
scheme := "unix"
Expand Down
Loading