Skip to content

Commit

Permalink
engine: graph: Allow send/recv to work with autogrouped resources
Browse files Browse the repository at this point in the history
We've previously not received a value from within an autogrouped
resource. It turns out this would be quite useful, and so this patch
implements the additional plumbing and testing so that this works!

Testing that an autogrouped resource can still send values has not been
done at this time.
  • Loading branch information
purpleidea committed Dec 8, 2023
1 parent bf5cc63 commit 18e1f08
Show file tree
Hide file tree
Showing 5 changed files with 217 additions and 34 deletions.
32 changes: 23 additions & 9 deletions engine/graph/actions.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,21 +112,35 @@ func (obj *Engine) Process(ctx context.Context, vertex pgraph.Vertex) error {

// sendrecv!
// connect any senders to receivers and detect if values changed
// this actually checks and sends into resource trees recursively...
if res, ok := vertex.(engine.RecvableRes); ok {
if updated, err := obj.SendRecv(res); err != nil {
return errwrap.Wrapf(err, "could not SendRecv")
} else if len(updated) > 0 {
for _, changed := range updated {
if changed { // at least one was updated
for r, m := range updated { // map[engine.RecvableRes]map[string]bool
v, ok := r.(pgraph.Vertex)
if !ok {
continue
}
_, stateExists := obj.state[v] // autogrouped children probably don't have a state
if !stateExists {
continue
}
for _, changed := range m {
if !changed {
continue
}
// if changed == true, at least one was updated
// invalidate cache, mark as dirty
obj.state[vertex].tuid.StopTimer()
obj.state[vertex].isStateOK = false
break
obj.state[v].tuid.StopTimer()
obj.state[v].isStateOK = false
//break // we might have more vertices now
}

// re-validate after we change any values
if err := engine.Validate(r); err != nil {
return errwrap.Wrapf(err, "failed Validate after SendRecv")
}
}
// re-validate after we change any values
if err := engine.Validate(res); err != nil {
return errwrap.Wrapf(err, "failed Validate after SendRecv")
}
}
}
Expand Down
71 changes: 61 additions & 10 deletions engine/graph/sendrecv.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ package graph
import (
"fmt"
"reflect"
"sort"
"strings"

"github.com/purpleidea/mgmt/engine"
engineUtil "github.com/purpleidea/mgmt/engine/util"
Expand All @@ -29,18 +31,67 @@ import (

// SendRecv pulls in the sent values into the receive slots. It is called by the
// receiver and must be given as input the full resource struct to receive on.
// It applies the loaded values to the resource.
func (obj *Engine) SendRecv(res engine.RecvableRes) (map[string]bool, error) {
recv := res.Recv()
// It applies the loaded values to the resource. It is called recursively, as it
// recurses into any grouped resources found within the first receiver. It
// returns a map of resource pointer, to resource field key, to changed boolean.
func (obj *Engine) SendRecv(res engine.RecvableRes) (map[engine.RecvableRes]map[string]bool, error) {
updated := make(map[engine.RecvableRes]map[string]bool) // list of updated keys
if obj.Debug {
obj.Logf("SendRecv: %s", res) // receiving here
}
if groupableRes, ok := res.(engine.GroupableRes); ok {
for _, x := range groupableRes.GetGroup() { // grouped elements
recvableRes, ok := x.(engine.RecvableRes)
if !ok {
continue
}
if obj.Debug {
obj.Logf("SendRecv: %s: grouped: %s", res, x) // receiving here
}
// We need to recurse here so that autogrouped resources
// inside autogrouped resources would work... In case we
// work correctly. We just need to make sure that things
// are grouped in the correct order, but that is not our
// problem! Recurse and merge in the changed results...
innerUpdated, err := obj.SendRecv(recvableRes)
if err != nil {
return nil, errwrap.Wrapf(err, "recursive SendRecv error")
}
for r, m := range innerUpdated { // res ptr, map
if _, exists := updated[r]; !exists {
updated[r] = make(map[string]bool)
}
for s, b := range m {
// don't overwrite in case one exists...
if old, exists := updated[r][s]; exists {
b = b || old // unlikely i think
}
updated[r][s] = b
}
}
}
}

recv := res.Recv()
keys := []string{}
for k := range recv { // map[string]*Send
keys = append(keys, k)
}
sort.Strings(keys)
if obj.Debug && len(keys) > 0 {
// NOTE: this could expose private resource data like passwords
obj.Logf("%s: SendRecv: %+v", res, recv)
obj.Logf("SendRecv: %s recv: %+v", res, strings.Join(keys, ", "))
}
var updated = make(map[string]bool) // list of updated keys
var err error
for k, v := range recv {
updated[k] = false // default
v.Changed = false // reset to the default
for k, v := range recv { // map[string]*Send
// v.Res // SendableRes // a handle to the resource which is sending a value
// v.Key // string // the key in the resource that we're sending
if _, exists := updated[res]; !exists {
updated[res] = make(map[string]bool)
}

updated[res][k] = false // default
v.Changed = false // reset to the default

var st interface{} = v.Res // old style direct send/recv
if true { // new style send/recv API
Expand Down Expand Up @@ -167,8 +218,8 @@ func (obj *Engine) SendRecv(res engine.RecvableRes) (map[string]bool, error) {
continue
}
//dest.Set(orig) // do it for all types that match
updated[k] = true // we updated this key!
v.Changed = true // tag this key as updated!
updated[res][k] = true // we updated this key!
v.Changed = true // tag this key as updated!
obj.Logf("SendRecv: %s.%s -> %s.%s", v.Res, v.Key, res, k)
}
return updated, err
Expand Down
32 changes: 32 additions & 0 deletions examples/lang/sendrecv-autogroup.mcl
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
# send/recv of value1.any into print1.msg works!
value "value1" {
any => "i am value1",
}
print "print1" {
msg => "i am print1",

Meta:autogroup => false,
}
Value["value1"].any -> Print["print1"].msg

# One of these will be autogrouped into the other! The inner one can receive!
# send/recv from value2.any into print2.msg works
# send/recv from value3.any into (the usually autogrouped) print3 works too!
value "value2" {
any => "i am value2",
}
value "value3" {
any => "i am value3",
}
print "print2" {
msg => "i am print2",

Meta:autogroup => true,
}
print "print3" {
msg => "i am print3",

Meta:autogroup => true,
}
Value["value2"].any -> Print["print2"].msg
Value["value3"].any -> Print["print3"].msg
68 changes: 53 additions & 15 deletions lang/interpret_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
"github.com/purpleidea/mgmt/engine"
"github.com/purpleidea/mgmt/engine/graph"
"github.com/purpleidea/mgmt/engine/graph/autoedge"
"github.com/purpleidea/mgmt/engine/graph/autogroup"
"github.com/purpleidea/mgmt/engine/local"
engineUtil "github.com/purpleidea/mgmt/engine/util"
"github.com/purpleidea/mgmt/etcd"
Expand All @@ -50,6 +51,7 @@ import (
"github.com/purpleidea/mgmt/lang/unification"
"github.com/purpleidea/mgmt/pgraph"
"github.com/purpleidea/mgmt/util"
"github.com/purpleidea/mgmt/util/errwrap"

"github.com/kylelemons/godebug/pretty"
"github.com/spf13/afero"
Expand Down Expand Up @@ -2061,6 +2063,15 @@ func TestAstFunc3(t *testing.T) {

// TODO: apply the global metaparams to the graph

// XXX: can we change this into a ge.Apply operation?
// run autogroup; modifies the graph
if err := ge.AutoGroup(&autogroup.NonReachabilityGrouper{}); err != nil {
//ge.Abort() // delete graph
t.Errorf("test #%d: FAIL", index)
t.Errorf("test #%d: error running autogrouping: %+v", index, err)
return
}

fastPause := false
ge.Pause(fastPause) // sync
if err := ge.Commit(); err != nil {
Expand Down Expand Up @@ -2091,34 +2102,24 @@ func TestAstFunc3(t *testing.T) {
t.Logf("test #%d: graph: %+v", index, ngraph)
str := strings.Trim(ngraph.Sprint(), "\n") // text format of output graph

for i, v := range ngraph.Vertices() {
for _, v := range ngraph.Vertices() {
res, ok := v.(engine.Res)
if !ok {
t.Errorf("test #%d: FAIL\n\n", index)
t.Logf("test #%d: unexpected non-resource: %+v", index, v)
return
}
m, err := engineUtil.ResToParamValues(res)

s, err := stringResFields(res)
if err != nil {
t.Errorf("test #%d: FAIL\n\n", index)
t.Logf("test #%d: can't read resource: %+v", index, err)
return
}
if i == 0 {
str += "\n"
}
keys := []string{}
for k := range m {
keys = append(keys, k)
}
sort.Strings(keys) // sort for determinism
for _, field := range keys {
v := m[field]
str += fmt.Sprintf("Field: %s[%s].%s = %s\n", res.Kind(), res.Name(), field, v)
}
if i < len(ngraph.Vertices()) {
if str != "" {
str += "\n"
}
str += s
}

if expstr == magicEmpty {
Expand Down Expand Up @@ -2162,3 +2163,40 @@ func TestAstFunc3(t *testing.T) {
t.Skip("skipping all tests...")
}
}

// stringResFields is a helper function to store a resource graph as a text
// format for test comparisons.
func stringResFields(res engine.Res) (string, error) {
m, err := engineUtil.ResToParamValues(res)
if err != nil {
return "", errwrap.Wrapf(err, "can't read resource %s", res)
}
str := ""
keys := []string{}
for k := range m {
keys = append(keys, k)
}
sort.Strings(keys) // sort for determinism
for _, field := range keys {
v := m[field]
str += fmt.Sprintf("Field: %s[%s].%s = %s\n", res.Kind(), res.Name(), field, v)
}

groupableRes, ok := res.(engine.GroupableRes)
if !ok {
return str, nil
}
for _, x := range groupableRes.GetGroup() { // grouped elements
s, err := stringResFields(x) // recurse
if err != nil {
return "", err
}
// add a prefix to each line?
s = strings.Trim(s, "\n") // trim trailing newlines
for _, f := range strings.Split(s, "\n") {
str += fmt.Sprintf("Group: %s: ", res) + f + "\n"
}
//str += s
}
return str, nil
}
48 changes: 48 additions & 0 deletions lang/interpret_test/TestAstFunc3/sendrecv-autogroup.txtar
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
-- main.mcl --
# send/recv of value1.any into print1.msg works!
value "value1" {
any => "i am value1",
}
print "print1" {
msg => "i am print1",

Meta:autogroup => false,
}
Value["value1"].any -> Print["print1"].msg

# One of these will be autogrouped into the other! The inner one can receive!
# send/recv from value2.any into print2.msg works
# send/recv from value3.any into (the usually autogrouped) print3 works too!
value "value2" {
any => "i am value2",
}
value "value3" {
any => "i am value3",
}
print "print2" {
msg => "i am print2",

Meta:autogroup => true,
}
print "print3" {
msg => "i am print3",

Meta:autogroup => true,
}
Value["value2"].any -> Print["print2"].msg
Value["value3"].any -> Print["print3"].msg
-- OUTPUT --
Edge: value[value1] -> print[print1] # value[value1] -> print[print1]
Edge: value[value2] -> print[print2] # value[value2] -> print[print2]
Edge: value[value3] -> print[print2] # value[value3] -> print[print3]
Field: print[print1].Msg = "i am value1"
Field: print[print2].Msg = "i am value2"
Field: value[value1].Any = "i am value1"
Field: value[value2].Any = "i am value2"
Field: value[value3].Any = "i am value3"
Group: print[print2]: Field: print[print3].Msg = "i am value3"
Vertex: print[print1]
Vertex: print[print2]
Vertex: value[value1]
Vertex: value[value2]
Vertex: value[value3]

0 comments on commit 18e1f08

Please sign in to comment.