From 2840a56423ab404f4e59bb6f3e491d162e34885a Mon Sep 17 00:00:00 2001 From: Chun-Hung Tseng Date: Wed, 25 Sep 2024 23:38:23 +0200 Subject: [PATCH] Remove ModifyTx and ModifyRx of the reverse proxy from the e2e test Part of the patches to fix https://github.com/etcd-io/etcd/issues/17737 During the development of https://github.com/etcd-io/etcd/pull/17938, we agreed that during the transition to L7 forward proxy, unused features and features targeting L4 reverse proxy will be dropped. This feature falls under the features targeting L4 reverse proxy. Signed-off-by: Chun-Hung Tseng --- pkg/proxy/server.go | 99 ++++++++++++--------------- pkg/proxy/server_test.go | 77 --------------------- tests/robustness/failpoint/network.go | 19 ++--- 3 files changed, 52 insertions(+), 143 deletions(-) diff --git a/pkg/proxy/server.go b/pkg/proxy/server.go index de41276394d..cd51be3a0d3 100644 --- a/pkg/proxy/server.go +++ b/pkg/proxy/server.go @@ -75,28 +75,12 @@ type Server interface { // LatencyRx returns current receive latency. LatencyRx() time.Duration - // ModifyTx alters/corrupts/drops "outgoing" packets from the listener - // with the given edit function. - ModifyTx(f func(data []byte) []byte) - // UnmodifyTx removes modify operation on "forwarding". - UnmodifyTx() - - // ModifyRx alters/corrupts/drops "incoming" packets to client - // with the given edit function. - ModifyRx(f func(data []byte) []byte) - // UnmodifyRx removes modify operation on "receiving". - UnmodifyRx() - // BlackholeTx drops all "outgoing" packets before "forwarding". - // "BlackholeTx" operation is a wrapper around "ModifyTx" with - // a function that returns empty bytes. BlackholeTx() // UnblackholeTx removes blackhole operation on "sending". UnblackholeTx() // BlackholeRx drops all "incoming" packets to client. - // "BlackholeRx" operation is a wrapper around "ModifyRx" with - // a function that returns empty bytes. BlackholeRx() // UnblackholeRx removes blackhole operation on "receiving". UnblackholeRx() @@ -140,11 +124,11 @@ type server struct { listenerMu sync.RWMutex listener net.Listener - modifyTxMu sync.RWMutex - modifyTx func(data []byte) []byte + shouldDropTxMu sync.RWMutex + shouldDropTx bool - modifyRxMu sync.RWMutex - modifyRx func(data []byte) []byte + shouldDropRxMu sync.RWMutex + shouldDropRx bool pauseTxMu sync.Mutex pauseTxc chan struct{} @@ -404,20 +388,20 @@ func (s *server) ioCopy(dst io.Writer, src io.Reader, ptype proxyType) { } data := buf[:nr1] - // alters/corrupts/drops data + // drops data switch ptype { case proxyTx: - s.modifyTxMu.RLock() - if s.modifyTx != nil { - data = s.modifyTx(data) + s.shouldDropTxMu.RLock() + if s.shouldDropTx { + data = nil } - s.modifyTxMu.RUnlock() + s.shouldDropTxMu.RUnlock() case proxyRx: - s.modifyRxMu.RLock() - if s.modifyRx != nil { - data = s.modifyRx(data) + s.shouldDropRxMu.RLock() + if s.shouldDropRx { + data = nil } - s.modifyRxMu.RUnlock() + s.shouldDropRxMu.RUnlock() default: panic("unknown proxy type") } @@ -691,55 +675,56 @@ func computeLatency(lat, rv time.Duration) time.Duration { return lat + time.Duration(int64(sign)*mrand.Int63n(rv.Nanoseconds())) } -func (s *server) ModifyTx(f func([]byte) []byte) { - s.modifyTxMu.Lock() - s.modifyTx = f - s.modifyTxMu.Unlock() +func (s *server) setShouldDropTx() { + s.shouldDropTxMu.Lock() + s.shouldDropTx = true + s.shouldDropTxMu.Unlock() s.lg.Info( - "modifying tx", + "setShouldDropTx", zap.String("from", s.From()), zap.String("to", s.To()), ) } -func (s *server) UnmodifyTx() { - s.modifyTxMu.Lock() - s.modifyTx = nil - s.modifyTxMu.Unlock() +func (s *server) unsetShouldDropTx() { + s.shouldDropTxMu.Lock() + s.shouldDropTx = false + s.shouldDropTxMu.Unlock() s.lg.Info( - "unmodifyed tx", + "unsetShouldDropTx", zap.String("from", s.From()), zap.String("to", s.To()), ) } -func (s *server) ModifyRx(f func([]byte) []byte) { - s.modifyRxMu.Lock() - s.modifyRx = f - s.modifyRxMu.Unlock() +func (s *server) setShouldDropRx() { + s.shouldDropRxMu.Lock() + s.shouldDropRx = true + s.shouldDropRxMu.Unlock() + s.lg.Info( - "modifying rx", - zap.String("from", s.To()), - zap.String("to", s.From()), + "setShouldDropRx", + zap.String("from", s.From()), + zap.String("to", s.To()), ) } -func (s *server) UnmodifyRx() { - s.modifyRxMu.Lock() - s.modifyRx = nil - s.modifyRxMu.Unlock() +func (s *server) unsetShouldDropRx() { + s.shouldDropRxMu.Lock() + s.shouldDropRx = false + s.shouldDropRxMu.Unlock() s.lg.Info( - "unmodifyed rx", - zap.String("from", s.To()), - zap.String("to", s.From()), + "unsetShouldDropRx", + zap.String("from", s.From()), + zap.String("to", s.To()), ) } func (s *server) BlackholeTx() { - s.ModifyTx(func([]byte) []byte { return nil }) + s.setShouldDropTx() s.lg.Info( "blackholed tx", zap.String("from", s.From()), @@ -748,7 +733,7 @@ func (s *server) BlackholeTx() { } func (s *server) UnblackholeTx() { - s.UnmodifyTx() + s.unsetShouldDropTx() s.lg.Info( "unblackholed tx", zap.String("from", s.From()), @@ -757,7 +742,7 @@ func (s *server) UnblackholeTx() { } func (s *server) BlackholeRx() { - s.ModifyRx(func([]byte) []byte { return nil }) + s.setShouldDropRx() s.lg.Info( "blackholed rx", zap.String("from", s.To()), @@ -766,7 +751,7 @@ func (s *server) BlackholeRx() { } func (s *server) UnblackholeRx() { - s.UnmodifyRx() + s.unsetShouldDropRx() s.lg.Info( "unblackholed rx", zap.String("from", s.To()), diff --git a/pkg/proxy/server_test.go b/pkg/proxy/server_test.go index baabfebe488..124d3095c9d 100644 --- a/pkg/proxy/server_test.go +++ b/pkg/proxy/server_test.go @@ -175,83 +175,6 @@ func createTLSInfo(lg *zap.Logger, secure bool) transport.TLSInfo { return transport.TLSInfo{Logger: lg} } -func TestServer_ModifyTx_corrupt(t *testing.T) { - lg := zaptest.NewLogger(t) - scheme := "unix" - srcAddr, dstAddr := newUnixAddr(), newUnixAddr() - defer func() { - os.RemoveAll(srcAddr) - os.RemoveAll(dstAddr) - }() - ln := listen(t, scheme, dstAddr, transport.TLSInfo{}) - defer ln.Close() - - p := NewServer(ServerConfig{ - Logger: lg, - From: url.URL{Scheme: scheme, Host: srcAddr}, - To: url.URL{Scheme: scheme, Host: dstAddr}, - }) - - waitForServer(t, p) - - defer p.Close() - - p.ModifyTx(func(d []byte) []byte { - d[len(d)/2]++ - return d - }) - data := []byte("Hello World!") - send(t, data, scheme, srcAddr, transport.TLSInfo{}) - if d := receive(t, ln); bytes.Equal(d, data) { - t.Fatalf("expected corrupted data, got %q", string(d)) - } - - p.UnmodifyTx() - send(t, data, scheme, srcAddr, transport.TLSInfo{}) - if d := receive(t, ln); !bytes.Equal(d, data) { - t.Fatalf("expected uncorrupted data, got %q", string(d)) - } -} - -func TestServer_ModifyTx_packet_loss(t *testing.T) { - lg := zaptest.NewLogger(t) - scheme := "unix" - srcAddr, dstAddr := newUnixAddr(), newUnixAddr() - defer func() { - os.RemoveAll(srcAddr) - os.RemoveAll(dstAddr) - }() - ln := listen(t, scheme, dstAddr, transport.TLSInfo{}) - defer ln.Close() - - p := NewServer(ServerConfig{ - Logger: lg, - From: url.URL{Scheme: scheme, Host: srcAddr}, - To: url.URL{Scheme: scheme, Host: dstAddr}, - }) - - waitForServer(t, p) - - defer p.Close() - - // 50% packet loss - p.ModifyTx(func(d []byte) []byte { - half := len(d) / 2 - return d[:half:half] - }) - data := []byte("Hello World!") - send(t, data, scheme, srcAddr, transport.TLSInfo{}) - if d := receive(t, ln); bytes.Equal(d, data) { - t.Fatalf("expected corrupted data, got %q", string(d)) - } - - p.UnmodifyTx() - send(t, data, scheme, srcAddr, transport.TLSInfo{}) - if d := receive(t, ln); !bytes.Equal(d, data) { - t.Fatalf("expected uncorrupted data, got %q", string(d)) - } -} - func TestServer_BlackholeTx(t *testing.T) { lg := zaptest.NewLogger(t) scheme := "unix" diff --git a/tests/robustness/failpoint/network.go b/tests/robustness/failpoint/network.go index 27504c396b9..ab7dbfa8af2 100644 --- a/tests/robustness/failpoint/network.go +++ b/tests/robustness/failpoint/network.go @@ -193,21 +193,22 @@ func (f dropPeerNetworkFailpoint) Inject(ctx context.Context, t *testing.T, lg * member := clus.Procs[rand.Int()%len(clus.Procs)] proxy := member.PeerProxy() - proxy.ModifyRx(f.modifyPacket) - proxy.ModifyTx(f.modifyPacket) + if !f.shouldDropPacket() { + return nil, nil + } + + proxy.BlackholeRx() + proxy.BlackholeTx() lg.Info("Dropping traffic from and to member", zap.String("member", member.Config().Name), zap.Int("probability", f.dropProbabilityPercent)) time.Sleep(f.duration) lg.Info("Traffic drop removed", zap.String("member", member.Config().Name)) - proxy.UnmodifyRx() - proxy.UnmodifyTx() + proxy.UnblackholeRx() + proxy.UnblackholeTx() return nil, nil } -func (f dropPeerNetworkFailpoint) modifyPacket(data []byte) []byte { - if rand.Intn(100) < f.dropProbabilityPercent { - return nil - } - return data +func (f dropPeerNetworkFailpoint) shouldDropPacket() bool { + return rand.Intn(100) < f.dropProbabilityPercent } func (f dropPeerNetworkFailpoint) Name() string {