Skip to content

Commit

Permalink
Merge pull request #395 from openziti/misc-fixes
Browse files Browse the repository at this point in the history
Miscelleous fixes
  • Loading branch information
plorenz authored Apr 24, 2024
2 parents 3d5d501 + 9c8d0a7 commit bf7b5f9
Show file tree
Hide file tree
Showing 5 changed files with 53 additions and 22 deletions.
8 changes: 4 additions & 4 deletions cmd/fablab/subcmd/refresh.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ var refreshCmd = &cobra.Command{

func refresh(_ *cobra.Command, _ []string) {
if err := model.Bootstrap(); err != nil {
logrus.Fatalf("unable to bootstrap (%v)", err)
logrus.WithError(err).Fatal("unable to bootstrap")
}

ctx, err := model.NewRun()
Expand All @@ -47,19 +47,19 @@ func refresh(_ *cobra.Command, _ []string) {
figlet.Figlet("configuration")

if err := ctx.GetModel().Build(ctx); err != nil {
logrus.Fatalf("error building (%v)", err)
logrus.WithError(err).Fatal("error building")
}

figlet.Figlet("distribution")

if err := ctx.GetModel().Sync(ctx); err != nil {
logrus.Fatalf("error distributing (%v)", err)
logrus.WithError(err).Fatal("error distributing")
}

figlet.Figlet("activation")

if err := ctx.GetModel().Activate(ctx); err != nil {
logrus.Fatalf("error activating (%v)", err)
logrus.WithError(err).Fatalf("error activating")
}

figlet.Figlet("FABUL0US!1!")
Expand Down
29 changes: 17 additions & 12 deletions kernel/lib/parallel/parallel.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@ package parallel

import (
"context"
"github.com/michaelquigley/pfxlog"
"github.com/openziti/fablab/kernel/lib/util"
"github.com/pkg/errors"
"golang.org/x/sync/semaphore"
"sync/atomic"
)

type Task func() error
Expand All @@ -14,6 +16,8 @@ func Execute(tasks []Task, concurrency int64) error {
return errors.Errorf("invalid concurrency %v, must be at least 1", concurrency)
}

completed := atomic.Int64{}

sem := semaphore.NewWeighted(concurrency)
errorsC := make(chan error, len(tasks))
for _, task := range tasks {
Expand All @@ -25,31 +29,32 @@ func Execute(tasks []Task, concurrency int64) error {
go func() {
defer func() {
sem.Release(1)
current := completed.Add(1)
if current%10 == 0 {
pfxlog.Logger().Infof("completed %d/%d tasks", current, len(tasks))
}
if int(current) == len(tasks) {
close(errorsC)
}
}()
if err := boundTask(); err != nil {
errorsC <- err
}
}()
}

if err := sem.Acquire(context.Background(), concurrency); err != nil {
return err
}

close(errorsC)

var errors []error
var errList []error
for err := range errorsC {
errors = append(errors, err)
errList = append(errList, err)
}

if len(errors) == 0 {
if len(errList) == 0 {
return nil
}

if len(errors) == 1 {
return errors[0]
if len(errList) == 1 {
return errList[0]
}

return util.MultipleErrors(errors)
return util.MultipleErrors(errList)
}
7 changes: 4 additions & 3 deletions kernel/lib/runlevel/3_distribution/distpk.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,20 @@ func (self *distSshKey) Execute(run model.Run) error {
keyPath := fmt.Sprintf("/home/%v/.ssh/id_rsa", ssh.User())

if _, err := libssh.RemoteExecAll(ssh, fmt.Sprintf("rm -f %v", keyPath)); err == nil {
logrus.Infof("%s => %s", host.PublicIp, "removing old PK")
logrus.Infof("%s => %s", host.PublicIp, "removed old PK")
} else {
return fmt.Errorf("error removing old PK on host [%s] (%w)", host.PublicIp, err)
}

if err := libssh.SendFile(ssh, ssh.KeyPath(), keyPath); err != nil {
logrus.Errorf("[%s] unable to send %s => %s", host.PublicIp, ssh.KeyPath(), keyPath)
return err
return fmt.Errorf("[%s] unable to send %s => %s (%w)", host.PublicIp, ssh.KeyPath(), keyPath, err)
}

logrus.Infof("[%s] %s => %s", host.PublicIp, ssh.KeyPath(), keyPath)

if _, err := libssh.RemoteExecAll(ssh, fmt.Sprintf("chmod 0400 %v", keyPath)); err == nil {
logrus.Infof("%s => %s", host.PublicIp, "setting pk permissions")
logrus.Infof("%s => %s", host.PublicIp, "set pk permissions")
return nil
} else {
return fmt.Errorf("error setting pk permissions on host [%s] (%w)", host.PublicIp, err)
Expand Down
2 changes: 1 addition & 1 deletion kernel/lib/runlevel/3_distribution/rsync/rsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,7 @@ func synchronizeHostToHost(ctx *rsyncContext, srcConfig, dstConfig *Config) erro

destination := fmt.Sprintf("%s:%s", dstConfig.loginPrefix(), ctx.getDestPath(dstConfig.host))

cmd := fmt.Sprintf("rsync -avz --delete -e 'ssh -o StrictHostKeyChecking=no' %s* %s",
cmd := fmt.Sprintf("rsync -avz --delete -e 'ssh -o StrictHostKeyChecking=no' %s %s",
ctx.getDestPath(srcConfig.host), destination)
output, err := libssh.RemoteExec(srcConfig.sshConfigFactory, cmd)
if err == nil && output != "" {
Expand Down
29 changes: 27 additions & 2 deletions kernel/model/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -525,6 +525,31 @@ func (host *Host) DoExclusiveFallible(f func() error) error {
return f()
}

func (host *Host) ExecLoggedWithTimeout(timeout time.Duration, cmds ...string) (string, error) {
resultCh := make(chan struct {
output string
err error
}, 1)

go func() {
result, err := host.ExecLogged(cmds...)
resultCh <- struct {
output string
err error
}{
output: result,
err: err,
}
}()

select {
case result := <-resultCh:
return result.output, result.err
case <-time.After(timeout):
return "", errors.Errorf("timed out after %v", timeout)
}
}

func (host *Host) ExecLogged(cmds ...string) (string, error) {
buf := &libssh.SyncBuffer{}
err := host.Exec(buf, cmds...)
Expand Down Expand Up @@ -1111,9 +1136,9 @@ func (m *Model) Build(run Run) error {
}

func (m *Model) Sync(run Run) error {
for _, stage := range m.Distribution {
for idx, stage := range m.Distribution {
if err := stage.Execute(run); err != nil {
return fmt.Errorf("error distributing (%w)", err)
return fmt.Errorf("error distributing stage %d - %T, (%w)", idx+1, stage, err)
}
}

Expand Down

0 comments on commit bf7b5f9

Please sign in to comment.