Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Resulve issue #224 GetRef() in support package should remove 'vendor'… #225

Closed
wants to merge 12 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,11 @@ func New(config *Config, runner action.Runner, options ...Option) (*App, error)
return nil, err
}

// Enable flow control feature
if EnableFlowControl() {
app.initFlowController()
}

return app, nil
}

Expand Down Expand Up @@ -553,6 +558,7 @@ func registerImport(anImport string) error {

ct := getContribType(ref)
if ct == "other" {
support.SaveNonContributionAlias(alias, ref)
log.RootLogger().Debugf("Added Non-Contribution Import: %s", ref)
return nil
//return fmt.Errorf("invalid import, contribution '%s' not registered", anImport)
Expand Down
13 changes: 12 additions & 1 deletion app/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package app

import (
"os"
"strconv"

"github.com/project-flogo/core/action"
"github.com/project-flogo/core/app/resource"
Expand All @@ -13,6 +14,7 @@ import (

const (
EnvKeyDelayedAppStopInterval = "FLOGO_APP_DELAYED_STOP_INTERVAL"
EnvKeyEnableFlowControl = "FLOGO_APP_ENABLE_FLOW_CONTROL"
)

// Def is the configuration for the App
Expand Down Expand Up @@ -41,7 +43,16 @@ func GetDelayedStopInterval() string {
return ""
}

func EnableFlowControl() bool {
enable := os.Getenv(EnvKeyEnableFlowControl)
if len(enable) > 0 {
b, _ := strconv.ParseBool(enable)
return b
}
return false
}

type LifecycleAware interface {
OnStartup() error
OnShutdown() error
}
}
113 changes: 113 additions & 0 deletions app/controller.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
package app

import (
"fmt"
"github.com/project-flogo/core/support/log"
"github.com/project-flogo/core/trigger"
"sync"
)

const (
AlreadyControlled = "app is already controlled"
)

var controller Controller

type Controller interface {
StartControl() error
ReleaseControl() error
}

type controllerData struct {
flowControlled bool
triggers map[string]trigger.FlowControlAware
lock sync.Mutex
}

func GetFlowController() Controller {
return controller
}

// StartControl uses to start control the controller, the evaluator must call start control first then release control
func (c *controllerData) StartControl() error {
c.lock.Lock()
defer c.lock.Unlock()
if c.flowControlled {
return fmt.Errorf(AlreadyControlled)
} else {
// Pause trigger
c.flowControlled = true
err := c.pauseTriggers()
if err != nil {
errMsg := fmt.Errorf("error pausing triggers: %s", err.Error())
log.RootLogger().Error(errMsg)
return errMsg
}
return nil
}
}

// ReleaseControl uses to release control the controller
func (c *controllerData) ReleaseControl() error {
c.lock.Lock()
defer c.lock.Unlock()
if c.flowControlled {
err := c.resumeTriggers()
if err != nil {
// Release control if error occurred here
c.flowControlled = false
errMsg := fmt.Errorf("error resume triggers: %s", err.Error())
log.RootLogger().Error(errMsg)
return errMsg
}
c.flowControlled = false
}
return nil
}

func (app *App) initFlowController() {
controllerData := &controllerData{lock: sync.Mutex{}}
controllerData.triggers = make(map[string]trigger.FlowControlAware)
for id, trgW := range app.triggers {
if t, ok := trgW.trg.(trigger.FlowControlAware); ok {
controllerData.triggers[id] = t
}
}
controller = controllerData
}

// Resume triggers
func (c *controllerData) resumeTriggers() error {
// Resume triggers
log.RootLogger().Info("Resuming Triggers...")
for id, trg := range c.triggers {
err := trg.Resume()
if err != nil {
//return err
//TODO Letting other triggers resume. Should we stop the app here?
log.RootLogger().Errorf("Trigger [%s] failed to resume due to error - %s.", id, err.Error())
continue
}
log.RootLogger().Infof("Trigger [%s] is resumed.", id)
}
log.RootLogger().Info("Triggers Resumed")
return nil
}

// Pause triggers
func (c *controllerData) pauseTriggers() error {
log.RootLogger().Info("Pausing Triggers...")
// Pause Triggers
for id, trg := range c.triggers {
err := trg.Pause()
if err != nil {
//return err
//TODO Letting other triggers pause. Should we stop the app here?
log.RootLogger().Errorf("Trigger [%s] failed to pause due to error - %s.", id, err.Error())
continue
}
log.RootLogger().Infof("Trigger [%s] is paused.", id)
}
log.RootLogger().Info("Triggers Paused")
return nil
}
25 changes: 25 additions & 0 deletions app/controller_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package app

import (
"github.com/stretchr/testify/assert"
"testing"
)

func TestController(t *testing.T) {
testApp := &App{stopOnError: true, name: "test app", version: "v1.0.0"}
testApp.initFlowController()
c := GetFlowController()
err := c.StartControl()
assert.Nil(t, err)
// Start another control
err = c.StartControl()
assert.Equal(t, AlreadyControlled, err.Error())
err = c.ReleaseControl()
assert.Nil(t, err)

//Start again
err = c.StartControl()
assert.Nil(t, err)
err = c.ReleaseControl()
assert.Nil(t, err)
}
10 changes: 10 additions & 0 deletions app/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,3 +44,13 @@ func SetValue(name string, value interface{}) error {
}
return appData.SetValue(name, value)
}

// DeleteValue remove an app attribute
func DeleteValue(name string) {
if log.RootLogger().TraceEnabled() {
log.RootLogger().Tracef("Deleting App Value '%s'", name)
}
if d, ok := appData.(data.NeedsDelete); ok {
d.Delete(name)
}
}
27 changes: 27 additions & 0 deletions data/coerce/compound.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,15 @@ func ToParams(val interface{}) (map[string]string, error) {
switch t := val.(type) {
case map[string]string:
return t, nil
case []byte:
var m map[string]string
if len(t) > 0 {
err := json.Unmarshal(t, &m)
if err != nil {
return nil, fmt.Errorf("unable to coerce %#v to params", val)
}
}
return m, nil
case string:
m := make(map[string]string)
if t != "" {
Expand Down Expand Up @@ -109,6 +118,15 @@ func ToObject(val interface{}) (map[string]interface{}, error) {
ret[key] = value
}
return ret, nil
case []byte:
var m map[string]interface{}
if len(t) > 0 {
err := json.Unmarshal(t, &m)
if err != nil {
return nil, fmt.Errorf("unable to coerce %#v to map[string]interface{}", val)
}
}
return m, nil
case string:
m := make(map[string]interface{})
if t != "" {
Expand Down Expand Up @@ -145,6 +163,15 @@ func ToArray(val interface{}) ([]interface{}, error) {
a = append(a, v)
}
return a, nil
case []byte:
a := make([]interface{}, 0)
if len(a) > 0 {
err := json.Unmarshal(t, &a)
if err != nil {
a = append(a, t)
}
}
return a, nil
case string:
a := make([]interface{}, 0)
if t != "" {
Expand Down
Loading