diff --git a/pkg/proxy/server.go b/pkg/proxy/server.go index de41276394d..cd51be3a0d3 100644 --- a/pkg/proxy/server.go +++ b/pkg/proxy/server.go @@ -75,28 +75,12 @@ type Server interface { // LatencyRx returns current receive latency. LatencyRx() time.Duration - // ModifyTx alters/corrupts/drops "outgoing" packets from the listener - // with the given edit function. - ModifyTx(f func(data []byte) []byte) - // UnmodifyTx removes modify operation on "forwarding". - UnmodifyTx() - - // ModifyRx alters/corrupts/drops "incoming" packets to client - // with the given edit function. - ModifyRx(f func(data []byte) []byte) - // UnmodifyRx removes modify operation on "receiving". - UnmodifyRx() - // BlackholeTx drops all "outgoing" packets before "forwarding". - // "BlackholeTx" operation is a wrapper around "ModifyTx" with - // a function that returns empty bytes. BlackholeTx() // UnblackholeTx removes blackhole operation on "sending". UnblackholeTx() // BlackholeRx drops all "incoming" packets to client. - // "BlackholeRx" operation is a wrapper around "ModifyRx" with - // a function that returns empty bytes. BlackholeRx() // UnblackholeRx removes blackhole operation on "receiving". UnblackholeRx() @@ -140,11 +124,11 @@ type server struct { listenerMu sync.RWMutex listener net.Listener - modifyTxMu sync.RWMutex - modifyTx func(data []byte) []byte + shouldDropTxMu sync.RWMutex + shouldDropTx bool - modifyRxMu sync.RWMutex - modifyRx func(data []byte) []byte + shouldDropRxMu sync.RWMutex + shouldDropRx bool pauseTxMu sync.Mutex pauseTxc chan struct{} @@ -404,20 +388,20 @@ func (s *server) ioCopy(dst io.Writer, src io.Reader, ptype proxyType) { } data := buf[:nr1] - // alters/corrupts/drops data + // drops data switch ptype { case proxyTx: - s.modifyTxMu.RLock() - if s.modifyTx != nil { - data = s.modifyTx(data) + s.shouldDropTxMu.RLock() + if s.shouldDropTx { + data = nil } - s.modifyTxMu.RUnlock() + s.shouldDropTxMu.RUnlock() case proxyRx: - s.modifyRxMu.RLock() - if s.modifyRx != nil { - data = s.modifyRx(data) + s.shouldDropRxMu.RLock() + if s.shouldDropRx { + data = nil } - s.modifyRxMu.RUnlock() + s.shouldDropRxMu.RUnlock() default: panic("unknown proxy type") } @@ -691,55 +675,56 @@ func computeLatency(lat, rv time.Duration) time.Duration { return lat + time.Duration(int64(sign)*mrand.Int63n(rv.Nanoseconds())) } -func (s *server) ModifyTx(f func([]byte) []byte) { - s.modifyTxMu.Lock() - s.modifyTx = f - s.modifyTxMu.Unlock() +func (s *server) setShouldDropTx() { + s.shouldDropTxMu.Lock() + s.shouldDropTx = true + s.shouldDropTxMu.Unlock() s.lg.Info( - "modifying tx", + "setShouldDropTx", zap.String("from", s.From()), zap.String("to", s.To()), ) } -func (s *server) UnmodifyTx() { - s.modifyTxMu.Lock() - s.modifyTx = nil - s.modifyTxMu.Unlock() +func (s *server) unsetShouldDropTx() { + s.shouldDropTxMu.Lock() + s.shouldDropTx = false + s.shouldDropTxMu.Unlock() s.lg.Info( - "unmodifyed tx", + "unsetShouldDropTx", zap.String("from", s.From()), zap.String("to", s.To()), ) } -func (s *server) ModifyRx(f func([]byte) []byte) { - s.modifyRxMu.Lock() - s.modifyRx = f - s.modifyRxMu.Unlock() +func (s *server) setShouldDropRx() { + s.shouldDropRxMu.Lock() + s.shouldDropRx = true + s.shouldDropRxMu.Unlock() + s.lg.Info( - "modifying rx", - zap.String("from", s.To()), - zap.String("to", s.From()), + "setShouldDropRx", + zap.String("from", s.From()), + zap.String("to", s.To()), ) } -func (s *server) UnmodifyRx() { - s.modifyRxMu.Lock() - s.modifyRx = nil - s.modifyRxMu.Unlock() +func (s *server) unsetShouldDropRx() { + s.shouldDropRxMu.Lock() + s.shouldDropRx = false + s.shouldDropRxMu.Unlock() s.lg.Info( - "unmodifyed rx", - zap.String("from", s.To()), - zap.String("to", s.From()), + "unsetShouldDropRx", + zap.String("from", s.From()), + zap.String("to", s.To()), ) } func (s *server) BlackholeTx() { - s.ModifyTx(func([]byte) []byte { return nil }) + s.setShouldDropTx() s.lg.Info( "blackholed tx", zap.String("from", s.From()), @@ -748,7 +733,7 @@ func (s *server) BlackholeTx() { } func (s *server) UnblackholeTx() { - s.UnmodifyTx() + s.unsetShouldDropTx() s.lg.Info( "unblackholed tx", zap.String("from", s.From()), @@ -757,7 +742,7 @@ func (s *server) UnblackholeTx() { } func (s *server) BlackholeRx() { - s.ModifyRx(func([]byte) []byte { return nil }) + s.setShouldDropRx() s.lg.Info( "blackholed rx", zap.String("from", s.To()), @@ -766,7 +751,7 @@ func (s *server) BlackholeRx() { } func (s *server) UnblackholeRx() { - s.UnmodifyRx() + s.unsetShouldDropRx() s.lg.Info( "unblackholed rx", zap.String("from", s.To()), diff --git a/pkg/proxy/server_test.go b/pkg/proxy/server_test.go index baabfebe488..124d3095c9d 100644 --- a/pkg/proxy/server_test.go +++ b/pkg/proxy/server_test.go @@ -175,83 +175,6 @@ func createTLSInfo(lg *zap.Logger, secure bool) transport.TLSInfo { return transport.TLSInfo{Logger: lg} } -func TestServer_ModifyTx_corrupt(t *testing.T) { - lg := zaptest.NewLogger(t) - scheme := "unix" - srcAddr, dstAddr := newUnixAddr(), newUnixAddr() - defer func() { - os.RemoveAll(srcAddr) - os.RemoveAll(dstAddr) - }() - ln := listen(t, scheme, dstAddr, transport.TLSInfo{}) - defer ln.Close() - - p := NewServer(ServerConfig{ - Logger: lg, - From: url.URL{Scheme: scheme, Host: srcAddr}, - To: url.URL{Scheme: scheme, Host: dstAddr}, - }) - - waitForServer(t, p) - - defer p.Close() - - p.ModifyTx(func(d []byte) []byte { - d[len(d)/2]++ - return d - }) - data := []byte("Hello World!") - send(t, data, scheme, srcAddr, transport.TLSInfo{}) - if d := receive(t, ln); bytes.Equal(d, data) { - t.Fatalf("expected corrupted data, got %q", string(d)) - } - - p.UnmodifyTx() - send(t, data, scheme, srcAddr, transport.TLSInfo{}) - if d := receive(t, ln); !bytes.Equal(d, data) { - t.Fatalf("expected uncorrupted data, got %q", string(d)) - } -} - -func TestServer_ModifyTx_packet_loss(t *testing.T) { - lg := zaptest.NewLogger(t) - scheme := "unix" - srcAddr, dstAddr := newUnixAddr(), newUnixAddr() - defer func() { - os.RemoveAll(srcAddr) - os.RemoveAll(dstAddr) - }() - ln := listen(t, scheme, dstAddr, transport.TLSInfo{}) - defer ln.Close() - - p := NewServer(ServerConfig{ - Logger: lg, - From: url.URL{Scheme: scheme, Host: srcAddr}, - To: url.URL{Scheme: scheme, Host: dstAddr}, - }) - - waitForServer(t, p) - - defer p.Close() - - // 50% packet loss - p.ModifyTx(func(d []byte) []byte { - half := len(d) / 2 - return d[:half:half] - }) - data := []byte("Hello World!") - send(t, data, scheme, srcAddr, transport.TLSInfo{}) - if d := receive(t, ln); bytes.Equal(d, data) { - t.Fatalf("expected corrupted data, got %q", string(d)) - } - - p.UnmodifyTx() - send(t, data, scheme, srcAddr, transport.TLSInfo{}) - if d := receive(t, ln); !bytes.Equal(d, data) { - t.Fatalf("expected uncorrupted data, got %q", string(d)) - } -} - func TestServer_BlackholeTx(t *testing.T) { lg := zaptest.NewLogger(t) scheme := "unix" diff --git a/tests/robustness/failpoint/network.go b/tests/robustness/failpoint/network.go index 27504c396b9..ab7dbfa8af2 100644 --- a/tests/robustness/failpoint/network.go +++ b/tests/robustness/failpoint/network.go @@ -193,21 +193,22 @@ func (f dropPeerNetworkFailpoint) Inject(ctx context.Context, t *testing.T, lg * member := clus.Procs[rand.Int()%len(clus.Procs)] proxy := member.PeerProxy() - proxy.ModifyRx(f.modifyPacket) - proxy.ModifyTx(f.modifyPacket) + if !f.shouldDropPacket() { + return nil, nil + } + + proxy.BlackholeRx() + proxy.BlackholeTx() lg.Info("Dropping traffic from and to member", zap.String("member", member.Config().Name), zap.Int("probability", f.dropProbabilityPercent)) time.Sleep(f.duration) lg.Info("Traffic drop removed", zap.String("member", member.Config().Name)) - proxy.UnmodifyRx() - proxy.UnmodifyTx() + proxy.UnblackholeRx() + proxy.UnblackholeTx() return nil, nil } -func (f dropPeerNetworkFailpoint) modifyPacket(data []byte) []byte { - if rand.Intn(100) < f.dropProbabilityPercent { - return nil - } - return data +func (f dropPeerNetworkFailpoint) shouldDropPacket() bool { + return rand.Intn(100) < f.dropProbabilityPercent } func (f dropPeerNetworkFailpoint) Name() string {