Skip to content

Commit

Permalink
create a package for filehandling version of dsnexec
Browse files Browse the repository at this point in the history
  • Loading branch information
daniel-garcia committed Jul 23, 2023
1 parent d140926 commit 706add6
Show file tree
Hide file tree
Showing 10 changed files with 445 additions and 117 deletions.
155 changes: 41 additions & 114 deletions dsnexec/cmd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,63 +2,68 @@ package cmd

import (
"context"
"fmt"
"os"
"os/signal"

"github.com/infobloxopen/db-controller/dsnexec/pkg/dsnexec"
"github.com/infobloxopen/hotload/fsnotify"
"github.com/infobloxopen/db-controller/dsnexec/pkg/fdsnexec"
_ "github.com/infobloxopen/db-controller/dsnexec/pkg/fprintf"
_ "github.com/infobloxopen/db-controller/dsnexec/pkg/shelldb"
"github.com/infobloxopen/hotload"
_ "github.com/infobloxopen/hotload/fsnotify"
"github.com/lib/pq"
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
"gopkg.in/yaml.v2"
)

func init() {
hotload.RegisterSQLDriver("postgres", pq.Driver{})
}

// runCmd represents the run command
var runCmd = &cobra.Command{
Use: "run",
Short: "run a dsnexec watcher",
Long: `dsnexec run will run a dsnexec watcher based on the config file provided.`,
Run: func(cmd *cobra.Command, args []string) {
c, err := parseConfig(confFile)
if err != nil {
log.Fatalf("failed to parse config: %s", err)
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

signalChan := make(chan os.Signal, 1)
signal.Notify(signalChan, os.Interrupt)

exited := make(chan error, len(c.Configs))
for k := range c.Configs {
cfg := c.Configs[k]
go func(c Config) {
exited <- c.run(ctx)
}(cfg)
ctx, cancel := context.WithCancel(context.Background())

go func() {
<-signalChan
cancel()
}()

c, err := parseConfig(confFile)
if err != nil {
log.Fatalf("failed to parse config: %s", err)
}

select {
case err := <-exited:
if err != nil {
log.Fatalf("failed to run: %s", err)
for _, e := range enablingFlags {
if _, ok := c.Configs[e]; ok {
c.Configs[e].Disabled = false
}
case <-signalChan:
fmt.Println("exiting...")
cancel()
for i := 0; i < len(c.Configs); i++ {
err := <-exited
if err != nil {
log.Fatalf("failed to run: %s", err)
}
}
for _, d := range disablingFlags {
if _, ok := c.Configs[d]; ok {
c.Configs[d].Disabled = true
}
return
}

handler, err := fdsnexec.NewHandler(c)
if err != nil {
log.Fatalf("failed to create handler: %s", err)
}
if err := handler.Run(ctx); err != nil {
log.Fatalf("failed to run handler: %s", err)
}
},
}

func parseConfig(f string) (*InputFile, error) {
var c InputFile
func parseConfig(f string) (*fdsnexec.InputFile, error) {
var c fdsnexec.InputFile
// read file
bs, err := os.ReadFile(f)
if err != nil {
Expand All @@ -72,90 +77,9 @@ func parseConfig(f string) (*InputFile, error) {
return &c, nil
}

type InputFile struct {
Configs map[string]Config `yaml:"configs"`
}

type Config struct {
Sources []Source `yaml:"sources"`
Destination dsnexec.DBConnInfo `yaml:"destination"`
Commands []dsnexec.Command `yaml:"commands"`
}

type Destination struct {
dsnexec.DBConnInfo
Iterate bool
}

func (c Config) run(ctx context.Context) error {
notifyS := fsnotify.NewStrategy()

type update struct {
filename string
value string
driver string
}
dsnConfig := dsnexec.Config{
Sources: make(map[string]dsnexec.DBConnInfo),
Destination: c.Destination,
Commands: c.Commands,
}
updates := make(chan update, len(c.Sources))

for _, s := range c.Sources {
val, values, err := notifyS.Watch(ctx, s.Filename, nil)
if err != nil {
return err
}
dsnConfig.Sources[s.Filename] = dsnexec.DBConnInfo{
Driver: s.Driver,
DSN: val,
}
go func(filename string, values <-chan string, driver string) {
for {
select {
case <-ctx.Done():
return
case v := <-values:
updates <- update{
filename: filename,
value: v,
driver: driver,
}
}
}
}(s.Filename, values, s.Driver)
}

handler, err := dsnexec.NewHanlder(dsnexec.WithConfig(dsnConfig))
if err != nil {
return err
}

// initial sync
if err := handler.Exec(); err != nil {
return fmt.Errorf("failed initial execute: %v", err)
}

for {
select {
case <-ctx.Done():
return nil
case u := <-updates:
log.Infof("updating dsn for %s", u.filename)
if err := handler.UpdateDSN(u.filename, u.value); err != nil {
return fmt.Errorf("failed to update dsn: %s", err)
}
}
}
}

type Source struct {
Driver string `yaml:"driver"`
Filename string `yaml:"filename"`
}

var confFile string
var enablingFlags []string
var disablingFlags []string

func init() {
rootCmd.AddCommand(runCmd)
Expand All @@ -169,4 +93,7 @@ func init() {
// Cobra supports local flags which will only run when this command
// is called directly, e.g.:
runCmd.Flags().StringVarP(&confFile, "config-file", "c", "config.yaml", "Path to config file")

runCmd.Flags().StringSliceVarP(&enablingFlags, "enable", "e", []string{}, "Enable a config by name")
runCmd.Flags().StringSliceVarP(&disablingFlags, "disable", "d", []string{}, "Disable a config by name")
}
33 changes: 33 additions & 0 deletions dsnexec/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,36 @@ configs:
args:
- "-c"
- echo "{{ index . "test/source.json" }}"

# this is an example config to use the fprintf driver
# it will write json of the sources to a file
filewrite:
sources:
- driver: json
filename: test/source.json
destination:
driver: fprintf
# The $tmp variable is a special variable that is the path to a temporary directory
# The hostname endcodes the rendering driver. fmt.Fprintf is the default driver
# which can be specificed by 'fprintf'
dsn: file:///$tmp/myfile.json
commands:
- command: "%s" # commands can have text/templates also
# this is the format string for the fprintf driver
args:
# these are arguments to the fmt.Fprintf function
- '{{ index . "test/source.json" | toJson }}'

# This is an example of executing sql commands in response to a change.
sql:
disabled: true
sources:
- driver: json
filename: test/source.json
destination:
driver: hotload
dsn: fsnotify://postgres/path/to/myfile.txt
commands:
- command: ALTER SERVER myserver OPTIONS (SET host=?);
args:
- '{{ index . "test/source.json" "host" }}'
13 changes: 11 additions & 2 deletions dsnexec/pkg/dsnexec/dsnexec.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,17 @@ func (w *Handler) exec() error {
defer db.Close()

for i, v := range w.config.Commands {
t, err := template.New("command").Funcs(sprig.FuncMap()).Parse(v.Command)
if err != nil {
return fmt.Errorf("failed to parse command template %v: %v", v.Command, err)
}
bs := bytes.NewBuffer(nil)
if err := t.Execute(bs, argContext); err != nil {
return fmt.Errorf("failed to render command template: %v: %v", v.Command, err)
}
cmd := bs.String()
if len(v.Args) == 0 {
if _, err := db.Exec(v.Command); err != nil {
if _, err := db.Exec(cmd); err != nil {
return fmt.Errorf("failed to execute sql: %v", err)
}
continue
Expand All @@ -111,7 +120,7 @@ func (w *Handler) exec() error {
}
args = append(args, val)
}
if _, err := db.Exec(v.Command, args...); err != nil {
if _, err := db.Exec(cmd, args...); err != nil {
return fmt.Errorf("failed to execute command: command %s %v", v.Command, err)
}
}
Expand Down
1 change: 0 additions & 1 deletion dsnexec/pkg/dsnexec/dsns.go

This file was deleted.

7 changes: 7 additions & 0 deletions dsnexec/pkg/dsnexec/funcs.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,13 @@ func init() {
parsers["postgres"] = parsePostgresDSN
parsers["json"] = parseJSON
parsers["yaml"] = parseYAML
parsers["none"] = parseNone
parsers[""] = parseNone
}

func parseNone(dsn string) (map[string]string, error) {
value := make(map[string]string)
return value, nil
}

func parseJSON(dsn string) (map[string]string, error) {
Expand Down
23 changes: 23 additions & 0 deletions dsnexec/pkg/fdsnexec/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package fdsnexec

import "github.com/infobloxopen/db-controller/dsnexec/pkg/dsnexec"

type Source struct {
Driver string `yaml:"driver"`
Filename string `yaml:"filename"`
}

// InputFile is the input file format for fdsnexec. It is a yaml file with
// a top level key of configs. The configs key is a map of config names to
// Configs.
type InputFile struct {
Configs map[string]*Config `yaml:"configs"`
}

// Config is the config for a single fdsnexec instance.
type Config struct {
Disabled bool `yaml:"disabled"`
Sources []Source `yaml:"sources"`
Destination dsnexec.DBConnInfo `yaml:"destination"`
Commands []dsnexec.Command `yaml:"commands"`
}
Loading

0 comments on commit 706add6

Please sign in to comment.