diff --git a/src/go/api/config/config.go b/src/go/api/config/config.go index fc2d328c..eca4326e 100644 --- a/src/go/api/config/config.go +++ b/src/go/api/config/config.go @@ -149,6 +149,30 @@ func Init() error { return fmt.Errorf("parsing config files in %s: %w", base, err) } + // Call any hooks registered for the `startup` stage. + configs, err := store.List(AllKinds...) + if err != nil { + return fmt.Errorf("getting list of configs from store: %w", err) + } + + for _, config := range configs { + var updated bool + + for _, hook := range hooks[config.Kind] { + if err := hook("startup", &config); err != nil { + return fmt.Errorf("calling startup config hook: %w", err) + } + + updated = true + } + + if updated { + if err := store.Update(&config); err != nil { + return fmt.Errorf("updating config in store: %w", err) + } + } + } + return nil } @@ -342,6 +366,8 @@ func Edit(name string, force bool) (*store.Config, error) { } expName = exp.Spec.ExperimentName() + + // Don't allow users to edit the experiment name field. delete(c.Spec, "experimentName") } diff --git a/src/go/api/experiment/experiment.go b/src/go/api/experiment/experiment.go index 5182e4ac..b94b6612 100644 --- a/src/go/api/experiment/experiment.go +++ b/src/go/api/experiment/experiment.go @@ -2,6 +2,7 @@ package experiment import ( "context" + "errors" "fmt" "io/ioutil" "os" @@ -31,6 +32,11 @@ import ( "github.com/mitchellh/mapstructure" ) +var ( + ErrExperimentNotFound = errors.New("experiment not found") + ErrExperimentNotRunning = errors.New("experiment not running") +) + func init() { config.RegisterConfigHook("Experiment", func(stage string, c *store.Config) error { exp, err := types.DecodeExperimentFromConfig(*c) @@ -39,6 +45,18 @@ func init() { } switch stage { + case "startup": + // Experiments created before the default bridge option was added need to + // be updated to have a default `phenix` bridge. + if exp.Spec.DefaultBridge() == "" { + exp.Spec.SetDefaultBridge("phenix") + + if err := exp.Spec.Init(); err != nil { + return fmt.Errorf("re-initializing experiment with default bridge: %w", err) + } + + c.Spec = structs.MapDefaultCase(exp.Spec, structs.CASESNAKE) + } case "create": exp.Spec.SetExperimentName(c.Metadata.Name) @@ -46,6 +64,21 @@ func init() { return fmt.Errorf("initializing experiment: %w", err) } + existing, _ := types.Experiments(false) + for _, other := range existing { + if other.Metadata.Name == exp.Metadata.Name { + continue + } + + if exp.Spec.DefaultBridge() == "phenix" { + continue + } + + if other.Spec.DefaultBridge() == exp.Spec.DefaultBridge() { + return fmt.Errorf("experiment %s already using default bridge %s", other.Metadata.Name, other.Spec.DefaultBridge()) + } + } + if err := exp.Spec.VerifyScenario(context.TODO()); err != nil { return fmt.Errorf("verifying experiment scenario: %w", err) } @@ -65,6 +98,21 @@ func init() { return fmt.Errorf("re-initializing experiment (after update): %w", err) } + existing, _ := types.Experiments(false) + for _, other := range existing { + if other.Metadata.Name == exp.Metadata.Name { + continue + } + + if exp.Spec.DefaultBridge() == "phenix" { + continue + } + + if other.Spec.DefaultBridge() == exp.Spec.DefaultBridge() { + return fmt.Errorf("experiment %s already using default bridge %s", other.Metadata.Name, other.Spec.DefaultBridge()) + } + } + if exp.Spec.ExperimentName() != c.Metadata.Name { if strings.Contains(exp.Spec.BaseDir(), exp.Spec.ExperimentName()) { // If the experiment's base directory contains the current experiment @@ -183,6 +231,10 @@ func Create(ctx context.Context, opts ...CreateOption) error { return fmt.Errorf("no topology name provided") } + if len(o.defaultBridge) > 15 { + return fmt.Errorf("default bridge name must be 15 characters or less") + } + var ( kind = "Experiment" apiVersion = version.StoredVersion[kind] @@ -211,6 +263,7 @@ func Create(ctx context.Context, opts ...CreateOption) error { "experimentName": o.name, "baseDir": o.baseDir, "deployMode": o.deployMode, + "defaultBridge": o.defaultBridge, "topology": topo, } diff --git a/src/go/api/experiment/netflow.go b/src/go/api/experiment/netflow.go new file mode 100644 index 00000000..f11db444 --- /dev/null +++ b/src/go/api/experiment/netflow.go @@ -0,0 +1,219 @@ +package experiment + +import ( + "bufio" + "errors" + "fmt" + "net" + "strconv" + "strings" + "sync" + + "phenix/util/mm" +) + +type Netflow struct { + sync.RWMutex + + Bridge string + Conn *net.UDPConn + + callbacks map[string]chan map[string]any +} + +func NewNetflow(bridge string, conn *net.UDPConn) *Netflow { + return &Netflow{ + Bridge: bridge, + Conn: conn, + + callbacks: make(map[string]chan map[string]any), + } +} + +func (this *Netflow) NewChannel(id string) chan map[string]any { + this.Lock() + defer this.Unlock() + + if _, ok := this.callbacks[id]; ok { + return nil + } + + cb := make(chan map[string]any) + + this.callbacks[id] = cb + + return cb +} + +func (this *Netflow) DeleteChannel(id string) { + this.Lock() + defer this.Unlock() + + if cb, ok := this.callbacks[id]; ok { + close(cb) + + for range cb { + // draining channel so it doesn't block anything + } + } + + delete(this.callbacks, id) +} + +func (this *Netflow) Publish(body map[string]any) { + this.RLock() + defer this.RUnlock() + + for _, cb := range this.callbacks { + cb <- body + } +} + +func (this *Netflow) Close() { + this.Lock() + defer this.Unlock() + + for _, cb := range this.callbacks { + close(cb) + } + + this.callbacks = nil + this.Conn.Close() +} + +var ( + netflows = make(map[string]*Netflow) + netflowMu sync.RWMutex + + ErrNetflowNotStarted = errors.New("netflow not started for experiment") + ErrNetflowAlreadyStarted = errors.New("netflow already started for experiment") + ErrNetflowPhenixBridge = errors.New("cannot capture netflow on default phenix bridge") +) + +func init() { + // Delete netflow captures when experiments are stopped. + RegisterHook("stop", func(stage, name string) { + netflowMu.RLock() + defer netflowMu.RUnlock() + + if flow, ok := netflows[name]; ok { + // We don't need to worry about instructing minimega to delete the netflow + // capture since that will happen as part of the minimega namespace for + // this experiment being cleared. + + flow.Conn.Close() + delete(netflows, name) + } + }) +} + +func GetNetflow(exp string) *Netflow { + netflowMu.RLock() + defer netflowMu.RUnlock() + + if flow, ok := netflows[exp]; ok { + return flow + } + + return nil +} + +func StartNetflow(exp string) error { + netflowMu.Lock() + defer netflowMu.Unlock() + + if _, ok := netflows[exp]; ok { + return ErrNetflowAlreadyStarted + } + + spec, err := Get(exp) + if err != nil { + return ErrExperimentNotFound + } + + if !spec.Running() { + return ErrExperimentNotRunning + } + + if spec.Spec.DefaultBridge() == "phenix" { + return ErrNetflowPhenixBridge + } + + cluster, _ := ClusterNodes(exp) + + conn, err := net.ListenUDP("udp4", nil) + if err != nil { + return fmt.Errorf("creating UDP listener: %w", err) + } + + addr := strings.Split(conn.LocalAddr().String(), ":") + cmds := []string{ + "capture netflow mode ascii", + fmt.Sprintf("capture netflow bridge %s udp %s:%s", spec.Spec.DefaultBridge(), mm.Headnode(), addr[1]), + } + + for _, cmd := range cmds { + for _, node := range cluster { + if err := mm.MeshSend(exp, node, cmd); err != nil { + conn.Close() + return fmt.Errorf("starting netflow capture on node %s: %w", node, err) + } + } + } + + flow := NewNetflow(spec.Spec.DefaultBridge(), conn) + netflows[exp] = flow + + go func() { + scanner := bufio.NewScanner(conn) + + for scanner.Scan() { + fields := strings.Fields(scanner.Text()) + + body := make(map[string]any) + + body["proto"], _ = strconv.Atoi(fields[2]) + + src := strings.Split(fields[3], ":") + dst := strings.Split(fields[5], ":") + + body["src"] = src[0] + body["sport"], _ = strconv.Atoi(src[1]) + + body["dst"] = dst[0] + body["dport"], _ = strconv.Atoi(dst[1]) + + body["packets"], _ = strconv.Atoi(fields[6]) + body["bytes"], _ = strconv.Atoi(fields[7]) + + flow.Publish(body) + } + }() + + return nil +} + +func StopNetflow(exp string) error { + netflowMu.Lock() + defer netflowMu.Unlock() + + flow, ok := netflows[exp] + if !ok { + return ErrNetflowNotStarted + } + + cluster, _ := ClusterNodes(exp) + + cmd := fmt.Sprintf("capture netflow delete bridge %s", flow.Bridge) + + for _, node := range cluster { + if err := mm.MeshSend(exp, node, cmd); err != nil { + return fmt.Errorf("deleting netflow capture on node %s: %w", node, err) + } + } + + flow.Close() + delete(netflows, exp) + + return nil +} diff --git a/src/go/api/experiment/option.go b/src/go/api/experiment/option.go index d3d6389d..88cbfcfb 100644 --- a/src/go/api/experiment/option.go +++ b/src/go/api/experiment/option.go @@ -8,17 +8,18 @@ import ( type CreateOption func(*createOptions) type createOptions struct { - name string - annotations map[string]string - topology string - scenario string - disabledApps []string - vlanMin int - vlanMax int - vlanAliases map[string]int - schedules map[string]string - baseDir string - deployMode common.DeploymentMode + name string + annotations map[string]string + topology string + scenario string + disabledApps []string + vlanMin int + vlanMax int + vlanAliases map[string]int + schedules map[string]string + baseDir string + deployMode common.DeploymentMode + defaultBridge string } func newCreateOptions(opts ...CreateOption) createOptions { @@ -34,6 +35,10 @@ func newCreateOptions(opts ...CreateOption) createOptions { o.baseDir = common.PhenixBase + "/experiments/" + o.name } + if o.defaultBridge == "" { + o.defaultBridge = "phenix" + } + return o } @@ -103,6 +108,12 @@ func CreateWithDeployMode(m common.DeploymentMode) CreateOption { } } +func CreateWithDefaultBridge(b string) CreateOption { + return func(o *createOptions) { + o.defaultBridge = b + } +} + type SaveOption func(*saveOptions) type saveOptions struct { diff --git a/src/go/api/experiment/util.go b/src/go/api/experiment/util.go new file mode 100644 index 00000000..93cfac6a --- /dev/null +++ b/src/go/api/experiment/util.go @@ -0,0 +1,26 @@ +package experiment + +func ClusterNodes(exp string) ([]string, error) { + nodeMap := make(map[string]struct{}) + + spec, err := Get(exp) + if err != nil { + return nil, ErrExperimentNotFound + } + + if !spec.Running() { + return nil, ErrExperimentNotRunning + } + + for _, node := range spec.Status.Schedules() { + nodeMap[node] = struct{}{} + } + + var nodes []string + + for node := range nodeMap { + nodes = append(nodes, node) + } + + return nodes, nil +} diff --git a/src/go/api/scorch/break.go b/src/go/api/scorch/break.go index cdc5d905..715b6656 100644 --- a/src/go/api/scorch/break.go +++ b/src/go/api/scorch/break.go @@ -61,7 +61,7 @@ func (this Break) breakPoint(ctx context.Context, stage Action) error { if md.Tap != nil { pairs := discoverUsedPairs() - md.Tap.Init(tap.Experiment(exp), tap.UsedPairs(pairs)) + md.Tap.Init(this.options.Exp.Spec.DefaultBridge(), tap.Experiment(exp), tap.UsedPairs(pairs)) // backwards compatibility (doesn't support external access firewall rules) if v, ok := md.Tap.Other["internetAccess"]; ok { diff --git a/src/go/api/scorch/tap.go b/src/go/api/scorch/tap.go index 1f9b795c..71bf82f2 100644 --- a/src/go/api/scorch/tap.go +++ b/src/go/api/scorch/tap.go @@ -42,7 +42,7 @@ func (this Tap) Start(ctx context.Context) error { } pairs := discoverUsedPairs() - t.Init(tap.Experiment(exp), tap.UsedPairs(pairs)) + t.Init(this.options.Exp.Spec.DefaultBridge(), tap.Experiment(exp), tap.UsedPairs(pairs)) // backwards compatibility (doesn't support external access firewall rules) if v, ok := t.Other["internetAccess"]; ok { @@ -81,7 +81,7 @@ func (this Tap) Stop(ctx context.Context) error { t, ok := status.Taps[this.options.Name] if ok { - t.Init(tap.Experiment(exp)) + t.Init(this.options.Exp.Spec.DefaultBridge(), tap.Experiment(exp)) if err := t.Delete(mm.Headnode()); err != nil { return fmt.Errorf("deleting host tap for VLAN %s: %w", t.VLAN, err) @@ -98,7 +98,7 @@ func (Tap) Cleanup(context.Context) error { func discoverUsedPairs() []netaddr.IPPrefix { var pairs []netaddr.IPPrefix - running, err := types.RunningExperiments() + running, err := types.Experiments(true) if err != nil { return nil } diff --git a/src/go/api/soh/app.go b/src/go/api/soh/app.go index f837c98c..c8786c08 100644 --- a/src/go/api/soh/app.go +++ b/src/go/api/soh/app.go @@ -97,7 +97,7 @@ func (this *SOH) Configure(ctx context.Context, exp *types.Experiment) error { return fmt.Errorf("building Elastic server node: %w", err) } - exp.Spec.Topology().Init() + exp.Spec.Topology().Init(exp.Spec.DefaultBridge()) } } diff --git a/src/go/api/soh/util.go b/src/go/api/soh/util.go index 4d753255..541bdf8b 100644 --- a/src/go/api/soh/util.go +++ b/src/go/api/soh/util.go @@ -136,7 +136,7 @@ func (this *SOH) buildElasticServerNode(exp *types.Experiment, ip string, cidr i iface.SetAddress(ip) iface.SetMask(cidr) iface.SetProto("static") - iface.SetBridge("phenix") + iface.SetBridge(exp.Spec.DefaultBridge()) data := struct { Hostname string @@ -181,7 +181,7 @@ func (this *SOH) buildPacketBeatNode(exp *types.Experiment, target ifaces.NodeSp "address": ip, "mask": cidr, "proto": "static", - "bridge": "phenix", + "bridge": exp.Spec.DefaultBridge(), }, } @@ -193,7 +193,7 @@ func (this *SOH) buildPacketBeatNode(exp *types.Experiment, target ifaces.NodeSp "type": "ethernet", "vlan": iface.VLAN(), "proto": "static", - "bridge": "phenix", + "bridge": exp.Spec.DefaultBridge(), } nets = append(nets, monitorIface) diff --git a/src/go/api/vm/search.go b/src/go/api/vm/search.go new file mode 100644 index 00000000..8992794d --- /dev/null +++ b/src/go/api/vm/search.go @@ -0,0 +1,80 @@ +package vm + +import "fmt" + +type TopologySearch struct { + Hostname map[string]int `json:"hostname"` + Disk map[string][]int `json:"disk"` + Type map[string][]int `json:"node-type"` + OSType map[string][]int `json:"os-type"` + Label map[string][]int `json:"label"` + Annotation map[string][]int `json:"annotation"` + VLAN map[string][]int `json:"vlan"` + IP map[string][]int `json:"ip"` +} + +func (this *TopologySearch) AddHostname(k string, n int) { + if this.Hostname == nil { + this.Hostname = make(map[string]int) + } + + this.Hostname[k] = n +} + +func (this *TopologySearch) AddDisk(k string, n int) { + if this.Disk == nil { + this.Disk = make(map[string][]int) + } + + this.Disk[k] = append(this.Disk[k], n) +} + +func (this *TopologySearch) AddType(k string, n int) { + if this.Type == nil { + this.Type = make(map[string][]int) + } + + this.Type[k] = append(this.Type[k], n) +} + +func (this *TopologySearch) AddOSType(k string, n int) { + if this.OSType == nil { + this.OSType = make(map[string][]int) + } + + this.OSType[k] = append(this.OSType[k], n) +} + +func (this *TopologySearch) AddLabel(k, v string, n int) { + if this.Label == nil { + this.Label = make(map[string][]int) + } + + k = fmt.Sprintf("%s=%s", k, v) + + this.Label[k] = append(this.Label[k], n) +} + +func (this *TopologySearch) AddAnnotation(k string, n int) { + if this.Annotation == nil { + this.Annotation = make(map[string][]int) + } + + this.Annotation[k] = append(this.Annotation[k], n) +} + +func (this *TopologySearch) AddVLAN(k string, n int) { + if this.VLAN == nil { + this.VLAN = make(map[string][]int) + } + + this.VLAN[k] = append(this.VLAN[k], n) +} + +func (this *TopologySearch) AddIP(k string, n int) { + if this.IP == nil { + this.IP = make(map[string][]int) + } + + this.IP[k] = append(this.IP[k], n) +} diff --git a/src/go/api/vm/topology.go b/src/go/api/vm/topology.go new file mode 100644 index 00000000..70b4f819 --- /dev/null +++ b/src/go/api/vm/topology.go @@ -0,0 +1,113 @@ +package vm + +import ( + "fmt" + + "phenix/api/experiment" + "phenix/util/cache" + "phenix/util/mm" + + "golang.org/x/exp/slices" +) + +type topology struct { + Nodes []mm.VM `json:"nodes"` + Edges []edge `json:"edges"` + Running bool `json:"running"` +} + +type edge struct { + ID int `json:"id"` + Source int `json:"source"` + Target int `json:"target"` + Length int `json:"length"` +} + +func Topology(exp string, ignore []string) (topology, error) { + vms, err := List(exp) + if err != nil { + return topology{}, fmt.Errorf("getting VMs: %w", err) + } + + var ( + networks = make(map[string]mm.VM) + search TopologySearch + + cacheKey = fmt.Sprintf("experiment|%s|search", exp) + cached bool + + nodes []mm.VM + nodeID int + edges []edge + edgeID int + ) + + if val, ok := cache.Get(cacheKey); ok { + search = val.(TopologySearch) + cached = true + } + + for _, vm := range vms { + node := vm.Copy() + node.ID = nodeID + + nodes = append(nodes, node) + nodeID++ + + if !cached { + search.AddHostname(node.Name, node.ID) + search.AddDisk(node.Disk, node.ID) + search.AddType(node.Type, node.ID) + search.AddOSType(node.OSType, node.ID) + + for k, v := range node.Labels { + search.AddLabel(k, v, node.ID) + } + + for k := range node.Annotations { + search.AddAnnotation(k, node.ID) + } + } + + for i, iface := range vm.Networks { + if match := vlanAliasRegex.FindStringSubmatch(iface); match != nil { + iface = match[1] + } + + if slices.Contains(ignore, iface) { + continue + } + + if !cached { + // TODO: what if these change during an experiment (e.g., via user updates)? + search.AddVLAN(iface, node.ID) + search.AddIP(vm.IPv4[i], node.ID) + } + + network, ok := networks[iface] + if !ok { // create new node for VLAN network switch + network = mm.VM{ID: nodeID, Name: iface, Type: "Switch", Networks: []string{iface}} + networks[iface] = network + + nodes = append(nodes, network) + nodeID++ + } + + edges = append(edges, edge{ID: edgeID, Source: node.ID, Target: network.ID, Length: 150}) + edgeID++ + } + } + + if !cached { + // TODO: cache with expire? + cache.Set(cacheKey, search) + } + + topo := topology{Nodes: nodes, Edges: edges} + + if exp, err := experiment.Get(exp); err == nil { + topo.Running = exp.Running() + } + + return topo, nil +} diff --git a/src/go/app/app.go b/src/go/app/app.go index e42b04c8..27d71c92 100644 --- a/src/go/app/app.go +++ b/src/go/app/app.go @@ -338,7 +338,7 @@ func ApplyApps(ctx context.Context, exp *types.Experiment, opts ...Option) error if options.Stage == ACTIONCONFIG || options.Stage == ACTIONPRESTART { // just in case one of the apps added some nodes to the topology... - exp.Spec.Topology().Init() + exp.Spec.Topology().Init(exp.Spec.DefaultBridge()) } return nil diff --git a/src/go/app/tap.go b/src/go/app/tap.go index 41e4409c..4c05296d 100644 --- a/src/go/app/tap.go +++ b/src/go/app/tap.go @@ -85,7 +85,7 @@ func (this *Tap) PostStart(ctx context.Context, exp *types.Experiment) error { opts = append(opts, tap.PairSubnet(subnet)) } - t.Init(opts...) + t.Init(exp.Spec.DefaultBridge(), opts...) // Tap name is random, yet descriptive to the fact that it's a "tapapp" tap. t.Name = fmt.Sprintf("%s-tapapp", util.RandomString(8)) @@ -126,7 +126,7 @@ func (this *Tap) Cleanup(ctx context.Context, exp *types.Experiment) error { ) for _, t := range status.Taps { - t.Init(tap.Experiment(exp.Metadata.Name)) + t.Init(exp.Spec.DefaultBridge(), tap.Experiment(exp.Metadata.Name)) if err := t.Delete(host); err != nil { errs = multierror.Append(errs, fmt.Errorf("deleting host tap for VLAN %s: %w", t.VLAN, err)) @@ -139,7 +139,7 @@ func (this *Tap) Cleanup(ctx context.Context, exp *types.Experiment) error { func (this Tap) discoverUsedPairs() []netaddr.IPPrefix { var pairs []netaddr.IPPrefix - running, err := types.RunningExperiments() + running, err := types.Experiments(true) if err != nil { return nil } diff --git a/src/go/cmd/experiment.go b/src/go/cmd/experiment.go index 3ed4f776..7e1efd6b 100644 --- a/src/go/cmd/experiment.go +++ b/src/go/cmd/experiment.go @@ -175,6 +175,7 @@ func newExperimentCreateCmd() *cobra.Command { experiment.CreateWithVLANMin(MustGetInt(cmd.Flags(), "vlan-min")), experiment.CreateWithVLANMax(MustGetInt(cmd.Flags(), "vlan-max")), experiment.CreatedWithDisabledApplications(disabledApps), + experiment.CreateWithDefaultBridge(MustGetString(cmd.Flags(), "default-bridge")), } ctx := notes.Context(context.Background(), false) @@ -196,6 +197,7 @@ func newExperimentCreateCmd() *cobra.Command { cmd.MarkFlagRequired("topology") cmd.Flags().StringP("scenario", "s", "", "Name of an existing scenario to use (optional)") cmd.Flags().StringP("base-dir", "d", "", "Base directory to use for experiment (optional)") + cmd.Flags().StringP("default-bridge", "b", "phenix", "Default bridge name to use for experiment (optional)") cmd.Flags().Int("vlan-min", 0, "VLAN pool minimum") cmd.Flags().Int("vlan-max", 0, "VLAN pool maximum") cmd.Flags().StringSlice("disabled-apps", []string{}, "Comma separated ist of apps to disable") diff --git a/src/go/tmpl/templates/minimega_script.tmpl b/src/go/tmpl/templates/minimega_script.tmpl index 6044107c..b5dcece2 100644 --- a/src/go/tmpl/templates/minimega_script.tmpl +++ b/src/go/tmpl/templates/minimega_script.tmpl @@ -1,6 +1,10 @@ namespace {{ .ExperimentName }} ns queueing true +{{- if ne .DefaultBridge "phenix" }} +ns bridge {{ .DefaultBridge }} gre +{{- end }} + {{- if and (ne .VLANs.Min 0) (ne .VLANs.Max 0) }} vlans range {{ .VLANs.Min }} {{ .VLANs.Max }} {{- end }} diff --git a/src/go/types/experiment.go b/src/go/types/experiment.go index 7d60c46d..313db29e 100644 --- a/src/go/types/experiment.go +++ b/src/go/types/experiment.go @@ -141,7 +141,7 @@ func (this Experiment) FilesDir() string { return filepath.Join(common.PhenixBase, "images", this.Metadata.Name, "files") } -func RunningExperiments() ([]*Experiment, error) { +func Experiments(running bool) ([]*Experiment, error) { configs, err := store.List("Experiment") if err != nil { return nil, fmt.Errorf("getting list of experiment configs from store: %w", err) @@ -155,9 +155,11 @@ func RunningExperiments() ([]*Experiment, error) { return nil, fmt.Errorf("decoding experiment %s from config: %w", c.Metadata.Name, err) } - if exp.Running() { - experiments = append(experiments, exp) + if running && !exp.Running() { + continue } + + experiments = append(experiments, exp) } return experiments, nil diff --git a/src/go/types/interfaces/experiment.go b/src/go/types/interfaces/experiment.go index 12e55f05..9cc78c1b 100644 --- a/src/go/types/interfaces/experiment.go +++ b/src/go/types/interfaces/experiment.go @@ -19,6 +19,7 @@ type ExperimentSpec interface { ExperimentName() string BaseDir() string + DefaultBridge() string Topology() TopologySpec Scenario() ScenarioSpec VLANs() VLANSpec @@ -27,6 +28,7 @@ type ExperimentSpec interface { SetExperimentName(string) SetBaseDir(string) + SetDefaultBridge(string) SetVLANAlias(string, int, bool) error SetVLANRange(int, int, bool) error SetSchedule(map[string]string) diff --git a/src/go/types/interfaces/topology.go b/src/go/types/interfaces/topology.go index d9437ef7..c5a33faa 100644 --- a/src/go/types/interfaces/topology.go +++ b/src/go/types/interfaces/topology.go @@ -16,7 +16,7 @@ type TopologySpec interface { HasCommands() bool - Init() error + Init(string) error } type NodeSpec interface { diff --git a/src/go/types/version/v1/experiment.go b/src/go/types/version/v1/experiment.go index a898b9ac..4f6eec51 100644 --- a/src/go/types/version/v1/experiment.go +++ b/src/go/types/version/v1/experiment.go @@ -75,6 +75,7 @@ func (this VLANSpec) Validate() error { type ExperimentSpec struct { ExperimentNameF string `json:"experimentName,omitempty" yaml:"experimentName,omitempty" structs:"experimentName" mapstructure:"experimentName"` BaseDirF string `json:"baseDir" yaml:"baseDir" structs:"baseDir" mapstructure:"baseDir"` + DefaultBridgeF string `json:"defaultBridge" yaml:"defaultBridge" structs:"defaultBridge" mapstructure:"defaultBridge"` TopologyF *TopologySpec `json:"topology" yaml:"topology" structs:"topology" mapstructure:"topology"` ScenarioF *v2.ScenarioSpec `json:"scenario" yaml:"scenario" structs:"scenario" mapstructure:"scenario"` VLANsF *VLANSpec `json:"vlans" yaml:"vlans" structs:"vlans" mapstructure:"vlans"` @@ -87,6 +88,10 @@ func (this *ExperimentSpec) Init() error { this.BaseDirF = common.PhenixBase + "/experiments/" + this.ExperimentNameF } + if this.DefaultBridgeF == "" { + this.DefaultBridgeF = "phenix" + } + if !filepath.IsAbs(this.BaseDirF) { if absPath, err := filepath.Abs(this.BaseDirF); err == nil { this.BaseDirF = absPath @@ -107,7 +112,7 @@ func (this *ExperimentSpec) Init() error { } if this.TopologyF != nil { - if err := this.TopologyF.Init(); err != nil { + if err := this.TopologyF.Init(this.DefaultBridgeF); err != nil { return fmt.Errorf("initializing topology: %w", err) } @@ -135,6 +140,10 @@ func (this ExperimentSpec) BaseDir() string { return this.BaseDirF } +func (this ExperimentSpec) DefaultBridge() string { + return this.DefaultBridgeF +} + func (this ExperimentSpec) Topology() ifaces.TopologySpec { if this.TopologyF == nil { return new(TopologySpec) @@ -183,6 +192,10 @@ func (this *ExperimentSpec) SetBaseDir(dir string) { this.BaseDirF = dir } +func (this *ExperimentSpec) SetDefaultBridge(bridge string) { + this.DefaultBridgeF = bridge +} + func (this *ExperimentSpec) SetVLANAlias(a string, i int, f bool) error { if this.VLANsF == nil { this.VLANsF = &VLANSpec{AliasesF: make(map[string]int)} diff --git a/src/go/types/version/v1/network.go b/src/go/types/version/v1/network.go index eb0571cb..48f89188 100644 --- a/src/go/types/version/v1/network.go +++ b/src/go/types/version/v1/network.go @@ -130,6 +130,8 @@ type Interface struct { QinQF bool `json:"qinq" yaml:"qinq" structs:"qinq" mapstructure:"qinq"` RulesetInF string `json:"ruleset_in" yaml:"ruleset_in" structs:"ruleset_in" mapstructure:"ruleset_in"` RulesetOutF string `json:"ruleset_out" yaml:"ruleset_out" structs:"ruleset_out" mapstructure:"ruleset_out"` + + BridgeSetInTopo *bool `json:"-" yaml:"-" structs:"bridge_set_in_topo,omitempty" mapstructure:"bridge_set_in_topo,omitempty"` } func (this Interface) Name() string { @@ -525,10 +527,24 @@ func (this NAT) Out() string { return this.OutF } -func (this *Network) SetDefaults() { +func (this *Network) SetDefaults(bridge string) { for idx, iface := range this.InterfacesF { - if iface.BridgeF == "" { - iface.BridgeF = "phenix" + if iface.BridgeF == bridge { + continue + } + + if iface.BridgeSetInTopo == nil { + var setInTopo bool + + if iface.BridgeF != "" && iface.BridgeF != "phenix" { + setInTopo = true + } + + iface.BridgeSetInTopo = &setInTopo + } + + if setInTopo := *iface.BridgeSetInTopo; !setInTopo { + iface.BridgeF = bridge this.InterfacesF[idx] = iface } } diff --git a/src/go/types/version/v1/node.go b/src/go/types/version/v1/node.go index 80c57bb1..dbbd3c65 100644 --- a/src/go/types/version/v1/node.go +++ b/src/go/types/version/v1/node.go @@ -453,7 +453,7 @@ func (this Node) validate() error { return nil } -func (this *Node) setDefaults() { +func (this *Node) setDefaults(bridge string) { if this.External() { return } @@ -497,7 +497,7 @@ func (this *Node) setDefaults() { } if this.NetworkF != nil { - this.NetworkF.SetDefaults() + this.NetworkF.SetDefaults(bridge) } } diff --git a/src/go/types/version/v1/topology.go b/src/go/types/version/v1/topology.go index bdba3dec..9222d177 100644 --- a/src/go/types/version/v1/topology.go +++ b/src/go/types/version/v1/topology.go @@ -143,7 +143,7 @@ func (this TopologySpec) HasCommands() bool { return false } -func (this *TopologySpec) Init() error { +func (this *TopologySpec) Init(bridge string) error { var errs error for _, n := range this.NodesF { @@ -151,7 +151,7 @@ func (this *TopologySpec) Init() error { errs = multierror.Append(errs, fmt.Errorf("validating node %s: %w", n.GeneralF.HostnameF, err)) } - n.setDefaults() + n.setDefaults(bridge) } return errs diff --git a/src/go/types/version/version_test.go b/src/go/types/version/version_test.go new file mode 100644 index 00000000..c2ca430e --- /dev/null +++ b/src/go/types/version/version_test.go @@ -0,0 +1,76 @@ +package version + +import ( + "context" + "testing" + + v2 "phenix/types/version/v2" + + "github.com/getkin/kin-openapi/openapi3" + "gopkg.in/yaml.v3" +) + +var topo = ` +nodes: +- general: + hostname: host-00 + hardware: + drives: + - image: miniccc.qc2 + memory: 512 + vcpus: 1 + os_type: linux + network: + interfaces: + - address: 10.0.0.1 + mask: 24 + gateway: 10.0.0.254 + name: IF0 + proto: static + type: ethernet + vlan: EXP + type: VirtualMachine +- general: + hostname: hil +# network: +# interfaces: +# - address: 192.168.86.177 +# name: IF0 + type: HIL + external: true +` + +func TestSchema(t *testing.T) { + s, err := openapi3.NewLoader().LoadFromData(v2.OpenAPI) + if err != nil { + t.Log(err) + t.FailNow() + } + + if err := s.Validate(context.Background()); err != nil { + t.Log(err) + t.FailNow() + } + + ref, ok := s.Components.Schemas["Topology"] + if !ok { + t.Log("missing Topology schema") + t.FailNow() + } + + var spec interface{} + if err := yaml.Unmarshal([]byte(topo), &spec); err != nil { + t.Log(err) + t.FailNow() + } + + /* + body, _ := json.Marshal(spec) + json.Unmarshal(body, &spec) + */ + + if err := ref.Value.VisitJSON(spec); err != nil { + t.Log(err) + t.FailNow() + } +} diff --git a/src/go/util/cache/cache.go b/src/go/util/cache/cache.go new file mode 100644 index 00000000..30d39fce --- /dev/null +++ b/src/go/util/cache/cache.go @@ -0,0 +1,11 @@ +package cache + +import ( + "time" +) + +type Cache interface { + Get(string) (any, bool) + Set(string, any) error + SetWithExpire(string, any, time.Duration) error +} diff --git a/src/go/util/cache/go-cache.go b/src/go/util/cache/go-cache.go new file mode 100644 index 00000000..bdbe897f --- /dev/null +++ b/src/go/util/cache/go-cache.go @@ -0,0 +1,34 @@ +package cache + +import ( + "time" + + gocache "github.com/patrickmn/go-cache" +) + +type GoCache struct { + c *gocache.Cache +} + +func NewGoCache() *GoCache { + return &GoCache{c: gocache.New(gocache.NoExpiration, 30*time.Second)} +} + +func (this GoCache) Get(key string) (any, bool) { + v, ok := this.c.Get(key) + if !ok { + return nil, false + } + + return v, true +} + +func (this *GoCache) Set(key string, val any) error { + this.c.Set(key, val, -1) + return nil +} + +func (this *GoCache) SetWithExpire(key string, val any, exp time.Duration) error { + this.c.Set(key, val, exp) + return nil +} diff --git a/src/go/util/cache/package.go b/src/go/util/cache/package.go new file mode 100644 index 00000000..7bbd9ada --- /dev/null +++ b/src/go/util/cache/package.go @@ -0,0 +1,17 @@ +package cache + +import "time" + +var DefaultCache Cache = NewGoCache() + +func Get(key string) (any, bool) { + return DefaultCache.Get(key) +} + +func Set(key string, val any) error { + return DefaultCache.Set(key, val) +} + +func SetWithExpire(key string, val any, exp time.Duration) error { + return DefaultCache.SetWithExpire(key, val, exp) +} diff --git a/src/go/util/mm/types.go b/src/go/util/mm/types.go index 87c6d6d2..ffb98aad 100644 --- a/src/go/util/mm/types.go +++ b/src/go/util/mm/types.go @@ -202,16 +202,19 @@ func (this VMs) Paginate(page, size int) VMs { type VM struct { ID int `json:"id"` Name string `json:"name"` + Type string `json:"type"` Experiment string `json:"experiment"` Host string `json:"host"` IPv4 []string `json:"ipv4"` CPUs int `json:"cpus"` RAM int `json:"ram"` Disk string `json:"disk"` + OSType string `json:"osType"` DoNotBoot bool `json:"dnb"` Networks []string `json:"networks"` Taps []string `json:"taps"` Captures []Capture `json:"captures"` + State string `json:"state"` Running bool `json:"running"` Busy bool `json:"busy"` CCActive bool `json:"ccActive"` @@ -226,19 +229,38 @@ type VM struct { Interfaces map[string]string `json:"-"` // Used internally for showing VM details. - Type string `json:"-"` - OSType string `json:"-"` Metadata map[string]interface{} `json:"-"` Labels map[string]string `json:"-"` Annotations map[string]interface{} `json:"-"` - // Used internally to track state of VM in minimega. - State string `json:"-"` - // Used internally to check for active CC agent. UUID string `json:"-"` } +// Copy returns a deep copy of the VM. It only makes deep copies of fields that +// are exported as JSON. +func (this VM) Copy() VM { + vm := this + + vm.IPv4 = make([]string, len(this.IPv4)) + copy(vm.IPv4, this.IPv4) + + vm.Networks = make([]string, len(this.Networks)) + copy(vm.Networks, this.Networks) + + vm.Taps = make([]string, len(this.Taps)) + copy(vm.Taps, this.Taps) + + // This works because the Capture struct is only made up of primatives. + vm.Captures = make([]Capture, len(this.Captures)) + copy(vm.Captures, this.Captures) + + vm.Tags = make([]string, len(this.Tags)) + copy(vm.Tags, this.Tags) + + return vm +} + type Captures struct { Captures []Capture `json:"captures"` } diff --git a/src/go/util/tap/tap.go b/src/go/util/tap/tap.go index 0ae384d3..9c84c3ca 100644 --- a/src/go/util/tap/tap.go +++ b/src/go/util/tap/tap.go @@ -11,11 +11,11 @@ import ( "inet.af/netaddr" ) -func (this *Tap) Init(opts ...Option) { +func (this *Tap) Init(bridge string, opts ...Option) { this.o = NewOptions(opts...) if this.Bridge == "" { - this.Bridge = "phenix" + this.Bridge = bridge } } diff --git a/src/go/web/broker/client.go b/src/go/web/broker/client.go index 991a0c8b..04a2fc24 100644 --- a/src/go/web/broker/client.go +++ b/src/go/web/broker/client.go @@ -6,11 +6,13 @@ import ( "errors" "fmt" "net/http" + "strings" "sync" "time" "phenix/api/experiment" "phenix/api/vm" + "phenix/util/cache" "phenix/util/mm" "phenix/util/plog" "phenix/web/proto" @@ -21,6 +23,7 @@ import ( "github.com/gorilla/websocket" "google.golang.org/protobuf/encoding/protojson" + "inet.af/netaddr" ) var marshaler = protojson.MarshalOptions{EmitUnpopulated: true} @@ -139,6 +142,107 @@ func (this *Client) read() { switch req.Resource.Type { case "experiment/vms": + case "experiment/topology": + // TODO: check RBAC permissions? + + switch req.Resource.Action { + case "search": + var query map[string]string + + if err := json.Unmarshal(req.Payload, &query); err != nil { + plog.Error("cannot unmarshal request payload", "err", err) + continue + } + + // TODO: handle multiple query terms (how? AND or OR?) + // Do the same as in web/experiment.go@SearchExperimentTopology + term := query["term"] + if term == "" { + term = "hostname" + } + + value := query["value"] + if value == "" { + plog.Error("missing search value for term", "term", term) + continue + } + + cacheKey := fmt.Sprintf("experiment|%s|search", req.Resource.Name) + + val, ok := cache.Get(cacheKey) + if !ok { + // warm the cache (again?) + if _, err := vm.Topology(req.Resource.Name, nil); err != nil { + plog.Error("getting experiment topology", "exp", req.Resource.Name, "err", err) + continue + } + + val, _ = cache.Get(cacheKey) + } + + var ( + search = val.(vm.TopologySearch) + nodes []int + ) + + switch strings.ToLower(term) { + case "hostname": + if node, ok := search.Hostname[value]; ok { + nodes = []int{node} + } + case "disk": + nodes = search.Disk[value] + case "node-type": + nodes = search.Type[value] + case "os-type": + nodes = search.OSType[value] + case "label": + nodes = search.Label[value] + case "annotation": + nodes = search.Annotation[value] + case "vlan": + nodes = search.VLAN[value] + case "ip": + if net, err := netaddr.ParseIPPrefix(value); err == nil { + for k, v := range search.IP { + ip, err := netaddr.ParseIP(k) + if err != nil { + continue + } + + if net.Contains(ip) { + nodes = append(nodes, v...) + } + } + } else { + nodes = search.IP[value] + } + } + + results := map[string]any{ + "term": term, + "value": value, + "results": map[string]any{ + "nodes": nodes, + }, + } + + body, err := json.Marshal(results) + if err != nil { + plog.Error("marshaling search results for WebSocket client", "err", err) + continue + } + + this.publish <- bt.Publish{ + Resource: bt.NewResource("experiment/topology", req.Resource.Name, "search"), + Result: body, + } + + continue + default: + plog.Error("unexpected WebSocket request resource action for experiment/topology resource type", "action", req.Resource.Action) + continue + } default: plog.Error("unexpected WebSocket request resource type", "type", req.Resource.Type) continue diff --git a/src/go/web/cache/cache.go b/src/go/web/cache/cache.go index 93a93649..95282e70 100644 --- a/src/go/web/cache/cache.go +++ b/src/go/web/cache/cache.go @@ -2,6 +2,8 @@ package cache import ( "time" + + gocache "github.com/patrickmn/go-cache" ) type Status string @@ -20,12 +22,9 @@ const ( StatusCommitting Status = "committing" ) -var DefaultCache Cache = NewGoCache() - -type Cache interface { +type WebCache interface { Get(string) ([]byte, bool) Set(string, []byte) error - SetWithExpire(string, []byte, time.Duration) error Lock(string, Status, time.Duration) Status @@ -33,26 +32,62 @@ type Cache interface { Unlock(string) } -func Get(key string) ([]byte, bool) { - return DefaultCache.Get(key) +type GoWebCache struct { + c *gocache.Cache +} + +func NewGoWebCache() *GoWebCache { + return &GoWebCache{c: gocache.New(gocache.NoExpiration, 30*time.Second)} +} + +func (this GoWebCache) Get(key string) ([]byte, bool) { + v, ok := this.c.Get(key) + if !ok { + return nil, false + } + + return v.([]byte), true } -func Set(key string, val []byte) error { - return DefaultCache.Set(key, val) +func (this *GoWebCache) Set(key string, val []byte) error { + this.c.Set(key, val, -1) + return nil } -func SetWithExpire(key string, val []byte, exp time.Duration) error { - return DefaultCache.SetWithExpire(key, val, exp) +func (this *GoWebCache) SetWithExpire(key string, val []byte, exp time.Duration) error { + this.c.Set(key, val, exp) + return nil } -func Lock(key string, status Status, exp time.Duration) Status { - return DefaultCache.Lock(key, status, exp) +func (this *GoWebCache) Lock(key string, status Status, exp time.Duration) Status { + key = "LOCK|" + key + + if err := this.c.Add(key, status, exp); err != nil { + v, ok := this.c.Get(key) + + // This *might* happen if the key expires or is deleted between + // calling `Add` and `Get`. + if !ok { + return "" + } + + return v.(Status) + } + + return "" } -func Locked(key string) Status { - return DefaultCache.Locked(key) +func (this *GoWebCache) Locked(key string) Status { + key = "LOCK|" + key + + v, ok := this.c.Get(key) + if !ok { + return "" + } + + return v.(Status) } -func Unlock(key string) { - DefaultCache.Unlock(key) +func (this *GoWebCache) Unlock(key string) { + this.c.Delete("LOCK|" + key) } diff --git a/src/go/web/cache/go-cache.go b/src/go/web/cache/go-cache.go deleted file mode 100644 index efae9a2e..00000000 --- a/src/go/web/cache/go-cache.go +++ /dev/null @@ -1,67 +0,0 @@ -package cache - -import ( - "time" - - gocache "github.com/patrickmn/go-cache" -) - -type GoCache struct { - c *gocache.Cache -} - -func NewGoCache() *GoCache { - return &GoCache{c: gocache.New(gocache.NoExpiration, 30*time.Second)} -} - -func (this GoCache) Get(key string) ([]byte, bool) { - v, ok := this.c.Get(key) - if !ok { - return nil, false - } - - return v.([]byte), true -} - -func (this *GoCache) Set(key string, val []byte) error { - this.c.Set(key, val, -1) - return nil -} - -func (this *GoCache) SetWithExpire(key string, val []byte, exp time.Duration) error { - this.c.Set(key, val, exp) - return nil -} - -func (this *GoCache) Lock(key string, status Status, exp time.Duration) Status { - key = "LOCK|" + key - - if err := this.c.Add(key, status, exp); err != nil { - v, ok := this.c.Get(key) - - // This *might* happen if the key expires or is deleted between - // calling `Add` and `Get`. - if !ok { - return "" - } - - return v.(Status) - } - - return "" -} - -func (this *GoCache) Locked(key string) Status { - key = "LOCK|" + key - - v, ok := this.c.Get(key) - if !ok { - return "" - } - - return v.(Status) -} - -func (this *GoCache) Unlock(key string) { - this.c.Delete("LOCK|" + key) -} diff --git a/src/go/web/cache/package.go b/src/go/web/cache/package.go new file mode 100644 index 00000000..7907df6a --- /dev/null +++ b/src/go/web/cache/package.go @@ -0,0 +1,29 @@ +package cache + +import "time" + +var DefaultWebCache WebCache = NewGoWebCache() + +func Get(key string) ([]byte, bool) { + return DefaultWebCache.Get(key) +} + +func Set(key string, val []byte) error { + return DefaultWebCache.Set(key, val) +} + +func SetWithExpire(key string, val []byte, exp time.Duration) error { + return DefaultWebCache.SetWithExpire(key, val, exp) +} + +func Lock(key string, status Status, exp time.Duration) Status { + return DefaultWebCache.Lock(key, status, exp) +} + +func Locked(key string) Status { + return DefaultWebCache.Locked(key) +} + +func Unlock(key string) { + DefaultWebCache.Unlock(key) +} diff --git a/src/go/web/experiment.go b/src/go/web/experiment.go new file mode 100644 index 00000000..9c5bf436 --- /dev/null +++ b/src/go/web/experiment.go @@ -0,0 +1,147 @@ +package web + +import ( + "encoding/json" + "fmt" + "net/http" + "strings" + + "phenix/api/vm" + "phenix/util/cache" + "phenix/util/mm" + "phenix/util/plog" + "phenix/web/rbac" + "phenix/web/util" + + "github.com/gorilla/mux" + "inet.af/netaddr" +) + +type topology struct { + Nodes []mm.VM `json:"nodes"` + Edges []edge `json:"edges"` + Running bool `json:"running"` +} + +type edge struct { + ID int `json:"id"` + Source int `json:"source"` + Target int `json:"target"` + Length int `json:"length"` +} + +// GET /experiments/{name}/topology[?ignore=MGMT] +func GetExperimentTopology(w http.ResponseWriter, r *http.Request) { + plog.Debug("HTTP handler called", "handler", "GetExperimentTopology") + + var ( + ctx = r.Context() + role = ctx.Value("role").(rbac.Role) + vars = mux.Vars(r) + name = vars["name"] + + query = r.URL.Query() + ignore = query["ignore"] + ) + + if !role.Allowed("experiments/topology", "get", name) { + http.Error(w, "forbidden", http.StatusForbidden) + return + } + + topo, err := vm.Topology(name, ignore) + if err != nil { + http.Error(w, "unable to get experiment topology", http.StatusBadRequest) + } + + body, err := json.Marshal(topo) + if err != nil { + http.Error(w, "unable to convert topology", http.StatusInternalServerError) + } + + w.Write(body) +} + +// GET /experiments/{name}/topology/search?hostname=xyz&vlan=abc +func SearchExperimentTopology(w http.ResponseWriter, r *http.Request) { + plog.Debug("HTTP handler called", "handler", "SearchExperimentTopology") + + var ( + ctx = r.Context() + role = ctx.Value("role").(rbac.Role) + vars = mux.Vars(r) + name = vars["name"] + + query = r.URL.Query() + ) + + if !role.Allowed("experiments/topology", "get", name) { + http.Error(w, "forbidden", http.StatusForbidden) + return + } + + cacheKey := fmt.Sprintf("experiment|%s|search", name) + + val, ok := cache.Get(cacheKey) + if !ok { + if _, err := vm.Topology(name, nil); err != nil { + http.Error(w, "error getting experiment topology", http.StatusBadRequest) + return + } + + val, _ = cache.Get(cacheKey) + } + + var ( + search = val.(vm.TopologySearch) + nodes []int + ) + + // TODO: how to handle multiple terms? AND or OR? + + for term, values := range query { + value := values[0] + + switch strings.ToLower(term) { + case "hostname": + if node, ok := search.Hostname[value]; ok { + nodes = append(nodes, node) + } + case "disk": + nodes = append(nodes, search.Disk[value]...) + case "node-type": + nodes = append(nodes, search.Type[value]...) + case "os-type": + nodes = append(nodes, search.OSType[value]...) + case "label": + nodes = append(nodes, search.Label[value]...) + case "annotation": + nodes = append(nodes, search.Annotation[value]...) + case "vlan": + nodes = append(nodes, search.VLAN[value]...) + case "ip": + if net, err := netaddr.ParseIPPrefix(value); err == nil { + for k, v := range search.IP { + ip, err := netaddr.ParseIP(k) + if err != nil { + continue + } + + if net.Contains(ip) { + nodes = append(nodes, v...) + } + } + } else { + nodes = append(nodes, search.IP[value]...) + } + } + } + + body, err := json.Marshal(util.WithRoot("nodes", nodes)) + if err != nil { + http.Error(w, "error marshaling search results", http.StatusInternalServerError) + return + } + + w.Write(body) +} diff --git a/src/go/web/handlers.go b/src/go/web/handlers.go index f2f4df46..3d09b08d 100644 --- a/src/go/web/handlers.go +++ b/src/go/web/handlers.go @@ -186,6 +186,7 @@ func CreateExperiment(w http.ResponseWriter, r *http.Request) { experiment.CreateWithVLANMax(int(req.VlanMax)), experiment.CreatedWithDisabledApplications(req.DisabledApps), experiment.CreateWithDeployMode(deployMode), + experiment.CreateWithDefaultBridge(req.DefaultBridge), } if req.WorkflowBranch != "" { diff --git a/src/go/web/netflow.go b/src/go/web/netflow.go new file mode 100644 index 00000000..1d5bca27 --- /dev/null +++ b/src/go/web/netflow.go @@ -0,0 +1,260 @@ +package web + +import ( + "errors" + "net/http" + "time" + + "phenix/api/experiment" + "phenix/util/plog" + "phenix/web/rbac" + + putil "phenix/util" + + "github.com/gorilla/mux" + "github.com/gorilla/websocket" +) + +// GET /experiments/{exp}/netflow +func GetNetflow(w http.ResponseWriter, r *http.Request) { + plog.Debug("HTTP handler called", "handler", "GetNetflow") + + var ( + ctx = r.Context() + role = ctx.Value("role").(rbac.Role) + vars = mux.Vars(r) + exp = vars["exp"] + ) + + if !role.Allowed("experiments/netflow", "get", exp) { + http.Error(w, "forbidden", http.StatusForbidden) + return + } + + if flow := experiment.GetNetflow(exp); flow != nil { + w.WriteHeader(http.StatusNoContent) + return + } + + w.WriteHeader(http.StatusNotFound) +} + +// POST /experiments/{exp}/netflow +func StartNetflow(w http.ResponseWriter, r *http.Request) { + plog.Debug("HTTP handler called", "handler", "StartNetflow") + + var ( + ctx = r.Context() + role = ctx.Value("role").(rbac.Role) + vars = mux.Vars(r) + exp = vars["exp"] + ) + + if !role.Allowed("experiments/netflow", "create", exp) { + http.Error(w, "forbidden", http.StatusForbidden) + return + } + + if err := experiment.StartNetflow(exp); err != nil { + plog.Error("starting netflow capture", "exp", exp, "err", err) + + if errors.Is(err, experiment.ErrNetflowAlreadyStarted) { + http.Error(w, "neflow already started for experiment", http.StatusBadRequest) + return + } + + if errors.Is(err, experiment.ErrExperimentNotFound) { + http.Error(w, "unable to find experiment", http.StatusBadRequest) + return + } + + if errors.Is(err, experiment.ErrExperimentNotRunning) { + http.Error(w, "cannot start netflow on stopped experiment", http.StatusConflict) + return + } + + if errors.Is(err, experiment.ErrNetflowPhenixBridge) { + http.Error(w, "cannot start netflow on experiment with default bridge set to 'phenix'", http.StatusConflict) + return + } + + http.Error(w, "unable to start netflow capture", http.StatusInternalServerError) + return + } + + w.WriteHeader(http.StatusNoContent) +} + +// DELETE /experiments/{exp}/netflow +func StopNetflow(w http.ResponseWriter, r *http.Request) { + plog.Debug("HTTP handler called", "handler", "StopNetflow") + + var ( + ctx = r.Context() + role = ctx.Value("role").(rbac.Role) + vars = mux.Vars(r) + exp = vars["exp"] + ) + + if !role.Allowed("experiments/netflow", "delete", exp) { + http.Error(w, "forbidden", http.StatusForbidden) + return + } + + if err := experiment.StopNetflow(exp); err != nil { + plog.Error("stopping netflow capture", "exp", exp, "err", err) + + if errors.Is(err, experiment.ErrNetflowNotStarted) { + http.Error(w, "not found", http.StatusNotFound) + return + } + + http.Error(w, "unable to stop netflow capture", http.StatusInternalServerError) + return + } + + w.WriteHeader(http.StatusNoContent) +} + +// GET /experiments/{exp}/netflow/ws +func GetNetflowWebSocket(w http.ResponseWriter, r *http.Request) { + plog.Debug("HTTP handler called", "handler", "GetNetflowWebSocket") + + var ( + ctx = r.Context() + role = ctx.Value("role").(rbac.Role) + vars = mux.Vars(r) + exp = vars["exp"] + ) + + if !role.Allowed("experiments/netflow", "get", exp) { + http.Error(w, "forbidden", http.StatusForbidden) + return + } + + flow := experiment.GetNetflow(exp) + if flow == nil { + http.Error(w, "not found", http.StatusNotFound) + return + } + + var ( + endpoint = flow.Conn.LocalAddr().String() + + id = putil.RandomString(24) + cb = flow.NewChannel(id) + + upgrader = websocket.Upgrader{ + ReadBufferSize: 4096, + WriteBufferSize: 4096, + } + + done = make(chan struct{}) + ) + + upgrader.CheckOrigin = func(*http.Request) bool { return true } + + conn, err := upgrader.Upgrade(w, r, nil) + if err != nil { + plog.Error("upgrading connection to WebSocket", "err", err) + return + } + + pongHandler := func(string) error { + plog.Info("received pong message from websocket client", "client", id) + + conn.SetReadDeadline(time.Now().Add(10 * time.Second)) + return nil + } + + closeHandler := func(code int, msg string) error { + plog.Info("received close message from websocket client", "client", id) + + var ( + message = websocket.FormatCloseMessage(code, "") + deadline = time.Now().Add(5 * time.Second) + ) + + // This will be an extra write message if we initiated the close. + conn.WriteControl(websocket.CloseMessage, message, deadline) + return nil + } + + plog.Info("ws client connected to netflow", "endpoint", endpoint, "client", id) + + go func() { // reader (for pong and close messages) + defer close(done) // stop writer + + conn.SetPongHandler(pongHandler) + conn.SetCloseHandler(closeHandler) + conn.SetReadLimit(1024) + + expected := []int{websocket.CloseNormalClosure, websocket.CloseGoingAway, websocket.CloseNoStatusReceived} + + for { + // This will error out if: + // 1. Client does not respond with pong message in time; or + // 2. Client sends a close message (either initiating it or + // responding to ours). + // Either way, we're done here. + // + // NOTE: if this errors because of a pong message not being received in + // time, the underlying socket will be closed without a WebSocket close + // message being sent. This is probably okay since it's likely that a pong + // wasn't received in time because the client no longer exists anyway. + _, _, err := conn.ReadMessage() + if err != nil { + if websocket.IsUnexpectedCloseError(err, expected...) { + plog.Error("reading websocket message", "client", id, "err", err) + } + + return + } + } + }() + + go func() { // writer (for netflow, ping, and close messages) + ticker := time.NewTicker((10 * time.Second * 7) / 10) + defer ticker.Stop() + + for { + select { + case <-done: + return + case msg, open := <-cb: + if !open { + plog.Info("netflow channel closed - closing websocket", "client", id) + + var ( + message = websocket.FormatCloseMessage(websocket.CloseNormalClosure, "netflow stopped") + deadline = time.Now().Add(5 * time.Second) + ) + + // This will (eventually) end up causing the reader to exit when it + // receives the close message response from the client. + conn.WriteControl(websocket.CloseMessage, message, deadline) + return + } + + conn.SetWriteDeadline(time.Now().Add(5 * time.Second)) + + if err := conn.WriteJSON(msg); err != nil { + plog.Error("writing netflow message", "client", id, "err", err) + } + case <-ticker.C: + deadline := time.Now().Add(5 * time.Second) + + if err := conn.WriteControl(websocket.PingMessage, nil, deadline); err != nil { + plog.Error("writing ping message", "client", id, "err", err) + } + } + } + }() + + <-done // wait for reader to be done + + conn.Close() + flow.DeleteChannel(id) + + plog.Info("ws client disconnected from netflow", "endpoint", endpoint, "client", id) +} diff --git a/src/go/web/proto/experiment.proto b/src/go/web/proto/experiment.proto index 8d0525ff..8c85c182 100644 --- a/src/go/web/proto/experiment.proto +++ b/src/go/web/proto/experiment.proto @@ -71,6 +71,7 @@ message CreateExperimentRequest { string workflow_branch = 6 [json_name="workflow_branch"]; repeated string disabled_apps = 7 [json_name="disabled_apps"]; string deploy_mode = 8 [json_name="deploy_mode"]; + string default_bridge = 9 [json_name="default_bridge"]; } message SnapshotRequest { diff --git a/src/go/web/rbac/known_policy.go b/src/go/web/rbac/known_policy.go index 085a7218..0e9b7a25 100644 --- a/src/go/web/rbac/known_policy.go +++ b/src/go/web/rbac/known_policy.go @@ -1,5 +1,5 @@ // Code generated by go generate; DO NOT EDIT. -// This file was generated at build time 2024-01-23 10:14:31.579393538 -0700 MST m=+0.354711363 +// This file was generated at build time 2024-02-05 11:46:23.269706775 -0700 MST m=+0.098769361 // This contains all known role checks used in codebase package rbac @@ -28,10 +28,14 @@ var Permissions = []Permission{ {"experiments/captures", "list"}, {"experiments/files", "get"}, {"experiments/files", "list"}, + {"experiments/netflow", "create"}, + {"experiments/netflow", "delete"}, + {"experiments/netflow", "get"}, {"experiments/schedule", "create"}, {"experiments/schedule", "get"}, {"experiments/start", "update"}, {"experiments/stop", "update"}, + {"experiments/topology", "get"}, {"experiments/trigger", "create"}, {"experiments/trigger", "delete"}, {"history", "get"}, diff --git a/src/go/web/server.go b/src/go/web/server.go index 9da2c61c..42c21afb 100644 --- a/src/go/web/server.go +++ b/src/go/web/server.go @@ -160,6 +160,12 @@ func Start(opts ...ServerOption) error { api.Handle("/experiments/{name}/apps", weberror.ErrorHandler(GetExperimentApps)).Methods("GET", "OPTIONS") api.Handle("/experiments/{name}/start", weberror.ErrorHandler(StartExperiment)).Methods("POST", "OPTIONS") api.Handle("/experiments/{name}/stop", weberror.ErrorHandler(StopExperiment)).Methods("POST", "OPTIONS") + api.HandleFunc("/experiments/{exp}/netflow", GetNetflow).Methods("GET", "OPTIONS") + api.HandleFunc("/experiments/{exp}/netflow", StartNetflow).Methods("POST", "OPTIONS") + api.HandleFunc("/experiments/{exp}/netflow", StopNetflow).Methods("DELETE", "OPTIONS") + api.HandleFunc("/experiments/{exp}/netflow/ws", GetNetflowWebSocket).Methods("GET", "OPTIONS") + api.HandleFunc("/experiments/{name}/topology", GetExperimentTopology).Methods("GET", "OPTIONS") + api.HandleFunc("/experiments/{name}/topology/search", SearchExperimentTopology).Methods("GET", "OPTIONS") api.HandleFunc("/experiments/{name}/trigger", TriggerExperimentApps).Methods("POST", "OPTIONS") api.HandleFunc("/experiments/{name}/trigger", CancelTriggeredExperimentApps).Methods("DELETE", "OPTIONS") api.HandleFunc("/experiments/{name}/schedule", GetExperimentSchedule).Methods("GET", "OPTIONS") diff --git a/src/go/web/types.go b/src/go/web/types.go index 96b5fc03..056d51ef 100644 --- a/src/go/web/types.go +++ b/src/go/web/types.go @@ -1,8 +1,9 @@ package web import ( - "phenix/web/rbac" "sort" + + "phenix/web/rbac" ) type SignupRequest struct { diff --git a/src/go/web/workflow.go b/src/go/web/workflow.go index 6b117860..d2639277 100644 --- a/src/go/web/workflow.go +++ b/src/go/web/workflow.go @@ -139,6 +139,7 @@ func ApplyWorkflow(w http.ResponseWriter, r *http.Request) error { experiment.CreateWithVLANMin(wf.VLANMin()), experiment.CreateWithVLANMax(wf.VLANMax()), experiment.CreateWithDeployMode(wf.ExperimentDeployMode()), + experiment.CreateWithDefaultBridge(wf.DefaultBridgeName()), } if err := experiment.Create(ctx, opts...); err != nil { @@ -303,6 +304,16 @@ func ApplyWorkflow(w http.ResponseWriter, r *http.Request) error { } } + if len(wf.DefaultBridgeName()) > 15 { + err := weberror.NewWebError( + fmt.Errorf("default bridge name must be 15 characters or less"), + "unable to set default bridge for experiment %s", expName, + ) + + return err.SetStatus(http.StatusBadRequest) + } + + exp.Spec.SetDefaultBridge(wf.DefaultBridgeName()) exp.Spec.SetSchedule(schedules) exp.Spec.SetDeployMode(string(wf.ExperimentDeployMode())) exp.Spec.SetVLANRange(wf.VLANMin(), wf.VLANMax(), true) @@ -489,6 +500,8 @@ type workflow struct { Min int `mapstructure:"min"` Max int `mapstructure:"max"` } `mapstructure:"vlanRange"` + + DefaultBridge string `mapstructure:"defaultBridge"` } func (this workflow) AutoUpdate() bool { @@ -571,3 +584,11 @@ func (this workflow) VLANMax() int { return this.VLANRange.Max } + +func (this workflow) DefaultBridgeName() string { + if this.DefaultBridge == "" { + return "phenix" + } + + return this.DefaultBridge +} diff --git a/src/js/package.json b/src/js/package.json index 586a396d..f9671b3f 100644 --- a/src/js/package.json +++ b/src/js/package.json @@ -10,8 +10,8 @@ "test": "jest" }, "dependencies": { - "@fortawesome/fontawesome-free": "^5.15.1", - "@fortawesome/fontawesome-svg-core": "^1.2.32", + "@fortawesome/fontawesome-free": "^6.4.0", + "@fortawesome/fontawesome-svg-core": "^6.4.0", "@fortawesome/vue-fontawesome": "^2.0.6", "buefy": "^0.9.22", "d3": "^6.2.0", @@ -36,8 +36,6 @@ }, "devDependencies": { "@babel/preset-env": "^7.22.5", - "@fortawesome/fontawesome-free": "^5.15.1", - "@fortawesome/free-solid-svg-icons": "^5.15.1", "@vue/cli-plugin-babel": "^3.7.0", "@vue/cli-plugin-eslint": "^3.7.0", "@vue/cli-service": "^3.7.0", diff --git a/src/js/src/components/Experiments.vue b/src/js/src/components/Experiments.vue index 6d9dd374..d313b0dc 100644 --- a/src/js/src/components/Experiments.vue +++ b/src/js/src/components/Experiments.vue @@ -59,6 +59,9 @@ + + + @@ -592,7 +595,8 @@ vlan_max: +this.createModal.vlan_max, workflow_branch: this.createModal.branch, deploy_mode: this.createModal.deploy_mode, - disabled_apps: disabledApps + disabled_apps: disabledApps, + default_bridge: this.createModal.bridge } if ( !this.createModal.name ) { @@ -775,6 +779,7 @@ vlan_max: null, branch: null, deploy_mode: null, + bridge: null, }, experiments: [], topologies: [], diff --git a/src/js/src/components/RunningExperiment.vue b/src/js/src/components/RunningExperiment.vue index 50c0ff6c..1f2b6032 100644 --- a/src/js/src/components/RunningExperiment.vue +++ b/src/js/src/components/RunningExperiment.vue @@ -512,6 +512,12 @@   
+ + + +