Skip to content

Commit

Permalink
feat: streaming netflow support for experiments
Browse files Browse the repository at this point in the history
Netflow is captured using the netflow capabilities within Open vSwitch.
Since OVS only allows netflow to be configured and captured bridge-wide,
the netflow feature in phēnix will fail if the default bridge for an
experiment is set to the default of `phenix`, to avoid data leakage
across experiments. Additionally, the creation or updating of an
experiment will fail if a default bridge is used that another experiment
is already configured with.

A new `defaultBridge` setting has been added to the experiment schema,
and when set to something other than `phenix` it will allow
experiment-wide netflow capture and will also automatically use GRE
tunneling between the OVS bridges on multiple mesh nodes.

A new `--default-bridge` flag has been added to the `phenix experiment
create` subcommand that will set the default bridge name when creating
the experiment.

The experiment creation modal in the UI has a new option input for
setting the default bridge name when creating experiments via the UI.

Bridge names must be 15 characters or less, as dictated by OVS (and
Linux interface names).

When netflow is activated for an experiment, external applications can
connect a websocket to `/api/v1/experiments/{name}/netflow/ws` in order
to get streaming netflow data for the experiment.
  • Loading branch information
activeshadow committed Feb 6, 2024
1 parent 796e603 commit d023308
Show file tree
Hide file tree
Showing 46 changed files with 1,516 additions and 144 deletions.
26 changes: 26 additions & 0 deletions src/go/api/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,30 @@ func Init() error {
return fmt.Errorf("parsing config files in %s: %w", base, err)
}

// Call any hooks registered for the `startup` stage.
configs, err := store.List(AllKinds...)
if err != nil {
return fmt.Errorf("getting list of configs from store: %w", err)
}

for _, config := range configs {
var updated bool

for _, hook := range hooks[config.Kind] {
if err := hook("startup", &config); err != nil {
return fmt.Errorf("calling startup config hook: %w", err)
}

updated = true
}

if updated {
if err := store.Update(&config); err != nil {
return fmt.Errorf("updating config in store: %w", err)
}
}
}

return nil
}

Expand Down Expand Up @@ -342,6 +366,8 @@ func Edit(name string, force bool) (*store.Config, error) {
}

expName = exp.Spec.ExperimentName()

// Don't allow users to edit the experiment name field.
delete(c.Spec, "experimentName")
}

Expand Down
53 changes: 53 additions & 0 deletions src/go/api/experiment/experiment.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package experiment

import (
"context"
"errors"
"fmt"
"io/ioutil"
"os"
Expand Down Expand Up @@ -31,6 +32,11 @@ import (
"github.com/mitchellh/mapstructure"
)

var (
ErrExperimentNotFound = errors.New("experiment not found")
ErrExperimentNotRunning = errors.New("experiment not running")
)

func init() {
config.RegisterConfigHook("Experiment", func(stage string, c *store.Config) error {
exp, err := types.DecodeExperimentFromConfig(*c)
Expand All @@ -39,13 +45,40 @@ func init() {
}

switch stage {
case "startup":
// Experiments created before the default bridge option was added need to
// be updated to have a default `phenix` bridge.
if exp.Spec.DefaultBridge() == "" {
exp.Spec.SetDefaultBridge("phenix")

if err := exp.Spec.Init(); err != nil {
return fmt.Errorf("re-initializing experiment with default bridge: %w", err)
}

c.Spec = structs.MapDefaultCase(exp.Spec, structs.CASESNAKE)
}
case "create":
exp.Spec.SetExperimentName(c.Metadata.Name)

if err := exp.Spec.Init(); err != nil {
return fmt.Errorf("initializing experiment: %w", err)
}

existing, _ := types.Experiments(false)
for _, other := range existing {
if other.Metadata.Name == exp.Metadata.Name {
continue
}

if exp.Spec.DefaultBridge() == "phenix" {
continue
}

if other.Spec.DefaultBridge() == exp.Spec.DefaultBridge() {
return fmt.Errorf("experiment %s already using default bridge %s", other.Metadata.Name, other.Spec.DefaultBridge())
}
}

if err := exp.Spec.VerifyScenario(context.TODO()); err != nil {
return fmt.Errorf("verifying experiment scenario: %w", err)
}
Expand All @@ -65,6 +98,21 @@ func init() {
return fmt.Errorf("re-initializing experiment (after update): %w", err)
}

existing, _ := types.Experiments(false)
for _, other := range existing {
if other.Metadata.Name == exp.Metadata.Name {
continue
}

if exp.Spec.DefaultBridge() == "phenix" {
continue
}

if other.Spec.DefaultBridge() == exp.Spec.DefaultBridge() {
return fmt.Errorf("experiment %s already using default bridge %s", other.Metadata.Name, other.Spec.DefaultBridge())
}
}

if exp.Spec.ExperimentName() != c.Metadata.Name {
if strings.Contains(exp.Spec.BaseDir(), exp.Spec.ExperimentName()) {
// If the experiment's base directory contains the current experiment
Expand Down Expand Up @@ -183,6 +231,10 @@ func Create(ctx context.Context, opts ...CreateOption) error {
return fmt.Errorf("no topology name provided")
}

if len(o.defaultBridge) > 15 {
return fmt.Errorf("default bridge name must be 15 characters or less")
}

var (
kind = "Experiment"
apiVersion = version.StoredVersion[kind]
Expand Down Expand Up @@ -211,6 +263,7 @@ func Create(ctx context.Context, opts ...CreateOption) error {
"experimentName": o.name,
"baseDir": o.baseDir,
"deployMode": o.deployMode,
"defaultBridge": o.defaultBridge,
"topology": topo,
}

Expand Down
219 changes: 219 additions & 0 deletions src/go/api/experiment/netflow.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,219 @@
package experiment

import (
"bufio"
"errors"
"fmt"
"net"
"strconv"
"strings"
"sync"

"phenix/util/mm"
)

type Netflow struct {
sync.RWMutex

Bridge string
Conn *net.UDPConn

callbacks map[string]chan map[string]any
}

func NewNetflow(bridge string, conn *net.UDPConn) *Netflow {
return &Netflow{
Bridge: bridge,
Conn: conn,

callbacks: make(map[string]chan map[string]any),
}
}

func (this *Netflow) NewChannel(id string) chan map[string]any {
this.Lock()
defer this.Unlock()

if _, ok := this.callbacks[id]; ok {
return nil
}

cb := make(chan map[string]any)

this.callbacks[id] = cb

return cb
}

func (this *Netflow) DeleteChannel(id string) {
this.Lock()
defer this.Unlock()

if cb, ok := this.callbacks[id]; ok {
close(cb)

for range cb {
// draining channel so it doesn't block anything
}
}

delete(this.callbacks, id)
}

func (this *Netflow) Publish(body map[string]any) {
this.RLock()
defer this.RUnlock()

for _, cb := range this.callbacks {
cb <- body
}
}

func (this *Netflow) Close() {
this.Lock()
defer this.Unlock()

for _, cb := range this.callbacks {
close(cb)
}

this.callbacks = nil
this.Conn.Close()
}

var (
netflows = make(map[string]*Netflow)
netflowMu sync.RWMutex

ErrNetflowNotStarted = errors.New("netflow not started for experiment")
ErrNetflowAlreadyStarted = errors.New("netflow already started for experiment")
ErrNetflowPhenixBridge = errors.New("cannot capture netflow on default phenix bridge")
)

func init() {
// Delete netflow captures when experiments are stopped.
RegisterHook("stop", func(stage, name string) {
netflowMu.RLock()
defer netflowMu.RUnlock()

if flow, ok := netflows[name]; ok {
// We don't need to worry about instructing minimega to delete the netflow
// capture since that will happen as part of the minimega namespace for
// this experiment being cleared.

flow.Conn.Close()
delete(netflows, name)
}
})
}

func GetNetflow(exp string) *Netflow {
netflowMu.RLock()
defer netflowMu.RUnlock()

if flow, ok := netflows[exp]; ok {
return flow
}

return nil
}

func StartNetflow(exp string) error {
netflowMu.Lock()
defer netflowMu.Unlock()

if _, ok := netflows[exp]; ok {
return ErrNetflowAlreadyStarted
}

spec, err := Get(exp)
if err != nil {
return ErrExperimentNotFound
}

if !spec.Running() {
return ErrExperimentNotRunning
}

if spec.Spec.DefaultBridge() == "phenix" {
return ErrNetflowPhenixBridge
}

cluster, _ := ClusterNodes(exp)

conn, err := net.ListenUDP("udp4", nil)
if err != nil {
return fmt.Errorf("creating UDP listener: %w", err)
}

addr := strings.Split(conn.LocalAddr().String(), ":")
cmds := []string{
"capture netflow mode ascii",
fmt.Sprintf("capture netflow bridge %s udp %s:%s", spec.Spec.DefaultBridge(), mm.Headnode(), addr[1]),
}

for _, cmd := range cmds {
for _, node := range cluster {
if err := mm.MeshSend(exp, node, cmd); err != nil {
conn.Close()
return fmt.Errorf("starting netflow capture on node %s: %w", node, err)
}
}
}

flow := NewNetflow(spec.Spec.DefaultBridge(), conn)
netflows[exp] = flow

go func() {
scanner := bufio.NewScanner(conn)

for scanner.Scan() {
fields := strings.Fields(scanner.Text())

body := make(map[string]any)

body["proto"], _ = strconv.Atoi(fields[2])

src := strings.Split(fields[3], ":")
dst := strings.Split(fields[5], ":")

body["src"] = src[0]
body["sport"], _ = strconv.Atoi(src[1])

body["dst"] = dst[0]
body["dport"], _ = strconv.Atoi(dst[1])

body["packets"], _ = strconv.Atoi(fields[6])
body["bytes"], _ = strconv.Atoi(fields[7])

flow.Publish(body)
}
}()

return nil
}

func StopNetflow(exp string) error {
netflowMu.Lock()
defer netflowMu.Unlock()

flow, ok := netflows[exp]
if !ok {
return ErrNetflowNotStarted
}

cluster, _ := ClusterNodes(exp)

cmd := fmt.Sprintf("capture netflow delete bridge %s", flow.Bridge)

for _, node := range cluster {
if err := mm.MeshSend(exp, node, cmd); err != nil {
return fmt.Errorf("deleting netflow capture on node %s: %w", node, err)
}
}

flow.Close()
delete(netflows, exp)

return nil
}
Loading

0 comments on commit d023308

Please sign in to comment.