Skip to content

Commit

Permalink
Added plugins feature that allows separate binaries as processes
Browse files Browse the repository at this point in the history
Used the loader which was compiled out, and had to brutilize it a bit
Need a conversation about what the intention was of some of the non-working code i it
  • Loading branch information
dahvid committed Sep 22, 2020
1 parent 13d16f4 commit 8bac583
Show file tree
Hide file tree
Showing 8 changed files with 370 additions and 69 deletions.
5 changes: 5 additions & 0 deletions graph.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,11 @@ func (n *Graph) Add(name string, c interface{}) error {
return nil
}

// Get a component by name
func (n *Graph) Get(name string) interface{} {
return n.procs[name]
}

// AddGraph adds a new blank graph instance to a network. That instance can
// be modified then at run-time.
func (n *Graph) AddGraph(name string) error {
Expand Down
130 changes: 61 additions & 69 deletions loader.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
// +build ignore

package goflow

import (
"encoding/json"
"fmt"
"io/ioutil"
"reflect"
"strings"
Expand Down Expand Up @@ -43,98 +42,91 @@ type graphDescription struct {

// ParseJSON converts a JSON network definition string into
// a flow.Graph object that can be run or used in other networks
func ParseJSON(js []byte) *Graph {
func ParseJSON(js []byte, factory *Factory) *Graph {
// Parse JSON into Go struct
var descr graphDescription
err := json.Unmarshal(js, &descr)
if err != nil {
fmt.Println("Error parsing JSON", err)
return nil
}
// fmt.Printf("%+v\n", descr)

constructor := func() interface{} {
// Create a new Graph
net := new(Graph)
net.InitGraphState()

// Add processes to the network
for procName, procValue := range descr.Processes {
net.AddNew(procValue.Component, procName)
// Process mode detection
if procValue.Metadata.PoolSize > 0 {
proc := net.Get(procName).(*Component)
proc.Mode = ComponentModePool
proc.PoolSize = uint8(procValue.Metadata.PoolSize)
} else if procValue.Metadata.Sync {
proc := net.Get(procName).(*Component)
proc.Mode = ComponentModeSync
}
}
// Create a new Graph
//net := new(Graph)
//net.InitGraphState()
net := NewGraph()

// Add connections
for _, conn := range descr.Connections {
// Check if it is an IIP or actual connection
if conn.Data == nil {
// Add a connection
net.ConnectBuf(conn.Src.Process, conn.Src.Port, conn.Tgt.Process, conn.Tgt.Port, conn.Metadata.Buffer)
} else {
// Add an IIP
net.AddIIP(conn.Data, conn.Tgt.Process, conn.Tgt.Port)
}
}
// Add processes to the network
for procName, procValue := range descr.Processes {
net.AddNew(procName, procValue.Component, factory)
// Process mode detection
// if procValue.Metadata.PoolSize > 0 {
// proc := net.Get(procName).(*Component)
// proc.Mode = ComponentModePool
// proc.PoolSize = uint8(procValue.Metadata.PoolSize)
// } else if procValue.Metadata.Sync {
// proc := net.Get(procName).(*Component)
// proc.Mode = ComponentModeSync
// }
}

// Add port exports
for _, export := range descr.Exports {
// Split private into proc.port
procName := export.Private[:strings.Index(export.Private, ".")]
procPort := export.Private[strings.Index(export.Private, ".")+1:]
// Try to detect port direction using reflection
procType := reflect.TypeOf(net.Get(procName)).Elem()
field, fieldFound := procType.FieldByName(procPort)
if !fieldFound {
panic("Private port '" + export.Private + "' not found")
}
if field.Type.Kind() == reflect.Chan && (field.Type.ChanDir()&reflect.RecvDir) != 0 {
// It's an inport
net.MapInPort(export.Public, procName, procPort)
} else if field.Type.Kind() == reflect.Chan && (field.Type.ChanDir()&reflect.SendDir) != 0 {
// It's an outport
net.MapOutPort(export.Public, procName, procPort)
} else {
// It's not a proper port
panic("Private port '" + export.Private + "' is not a valid channel")
}
// TODO add support for subgraphs
// Add connections
for _, conn := range descr.Connections {
// Check if it is an IIP or actual connection
if conn.Data == nil {
// Add a connection
net.ConnectBuf(conn.Src.Process, conn.Src.Port, conn.Tgt.Process, conn.Tgt.Port, conn.Metadata.Buffer)
} else {
// Add an IIP
net.AddIIP(conn.Tgt.Process, conn.Tgt.Port, conn.Data)
}

return net
}

// Register a component to be reused
if descr.Properties.Name != "" {
Register(descr.Properties.Name, constructor)
// Add port exports
for _, export := range descr.Exports {
// Split private into proc.port
procName := export.Private[:strings.Index(export.Private, ".")]
procPort := export.Private[strings.Index(export.Private, ".")+1:]
// Try to detect port direction using reflection
procType := reflect.TypeOf(net.Get(procName)).Elem()
field, fieldFound := procType.FieldByName(procPort)
if !fieldFound {
panic("Private port '" + export.Private + "' not found")
}
if field.Type.Kind() == reflect.Chan && (field.Type.ChanDir()&reflect.RecvDir) != 0 {
// It's an inport
net.MapInPort(export.Public, procName, procPort)
} else if field.Type.Kind() == reflect.Chan && (field.Type.ChanDir()&reflect.SendDir) != 0 {
// It's an outport
net.MapOutPort(export.Public, procName, procPort)
} else {
// It's not a proper port
panic("Private port '" + export.Private + "' is not a valid channel")
}
// TODO add support for subgraphs
}

return constructor().(*Graph)
return net
}

// LoadJSON loads a JSON graph definition file into
// a flow.Graph object that can be run or used in other networks
func LoadJSON(filename string) *Graph {
func LoadJSON(filename string, factory *Factory) *Graph {
js, err := ioutil.ReadFile(filename)
if err != nil {
return nil
}
return ParseJSON(js)
return ParseJSON(js, factory)
}

// RegisterJSON registers an external JSON graph definition as a component
// that can be instantiated at run-time using component Factory.
// It returns true on success or false if component name is already taken.
func RegisterJSON(componentName, filePath string) bool {
var constructor ComponentConstructor
constructor = func() interface{} {
return LoadJSON(filePath)
}
return Register(componentName, constructor)
}
// func RegisterJSON(componentName, filePath string) bool {
// var constructor ComponentConstructor
// constructor = func() interface{} {
// return LoadJSON(filePath)
// }
// return Register(componentName, constructor)
// }
4 changes: 4 additions & 0 deletions make_plugs.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
#!/bin/bash
go install -buildmode=plugin test_plugins/Adder_goplug.go
go install -buildmode=plugin test_plugins/Plug1_goplug.go
go install -buildmode=plugin test_plugins/NGen_goplug.go
58 changes: 58 additions & 0 deletions plugins.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package goflow

import (
"io/ioutil"
"log"
"path"
"plugin"
"strings"
)

const plugInSuffix = `_goplug.so`

//PlugIn something
type PlugIn interface {
Component
Info() map[string]string
}

// LoadComponents goes through all paths, opens all plugins in those paths
// and loads them into factory
// Plugins are denoted by *_goplug.so, The filename must begin with a capitolized letter
func LoadComponents(paths []string, factory *Factory) ([]string, error) {
var loaded []string

for _, apath := range paths {
//fmt.Println("Loading plugins at", apath)
files, err := ioutil.ReadDir(apath)
if err != nil {
log.Printf("Path %s not found, error=%s.", apath, err.Error())
continue
}
for _, file := range files {
if strings.HasSuffix(file.Name(), plugInSuffix) {
plugpath := path.Join(apath, file.Name())
plug, err := plugin.Open(plugpath)
if err != nil {
log.Printf("Can't open plugin %s, error=%s.", plugpath, err.Error())
continue
}
//get name from name - _goplug.so and register with contructor
name := strings.TrimSuffix(file.Name(), plugInSuffix)
symbol, err := plug.Lookup(name)
if err != nil {
log.Printf("Can't find symbol %s in plugin %s, error=%s.", name, plugpath, err.Error())
continue
}
constructor := symbol.(func() (interface{}, error))
anerr := factory.Register(name, constructor)
if anerr != nil {
log.Printf("Failed to register plugin %s, error=%s.", plugpath, err.Error())
continue
}
loaded = append(loaded, name)
}
}
}
return loaded, nil
}
Loading

0 comments on commit 8bac583

Please sign in to comment.