Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

docker_client: don't pull image and create container per tc command #251

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 20 additions & 6 deletions pkg/container/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -570,21 +570,22 @@ func TestNetemContainerDportFilter_Success(t *testing.T) {
engineClient.AssertExpectations(t)
}

func Test_tcContainerCommand(t *testing.T) {
func Test_tcContainerCommands(t *testing.T) {
c := &Container{
ContainerInfo: DetailsResponse(AsMap("ID", "targetID")),
}

config := container.Config{
Labels: map[string]string{"com.gaiaadm.pumba.skip": "true"},
Entrypoint: []string{"tc"},
Cmd: []string{"test", "me"},
Image: "pumba/tcimage",
// Used as long-running entry-point to keep container alive between commands
Cmd: []string{"monitor"},
Image: "pumba/tcimage",
}
// host config
hconfig := container.HostConfig{
// auto remove container on tc command exit
AutoRemove: true,
// Don't auto-remove, since we may want to run multiple commands
AutoRemove: false,
// NET_ADMIN is required for "tc netem"
CapAdd: []string{"NET_ADMIN"},
// use target container network stack
Expand Down Expand Up @@ -621,8 +622,21 @@ func Test_tcContainerCommand(t *testing.T) {
// start container
engineClient.On("ContainerStart", ctx, "tcID", types.ContainerStartOptions{}).Return(nil)

// create exec for first command
engineClient.On("ContainerExecCreate", ctx, "tcID", types.ExecConfig{Cmd: []string{"tc", "test", "one"}}).Return(types.IDResponse{ID: "execID1"}, nil)
// start exec for first command
engineClient.On("ContainerExecStart", ctx, "execID1", types.ExecStartCheck{}).Return(nil)

// create exec for second command
engineClient.On("ContainerExecCreate", ctx, "tcID", types.ExecConfig{Cmd: []string{"tc", "test", "two"}}).Return(types.IDResponse{ID: "execID2"}, nil)
// start exec for second command
engineClient.On("ContainerExecStart", ctx, "execID2", types.ExecStartCheck{}).Return(nil)

// remove container
engineClient.On("ContainerRemove", ctx, "tcID", types.ContainerRemoveOptions{Force: true}).Return(nil)

client := dockerClient{containerAPI: engineClient, imageAPI: engineClient}
err := client.tcContainerCommand(context.TODO(), c, []string{"test", "me"}, "pumba/tcimage", true)
err := client.tcContainerCommands(context.TODO(), c, [][]string{{"test", "one"}, {"test", "two"}}, "pumba/tcimage", true)

assert.NoError(t, err)
engineClient.AssertExpectations(t)
Expand Down
224 changes: 107 additions & 117 deletions pkg/container/docker_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -372,7 +372,7 @@ func (client dockerClient) startNetemContainer(ctx context.Context, c *Container
// stop disruption command
// netemStopCommand := "tc qdisc del dev eth0 root netem"
log.WithField("netem", strings.Join(netemCommand, " ")).Debug("adding netem qdisc")
return client.tcCommand(ctx, c, netemCommand, tcimage, pull)
return client.tcCommands(ctx, c, [][]string{netemCommand}, tcimage, pull)
}
return nil
}
Expand All @@ -388,54 +388,37 @@ func (client dockerClient) stopNetemContainer(ctx context.Context, c *Container,
"dryrun": dryrun,
}).Debug("stop netem for container")
if !dryrun {
var netemCommands [][]string
if len(ips) != 0 || len(sports) != 0 || len(dports) != 0 {
// delete qdisc 'parent 1:1 handle 10:'
// http://www.linuxfoundation.org/collaborate/workgroups/networking/netem
netemCommand := []string{"qdisc", "del", "dev", netInterface, "parent", "1:1", "handle", "10:"}
log.WithField("netem", strings.Join(netemCommand, " ")).Debug("deleting netem qdisc")
err := client.tcCommand(ctx, c, netemCommand, tcimage, pull)
if err != nil {
return errors.Wrap(err, "failed to delete qdisc 'parent 1:1 handle 10:'")
}
// delete qdisc 'parent 1:2 handle 20:'
// http://www.linuxfoundation.org/collaborate/workgroups/networking/netem
netemCommand = []string{"qdisc", "del", "dev", netInterface, "parent", "1:2", "handle", "20:"}
log.WithField("netem", strings.Join(netemCommand, " ")).Debug("deleting netem qdisc")
err = client.tcCommand(ctx, c, netemCommand, tcimage, pull)
if err != nil {
return errors.Wrap(err, "failed to delete qdisc 'parent 1:2 handle 20:'")
}
// delete qdisc 'parent 1:3 handle 30:'
// http://www.linuxfoundation.org/collaborate/workgroups/networking/netem
netemCommand = []string{"qdisc", "del", "dev", netInterface, "parent", "1:3", "handle", "30:"}
log.WithField("netem", strings.Join(netemCommand, " ")).Debug("deleting netem qdisc")
err = client.tcCommand(ctx, c, netemCommand, tcimage, pull)
if err != nil {
return errors.Wrap(err, "failed to delete qdisc 'parent 1:3 handle 30:'")
}
// delete qdisc 'root handle 1: prio'
// http://www.linuxfoundation.org/collaborate/workgroups/networking/netem
netemCommand = []string{"qdisc", "del", "dev", netInterface, "root", "handle", "1:", "prio"}
log.WithField("netem", strings.Join(netemCommand, " ")).Debug("deleting netem qdisc")
err = client.tcCommand(ctx, c, netemCommand, tcimage, pull)
if err != nil {
return errors.Wrap(err, "failed to delete qdisc 'root handle 1: prio'")
netemCommands = [][]string{
// delete qdisc 'parent 1:1 handle 10:'
// http://www.linuxfoundation.org/collaborate/workgroups/networking/netem
{"qdisc", "del", "dev", netInterface, "parent", "1:1", "handle", "10:"},
// delete qdisc 'parent 1:2 handle 20:'
// http://www.linuxfoundation.org/collaborate/workgroups/networking/netem
{"qdisc", "del", "dev", netInterface, "parent", "1:2", "handle", "20:"},
// delete qdisc 'parent 1:3 handle 30:'
// http://www.linuxfoundation.org/collaborate/workgroups/networking/netem
{"qdisc", "del", "dev", netInterface, "parent", "1:3", "handle", "30:"},
// delete qdisc 'root handle 1: prio'
// http://www.linuxfoundation.org/collaborate/workgroups/networking/netem
{"qdisc", "del", "dev", netInterface, "root", "handle", "1:", "prio"},
}
} else {
// stop netem command
// http://www.linuxfoundation.org/collaborate/workgroups/networking/netem
netemCommand := []string{"qdisc", "del", "dev", netInterface, "root", "netem"}
log.WithField("netem", strings.Join(netemCommand, " ")).Debug("deleting netem qdisc")
err := client.tcCommand(ctx, c, netemCommand, tcimage, pull)
if err != nil {
return errors.Wrap(err, "failed to stop netem")
netemCommands = [][]string{
// stop netem command
// http://www.linuxfoundation.org/collaborate/workgroups/networking/netem
{"qdisc", "del", "dev", netInterface, "root", "netem"},
}
}
err := client.tcCommands(ctx, c, netemCommands, tcimage, pull)
if err != nil {
return errors.Wrap(err, "failed to run netem tc commands")
}
}
return nil
}

//nolint:funlen
func (client dockerClient) startNetemContainerIPFilter(ctx context.Context, c *Container, netInterface string, netemCmd []string,
ips []*net.IPNet, sports []string, dports []string, tcimage string, pull bool, dryrun bool) error {
log.WithFields(log.Fields{
Expand Down Expand Up @@ -465,113 +448,98 @@ func (client dockerClient) startNetemContainerIPFilter(ctx context.Context, c *C
// sfq sfq netem
// band 0 1 2

// Create a priority-based queue. This *instantly* creates classes 1:1, 1:2, 1:3
// 'tc qdisc add dev <netInterface> root handle 1: prio'
// See more: http://man7.org/linux/man-pages/man8/tc-netem.8.html
handleCommand := []string{"qdisc", "add", "dev", netInterface, "root", "handle", "1:", "prio"}
log.WithField("netem", strings.Join(handleCommand, " ")).Debug("adding netem qdisc")
err := client.tcCommand(ctx, c, handleCommand, tcimage, pull)
if err != nil {
return errors.Wrap(err, "failed to create a priority-based queue")
}

// Create Stochastic Fairness Queueing (sfq) queueing discipline for 1:1 class.
// 'tc qdisc add dev <netInterface> parent 1:1 handle 10: sfq'
// See more: https://linux.die.net/man/8/tc-sfq
netemCommand := []string{"qdisc", "add", "dev", netInterface, "parent", "1:1", "handle", "10:", "sfq"}
log.WithField("netem", strings.Join(netemCommand, " ")).Debug("adding netem qdisc")
err = client.tcCommand(ctx, c, netemCommand, tcimage, pull)
if err != nil {
return errors.Wrap(err, "failed to create Stochastic Fairness Queueing (sfq) queueing discipline for 1:1 class")
}

// Create Stochastic Fairness Queueing (sfq) queueing discipline for 1:2 class
// 'tc qdisc add dev <netInterface> parent 1:2 handle 20: sfq'
// See more: https://linux.die.net/man/8/tc-sfq
netemCommand = []string{"qdisc", "add", "dev", netInterface, "parent", "1:2", "handle", "20:", "sfq"}
log.WithField("netem", strings.Join(netemCommand, " ")).Debug("adding netem qdisc")
err = client.tcCommand(ctx, c, netemCommand, tcimage, pull)
if err != nil {
return errors.Wrap(err, "failed to create Stochastic Fairness Queueing (sfq) queueing discipline for 1:2 class")
}

// Add queueing discipline for 1:3 class. No traffic is going through 1:3 yet
// 'tc qdisc add dev <netInterface> parent 1:3 handle 30: netem <netemCmd>'
// See more: http://man7.org/linux/man-pages/man8/tc-netem.8.html
netemCommand = append([]string{"qdisc", "add", "dev", netInterface, "parent", "1:3", "handle", "30:", "netem"}, netemCmd...)
log.WithField("netem", strings.Join(netemCommand, " ")).Debug("adding netem qdisc")
err = client.tcCommand(ctx, c, netemCommand, tcimage, pull)
if err != nil {
return errors.Wrap(err, "failed to add queueing discipline for 1:3 class")
commands := [][]string{
// Create a priority-based queue. This *instantly* creates classes 1:1, 1:2, 1:3
// 'tc qdisc add dev <netInterface> root handle 1: prio'
// See more: http://man7.org/linux/man-pages/man8/tc-netem.8.html
{"qdisc", "add", "dev", netInterface, "root", "handle", "1:", "prio"},
// Create Stochastic Fairness Queueing (sfq) queueing discipline for 1:1 class.
// 'tc qdisc add dev <netInterface> parent 1:1 handle 10: sfq'
// See more: https://linux.die.net/man/8/tc-sfq
{"qdisc", "add", "dev", netInterface, "parent", "1:1", "handle", "10:", "sfq"},
// Create Stochastic Fairness Queueing (sfq) queueing discipline for 1:2 class
// 'tc qdisc add dev <netInterface> parent 1:2 handle 20: sfq'
// See more: https://linux.die.net/man/8/tc-sfq
{"qdisc", "add", "dev", netInterface, "parent", "1:2", "handle", "20:", "sfq"},
// Add queueing discipline for 1:3 class. No traffic is going through 1:3 yet
// 'tc qdisc add dev <netInterface> parent 1:3 handle 30: netem <netemCmd>'
// See more: http://man7.org/linux/man-pages/man8/tc-netem.8.html
append([]string{"qdisc", "add", "dev", netInterface, "parent", "1:3", "handle", "30:", "netem"}, netemCmd...),
}

// # redirect traffic to specific IP through band 3
// 'tc filter add dev <netInterface> protocol ip parent 1:0 prio 1 u32 match ip dst <targetIP> flowid 1:3'
// See more: http://man7.org/linux/man-pages/man8/tc-netem.8.html
for _, ip := range ips {
filterCommand := []string{"filter", "add", "dev", netInterface, "protocol", "ip", "parent", "1:0", "prio", "1",
"u32", "match", "ip", "dst", ip.String(), "flowid", "1:3"}
log.WithField("netem", strings.Join(filterCommand, " ")).Debug("adding netem IP filter")
err = client.tcCommand(ctx, c, filterCommand, tcimage, pull)
if err != nil {
return errors.Wrap(err, "failed to redirect traffic to specific IP through band 3")
}
commands = append(commands, []string{"filter", "add", "dev", netInterface, "protocol", "ip", "parent", "1:0", "prio", "1",
"u32", "match", "ip", "dst", ip.String(), "flowid", "1:3"})
}

// # redirect traffic to specific sport through band 3
// 'tc filter add dev <netInterface> protocol ip parent 1:0 prio 1 u32 match ip <s/d>port <targetPort> 0xffff flowid 1:3'
// See more: http://man7.org/linux/man-pages/man8/tc-netem.8.html
for _, sport := range sports {
filterPortCommand := []string{"filter", "add", "dev", netInterface, "protocol", "ip", "parent", "1:0", "prio", "1",
"u32", "match", "ip", "sport", sport, "0xffff", "flowid", "1:3"}
log.WithField("netem", strings.Join(filterPortCommand, " ")).Debug("adding netem port filter")
err = client.tcCommand(ctx, c, filterPortCommand, tcimage, pull)
if err != nil {
return errors.Wrap(err, "failed to redirect traffic from port "+sport+" through band 3")
}
commands = append(commands, []string{"filter", "add", "dev", netInterface, "protocol", "ip", "parent", "1:0", "prio", "1",
"u32", "match", "ip", "sport", sport, "0xffff", "flowid", "1:3"})
}

// # redirect traffic to specific dport through band 3
// 'tc filter add dev <netInterface> protocol ip parent 1:0 prio 1 u32 match ip <s/d>port <targetPort> 0xffff flowid 1:3'
// See more: http://man7.org/linux/man-pages/man8/tc-netem.8.html
for _, dport := range dports {
filterPortCommand := []string{"filter", "add", "dev", netInterface, "protocol", "ip", "parent", "1:0", "prio", "1",
"u32", "match", "ip", "dport", dport, "0xffff", "flowid", "1:3"}
log.WithField("netem", strings.Join(filterPortCommand, " ")).Debug("adding netem port filter")
err = client.tcCommand(ctx, c, filterPortCommand, tcimage, pull)
if err != nil {
return errors.Wrap(err, "failed to redirect traffic to port "+dport+" through band 3")
}
commands = append(commands, []string{"filter", "add", "dev", netInterface, "protocol", "ip", "parent", "1:0", "prio", "1",
"u32", "match", "ip", "dport", dport, "0xffff", "flowid", "1:3"})
}

err := client.tcCommands(ctx, c, commands, tcimage, pull)
if err != nil {
return errors.Wrap(err, "failed to run tc commands")
}
}
return nil
}

func (client dockerClient) tcCommand(ctx context.Context, c *Container, args []string, tcimage string, pull bool) error {
func (client dockerClient) tcCommands(ctx context.Context, c *Container, argsList [][]string, tcimage string, pull bool) error {
if tcimage == "" {
return client.execOnContainer(ctx, c, "tc", args, true)
for _, args := range argsList {
if err := client.execOnContainer(ctx, c, "tc", args, true); err != nil {
return errors.Wrapf(err, "error running tc command on container: %v", strings.Join(args, " "))
}
}
return nil
}
return client.tcContainerCommand(ctx, c, args, tcimage, pull)
return client.tcContainerCommands(ctx, c, argsList, tcimage, pull)
}

// execute tc command using other container (with iproute2 package installed), using target container network stack
func (client dockerClient) tcExecCommand(ctx context.Context, execID string, args []string) error {
execConfig := types.ExecConfig{
Cmd: append([]string{"tc"}, args...),
}
execCreateResponse, err := client.containerAPI.ContainerExecCreate(ctx, execID, execConfig)
if err != nil {
return errors.Wrap(err, "failed to create tc-container exec")
}
if err = client.containerAPI.ContainerExecStart(ctx, execCreateResponse.ID, types.ExecStartCheck{}); err != nil {
return errors.Wrap(err, "failed to start tc-container exec")
}
log.WithField("args", strings.Join(args, " ")).Debug("run command on tc-container")
return nil
}

// execute tc commands using other container (with iproute2 package installed), using target container network stack
// try to use `gaiadocker\iproute2` image (Alpine + iproute2 package)
func (client dockerClient) tcContainerCommand(ctx context.Context, target *Container, args []string, tcimage string, pull bool) error {
func (client dockerClient) tcContainerCommands(ctx context.Context, target *Container, argsList [][]string, tcimage string, pull bool) error {
log.WithFields(log.Fields{
"container": target.ID(),
"tc-image": tcimage,
"pull": pull,
"args": args,
"args-list": argsList,
}).Debug("executing tc command in a separate container joining target container network namespace")
// container config
config := ctypes.Config{
Labels: map[string]string{"com.gaiaadm.pumba.skip": "true"},
Entrypoint: []string{"tc"},
Cmd: args,
Image: tcimage,
}

// host config
hconfig := ctypes.HostConfig{
// auto remove container on tc command exit
AutoRemove: true,
// Don't auto-remove, since we may want to run multiple commands
AutoRemove: false,
// NET_ADMIN is required for "tc netem"
CapAdd: []string{"NET_ADMIN"},
// use target container network stack
Expand All @@ -585,8 +553,8 @@ func (client dockerClient) tcContainerCommand(ctx context.Context, target *Conta
log.WithField("network", hconfig.NetworkMode).Debug("network mode")
// pull docker image if required: can pull only public images
if pull {
log.WithField("image", config.Image).Debug("pulling tc-image")
events, err := client.imageAPI.ImagePull(ctx, config.Image, types.ImagePullOptions{})
log.WithField("image", tcimage).Debug("pulling tc-image")
events, err := client.imageAPI.ImagePull(ctx, tcimage, types.ImagePullOptions{})
if err != nil {
return errors.Wrap(err, "failed to pull tc-image")
}
Expand All @@ -603,8 +571,19 @@ func (client dockerClient) tcContainerCommand(ctx context.Context, target *Conta
log.Debug(pullResponse)
}
}
log.WithField("image", config.Image).Debug("creating tc-container")

// container config
config := ctypes.Config{
Labels: map[string]string{"com.gaiaadm.pumba.skip": "true"},
Entrypoint: []string{"tc"},
// Used as long-running entry-point to keep container alive between commands
Cmd: []string{"monitor"},
Image: tcimage,
}

createResponse, err := client.containerAPI.ContainerCreate(ctx, &config, &hconfig, nil, nil, "")

log.WithField("image", config.Image).Debug("creating tc-container")
if err != nil {
return errors.Wrap(err, "failed to create tc-container from tc-image")
}
Expand All @@ -613,6 +592,17 @@ func (client dockerClient) tcContainerCommand(ctx context.Context, target *Conta
if err != nil {
return errors.Wrap(err, "failed to start tc-container")
}

for _, args := range argsList {
if err = client.tcExecCommand(ctx, createResponse.ID, args); err != nil {
return errors.Wrapf(err, "error running tc command on container: %v", strings.Join(args, " "))
}
}

if err = client.containerAPI.ContainerRemove(ctx, createResponse.ID, types.ContainerRemoveOptions{Force: true}); err != nil {
return errors.Wrap(err, "failed to remove tc-container")
}

return nil
}

Expand Down
Loading