Skip to content

Commit

Permalink
Merge pull request #251 from franknstyle/para
Browse files Browse the repository at this point in the history
add concurrency for resource command collection
  • Loading branch information
franknstyle authored Jun 22, 2023
2 parents 9cb89a4 + af3028d commit f907e3c
Show file tree
Hide file tree
Showing 4 changed files with 136 additions and 107 deletions.
2 changes: 1 addition & 1 deletion ssh/scp.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func CopyFrom(args SSHArgs, agent Agent, rootDir string, sourcePath string) erro
if err := wait.ExponentialBackoff(retries, func() (bool, error) {
p := e.RunProc(effectiveCmd)
if p.Err() != nil {
logrus.Warn(fmt.Sprintf("scp: copyFrom: failed to connect to %s: '%s %s': retrying connection", args.Host, p.Err(), p.Result()))
logrus.Warn(fmt.Sprintf("scp: copyFrom: failed to connect to %s:%s '%s %s': retrying connection", args.Host, args.Port, p.Err(), p.Result()))
return false, nil
}
return true, nil // worked
Expand Down
6 changes: 2 additions & 4 deletions starlark/archive_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,10 +60,8 @@ func TestArchiveScript(t *testing.T) {
eval func(t *testing.T, script string)
}{
{
name: "archive defaults",
script: `
result = archive(output_file="/tmp/archive.tar.gz", source_paths=["/tmp/crashd"])
`,
name: "archive defaults",
script: `result = archive(output_file="/tmp/archive.tar.gz", source_paths=["/tmp/crashd"])`,
eval: func(t *testing.T, script string) {
exe := New()
if err := exe.Exec("test.star", strings.NewReader(script)); err != nil {
Expand Down
76 changes: 42 additions & 34 deletions starlark/capture.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,56 +80,64 @@ func captureFunc(thread *starlark.Thread, b *starlark.Builtin, args starlark.Tup
}
resultList = append(resultList, result.toStarlarkStruct())
}

return starlark.NewList(resultList), nil
}

func execCapture(cmdStr, rootPath, fileName, desc string, agent ssh.Agent, resources *starlark.List) ([]commandResult, error) {
if resources == nil {
return nil, fmt.Errorf("%s: missing resources", identifiers.capture)
}

logrus.Debugf("%s: executing command on %d resources", identifiers.capture, resources.Len())
resultsChannel := make(chan commandResult)
var results []commandResult
for i := 0; i < resources.Len(); i++ {
val := resources.Index(i)
res, ok := val.(*starlarkstruct.Struct)
if !ok {
return nil, fmt.Errorf("%s: unexpected resource type", identifiers.capture)
}

val, err := res.Attr("kind")
if err != nil {
return nil, fmt.Errorf("%s: resource.kind: %s", identifiers.capture, err)
}
kind := val.(starlark.String)

val, err = res.Attr("transport")
if err != nil {
return nil, fmt.Errorf("%s: resource.transport: %s", identifiers.capture, err)
}
transport := val.(starlark.String)
for i := 0; i < resources.Len(); i++ {
go func(i int) {
val := resources.Index(i)
res, ok := val.(*starlarkstruct.Struct)
if !ok {
resultsChannel <- commandResult{err: fmt.Errorf("%s: unexpected resource type", identifiers.capture)}
}
val, err := res.Attr("kind")
if err != nil {
resultsChannel <- commandResult{err: fmt.Errorf("%s: resource.kind: %s", identifiers.capture, err)}
}
kind := val.(starlark.String)

val, err = res.Attr("host")
if err != nil {
return nil, fmt.Errorf("%s: resource.host: %s", identifiers.capture, err)
}
host := string(val.(starlark.String))
rootDir := filepath.Join(rootPath, sanitizeStr(host))
val, err = res.Attr("transport")
if err != nil {
resultsChannel <- commandResult{err: fmt.Errorf("%s: resource.transport: %s", identifiers.capture, err)}
}
transport := val.(starlark.String)

switch {
case string(kind) == identifiers.hostResource && string(transport) == "ssh":
result, err := execCaptureSSH(host, cmdStr, rootDir, fileName, desc, agent, res)
val, err = res.Attr("host")
if err != nil {
logrus.Errorf("%s failed: cmd=[%s]: %s", identifiers.capture, cmdStr, err)
resultsChannel <- commandResult{err: fmt.Errorf("%s: resource.host: %s", identifiers.capture, err)}
}
results = append(results, result)
default:
logrus.Errorf("%s: unsupported or invalid resource kind: %s", identifiers.capture, kind)
host := string(val.(starlark.String))
rootDir := filepath.Join(rootPath, sanitizeStr(host))

switch {
case string(kind) == identifiers.hostResource && string(transport) == "ssh":
result, err := execCaptureSSH(host, cmdStr, rootDir, fileName, desc, agent, res)
if err != nil {
logrus.Errorf("%s failed: cmd=[%s]: %s", identifiers.capture, cmdStr, err)
}
resultsChannel <- result
default:
logrus.Errorf("%s: unsupported or invalid resource kind: %s", identifiers.capture, kind)
}
}(i)
}

for i := 0; i < resources.Len(); i++ {
r := <-resultsChannel
if r.err != nil {
logrus.Infof("result is: %v", r)
results = append(results, r)
} else {
continue
}
}

return results, nil
}

Expand Down
159 changes: 91 additions & 68 deletions starlark/capture_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,36 +36,42 @@ func testCaptureFuncForHostResources(t *testing.T, port, privateKey, username st
if err != nil {
t.Fatal(err)
}
result := ""
if strct, ok := val.(*starlarkstruct.Struct); ok {
if val, err := strct.Attr("result"); err == nil {
if r, ok := val.(starlark.String); ok {
result = string(r)
}
}
resultList, ok := val.(*starlark.List)
if !ok {
t.Fatalf("expecting type *starlark.List, got %T", val)
}

expected := filepath.Join(defaults.workdir, sanitizeStr("127.0.0.1"), fmt.Sprintf("%s.txt", sanitizeStr("echo 'Hello World!'")))
if result != expected {
t.Errorf("unexpected file name captured: %s", result)
}
for i := 0; i < resultList.Len(); i++ {
result := ""
if strct, ok := resultList.Index(i).(*starlarkstruct.Struct); ok {
if val, err := strct.Attr("result"); err == nil {
if r, ok := val.(starlark.String); ok {
result = string(r)
}
}
}
expected := filepath.Join(defaults.workdir, sanitizeStr("127.0.0.1"), fmt.Sprintf("%s.txt", sanitizeStr("echo 'Hello World!'")))
if result != expected {
t.Errorf("unexpected file name captured: %s", result)
}

file, err := os.Open(result)
if err != nil {
t.Fatal(err)
}
buf := new(bytes.Buffer)
if _, err := io.Copy(buf, file); err != nil {
t.Fatal(err)
}
expected = strings.TrimSpace(buf.String())
if expected != "Hello World!" {
t.Errorf("unexpected content captured: %s", expected)
}
if err := file.Close(); err != nil {
t.Error(err)
file, err := os.Open(result)
if err != nil {
t.Fatal(err)
}
buf := new(bytes.Buffer)
if _, err := io.Copy(buf, file); err != nil {
t.Fatal(err)
}
expected = strings.TrimSpace(buf.String())
if expected != "Hello World!" {
t.Errorf("unexpected content captured: %s", expected)
}
if err := file.Close(); err != nil {
t.Error(err)
}
os.RemoveAll(result)
}
defer os.RemoveAll(result)
},
},

Expand All @@ -88,35 +94,43 @@ func testCaptureFuncForHostResources(t *testing.T, port, privateKey, username st
t.Fatal(err)
}

result := ""
if strct, ok := val.(*starlarkstruct.Struct); ok {
if val, err := strct.Attr("result"); err == nil {
if r, ok := val.(starlark.String); ok {
result = string(r)
resultList, ok := val.(*starlark.List)
if !ok {
t.Fatalf("expecting type *starlark.List, got %T", val)
}

for i := 0; i < resultList.Len(); i++ {
result := ""
if strct, ok := resultList.Index(i).(*starlarkstruct.Struct); ok {
if val, err := strct.Attr("result"); err == nil {
if r, ok := val.(starlark.String); ok {
result = string(r)
}
}
}
}
expected := filepath.Join(defaults.workdir, sanitizeStr("127.0.0.1"), "echo_out.txt")
if result != expected {
t.Errorf("unexpected file name captured: %s", result)
}

file, err := os.Open(result)
if err != nil {
t.Fatal(err)
}
buf := new(bytes.Buffer)
if _, err := io.Copy(buf, file); err != nil {
t.Fatal(err)
}
expected = strings.TrimSpace(buf.String())
if expected != "echo command\nHello World!" {
t.Errorf("unexpected content captured: %s", expected)
}
if err := file.Close(); err != nil {
t.Error(err)
expected := filepath.Join(defaults.workdir, sanitizeStr("127.0.0.1"), "echo_out.txt")
if result != expected {
t.Errorf("unexpected file name captured: %s", result)
}

file, err := os.Open(result)
if err != nil {
t.Fatal(err)
}
buf := new(bytes.Buffer)
if _, err := io.Copy(buf, file); err != nil {
t.Fatal(err)
}
expected = strings.TrimSpace(buf.String())
if expected != "echo command\nHello World!" {
t.Errorf("unexpected content captured: %s", expected)
}
if err := file.Close(); err != nil {
t.Error(err)
}
os.RemoveAll(result)
}
defer os.RemoveAll(result)
},
},

Expand Down Expand Up @@ -174,10 +188,12 @@ func testCaptureFuncScriptForHostResources(t *testing.T, port, privateKey, usern
tests := []struct {
name string
script string
eval func(t *testing.T, script string)

eval func(t *testing.T, script string)
}{
{
name: "default cmd multiple machines",

script: fmt.Sprintf(`
set_defaults(resources(provider = host_list_provider(hosts=["127.0.0.1","localhost"], ssh_config = ssh_config(username="%s", port="%s", private_key_path="%s", max_retries=50))))
result = capture("echo 'Hello World!'")`, username, port, privateKey),
Expand Down Expand Up @@ -234,32 +250,39 @@ result = exec(hosts)`, username, port, privateKey),
t.Fatal(err)
}

resultVal := exe.result["result"]
if resultVal == nil {
val := exe.result["result"]
if val == nil {
t.Fatal("capture() should be assigned to a variable")
}
resultList, ok := resultVal.(*starlark.List)

resultList, ok := val.(*starlark.List)
if !ok {
t.Fatal("capture() with multiple resources should return a list")
t.Fatalf("expecting type *starlark.List, got %T", val)
}

for i := 0; i < resultList.Len(); i++ {
resultStruct, ok := resultList.Index(i).(*starlarkstruct.Struct)
resultsList, ok := resultList.Index(i).(*starlark.List)
if !ok {
t.Fatalf("run(): expecting a starlark struct, got %T", resultList.Index(i))
}
val, err := resultStruct.Attr("result")
if err != nil {
t.Fatal(err)
t.Fatalf("capture(): expecting a starlark struct, got %T", resultList.Index(i))
}
result := string(val.(starlark.String))
if _, err := os.Stat(result); err != nil {
t.Fatalf("captured command file not found: %s", err)

for i := 0; i < resultsList.Len(); i++ {
resultStruct, ok := resultList.Index(i).(*starlarkstruct.Struct)
if !ok {
t.Fatalf("capture(): expecting a starlark struct, got %T", resultsList.Index(i))
}
val, err := resultStruct.Attr("result")
if err != nil {
t.Fatal(err)
}
result := string(val.(starlark.String))
if _, err := os.Stat(result); err != nil {
t.Fatalf("captured command file not found: %s", err)
}
os.RemoveAll(result)
}
os.RemoveAll(result)
}
},
},
}},
}

for _, test := range tests {
Expand Down

0 comments on commit f907e3c

Please sign in to comment.