diff --git a/client.go b/client.go new file mode 100644 index 0000000..3bae2be --- /dev/null +++ b/client.go @@ -0,0 +1,11 @@ +package ksqldb + +import "log" + +//NewClient creates new ksqldb client with log.Println default logging +func NewClient(url string) *Client { + return &Client{ + url: url, + logf: log.Printf, + } +} diff --git a/debug.go b/debug.go new file mode 100644 index 0000000..60cd615 --- /dev/null +++ b/debug.go @@ -0,0 +1,22 @@ +package ksqldb + +//log logging data +func (cl *Client) log(format string, v ...interface{}) { + if cl.isDebug { + cl.logf(format, v) + } +} + +//SetLogFunc sets custom logging function with func(format string, v ...interface{}) profile +func (cl *Client) SetLogFunc(fn func(format string, v ...interface{})) *Client { + if fn != nil { + cl.logf = fn + } + return cl +} + +//Debug sets debug mode for logging +func (cl *Client) Debug() *Client { + cl.isDebug = true + return cl +} diff --git a/errors.go b/errors.go new file mode 100644 index 0000000..69565a2 --- /dev/null +++ b/errors.go @@ -0,0 +1,7 @@ +package ksqldb + +import "errors" + +var ( + ErrNotFound = errors.New("No result found") +) diff --git a/execute.go b/execute.go index a4026a5..6370b3b 100644 --- a/execute.go +++ b/execute.go @@ -3,7 +3,6 @@ package ksqldb import ( "fmt" "io/ioutil" - "log" "net/http" "strings" ) @@ -19,7 +18,7 @@ import ( // // TODO Add support for commandSequenceNumber and streamsProperties // TODO Add better support for responses to CREATE/DROP/TERMINATE (e.g. commandID, commandStatus.status, etc) -func Execute(u string, q string) (err error) { +func (cl *Client) Execute(q string) (err error) { // Create the client client := &http.Client{} @@ -29,8 +28,8 @@ func Execute(u string, q string) (err error) { q = strings.ReplaceAll(q, "\n", "") // make the request payload := strings.NewReader("{\"ksql\":\"" + q + "\"}") - req, err := http.NewRequest("POST", u+"/ksql", payload) - log.Printf("Sending ksqlDB request:\n\t%v", q) + req, err := http.NewRequest("POST", cl.url+"/ksql", payload) + cl.log("Sending ksqlDB request:\n\t%v", q) if err != nil { return err } diff --git a/pull.go b/pull.go index e38a16d..763eb39 100644 --- a/pull.go +++ b/pull.go @@ -5,7 +5,6 @@ import ( "encoding/json" "fmt" "io/ioutil" - "log" "net" "net/http" "strings" @@ -31,7 +30,7 @@ import ( // // Do other stuff with the data here // } // } -func Pull(u string, q string) (h Header, r Payload, err error) { +func (cl *Client) Pull(q string) (h Header, r Payload, err error) { // Create the client, force it to use HTTP2 (to avoid `http2: unsupported scheme`) client := http.Client{ @@ -48,7 +47,7 @@ func Pull(u string, q string) (h Header, r Payload, err error) { // Create the request payload := strings.NewReader("{\"sql\":\"" + q + "\"}") - req, err := http.NewRequest("POST", u+"/query-stream", payload) + req, err := http.NewRequest("POST", cl.url+"/query-stream", payload) if err != nil { return h, r, err @@ -79,12 +78,12 @@ func Pull(u string, q string) (h Header, r Payload, err error) { switch len(x) { case 0: - return h, r, fmt.Errorf("No results (not even a header row) returned from lookup. Maybe we got an error:%v", err) + return h, r, fmt.Errorf("%w (not even a header row) returned from lookup. Maybe we got an error:%v", ErrNotFound, err) case 1: // len 1 means we just got a header, no rows // Should we define our own error types here so we can return more clearly // an indicator that no rows were found? - return h, r, fmt.Errorf("No result found") + return h, r, ErrNotFound default: for _, z := range x { switch zz := z.(type) { @@ -94,27 +93,27 @@ func Pull(u string, q string) (h Header, r Payload, err error) { if _, ok := zz["queryId"].(string); ok { h.queryId = zz["queryId"].(string) } else { - log.Println("(Query ID not found - this is expected for a pull query)") + cl.log("(Query ID not found - this is expected for a pull query)") } names, okn := zz["columnNames"].([]interface{}) types, okt := zz["columnTypes"].([]interface{}) if okn && okt { for col := range names { - if n, o := names[col].(string); n != "" && o == true { - if t, o := types[col].(string); t != "" && o == true { + if n, ok := names[col].(string); n != "" && ok { + if t, ok := types[col].(string); t != "" && ok { a := Column{Name: n, Type: t} h.columns = append(h.columns, a) } else { - log.Printf("Nil type found for column %v", col) + cl.log("Nil type found for column %v", col) } } else { - log.Printf("Nil name found for column %v", col) + cl.log("Nil name found for column %v", col) } } } else { - log.Printf("Column names/types not found in header:\n%v", zz) + cl.log("Column names/types not found in header:\n%v", zz) } case []interface{}: diff --git a/push.go b/push.go index 0b072e3..ac90aad 100644 --- a/push.go +++ b/push.go @@ -39,7 +39,7 @@ import ( // if row != nil { // DATA_TS = row[0].(float64) // ID = row[1].(string) -func Push(u string, q string, rc chan<- Row, hc chan<- Header, cc <-chan bool) (err error) { +func (cl *Client) Push(q string, rc chan<- Row, hc chan<- Header, cc <-chan bool) (err error) { // Create the client, force it to use HTTP2 (to avoid `http2: unsupported scheme`) client := http.Client{ @@ -55,8 +55,8 @@ func Push(u string, q string, rc chan<- Row, hc chan<- Header, cc <-chan bool) ( } // make the request payload := strings.NewReader("{\"properties\":{\"ksql.streams.auto.offset.reset\": \"latest\"},\"sql\":\"" + q + "\"}") - req, err := http.NewRequest("POST", u+"/query-stream", payload) - log.Printf("Sending ksqlDB request:\n\t%v", q) + req, err := http.NewRequest("POST", cl.url+"/query-stream", payload) + cl.log("Sending ksqlDB request:\n\t%v", q) if err != nil { return err } @@ -82,8 +82,8 @@ func Push(u string, q string, rc chan<- Row, hc chan<- Header, cc <-chan bool) ( defer func() { doThis = false }() // Try to close the query payload := strings.NewReader("{\"queryId\":\"" + h.queryId + "\"}") - req, err := http.NewRequest("POST", u+"/close-query", payload) - log.Printf("Closing ksqlDB query\t%v", h.queryId) + req, err := http.NewRequest("POST", cl.url+"/close-query", payload) + cl.log("Closing ksqlDB query\t%v", h.queryId) if err != nil { return fmt.Errorf("Failed to construct HTTP request to cancel query\n%v", err) } @@ -121,27 +121,27 @@ func Push(u string, q string, rc chan<- Row, hc chan<- Header, cc <-chan bool) ( if _, ok := zz["queryId"].(string); ok { h.queryId = zz["queryId"].(string) } else { - log.Println("Query ID not found - this is expected for a pull query") + cl.log("Query ID not found - this is expected for a pull query") } names, okn := zz["columnNames"].([]interface{}) types, okt := zz["columnTypes"].([]interface{}) if okn && okt { for col := range names { - if n, o := names[col].(string); n != "" && o == true { - if t, o := types[col].(string); t != "" && o == true { + if n, ok := names[col].(string); n != "" && ok { + if t, ok := types[col].(string); t != "" && ok { a := Column{Name: n, Type: t} h.columns = append(h.columns, a) } else { - log.Printf("Nil type found for column %v", col) + cl.log("Nil type found for column %v", col) } } else { - log.Printf("Nil name found for column %v", col) + cl.log("Nil name found for column %v", col) } } } else { - log.Printf("Column names/types not found in header:\n%v", zz) + cl.log("Column names/types not found in header:\n%v", zz) } // log.Println("Header:", h) hc <- h diff --git a/types.go b/types.go index f9a897e..6f4ffb6 100644 --- a/types.go +++ b/types.go @@ -13,3 +13,9 @@ type Column struct { Name string Type string } + +type Client struct { + url string + isDebug bool + logf func(format string, v ...interface{}) +}