From d023308dfa9029063158c7f12c46abd3f5693fa9 Mon Sep 17 00:00:00 2001 From: "Bryan T. Richardson" Date: Tue, 6 Feb 2024 16:05:22 -0700 Subject: [PATCH] feat: streaming netflow support for experiments MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Netflow is captured using the netflow capabilities within Open vSwitch. Since OVS only allows netflow to be configured and captured bridge-wide, the netflow feature in phÄ“nix will fail if the default bridge for an experiment is set to the default of `phenix`, to avoid data leakage across experiments. Additionally, the creation or updating of an experiment will fail if a default bridge is used that another experiment is already configured with. A new `defaultBridge` setting has been added to the experiment schema, and when set to something other than `phenix` it will allow experiment-wide netflow capture and will also automatically use GRE tunneling between the OVS bridges on multiple mesh nodes. A new `--default-bridge` flag has been added to the `phenix experiment create` subcommand that will set the default bridge name when creating the experiment. The experiment creation modal in the UI has a new option input for setting the default bridge name when creating experiments via the UI. Bridge names must be 15 characters or less, as dictated by OVS (and Linux interface names). When netflow is activated for an experiment, external applications can connect a websocket to `/api/v1/experiments/{name}/netflow/ws` in order to get streaming netflow data for the experiment. --- src/go/api/config/config.go | 26 ++ src/go/api/experiment/experiment.go | 53 ++++ src/go/api/experiment/netflow.go | 219 +++++++++++++++++ src/go/api/experiment/option.go | 33 ++- src/go/api/experiment/util.go | 26 ++ src/go/api/scorch/break.go | 2 +- src/go/api/scorch/tap.go | 6 +- src/go/api/soh/app.go | 2 +- src/go/api/soh/util.go | 6 +- src/go/api/vm/search.go | 80 ++++++ src/go/api/vm/topology.go | 113 +++++++++ src/go/app/app.go | 2 +- src/go/app/tap.go | 6 +- src/go/cmd/experiment.go | 2 + src/go/tmpl/templates/minimega_script.tmpl | 4 + src/go/types/experiment.go | 8 +- src/go/types/interfaces/experiment.go | 2 + src/go/types/interfaces/topology.go | 2 +- src/go/types/version/v1/experiment.go | 15 +- src/go/types/version/v1/network.go | 22 +- src/go/types/version/v1/node.go | 4 +- src/go/types/version/v1/topology.go | 4 +- src/go/types/version/version_test.go | 76 ++++++ src/go/util/cache/cache.go | 11 + src/go/util/cache/go-cache.go | 34 +++ src/go/util/cache/package.go | 17 ++ src/go/util/mm/types.go | 32 ++- src/go/util/tap/tap.go | 4 +- src/go/web/broker/client.go | 104 ++++++++ src/go/web/cache/cache.go | 67 +++-- src/go/web/cache/go-cache.go | 67 ----- src/go/web/cache/package.go | 29 +++ src/go/web/experiment.go | 147 +++++++++++ src/go/web/handlers.go | 1 + src/go/web/netflow.go | 260 ++++++++++++++++++++ src/go/web/proto/experiment.proto | 1 + src/go/web/rbac/known_policy.go | 6 +- src/go/web/server.go | 6 + src/go/web/types.go | 3 +- src/go/web/workflow.go | 21 ++ src/js/package.json | 6 +- src/js/src/components/Experiments.vue | 7 +- src/js/src/components/RunningExperiment.vue | 86 ++++++- src/js/src/components/utils.js | 6 +- src/js/src/main.js | 2 +- src/js/yarn.lock | 30 ++- 46 files changed, 1516 insertions(+), 144 deletions(-) create mode 100644 src/go/api/experiment/netflow.go create mode 100644 src/go/api/experiment/util.go create mode 100644 src/go/api/vm/search.go create mode 100644 src/go/api/vm/topology.go create mode 100644 src/go/types/version/version_test.go create mode 100644 src/go/util/cache/cache.go create mode 100644 src/go/util/cache/go-cache.go create mode 100644 src/go/util/cache/package.go delete mode 100644 src/go/web/cache/go-cache.go create mode 100644 src/go/web/cache/package.go create mode 100644 src/go/web/experiment.go create mode 100644 src/go/web/netflow.go diff --git a/src/go/api/config/config.go b/src/go/api/config/config.go index fc2d328c..eca4326e 100644 --- a/src/go/api/config/config.go +++ b/src/go/api/config/config.go @@ -149,6 +149,30 @@ func Init() error { return fmt.Errorf("parsing config files in %s: %w", base, err) } + // Call any hooks registered for the `startup` stage. + configs, err := store.List(AllKinds...) + if err != nil { + return fmt.Errorf("getting list of configs from store: %w", err) + } + + for _, config := range configs { + var updated bool + + for _, hook := range hooks[config.Kind] { + if err := hook("startup", &config); err != nil { + return fmt.Errorf("calling startup config hook: %w", err) + } + + updated = true + } + + if updated { + if err := store.Update(&config); err != nil { + return fmt.Errorf("updating config in store: %w", err) + } + } + } + return nil } @@ -342,6 +366,8 @@ func Edit(name string, force bool) (*store.Config, error) { } expName = exp.Spec.ExperimentName() + + // Don't allow users to edit the experiment name field. delete(c.Spec, "experimentName") } diff --git a/src/go/api/experiment/experiment.go b/src/go/api/experiment/experiment.go index 5182e4ac..b94b6612 100644 --- a/src/go/api/experiment/experiment.go +++ b/src/go/api/experiment/experiment.go @@ -2,6 +2,7 @@ package experiment import ( "context" + "errors" "fmt" "io/ioutil" "os" @@ -31,6 +32,11 @@ import ( "github.com/mitchellh/mapstructure" ) +var ( + ErrExperimentNotFound = errors.New("experiment not found") + ErrExperimentNotRunning = errors.New("experiment not running") +) + func init() { config.RegisterConfigHook("Experiment", func(stage string, c *store.Config) error { exp, err := types.DecodeExperimentFromConfig(*c) @@ -39,6 +45,18 @@ func init() { } switch stage { + case "startup": + // Experiments created before the default bridge option was added need to + // be updated to have a default `phenix` bridge. + if exp.Spec.DefaultBridge() == "" { + exp.Spec.SetDefaultBridge("phenix") + + if err := exp.Spec.Init(); err != nil { + return fmt.Errorf("re-initializing experiment with default bridge: %w", err) + } + + c.Spec = structs.MapDefaultCase(exp.Spec, structs.CASESNAKE) + } case "create": exp.Spec.SetExperimentName(c.Metadata.Name) @@ -46,6 +64,21 @@ func init() { return fmt.Errorf("initializing experiment: %w", err) } + existing, _ := types.Experiments(false) + for _, other := range existing { + if other.Metadata.Name == exp.Metadata.Name { + continue + } + + if exp.Spec.DefaultBridge() == "phenix" { + continue + } + + if other.Spec.DefaultBridge() == exp.Spec.DefaultBridge() { + return fmt.Errorf("experiment %s already using default bridge %s", other.Metadata.Name, other.Spec.DefaultBridge()) + } + } + if err := exp.Spec.VerifyScenario(context.TODO()); err != nil { return fmt.Errorf("verifying experiment scenario: %w", err) } @@ -65,6 +98,21 @@ func init() { return fmt.Errorf("re-initializing experiment (after update): %w", err) } + existing, _ := types.Experiments(false) + for _, other := range existing { + if other.Metadata.Name == exp.Metadata.Name { + continue + } + + if exp.Spec.DefaultBridge() == "phenix" { + continue + } + + if other.Spec.DefaultBridge() == exp.Spec.DefaultBridge() { + return fmt.Errorf("experiment %s already using default bridge %s", other.Metadata.Name, other.Spec.DefaultBridge()) + } + } + if exp.Spec.ExperimentName() != c.Metadata.Name { if strings.Contains(exp.Spec.BaseDir(), exp.Spec.ExperimentName()) { // If the experiment's base directory contains the current experiment @@ -183,6 +231,10 @@ func Create(ctx context.Context, opts ...CreateOption) error { return fmt.Errorf("no topology name provided") } + if len(o.defaultBridge) > 15 { + return fmt.Errorf("default bridge name must be 15 characters or less") + } + var ( kind = "Experiment" apiVersion = version.StoredVersion[kind] @@ -211,6 +263,7 @@ func Create(ctx context.Context, opts ...CreateOption) error { "experimentName": o.name, "baseDir": o.baseDir, "deployMode": o.deployMode, + "defaultBridge": o.defaultBridge, "topology": topo, } diff --git a/src/go/api/experiment/netflow.go b/src/go/api/experiment/netflow.go new file mode 100644 index 00000000..f11db444 --- /dev/null +++ b/src/go/api/experiment/netflow.go @@ -0,0 +1,219 @@ +package experiment + +import ( + "bufio" + "errors" + "fmt" + "net" + "strconv" + "strings" + "sync" + + "phenix/util/mm" +) + +type Netflow struct { + sync.RWMutex + + Bridge string + Conn *net.UDPConn + + callbacks map[string]chan map[string]any +} + +func NewNetflow(bridge string, conn *net.UDPConn) *Netflow { + return &Netflow{ + Bridge: bridge, + Conn: conn, + + callbacks: make(map[string]chan map[string]any), + } +} + +func (this *Netflow) NewChannel(id string) chan map[string]any { + this.Lock() + defer this.Unlock() + + if _, ok := this.callbacks[id]; ok { + return nil + } + + cb := make(chan map[string]any) + + this.callbacks[id] = cb + + return cb +} + +func (this *Netflow) DeleteChannel(id string) { + this.Lock() + defer this.Unlock() + + if cb, ok := this.callbacks[id]; ok { + close(cb) + + for range cb { + // draining channel so it doesn't block anything + } + } + + delete(this.callbacks, id) +} + +func (this *Netflow) Publish(body map[string]any) { + this.RLock() + defer this.RUnlock() + + for _, cb := range this.callbacks { + cb <- body + } +} + +func (this *Netflow) Close() { + this.Lock() + defer this.Unlock() + + for _, cb := range this.callbacks { + close(cb) + } + + this.callbacks = nil + this.Conn.Close() +} + +var ( + netflows = make(map[string]*Netflow) + netflowMu sync.RWMutex + + ErrNetflowNotStarted = errors.New("netflow not started for experiment") + ErrNetflowAlreadyStarted = errors.New("netflow already started for experiment") + ErrNetflowPhenixBridge = errors.New("cannot capture netflow on default phenix bridge") +) + +func init() { + // Delete netflow captures when experiments are stopped. + RegisterHook("stop", func(stage, name string) { + netflowMu.RLock() + defer netflowMu.RUnlock() + + if flow, ok := netflows[name]; ok { + // We don't need to worry about instructing minimega to delete the netflow + // capture since that will happen as part of the minimega namespace for + // this experiment being cleared. + + flow.Conn.Close() + delete(netflows, name) + } + }) +} + +func GetNetflow(exp string) *Netflow { + netflowMu.RLock() + defer netflowMu.RUnlock() + + if flow, ok := netflows[exp]; ok { + return flow + } + + return nil +} + +func StartNetflow(exp string) error { + netflowMu.Lock() + defer netflowMu.Unlock() + + if _, ok := netflows[exp]; ok { + return ErrNetflowAlreadyStarted + } + + spec, err := Get(exp) + if err != nil { + return ErrExperimentNotFound + } + + if !spec.Running() { + return ErrExperimentNotRunning + } + + if spec.Spec.DefaultBridge() == "phenix" { + return ErrNetflowPhenixBridge + } + + cluster, _ := ClusterNodes(exp) + + conn, err := net.ListenUDP("udp4", nil) + if err != nil { + return fmt.Errorf("creating UDP listener: %w", err) + } + + addr := strings.Split(conn.LocalAddr().String(), ":") + cmds := []string{ + "capture netflow mode ascii", + fmt.Sprintf("capture netflow bridge %s udp %s:%s", spec.Spec.DefaultBridge(), mm.Headnode(), addr[1]), + } + + for _, cmd := range cmds { + for _, node := range cluster { + if err := mm.MeshSend(exp, node, cmd); err != nil { + conn.Close() + return fmt.Errorf("starting netflow capture on node %s: %w", node, err) + } + } + } + + flow := NewNetflow(spec.Spec.DefaultBridge(), conn) + netflows[exp] = flow + + go func() { + scanner := bufio.NewScanner(conn) + + for scanner.Scan() { + fields := strings.Fields(scanner.Text()) + + body := make(map[string]any) + + body["proto"], _ = strconv.Atoi(fields[2]) + + src := strings.Split(fields[3], ":") + dst := strings.Split(fields[5], ":") + + body["src"] = src[0] + body["sport"], _ = strconv.Atoi(src[1]) + + body["dst"] = dst[0] + body["dport"], _ = strconv.Atoi(dst[1]) + + body["packets"], _ = strconv.Atoi(fields[6]) + body["bytes"], _ = strconv.Atoi(fields[7]) + + flow.Publish(body) + } + }() + + return nil +} + +func StopNetflow(exp string) error { + netflowMu.Lock() + defer netflowMu.Unlock() + + flow, ok := netflows[exp] + if !ok { + return ErrNetflowNotStarted + } + + cluster, _ := ClusterNodes(exp) + + cmd := fmt.Sprintf("capture netflow delete bridge %s", flow.Bridge) + + for _, node := range cluster { + if err := mm.MeshSend(exp, node, cmd); err != nil { + return fmt.Errorf("deleting netflow capture on node %s: %w", node, err) + } + } + + flow.Close() + delete(netflows, exp) + + return nil +} diff --git a/src/go/api/experiment/option.go b/src/go/api/experiment/option.go index d3d6389d..88cbfcfb 100644 --- a/src/go/api/experiment/option.go +++ b/src/go/api/experiment/option.go @@ -8,17 +8,18 @@ import ( type CreateOption func(*createOptions) type createOptions struct { - name string - annotations map[string]string - topology string - scenario string - disabledApps []string - vlanMin int - vlanMax int - vlanAliases map[string]int - schedules map[string]string - baseDir string - deployMode common.DeploymentMode + name string + annotations map[string]string + topology string + scenario string + disabledApps []string + vlanMin int + vlanMax int + vlanAliases map[string]int + schedules map[string]string + baseDir string + deployMode common.DeploymentMode + defaultBridge string } func newCreateOptions(opts ...CreateOption) createOptions { @@ -34,6 +35,10 @@ func newCreateOptions(opts ...CreateOption) createOptions { o.baseDir = common.PhenixBase + "/experiments/" + o.name } + if o.defaultBridge == "" { + o.defaultBridge = "phenix" + } + return o } @@ -103,6 +108,12 @@ func CreateWithDeployMode(m common.DeploymentMode) CreateOption { } } +func CreateWithDefaultBridge(b string) CreateOption { + return func(o *createOptions) { + o.defaultBridge = b + } +} + type SaveOption func(*saveOptions) type saveOptions struct { diff --git a/src/go/api/experiment/util.go b/src/go/api/experiment/util.go new file mode 100644 index 00000000..93cfac6a --- /dev/null +++ b/src/go/api/experiment/util.go @@ -0,0 +1,26 @@ +package experiment + +func ClusterNodes(exp string) ([]string, error) { + nodeMap := make(map[string]struct{}) + + spec, err := Get(exp) + if err != nil { + return nil, ErrExperimentNotFound + } + + if !spec.Running() { + return nil, ErrExperimentNotRunning + } + + for _, node := range spec.Status.Schedules() { + nodeMap[node] = struct{}{} + } + + var nodes []string + + for node := range nodeMap { + nodes = append(nodes, node) + } + + return nodes, nil +} diff --git a/src/go/api/scorch/break.go b/src/go/api/scorch/break.go index cdc5d905..715b6656 100644 --- a/src/go/api/scorch/break.go +++ b/src/go/api/scorch/break.go @@ -61,7 +61,7 @@ func (this Break) breakPoint(ctx context.Context, stage Action) error { if md.Tap != nil { pairs := discoverUsedPairs() - md.Tap.Init(tap.Experiment(exp), tap.UsedPairs(pairs)) + md.Tap.Init(this.options.Exp.Spec.DefaultBridge(), tap.Experiment(exp), tap.UsedPairs(pairs)) // backwards compatibility (doesn't support external access firewall rules) if v, ok := md.Tap.Other["internetAccess"]; ok { diff --git a/src/go/api/scorch/tap.go b/src/go/api/scorch/tap.go index 1f9b795c..71bf82f2 100644 --- a/src/go/api/scorch/tap.go +++ b/src/go/api/scorch/tap.go @@ -42,7 +42,7 @@ func (this Tap) Start(ctx context.Context) error { } pairs := discoverUsedPairs() - t.Init(tap.Experiment(exp), tap.UsedPairs(pairs)) + t.Init(this.options.Exp.Spec.DefaultBridge(), tap.Experiment(exp), tap.UsedPairs(pairs)) // backwards compatibility (doesn't support external access firewall rules) if v, ok := t.Other["internetAccess"]; ok { @@ -81,7 +81,7 @@ func (this Tap) Stop(ctx context.Context) error { t, ok := status.Taps[this.options.Name] if ok { - t.Init(tap.Experiment(exp)) + t.Init(this.options.Exp.Spec.DefaultBridge(), tap.Experiment(exp)) if err := t.Delete(mm.Headnode()); err != nil { return fmt.Errorf("deleting host tap for VLAN %s: %w", t.VLAN, err) @@ -98,7 +98,7 @@ func (Tap) Cleanup(context.Context) error { func discoverUsedPairs() []netaddr.IPPrefix { var pairs []netaddr.IPPrefix - running, err := types.RunningExperiments() + running, err := types.Experiments(true) if err != nil { return nil } diff --git a/src/go/api/soh/app.go b/src/go/api/soh/app.go index f837c98c..c8786c08 100644 --- a/src/go/api/soh/app.go +++ b/src/go/api/soh/app.go @@ -97,7 +97,7 @@ func (this *SOH) Configure(ctx context.Context, exp *types.Experiment) error { return fmt.Errorf("building Elastic server node: %w", err) } - exp.Spec.Topology().Init() + exp.Spec.Topology().Init(exp.Spec.DefaultBridge()) } } diff --git a/src/go/api/soh/util.go b/src/go/api/soh/util.go index 4d753255..541bdf8b 100644 --- a/src/go/api/soh/util.go +++ b/src/go/api/soh/util.go @@ -136,7 +136,7 @@ func (this *SOH) buildElasticServerNode(exp *types.Experiment, ip string, cidr i iface.SetAddress(ip) iface.SetMask(cidr) iface.SetProto("static") - iface.SetBridge("phenix") + iface.SetBridge(exp.Spec.DefaultBridge()) data := struct { Hostname string @@ -181,7 +181,7 @@ func (this *SOH) buildPacketBeatNode(exp *types.Experiment, target ifaces.NodeSp "address": ip, "mask": cidr, "proto": "static", - "bridge": "phenix", + "bridge": exp.Spec.DefaultBridge(), }, } @@ -193,7 +193,7 @@ func (this *SOH) buildPacketBeatNode(exp *types.Experiment, target ifaces.NodeSp "type": "ethernet", "vlan": iface.VLAN(), "proto": "static", - "bridge": "phenix", + "bridge": exp.Spec.DefaultBridge(), } nets = append(nets, monitorIface) diff --git a/src/go/api/vm/search.go b/src/go/api/vm/search.go new file mode 100644 index 00000000..8992794d --- /dev/null +++ b/src/go/api/vm/search.go @@ -0,0 +1,80 @@ +package vm + +import "fmt" + +type TopologySearch struct { + Hostname map[string]int `json:"hostname"` + Disk map[string][]int `json:"disk"` + Type map[string][]int `json:"node-type"` + OSType map[string][]int `json:"os-type"` + Label map[string][]int `json:"label"` + Annotation map[string][]int `json:"annotation"` + VLAN map[string][]int `json:"vlan"` + IP map[string][]int `json:"ip"` +} + +func (this *TopologySearch) AddHostname(k string, n int) { + if this.Hostname == nil { + this.Hostname = make(map[string]int) + } + + this.Hostname[k] = n +} + +func (this *TopologySearch) AddDisk(k string, n int) { + if this.Disk == nil { + this.Disk = make(map[string][]int) + } + + this.Disk[k] = append(this.Disk[k], n) +} + +func (this *TopologySearch) AddType(k string, n int) { + if this.Type == nil { + this.Type = make(map[string][]int) + } + + this.Type[k] = append(this.Type[k], n) +} + +func (this *TopologySearch) AddOSType(k string, n int) { + if this.OSType == nil { + this.OSType = make(map[string][]int) + } + + this.OSType[k] = append(this.OSType[k], n) +} + +func (this *TopologySearch) AddLabel(k, v string, n int) { + if this.Label == nil { + this.Label = make(map[string][]int) + } + + k = fmt.Sprintf("%s=%s", k, v) + + this.Label[k] = append(this.Label[k], n) +} + +func (this *TopologySearch) AddAnnotation(k string, n int) { + if this.Annotation == nil { + this.Annotation = make(map[string][]int) + } + + this.Annotation[k] = append(this.Annotation[k], n) +} + +func (this *TopologySearch) AddVLAN(k string, n int) { + if this.VLAN == nil { + this.VLAN = make(map[string][]int) + } + + this.VLAN[k] = append(this.VLAN[k], n) +} + +func (this *TopologySearch) AddIP(k string, n int) { + if this.IP == nil { + this.IP = make(map[string][]int) + } + + this.IP[k] = append(this.IP[k], n) +} diff --git a/src/go/api/vm/topology.go b/src/go/api/vm/topology.go new file mode 100644 index 00000000..70b4f819 --- /dev/null +++ b/src/go/api/vm/topology.go @@ -0,0 +1,113 @@ +package vm + +import ( + "fmt" + + "phenix/api/experiment" + "phenix/util/cache" + "phenix/util/mm" + + "golang.org/x/exp/slices" +) + +type topology struct { + Nodes []mm.VM `json:"nodes"` + Edges []edge `json:"edges"` + Running bool `json:"running"` +} + +type edge struct { + ID int `json:"id"` + Source int `json:"source"` + Target int `json:"target"` + Length int `json:"length"` +} + +func Topology(exp string, ignore []string) (topology, error) { + vms, err := List(exp) + if err != nil { + return topology{}, fmt.Errorf("getting VMs: %w", err) + } + + var ( + networks = make(map[string]mm.VM) + search TopologySearch + + cacheKey = fmt.Sprintf("experiment|%s|search", exp) + cached bool + + nodes []mm.VM + nodeID int + edges []edge + edgeID int + ) + + if val, ok := cache.Get(cacheKey); ok { + search = val.(TopologySearch) + cached = true + } + + for _, vm := range vms { + node := vm.Copy() + node.ID = nodeID + + nodes = append(nodes, node) + nodeID++ + + if !cached { + search.AddHostname(node.Name, node.ID) + search.AddDisk(node.Disk, node.ID) + search.AddType(node.Type, node.ID) + search.AddOSType(node.OSType, node.ID) + + for k, v := range node.Labels { + search.AddLabel(k, v, node.ID) + } + + for k := range node.Annotations { + search.AddAnnotation(k, node.ID) + } + } + + for i, iface := range vm.Networks { + if match := vlanAliasRegex.FindStringSubmatch(iface); match != nil { + iface = match[1] + } + + if slices.Contains(ignore, iface) { + continue + } + + if !cached { + // TODO: what if these change during an experiment (e.g., via user updates)? + search.AddVLAN(iface, node.ID) + search.AddIP(vm.IPv4[i], node.ID) + } + + network, ok := networks[iface] + if !ok { // create new node for VLAN network switch + network = mm.VM{ID: nodeID, Name: iface, Type: "Switch", Networks: []string{iface}} + networks[iface] = network + + nodes = append(nodes, network) + nodeID++ + } + + edges = append(edges, edge{ID: edgeID, Source: node.ID, Target: network.ID, Length: 150}) + edgeID++ + } + } + + if !cached { + // TODO: cache with expire? + cache.Set(cacheKey, search) + } + + topo := topology{Nodes: nodes, Edges: edges} + + if exp, err := experiment.Get(exp); err == nil { + topo.Running = exp.Running() + } + + return topo, nil +} diff --git a/src/go/app/app.go b/src/go/app/app.go index e42b04c8..27d71c92 100644 --- a/src/go/app/app.go +++ b/src/go/app/app.go @@ -338,7 +338,7 @@ func ApplyApps(ctx context.Context, exp *types.Experiment, opts ...Option) error if options.Stage == ACTIONCONFIG || options.Stage == ACTIONPRESTART { // just in case one of the apps added some nodes to the topology... - exp.Spec.Topology().Init() + exp.Spec.Topology().Init(exp.Spec.DefaultBridge()) } return nil diff --git a/src/go/app/tap.go b/src/go/app/tap.go index 41e4409c..4c05296d 100644 --- a/src/go/app/tap.go +++ b/src/go/app/tap.go @@ -85,7 +85,7 @@ func (this *Tap) PostStart(ctx context.Context, exp *types.Experiment) error { opts = append(opts, tap.PairSubnet(subnet)) } - t.Init(opts...) + t.Init(exp.Spec.DefaultBridge(), opts...) // Tap name is random, yet descriptive to the fact that it's a "tapapp" tap. t.Name = fmt.Sprintf("%s-tapapp", util.RandomString(8)) @@ -126,7 +126,7 @@ func (this *Tap) Cleanup(ctx context.Context, exp *types.Experiment) error { ) for _, t := range status.Taps { - t.Init(tap.Experiment(exp.Metadata.Name)) + t.Init(exp.Spec.DefaultBridge(), tap.Experiment(exp.Metadata.Name)) if err := t.Delete(host); err != nil { errs = multierror.Append(errs, fmt.Errorf("deleting host tap for VLAN %s: %w", t.VLAN, err)) @@ -139,7 +139,7 @@ func (this *Tap) Cleanup(ctx context.Context, exp *types.Experiment) error { func (this Tap) discoverUsedPairs() []netaddr.IPPrefix { var pairs []netaddr.IPPrefix - running, err := types.RunningExperiments() + running, err := types.Experiments(true) if err != nil { return nil } diff --git a/src/go/cmd/experiment.go b/src/go/cmd/experiment.go index 3ed4f776..7e1efd6b 100644 --- a/src/go/cmd/experiment.go +++ b/src/go/cmd/experiment.go @@ -175,6 +175,7 @@ func newExperimentCreateCmd() *cobra.Command { experiment.CreateWithVLANMin(MustGetInt(cmd.Flags(), "vlan-min")), experiment.CreateWithVLANMax(MustGetInt(cmd.Flags(), "vlan-max")), experiment.CreatedWithDisabledApplications(disabledApps), + experiment.CreateWithDefaultBridge(MustGetString(cmd.Flags(), "default-bridge")), } ctx := notes.Context(context.Background(), false) @@ -196,6 +197,7 @@ func newExperimentCreateCmd() *cobra.Command { cmd.MarkFlagRequired("topology") cmd.Flags().StringP("scenario", "s", "", "Name of an existing scenario to use (optional)") cmd.Flags().StringP("base-dir", "d", "", "Base directory to use for experiment (optional)") + cmd.Flags().StringP("default-bridge", "b", "phenix", "Default bridge name to use for experiment (optional)") cmd.Flags().Int("vlan-min", 0, "VLAN pool minimum") cmd.Flags().Int("vlan-max", 0, "VLAN pool maximum") cmd.Flags().StringSlice("disabled-apps", []string{}, "Comma separated ist of apps to disable") diff --git a/src/go/tmpl/templates/minimega_script.tmpl b/src/go/tmpl/templates/minimega_script.tmpl index 6044107c..b5dcece2 100644 --- a/src/go/tmpl/templates/minimega_script.tmpl +++ b/src/go/tmpl/templates/minimega_script.tmpl @@ -1,6 +1,10 @@ namespace {{ .ExperimentName }} ns queueing true +{{- if ne .DefaultBridge "phenix" }} +ns bridge {{ .DefaultBridge }} gre +{{- end }} + {{- if and (ne .VLANs.Min 0) (ne .VLANs.Max 0) }} vlans range {{ .VLANs.Min }} {{ .VLANs.Max }} {{- end }} diff --git a/src/go/types/experiment.go b/src/go/types/experiment.go index 7d60c46d..313db29e 100644 --- a/src/go/types/experiment.go +++ b/src/go/types/experiment.go @@ -141,7 +141,7 @@ func (this Experiment) FilesDir() string { return filepath.Join(common.PhenixBase, "images", this.Metadata.Name, "files") } -func RunningExperiments() ([]*Experiment, error) { +func Experiments(running bool) ([]*Experiment, error) { configs, err := store.List("Experiment") if err != nil { return nil, fmt.Errorf("getting list of experiment configs from store: %w", err) @@ -155,9 +155,11 @@ func RunningExperiments() ([]*Experiment, error) { return nil, fmt.Errorf("decoding experiment %s from config: %w", c.Metadata.Name, err) } - if exp.Running() { - experiments = append(experiments, exp) + if running && !exp.Running() { + continue } + + experiments = append(experiments, exp) } return experiments, nil diff --git a/src/go/types/interfaces/experiment.go b/src/go/types/interfaces/experiment.go index 12e55f05..9cc78c1b 100644 --- a/src/go/types/interfaces/experiment.go +++ b/src/go/types/interfaces/experiment.go @@ -19,6 +19,7 @@ type ExperimentSpec interface { ExperimentName() string BaseDir() string + DefaultBridge() string Topology() TopologySpec Scenario() ScenarioSpec VLANs() VLANSpec @@ -27,6 +28,7 @@ type ExperimentSpec interface { SetExperimentName(string) SetBaseDir(string) + SetDefaultBridge(string) SetVLANAlias(string, int, bool) error SetVLANRange(int, int, bool) error SetSchedule(map[string]string) diff --git a/src/go/types/interfaces/topology.go b/src/go/types/interfaces/topology.go index d9437ef7..c5a33faa 100644 --- a/src/go/types/interfaces/topology.go +++ b/src/go/types/interfaces/topology.go @@ -16,7 +16,7 @@ type TopologySpec interface { HasCommands() bool - Init() error + Init(string) error } type NodeSpec interface { diff --git a/src/go/types/version/v1/experiment.go b/src/go/types/version/v1/experiment.go index a898b9ac..4f6eec51 100644 --- a/src/go/types/version/v1/experiment.go +++ b/src/go/types/version/v1/experiment.go @@ -75,6 +75,7 @@ func (this VLANSpec) Validate() error { type ExperimentSpec struct { ExperimentNameF string `json:"experimentName,omitempty" yaml:"experimentName,omitempty" structs:"experimentName" mapstructure:"experimentName"` BaseDirF string `json:"baseDir" yaml:"baseDir" structs:"baseDir" mapstructure:"baseDir"` + DefaultBridgeF string `json:"defaultBridge" yaml:"defaultBridge" structs:"defaultBridge" mapstructure:"defaultBridge"` TopologyF *TopologySpec `json:"topology" yaml:"topology" structs:"topology" mapstructure:"topology"` ScenarioF *v2.ScenarioSpec `json:"scenario" yaml:"scenario" structs:"scenario" mapstructure:"scenario"` VLANsF *VLANSpec `json:"vlans" yaml:"vlans" structs:"vlans" mapstructure:"vlans"` @@ -87,6 +88,10 @@ func (this *ExperimentSpec) Init() error { this.BaseDirF = common.PhenixBase + "/experiments/" + this.ExperimentNameF } + if this.DefaultBridgeF == "" { + this.DefaultBridgeF = "phenix" + } + if !filepath.IsAbs(this.BaseDirF) { if absPath, err := filepath.Abs(this.BaseDirF); err == nil { this.BaseDirF = absPath @@ -107,7 +112,7 @@ func (this *ExperimentSpec) Init() error { } if this.TopologyF != nil { - if err := this.TopologyF.Init(); err != nil { + if err := this.TopologyF.Init(this.DefaultBridgeF); err != nil { return fmt.Errorf("initializing topology: %w", err) } @@ -135,6 +140,10 @@ func (this ExperimentSpec) BaseDir() string { return this.BaseDirF } +func (this ExperimentSpec) DefaultBridge() string { + return this.DefaultBridgeF +} + func (this ExperimentSpec) Topology() ifaces.TopologySpec { if this.TopologyF == nil { return new(TopologySpec) @@ -183,6 +192,10 @@ func (this *ExperimentSpec) SetBaseDir(dir string) { this.BaseDirF = dir } +func (this *ExperimentSpec) SetDefaultBridge(bridge string) { + this.DefaultBridgeF = bridge +} + func (this *ExperimentSpec) SetVLANAlias(a string, i int, f bool) error { if this.VLANsF == nil { this.VLANsF = &VLANSpec{AliasesF: make(map[string]int)} diff --git a/src/go/types/version/v1/network.go b/src/go/types/version/v1/network.go index eb0571cb..48f89188 100644 --- a/src/go/types/version/v1/network.go +++ b/src/go/types/version/v1/network.go @@ -130,6 +130,8 @@ type Interface struct { QinQF bool `json:"qinq" yaml:"qinq" structs:"qinq" mapstructure:"qinq"` RulesetInF string `json:"ruleset_in" yaml:"ruleset_in" structs:"ruleset_in" mapstructure:"ruleset_in"` RulesetOutF string `json:"ruleset_out" yaml:"ruleset_out" structs:"ruleset_out" mapstructure:"ruleset_out"` + + BridgeSetInTopo *bool `json:"-" yaml:"-" structs:"bridge_set_in_topo,omitempty" mapstructure:"bridge_set_in_topo,omitempty"` } func (this Interface) Name() string { @@ -525,10 +527,24 @@ func (this NAT) Out() string { return this.OutF } -func (this *Network) SetDefaults() { +func (this *Network) SetDefaults(bridge string) { for idx, iface := range this.InterfacesF { - if iface.BridgeF == "" { - iface.BridgeF = "phenix" + if iface.BridgeF == bridge { + continue + } + + if iface.BridgeSetInTopo == nil { + var setInTopo bool + + if iface.BridgeF != "" && iface.BridgeF != "phenix" { + setInTopo = true + } + + iface.BridgeSetInTopo = &setInTopo + } + + if setInTopo := *iface.BridgeSetInTopo; !setInTopo { + iface.BridgeF = bridge this.InterfacesF[idx] = iface } } diff --git a/src/go/types/version/v1/node.go b/src/go/types/version/v1/node.go index 80c57bb1..dbbd3c65 100644 --- a/src/go/types/version/v1/node.go +++ b/src/go/types/version/v1/node.go @@ -453,7 +453,7 @@ func (this Node) validate() error { return nil } -func (this *Node) setDefaults() { +func (this *Node) setDefaults(bridge string) { if this.External() { return } @@ -497,7 +497,7 @@ func (this *Node) setDefaults() { } if this.NetworkF != nil { - this.NetworkF.SetDefaults() + this.NetworkF.SetDefaults(bridge) } } diff --git a/src/go/types/version/v1/topology.go b/src/go/types/version/v1/topology.go index bdba3dec..9222d177 100644 --- a/src/go/types/version/v1/topology.go +++ b/src/go/types/version/v1/topology.go @@ -143,7 +143,7 @@ func (this TopologySpec) HasCommands() bool { return false } -func (this *TopologySpec) Init() error { +func (this *TopologySpec) Init(bridge string) error { var errs error for _, n := range this.NodesF { @@ -151,7 +151,7 @@ func (this *TopologySpec) Init() error { errs = multierror.Append(errs, fmt.Errorf("validating node %s: %w", n.GeneralF.HostnameF, err)) } - n.setDefaults() + n.setDefaults(bridge) } return errs diff --git a/src/go/types/version/version_test.go b/src/go/types/version/version_test.go new file mode 100644 index 00000000..c2ca430e --- /dev/null +++ b/src/go/types/version/version_test.go @@ -0,0 +1,76 @@ +package version + +import ( + "context" + "testing" + + v2 "phenix/types/version/v2" + + "github.com/getkin/kin-openapi/openapi3" + "gopkg.in/yaml.v3" +) + +var topo = ` +nodes: +- general: + hostname: host-00 + hardware: + drives: + - image: miniccc.qc2 + memory: 512 + vcpus: 1 + os_type: linux + network: + interfaces: + - address: 10.0.0.1 + mask: 24 + gateway: 10.0.0.254 + name: IF0 + proto: static + type: ethernet + vlan: EXP + type: VirtualMachine +- general: + hostname: hil +# network: +# interfaces: +# - address: 192.168.86.177 +# name: IF0 + type: HIL + external: true +` + +func TestSchema(t *testing.T) { + s, err := openapi3.NewLoader().LoadFromData(v2.OpenAPI) + if err != nil { + t.Log(err) + t.FailNow() + } + + if err := s.Validate(context.Background()); err != nil { + t.Log(err) + t.FailNow() + } + + ref, ok := s.Components.Schemas["Topology"] + if !ok { + t.Log("missing Topology schema") + t.FailNow() + } + + var spec interface{} + if err := yaml.Unmarshal([]byte(topo), &spec); err != nil { + t.Log(err) + t.FailNow() + } + + /* + body, _ := json.Marshal(spec) + json.Unmarshal(body, &spec) + */ + + if err := ref.Value.VisitJSON(spec); err != nil { + t.Log(err) + t.FailNow() + } +} diff --git a/src/go/util/cache/cache.go b/src/go/util/cache/cache.go new file mode 100644 index 00000000..30d39fce --- /dev/null +++ b/src/go/util/cache/cache.go @@ -0,0 +1,11 @@ +package cache + +import ( + "time" +) + +type Cache interface { + Get(string) (any, bool) + Set(string, any) error + SetWithExpire(string, any, time.Duration) error +} diff --git a/src/go/util/cache/go-cache.go b/src/go/util/cache/go-cache.go new file mode 100644 index 00000000..bdbe897f --- /dev/null +++ b/src/go/util/cache/go-cache.go @@ -0,0 +1,34 @@ +package cache + +import ( + "time" + + gocache "github.com/patrickmn/go-cache" +) + +type GoCache struct { + c *gocache.Cache +} + +func NewGoCache() *GoCache { + return &GoCache{c: gocache.New(gocache.NoExpiration, 30*time.Second)} +} + +func (this GoCache) Get(key string) (any, bool) { + v, ok := this.c.Get(key) + if !ok { + return nil, false + } + + return v, true +} + +func (this *GoCache) Set(key string, val any) error { + this.c.Set(key, val, -1) + return nil +} + +func (this *GoCache) SetWithExpire(key string, val any, exp time.Duration) error { + this.c.Set(key, val, exp) + return nil +} diff --git a/src/go/util/cache/package.go b/src/go/util/cache/package.go new file mode 100644 index 00000000..7bbd9ada --- /dev/null +++ b/src/go/util/cache/package.go @@ -0,0 +1,17 @@ +package cache + +import "time" + +var DefaultCache Cache = NewGoCache() + +func Get(key string) (any, bool) { + return DefaultCache.Get(key) +} + +func Set(key string, val any) error { + return DefaultCache.Set(key, val) +} + +func SetWithExpire(key string, val any, exp time.Duration) error { + return DefaultCache.SetWithExpire(key, val, exp) +} diff --git a/src/go/util/mm/types.go b/src/go/util/mm/types.go index 87c6d6d2..ffb98aad 100644 --- a/src/go/util/mm/types.go +++ b/src/go/util/mm/types.go @@ -202,16 +202,19 @@ func (this VMs) Paginate(page, size int) VMs { type VM struct { ID int `json:"id"` Name string `json:"name"` + Type string `json:"type"` Experiment string `json:"experiment"` Host string `json:"host"` IPv4 []string `json:"ipv4"` CPUs int `json:"cpus"` RAM int `json:"ram"` Disk string `json:"disk"` + OSType string `json:"osType"` DoNotBoot bool `json:"dnb"` Networks []string `json:"networks"` Taps []string `json:"taps"` Captures []Capture `json:"captures"` + State string `json:"state"` Running bool `json:"running"` Busy bool `json:"busy"` CCActive bool `json:"ccActive"` @@ -226,19 +229,38 @@ type VM struct { Interfaces map[string]string `json:"-"` // Used internally for showing VM details. - Type string `json:"-"` - OSType string `json:"-"` Metadata map[string]interface{} `json:"-"` Labels map[string]string `json:"-"` Annotations map[string]interface{} `json:"-"` - // Used internally to track state of VM in minimega. - State string `json:"-"` - // Used internally to check for active CC agent. UUID string `json:"-"` } +// Copy returns a deep copy of the VM. It only makes deep copies of fields that +// are exported as JSON. +func (this VM) Copy() VM { + vm := this + + vm.IPv4 = make([]string, len(this.IPv4)) + copy(vm.IPv4, this.IPv4) + + vm.Networks = make([]string, len(this.Networks)) + copy(vm.Networks, this.Networks) + + vm.Taps = make([]string, len(this.Taps)) + copy(vm.Taps, this.Taps) + + // This works because the Capture struct is only made up of primatives. + vm.Captures = make([]Capture, len(this.Captures)) + copy(vm.Captures, this.Captures) + + vm.Tags = make([]string, len(this.Tags)) + copy(vm.Tags, this.Tags) + + return vm +} + type Captures struct { Captures []Capture `json:"captures"` } diff --git a/src/go/util/tap/tap.go b/src/go/util/tap/tap.go index 0ae384d3..9c84c3ca 100644 --- a/src/go/util/tap/tap.go +++ b/src/go/util/tap/tap.go @@ -11,11 +11,11 @@ import ( "inet.af/netaddr" ) -func (this *Tap) Init(opts ...Option) { +func (this *Tap) Init(bridge string, opts ...Option) { this.o = NewOptions(opts...) if this.Bridge == "" { - this.Bridge = "phenix" + this.Bridge = bridge } } diff --git a/src/go/web/broker/client.go b/src/go/web/broker/client.go index 991a0c8b..04a2fc24 100644 --- a/src/go/web/broker/client.go +++ b/src/go/web/broker/client.go @@ -6,11 +6,13 @@ import ( "errors" "fmt" "net/http" + "strings" "sync" "time" "phenix/api/experiment" "phenix/api/vm" + "phenix/util/cache" "phenix/util/mm" "phenix/util/plog" "phenix/web/proto" @@ -21,6 +23,7 @@ import ( "github.com/gorilla/websocket" "google.golang.org/protobuf/encoding/protojson" + "inet.af/netaddr" ) var marshaler = protojson.MarshalOptions{EmitUnpopulated: true} @@ -139,6 +142,107 @@ func (this *Client) read() { switch req.Resource.Type { case "experiment/vms": + case "experiment/topology": + // TODO: check RBAC permissions? + + switch req.Resource.Action { + case "search": + var query map[string]string + + if err := json.Unmarshal(req.Payload, &query); err != nil { + plog.Error("cannot unmarshal request payload", "err", err) + continue + } + + // TODO: handle multiple query terms (how? AND or OR?) + // Do the same as in web/experiment.go@SearchExperimentTopology + term := query["term"] + if term == "" { + term = "hostname" + } + + value := query["value"] + if value == "" { + plog.Error("missing search value for term", "term", term) + continue + } + + cacheKey := fmt.Sprintf("experiment|%s|search", req.Resource.Name) + + val, ok := cache.Get(cacheKey) + if !ok { + // warm the cache (again?) + if _, err := vm.Topology(req.Resource.Name, nil); err != nil { + plog.Error("getting experiment topology", "exp", req.Resource.Name, "err", err) + continue + } + + val, _ = cache.Get(cacheKey) + } + + var ( + search = val.(vm.TopologySearch) + nodes []int + ) + + switch strings.ToLower(term) { + case "hostname": + if node, ok := search.Hostname[value]; ok { + nodes = []int{node} + } + case "disk": + nodes = search.Disk[value] + case "node-type": + nodes = search.Type[value] + case "os-type": + nodes = search.OSType[value] + case "label": + nodes = search.Label[value] + case "annotation": + nodes = search.Annotation[value] + case "vlan": + nodes = search.VLAN[value] + case "ip": + if net, err := netaddr.ParseIPPrefix(value); err == nil { + for k, v := range search.IP { + ip, err := netaddr.ParseIP(k) + if err != nil { + continue + } + + if net.Contains(ip) { + nodes = append(nodes, v...) + } + } + } else { + nodes = search.IP[value] + } + } + + results := map[string]any{ + "term": term, + "value": value, + "results": map[string]any{ + "nodes": nodes, + }, + } + + body, err := json.Marshal(results) + if err != nil { + plog.Error("marshaling search results for WebSocket client", "err", err) + continue + } + + this.publish <- bt.Publish{ + Resource: bt.NewResource("experiment/topology", req.Resource.Name, "search"), + Result: body, + } + + continue + default: + plog.Error("unexpected WebSocket request resource action for experiment/topology resource type", "action", req.Resource.Action) + continue + } default: plog.Error("unexpected WebSocket request resource type", "type", req.Resource.Type) continue diff --git a/src/go/web/cache/cache.go b/src/go/web/cache/cache.go index 93a93649..95282e70 100644 --- a/src/go/web/cache/cache.go +++ b/src/go/web/cache/cache.go @@ -2,6 +2,8 @@ package cache import ( "time" + + gocache "github.com/patrickmn/go-cache" ) type Status string @@ -20,12 +22,9 @@ const ( StatusCommitting Status = "committing" ) -var DefaultCache Cache = NewGoCache() - -type Cache interface { +type WebCache interface { Get(string) ([]byte, bool) Set(string, []byte) error - SetWithExpire(string, []byte, time.Duration) error Lock(string, Status, time.Duration) Status @@ -33,26 +32,62 @@ type Cache interface { Unlock(string) } -func Get(key string) ([]byte, bool) { - return DefaultCache.Get(key) +type GoWebCache struct { + c *gocache.Cache +} + +func NewGoWebCache() *GoWebCache { + return &GoWebCache{c: gocache.New(gocache.NoExpiration, 30*time.Second)} +} + +func (this GoWebCache) Get(key string) ([]byte, bool) { + v, ok := this.c.Get(key) + if !ok { + return nil, false + } + + return v.([]byte), true } -func Set(key string, val []byte) error { - return DefaultCache.Set(key, val) +func (this *GoWebCache) Set(key string, val []byte) error { + this.c.Set(key, val, -1) + return nil } -func SetWithExpire(key string, val []byte, exp time.Duration) error { - return DefaultCache.SetWithExpire(key, val, exp) +func (this *GoWebCache) SetWithExpire(key string, val []byte, exp time.Duration) error { + this.c.Set(key, val, exp) + return nil } -func Lock(key string, status Status, exp time.Duration) Status { - return DefaultCache.Lock(key, status, exp) +func (this *GoWebCache) Lock(key string, status Status, exp time.Duration) Status { + key = "LOCK|" + key + + if err := this.c.Add(key, status, exp); err != nil { + v, ok := this.c.Get(key) + + // This *might* happen if the key expires or is deleted between + // calling `Add` and `Get`. + if !ok { + return "" + } + + return v.(Status) + } + + return "" } -func Locked(key string) Status { - return DefaultCache.Locked(key) +func (this *GoWebCache) Locked(key string) Status { + key = "LOCK|" + key + + v, ok := this.c.Get(key) + if !ok { + return "" + } + + return v.(Status) } -func Unlock(key string) { - DefaultCache.Unlock(key) +func (this *GoWebCache) Unlock(key string) { + this.c.Delete("LOCK|" + key) } diff --git a/src/go/web/cache/go-cache.go b/src/go/web/cache/go-cache.go deleted file mode 100644 index efae9a2e..00000000 --- a/src/go/web/cache/go-cache.go +++ /dev/null @@ -1,67 +0,0 @@ -package cache - -import ( - "time" - - gocache "github.com/patrickmn/go-cache" -) - -type GoCache struct { - c *gocache.Cache -} - -func NewGoCache() *GoCache { - return &GoCache{c: gocache.New(gocache.NoExpiration, 30*time.Second)} -} - -func (this GoCache) Get(key string) ([]byte, bool) { - v, ok := this.c.Get(key) - if !ok { - return nil, false - } - - return v.([]byte), true -} - -func (this *GoCache) Set(key string, val []byte) error { - this.c.Set(key, val, -1) - return nil -} - -func (this *GoCache) SetWithExpire(key string, val []byte, exp time.Duration) error { - this.c.Set(key, val, exp) - return nil -} - -func (this *GoCache) Lock(key string, status Status, exp time.Duration) Status { - key = "LOCK|" + key - - if err := this.c.Add(key, status, exp); err != nil { - v, ok := this.c.Get(key) - - // This *might* happen if the key expires or is deleted between - // calling `Add` and `Get`. - if !ok { - return "" - } - - return v.(Status) - } - - return "" -} - -func (this *GoCache) Locked(key string) Status { - key = "LOCK|" + key - - v, ok := this.c.Get(key) - if !ok { - return "" - } - - return v.(Status) -} - -func (this *GoCache) Unlock(key string) { - this.c.Delete("LOCK|" + key) -} diff --git a/src/go/web/cache/package.go b/src/go/web/cache/package.go new file mode 100644 index 00000000..7907df6a --- /dev/null +++ b/src/go/web/cache/package.go @@ -0,0 +1,29 @@ +package cache + +import "time" + +var DefaultWebCache WebCache = NewGoWebCache() + +func Get(key string) ([]byte, bool) { + return DefaultWebCache.Get(key) +} + +func Set(key string, val []byte) error { + return DefaultWebCache.Set(key, val) +} + +func SetWithExpire(key string, val []byte, exp time.Duration) error { + return DefaultWebCache.SetWithExpire(key, val, exp) +} + +func Lock(key string, status Status, exp time.Duration) Status { + return DefaultWebCache.Lock(key, status, exp) +} + +func Locked(key string) Status { + return DefaultWebCache.Locked(key) +} + +func Unlock(key string) { + DefaultWebCache.Unlock(key) +} diff --git a/src/go/web/experiment.go b/src/go/web/experiment.go new file mode 100644 index 00000000..9c5bf436 --- /dev/null +++ b/src/go/web/experiment.go @@ -0,0 +1,147 @@ +package web + +import ( + "encoding/json" + "fmt" + "net/http" + "strings" + + "phenix/api/vm" + "phenix/util/cache" + "phenix/util/mm" + "phenix/util/plog" + "phenix/web/rbac" + "phenix/web/util" + + "github.com/gorilla/mux" + "inet.af/netaddr" +) + +type topology struct { + Nodes []mm.VM `json:"nodes"` + Edges []edge `json:"edges"` + Running bool `json:"running"` +} + +type edge struct { + ID int `json:"id"` + Source int `json:"source"` + Target int `json:"target"` + Length int `json:"length"` +} + +// GET /experiments/{name}/topology[?ignore=MGMT] +func GetExperimentTopology(w http.ResponseWriter, r *http.Request) { + plog.Debug("HTTP handler called", "handler", "GetExperimentTopology") + + var ( + ctx = r.Context() + role = ctx.Value("role").(rbac.Role) + vars = mux.Vars(r) + name = vars["name"] + + query = r.URL.Query() + ignore = query["ignore"] + ) + + if !role.Allowed("experiments/topology", "get", name) { + http.Error(w, "forbidden", http.StatusForbidden) + return + } + + topo, err := vm.Topology(name, ignore) + if err != nil { + http.Error(w, "unable to get experiment topology", http.StatusBadRequest) + } + + body, err := json.Marshal(topo) + if err != nil { + http.Error(w, "unable to convert topology", http.StatusInternalServerError) + } + + w.Write(body) +} + +// GET /experiments/{name}/topology/search?hostname=xyz&vlan=abc +func SearchExperimentTopology(w http.ResponseWriter, r *http.Request) { + plog.Debug("HTTP handler called", "handler", "SearchExperimentTopology") + + var ( + ctx = r.Context() + role = ctx.Value("role").(rbac.Role) + vars = mux.Vars(r) + name = vars["name"] + + query = r.URL.Query() + ) + + if !role.Allowed("experiments/topology", "get", name) { + http.Error(w, "forbidden", http.StatusForbidden) + return + } + + cacheKey := fmt.Sprintf("experiment|%s|search", name) + + val, ok := cache.Get(cacheKey) + if !ok { + if _, err := vm.Topology(name, nil); err != nil { + http.Error(w, "error getting experiment topology", http.StatusBadRequest) + return + } + + val, _ = cache.Get(cacheKey) + } + + var ( + search = val.(vm.TopologySearch) + nodes []int + ) + + // TODO: how to handle multiple terms? AND or OR? + + for term, values := range query { + value := values[0] + + switch strings.ToLower(term) { + case "hostname": + if node, ok := search.Hostname[value]; ok { + nodes = append(nodes, node) + } + case "disk": + nodes = append(nodes, search.Disk[value]...) + case "node-type": + nodes = append(nodes, search.Type[value]...) + case "os-type": + nodes = append(nodes, search.OSType[value]...) + case "label": + nodes = append(nodes, search.Label[value]...) + case "annotation": + nodes = append(nodes, search.Annotation[value]...) + case "vlan": + nodes = append(nodes, search.VLAN[value]...) + case "ip": + if net, err := netaddr.ParseIPPrefix(value); err == nil { + for k, v := range search.IP { + ip, err := netaddr.ParseIP(k) + if err != nil { + continue + } + + if net.Contains(ip) { + nodes = append(nodes, v...) + } + } + } else { + nodes = append(nodes, search.IP[value]...) + } + } + } + + body, err := json.Marshal(util.WithRoot("nodes", nodes)) + if err != nil { + http.Error(w, "error marshaling search results", http.StatusInternalServerError) + return + } + + w.Write(body) +} diff --git a/src/go/web/handlers.go b/src/go/web/handlers.go index f2f4df46..3d09b08d 100644 --- a/src/go/web/handlers.go +++ b/src/go/web/handlers.go @@ -186,6 +186,7 @@ func CreateExperiment(w http.ResponseWriter, r *http.Request) { experiment.CreateWithVLANMax(int(req.VlanMax)), experiment.CreatedWithDisabledApplications(req.DisabledApps), experiment.CreateWithDeployMode(deployMode), + experiment.CreateWithDefaultBridge(req.DefaultBridge), } if req.WorkflowBranch != "" { diff --git a/src/go/web/netflow.go b/src/go/web/netflow.go new file mode 100644 index 00000000..1d5bca27 --- /dev/null +++ b/src/go/web/netflow.go @@ -0,0 +1,260 @@ +package web + +import ( + "errors" + "net/http" + "time" + + "phenix/api/experiment" + "phenix/util/plog" + "phenix/web/rbac" + + putil "phenix/util" + + "github.com/gorilla/mux" + "github.com/gorilla/websocket" +) + +// GET /experiments/{exp}/netflow +func GetNetflow(w http.ResponseWriter, r *http.Request) { + plog.Debug("HTTP handler called", "handler", "GetNetflow") + + var ( + ctx = r.Context() + role = ctx.Value("role").(rbac.Role) + vars = mux.Vars(r) + exp = vars["exp"] + ) + + if !role.Allowed("experiments/netflow", "get", exp) { + http.Error(w, "forbidden", http.StatusForbidden) + return + } + + if flow := experiment.GetNetflow(exp); flow != nil { + w.WriteHeader(http.StatusNoContent) + return + } + + w.WriteHeader(http.StatusNotFound) +} + +// POST /experiments/{exp}/netflow +func StartNetflow(w http.ResponseWriter, r *http.Request) { + plog.Debug("HTTP handler called", "handler", "StartNetflow") + + var ( + ctx = r.Context() + role = ctx.Value("role").(rbac.Role) + vars = mux.Vars(r) + exp = vars["exp"] + ) + + if !role.Allowed("experiments/netflow", "create", exp) { + http.Error(w, "forbidden", http.StatusForbidden) + return + } + + if err := experiment.StartNetflow(exp); err != nil { + plog.Error("starting netflow capture", "exp", exp, "err", err) + + if errors.Is(err, experiment.ErrNetflowAlreadyStarted) { + http.Error(w, "neflow already started for experiment", http.StatusBadRequest) + return + } + + if errors.Is(err, experiment.ErrExperimentNotFound) { + http.Error(w, "unable to find experiment", http.StatusBadRequest) + return + } + + if errors.Is(err, experiment.ErrExperimentNotRunning) { + http.Error(w, "cannot start netflow on stopped experiment", http.StatusConflict) + return + } + + if errors.Is(err, experiment.ErrNetflowPhenixBridge) { + http.Error(w, "cannot start netflow on experiment with default bridge set to 'phenix'", http.StatusConflict) + return + } + + http.Error(w, "unable to start netflow capture", http.StatusInternalServerError) + return + } + + w.WriteHeader(http.StatusNoContent) +} + +// DELETE /experiments/{exp}/netflow +func StopNetflow(w http.ResponseWriter, r *http.Request) { + plog.Debug("HTTP handler called", "handler", "StopNetflow") + + var ( + ctx = r.Context() + role = ctx.Value("role").(rbac.Role) + vars = mux.Vars(r) + exp = vars["exp"] + ) + + if !role.Allowed("experiments/netflow", "delete", exp) { + http.Error(w, "forbidden", http.StatusForbidden) + return + } + + if err := experiment.StopNetflow(exp); err != nil { + plog.Error("stopping netflow capture", "exp", exp, "err", err) + + if errors.Is(err, experiment.ErrNetflowNotStarted) { + http.Error(w, "not found", http.StatusNotFound) + return + } + + http.Error(w, "unable to stop netflow capture", http.StatusInternalServerError) + return + } + + w.WriteHeader(http.StatusNoContent) +} + +// GET /experiments/{exp}/netflow/ws +func GetNetflowWebSocket(w http.ResponseWriter, r *http.Request) { + plog.Debug("HTTP handler called", "handler", "GetNetflowWebSocket") + + var ( + ctx = r.Context() + role = ctx.Value("role").(rbac.Role) + vars = mux.Vars(r) + exp = vars["exp"] + ) + + if !role.Allowed("experiments/netflow", "get", exp) { + http.Error(w, "forbidden", http.StatusForbidden) + return + } + + flow := experiment.GetNetflow(exp) + if flow == nil { + http.Error(w, "not found", http.StatusNotFound) + return + } + + var ( + endpoint = flow.Conn.LocalAddr().String() + + id = putil.RandomString(24) + cb = flow.NewChannel(id) + + upgrader = websocket.Upgrader{ + ReadBufferSize: 4096, + WriteBufferSize: 4096, + } + + done = make(chan struct{}) + ) + + upgrader.CheckOrigin = func(*http.Request) bool { return true } + + conn, err := upgrader.Upgrade(w, r, nil) + if err != nil { + plog.Error("upgrading connection to WebSocket", "err", err) + return + } + + pongHandler := func(string) error { + plog.Info("received pong message from websocket client", "client", id) + + conn.SetReadDeadline(time.Now().Add(10 * time.Second)) + return nil + } + + closeHandler := func(code int, msg string) error { + plog.Info("received close message from websocket client", "client", id) + + var ( + message = websocket.FormatCloseMessage(code, "") + deadline = time.Now().Add(5 * time.Second) + ) + + // This will be an extra write message if we initiated the close. + conn.WriteControl(websocket.CloseMessage, message, deadline) + return nil + } + + plog.Info("ws client connected to netflow", "endpoint", endpoint, "client", id) + + go func() { // reader (for pong and close messages) + defer close(done) // stop writer + + conn.SetPongHandler(pongHandler) + conn.SetCloseHandler(closeHandler) + conn.SetReadLimit(1024) + + expected := []int{websocket.CloseNormalClosure, websocket.CloseGoingAway, websocket.CloseNoStatusReceived} + + for { + // This will error out if: + // 1. Client does not respond with pong message in time; or + // 2. Client sends a close message (either initiating it or + // responding to ours). + // Either way, we're done here. + // + // NOTE: if this errors because of a pong message not being received in + // time, the underlying socket will be closed without a WebSocket close + // message being sent. This is probably okay since it's likely that a pong + // wasn't received in time because the client no longer exists anyway. + _, _, err := conn.ReadMessage() + if err != nil { + if websocket.IsUnexpectedCloseError(err, expected...) { + plog.Error("reading websocket message", "client", id, "err", err) + } + + return + } + } + }() + + go func() { // writer (for netflow, ping, and close messages) + ticker := time.NewTicker((10 * time.Second * 7) / 10) + defer ticker.Stop() + + for { + select { + case <-done: + return + case msg, open := <-cb: + if !open { + plog.Info("netflow channel closed - closing websocket", "client", id) + + var ( + message = websocket.FormatCloseMessage(websocket.CloseNormalClosure, "netflow stopped") + deadline = time.Now().Add(5 * time.Second) + ) + + // This will (eventually) end up causing the reader to exit when it + // receives the close message response from the client. + conn.WriteControl(websocket.CloseMessage, message, deadline) + return + } + + conn.SetWriteDeadline(time.Now().Add(5 * time.Second)) + + if err := conn.WriteJSON(msg); err != nil { + plog.Error("writing netflow message", "client", id, "err", err) + } + case <-ticker.C: + deadline := time.Now().Add(5 * time.Second) + + if err := conn.WriteControl(websocket.PingMessage, nil, deadline); err != nil { + plog.Error("writing ping message", "client", id, "err", err) + } + } + } + }() + + <-done // wait for reader to be done + + conn.Close() + flow.DeleteChannel(id) + + plog.Info("ws client disconnected from netflow", "endpoint", endpoint, "client", id) +} diff --git a/src/go/web/proto/experiment.proto b/src/go/web/proto/experiment.proto index 8d0525ff..8c85c182 100644 --- a/src/go/web/proto/experiment.proto +++ b/src/go/web/proto/experiment.proto @@ -71,6 +71,7 @@ message CreateExperimentRequest { string workflow_branch = 6 [json_name="workflow_branch"]; repeated string disabled_apps = 7 [json_name="disabled_apps"]; string deploy_mode = 8 [json_name="deploy_mode"]; + string default_bridge = 9 [json_name="default_bridge"]; } message SnapshotRequest { diff --git a/src/go/web/rbac/known_policy.go b/src/go/web/rbac/known_policy.go index 085a7218..0e9b7a25 100644 --- a/src/go/web/rbac/known_policy.go +++ b/src/go/web/rbac/known_policy.go @@ -1,5 +1,5 @@ // Code generated by go generate; DO NOT EDIT. -// This file was generated at build time 2024-01-23 10:14:31.579393538 -0700 MST m=+0.354711363 +// This file was generated at build time 2024-02-05 11:46:23.269706775 -0700 MST m=+0.098769361 // This contains all known role checks used in codebase package rbac @@ -28,10 +28,14 @@ var Permissions = []Permission{ {"experiments/captures", "list"}, {"experiments/files", "get"}, {"experiments/files", "list"}, + {"experiments/netflow", "create"}, + {"experiments/netflow", "delete"}, + {"experiments/netflow", "get"}, {"experiments/schedule", "create"}, {"experiments/schedule", "get"}, {"experiments/start", "update"}, {"experiments/stop", "update"}, + {"experiments/topology", "get"}, {"experiments/trigger", "create"}, {"experiments/trigger", "delete"}, {"history", "get"}, diff --git a/src/go/web/server.go b/src/go/web/server.go index 9da2c61c..42c21afb 100644 --- a/src/go/web/server.go +++ b/src/go/web/server.go @@ -160,6 +160,12 @@ func Start(opts ...ServerOption) error { api.Handle("/experiments/{name}/apps", weberror.ErrorHandler(GetExperimentApps)).Methods("GET", "OPTIONS") api.Handle("/experiments/{name}/start", weberror.ErrorHandler(StartExperiment)).Methods("POST", "OPTIONS") api.Handle("/experiments/{name}/stop", weberror.ErrorHandler(StopExperiment)).Methods("POST", "OPTIONS") + api.HandleFunc("/experiments/{exp}/netflow", GetNetflow).Methods("GET", "OPTIONS") + api.HandleFunc("/experiments/{exp}/netflow", StartNetflow).Methods("POST", "OPTIONS") + api.HandleFunc("/experiments/{exp}/netflow", StopNetflow).Methods("DELETE", "OPTIONS") + api.HandleFunc("/experiments/{exp}/netflow/ws", GetNetflowWebSocket).Methods("GET", "OPTIONS") + api.HandleFunc("/experiments/{name}/topology", GetExperimentTopology).Methods("GET", "OPTIONS") + api.HandleFunc("/experiments/{name}/topology/search", SearchExperimentTopology).Methods("GET", "OPTIONS") api.HandleFunc("/experiments/{name}/trigger", TriggerExperimentApps).Methods("POST", "OPTIONS") api.HandleFunc("/experiments/{name}/trigger", CancelTriggeredExperimentApps).Methods("DELETE", "OPTIONS") api.HandleFunc("/experiments/{name}/schedule", GetExperimentSchedule).Methods("GET", "OPTIONS") diff --git a/src/go/web/types.go b/src/go/web/types.go index 96b5fc03..056d51ef 100644 --- a/src/go/web/types.go +++ b/src/go/web/types.go @@ -1,8 +1,9 @@ package web import ( - "phenix/web/rbac" "sort" + + "phenix/web/rbac" ) type SignupRequest struct { diff --git a/src/go/web/workflow.go b/src/go/web/workflow.go index 6b117860..d2639277 100644 --- a/src/go/web/workflow.go +++ b/src/go/web/workflow.go @@ -139,6 +139,7 @@ func ApplyWorkflow(w http.ResponseWriter, r *http.Request) error { experiment.CreateWithVLANMin(wf.VLANMin()), experiment.CreateWithVLANMax(wf.VLANMax()), experiment.CreateWithDeployMode(wf.ExperimentDeployMode()), + experiment.CreateWithDefaultBridge(wf.DefaultBridgeName()), } if err := experiment.Create(ctx, opts...); err != nil { @@ -303,6 +304,16 @@ func ApplyWorkflow(w http.ResponseWriter, r *http.Request) error { } } + if len(wf.DefaultBridgeName()) > 15 { + err := weberror.NewWebError( + fmt.Errorf("default bridge name must be 15 characters or less"), + "unable to set default bridge for experiment %s", expName, + ) + + return err.SetStatus(http.StatusBadRequest) + } + + exp.Spec.SetDefaultBridge(wf.DefaultBridgeName()) exp.Spec.SetSchedule(schedules) exp.Spec.SetDeployMode(string(wf.ExperimentDeployMode())) exp.Spec.SetVLANRange(wf.VLANMin(), wf.VLANMax(), true) @@ -489,6 +500,8 @@ type workflow struct { Min int `mapstructure:"min"` Max int `mapstructure:"max"` } `mapstructure:"vlanRange"` + + DefaultBridge string `mapstructure:"defaultBridge"` } func (this workflow) AutoUpdate() bool { @@ -571,3 +584,11 @@ func (this workflow) VLANMax() int { return this.VLANRange.Max } + +func (this workflow) DefaultBridgeName() string { + if this.DefaultBridge == "" { + return "phenix" + } + + return this.DefaultBridge +} diff --git a/src/js/package.json b/src/js/package.json index 586a396d..f9671b3f 100644 --- a/src/js/package.json +++ b/src/js/package.json @@ -10,8 +10,8 @@ "test": "jest" }, "dependencies": { - "@fortawesome/fontawesome-free": "^5.15.1", - "@fortawesome/fontawesome-svg-core": "^1.2.32", + "@fortawesome/fontawesome-free": "^6.4.0", + "@fortawesome/fontawesome-svg-core": "^6.4.0", "@fortawesome/vue-fontawesome": "^2.0.6", "buefy": "^0.9.22", "d3": "^6.2.0", @@ -36,8 +36,6 @@ }, "devDependencies": { "@babel/preset-env": "^7.22.5", - "@fortawesome/fontawesome-free": "^5.15.1", - "@fortawesome/free-solid-svg-icons": "^5.15.1", "@vue/cli-plugin-babel": "^3.7.0", "@vue/cli-plugin-eslint": "^3.7.0", "@vue/cli-service": "^3.7.0", diff --git a/src/js/src/components/Experiments.vue b/src/js/src/components/Experiments.vue index 6d9dd374..d313b0dc 100644 --- a/src/js/src/components/Experiments.vue +++ b/src/js/src/components/Experiments.vue @@ -59,6 +59,9 @@ + + + @@ -592,7 +595,8 @@ vlan_max: +this.createModal.vlan_max, workflow_branch: this.createModal.branch, deploy_mode: this.createModal.deploy_mode, - disabled_apps: disabledApps + disabled_apps: disabledApps, + default_bridge: this.createModal.bridge } if ( !this.createModal.name ) { @@ -775,6 +779,7 @@ vlan_max: null, branch: null, deploy_mode: null, + bridge: null, }, experiments: [], topologies: [], diff --git a/src/js/src/components/RunningExperiment.vue b/src/js/src/components/RunningExperiment.vue index 50c0ff6c..1f2b6032 100644 --- a/src/js/src/components/RunningExperiment.vue +++ b/src/js/src/components/RunningExperiment.vue @@ -512,6 +512,12 @@   
+ + + +