Skip to content

Commit

Permalink
created ksqldb client
Browse files Browse the repository at this point in the history
  • Loading branch information
k.s.franchuk committed Oct 7, 2020
1 parent b9f70ea commit a6bada4
Show file tree
Hide file tree
Showing 7 changed files with 70 additions and 26 deletions.
11 changes: 11 additions & 0 deletions client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package ksqldb

import "log"

//NewClient creates new ksqldb client with log.Println default logging
func NewClient(url string) *Client {
return &Client{
url: url,
logf: log.Printf,
}
}
22 changes: 22 additions & 0 deletions debug.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package ksqldb

//log logging data
func (cl *Client) log(format string, v ...interface{}) {
if cl.isDebug {
cl.logf(format, v)
}
}

//SetLogFunc sets custom logging function with func(format string, v ...interface{}) profile
func (cl *Client) SetLogFunc(fn func(format string, v ...interface{})) *Client {
if fn != nil {
cl.logf = fn
}
return cl
}

//Debug sets debug mode for logging
func (cl *Client) Debug() *Client {
cl.isDebug = true
return cl
}
7 changes: 7 additions & 0 deletions errors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package ksqldb

import "errors"

var (
ErrNotFound = errors.New("No result found")
)
7 changes: 3 additions & 4 deletions execute.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package ksqldb
import (
"fmt"
"io/ioutil"
"log"
"net/http"
"strings"
)
Expand All @@ -19,7 +18,7 @@ import (
//
// TODO Add support for commandSequenceNumber and streamsProperties
// TODO Add better support for responses to CREATE/DROP/TERMINATE (e.g. commandID, commandStatus.status, etc)
func Execute(u string, q string) (err error) {
func (cl *Client) Execute(q string) (err error) {

// Create the client
client := &http.Client{}
Expand All @@ -29,8 +28,8 @@ func Execute(u string, q string) (err error) {
q = strings.ReplaceAll(q, "\n", "")
// make the request
payload := strings.NewReader("{\"ksql\":\"" + q + "\"}")
req, err := http.NewRequest("POST", u+"/ksql", payload)
log.Printf("Sending ksqlDB request:\n\t%v", q)
req, err := http.NewRequest("POST", cl.url+"/ksql", payload)
cl.log("Sending ksqlDB request:\n\t%v", q)
if err != nil {
return err
}
Expand Down
21 changes: 10 additions & 11 deletions pull.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"encoding/json"
"fmt"
"io/ioutil"
"log"
"net"
"net/http"
"strings"
Expand All @@ -31,7 +30,7 @@ import (
// // Do other stuff with the data here
// }
// }
func Pull(u string, q string) (h Header, r Payload, err error) {
func (cl *Client) Pull(q string) (h Header, r Payload, err error) {

// Create the client, force it to use HTTP2 (to avoid `http2: unsupported scheme`)
client := http.Client{
Expand All @@ -48,7 +47,7 @@ func Pull(u string, q string) (h Header, r Payload, err error) {

// Create the request
payload := strings.NewReader("{\"sql\":\"" + q + "\"}")
req, err := http.NewRequest("POST", u+"/query-stream", payload)
req, err := http.NewRequest("POST", cl.url+"/query-stream", payload)

if err != nil {
return h, r, err
Expand Down Expand Up @@ -79,12 +78,12 @@ func Pull(u string, q string) (h Header, r Payload, err error) {

switch len(x) {
case 0:
return h, r, fmt.Errorf("No results (not even a header row) returned from lookup. Maybe we got an error:%v", err)
return h, r, fmt.Errorf("%w (not even a header row) returned from lookup. Maybe we got an error:%v", ErrNotFound, err)
case 1:
// len 1 means we just got a header, no rows
// Should we define our own error types here so we can return more clearly
// an indicator that no rows were found?
return h, r, fmt.Errorf("No result found")
return h, r, ErrNotFound
default:
for _, z := range x {
switch zz := z.(type) {
Expand All @@ -94,27 +93,27 @@ func Pull(u string, q string) (h Header, r Payload, err error) {
if _, ok := zz["queryId"].(string); ok {
h.queryId = zz["queryId"].(string)
} else {
log.Println("(Query ID not found - this is expected for a pull query)")
cl.log("(Query ID not found - this is expected for a pull query)")
}

names, okn := zz["columnNames"].([]interface{})
types, okt := zz["columnTypes"].([]interface{})
if okn && okt {
for col := range names {
if n, o := names[col].(string); n != "" && o == true {
if t, o := types[col].(string); t != "" && o == true {
if n, ok := names[col].(string); n != "" && ok {
if t, ok := types[col].(string); t != "" && ok {
a := Column{Name: n, Type: t}
h.columns = append(h.columns, a)

} else {
log.Printf("Nil type found for column %v", col)
cl.log("Nil type found for column %v", col)
}
} else {
log.Printf("Nil name found for column %v", col)
cl.log("Nil name found for column %v", col)
}
}
} else {
log.Printf("Column names/types not found in header:\n%v", zz)
cl.log("Column names/types not found in header:\n%v", zz)
}

case []interface{}:
Expand Down
22 changes: 11 additions & 11 deletions push.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ import (
// if row != nil {
// DATA_TS = row[0].(float64)
// ID = row[1].(string)
func Push(u string, q string, rc chan<- Row, hc chan<- Header, cc <-chan bool) (err error) {
func (cl *Client) Push(q string, rc chan<- Row, hc chan<- Header, cc <-chan bool) (err error) {

// Create the client, force it to use HTTP2 (to avoid `http2: unsupported scheme`)
client := http.Client{
Expand All @@ -55,8 +55,8 @@ func Push(u string, q string, rc chan<- Row, hc chan<- Header, cc <-chan bool) (
}
// make the request
payload := strings.NewReader("{\"properties\":{\"ksql.streams.auto.offset.reset\": \"latest\"},\"sql\":\"" + q + "\"}")
req, err := http.NewRequest("POST", u+"/query-stream", payload)
log.Printf("Sending ksqlDB request:\n\t%v", q)
req, err := http.NewRequest("POST", cl.url+"/query-stream", payload)
cl.log("Sending ksqlDB request:\n\t%v", q)
if err != nil {
return err
}
Expand All @@ -82,8 +82,8 @@ func Push(u string, q string, rc chan<- Row, hc chan<- Header, cc <-chan bool) (
defer func() { doThis = false }()
// Try to close the query
payload := strings.NewReader("{\"queryId\":\"" + h.queryId + "\"}")
req, err := http.NewRequest("POST", u+"/close-query", payload)
log.Printf("Closing ksqlDB query\t%v", h.queryId)
req, err := http.NewRequest("POST", cl.url+"/close-query", payload)
cl.log("Closing ksqlDB query\t%v", h.queryId)
if err != nil {
return fmt.Errorf("Failed to construct HTTP request to cancel query\n%v", err)
}
Expand Down Expand Up @@ -121,27 +121,27 @@ func Push(u string, q string, rc chan<- Row, hc chan<- Header, cc <-chan bool) (
if _, ok := zz["queryId"].(string); ok {
h.queryId = zz["queryId"].(string)
} else {
log.Println("Query ID not found - this is expected for a pull query")
cl.log("Query ID not found - this is expected for a pull query")
}

names, okn := zz["columnNames"].([]interface{})
types, okt := zz["columnTypes"].([]interface{})
if okn && okt {
for col := range names {
if n, o := names[col].(string); n != "" && o == true {
if t, o := types[col].(string); t != "" && o == true {
if n, ok := names[col].(string); n != "" && ok {
if t, ok := types[col].(string); t != "" && ok {
a := Column{Name: n, Type: t}
h.columns = append(h.columns, a)

} else {
log.Printf("Nil type found for column %v", col)
cl.log("Nil type found for column %v", col)
}
} else {
log.Printf("Nil name found for column %v", col)
cl.log("Nil name found for column %v", col)
}
}
} else {
log.Printf("Column names/types not found in header:\n%v", zz)
cl.log("Column names/types not found in header:\n%v", zz)
}
// log.Println("Header:", h)
hc <- h
Expand Down
6 changes: 6 additions & 0 deletions types.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,9 @@ type Column struct {
Name string
Type string
}

type Client struct {
url string
isDebug bool
logf func(format string, v ...interface{})
}

0 comments on commit a6bada4

Please sign in to comment.