Skip to content

Commit

Permalink
Remove packet modification
Browse files Browse the repository at this point in the history
Signed-off-by: Chun-Hung Tseng <[email protected]>
  • Loading branch information
henrybear327 committed Sep 25, 2024
1 parent 5eb187b commit 0ba5c3e
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 72 deletions.
73 changes: 13 additions & 60 deletions pkg/proxy/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,28 +93,12 @@ type Server interface {
// LatencyRx returns current receive latency.
LatencyRx() time.Duration

// ModifyTx alters/corrupts/drops "outgoing" packets from the listener
// with the given edit function.
ModifyTx(f func(data []byte) []byte)
// UnmodifyTx removes modify operation on "forwarding".
UnmodifyTx()

// ModifyRx alters/corrupts/drops "incoming" packets to client
// with the given edit function.
ModifyRx(f func(data []byte) []byte)
// UnmodifyRx removes modify operation on "receiving".
UnmodifyRx()

// BlackholeTx drops all "outgoing" packets before "forwarding".
// "BlackholeTx" operation is a wrapper around "ModifyTx" with
// a function that returns empty bytes.
BlackholeTx()
// UnblackholeTx removes blackhole operation on "sending".
UnblackholeTx()

// BlackholeRx drops all "incoming" packets to client.
// "BlackholeRx" operation is a wrapper around "ModifyRx" with
// a function that returns empty bytes.
BlackholeRx()
// UnblackholeRx removes blackhole operation on "receiving".
UnblackholeRx()
Expand Down Expand Up @@ -855,75 +839,44 @@ func computeLatency(lat, rv time.Duration) time.Duration {
return lat + time.Duration(int64(sign)*mrand.Int63n(rv.Nanoseconds()))
}

func (s *server) ModifyTx(f func([]byte) []byte) {
func (s *server) BlackholeTx() {
s.modifyTxMu.Lock()
s.modifyTx = f
s.modifyTx = func([]byte) []byte { return nil }
s.modifyTxMu.Unlock()

s.lg.Info(
"modifying tx",
zap.String("proxy listen on", s.Listen()),
"blackholed tx",
zap.String("proxy listening on", s.Listen()),
)
}

func (s *server) UnmodifyTx() {
func (s *server) UnblackholeTx() {
s.modifyTxMu.Lock()
s.modifyTx = nil
s.modifyTxMu.Unlock()

s.lg.Info(
"unmodifyed tx",
zap.String("proxy listen on", s.Listen()),
)
}

func (s *server) ModifyRx(f func([]byte) []byte) {
s.modifyRxMu.Lock()
s.modifyRx = f
s.modifyRxMu.Unlock()
s.lg.Info(
"modifying rx",
zap.String("proxy listen on", s.Listen()),
)
}

func (s *server) UnmodifyRx() {
s.modifyRxMu.Lock()
s.modifyRx = nil
s.modifyRxMu.Unlock()

s.lg.Info(
"unmodifyed rx",
zap.String("proxy listen on", s.Listen()),
)
}

func (s *server) BlackholeTx() {
s.ModifyTx(func([]byte) []byte { return nil })
s.lg.Info(
"blackholed tx",
zap.String("proxy listening on", s.Listen()),
)
}

func (s *server) UnblackholeTx() {
s.UnmodifyTx()
s.lg.Info(
"unblackholed tx",
zap.String("proxy listening on", s.Listen()),
)
}

func (s *server) BlackholeRx() {
s.ModifyRx(func([]byte) []byte { return nil })
s.modifyRxMu.Lock()
s.modifyRx = func([]byte) []byte { return nil }
s.modifyRxMu.Unlock()

s.lg.Info(
"blackholed rx",
zap.String("proxy listening on", s.Listen()),
)
}

func (s *server) UnblackholeRx() {
s.UnmodifyRx()
s.modifyRxMu.Lock()
s.modifyRx = nil
s.modifyRxMu.Unlock()

s.lg.Info(
"unblackholed rx",
zap.String("proxy listening on", s.Listen()),
Expand Down
23 changes: 11 additions & 12 deletions tests/robustness/failpoint/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,21 +187,20 @@ func (f dropPeerNetworkFailpoint) Inject(ctx context.Context, t *testing.T, lg *
member := clus.Procs[rand.Int()%len(clus.Procs)]
forwardProxy := member.PeerForwardProxy()

forwardProxy.ModifyRx(f.modifyPacket)
forwardProxy.ModifyTx(f.modifyPacket)
lg.Info("Dropping traffic from and to member", zap.String("member", member.Config().Name), zap.Int("probability", f.dropProbabilityPercent))
time.Sleep(f.duration)
lg.Info("Traffic drop removed", zap.String("member", member.Config().Name))
forwardProxy.UnmodifyRx()
forwardProxy.UnmodifyTx()
if f.shouldDropPacket() {
forwardProxy.BlackholeRx()
forwardProxy.BlackholeTx()
lg.Info("Dropping traffic from and to member", zap.String("member", member.Config().Name), zap.Int("probability", f.dropProbabilityPercent))
time.Sleep(f.duration)
lg.Info("Traffic drop removed", zap.String("member", member.Config().Name))
forwardProxy.UnblackholeRx()
forwardProxy.UnblackholeTx()
}
return nil, nil
}

func (f dropPeerNetworkFailpoint) modifyPacket(data []byte) []byte {
if rand.Intn(100) < f.dropProbabilityPercent {
return nil
}
return data
func (f dropPeerNetworkFailpoint) shouldDropPacket() bool {
return rand.Intn(100) < f.dropProbabilityPercent
}

func (f dropPeerNetworkFailpoint) Name() string {
Expand Down

0 comments on commit 0ba5c3e

Please sign in to comment.