Skip to content

Commit

Permalink
Merge pull request #5 from cbrake/feature-v2
Browse files Browse the repository at this point in the history
Feature v2
  • Loading branch information
cbrake authored Oct 25, 2018
2 parents ca69206 + 1678fdd commit 7aca52a
Show file tree
Hide file tree
Showing 10 changed files with 490 additions and 299 deletions.
27 changes: 15 additions & 12 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,14 @@ const (
db = "dbhelper"
)

var c *influxdbhelper.Client
var c influxdbhelper.Client

func Init() (err error) {
c, err = influxdbhelper.NewClient(influxUrl, "", "", "ns")
if err != nil {
return
}
// Create MM database if it doesn't already exist
// Create database if it doesn't already exist
q := client.NewQuery("CREATE DATABASE "+db, "", "")
res, err := c.InfluxClient().Query(q)
if err != nil {
Expand All @@ -44,6 +44,7 @@ func Init() (err error) {
}

type EnvSample struct {
InfluxMeasurement influxdbhelper.Measurement
Time time.Time `influx:"time"`
Location string `influx:"location,tag"`
Temperature float64 `influx:"temperature"`
Expand All @@ -56,6 +57,7 @@ func generateSampleData() []EnvSample {

for i, _ := range ret {
ret[i] = EnvSample{
InfluxMeasurement: "test"
Time: time.Now(),
Location: "Rm 243",
Temperature: 70 + float64(i),
Expand All @@ -75,8 +77,9 @@ func main() {

// write sample data to database
samples := generateSampleData()
c = c.UseDB(db)
for _, p := range samples {
err := c.WritePoint(db, "test", p)
err := c.WritePoint(p)
if err != nil {
log.Fatal("Error writing point: ", err)
}
Expand All @@ -86,7 +89,7 @@ func main() {
samplesRead := []EnvSample{}

q := `SELECT * FROM test ORDER BY time DESC LIMIT 10`
err = c.Query(db, q, &samplesRead)
err = c.UseDB(db).Query(q, &samplesRead)
if err != nil {
log.Fatal("Query error: ", err)
}
Expand Down Expand Up @@ -134,14 +137,14 @@ libraries that do similiar things, I would be very interested in learning about

Todo:

* [x] handle larger query datasets (multiple series, etc)
* [x] add write capability (directly write Go structs into influxdb)
* [x] add godoc documentation
* [ ] decode/encode val0, val1, val2 fields in influx to Go array
* [ ] use Go struct field tags to help build SELECT statement
* [ ] optimize query for performace (pre-allocate slices, etc)
* [ ] come up with a better name (indecode, etc)
* [ ] finish error checking
- [x] handle larger query datasets (multiple series, etc)
- [x] add write capability (directly write Go structs into influxdb)
- [x] add godoc documentation
- [ ] decode/encode val0, val1, val2 fields in influx to Go array
- [ ] use Go struct field tags to help build SELECT statement
- [ ] optimize query for performace (pre-allocate slices, etc)
- [ ] come up with a better name (indecode, etc)
- [ ] finish error checking

Review/Pull requests welcome!

Expand Down
172 changes: 145 additions & 27 deletions client.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
package influxdbhelper

import (
"fmt"
"regexp"
"strings"
"time"

client "github.com/influxdata/influxdb/client/v2"
influxClient "github.com/influxdata/influxdb/client/v2"
)

var reRemoveExtraSpace = regexp.MustCompile(`\s\s+`)
Expand All @@ -18,42 +19,128 @@ func CleanQuery(query string) string {
return ret
}

// A Client represents an influxdbhelper client connection to
// A Client represents an influxdbhelper influxClient connection to
// an InfluxDb server.
type Client struct {
type Client interface {
influxClient.Client

// UseDB sets the DB to use for Query, WritePoint, and WritePointTagsFields.
// This field must be set before WritePoint... calls.
UseDB(db string) Client

// UseMeasurement sets the measurement to use for WritePoint, and WritePointTagsFields.
// If this is not set, a struct field with named InfluxMeasurement is required
// in the write data. The data passed in this call has priority over data fields in
// writes.
UseMeasurement(measurement string) Client

// UseTimeField sets the time field to use for WritePoint, and WritePointTagsFields. This
// call is optional, and a data struct field with a `influx:"time"` tag can also be used.
UseTimeField(fieldName string) Client

// Query executes an InfluxDb query, and unpacks the result into the
// result data structure.
DecodeQuery(query string, result interface{}) error

// WritePoint is used to write arbitrary data into InfluxDb.
WritePoint(data interface{}) error

// WritePointTagsFields is used to write a point specifying tags and fields.
WritePointTagsFields(tags map[string]string, fields map[string]interface{}, t time.Time) error
}

type helperClient struct {
url string
client client.Client
client influxClient.Client
precision string
using *helperUsing
}

type usingValue struct {
value string
retain bool
}

type helperUsing struct {
db *usingValue
measurement *usingValue
timeField *usingValue
}

// NewClient returns a new influxdbhelper client given a url, user,
// NewClient returns a new influxdbhelper influxClient given a url, user,
// password, and precision strings.
//
// url is typically something like: http://localhost:8086
//
// precision can be ‘h’, ‘m’, ‘s’, ‘ms’, ‘u’, or ‘ns’ and is
// used during write operations.
func NewClient(url, user, passwd, precision string) (*Client, error) {
ret := Client{
func NewClient(url, user, passwd, precision string) (Client, error) {
ret := &helperClient{
url: url,
precision: precision,
}

client, err := client.NewHTTPClient(client.HTTPConfig{
client, err := influxClient.NewHTTPClient(influxClient.HTTPConfig{
Addr: url,
Username: user,
Password: passwd,
})

ret.client = client

return &ret, err
return ret, err
}

// Ping checks that status of cluster, and will always return 0 time and no
// error for UDP clients.
func (c *helperClient) Ping(timeout time.Duration) (time.Duration, string, error) {
return c.client.Ping(timeout)
}

// Write takes a BatchPoints object and writes all Points to InfluxDB.
func (c *helperClient) Write(bp influxClient.BatchPoints) error {
return c.client.Write(bp)
}

// Query makes an InfluxDB Query on the database. This will fail if using
// the UDP client.
func (c *helperClient) Query(q influxClient.Query) (*influxClient.Response, error) {
return c.client.Query(q)
}

// Close releases any resources a Client may be using.
func (c *helperClient) Close() error {
return c.client.Close()
}

// UseDB sets the DB to use for Query, WritePoint, and WritePointTagsFields
func (c *helperClient) UseDB(db string) Client {
if c.using == nil {
c.using = &helperUsing{}
}

c.using.db = &usingValue{db, true}
return c
}

// UseMeasurement sets the DB to use for Query, WritePoint, and WritePointTagsFields
func (c *helperClient) UseMeasurement(measurement string) Client {
if c.using == nil {
c.using = &helperUsing{}
}

c.using.measurement = &usingValue{measurement, true}
return c
}

// InfluxClient returns the influxdb/client/v2 client if low level
// queries or writes need to be executed.
func (c Client) InfluxClient() client.Client {
return c.client
// UseDB sets the DB to use for Query, WritePoint, and WritePointTagsFields
func (c *helperClient) UseTimeField(fieldName string) Client {
if c.using == nil {
c.using = &helperUsing{}
}

c.using.timeField = &usingValue{fieldName, true}
return c
}

// Query executes an InfluxDb query, and unpacks the result into the
Expand All @@ -67,16 +154,23 @@ func (c Client) InfluxClient() client.Client {
// and InfluxDb field/tag names typically start with a lower case letter.
// The struct field tag can be set to '-' which indicates this field
// should be ignored.
func (c Client) Query(db, cmd string, result interface{}) (err error) {
query := client.Query{
Command: cmd,
Database: db,
func (c *helperClient) DecodeQuery(q string, result interface{}) (err error) {
if c.using == nil || c.using.db == nil {
return fmt.Errorf("no db set for query")
}

query := influxClient.Query{
Command: q,
Database: c.using.db.value,
Chunked: false,
ChunkSize: 100,
}

var response *client.Response
var response *influxClient.Response
response, err = c.client.Query(query)
if !c.using.db.retain {
c.using.db = nil
}

if response.Error() != nil {
return response.Error()
Expand All @@ -92,8 +186,7 @@ func (c Client) Query(db, cmd string, result interface{}) (err error) {
}

series := results[0].Series[0]

err = decode(series.Columns, series.Values, result)
err = decode(series, result)

return
}
Expand All @@ -105,28 +198,53 @@ func (c Client) Query(db, cmd string, result interface{}) (err error) {
// struct field should be an InfluxDb tag (vs field). A tag of '-' indicates
// the struct field should be ignored. A struct field of Time is required and
// is used for the time of the sample.
func (c Client) WritePoint(db, measurement string, data interface{}) error {
t, tags, fields, err := encode(data)
func (c *helperClient) WritePoint(data interface{}) error {
if c.using == nil || c.using.db == nil {
return fmt.Errorf("no db set for query")
}

t, tags, fields, measurement, err := encode(data, c.using.timeField)

if c.using.measurement == nil {
c.using.measurement = &usingValue{measurement, false}
}

if err != nil {
return err
}

return c.WritePointTagsFields(db, measurement, tags, fields, t)
return c.WritePointTagsFields(tags, fields, t)
}

// WritePointTagsFields is used to write a point specifying tags and fields.
func (c Client) WritePointTagsFields(db, measurement string, tags map[string]string, fields map[string]interface{}, t time.Time) error {
bp, err := client.NewBatchPoints(client.BatchPointsConfig{
Database: db,
func (c *helperClient) WritePointTagsFields(tags map[string]string, fields map[string]interface{}, t time.Time) (err error) {
if c.using == nil || c.using.db == nil {
return fmt.Errorf("no db set for query")
}

if c.using.measurement == nil {
return fmt.Errorf("no measurement set for query")
}

bp, err := influxClient.NewBatchPoints(influxClient.BatchPointsConfig{
Database: c.using.db.value,
Precision: c.precision,
})

if err != nil {
return err
}

pt, err := client.NewPoint(measurement, tags, fields, t)
pt, err := influxClient.NewPoint(c.using.measurement.value, tags, fields, t)
if !c.using.db.retain {
c.using.db = nil
}
if !c.using.measurement.retain {
c.using.measurement = nil
}
if c.using.timeField != nil && !c.using.timeField.retain {
c.using.timeField = nil
}

if err != nil {
return err
Expand Down
10 changes: 5 additions & 5 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,18 +12,18 @@ func ExampleClient_WritePoint() {
Location string `influx:"location,tag"`
Temperature float64 `influx:"temperature"`
Humidity float64 `influx:"humidity"`
Id string `influx:"-"`
ID string `influx:"-"`
}

s := EnvSample{
Time: time.Now(),
Location: "Rm 243",
Temperature: 70.0,
Humidity: 60.0,
Id: "12432as32",
ID: "12432as32",
}

c.WritePoint("myDb", "test", s)
c.UseDB("myDb").UseMeasurement("test").WritePoint(s)
}

func ExampleClient_Query() {
Expand All @@ -34,14 +34,14 @@ func ExampleClient_Query() {
Location string `influx:"location,tag"`
Temperature float64 `influx:"temperature"`
Humidity float64 `influx:"humidity"`
Id string `influx:"-"`
ID string `influx:"-"`
}

samplesRead := []EnvSample{}

q := `SELECT * FROM test ORDER BY time DESC LIMIT 10`

c.Query("myDb", q, &samplesRead)
c.UseDB("myDb").DecodeQuery(q, &samplesRead)

// samplesRead is now populated with data from InfluxDb
}
Loading

0 comments on commit 7aca52a

Please sign in to comment.