Skip to content

Commit

Permalink
Merge pull request #161 from Bitspark/more-operators
Browse files Browse the repository at this point in the history
More operators
  • Loading branch information
jm9e authored Aug 12, 2018
2 parents 6a8e5c2 + d775f92 commit 4a47448
Show file tree
Hide file tree
Showing 9 changed files with 678 additions and 6 deletions.
7 changes: 1 addition & 6 deletions pkg/elem/data_convert.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,16 +29,11 @@ var dataConvertCfg = &builtinConfig{
out := op.Main().Out()
for !op.CheckStop() {
i := in.Pull()
if core.IsMarker(i) {
if core.IsMarker(i) || i == nil {
out.Push(i)
continue
}

if i == nil {
out.Push(nil)
continue
}

switch in.Type() {
case core.TYPE_NUMBER:
item := i.(float64)
Expand Down
119 changes: 119 additions & 0 deletions pkg/elem/database_execute.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
package elem

import (
"github.com/Bitspark/slang/pkg/core"
"database/sql"
_ "github.com/go-sql-driver/mysql"
)

var databaseExecuteCfg = &builtinConfig{
opDef: core.OperatorDef{
ServiceDefs: map[string]*core.ServiceDef{
core.MAIN_SERVICE: {
In: core.TypeDef{
Type: "map",
Map: map[string]*core.TypeDef{
"trigger": {
Type: "trigger",
},
"{queryParams}": {
Type: "primitive",
},
},
},
Out: core.TypeDef{
Type: "map",
Map: map[string]*core.TypeDef{
"rowsAffected": {
Type: "number",
},
"lastInsertId": {
Type: "number",
},
},
},
},
},
DelegateDefs: map[string]*core.DelegateDef{},
PropertyDefs: map[string]*core.TypeDef{
"query": {
Type: "string",
},
"queryParams": {
Type: "stream",
Stream: &core.TypeDef{
Type: "string",
},
},
"driver": {
Type: "string",
},
"url": {
Type: "string",
},
},
},
opFunc: func(op *core.Operator) {
query := op.Property("query").(string)

driver := op.Property("driver").(string)
url := op.Property("url").(string)

params := []string{}
for _, param := range op.Property("queryParams").([]interface{}) {
params = append(params, param.(string))
}

db, err := sql.Open(driver, url)
if err != nil {
panic(err.Error())
}
defer db.Close()

err = db.Ping()
if err != nil {
panic(err.Error())
}

stmt, err := db.Prepare(query)
if err != nil {
panic(err.Error())
}
defer stmt.Close()

in := op.Main().In()
out := op.Main().Out()
for !op.CheckStop() {
i := in.Pull()
if core.IsMarker(i) {
out.Push(i)
continue
}

im := i.(map[string]interface{})

args := []interface{}{}
for _, param := range params {
args = append(args, im[param])
}
result, err := stmt.Exec(args...)

if err != nil {
out.Push(nil)
continue
}

if n, err := result.LastInsertId(); err == nil {
out.Map("lastInsertId").Push(n)
} else {
out.Map("lastInsertId").Push(nil)
}

if n, err := result.RowsAffected(); err == nil {
out.Map("rowsAffected").Push(n)
} else {
out.Map("rowsAffected").Push(nil)
}
}
},
}
150 changes: 150 additions & 0 deletions pkg/elem/database_query.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
package elem

import (
"github.com/Bitspark/slang/pkg/core"
"database/sql"
_ "github.com/go-sql-driver/mysql"
"reflect"
)

var databaseQueryCfg = &builtinConfig{
opDef: core.OperatorDef{
ServiceDefs: map[string]*core.ServiceDef{
core.MAIN_SERVICE: {
In: core.TypeDef{
Type: "map",
Map: map[string]*core.TypeDef{
"trigger": {
Type: "trigger",
},
"{queryParams}": {
Type: "primitive",
},
},
},
Out: core.TypeDef{
Type: "stream",
Stream: &core.TypeDef{
Type: "map",
Map: map[string]*core.TypeDef{
"trigger": {
Type: "trigger",
},
"{rowColumns}": {
Type: "primitive",
},
},
},
},
},
},
DelegateDefs: map[string]*core.DelegateDef{},
PropertyDefs: map[string]*core.TypeDef{
"query": {
Type: "string",
},
"queryParams": {
Type: "stream",
Stream: &core.TypeDef{
Type: "string",
},
},
"rowColumns": {
Type: "stream",
Stream: &core.TypeDef{
Type: "string",
},
},
"driver": {
Type: "string",
},
"url": {
Type: "string",
},
},
},
opFunc: func(op *core.Operator) {
query := op.Property("query").(string)

driver := op.Property("driver").(string)
url := op.Property("url").(string)

params := []string{}
for _, param := range op.Property("queryParams").([]interface{}) {
params = append(params, param.(string))
}

rowColumns := []string{}
for _, col := range op.Property("rowColumns").([]interface{}) {
rowColumns = append(rowColumns, col.(string))
}

db, err := sql.Open(driver, url)
if err != nil {
panic(err.Error())
}
defer db.Close()

err = db.Ping()
if err != nil {
panic(err.Error())
}

stmt, err := db.Prepare(query)
if err != nil {
panic(err.Error())
}
defer stmt.Close()

in := op.Main().In()
out := op.Main().Out()
for !op.CheckStop() {
i := in.Pull()
if core.IsMarker(i) {
out.Push(i)
continue
}

im := i.(map[string]interface{})

args := []interface{}{}
for _, param := range params {
args = append(args, im[param])
}
rows, err := stmt.Query(args...)

if err != nil {
out.Push(nil)
continue
}

colTypes, _ := rows.ColumnTypes()
out.PushBOS()
for rows.Next() {
row := make(map[string]interface{})
row["trigger"] = nil
dests := []interface{}{}
for i := range rowColumns {
colType := colTypes[i]
var colPtr interface{}
typeName := colType.DatabaseTypeName()
switch typeName {
case "VARCHAR", "TEXT", "LONGTEXT":
colPtr = new(string)
case "TINYINT", "SMALLINT", "MEDIUMINT", "INT", "BIGINT", "DECIMAL", "FLOAT", "DOUBLE":
colPtr = new(float64)
default:
colPtr = new(string)
}
dests = append(dests, colPtr)
}
rows.Scan(dests...)
for i, col := range rowColumns {
row[col] = reflect.ValueOf(dests[i]).Elem().Interface()
}
out.Stream().Push(row)
}
out.PushEOS()
}
},
}
67 changes: 67 additions & 0 deletions pkg/elem/files_zip_pack.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package elem

import (
"github.com/Bitspark/slang/pkg/core"
"github.com/Bitspark/slang/pkg/utils"
"bytes"
"archive/zip"
)

var filesZIPPackCfg = &builtinConfig{
opDef: core.OperatorDef{
ServiceDefs: map[string]*core.ServiceDef{
core.MAIN_SERVICE: {
In: core.TypeDef{
Type: "stream",
Stream: &core.TypeDef{
Type: "map",
Map: map[string]*core.TypeDef{
"path": {
Type: "string",
},
"file": {
Type: "binary",
},
},
},
},
Out: core.TypeDef{
Type: "binary",
},
},
},
},
opFunc: func(op *core.Operator) {
in := op.Main().In()
out := op.Main().Out()
for !op.CheckStop() {
i := in.Stream().Pull()
if !in.OwnBOS(i) {
out.Push(i)
continue
}

buf := new(bytes.Buffer)
zipWriter := zip.NewWriter(buf)

for {
i = in.Pull()
if in.OwnEOS(i) {
break
}

im := i.(map[string]interface{})

path := im["path"].(string)
file := im["file"].(utils.Binary)

fileWriter, _ := zipWriter.Create(path)
fileWriter.Write(file)
}

zipWriter.Close()

out.Push(utils.Binary(buf.Bytes()))
}
},
}
Loading

0 comments on commit 4a47448

Please sign in to comment.