diff --git a/cmd/arcaflow/main.go b/cmd/arcaflow/main.go index 16f2e9cf..24a014fe 100644 --- a/cmd/arcaflow/main.go +++ b/cmd/arcaflow/main.go @@ -4,11 +4,11 @@ import ( "context" "flag" "fmt" + "go.arcalot.io/log/v2" "os" "path/filepath" "strings" - "go.arcalot.io/log" "go.flow.arcalot.io/engine" "go.flow.arcalot.io/engine/config" "gopkg.in/yaml.v3" diff --git a/cmd/run-plugin/run.go b/cmd/run-plugin/run.go index cf5c1db5..f0a0b5d3 100644 --- a/cmd/run-plugin/run.go +++ b/cmd/run-plugin/run.go @@ -4,12 +4,11 @@ import ( "context" "flag" "fmt" - "os" - - "go.arcalot.io/log" + log "go.arcalot.io/log/v2" docker "go.flow.arcalot.io/dockerdeployer" "go.flow.arcalot.io/pluginsdk/atp" "gopkg.in/yaml.v3" + "os" ) //nolint:funlen diff --git a/config/config.go b/config/config.go index 793879a3..ad465a1a 100644 --- a/config/config.go +++ b/config/config.go @@ -1,8 +1,12 @@ package config -import ( - "go.arcalot.io/log" -) +import log "go.arcalot.io/log/v2" + +// StepOutputLogConfig is a config value for step output logging. +type StepOutputLogConfig struct { + // The log level if output is encountered + LogLevel log.Level `json:"level" yaml:"level"` +} // Config is the main configuration structure that configures the engine for execution. It is not identical to the // workflow being executed. @@ -15,4 +19,6 @@ type Config struct { LocalDeployer any `json:"deployer" yaml:"deployer"` // Log configures logging for workflow runs. Log log.Config `json:"log" yaml:"log"` + // StepOutputLogging allows logging of step output + LoggedOutputConfigs map[string]*StepOutputLogConfig `json:"logged_outputs" yaml:"logged_outputs"` } diff --git a/config/load_test.go b/config/load_test.go index 0c37ed7b..674cd1fb 100644 --- a/config/load_test.go +++ b/config/load_test.go @@ -1,10 +1,10 @@ package config_test import ( + "go.arcalot.io/log/v2" "testing" "go.arcalot.io/lang" - "go.arcalot.io/log" "go.flow.arcalot.io/engine/config" "gopkg.in/yaml.v3" ) diff --git a/config/schema.go b/config/schema.go index 43ad6c0c..29c4f9b1 100644 --- a/config/schema.go +++ b/config/schema.go @@ -1,9 +1,10 @@ package config import ( - "go.arcalot.io/log" + log "go.arcalot.io/log/v2" "go.flow.arcalot.io/engine/internal/util" "go.flow.arcalot.io/pluginsdk/schema" + "regexp" ) func getConfigSchema() *schema.TypedScopeSchema[*Config] { //nolint:funlen @@ -59,6 +60,56 @@ func getConfigSchema() *schema.TypedScopeSchema[*Config] { //nolint:funlen schema.PointerTo("{\"type\":\"docker\"}"), nil, ), + "logged_outputs": schema.NewPropertySchema( + schema.NewMapSchema( + schema.NewStringSchema( + schema.IntPointer(1), + schema.IntPointer(255), + regexp.MustCompile("^[$@a-zA-Z0-9-_]+$")), + schema.NewRefSchema("StepOutputLogConfig", nil), + nil, + nil, + ), + schema.NewDisplayValue( + schema.PointerTo("Logged Outputs"), + schema.PointerTo( + "Step output types to log. Make sure output log level is equal to or greater than the minimum log value.", + ), + nil, + ), + false, + nil, + nil, + nil, + schema.PointerTo("{}"), + nil, + ), + }, + ), + schema.NewStructMappedObjectSchema[*StepOutputLogConfig]( + "StepOutputLogConfig", + map[string]*schema.PropertySchema{ + "level": schema.NewPropertySchema( + schema.NewStringEnumSchema(map[string]*schema.DisplayValue{ + string(log.LevelDebug): {NameValue: schema.PointerTo("Debug")}, + string(log.LevelInfo): {NameValue: schema.PointerTo("Informational")}, + string(log.LevelWarning): {NameValue: schema.PointerTo("Warnings")}, + string(log.LevelError): {NameValue: schema.PointerTo("Errors")}, + }), + schema.NewDisplayValue( + schema.PointerTo("Log level"), + schema.PointerTo( + "The level to log matching step outputs. Must be greater than the minimum log level.", + ), + nil, + ), + false, + nil, + nil, + nil, + schema.PointerTo(util.JSONEncode(log.LevelInfo)), + nil, + ), }, ), schema.NewStructMappedObjectSchema[log.Config]( diff --git a/engine.go b/engine.go index 7ca52d31..b007dca9 100644 --- a/engine.go +++ b/engine.go @@ -3,14 +3,14 @@ package engine import ( "context" "fmt" + log "go.arcalot.io/log/v2" "strings" "sync" "go.arcalot.io/dgraph" - "go.arcalot.io/log" "go.flow.arcalot.io/deployer" + "go.flow.arcalot.io/deployer/registry" "go.flow.arcalot.io/engine/config" - "go.flow.arcalot.io/engine/internal/deploy/registry" "go.flow.arcalot.io/engine/internal/expand" "go.flow.arcalot.io/engine/internal/yaml" "go.flow.arcalot.io/engine/workflow" @@ -166,10 +166,7 @@ func (w workflowEngine) RunWorkflow( step := <-finishedSteps output := <-finishedOutputs lock.Lock() - w.logger.Infof("Step %s has finished with output %s.", step.Item().Item, output.Item().Output) - if output.Item().Output == "error" { - w.logger.Warningf("Step %s had error output: %s", step.Item().Item, step.Item().Output) - } + w.logger.Infof("Step \"%s\" has finished with output %s.", step.Item().Item, output.Item().Output) // Remove the step node from the dependency tree. w.logger.Debugf("Removing dependency tree node %s...", step.ID()) if err := step.Remove(); err != nil { @@ -308,6 +305,16 @@ mainloop: if err != nil { panic(err) } + stepLogConfig := w.config.LoggedOutputConfigs[outputID] + if stepLogConfig != nil { + w.logger.Writef( + stepLogConfig.LogLevel, + "Output ID for step \"%s\" is \"%s\".\nOutput data: \"%s\"", + item.Item, + outputID, + outputData, + ) + } lock.Lock() defer lock.Unlock() // Save the output data so other steps can query it. diff --git a/engine_test.go b/engine_test.go index 4f5d812c..91c6e4c1 100644 --- a/engine_test.go +++ b/engine_test.go @@ -3,10 +3,10 @@ package engine_test import ( "context" "errors" + log "go.arcalot.io/log/v2" "testing" "go.arcalot.io/assert" - "go.arcalot.io/log" "go.flow.arcalot.io/engine" "go.flow.arcalot.io/engine/config" ) diff --git a/go.mod b/go.mod index e7a02c54..11032167 100644 --- a/go.mod +++ b/go.mod @@ -6,13 +6,13 @@ require ( go.arcalot.io/assert v1.3.0 go.arcalot.io/dgraph v1.0.0 go.arcalot.io/lang v1.0.0 - go.arcalot.io/log v1.2.0 - go.flow.arcalot.io/deployer v0.0.0-20221115141549-bf5292d4261a - go.flow.arcalot.io/dockerdeployer v0.1.0 + go.arcalot.io/log/v2 v2.0.0 + go.flow.arcalot.io/deployer v0.1.0 + go.flow.arcalot.io/dockerdeployer v0.2.0 go.flow.arcalot.io/expressions v0.0.0-20221115232532-4d7fa005c94b - go.flow.arcalot.io/kubernetesdeployer v0.0.0-20221116174546-f56b920e76d3 - go.flow.arcalot.io/pluginsdk v0.0.0-20230215171423-910f77c96f75 - go.flow.arcalot.io/podmandeployer v0.1.0 + go.flow.arcalot.io/kubernetesdeployer v0.1.0 + go.flow.arcalot.io/pluginsdk v0.1.0 + go.flow.arcalot.io/podmandeployer v0.2.0 gopkg.in/yaml.v3 v3.0.1 ) diff --git a/go.sum b/go.sum index f97a0616..29b2eb2e 100644 --- a/go.sum +++ b/go.sum @@ -149,20 +149,20 @@ go.arcalot.io/dgraph v1.0.0 h1:ru/3U/mzRoIEie6zdhKPhPkmwlaDBx+gMqVLYy6ogDk= go.arcalot.io/dgraph v1.0.0/go.mod h1:FuNv92OgHsJYepD6Unwn+S/4DioBnv06JxQ2BtQct7E= go.arcalot.io/lang v1.0.0 h1:mgDaieT4wWdZTnR4V7+/pgYRmzfU7VZZgIzHccuxAbY= go.arcalot.io/lang v1.0.0/go.mod h1:ALqfYEhAzC2WoGLaycmJoNJd5NmkR7V1PSKp/c5D278= -go.arcalot.io/log v1.2.0 h1:EOfehJoycnpYXOBwYribLzfYb9y2YKVNWJ0kCzqnJSw= -go.arcalot.io/log v1.2.0/go.mod h1:g/oBcBi33s6GxFyiEdbnKtEPBg62t2Uc9cdF3fSuuyg= -go.flow.arcalot.io/deployer v0.0.0-20221115141549-bf5292d4261a h1:/c58nkQUUqucLHd6L0AGtEWfTj3+kHyu0CHO9GyZKCE= -go.flow.arcalot.io/deployer v0.0.0-20221115141549-bf5292d4261a/go.mod h1:zOWRsLPQ+3J0v0Js1iMSKP5vo0JbUruqvYx7d3zBWwY= -go.flow.arcalot.io/dockerdeployer v0.1.0 h1:gTrazFDE17upLWe2nTFoPRKsAQYE3KlZRf2kd+YFGFk= -go.flow.arcalot.io/dockerdeployer v0.1.0/go.mod h1:rXwamwiTYUCZegxYqAe/SSv0DJHRfnr/pkzy+lgoW+A= +go.arcalot.io/log/v2 v2.0.0 h1:mbmsWDVBXZNWrDzUh5JLzeGCQ59kTuMFs+pyfJGc1hk= +go.arcalot.io/log/v2 v2.0.0/go.mod h1:1V8jnFIIGwh2CtcGkHNOmy1nCo7LbazQNkUcnKYNMn4= +go.flow.arcalot.io/deployer v0.1.0 h1:5lcCN6rQD2dp2hkaW6dInaBoyKzWt8/kzJnDjeRAoLo= +go.flow.arcalot.io/deployer v0.1.0/go.mod h1:xVSB+svHVPmX6yTZIU0K4U/pDbs+rezsWa69vYA+E6k= +go.flow.arcalot.io/dockerdeployer v0.2.0 h1:rnymzi1UgWAROi4qDj3i7lGtwQcAUqxTYh+LL6Mln54= +go.flow.arcalot.io/dockerdeployer v0.2.0/go.mod h1:kvIkrK8c1NZk8KFcW/6NjcgxWSU6cKIxtjexnFjlwiQ= go.flow.arcalot.io/expressions v0.0.0-20221115232532-4d7fa005c94b h1:YXBg4jugNTqsGaSgmEjYAjzoIDTJ0vKk2Ru9n+Eh1Ck= go.flow.arcalot.io/expressions v0.0.0-20221115232532-4d7fa005c94b/go.mod h1:FA11tAWKkmXV1fSBkOu0t2X7Uxzg/7ATJyLqd9Ml9ww= -go.flow.arcalot.io/kubernetesdeployer v0.0.0-20221116174546-f56b920e76d3 h1:HIYsrMFf1HSux3UgK8ciEez/+jQZ3bEGlPLF62P/xWs= -go.flow.arcalot.io/kubernetesdeployer v0.0.0-20221116174546-f56b920e76d3/go.mod h1:bXvYOCMpo9iRGiSVzAiUSXtSfA3VQOHQwK8dKAyjQdc= -go.flow.arcalot.io/pluginsdk v0.0.0-20230215171423-910f77c96f75 h1:8PCsIkBShKQuzAGLj/GkmBdDPbkVCX5s0/SKIQauNyc= -go.flow.arcalot.io/pluginsdk v0.0.0-20230215171423-910f77c96f75/go.mod h1:iblQl4Bx9gteegDXLD4WvU7S4Au0vkK6yKTc9mX00ls= -go.flow.arcalot.io/podmandeployer v0.1.0 h1:/KN9e5Mb/VGZn2r7Lo78D2zUj7zBM3kBJ9+YM7K7ioc= -go.flow.arcalot.io/podmandeployer v0.1.0/go.mod h1:gFw/wz2Yb3nyAukoE6FG6tOtWdAhc6AaC1k8lwct1OI= +go.flow.arcalot.io/kubernetesdeployer v0.1.0 h1:d9TltGmFHIXenaln8UirBEqMHSAYTghRi1cA/r1E/c8= +go.flow.arcalot.io/kubernetesdeployer v0.1.0/go.mod h1:Ptjkx3WTMGK7B1I7h3Wq/48Y3iC8NkHns25VZUEYttA= +go.flow.arcalot.io/pluginsdk v0.1.0 h1:38nireifSAvHYweUxHHFEUCy4/DPtZ5iSQPN3KigdfU= +go.flow.arcalot.io/pluginsdk v0.1.0/go.mod h1:ceY4HhUbnhZyQa3C7lXu25TNVCbsWGuYZapsV5RuIrk= +go.flow.arcalot.io/podmandeployer v0.2.0 h1:FUEaOl+o20vnhDBf3W/K+HNvRRHeH9RQoT2QXTL3T6k= +go.flow.arcalot.io/podmandeployer v0.2.0/go.mod h1:EJtBVsGvI5NK/645qUMvEz8WfhDVMjH4cikYj+eE/Dk= go.opentelemetry.io/proto/otlp v0.7.0/go.mod h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqeYNgFYFoEGnI= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= diff --git a/internal/deploy/registry/helper_test.go b/internal/deploy/registry/helper_test.go deleted file mode 100644 index 76ef5829..00000000 --- a/internal/deploy/registry/helper_test.go +++ /dev/null @@ -1,40 +0,0 @@ -package registry_test - -import ( - "context" - "fmt" - - "go.arcalot.io/log" - "go.flow.arcalot.io/deployer" - "go.flow.arcalot.io/pluginsdk/schema" -) - -type testConfig struct { -} - -type testNewFactory struct { -} - -func (t testNewFactory) ID() string { - return "test" -} - -func (t testNewFactory) ConfigurationSchema() schema.Object { - return schema.NewTypedScopeSchema[testConfig]( - schema.NewStructMappedObjectSchema[testConfig]( - "test", - map[string]*schema.PropertySchema{}, - ), - ) -} - -func (t testNewFactory) Create(_ any, _ log.Logger) (deployer.Connector, error) { - return &testConnector{}, nil -} - -type testConnector struct { -} - -func (t testConnector) Deploy(_ context.Context, _ string) (deployer.Plugin, error) { - return nil, fmt.Errorf("not implemented") -} diff --git a/internal/deploy/registry/new.go b/internal/deploy/registry/new.go deleted file mode 100644 index 9ad1e237..00000000 --- a/internal/deploy/registry/new.go +++ /dev/null @@ -1,23 +0,0 @@ -package registry - -import ( - "fmt" - - "go.flow.arcalot.io/deployer" -) - -// New creates a new registry with the given factories. -func New(factory ...deployer.AnyConnectorFactory) Registry { - factories := make(map[string]deployer.AnyConnectorFactory, len(factory)) - - for _, f := range factory { - if v, ok := factories[f.ID()]; ok { - panic(fmt.Errorf("duplicate deployer factory ID: %s (first: %T, second: %T)", f.ID(), v, f)) - } - factories[f.ID()] = f - } - - return ®istry{ - factories, - } -} diff --git a/internal/deploy/registry/new_test.go b/internal/deploy/registry/new_test.go deleted file mode 100644 index 70d6f033..00000000 --- a/internal/deploy/registry/new_test.go +++ /dev/null @@ -1,39 +0,0 @@ -package registry_test - -import ( - "testing" - - "go.arcalot.io/assert" - "go.arcalot.io/lang" - "go.flow.arcalot.io/engine/internal/deploy/registry" - "go.flow.arcalot.io/pluginsdk/schema" -) - -func TestNew(t *testing.T) { - t.Parallel() - - t.Run("instantiation", testNewInstantiation) - t.Run("duplicate-ids", testNewDuplicateIDs) -} - -func testNewInstantiation(t *testing.T) { - t.Parallel() - r := registry.New(&testNewFactory{}) - factories := r.List() - assert.Equals(t, len(factories), 1) - assert.Equals(t, factories["test"].TypeID(), schema.TypeIDScope) -} - -func testNewDuplicateIDs(t *testing.T) { - t.Parallel() - - err := lang.Safe(func() { - _ = registry.New( - &testNewFactory{}, - &testNewFactory{}, - ) - }) - if err == nil { - t.Fatal("No error returned") - } -} diff --git a/internal/deploy/registry/registry.go b/internal/deploy/registry/registry.go deleted file mode 100644 index 060d6c81..00000000 --- a/internal/deploy/registry/registry.go +++ /dev/null @@ -1,57 +0,0 @@ -package registry - -import ( - "fmt" - "reflect" - - "go.arcalot.io/log" - "go.flow.arcalot.io/deployer" - "go.flow.arcalot.io/pluginsdk/schema" -) - -// Registry describes the functions a deployer registry must implement. -type Registry interface { - // List lists the registered deployers with their scopes. - List() map[string]schema.Object - // Schema returns a composite schema for the registry. - Schema() schema.OneOf[string] - // Create creates a connector with the given configuration type. The registry must identify the correct deployer - // based on the type passed. - Create(config any, logger log.Logger) (deployer.Connector, error) -} - -type registry struct { - deployerFactories map[string]deployer.AnyConnectorFactory -} - -func (r registry) List() map[string]schema.Object { - result := make(map[string]schema.Object, len(r.deployerFactories)) - for id, factory := range r.deployerFactories { - result[id] = factory.ConfigurationSchema() - } - return result -} - -func (r registry) Schema() schema.OneOf[string] { - schemas := make(map[string]schema.Object, len(r.deployerFactories)) - for id, factory := range r.deployerFactories { - schemas[id] = factory.ConfigurationSchema() - } - return schema.NewOneOfStringSchema[any]( - schemas, - "type", - ) -} - -func (r registry) Create(config any, logger log.Logger) (deployer.Connector, error) { - if config == nil { - return nil, fmt.Errorf("the deployer configuration cannot be nil") - } - reflectedConfig := reflect.ValueOf(config) - for _, factory := range r.deployerFactories { - if factory.ConfigurationSchema().ReflectedType() == reflectedConfig.Type() { - return factory.Create(config, logger) - } - } - return nil, fmt.Errorf("could not identify correct deployer factory for %T", config) -} diff --git a/internal/deploy/registry/registry_test.go b/internal/deploy/registry/registry_test.go deleted file mode 100644 index fc6d857c..00000000 --- a/internal/deploy/registry/registry_test.go +++ /dev/null @@ -1,90 +0,0 @@ -package registry_test - -import ( - "testing" - - "go.arcalot.io/assert" - "go.arcalot.io/log" - "go.flow.arcalot.io/engine/internal/deploy/registry" -) - -func TestRegistry_Schema(t *testing.T) { - t.Parallel() - - t.Run("correct-input", testRegistrySchemaCorrectInput) - t.Run("incorrect-input", testRegistrySchemaIncorrectInput) -} - -func testRegistrySchemaIncorrectInput(t *testing.T) { - r := registry.New( - &testNewFactory{}, - ) - schema := r.Schema() - - if _, err := schema.Unserialize(map[string]any{"type": "non-existent"}); err == nil { - t.Fatalf("No error returned") - } - - if _, err := schema.Unserialize(map[string]any{}); err == nil { - t.Fatalf("No error returned") - } -} - -func testRegistrySchemaCorrectInput(t *testing.T) { - r := registry.New( - &testNewFactory{}, - ) - schema := r.Schema() - - unserializedData, err := schema.Unserialize(map[string]any{"type": "test"}) - if err != nil { - t.Fatalf("Unexpected error: %v", err) - } - if _, ok := unserializedData.(testConfig); !ok { - t.Fatalf("Incorrect unserialized data type returned: %T", unserializedData) - } -} - -func TestRegistry_Create(t *testing.T) { - t.Parallel() - t.Run("correct-creation", testRegistryCreateCorrectCreation) - t.Run("incorrect-config-type", testRegistryCreateIncorrectConfigType) - t.Run("nil-config", testRegistryCreateNilConfig) -} - -func testRegistryCreateCorrectCreation(t *testing.T) { - t.Parallel() - r := registry.New( - &testNewFactory{}, - ) - connector, err := r.Create(testConfig{}, log.NewTestLogger(t)) - assert.NoError(t, err) - if _, ok := connector.(*testConnector); !ok { - t.Fatalf("Incorrect connector returned: %T", connector) - } -} - -func testRegistryCreateIncorrectConfigType(t *testing.T) { - t.Parallel() - type testStruct struct { - } - - r := registry.New( - &testNewFactory{}, - ) - _, err := r.Create(testStruct{}, log.NewTestLogger(t)) - if err == nil { - t.Fatalf("expected error, no error returned") - } -} - -func testRegistryCreateNilConfig(t *testing.T) { - t.Parallel() - r := registry.New( - &testNewFactory{}, - ) - _, err := r.Create(nil, log.NewTestLogger(t)) - if err == nil { - t.Fatalf("expected error, no error returned") - } -} diff --git a/new.go b/new.go index de812ba9..ce6e34ef 100644 --- a/new.go +++ b/new.go @@ -1,9 +1,9 @@ package engine import ( - "go.arcalot.io/log" + log "go.arcalot.io/log/v2" + "go.flow.arcalot.io/deployer/registry" "go.flow.arcalot.io/engine/config" - "go.flow.arcalot.io/engine/internal/deploy/registry" ) // New creates a new workflow engine with the provided configuration. The passed deployerRegistry is responsible for