diff --git a/.gitattributes b/.gitattributes deleted file mode 100644 index 8a39acc..0000000 --- a/.gitattributes +++ /dev/null @@ -1,24 +0,0 @@ -# disable auto crlf -# TODO: this does not seems to work on windows -* text=false - -# force lf -*.sh text eol=lf -*.go eol=lf -*.java eol=lf -.gitignore eol=lf -.gitattributes eol=lf -*.yml eol=lf -*.md eol=lf - -*.html eol=lf -*.js eol=lf -*.ts eol=lf -*.css eol=lf -*.json eol=lf - -# force encoding -*.md encoding=utf-8 -*.sh encoding=utf-8 -*.go encoding=utf-8 -*.java encoding=utf-8 diff --git a/glide.lock b/glide.lock deleted file mode 100644 index 290478b..0000000 --- a/glide.lock +++ /dev/null @@ -1,91 +0,0 @@ -hash: 199190b6b0e23df6b9346f252cd82579b9b1c7923cf7877a8fb9c95ffa3ee296 -updated: 2017-01-03T23:42:20.361289836-08:00 -imports: -- name: github.com/dyweb/gommon - version: 7aad4c9c1ff32b9473a816ba27158bf59bfec307 - vcs: git - subpackages: - - log - - requests - - structure -- name: github.com/fsnotify/fsnotify - version: 30411dbcefb7a1da7e84f75530ad3abe4011b4f8 -- name: github.com/hashicorp/hcl - version: aa7699b7b62c5f410f4cf7b58f3f9b17a71fb4ad - subpackages: - - hcl/ast - - hcl/parser - - hcl/scanner - - hcl/strconv - - hcl/token - - json/parser - - json/scanner - - json/token -- name: github.com/inconshreveable/mousetrap - version: 76626ae9c91c4f2a10f34cad8ce83ea42c93bb75 -- name: github.com/kr/fs - version: 2788f0dbd16903de03cb8186e5c7d97b69ad387b -- name: github.com/magiconair/properties - version: c265cfa48dda6474e208715ca93e987829f572f8 -- name: github.com/mitchellh/mapstructure - version: d2dd0262208475919e1a362f675cfc0e7c10e905 -- name: github.com/pelletier/go-buffruneio - version: df1e16fde7fc330a0ca68167c23bf7ed6ac31d6d -- name: github.com/pelletier/go-toml - version: 45932ad32dfdd20826f5671da37a5f3ce9f26a8d -- name: github.com/pkg/errors - version: 248dadf4e9068a0b3e79f02ed0a610d935de5302 - vcs: git -- name: github.com/pkg/sftp - version: a71e8f580e3b622ebff585309160b1cc549ef4d2 -- name: github.com/spf13/afero - version: cc9c21814bb945440253108c4d3c65c85aac3c68 - subpackages: - - mem - - sftp -- name: github.com/spf13/cast - version: 27b586b42e29bec072fe7379259cc719e1289da6 -- name: github.com/spf13/cobra - version: 856b96dcb49d6427babe192998a35190a12c2230 - vcs: git -- name: github.com/spf13/jwalterweatherman - version: 33c24e77fb80341fe7130ee7c594256ff08ccc46 -- name: github.com/spf13/pflag - version: 367864438f1b1a3c7db4da06a2f55b144e6784e0 -- name: github.com/spf13/viper - version: 50515b700e02658272117a72bd641b6b7f1222e5 - vcs: git -- name: golang.org/x/crypto - version: a548aac93ed489257b9d959b40fe1e8c1e20778c - subpackages: - - curve25519 - - ed25519 - - ed25519/internal/edwards25519 - - ssh -- name: golang.org/x/sys - version: a408501be4d17ee978c04a618e7a1b22af058c0e - subpackages: - - unix -- name: golang.org/x/text - version: 2910a502d2bf9e43193af9d68ca516529614eed3 - subpackages: - - transform - - unicode/norm -- name: gopkg.in/yaml.v2 - version: a83829b6f1293c91addabc89d0571c246397bbf4 -testImports: -- name: github.com/davecgh/go-spew - version: 04cdfd42973bb9c8589fd6a731800cf222fde1a9 - subpackages: - - spew -- name: github.com/pmezard/go-difflib - version: d8ed2627bdf02c080bf22230dbb337003b7aba2d - subpackages: - - difflib -- name: github.com/stretchr/testify - version: 2402e8e7a02fc811447d11f881aa9746cdc57983 - vcs: git - subpackages: - - assert - - require - - suite diff --git a/glide.yaml b/glide.yaml deleted file mode 100644 index 4a01fa8..0000000 --- a/glide.yaml +++ /dev/null @@ -1,18 +0,0 @@ -package: github.com/xephonhq/xephon-b -import: -- package: github.com/spf13/cobra - vcs: git - version: 856b96dcb49d6427babe192998a35190a12c2230 -- package: github.com/spf13/viper - vcs: git - version: 50515b700e02658272117a72bd641b6b7f1222e5 -- package: github.com/pkg/errors - vcs: git - version: 248dadf4e9068a0b3e79f02ed0a610d935de5302 -- package: github.com/dyweb/gommon - vcs: git - version: 7aad4c9c1ff32b9473a816ba27158bf59bfec307 -testImport: -- package: github.com/stretchr/testify - vcs: git - version: 2402e8e7a02fc811447d11f881aa9746cdc57983 diff --git a/pkg/README.md b/pkg/README.md deleted file mode 100644 index 1bc29b4..0000000 --- a/pkg/README.md +++ /dev/null @@ -1,13 +0,0 @@ -# Packages - -This is the folder keep all the golang packages under the `github.com/xephonhq/xephon-b` namespace. -The structure is similar to https://github.com/grafana/grafana/tree/master/pkg -(And it's personal falvor I don't like too much folder in project root) - -If certain package is stable and can be shared among other projects, -they may be moved to `github.com/xephon-contrib` namespace and adopt semantic version. -Before that, all the packages in `github.com/xephonhq/xephon-b` have no guarantee of API stability - -External packages are managed by [glide](https://github.com/Masterminds/glide) in `vendor` folder - -The outer `cmd` folder is used for better binary name when using `go get` diff --git a/pkg/cmd/README.md b/pkg/cmd/README.md deleted file mode 100644 index 36405ff..0000000 --- a/pkg/cmd/README.md +++ /dev/null @@ -1,4 +0,0 @@ -# Command line - -This directory holds all the commands, with file name as their command name. -ie: `xephon-b micro` is defined in `micro.go`. `root.go` is the entry point diff --git a/pkg/cmd/loader.go b/pkg/cmd/loader.go deleted file mode 100644 index 0363c2e..0000000 --- a/pkg/cmd/loader.go +++ /dev/null @@ -1,54 +0,0 @@ -package cmd - -import ( - "fmt" - "os" - - "github.com/spf13/cobra" - "github.com/xephonhq/xephon-b/pkg/config" - "github.com/xephonhq/xephon-b/pkg/loader" -) - -// flags to bind -var ( - loadSourceLocation string - loadDataType string - targetType string - targetLocation string -) - -var LoaderCmd = &cobra.Command{ - Use: "loader", - Short: "loader load generated data to tsdb", - Long: "loader load data generated by simulator into time series databases", - Run: func(cmd *cobra.Command, args []string) { - fmt.Println("lock and load") - if loadSourceLocation == "" { - // TODO: actually we can provide generating load on the fly .... - log.Fatal("must provide source data location") - return - } - // open the file to obtain io.Reader - // TODO: should check file exists, os.Create will create file if not exists - f, err := os.Open(loadSourceLocation) - if err != nil { - log.Error("can't read source file") - log.Fatal(err.Error()) - return - } - defer f.Close() - - c := config.LoaderConfig{Source: f} - loader := loader.NewLoader(c) - loader.Start() - }, -} - -func init() { - LoaderCmd.Flags().StringVar(&loadSourceLocation, "src", "", "location of data file") - LoaderCmd.Flags().StringVar(&loadDataType, "data-type", "int", "data point type, int or double") - LoaderCmd.Flags().StringVar(&targetType, "target-type", "void", "target database") - LoaderCmd.Flags().StringVar(&targetLocation, "target-location", "localhost:10086", "target url") - - RootCmd.AddCommand(LoaderCmd) -} diff --git a/pkg/cmd/root.go b/pkg/cmd/root.go deleted file mode 100644 index 09ab995..0000000 --- a/pkg/cmd/root.go +++ /dev/null @@ -1,53 +0,0 @@ -package cmd - -import ( - "fmt" - "os" - - "github.com/spf13/cobra" - "github.com/spf13/viper" - "github.com/xephonhq/xephon-b/pkg/config" - "github.com/xephonhq/xephon-b/pkg/util" -) - -// Short name use in cmd package -var log = util.Logger.NewEntryWithPkg("x.cmd") - -// RootCmd is the top command, other commands should be its child -var RootCmd = &cobra.Command{ - Use: "xephon-b", - Short: "Time series benmark suite", - Long: `Xephon-B is a benmark suite with a micro benmark tool`, - Run: func(cmd *cobra.Command, args []string) { - fmt.Println("Xephon-B:" + Version + " Use `xb -h` for more information") - }, -} - -// Execute run the root command and return -func Execute() { - if err := RootCmd.Execute(); err != nil { - os.Exit(-1) - } -} - -func init() { - cobra.OnInitialize(initConfig) - - RootCmd.PersistentFlags().StringVar(&config.ConfigFile, "config", config.DefaultConfigFile, "config file (default is ./xephon-b.yml)") - RootCmd.PersistentFlags().BoolVar(&config.Debug, "debug", false, "debug") -} - -func initConfig() { - if config.Debug { - util.UseVerboseLog() - } - viper.AutomaticEnv() - // TODO: check file existence - viper.SetConfigFile(config.ConfigFile) - err := viper.ReadInConfig() - if err != nil { - log.Warn(err) - } else { - log.Debugf("config file %s is loaded", config.ConfigFile) - } -} diff --git a/pkg/cmd/simulator.go b/pkg/cmd/simulator.go deleted file mode 100644 index c298440..0000000 --- a/pkg/cmd/simulator.go +++ /dev/null @@ -1,78 +0,0 @@ -package cmd - -import ( - "os" - - "github.com/spf13/cobra" - - "github.com/xephonhq/xephon-b/pkg/config" - "github.com/xephonhq/xephon-b/pkg/serialize" - "github.com/xephonhq/xephon-b/pkg/simulator/machine" -) - -// flags to bind -var ( - simulatorType string - simulatorDataEncoding string - simulatorOutput string - simulatorOutputLocation string -) - -// SimulatorCmd generate and store simulated time series data -var SimulatorCmd = &cobra.Command{ - // TODO: alias is not shown in help - Use: "simulator", - Aliases: []string{"sim"}, - Short: "Simulate time series data for different scenario", - Long: "Simulate real word time series data, serialize and store to file", - Run: func(cmd *cobra.Command, args []string) { - log.Debug("triggered simulator command") - // TODO: check if what user passed is not supported - // TODO: simulator config is not even used - if simulatorType != "machine" { - log.Fatalf("we only support machine, no %s", simulatorType) - return - } - - c := config.ReadMachineSimulatorConfigFromViper() - sm := machine.NewMachineSimulator(*c) - switch simulatorOutput { - case "stdout": - sm.SetWriter(os.Stdout) - case "file": - // try to open the file - f, err := os.Create(simulatorOutputLocation) - if err != nil { - log.Error("can not create output file") - log.Fatalf(err.Error()) - return - } - defer f.Close() - sm.SetWriter(f) - default: - log.Fatalf("unsupported output type %s", simulatorOutput) - return - } - switch simulatorDataEncoding { - case "json": - log.Debug("set encoding to json") - sm.SetSerializer(&serialize.JsonSerializer{}) - case "debug": - log.Debug("set encoding to debug") - sm.SetSerializer(&serialize.DebugSerializer{}) - default: - log.Fatalf("unsupported encoding %s", simulatorDataEncoding) - return - } - sm.Start() - }, -} - -func init() { - SimulatorCmd.Flags().StringVar(&simulatorType, "type", "machine", "simluator type") - SimulatorCmd.Flags().StringVar(&simulatorDataEncoding, "encoding", "debug", "serializer encoding") - SimulatorCmd.Flags().StringVar(&simulatorOutput, "output", "stdout", "output type") - SimulatorCmd.Flags().StringVar(&simulatorOutputLocation, "location", "give_me_a_name.dat", "output destination") - - RootCmd.AddCommand(SimulatorCmd) -} diff --git a/pkg/cmd/version.go b/pkg/cmd/version.go deleted file mode 100644 index 2b79d1e..0000000 --- a/pkg/cmd/version.go +++ /dev/null @@ -1,23 +0,0 @@ -package cmd - -import ( - "fmt" - - "github.com/spf13/cobra" -) - -// Version need to be manually updated -const Version = "0.0.1-dev" - -// VersionCmd print the version -var VersionCmd = &cobra.Command{ - Use: "version", - Short: "Show Xephon-B version", - Run: func(cmd *cobra.Command, args []string) { - fmt.Println(Version) - }, -} - -func init() { - RootCmd.AddCommand(VersionCmd) -} diff --git a/pkg/common/point.go b/pkg/common/point.go deleted file mode 100644 index 7106ebc..0000000 --- a/pkg/common/point.go +++ /dev/null @@ -1,29 +0,0 @@ -package common - -// Point represent a data point in a series -// ie: cpu.usage <2016-11-12-12:41:33, 0.062>, <2016-11-12-12:41:34, 0.078> -// cpu.usage is a series and it has two points, which shows different usage at different time - -// IntPoint has integer value and unix nano timestamp in int64 -type IntPoint struct { - V int `json:"v"` - TimeNano int64 `json:"t"` -} - -// IntPointWithSeries contains a point to its series -type IntPointWithSeries struct { - IntPoint - *Series -} - -// DoublePoint has double value and unix nano timestamp in int64 -type DoublePoint struct { - V float64 `json:"v"` - TimeNano int64 `json:"t"` -} - -// DoublePointWithSeries contains a pointer to its series -type DoublePointWithSeries struct { - DoublePoint - *Series -} diff --git a/pkg/common/series.go b/pkg/common/series.go deleted file mode 100644 index e6bba55..0000000 --- a/pkg/common/series.go +++ /dev/null @@ -1,72 +0,0 @@ -package common - -// NOTE: our series only have one value for each timestamp, because multiple values can be expand to single value -// and I am not sure how InfluxDB and Druid implement multiple values @czheo -// -// A multiple value series -// cpu.0 system=ubuntu, arch=amd64, usage=0.062, idle=0.034, 1412312312 -// Expand to single value series -// cpu.0.usage system=ubuntu, arch=amd64, 0.062, 1412312312 -// cpu.0.idle system=ubuntu, arch=amd64, 0.034, 1412312312 -import ( - "sort" -) - -// Series is a time series -type Series struct { - // TODO: string or []byte - Name string `json:"name"` - Tags map[string]string `json:"tag"` -} - -// SeriesWithIntPoints is a series with int value points -type SeriesWithIntPoints struct { - Series - // TODO: store two arrays, one for timestamp, one for value might be more efficient - Points []*IntPoint -} - -// SeriesWithDoublePoints is a series with double value points -type SeriesWithDoublePoints struct { - Series - Points []*DoublePoint -} - -// AddTag adds a key value pair WITHOUT ANY checking for duplication -func (s *Series) AddTag(key string, val string) { - s.Tags[key] = val -} - -// SortedKeys return sorted keys -// http://stackoverflow.com/questions/21362950/golang-getting-a-slice-of-keys-from-a-map -func (s *Series) SortedKeys() []string { - keys := make([]string, len(s.Tags)) - i := 0 - for k := range s.Tags { - keys[i] = k - i++ - } - sort.Strings(keys) - return keys -} - -// https://nathanleclaire.com/blog/2014/08/09/dont-get-bitten-by-pointer-vs-non-pointer-method-receivers-in-golang/ -// NOTE: must use non-pointer receiver in order to use %s in fmt -func (s Series) String() string { - // NOTE: used for debug only - // name:k1=v1,k2=v2 - // TODO: more efficient - str := s.Name + ":" - for k, v := range s.Tags { - str += k + "=" + v + "," - } - return str -} - -// NewSeries return a series with its tag map intitialized -func NewSeries(name string) *Series { - return &Series{ - Name: name, - Tags: make(map[string]string), - } -} diff --git a/pkg/common/series_test.go b/pkg/common/series_test.go deleted file mode 100644 index 31f6ed3..0000000 --- a/pkg/common/series_test.go +++ /dev/null @@ -1,27 +0,0 @@ -package common - -import ( - "testing" - - "github.com/stretchr/testify/assert" -) - -func TestSeries_String(t *testing.T) { - assert := assert.New(t) - - name := "cpu.idle" - s := NewSeries(name) - s.AddTag("os", "ubuntu") - s.AddTag("arch", "amd64") - assert.Contains(s.String(), "cpu.idle", "os=ubuntu", "arch=amd64") -} - -func TestSeries_SortedKeys(t *testing.T) { - assert := assert.New(t) - - s := NewSeries("dummy") - s.AddTag("a", "123") - s.AddTag("b", "789") - s.AddTag("1a", "789") - assert.Equal([]string{"1a", "a", "b"}, s.SortedKeys()) -} diff --git a/pkg/config/README.md b/pkg/config/README.md deleted file mode 100644 index a47c7c4..0000000 --- a/pkg/config/README.md +++ /dev/null @@ -1,4 +0,0 @@ -# Config - -This directory contains struct that represent configuration, in order to abstract config from detail config file (whose format and structure -may chage a lot) \ No newline at end of file diff --git a/pkg/config/global.go b/pkg/config/global.go deleted file mode 100644 index 40f6ef2..0000000 --- a/pkg/config/global.go +++ /dev/null @@ -1,8 +0,0 @@ -package config - -// ConfigFile is the location of the loaded config file -var ConfigFile string -var DefaultConfigFile = "xephon-b.yml" - -// Debug indicate whether run in debug mode -var Debug bool diff --git a/pkg/config/loader.go b/pkg/config/loader.go deleted file mode 100644 index eda27cb..0000000 --- a/pkg/config/loader.go +++ /dev/null @@ -1,7 +0,0 @@ -package config - -import "io" - -type LoaderConfig struct { - Source io.Reader -} diff --git a/pkg/config/machine_simulator.go b/pkg/config/machine_simulator.go deleted file mode 100644 index a1fc041..0000000 --- a/pkg/config/machine_simulator.go +++ /dev/null @@ -1,26 +0,0 @@ -package config - -import ( - "time" - - "github.com/spf13/viper" -) - -// MachineSimulatorConfig defines the configurable property for the machine simulator -type MachineSimulatorConfig struct { - Num int - Start time.Time - End time.Time - Step time.Duration -} - -// ReadMachineSimulatorConfigFromViper return a config struct using configuration in yml -func ReadMachineSimulatorConfigFromViper() *MachineSimulatorConfig { - c := &MachineSimulatorConfig{} - c.Num = viper.GetInt("simulator.machine.num") - c.Start = viper.GetTime("simulator.machine.start") - c.End = viper.GetTime("simulator.machine.end") - // TODO: may remove the outer time.Duration? - c.Step = time.Duration(time.Duration(viper.GetInt("simulator.machine.step")) * time.Second) - return c -} diff --git a/pkg/config/machine_simulator_test.go b/pkg/config/machine_simulator_test.go deleted file mode 100644 index e292f37..0000000 --- a/pkg/config/machine_simulator_test.go +++ /dev/null @@ -1,12 +0,0 @@ -package config - -import ( - "github.com/xephonhq/xephon-b/pkg/util" - "testing" -) - -func TestReadMachineSimulatorConfigFromViper(t *testing.T) { - util.ViperReadTestConfig() - c := ReadMachineSimulatorConfigFromViper() - t.Log(c) -} diff --git a/pkg/config/simulator.go b/pkg/config/simulator.go deleted file mode 100644 index e4a050c..0000000 --- a/pkg/config/simulator.go +++ /dev/null @@ -1,12 +0,0 @@ -package config - -var SupportedType = []string{"machine"} -var SupportedEncoding = []string{"debug", "json"} -var SupportedOutput = []string{"stdout", "file"} - -type SimulatorConfig struct { - Type string - Encoding string - Output string - OutputLocation string -} diff --git a/pkg/generator/generator.go b/pkg/generator/generator.go deleted file mode 100644 index ba375af..0000000 --- a/pkg/generator/generator.go +++ /dev/null @@ -1,5 +0,0 @@ -package generator - -import "github.com/xephonhq/xephon-b/pkg/util" - -var log = util.Logger.NewEntryWithPkg("x.generator") diff --git a/pkg/generator/generator_test.go b/pkg/generator/generator_test.go deleted file mode 100644 index 12c7bd1..0000000 --- a/pkg/generator/generator_test.go +++ /dev/null @@ -1,25 +0,0 @@ -package generator - -import ( - gt "github.com/xephonhq/xephon-b/pkg/generator/time" - gv "github.com/xephonhq/xephon-b/pkg/generator/value" - "testing" - "time" -) - -func TestNewGenerator(t *testing.T) { - t.Parallel() - v := gv.NewConstantIntGenerator(1) - t.Log(v.NextInt()) - start := time.Now() - end := time.Now().Add(time.Minute) - step := time.Duration(10 * time.Second) - tg := gt.NewFixedIntervalTimeGenerator(start, end, step) - for { - ts, err := tg.NextTimestamp() - if err == gt.ErrEndOfTime { - break - } - t.Log(ts) - } -} diff --git a/pkg/generator/series.go b/pkg/generator/series.go deleted file mode 100644 index 6dd9156..0000000 --- a/pkg/generator/series.go +++ /dev/null @@ -1,11 +0,0 @@ -package generator - -import ( - "github.com/xephonhq/xephon-b/pkg/common" - "github.com/xephonhq/xephon-b/pkg/generator/value" -) - -type SeriesWithValueGenerator struct { - common.Series - ValueGenerator value.ValueGenerator -} diff --git a/pkg/generator/time/fixed_interval.go b/pkg/generator/time/fixed_interval.go deleted file mode 100644 index 7ef22c9..0000000 --- a/pkg/generator/time/fixed_interval.go +++ /dev/null @@ -1,36 +0,0 @@ -package time - -import ( - "errors" - "time" -) - -var ErrEndOfTime = errors.New("EOT") - -type FixedIntervalTimeGenerator struct { - //Start time.Time - //End time.Time - //Interval time.Duration - start int64 - end int64 - current int64 - interval int64 -} - -func NewFixedIntervalTimeGenerator(s time.Time, e time.Time, i time.Duration) *FixedIntervalTimeGenerator { - return &FixedIntervalTimeGenerator{ - start: s.UnixNano(), - end: e.UnixNano(), - current: s.UnixNano(), - interval: i.Nanoseconds(), - } -} - -func (g *FixedIntervalTimeGenerator) NextTimestamp() (int64, error) { - t := g.current - g.current += g.interval - if g.current >= g.end { - return t, ErrEndOfTime - } - return t, nil -} diff --git a/pkg/generator/value/constant.go b/pkg/generator/value/constant.go deleted file mode 100644 index 9670063..0000000 --- a/pkg/generator/value/constant.go +++ /dev/null @@ -1,20 +0,0 @@ -package value - -type ConstantValueGenerator struct { - vInt int - vDouble float64 -} - -func NewConstantIntGenerator(v int) *ConstantValueGenerator { - return &ConstantValueGenerator{ - vInt: v, - } -} - -func (g *ConstantValueGenerator) NextInt() int { - return g.vInt -} - -func (g *ConstantValueGenerator) NextDouble() float64 { - return g.vDouble -} diff --git a/pkg/generator/value/generator.go b/pkg/generator/value/generator.go deleted file mode 100644 index 129677a..0000000 --- a/pkg/generator/value/generator.go +++ /dev/null @@ -1,6 +0,0 @@ -package value - -type ValueGenerator interface { - NextInt() int - NextDouble() float64 -} diff --git a/pkg/generator/value/generator_test.go b/pkg/generator/value/generator_test.go deleted file mode 100644 index 8643d81..0000000 --- a/pkg/generator/value/generator_test.go +++ /dev/null @@ -1,11 +0,0 @@ -package value - -import ( - "testing" -) - -// test implementation satisfies the interface -func TestGeneratorInterface(t *testing.T) { - t.Parallel() - var _ ValueGenerator = (*ConstantValueGenerator)(nil) -} diff --git a/pkg/loader/loader.go b/pkg/loader/loader.go deleted file mode 100644 index 56f57be..0000000 --- a/pkg/loader/loader.go +++ /dev/null @@ -1,43 +0,0 @@ -package loader - -import ( - "bufio" - "io" - - "github.com/xephonhq/xephon-b/pkg/config" - "github.com/xephonhq/xephon-b/pkg/serialize" - "github.com/xephonhq/xephon-b/pkg/util" -) - -// Short name use in loader package -var log = util.Logger.NewEntryWithPkg("x.loader") - -type Loader struct { - config config.LoaderConfig - source io.Reader // use bufio.Scanner to read by line - serializer serialize.Serializer -} - -func NewLoader(c config.LoaderConfig) *Loader { - l := &Loader{} - l.source = c.Source - l.config = c - l.serializer = &serialize.JsonSerializer{} - return l -} - -func (l *Loader) Start() { - scanner := bufio.NewScanner(l.source) - for scanner.Scan() { - sp, err := l.serializer.ReadInt(scanner.Bytes()) - if err != nil { - log.Warn(err) - } - // TODO: this might be too much ouput for debug - log.Debug(sp) - // TODO: use channel to give this to client - } - if err := scanner.Err(); err != nil { - log.Warn(err.Error()) - } -} diff --git a/pkg/monitor/README.md b/pkg/monitor/README.md deleted file mode 100644 index 6c60310..0000000 --- a/pkg/monitor/README.md +++ /dev/null @@ -1,3 +0,0 @@ -# Monitor -- container -- host diff --git a/pkg/serialize/debug.go b/pkg/serialize/debug.go deleted file mode 100644 index 88415ee..0000000 --- a/pkg/serialize/debug.go +++ /dev/null @@ -1,34 +0,0 @@ -package serialize - -import ( - "errors" - "fmt" - - "github.com/xephonhq/xephon-b/pkg/common" -) - -//DebugSerializer change point with its series to human readable string -type DebugSerializer struct { -} - -func (d *DebugSerializer) WriteInt(p *common.IntPointWithSeries) ([]byte, error) { - // TODO: should use bytes.Buffer and maybe pool - s := fmt.Sprintf("%s %d %d", p.Series, p.V, p.TimeNano) - return []byte(s), nil -} - -func (d *DebugSerializer) WriteDouble(p *common.DoublePointWithSeries) ([]byte, error) { - // TODO: should use bytes.Buffer and maybe pool - s := fmt.Sprintf("%s %0.2f %d", p.Series, p.V, p.TimeNano) - return []byte(s), nil -} - -func (d *DebugSerializer) ReadInt(s []byte) (*common.IntPointWithSeries, error) { - p := common.IntPointWithSeries{} - return &p, errors.New("not supported") -} - -func (d *DebugSerializer) ReadDouble(s []byte) (*common.DoublePointWithSeries, error) { - p := common.DoublePointWithSeries{} - return &p, errors.New("not supported") -} diff --git a/pkg/serialize/json.go b/pkg/serialize/json.go deleted file mode 100644 index 13a6b25..0000000 --- a/pkg/serialize/json.go +++ /dev/null @@ -1,40 +0,0 @@ -package serialize - -import ( - "encoding/json" - - "github.com/xephonhq/xephon-b/pkg/common" -) - -type JsonSerializer struct { -} - -func (j *JsonSerializer) WriteInt(p *common.IntPointWithSeries) ([]byte, error) { - s, err := json.Marshal(p) - // TODO: don't know if this append is efficient - return append(s, '\n'), err -} - -func (j *JsonSerializer) WriteDouble(p *common.DoublePointWithSeries) ([]byte, error) { - s, err := json.Marshal(p) - // TODO: don't know if this append is efficient - return append(s, '\n'), err -} - -func (j *JsonSerializer) ReadInt(s []byte) (*common.IntPointWithSeries, error) { - p := common.IntPointWithSeries{} - err := json.Unmarshal(s, &p) - if err != nil { - log.Warn(err) - } - return &p, err -} - -func (j *JsonSerializer) ReadDouble(s []byte) (*common.DoublePointWithSeries, error) { - p := common.DoublePointWithSeries{} - err := json.Unmarshal(s, &p) - if err != nil { - log.Warn(err) - } - return &p, err -} diff --git a/pkg/serialize/serializer.go b/pkg/serialize/serializer.go deleted file mode 100644 index 01b546e..0000000 --- a/pkg/serialize/serializer.go +++ /dev/null @@ -1,17 +0,0 @@ -package serialize - -import ( - "github.com/xephonhq/xephon-b/pkg/common" - "github.com/xephonhq/xephon-b/pkg/util" -) - -// Short name use in machine simulator package -var log = util.Logger.NewEntryWithPkg("x.serialize") - -// Serializer transform point with series into underlying format -type Serializer interface { - WriteInt(*common.IntPointWithSeries) ([]byte, error) - WriteDouble(*common.DoublePointWithSeries) ([]byte, error) - ReadInt(s []byte) (*common.IntPointWithSeries, error) - ReadDouble(s []byte) (*common.DoublePointWithSeries, error) -} diff --git a/pkg/serialize/serializer_test.go b/pkg/serialize/serializer_test.go deleted file mode 100644 index 0882282..0000000 --- a/pkg/serialize/serializer_test.go +++ /dev/null @@ -1,84 +0,0 @@ -package serialize - -import ( - "testing" - - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/suite" - - "github.com/xephonhq/xephon-b/pkg/common" - - "time" -) - -// test implementation satisfies the interface -func TestSerializerInterface(t *testing.T) { - t.Parallel() - var _ Serializer = (*DebugSerializer)(nil) - var _ Serializer = (*JsonSerializer)(nil) -} - -type SerializeTestSuite struct { - suite.Suite - iP *common.IntPointWithSeries - dP *common.DoublePointWithSeries - ts int64 -} - -func TestSerializeTestSuite(t *testing.T) { - suite.Run(t, new(SerializeTestSuite)) -} - -func (suite *SerializeTestSuite) SetupTest() { - name := "cpu.idle" - s := common.NewSeries(name) - s.AddTag("os", "ubuntu") - s.AddTag("arch", "amd64") - ts := time.Now().UnixNano() - suite.ts = ts - suite.iP = &common.IntPointWithSeries{Series: s} - suite.iP.V = 123 - suite.iP.TimeNano = ts - suite.dP = &common.DoublePointWithSeries{Series: s} - suite.dP.V = 12.03 - suite.dP.TimeNano = ts -} - -func (suite *SerializeTestSuite) TestDebugSerializer() { - assert := assert.New(suite.T()) - ds := DebugSerializer{} - - w, _ := ds.WriteInt(suite.iP) - s := string(w) - assert.Contains(s, "cpu.idle", "123", "os=ubuntu", "arch=amd64") - - w, _ = ds.WriteDouble(suite.dP) - s = string(w) - assert.Contains(s, "cpu.idle", "12.03", "os=ubuntu", "arch=amd64") -} - -func (suite *SerializeTestSuite) TestJsonSerializer() { - assert := assert.New(suite.T()) - js := JsonSerializer{} - w, err := js.WriteInt(suite.iP) - //o := fmt.Sprintf("{\"v\":123,\"t\":%d,\"name\":\"cpu.idle\",\"tag\":{\"arch\":\"amd64\",\"os\":\"ubuntu\"}}", suite.ts ) - assert.Nil(err) - suite.T().Log(string(w)) - //assert.Equal(o, string(w)) - // NOTE: \n has no effect on json deserialization - dI, err := js.ReadInt(w) - assert.Nil(err) - suite.T().Log(dI) - suite.T().Log(dI.V) - suite.T().Log(dI.TimeNano) - - w, err = js.WriteDouble(suite.dP) - //o = fmt.Sprintf("{\"v\":12.03,\"t\":%d,\"name\":\"cpu.idle\",\"tag\":{\"arch\":\"amd64\",\"os\":\"ubuntu\"}}", suite.ts) - assert.Nil(err) - suite.T().Log(string(w)) - //assert.Equal(o, string(w)) - dd, err := js.ReadDouble(w) - assert.Nil(err) - suite.T().Log(dd) - suite.T().Log(dd.V) -} diff --git a/pkg/simulator/machine/machine.go b/pkg/simulator/machine/machine.go deleted file mode 100644 index 68abda9..0000000 --- a/pkg/simulator/machine/machine.go +++ /dev/null @@ -1,43 +0,0 @@ -package machine - -import ( - "fmt" - "sync/atomic" - - "github.com/xephonhq/xephon-b/pkg/common" - "github.com/xephonhq/xephon-b/pkg/generator" - "github.com/xephonhq/xephon-b/pkg/generator/value" -) - -type Machine struct { - Name string - OS string - RAM int - CPU int - DISK int - intSeries []*generator.SeriesWithValueGenerator - doubleSeries []*generator.SeriesWithValueGenerator -} - -func GenerateDefaultMachine() Machine { - num := atomic.AddInt64(&defaultMachineNumber, 1) - m := Machine{ - Name: fmt.Sprintf("default-%d", num), - OS: "ubuntu16.04", - CPU: 2, - RAM: 2048, - DISK: 1024000, - } - baseSeries := common.NewSeries("machine") - for i := 0; i < 4; i++ { - s := generator.SeriesWithValueGenerator{ - Series: *baseSeries, - ValueGenerator: value.NewConstantIntGenerator(1), - } - s.AddTag("host", m.Name) - s.AddTag("os", m.OS) - s.AddTag("cpu_core", fmt.Sprintf("%d", i)) - m.intSeries = append(m.intSeries, &s) - } - return m -} diff --git a/pkg/simulator/machine/simulator.go b/pkg/simulator/machine/simulator.go deleted file mode 100644 index 3c2e135..0000000 --- a/pkg/simulator/machine/simulator.go +++ /dev/null @@ -1,108 +0,0 @@ -package machine - -import ( - "io" - "sync" - - "github.com/xephonhq/xephon-b/pkg/common" - "github.com/xephonhq/xephon-b/pkg/config" - gt "github.com/xephonhq/xephon-b/pkg/generator/time" - - "github.com/xephonhq/xephon-b/pkg/serialize" - "github.com/xephonhq/xephon-b/pkg/util" -) - -// Short name use in machine simulator package -var log = util.Logger.NewEntryWithPkg("x.s.machine") - -var defaultMachineNumber int64 = 0 - -type MachineSimulator struct { - config config.MachineSimulatorConfig - machines []*Machine - serializer serialize.Serializer - writer io.Writer -} - -func (ms *MachineSimulator) Type() string { - return "machine" -} - -func (ms *MachineSimulator) SetSerializer(s serialize.Serializer) { - ms.serializer = s -} - -func (ms *MachineSimulator) SetWriter(w io.Writer) { - ms.writer = w -} - -func (ms *MachineSimulator) Start() { - log.Debug("machine simulator start") - intPointChan := make(chan *common.IntPointWithSeries) - // TODO: passing byte array may not be efficient, but leave it to later ... - serializedIntPointChan := make(chan []byte) - // TODO: Add double point support - // doublePointChan := make(chan *common.DoublePointWithSeries) - var wg sync.WaitGroup - var mg sync.WaitGroup - wg.Add(1) - go func() { - mg.Add(len(ms.machines)) - for _, machine := range ms.machines { - go func(m *Machine) { - tg := gt.NewFixedIntervalTimeGenerator(ms.config.Start, ms.config.End, ms.config.Step) - for { - timestamp, err := tg.NextTimestamp() - if err != nil { - if err != gt.ErrEndOfTime { - log.Warn(err) - } - break - } - for _, series := range m.intSeries { - p := common.IntPoint{V: series.ValueGenerator.NextInt(), TimeNano: timestamp} - intPointChan <- &common.IntPointWithSeries{IntPoint: p, Series: &series.Series} - } - } - mg.Done() - }(machine) - } - mg.Wait() - close(intPointChan) - wg.Done() - }() - wg.Add(1) - go func() { - for p := range intPointChan { - sp, err := ms.serializer.WriteInt(p) - if err != nil { - continue - } - serializedIntPointChan <- sp - } - close(serializedIntPointChan) - wg.Done() - }() - wg.Add(1) - go func() { - for sp := range serializedIntPointChan { - ms.writer.Write(sp) - } - wg.Done() - }() - wg.Wait() - log.Debug("machine simulator end") -} - -func NewMachineSimulator(c config.MachineSimulatorConfig) *MachineSimulator { - ms := MachineSimulator{config: c} - for i := 0; i < c.Num; i++ { - m := GenerateDefaultMachine() - ms.AddMachine(&m) - } - return &ms -} - -func (ms *MachineSimulator) AddMachine(m *Machine) { - ms.machines = append(ms.machines, m) -} diff --git a/pkg/simulator/machine/simulator_test.go b/pkg/simulator/machine/simulator_test.go deleted file mode 100644 index 75a01f4..0000000 --- a/pkg/simulator/machine/simulator_test.go +++ /dev/null @@ -1,37 +0,0 @@ -package machine - -import ( - "os" - "testing" - - "time" - - "github.com/stretchr/testify/assert" - "github.com/xephonhq/xephon-b/pkg/config" - "github.com/xephonhq/xephon-b/pkg/serialize" -) - -func TestGenerateDefaultMachine(t *testing.T) { - assert := assert.New(t) - m := GenerateDefaultMachine() - assert.Equal("default-1", m.Name) - assert.Equal("ubuntu16.04", m.OS) - m = GenerateDefaultMachine() - assert.Equal("default-2", m.Name) -} - -func TestMachineSimulator(t *testing.T) { - t.Parallel() - c := config.MachineSimulatorConfig{ - Start: time.Now(), - End: time.Now().Add(time.Duration(time.Minute)), - Step: time.Duration(10 * time.Second), - Num: 3, - } - sm := NewMachineSimulator(c) - t.Log(sm) - sm.SetWriter(os.Stdout) - s := serialize.DebugSerializer{} - sm.SetSerializer(&s) - sm.Start() -} diff --git a/pkg/simulator/simulator.go b/pkg/simulator/simulator.go deleted file mode 100644 index 834883c..0000000 --- a/pkg/simulator/simulator.go +++ /dev/null @@ -1,15 +0,0 @@ -package simulator - -import ( - "io" - - "github.com/xephonhq/xephon-b/pkg/serialize" -) - -// Simulator contains multiple series and represent one type of source of time series data -type Simulator interface { - Type() string - SetSerializer(serialize.Serializer) - SetWriter(io.Writer) - Start() -} diff --git a/pkg/simulator/simulator_test.go b/pkg/simulator/simulator_test.go deleted file mode 100644 index 5dcda1b..0000000 --- a/pkg/simulator/simulator_test.go +++ /dev/null @@ -1,11 +0,0 @@ -package simulator - -import ( - "github.com/xephonhq/xephon-b/pkg/simulator/machine" - "testing" -) - -func TestSimulatorInterface(t *testing.T) { - t.Parallel() - var _ Simulator = (*machine.MachineSimulator)(nil) -} diff --git a/pkg/tsdb/README.md b/pkg/tsdb/README.md deleted file mode 100644 index f236821..0000000 --- a/pkg/tsdb/README.md +++ /dev/null @@ -1,34 +0,0 @@ -# TSDB - -This directory hold - -- unified data type, query interface -- the client code for various TSDBs -- the server API implementation for various TSDBs - -## Client - -- Client is actually a bunch of client connections with TSDB which share same transport, see initial discussion [here](https://github.com/xephonhq/xephon-b/pull/14) - - [ ] TODO: may have an abstraction over http client -- You can control trace (collect client metrics), concurrency, retry policy -- Payload is not thread safe, if you want to fine control over payload, use lock on it. - - [ ] TODO: add lock to payload struct -- Add points to client is thread safe, and how the client handle it is based on config, - i.e. it may batch it until there are enough points - - -## Payload - -- It acts like what is called `builder` in some TSDB Java clients, you can add points to it and they get bytes. -- It is NOT thread safe. -- It has two modes - -### Simple - -- Add points without any grouping, it will turn every point into bytes directly -- Just call `Bytes()`, once you call `Bytes()` you can NO longer add points to the payload, and there is no good reason for doing that. - -### Group by series - -- When adding a point with series, its meta will be sorted, if the same meta already exists, the point will be add to it. -Otherwise `SeriesWithIntPoints` is added. \ No newline at end of file diff --git a/pkg/tsdb/client.go b/pkg/tsdb/client.go deleted file mode 100644 index 0208e0f..0000000 --- a/pkg/tsdb/client.go +++ /dev/null @@ -1,6 +0,0 @@ -package tsdb - -type TSDBClient interface { - Ping() error - Put(p TSDBPayload) error -} diff --git a/pkg/tsdb/client_test.go b/pkg/tsdb/client_test.go deleted file mode 100644 index f739e20..0000000 --- a/pkg/tsdb/client_test.go +++ /dev/null @@ -1,8 +0,0 @@ -package tsdb - -// FIXME: this result in import cycle -// -//func TestTSDBClientInterface(t *testing.T) { -// t.Parallel() -// var _ TSDBClient = (*kairosdb.KairosDBHTTPClient)(nil) -//} diff --git a/pkg/tsdb/common/db_const.go b/pkg/tsdb/common/db_const.go deleted file mode 100644 index 62516a1..0000000 --- a/pkg/tsdb/common/db_const.go +++ /dev/null @@ -1,22 +0,0 @@ -package common - -// Database Names -const ( - KairosDB = "kairosdb" - OpenTSDB = "opentsdb" - InfluxDB = "influxdb" -) - -// Default HTTP API Ports -const ( - KairosDBPort = 8080 - OpenTSDBPort = 4242 - InfuxDBPort = 8086 -) - -// DefaultHTTPPorts is a map of default port numbers -var DefaultHTTPPorts = map[string]int{ - KairosDB: KairosDBPort, - OpenTSDB: OpenTSDBPort, - InfluxDB: InfuxDBPort, -} diff --git a/pkg/tsdb/config/client.go b/pkg/tsdb/config/client.go deleted file mode 100644 index b50b7ba..0000000 --- a/pkg/tsdb/config/client.go +++ /dev/null @@ -1,10 +0,0 @@ -package config - -// TSDBClientConfig control the concurrency of client -type TSDBClientConfig struct { - Host TSDBHostConfig - ConcurrentConnection int - QPSPerClient int - Timeout int - EnableTrace int // or trace level -} diff --git a/pkg/tsdb/config/host.go b/pkg/tsdb/config/host.go deleted file mode 100644 index 77b6183..0000000 --- a/pkg/tsdb/config/host.go +++ /dev/null @@ -1,35 +0,0 @@ -package config - -import ( - "fmt" - - "github.com/pkg/errors" - "github.com/xephonhq/xephon-b/pkg/tsdb/common" -) - -type TSDBHostConfig struct { - Address string - Port int - SSL bool -} - -func (c TSDBHostConfig) HostURL() string { - // TODO: should trim the extra http(s) if user pass it as address - if c.SSL { - return fmt.Sprintf("https://%s:%d", c.Address, c.Port) - } - return fmt.Sprintf("http://%s:%d", c.Address, c.Port) -} - -func NewDefaultHost(tsdb string) (TSDBHostConfig, error) { - c := TSDBHostConfig{ - Address: "localhost", - SSL: false, - } - port, ok := common.DefaultHTTPPorts[tsdb] - if !ok { - return c, errors.New(fmt.Sprintf("%s doest not have default http port hardcoded", tsdb)) - } - c.Port = port - return c, nil -} diff --git a/pkg/tsdb/influxdb/client.go b/pkg/tsdb/influxdb/client.go deleted file mode 100644 index 8e6c940..0000000 --- a/pkg/tsdb/influxdb/client.go +++ /dev/null @@ -1,39 +0,0 @@ -package influxdb - -import ( - "net/http" - - "github.com/dyweb/gommon/requests" - "github.com/pkg/errors" - "github.com/xephonhq/xephon-b/pkg/tsdb" - "github.com/xephonhq/xephon-b/pkg/tsdb/config" - "github.com/xephonhq/xephon-b/pkg/util" -) - -const influxDBVersionHeader = "X-Influxdb-Version" - -type InfluxDBClient struct { - Config config.TSDBClientConfig -} - -// Short name use in InfluxDB client package -var log = util.Logger.NewEntryWithPkg("x.tsdb.influxdb") - -// Ping use InfluxDB /ping API to check if InfluxDB is alive -func (client *InfluxDBClient) Ping() error { - // https://docs.influxdata.com/influxdb/v1.1/tools/api/ - pingURL := client.Config.Host.HostURL() + "/ping" - res, err := requests.Get(pingURL) - if err != nil { - return errors.Wrapf(err, "can't reach InfluxDB via %s", pingURL) - } - if res.Res.StatusCode != http.StatusNoContent { - return errors.Wrapf(err, "wrong status code returned %d, body is %s", res.Res.StatusCode, res.Text) - } - log.Info("InfluxDB version is " + res.Res.Header.Get(influxDBVersionHeader)) - return nil -} - -func (client *InfluxDBClient) Put(p tsdb.TSDBPayload) error { - return nil -} diff --git a/pkg/tsdb/integration_test/kairosdb_test.go b/pkg/tsdb/integration_test/kairosdb_test.go deleted file mode 100644 index 57d16bb..0000000 --- a/pkg/tsdb/integration_test/kairosdb_test.go +++ /dev/null @@ -1,20 +0,0 @@ -// +build !race - -package integration_test - -import ( - "testing" - - "github.com/stretchr/testify/suite" - tutil "github.com/xephonhq/xephon-b/pkg/util/test" -) - -type KairosDBTestSuite struct { - suite.Suite -} - -func TestKairosDBTestSuite(t *testing.T) { - if tutil.KairosDB() { - suite.Run(t, new(KairosDBTestSuite)) - } -} diff --git a/pkg/tsdb/kairosdb/README.md b/pkg/tsdb/kairosdb/README.md deleted file mode 100644 index c6420a7..0000000 --- a/pkg/tsdb/kairosdb/README.md +++ /dev/null @@ -1,15 +0,0 @@ -# KairosDB - -Time series database built on top of Cassandra - -## API - -HTTP(s) - -https://kairosdb.github.io/docs/build/html/restapi/AddDataPoints.html - -NOTE: though it allow the one point syntax, we choose to use the multiple points syntax for all series - -Telnet - -PS: I really want to know if telent is more efficient than http \ No newline at end of file diff --git a/pkg/tsdb/kairosdb/client.go b/pkg/tsdb/kairosdb/client.go deleted file mode 100644 index 5c3dbf9..0000000 --- a/pkg/tsdb/kairosdb/client.go +++ /dev/null @@ -1,110 +0,0 @@ -package kairosdb - -import ( - "io" - "net/http" - "sync" - - "github.com/xephonhq/xephon-b/pkg/tsdb" - "github.com/xephonhq/xephon-b/pkg/tsdb/config" - - "io/ioutil" - - "github.com/dyweb/gommon/requests" - "github.com/pkg/errors" - "github.com/xephonhq/xephon-b/pkg/util" -) - -// Short name use in KairosdDB client package -var log = util.Logger.NewEntryWithPkg("x.tsdb.kairosdb") - -type KairosDBHTTPClient struct { - Config config.TSDBClientConfig - transport *http.Transport - httpClients []*http.Client - requestChan chan *http.Request // TODO: maybe a buffered channel - putURL string - initializeWg sync.WaitGroup - shutdownWg sync.WaitGroup -} - -type KairosDBTelnetClient struct { -} - -// Ping use KairosDB version API to check if it alive -// Ping does not require Initialize to be called -// FIXME: this call also works for OpenTSDB -func (client *KairosDBHTTPClient) Ping() error { - versionURL := client.Config.Host.HostURL() + "/api/v1/version" - res, err := requests.GetJSON(versionURL) - if err != nil { - return errors.Wrapf(err, "can't reach KairosDB via %s", versionURL) - } - log.Info("KairosDB version is " + res["version"]) - return nil -} - -// Initialize creates a bunch of http clients and waits for every goroutine to start -func (client *KairosDBHTTPClient) Initialize() error { - if client.Config.ConcurrentConnection < 1 { - log.Panic("concurrent connection must be larger thant 1") - } - - concurrency := client.Config.ConcurrentConnection - client.transport = &http.Transport{ - MaxIdleConnsPerHost: concurrency, - } - // create clients based on concurrent connection - // all clients share one transport - for i := 0; i < concurrency; i++ { - // TODO: should allocate a fixed size array and assign - client.httpClients = append(client.httpClients, - &http.Client{Transport: client.transport}) - } - client.requestChan = make(chan *http.Request) - client.putURL = client.Config.Host.HostURL() + "/api/v1/datapoints" - client.initializeWg.Add(concurrency) - client.shutdownWg.Add(concurrency) - for i := 0; i < concurrency; i++ { - // TODO: a separate function for this - go func(i int) { - log.Debugf("http client %d routine started", i) - // log.Infof("http client %d routine started", i) - httpClient := client.httpClients[i] - client.initializeWg.Done() - for req := range client.requestChan { - res, err := httpClient.Do(req) - if err != nil { - log.Warn(err) - } else { - io.Copy(ioutil.Discard, res.Body) - // TODO: I wrote a 'FIXME: now the request is canceled' comment in mini-impl/ab code - res.Body.Close() - } - } - log.Debugf("http client %d routine stopped", i) - // log.Infof("http client %d routine stopped", i) - client.shutdownWg.Done() - }(i) - } - client.initializeWg.Wait() - return nil -} - -// Shutdown close the request channel and waits for all the goroutine to return -func (client *KairosDBHTTPClient) Shutdown() { - close(client.requestChan) - client.shutdownWg.Wait() -} - -// Put sends payload using one of the many http clients -func (client *KairosDBHTTPClient) Put(p tsdb.TSDBPayload) error { - // cast it to its own payload - payload, ok := p.(*KairosDBPayload) - if !ok { - log.Panic("must pass KairosDBPayload to KairosDBClient") - } - - payload.Bytes() - return nil -} diff --git a/pkg/tsdb/kairosdb/kairosdb_test.go b/pkg/tsdb/kairosdb/kairosdb_test.go deleted file mode 100644 index b9bfd1d..0000000 --- a/pkg/tsdb/kairosdb/kairosdb_test.go +++ /dev/null @@ -1,64 +0,0 @@ -package kairosdb - -import ( - "testing" - "time" - - "github.com/xephonhq/xephon-b/pkg/common" - "github.com/xephonhq/xephon-b/pkg/tsdb" - "github.com/xephonhq/xephon-b/pkg/tsdb/config" -) - -func TestTSDBClientInterface(t *testing.T) { - var _ tsdb.TSDBClient = (*KairosDBHTTPClient)(nil) -} - -func TestTSDBPayloadInterface(t *testing.T) { - t.Parallel() - var _ tsdb.TSDBPayload = (*KairosDBPayload)(nil) -} - -func TestKairosDBPayload_AddIntPoint(t *testing.T) { - s := common.NewSeries("cpu") - s.AddTag("os", "ubuntu") - s.AddTag("type", "idle") - p := common.IntPoint{TimeNano: time.Now().UnixNano(), V: 1} - sp := common.IntPointWithSeries{Series: s, IntPoint: p} - payload := NewKairosDBPayload() - payload.AddIntPoint(&sp) - payload.AddIntPoint(&sp) - b, _ := payload.Bytes() - t.Log(string(b)) -} - -func TestKairosDBHTTPClient_Initialize_Panic(t *testing.T) { - defer func() { - if r := recover(); r == nil { - t.Log("should panic when concurrent is not set") - t.Fail() - } - }() - c := config.TSDBClientConfig{ - Host: config.TSDBHostConfig{ - Address: "localhost", - Port: 8080, - SSL: false, - }, - } - client := KairosDBHTTPClient{Config: c} - client.Initialize() -} - -func TestKairosDBHTTPClient_Initialize(t *testing.T) { - c := config.TSDBClientConfig{ - Host: config.TSDBHostConfig{ - Address: "localhost", - Port: 8080, - SSL: false, - }, - ConcurrentConnection: 100, - } - client := KairosDBHTTPClient{Config: c} - client.Initialize() - client.Shutdown() -} diff --git a/pkg/tsdb/kairosdb/payload.go b/pkg/tsdb/kairosdb/payload.go deleted file mode 100644 index 6bdacea..0000000 --- a/pkg/tsdb/kairosdb/payload.go +++ /dev/null @@ -1,65 +0,0 @@ -package kairosdb - -import ( - "bytes" - "encoding/json" - "fmt" - - "github.com/xephonhq/xephon-b/pkg/common" -) - -// KairosDBPayload is NOT thread safe -type KairosDBPayload struct { - firstPoint bool - end bool - buffer *bytes.Buffer - bufferedSeries []*common.Series - bufferedIntPoints []*common.IntPoint - bufferedDoublePoints []*common.DoublePoint -} - -func NewKairosDBPayload() *KairosDBPayload { - p := KairosDBPayload{} - p.buffer = bytes.NewBufferString("[") - p.firstPoint = true - p.end = false - return &p -} - -// AddIntPoint turns point into bytes without any grouping -func (p *KairosDBPayload) AddIntPoint(sp *common.IntPointWithSeries) { - if !p.firstPoint { - p.buffer.WriteString(",{") - } else { - p.buffer.WriteString("{") - p.firstPoint = false - } - p.buffer.WriteString(fmt.Sprintf("\"name\":\"%s\",", sp.Name)) - p.buffer.WriteString(fmt.Sprintf("\"datapoints\":[[%d, %d]],\"tags\":", sp.TimeNano, sp.V)) - t, _ := json.Marshal(sp.Tags) - p.buffer.Write(t) - p.buffer.WriteString("}") -} - -func (p *KairosDBPayload) AddPointToBuffer() { - // this store the struct and merge into one series when get the string ([]byte) , actually it's a group by -} - -func (p *KairosDBPayload) groupBySeries() { - -} - -func (p *KairosDBPayload) DataSize() int { - // the real data size, - // TODO: count series data several times? - // TODO: the payload size, they are all different - return 0 -} - -func (p *KairosDBPayload) Bytes() ([]byte, error) { - if !p.end { - p.buffer.WriteString("]") - p.end = true - } - return p.buffer.Bytes(), nil -} diff --git a/pkg/tsdb/kairosdb/payload_test.go b/pkg/tsdb/kairosdb/payload_test.go deleted file mode 100644 index 3e9a61d..0000000 --- a/pkg/tsdb/kairosdb/payload_test.go +++ /dev/null @@ -1 +0,0 @@ -package kairosdb diff --git a/pkg/tsdb/opentsdb/client.go b/pkg/tsdb/opentsdb/client.go deleted file mode 100644 index a653072..0000000 --- a/pkg/tsdb/opentsdb/client.go +++ /dev/null @@ -1,33 +0,0 @@ -package opentsdb - -import ( - "github.com/dyweb/gommon/requests" - "github.com/pkg/errors" - "github.com/xephonhq/xephon-b/pkg/tsdb" - "github.com/xephonhq/xephon-b/pkg/tsdb/config" - "github.com/xephonhq/xephon-b/pkg/util" -) - -// Short name use in OpenTSDB client package -var log = util.Logger.NewEntryWithPkg("x.tsdb.opentsdb") - -type OpenTSDBHTTPClient struct { - Config config.TSDBClientConfig -} - -type OpenTSDBTelnetClient struct { -} - -func (client *OpenTSDBHTTPClient) Ping() error { - versionURL := client.Config.Host.HostURL() + "/api/version" - res, err := requests.GetJSON(versionURL) - if err != nil { - return errors.Wrapf(err, "can't reach OpenTSDB via %s", versionURL) - } - log.Info("OpenTSDB version is " + res["version"]) - return nil -} - -func (client *OpenTSDBHTTPClient) Put(p tsdb.TSDBPayload) error { - return nil -} diff --git a/pkg/tsdb/payload.go b/pkg/tsdb/payload.go deleted file mode 100644 index 583170c..0000000 --- a/pkg/tsdb/payload.go +++ /dev/null @@ -1,6 +0,0 @@ -package tsdb - -type TSDBPayload interface { - // TODO: would it be more efficient to pass *[]byte, idk - Bytes() ([]byte, error) -} diff --git a/pkg/tsdb/payload_test.go b/pkg/tsdb/payload_test.go deleted file mode 100644 index a124ce1..0000000 --- a/pkg/tsdb/payload_test.go +++ /dev/null @@ -1,7 +0,0 @@ -package tsdb - -// FIXME: import cycle -//func TestTSDBPayloadInterface(t *testing.T){ -// t.Parallel() -// var _ TSDBPayload = (*kairosdb.KairosDBPayload)(nil) -//} diff --git a/pkg/util/README.md b/pkg/util/README.md deleted file mode 100644 index 3c211b7..0000000 --- a/pkg/util/README.md +++ /dev/null @@ -1,3 +0,0 @@ -# Util - -This directory hold util functions and small patches (copy and pasted code) from third party libraries. \ No newline at end of file diff --git a/pkg/util/log.go b/pkg/util/log.go deleted file mode 100644 index b4802ee..0000000 --- a/pkg/util/log.go +++ /dev/null @@ -1,24 +0,0 @@ -package util - -import dlog "github.com/dyweb/gommon/log" - -// Log util - -// Logger is the default logger with info level -var Logger = dlog.NewLogger() - -// Short name use in util package -var log = Logger.NewEntryWithPkg("x.util") - -func init() { - f := dlog.NewTextFormatter() - f.EnableColor = true - Logger.Formatter = f - Logger.Level = dlog.InfoLevel -} - -// UseVerboseLog set logger level to debug -func UseVerboseLog() { - Logger.Level = dlog.DebugLevel - log.Debug("enable debug logging") -} diff --git a/pkg/util/test.go b/pkg/util/test.go deleted file mode 100644 index d72dba7..0000000 --- a/pkg/util/test.go +++ /dev/null @@ -1,34 +0,0 @@ -package util - -// Test util - -import ( - "os" - "path" - "runtime" - - "github.com/spf13/viper" -) - -// ViperReadTestConfig -func ViperReadTestConfig() { - // Get current file path https://coderwall.com/p/_fmbug/go-get-path-to-current-file - _, filename, _, ok := runtime.Caller(1) - if !ok { - log.Fatal("can't get current file path") - } - - filePath := path.Join(path.Dir(filename), "../../xephon-b.yml") - log.Debug(filePath) - viper.SetConfigFile(filePath) - viper.ReadInConfig() -} - -func TestKairosDB() bool { - // TODO: maybe using testing.Short() - if os.Getenv("TEST_KAIROSDB") == "1" { - // TODO: maybe need to ping to make sure the db is running and then cache the result - return true - } - return false -} diff --git a/pkg/util/test/db.go b/pkg/util/test/db.go deleted file mode 100644 index 0851035..0000000 --- a/pkg/util/test/db.go +++ /dev/null @@ -1,51 +0,0 @@ -package test - -import ( - "os" - - st "github.com/dyweb/gommon/structure" - "github.com/xephonhq/xephon-b/pkg/tsdb/common" - "github.com/xephonhq/xephon-b/pkg/tsdb/config" - "github.com/xephonhq/xephon-b/pkg/tsdb/kairosdb" -) - -var pinged = st.NewSet() -var running = st.NewSet() - -// KairosDB determines if we should run test that requires KairosDB up and running -// - if the environment variable is set, we test -// TODO: this may mess up the running database, but sicne we use docker, the data can lost -// - if we can ping the database using provided config, we test -func KairosDB() bool { - // NOTE: you need to add `// +build !race` to your integration test - if running.Contains(common.KairosDB) { - return true - } - // we pinged and it is not running - if pinged.Contains(common.KairosDB) { - return false - } - // env var goes before ping, pinged is empty at first, so env var is triggered first - // TODO: get environment variable name from the `common` package instead hardcoded here - // TODO: disable DB test on Travis CI - if os.Getenv("TEST_KAIROSDB") == "1" { - running.Add(common.KairosDB) - return true - } - pinged.Add(common.KairosDB) - // TODO: allow get test config from config file instead of just using default - h, err := config.NewDefaultHost(common.KairosDB) - if err != nil { - log.Warn(err) - return false - } - c := config.TSDBClientConfig{Host: h} - client := &kairosdb.KairosDBHTTPClient{Config: c} - err = client.Ping() - if err != nil { - log.Warn(err) - return false - } - running.Add(common.KairosDB) - return true -} diff --git a/pkg/util/test/pkg.go b/pkg/util/test/pkg.go deleted file mode 100644 index d91ca97..0000000 --- a/pkg/util/test/pkg.go +++ /dev/null @@ -1,7 +0,0 @@ -package test - -import ( - "github.com/xephonhq/xephon-b/pkg/util" -) - -var log = util.Logger.NewEntryWithPkg("x.u.test") diff --git a/script/travis_install.sh b/script/travis_install.sh deleted file mode 100755 index 26704f7..0000000 --- a/script/travis_install.sh +++ /dev/null @@ -1,28 +0,0 @@ -#!/usr/bin/env bash - -# install glide - -# switch folder -# get the script path http://stackoverflow.com/questions/4774054/reliable-way-for-a-bash-script-to-get-the-full-path-to-itself -pushd `dirname $0` > /dev/null -SCRIPTPATH=`pwd -P` -popd > /dev/null -# get current working directory -ORIGINAL_WD=${PWD} -# switch to script directory -cd ${SCRIPTPATH} - -# download and extract -wget https://github.com/Masterminds/glide/releases/download/v0.12.3/glide-v0.12.3-linux-amd64.tar.gz -tar -zxvf glide-v0.12.3-linux-amd64.tar.gz -# add glide to path -export PATH=$PATH:${SCRIPTPATH}/linux-amd64 -# show it is working -glide -v - -# install dependencies -cd .. -glide install - -# go back to the old working directory -cd ${ORIGINAL_WD} \ No newline at end of file diff --git a/tsdb-proxy.yml b/tsdb-proxy.yml deleted file mode 100644 index 9d4c891..0000000 --- a/tsdb-proxy.yml +++ /dev/null @@ -1 +0,0 @@ -# example tsdb-proxy config file \ No newline at end of file diff --git a/xephon-b.yml b/xephon-b.yml deleted file mode 100644 index fcfbf4b..0000000 --- a/xephon-b.yml +++ /dev/null @@ -1,11 +0,0 @@ -# The example xephon-b configuration file -# TODO: might be split to several config files like ansible -simulator: - machine: - num: 3 - # start time in ISO 8601 - start: 1997-07-16T19:20:30.45+01:00 - # end time in ISO 8601 - end: 1997-07-16T19:21:30.45+01:00 - # in second - step: 10 \ No newline at end of file