Skip to content

Commit

Permalink
feat: transformer function struct (vanus-labs#357)
Browse files Browse the repository at this point in the history
* feat: function struct

Signed-off-by: delu <[email protected]>

* feat: function struct

Signed-off-by: delu <[email protected]>

* ut: fix ut

Signed-off-by: delu <[email protected]>

Signed-off-by: delu <[email protected]>
  • Loading branch information
xdlbdy authored Dec 22, 2022
1 parent ed7b93b commit 4d26646
Show file tree
Hide file tree
Showing 56 changed files with 1,746 additions and 1,002 deletions.
5 changes: 4 additions & 1 deletion .golangci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -396,6 +396,9 @@ issues:
- source: "strconv|make|len|math"
linters:
- gomnd
- path: "action"
linters:
- dupl
- path: "convert.go"
linters:
- dupl
Expand All @@ -412,7 +415,7 @@ issues:
linters:
- gosec
- gomnd
- path: "action.go"
- path: "init.go"
linters:
- gochecknoinits
- path: "^vsctl"
Expand Down
8 changes: 3 additions & 5 deletions internal/controller/trigger/validation/subscripton.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,10 @@ import (
"fmt"
"net/url"

"github.com/linkall-labs/vanus/internal/primitive/transform/action"

"github.com/linkall-labs/vanus/internal/primitive/transform/arg"

"github.com/linkall-labs/vanus/internal/primitive"
"github.com/linkall-labs/vanus/internal/primitive/cel"
"github.com/linkall-labs/vanus/internal/primitive/transform/arg"
"github.com/linkall-labs/vanus/internal/primitive/transform/runtime"
"github.com/linkall-labs/vanus/pkg/errors"
ctrlpb "github.com/linkall-labs/vanus/proto/pkg/controller"
metapb "github.com/linkall-labs/vanus/proto/pkg/meta"
Expand Down Expand Up @@ -176,7 +174,7 @@ func validateTransformer(ctx context.Context, transformer *metapb.Transformer) e
for i, command := range a.Command {
commands[i] = command.AsInterface()
}
if _, err := action.NewAction(commands); err != nil {
if _, err := runtime.NewAction(commands); err != nil {
return errors.ErrInvalidRequest.WithMessage(
fmt.Sprintf("transformer pipeline %dst command %s is invalid:[%s]", n+1, commands[0], err.Error()))
}
Expand Down
210 changes: 62 additions & 148 deletions internal/primitive/transform/action/action.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,14 @@ package action

import (
"fmt"
"strings"

"github.com/linkall-labs/vanus/internal/primitive/transform/function"

"github.com/linkall-labs/vanus/internal/primitive/transform/arg"
"github.com/linkall-labs/vanus/internal/primitive/transform/common"
"github.com/linkall-labs/vanus/internal/primitive/transform/context"
"github.com/linkall-labs/vanus/internal/primitive/transform/function"
"github.com/pkg/errors"
)

type newAction func() Action

type Action interface {
// Name func name
Name() string
Expand All @@ -41,190 +37,108 @@ type Action interface {
Execute(ceCtx *context.EventContext) error
}

type commonAction struct {
name string
fixedArgs []arg.TypeList
variadicArg arg.TypeList
fn function.Function
type CommonAction struct {
ActionName string
FixedArgs []arg.TypeList
VariadicArg arg.TypeList
Fn function.Function

args []arg.Arg
argTypes []common.Type
targetArg arg.Arg
Args []arg.Arg
ArgTypes []common.Type
TargetArg arg.Arg
}

func (a *commonAction) Name() string {
if a.name != "" {
return a.name
}
if a.fn != nil {
return a.fn.Name()
func (a *CommonAction) Name() string {
return a.ActionName
}

func (a *CommonAction) Arity() int {
return len(a.FixedArgs)
}

func (a *CommonAction) ArgType(index int) arg.TypeList {
if index < len(a.FixedArgs) {
return a.FixedArgs[index]
}
return ""
return a.VariadicArg
}

func (a *commonAction) Arity() int {
return len(a.fixedArgs)
func (a *CommonAction) IsVariadic() bool {
return len(a.VariadicArg) > 0
}

func (a *commonAction) ArgType(index int) arg.TypeList {
if index < len(a.fixedArgs) {
return a.fixedArgs[index]
func (a *CommonAction) RunArgs(ceCtx *context.EventContext) ([]interface{}, error) {
args := make([]interface{}, len(a.Args))
if len(a.Args) != len(a.ArgTypes) {
return nil, fmt.Errorf("arg lenth %d not same arg type %d", len(a.Args), len(a.ArgTypes))
}
return a.variadicArg
for i, _arg := range a.Args {
value, err := _arg.Evaluate(ceCtx)
if err != nil {
return nil, errors.Wrapf(err, "arg %s evaluate error", _arg.Original())
}
v, err := common.Cast(value, a.ArgTypes[i])
if err != nil {
return nil, err
}
args[i] = v
}
return args, nil
}

func (a *commonAction) IsVariadic() bool {
return len(a.variadicArg) > 0
type FunctionAction struct {
CommonAction
}

func (a *commonAction) Init(args []arg.Arg) error {
a.targetArg = args[0]
a.args = args[1:]
func (a *FunctionAction) Init(args []arg.Arg) error {
a.TargetArg = args[0]
a.Args = args[1:]
return a.setArgTypes()
}

func (a *commonAction) setArgTypes() error {
if a.fn == nil {
func (a *FunctionAction) setArgTypes() error {
if a.Fn == nil {
return fmt.Errorf("fn is nil")
}
if len(a.args) < a.fn.Arity() {
if len(a.Args) < a.Fn.Arity() {
return ErrArgNumber
}
if len(a.args) > a.fn.Arity() && !a.fn.IsVariadic() {
if len(a.Args) > a.Fn.Arity() && !a.Fn.IsVariadic() {
return ErrArgNumber
}
argTypes := make([]common.Type, len(a.args))
for i := 0; i < len(a.args); i++ {
argTypes[i] = *a.fn.ArgType(i)
argTypes := make([]common.Type, len(a.Args))
for i := 0; i < len(a.Args); i++ {
argTypes[i] = *a.Fn.ArgType(i)
}
a.argTypes = argTypes
a.ArgTypes = argTypes
return nil
}

func (a *commonAction) Execute(ceCtx *context.EventContext) error {
if a.fn == nil {
func (a *FunctionAction) Execute(ceCtx *context.EventContext) error {
if a.Fn == nil {
return fmt.Errorf("fn is nil")
}
args, err := a.runArgs(ceCtx)
args, err := a.RunArgs(ceCtx)
if err != nil {
return err
}
fnValue, err := a.fn.Execute(args)
fnValue, err := a.Fn.Execute(args)
if err != nil {
return err
}
return a.targetArg.SetValue(ceCtx, fnValue)
return a.TargetArg.SetValue(ceCtx, fnValue)
}

func (a *commonAction) runArgs(ceCtx *context.EventContext) ([]interface{}, error) {
args := make([]interface{}, len(a.args))
if len(a.args) != len(a.argTypes) {
return nil, fmt.Errorf("arg lenth %d not same arg type %d", len(a.args), len(a.argTypes))
}
for i, _arg := range a.args {
value, err := _arg.Evaluate(ceCtx)
if err != nil {
return nil, errors.Wrapf(err, "arg %s evaluate error", _arg.Original())
}
v, err := common.Cast(value, a.argTypes[i])
if err != nil {
return nil, err
}
args[i] = v
}
return args, nil
type SourceTargetSameAction struct {
FunctionAction
}

type sourceTargetSameAction struct {
commonAction
}

func (a *sourceTargetSameAction) Init(args []arg.Arg) error {
a.targetArg = args[0]
a.args = args
func (a *SourceTargetSameAction) Init(args []arg.Arg) error {
a.TargetArg = args[0]
a.Args = args
return a.setArgTypes()
}

var actionMap = map[string]newAction{}

func AddAction(actionFn newAction) error {
a := actionFn()
if _, exist := actionMap[a.Name()]; exist {
return ErrExist
}
actionMap[a.Name()] = actionFn
return nil
}

func init() {
for _, fn := range []newAction{
// struct
newCreateActionAction,
newDeleteAction,
newReplaceAction,
newMoveActionAction,
newRenameActionAction,
// math
newMathAddActionAction,
newMathSubActionAction,
newMathMulActionAction,
newMathDivActionAction,
// format
newDateFormatAction,
newUnixTimeFormatAction,
// string
newJoinAction,
newUpperAction,
newLowerAction,
newAddPrefixAction,
newAddSuffixAction,
newReplaceWithRegexAction,
// condition
newConditionIfAction,
} {
if err := AddAction(fn); err != nil {
panic(err)
}
}
}

func NewAction(command []interface{}) (Action, error) {
funcName, ok := command[0].(string)
if !ok {
return nil, fmt.Errorf("command name must be stirng")
}
actionFn, exist := actionMap[strings.ToUpper(funcName)]
if !exist {
return nil, fmt.Errorf("command %s not exist", funcName)
}
a := actionFn()
argNum := len(command) - 1
if argNum < a.Arity() {
return nil, fmt.Errorf("command %s arg number is not enough, it need %d but only have %d",
funcName, a.Arity(), argNum)
}
if argNum > a.Arity() && !a.IsVariadic() {
return nil, fmt.Errorf("command %s arg number is too many, it need %d but have %d", funcName, a.Arity(), argNum)
}
args := make([]arg.Arg, argNum)
for i := 1; i < len(command); i++ {
_arg, err := arg.NewArg(command[i])
if err != nil {
return nil, errors.Wrapf(err, "command %s arg %d is invalid", funcName, i)
}
argType := a.ArgType(i - 1)
if !argType.Contains(_arg) {
return nil, fmt.Errorf("command %s arg %d not support type %s", funcName, i, _arg.Type())
}
args[i-1] = _arg
}
err := a.Init(args)
if err != nil {
return nil, errors.Wrapf(err, "command %s init error", funcName)
}
return a, nil
}

var (
ErrExist = fmt.Errorf("action have exist")
ErrArgNumber = fmt.Errorf("action arg number invalid")
Expand Down
44 changes: 14 additions & 30 deletions internal/primitive/transform/action/action_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,43 +12,27 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package action
package action_test

import (
"testing"

cetest "github.com/cloudevents/sdk-go/v2/test"
"github.com/linkall-labs/vanus/internal/primitive/transform/context"
"github.com/linkall-labs/vanus/internal/primitive/transform/runtime"
. "github.com/smartystreets/goconvey/convey"
)

func TestNewAction(t *testing.T) {
Convey("test new action", t, func() {
Convey("func name is not string", func() {
_, err := NewAction([]interface{}{123})
So(err, ShouldNotBeNil)
})
Convey("func name no exist", func() {
_, err := NewAction([]interface{}{"UnknownCommand"})
So(err, ShouldNotBeNil)
})
Convey("func arity not enough", func() {
_, err := NewAction([]interface{}{"delete"})
So(err, ShouldNotBeNil)
})
Convey("func arity number greater than", func() {
_, err := NewAction([]interface{}{"delete", "arg1", "arg2"})
So(err, ShouldNotBeNil)
})
Convey("func new arg error", func() {
_, err := NewAction([]interface{}{"delete", "$.a-b"})
So(err, ShouldNotBeNil)
})
Convey("func new arg type is invalid", func() {
_, err := NewAction([]interface{}{"delete", "arg"})
So(err, ShouldNotBeNil)
})
Convey("func new valid", func() {
_, err := NewAction([]interface{}{"delete", "$.id"})
So(err, ShouldBeNil)
func TestActionExecute(t *testing.T) {
Convey("test action", t, func() {
a, err := runtime.NewAction([]interface{}{"delete", "$.test"})
So(err, ShouldBeNil)
e := cetest.MinEvent()
e.SetExtension("test", "abc")
err = a.Execute(&context.EventContext{
Event: &e,
})
So(err, ShouldBeNil)
So(len(e.Extensions()), ShouldEqual, 0)
})
}
Loading

0 comments on commit 4d26646

Please sign in to comment.