Skip to content

Commit e859c5d

Browse files
committed
feat: advanced watcher usernames management
This CL introduces a new plugin `watcher` that allows to add new watcher usernames to a task, similar to how the `tag` plugin works. A new field `allowed_watcher_usernames` has also been added to the core model of a task template to allows pre-defined watchers usernames at the template level. The pre-defined list of usernames of the template is merged with the input list given during the creation of a task, ignoring any duplicate values. Signed-off-by: William Poussier <[email protected]>
1 parent 9891df7 commit e859c5d

File tree

14 files changed

+139
-18
lines changed

14 files changed

+139
-18
lines changed

engine/engine.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -374,6 +374,17 @@ func resolve(dbp zesty.DBProvider, res *resolution.Resolution, t *task.Task, sm
374374
t.Tags[k] = v
375375
}
376376
}
377+
// Merge task's watcher usernames with the usernames returned in the step
378+
// ignoring duplicate usernames already present.
379+
loop:
380+
for _, u := range s.WatcherUsernames {
381+
for _, e := range t.WatcherUsernames {
382+
if e == u {
383+
continue loop
384+
}
385+
}
386+
t.WatcherUsernames = append(t.WatcherUsernames, u)
387+
}
377388

378389
// "commit" step back into resolution
379390
res.SetStep(s.Name, s)

engine/functions/functions.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -116,8 +116,8 @@ func extractArguments(path string, v reflect.Value) ([]string, error) {
116116

117117
// Exec is the implementation of the runner.Exec function but does nothing: function runners
118118
// are just place holders to resolve to actual plugin/builtin.
119-
func (f *Function) Exec(stepName string, baseConfig json.RawMessage, config json.RawMessage, ctx interface{}) (interface{}, interface{}, map[string]string, error) {
120-
return nil, nil, nil, errors.New("functions cannot be executed")
119+
func (f *Function) Exec(stepName string, baseConfig json.RawMessage, config json.RawMessage, ctx interface{}) (interface{}, interface{}, map[string]string, []string, error) {
120+
return nil, nil, nil, nil, errors.New("functions cannot be executed")
121121
}
122122

123123
// ValidConfig insure that the given configuration resolves all the input needed by the function.

engine/step/step.go

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,8 @@ type Step struct {
108108

109109
Resources []string `json:"resources"` // resource limits to enforce
110110

111-
Tags map[string]string `json:"tags"`
111+
Tags map[string]string `json:"-"`
112+
WatcherUsernames []string `json:"-"`
112113
}
113114

114115
// Context provides a step with extra metadata about the task
@@ -287,7 +288,7 @@ func (st *Step) generateExecution(action executor.Executor, baseConfig map[strin
287288
return &ret, nil
288289
}
289290

290-
func (st *Step) execute(execution *execution, callback func(interface{}, interface{}, map[string]string, error)) {
291+
func (st *Step) execute(execution *execution, callback func(interface{}, interface{}, map[string]string, []string, error)) {
291292

292293
select {
293294
case <-execution.stopRunningSteps:
@@ -302,8 +303,8 @@ func (st *Step) execute(execution *execution, callback func(interface{}, interfa
302303
utask.AcquireResources(limits)
303304
defer utask.ReleaseResources(limits)
304305

305-
output, metadata, tags, err := execution.runner.Exec(st.Name, execution.baseCfgRaw, execution.config, execution.ctx)
306-
callback(output, metadata, tags, err)
306+
output, metadata, tags, watchers, err := execution.runner.Exec(st.Name, execution.baseCfgRaw, execution.config, execution.ctx)
307+
callback(output, metadata, tags, watchers, err)
307308
}
308309

309310
// Run carries out the action defined by a Step, by providing values to its configuration
@@ -356,7 +357,7 @@ func Run(st *Step, baseConfig map[string]json.RawMessage, stepValues *values.Val
356357
go func() {
357358
defer preHookWg.Done()
358359

359-
st.execute(preHookExecution, func(output interface{}, metadata interface{}, tags map[string]string, err error) {
360+
st.execute(preHookExecution, func(output interface{}, metadata interface{}, tags map[string]string, watchers []string, err error) {
360361
if err != nil {
361362
st.State = StateFatalError
362363
st.Error = fmt.Sprintf("prehook: %s", err)
@@ -384,8 +385,8 @@ func Run(st *Step, baseConfig map[string]json.RawMessage, stepValues *values.Val
384385
return
385386
}
386387

387-
st.execute(execution, func(output interface{}, metadata interface{}, tags map[string]string, err error) {
388-
st.Output, st.Metadata, st.Tags = output, metadata, tags
388+
st.execute(execution, func(output interface{}, metadata interface{}, tags map[string]string, watchers []string, err error) {
389+
st.Output, st.Metadata, st.Tags, st.WatcherUsernames = output, metadata, tags, watchers
389390

390391
execution.generateOutput(st, preHookValues)
391392

engine/step/util.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ import (
99
// Runner represents a component capable of executing a specific action,
1010
// provided a configuration and a context
1111
type Runner interface {
12-
Exec(stepName string, baseConfig json.RawMessage, config json.RawMessage, ctx interface{}) (interface{}, interface{}, map[string]string, error)
12+
Exec(stepName string, baseConfig json.RawMessage, config json.RawMessage, ctx interface{}) (interface{}, interface{}, map[string]string, []string, error)
1313
ValidConfig(baseConfig json.RawMessage, config json.RawMessage) error
1414
Context(stepName string) interface{}
1515
Resources(baseConfig json.RawMessage, config json.RawMessage) []string

models/task/task.go

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,7 @@ func Create(dbp zesty.DBProvider, tt *tasktemplate.TaskTemplate, reqUsername str
9292
PublicID: uuid.Must(uuid.NewV4()).String(),
9393
TemplateID: tt.ID,
9494
RequesterUsername: reqUsername,
95-
WatcherUsernames: watcherUsernames,
95+
WatcherUsernames: mergeStringSlicesWithoutDuplicates(tt.AllowedWatcherUsernames, watcherUsernames),
9696
ResolverUsernames: resolverUsernames,
9797
Created: now.Get(),
9898
LastActivity: now.Get(),
@@ -607,3 +607,18 @@ func (t *Task) notifyState(potentialResolvers []string) {
607607
notify.ListActions().TaskStateAction,
608608
)
609609
}
610+
611+
func mergeStringSlicesWithoutDuplicates(a, b []string) []string {
612+
m := make(map[string]struct{}, len(a)+len(b))
613+
for _, v := range a {
614+
m[v] = struct{}{}
615+
}
616+
for _, v := range b {
617+
m[v] = struct{}{}
618+
}
619+
out := make([]string, 0, len(m))
620+
for k := range m {
621+
out = append(out, k)
622+
}
623+
return out
624+
}

models/tasktemplate/template.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ type TaskTemplate struct {
3333

3434
AllowedResolverUsernames []string `json:"allowed_resolver_usernames" db:"allowed_resolver_usernames"`
3535
AllowAllResolverUsernames bool `json:"allow_all_resolver_usernames" db:"allow_all_resolver_usernames"`
36+
AllowedWatcherUsernames []string `json:"allowed_watcher_usernames,omitempty" db:"allowed_watcher_usernames"`
3637
AutoRunnable bool `json:"auto_runnable" db:"auto_runnable"`
3738
Blocked bool `json:"blocked" db:"blocked"`
3839
Hidden bool `json:"hidden" db:"hidden"`
@@ -54,6 +55,7 @@ func Create(dbp zesty.DBProvider,
5455
inputs, resolverInputs []input.Input,
5556
allowedResolverUsernames []string,
5657
allowAllResolverUsernames, autoRunnable bool,
58+
allowedWatcherUsernames []string,
5759
steps map[string]*step.Step,
5860
variables []values.Variable,
5961
tags map[string]string,
@@ -76,6 +78,7 @@ func Create(dbp zesty.DBProvider,
7678
Tags: tags,
7779
AllowedResolverUsernames: allowedResolverUsernames,
7880
AllowAllResolverUsernames: allowAllResolverUsernames,
81+
AllowedWatcherUsernames: allowedWatcherUsernames,
7982
AutoRunnable: autoRunnable,
8083
Blocked: false,
8184
Hidden: false,

pkg/plugins/builtin/builtin.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import (
1212
pluginssh "github.com/ovh/utask/pkg/plugins/builtin/ssh"
1313
pluginsubtask "github.com/ovh/utask/pkg/plugins/builtin/subtask"
1414
plugintag "github.com/ovh/utask/pkg/plugins/builtin/tag"
15+
pluginwatcher "github.com/ovh/utask/pkg/plugins/builtin/watcher"
1516
"github.com/ovh/utask/pkg/plugins/taskplugin"
1617
)
1718

@@ -28,6 +29,7 @@ func Register() error {
2829
pluginping.Plugin,
2930
pluginscript.Plugin,
3031
plugintag.Plugin,
32+
pluginwatcher.Plugin,
3133
} {
3234
if err := step.RegisterRunner(p.PluginName(), p); err != nil {
3335
return err

pkg/plugins/builtin/http/http_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -147,7 +147,7 @@ func Test_exec(t *testing.T) {
147147
cfgJSON, err := json.Marshal(cfg)
148148
assert.NoError(t, err)
149149

150-
output, metadata, _, err := Plugin.Exec("test", json.RawMessage(""), json.RawMessage(cfgJSON), nil)
150+
output, metadata, _, _, err := Plugin.Exec("test", json.RawMessage(""), json.RawMessage(cfgJSON), nil)
151151
require.NoError(t, err)
152152

153153
assert.NoError(t, err)

pkg/plugins/builtin/tag/README.md

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,7 @@ This plugin updates the tags of the current task. Existing tags are overwritten
44

55
## Configuration
66

7-
|Fields|Description
8-
| --- | --- |
7+
| Fields | Description |
98
| ------ | --------------- |
109
| `tags` | key/values tags |
1110

pkg/plugins/builtin/watcher/README.md

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
# `watcher` Plugin
2+
3+
This plugin updates the watcher usernames of the current task. New usernames are added to the list of existing one, ignoring any duplicate.
4+
5+
## Configuration
6+
7+
| Fields | Description |
8+
| ----------- | ------------------ |
9+
| `usernames` | an array of string |
10+
11+
## Example
12+
13+
An action of type `watcher` requires only one field, the list of watcher usernames to add to the current task.
14+
15+
```yaml
16+
action:
17+
type: watcher
18+
configuration:
19+
usernames:
20+
- foo
21+
- bar
22+
```
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
package pluginwatcher
2+
3+
import (
4+
"fmt"
5+
"strings"
6+
7+
"github.com/ovh/utask/pkg/plugins/taskplugin"
8+
)
9+
10+
// The watcher plugin allow to update the allowed watcher usernames of a task.
11+
var (
12+
Plugin = taskplugin.New("watcher", "0.1", exec,
13+
taskplugin.WithConfig(validConfig, Config{}),
14+
taskplugin.WithWatchers(watchers),
15+
)
16+
)
17+
18+
// Config represents the configuration of the plugin.
19+
type Config struct {
20+
Usernames []string `json:"usernames"`
21+
}
22+
23+
func validConfig(config interface{}) error {
24+
cfg := config.(*Config)
25+
26+
for i, v := range cfg.Usernames {
27+
if strings.TrimSpace(v) == "" {
28+
return fmt.Errorf("invalid watcher username at position %d", i)
29+
}
30+
}
31+
return nil
32+
}
33+
34+
func exec(stepName string, config interface{}, ctx interface{}) (interface{}, interface{}, error) {
35+
return nil, nil, nil
36+
}
37+
38+
func watchers(config, _, _, _ interface{}, _ error) []string {
39+
if config == nil {
40+
return nil
41+
}
42+
cfg := config.(*Config)
43+
44+
return cfg.Usernames
45+
}

pkg/plugins/taskplugin/taskplugin.go

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ type PluginExecutor struct {
2929
contextFactory func(string) interface{}
3030
metadataSchema json.RawMessage
3131
tagsFunc tagsFunc
32+
watchersFunc watchersFunc
3233
}
3334

3435
// Context generates a context payload to pass to Exec()
@@ -84,20 +85,20 @@ func (r PluginExecutor) ValidConfig(baseConfig json.RawMessage, config json.RawM
8485
}
8586

8687
// Exec performs the action implemented by the executor
87-
func (r PluginExecutor) Exec(stepName string, baseConfig json.RawMessage, config json.RawMessage, ctx interface{}) (interface{}, interface{}, map[string]string, error) {
88+
func (r PluginExecutor) Exec(stepName string, baseConfig json.RawMessage, config json.RawMessage, ctx interface{}) (interface{}, interface{}, map[string]string, []string, error) {
8889
var cfg interface{}
8990

9091
if r.configFactory != nil {
9192
cfg = r.configFactory()
9293
if len(baseConfig) > 0 {
9394
err := utils.JSONnumberUnmarshal(bytes.NewReader(baseConfig), cfg)
9495
if err != nil {
95-
return nil, nil, nil, errors.Annotate(err, "failed to unmarshal base configuration")
96+
return nil, nil, nil, nil, errors.Annotate(err, "failed to unmarshal base configuration")
9697
}
9798
}
9899
err := utils.JSONnumberUnmarshal(bytes.NewReader(config), cfg)
99100
if err != nil {
100-
return nil, nil, nil, errors.Annotate(err, "failed to unmarshal configuration")
101+
return nil, nil, nil, nil, errors.Annotate(err, "failed to unmarshal configuration")
101102
}
102103
}
103104
output, metadata, err := r.execfunc(stepName, cfg, ctx)
@@ -106,7 +107,11 @@ func (r PluginExecutor) Exec(stepName string, baseConfig json.RawMessage, config
106107
if r.tagsFunc != nil {
107108
tags = r.tagsFunc(cfg, ctx, output, metadata, err)
108109
}
109-
return output, metadata, tags, err
110+
var watchers []string
111+
if r.watchersFunc != nil {
112+
watchers = r.watchersFunc(cfg, ctx, output, metadata, err)
113+
}
114+
return output, metadata, tags, watchers, err
110115
}
111116

112117
// PluginName returns a plugin's name
@@ -125,6 +130,7 @@ func (r PluginExecutor) MetadataSchema() json.RawMessage {
125130
}
126131

127132
type tagsFunc func(config, ctx, output, metadata interface{}, err error) map[string]string
133+
type watchersFunc func(config, ctx, output, metadata interface{}, err error) []string
128134

129135
// PluginOpt is a helper struct to customize an action executor
130136
type PluginOpt struct {
@@ -135,6 +141,7 @@ type PluginOpt struct {
135141
resourcesFunc func(interface{}) []string
136142
metadataFunc func() string
137143
tagsFunc tagsFunc
144+
watchersFunc watchersFunc
138145
}
139146

140147
// WithConfig defines the configuration struct and validation function
@@ -174,6 +181,13 @@ func WithTags(fn tagsFunc) func(*PluginOpt) {
174181
}
175182
}
176183

184+
// WithWatchers defines a function to manipulate the watcher usernames of a task.
185+
func WithWatchers(fn watchersFunc) func(*PluginOpt) {
186+
return func(o *PluginOpt) {
187+
o.watchersFunc = fn
188+
}
189+
}
190+
177191
// WithResources defines a function indicating what resources will be needed by the plugin
178192
func WithResources(resourcesFunc func(interface{}) []string) func(*PluginOpt) {
179193
return func(o *PluginOpt) {
@@ -255,5 +269,6 @@ func New(pluginName string, pluginVersion string, execfunc ExecFunc, opts ...fun
255269
contextFactory: contextFactory,
256270
metadataSchema: schema,
257271
tagsFunc: pOpt.tagsFunc,
272+
watchersFunc: pOpt.watchersFunc,
258273
}
259274
}
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
-- +migrate Up
2+
3+
ALTER TABLE "task_template" ADD COLUMN "allowed_watcher_usernames" JSONB NOT NULL DEFAULT '[]';
4+
5+
-- +migrate Down
6+
7+
ALTER TABLE "task_template" DROP COLUMN "allowed_watcher_usernames";

sql/schema.sql

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ CREATE TABLE "task_template" (
1919
variables JSONB NOT NULL DEFAULT 'null',
2020
allowed_resolver_usernames JSONB NOT NULL DEFAULT '[]',
2121
allow_all_resolver_usernames BOOL NOT NULL DEFAULT false,
22+
allowed_watcher_usernames JSONB NOT NULL DEFAULT '[]',
2223
auto_runnable BOOL NOT NULL DEFAULT false,
2324
blocked BOOL NOT NULL DEFAULT false,
2425
hidden BOOL NOT NULL DEFAULT false,

0 commit comments

Comments
 (0)