Skip to content

Commit

Permalink
Remove ModifyTx and ModifyRx of the reverse proxy from the e2e test
Browse files Browse the repository at this point in the history
Part of the patches to fix #17737

During the development of #17938,
we agreed that during the transition to L7 forward proxy, unused
features and features targeting L4 reverse proxy will be dropped.

This feature falls under the features targeting L4 reverse proxy.

Signed-off-by: Chun-Hung Tseng <[email protected]>
  • Loading branch information
henrybear327 committed Sep 25, 2024
1 parent 42b390f commit 2840a56
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 143 deletions.
99 changes: 42 additions & 57 deletions pkg/proxy/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,28 +75,12 @@ type Server interface {
// LatencyRx returns current receive latency.
LatencyRx() time.Duration

// ModifyTx alters/corrupts/drops "outgoing" packets from the listener
// with the given edit function.
ModifyTx(f func(data []byte) []byte)
// UnmodifyTx removes modify operation on "forwarding".
UnmodifyTx()

// ModifyRx alters/corrupts/drops "incoming" packets to client
// with the given edit function.
ModifyRx(f func(data []byte) []byte)
// UnmodifyRx removes modify operation on "receiving".
UnmodifyRx()

// BlackholeTx drops all "outgoing" packets before "forwarding".
// "BlackholeTx" operation is a wrapper around "ModifyTx" with
// a function that returns empty bytes.
BlackholeTx()
// UnblackholeTx removes blackhole operation on "sending".
UnblackholeTx()

// BlackholeRx drops all "incoming" packets to client.
// "BlackholeRx" operation is a wrapper around "ModifyRx" with
// a function that returns empty bytes.
BlackholeRx()
// UnblackholeRx removes blackhole operation on "receiving".
UnblackholeRx()
Expand Down Expand Up @@ -140,11 +124,11 @@ type server struct {
listenerMu sync.RWMutex
listener net.Listener

modifyTxMu sync.RWMutex
modifyTx func(data []byte) []byte
shouldDropTxMu sync.RWMutex
shouldDropTx bool

modifyRxMu sync.RWMutex
modifyRx func(data []byte) []byte
shouldDropRxMu sync.RWMutex
shouldDropRx bool

pauseTxMu sync.Mutex
pauseTxc chan struct{}
Expand Down Expand Up @@ -404,20 +388,20 @@ func (s *server) ioCopy(dst io.Writer, src io.Reader, ptype proxyType) {
}
data := buf[:nr1]

// alters/corrupts/drops data
// drops data
switch ptype {
case proxyTx:
s.modifyTxMu.RLock()
if s.modifyTx != nil {
data = s.modifyTx(data)
s.shouldDropTxMu.RLock()
if s.shouldDropTx {
data = nil
}
s.modifyTxMu.RUnlock()
s.shouldDropTxMu.RUnlock()
case proxyRx:
s.modifyRxMu.RLock()
if s.modifyRx != nil {
data = s.modifyRx(data)
s.shouldDropRxMu.RLock()
if s.shouldDropRx {
data = nil
}
s.modifyRxMu.RUnlock()
s.shouldDropRxMu.RUnlock()
default:
panic("unknown proxy type")
}
Expand Down Expand Up @@ -691,55 +675,56 @@ func computeLatency(lat, rv time.Duration) time.Duration {
return lat + time.Duration(int64(sign)*mrand.Int63n(rv.Nanoseconds()))
}

func (s *server) ModifyTx(f func([]byte) []byte) {
s.modifyTxMu.Lock()
s.modifyTx = f
s.modifyTxMu.Unlock()
func (s *server) setShouldDropTx() {
s.shouldDropTxMu.Lock()
s.shouldDropTx = true
s.shouldDropTxMu.Unlock()

s.lg.Info(
"modifying tx",
"setShouldDropTx",
zap.String("from", s.From()),
zap.String("to", s.To()),
)
}

func (s *server) UnmodifyTx() {
s.modifyTxMu.Lock()
s.modifyTx = nil
s.modifyTxMu.Unlock()
func (s *server) unsetShouldDropTx() {
s.shouldDropTxMu.Lock()
s.shouldDropTx = false
s.shouldDropTxMu.Unlock()

s.lg.Info(
"unmodifyed tx",
"unsetShouldDropTx",
zap.String("from", s.From()),
zap.String("to", s.To()),
)
}

func (s *server) ModifyRx(f func([]byte) []byte) {
s.modifyRxMu.Lock()
s.modifyRx = f
s.modifyRxMu.Unlock()
func (s *server) setShouldDropRx() {
s.shouldDropRxMu.Lock()
s.shouldDropRx = true
s.shouldDropRxMu.Unlock()

s.lg.Info(
"modifying rx",
zap.String("from", s.To()),
zap.String("to", s.From()),
"setShouldDropRx",
zap.String("from", s.From()),
zap.String("to", s.To()),
)
}

func (s *server) UnmodifyRx() {
s.modifyRxMu.Lock()
s.modifyRx = nil
s.modifyRxMu.Unlock()
func (s *server) unsetShouldDropRx() {
s.shouldDropRxMu.Lock()
s.shouldDropRx = false
s.shouldDropRxMu.Unlock()

s.lg.Info(
"unmodifyed rx",
zap.String("from", s.To()),
zap.String("to", s.From()),
"unsetShouldDropRx",
zap.String("from", s.From()),
zap.String("to", s.To()),
)
}

func (s *server) BlackholeTx() {
s.ModifyTx(func([]byte) []byte { return nil })
s.setShouldDropTx()
s.lg.Info(
"blackholed tx",
zap.String("from", s.From()),
Expand All @@ -748,7 +733,7 @@ func (s *server) BlackholeTx() {
}

func (s *server) UnblackholeTx() {
s.UnmodifyTx()
s.unsetShouldDropTx()
s.lg.Info(
"unblackholed tx",
zap.String("from", s.From()),
Expand All @@ -757,7 +742,7 @@ func (s *server) UnblackholeTx() {
}

func (s *server) BlackholeRx() {
s.ModifyRx(func([]byte) []byte { return nil })
s.setShouldDropRx()
s.lg.Info(
"blackholed rx",
zap.String("from", s.To()),
Expand All @@ -766,7 +751,7 @@ func (s *server) BlackholeRx() {
}

func (s *server) UnblackholeRx() {
s.UnmodifyRx()
s.unsetShouldDropRx()
s.lg.Info(
"unblackholed rx",
zap.String("from", s.To()),
Expand Down
77 changes: 0 additions & 77 deletions pkg/proxy/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,83 +175,6 @@ func createTLSInfo(lg *zap.Logger, secure bool) transport.TLSInfo {
return transport.TLSInfo{Logger: lg}
}

func TestServer_ModifyTx_corrupt(t *testing.T) {
lg := zaptest.NewLogger(t)
scheme := "unix"
srcAddr, dstAddr := newUnixAddr(), newUnixAddr()
defer func() {
os.RemoveAll(srcAddr)
os.RemoveAll(dstAddr)
}()
ln := listen(t, scheme, dstAddr, transport.TLSInfo{})
defer ln.Close()

p := NewServer(ServerConfig{
Logger: lg,
From: url.URL{Scheme: scheme, Host: srcAddr},
To: url.URL{Scheme: scheme, Host: dstAddr},
})

waitForServer(t, p)

defer p.Close()

p.ModifyTx(func(d []byte) []byte {
d[len(d)/2]++
return d
})
data := []byte("Hello World!")
send(t, data, scheme, srcAddr, transport.TLSInfo{})
if d := receive(t, ln); bytes.Equal(d, data) {
t.Fatalf("expected corrupted data, got %q", string(d))
}

p.UnmodifyTx()
send(t, data, scheme, srcAddr, transport.TLSInfo{})
if d := receive(t, ln); !bytes.Equal(d, data) {
t.Fatalf("expected uncorrupted data, got %q", string(d))
}
}

func TestServer_ModifyTx_packet_loss(t *testing.T) {
lg := zaptest.NewLogger(t)
scheme := "unix"
srcAddr, dstAddr := newUnixAddr(), newUnixAddr()
defer func() {
os.RemoveAll(srcAddr)
os.RemoveAll(dstAddr)
}()
ln := listen(t, scheme, dstAddr, transport.TLSInfo{})
defer ln.Close()

p := NewServer(ServerConfig{
Logger: lg,
From: url.URL{Scheme: scheme, Host: srcAddr},
To: url.URL{Scheme: scheme, Host: dstAddr},
})

waitForServer(t, p)

defer p.Close()

// 50% packet loss
p.ModifyTx(func(d []byte) []byte {
half := len(d) / 2
return d[:half:half]
})
data := []byte("Hello World!")
send(t, data, scheme, srcAddr, transport.TLSInfo{})
if d := receive(t, ln); bytes.Equal(d, data) {
t.Fatalf("expected corrupted data, got %q", string(d))
}

p.UnmodifyTx()
send(t, data, scheme, srcAddr, transport.TLSInfo{})
if d := receive(t, ln); !bytes.Equal(d, data) {
t.Fatalf("expected uncorrupted data, got %q", string(d))
}
}

func TestServer_BlackholeTx(t *testing.T) {
lg := zaptest.NewLogger(t)
scheme := "unix"
Expand Down
19 changes: 10 additions & 9 deletions tests/robustness/failpoint/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,21 +193,22 @@ func (f dropPeerNetworkFailpoint) Inject(ctx context.Context, t *testing.T, lg *
member := clus.Procs[rand.Int()%len(clus.Procs)]
proxy := member.PeerProxy()

proxy.ModifyRx(f.modifyPacket)
proxy.ModifyTx(f.modifyPacket)
if !f.shouldDropPacket() {
return nil, nil
}

proxy.BlackholeRx()
proxy.BlackholeTx()
lg.Info("Dropping traffic from and to member", zap.String("member", member.Config().Name), zap.Int("probability", f.dropProbabilityPercent))
time.Sleep(f.duration)
lg.Info("Traffic drop removed", zap.String("member", member.Config().Name))
proxy.UnmodifyRx()
proxy.UnmodifyTx()
proxy.UnblackholeRx()
proxy.UnblackholeTx()
return nil, nil
}

func (f dropPeerNetworkFailpoint) modifyPacket(data []byte) []byte {
if rand.Intn(100) < f.dropProbabilityPercent {
return nil
}
return data
func (f dropPeerNetworkFailpoint) shouldDropPacket() bool {
return rand.Intn(100) < f.dropProbabilityPercent
}

func (f dropPeerNetworkFailpoint) Name() string {
Expand Down

0 comments on commit 2840a56

Please sign in to comment.