diff --git a/README.adoc b/README.adoc index d2d2087..2448f1c 100644 --- a/README.adoc +++ b/README.adoc @@ -41,6 +41,13 @@ See the link:test/environment.adoc[test environment here], and link:test/main.go go run ./test/ ---- +Create a ksqlDB Client + +[source,go] +---- +client := ksqldb.NewClient("http://ksqldb:8088").Debug() +---- + === Pull query [source,go] @@ -49,7 +56,7 @@ ctx, ctxCancel := context.WithTimeout(context.Background(), 10 * time.Second) defer ctxCancel() k := "SELECT TIMESTAMPTOSTRING(WINDOWSTART,'yyyy-MM-dd HH:mm:ss','Europe/London') AS WINDOW_START, TIMESTAMPTOSTRING(WINDOWEND,'HH:mm:ss','Europe/London') AS WINDOW_END, DOG_SIZE, DOGS_CT FROM DOGS_BY_SIZE WHERE DOG_SIZE='" + s + "';" -_, r, e := ksqldb.Pull(ctx, ksqlDBServer, k) +_, r, e := client.Pull(ctx, k) if e != nil { // handle the error better here, e.g. check for no rows returned @@ -96,7 +103,7 @@ go func() { ctx, ctxCancel := context.WithTimeout(context.Background(), 10 * time.Second) defer ctxCancel() -e := ksqldb.Push(ctx, ksqlDBServer, k, rc, hc) +e := client.Push(ctx, k, rc, hc) if e != nil { // handle the error better here, e.g. check for no rows returned @@ -108,7 +115,7 @@ if e != nil { [source,go] ---- -if err := ksqldb.Execute(ctx, ksqlDBServer, ` +if err := client.Execute(ctx, ksqlDBServer, ` CREATE STREAM DOGS (ID STRING KEY, NAME STRING, DOGSIZE STRING, diff --git a/client.go b/client.go new file mode 100644 index 0000000..3bae2be --- /dev/null +++ b/client.go @@ -0,0 +1,11 @@ +package ksqldb + +import "log" + +//NewClient creates new ksqldb client with log.Println default logging +func NewClient(url string) *Client { + return &Client{ + url: url, + logf: log.Printf, + } +} diff --git a/debug.go b/debug.go new file mode 100644 index 0000000..60cd615 --- /dev/null +++ b/debug.go @@ -0,0 +1,22 @@ +package ksqldb + +//log logging data +func (cl *Client) log(format string, v ...interface{}) { + if cl.isDebug { + cl.logf(format, v) + } +} + +//SetLogFunc sets custom logging function with func(format string, v ...interface{}) profile +func (cl *Client) SetLogFunc(fn func(format string, v ...interface{})) *Client { + if fn != nil { + cl.logf = fn + } + return cl +} + +//Debug sets debug mode for logging +func (cl *Client) Debug() *Client { + cl.isDebug = true + return cl +} diff --git a/errors.go b/errors.go new file mode 100644 index 0000000..69565a2 --- /dev/null +++ b/errors.go @@ -0,0 +1,7 @@ +package ksqldb + +import "errors" + +var ( + ErrNotFound = errors.New("No result found") +) diff --git a/execute.go b/execute.go index a4026a5..6370b3b 100644 --- a/execute.go +++ b/execute.go @@ -3,7 +3,6 @@ package ksqldb import ( "fmt" "io/ioutil" - "log" "net/http" "strings" ) @@ -19,7 +18,7 @@ import ( // // TODO Add support for commandSequenceNumber and streamsProperties // TODO Add better support for responses to CREATE/DROP/TERMINATE (e.g. commandID, commandStatus.status, etc) -func Execute(u string, q string) (err error) { +func (cl *Client) Execute(q string) (err error) { // Create the client client := &http.Client{} @@ -29,8 +28,8 @@ func Execute(u string, q string) (err error) { q = strings.ReplaceAll(q, "\n", "") // make the request payload := strings.NewReader("{\"ksql\":\"" + q + "\"}") - req, err := http.NewRequest("POST", u+"/ksql", payload) - log.Printf("Sending ksqlDB request:\n\t%v", q) + req, err := http.NewRequest("POST", cl.url+"/ksql", payload) + cl.log("Sending ksqlDB request:\n\t%v", q) if err != nil { return err } diff --git a/pull.go b/pull.go index ce5612d..f191367 100644 --- a/pull.go +++ b/pull.go @@ -6,7 +6,6 @@ import ( "encoding/json" "fmt" "io/ioutil" - "log" "net" "net/http" "strings" @@ -17,8 +16,7 @@ import ( // Pull queries are like "traditional" RDBMS queries in which // the query terminates once the state has been queried. // -// To use this function pass in the base URL of your -// ksqlDB server, and the SQL query statement. +// To use this function pass in the the SQL query statement. // // The function returns a ksqldb.Header and ksqldb.Payload // which will hold one or more rows of data. You will need to @@ -32,7 +30,7 @@ import ( // // Do other stuff with the data here // } // } -func Pull(ctx context.Context, u string, q string) (h Header, r Payload, err error) { +func (cl *Client) Pull(ctx context.Context, q string) (h Header, r Payload, err error) { // Create the client, force it to use HTTP2 (to avoid `http2: unsupported scheme`) client := http.Client{ @@ -49,7 +47,7 @@ func Pull(ctx context.Context, u string, q string) (h Header, r Payload, err err // Create the request payload := strings.NewReader("{\"sql\":\"" + q + "\"}") - req, err := http.NewRequestWithContext(ctx, "POST", u+"/query-stream", payload) + req, err := http.NewRequestWithContext(ctx, "POST", cl.url+"/query-stream", payload) if err != nil { return h, r, err @@ -80,12 +78,12 @@ func Pull(ctx context.Context, u string, q string) (h Header, r Payload, err err switch len(x) { case 0: - return h, r, fmt.Errorf("No results (not even a header row) returned from lookup. Maybe we got an error:%v", err) + return h, r, fmt.Errorf("%w (not even a header row) returned from lookup. Maybe we got an error:%v", ErrNotFound, err) case 1: // len 1 means we just got a header, no rows // Should we define our own error types here so we can return more clearly // an indicator that no rows were found? - return h, r, fmt.Errorf("No result found") + return h, r, ErrNotFound default: for _, z := range x { switch zz := z.(type) { @@ -95,27 +93,27 @@ func Pull(ctx context.Context, u string, q string) (h Header, r Payload, err err if _, ok := zz["queryId"].(string); ok { h.queryId = zz["queryId"].(string) } else { - log.Println("(Query ID not found - this is expected for a pull query)") + cl.log("(Query ID not found - this is expected for a pull query)") } names, okn := zz["columnNames"].([]interface{}) types, okt := zz["columnTypes"].([]interface{}) if okn && okt { for col := range names { - if n, o := names[col].(string); n != "" && o == true { - if t, o := types[col].(string); t != "" && o == true { + if n, ok := names[col].(string); n != "" && ok { + if t, ok := types[col].(string); t != "" && ok { a := Column{Name: n, Type: t} h.columns = append(h.columns, a) } else { - log.Printf("Nil type found for column %v", col) + cl.log("Nil type found for column %v", col) } } else { - log.Printf("Nil name found for column %v", col) + cl.log("Nil name found for column %v", col) } } } else { - log.Printf("Column names/types not found in header:\n%v", zz) + cl.log("Column names/types not found in header:\n%v", zz) } case []interface{}: diff --git a/push.go b/push.go index 112e950..0e30b51 100644 --- a/push.go +++ b/push.go @@ -22,8 +22,8 @@ import ( // to which it can write new rows of data as and when they are // received. // -// To use this function pass in a context, the base URL of your -// ksqlDB server, the SQL query statement, and two channels: +// To use this function pass in a context, the SQL query statement, +// and two channels: // // * ksqldb.Row - rows of data // * ksqldb.Header - header (including column definitions). @@ -39,7 +39,7 @@ import ( // if row != nil { // DATA_TS = row[0].(float64) // ID = row[1].(string) -func Push(ctx context.Context, u string, q string, rc chan<- Row, hc chan<- Header) (err error) { +func (cl *Client) Push(ctx context.Context, q string, rc chan<- Row, hc chan<- Header) (err error) { // Create the client, force it to use HTTP2 (to avoid `http2: unsupported scheme`) client := http.Client{ @@ -55,8 +55,8 @@ func Push(ctx context.Context, u string, q string, rc chan<- Row, hc chan<- Head } // make the request payload := strings.NewReader("{\"properties\":{\"ksql.streams.auto.offset.reset\": \"latest\"},\"sql\":\"" + q + "\"}") - req, err := http.NewRequestWithContext(ctx, "POST", u+"/query-stream", payload) - log.Printf("Sending ksqlDB request:\n\t%v", q) + req, err := http.NewRequestWithContext(ctx, "POST", cl.url+"/query-stream", payload) + cl.log("Sending ksqlDB request:\n\t%v", q) if err != nil { return err } @@ -82,8 +82,8 @@ func Push(ctx context.Context, u string, q string, rc chan<- Row, hc chan<- Head defer func() { doThis = false }() // Try to close the query payload := strings.NewReader("{\"queryId\":\"" + h.queryId + "\"}") - req, err := http.NewRequestWithContext(ctx, "POST", u+"/close-query", payload) - log.Printf("Closing ksqlDB query\t%v", h.queryId) + req, err := http.NewRequestWithContext(ctx, "POST", cl.url+"/close-query", payload) + cl.log("Closing ksqlDB query\t%v", h.queryId) if err != nil { return fmt.Errorf("Failed to construct HTTP request to cancel query\n%v", err) } @@ -121,27 +121,27 @@ func Push(ctx context.Context, u string, q string, rc chan<- Row, hc chan<- Head if _, ok := zz["queryId"].(string); ok { h.queryId = zz["queryId"].(string) } else { - log.Println("Query ID not found - this is expected for a pull query") + cl.log("Query ID not found - this is expected for a pull query") } names, okn := zz["columnNames"].([]interface{}) types, okt := zz["columnTypes"].([]interface{}) if okn && okt { for col := range names { - if n, o := names[col].(string); n != "" && o == true { - if t, o := types[col].(string); t != "" && o == true { + if n, ok := names[col].(string); n != "" && ok { + if t, ok := types[col].(string); t != "" && ok { a := Column{Name: n, Type: t} h.columns = append(h.columns, a) } else { - log.Printf("Nil type found for column %v", col) + cl.log("Nil type found for column %v", col) } } else { - log.Printf("Nil name found for column %v", col) + cl.log("Nil name found for column %v", col) } } } else { - log.Printf("Column names/types not found in header:\n%v", zz) + cl.log("Column names/types not found in header:\n%v", zz) } // log.Println("Header:", h) hc <- h diff --git a/test/main.go b/test/main.go index 56c3e66..028e7cb 100644 --- a/test/main.go +++ b/test/main.go @@ -13,8 +13,9 @@ const ksqlDBServer string = "http://localhost:8088" func main() { - if e := setup(); e != nil { - log.Printf("Failed to run setup statements.\n%v\nExiting.", e) + client, err := setup() + if err != nil { + log.Printf("Failed to run setup statements.\n%v\nExiting.", err) os.Exit(1) } @@ -26,7 +27,7 @@ func main() { Check this out, we can do pull queries, which are like K/V lookups against materialised views of state built from streams of events in Kafka:` + "\n\n") - if e := getDogStats("medium"); e != nil { + if e := getDogStats(client, "medium"); e != nil { fmt.Printf("error calling getDogStats:\n%v", e) } @@ -42,7 +43,7 @@ func main() { to terminate it after 10 seconds, but by default it will run until the program is killed.` + "\n\n\n") time.Sleep(2 * time.Second) - if e := getDogUpdates(); e != nil { + if e := getDogUpdates(client); e != nil { fmt.Printf("error calling getDogUpdates:\n%v", e) } } diff --git a/test/pull.go b/test/pull.go index f38d4dc..f040dc8 100644 --- a/test/pull.go +++ b/test/pull.go @@ -8,13 +8,13 @@ import ( "github.com/rmoff/ksqldb-go" ) -func getDogStats(s string) (e error) { +func getDogStats(client *ksqldb.Client, s string) (e error) { k := "SELECT TIMESTAMPTOSTRING(WINDOWSTART,'yyyy-MM-dd HH:mm:ss','Europe/London') AS WINDOW_START, TIMESTAMPTOSTRING(WINDOWEND,'HH:mm:ss','Europe/London') AS WINDOW_END, DOG_SIZE, DOGS_CT FROM DOGS_BY_SIZE WHERE DOG_SIZE='" + s + "';" ctx, cancel := context.WithTimeout(context.TODO(), 10*time.Second) defer cancel() - _, r, e := ksqldb.Pull(ctx, ksqlDBServer, k) + _, r, e := client.Pull(ctx, k) if e != nil { // handle the error better here, e.g. check for no rows returned diff --git a/test/push.go b/test/push.go index 16ad52e..046052e 100644 --- a/test/push.go +++ b/test/push.go @@ -8,7 +8,7 @@ import ( "github.com/rmoff/ksqldb-go" ) -func getDogUpdates() (err error) { +func getDogUpdates(client *ksqldb.Client) (err error) { rc := make(chan ksqldb.Row) hc := make(chan ksqldb.Header, 1) @@ -45,7 +45,7 @@ func getDogUpdates() (err error) { ctx, cancel := context.WithTimeout(context.TODO(), 10*time.Second) defer cancel() - e := ksqldb.Push(ctx, ksqlDBServer, k, rc, hc) + e := client.Push(ctx, k, rc, hc) if e != nil { // handle the error better here, e.g. check for no rows returned diff --git a/test/setup.go b/test/setup.go index 034ae30..bccafac 100644 --- a/test/setup.go +++ b/test/setup.go @@ -7,10 +7,11 @@ import ( "github.com/rmoff/ksqldb-go" ) -func setup() (err error) { - +func setup() (*ksqldb.Client, error) { + //create ksqldb client + client := ksqldb.NewClient(ksqlDBServer).Debug() // Create the dummy data connector - if err := ksqldb.Execute(ksqlDBServer, ` + if err := client.Execute(` CREATE SOURCE CONNECTOR DOGS WITH ( 'connector.class' = 'io.mdrogalis.voluble.VolubleSourceConnector', 'key.converter' = 'org.apache.kafka.connect.storage.StringConverter', @@ -23,7 +24,7 @@ func setup() (err error) { 'topic.dogs.throttle.ms' = 1000 ); `); err != nil { - return fmt.Errorf("Error creating the source connector.\n%v", err) + return nil, fmt.Errorf("Error creating the source connector.\n%v", err) } // This is a bit lame but without doing the cool stuff with CommandId etc @@ -31,7 +32,7 @@ func setup() (err error) { time.Sleep(5 * time.Second) // Create the DOGS stream - if err := ksqldb.Execute(ksqlDBServer, ` + if err := client.Execute(` CREATE STREAM DOGS (ID STRING KEY, NAME STRING, DOGSIZE STRING, @@ -39,7 +40,7 @@ func setup() (err error) { WITH (KAFKA_TOPIC='dogs', VALUE_FORMAT='JSON'); `); err != nil { - return fmt.Errorf("Error creating the DOGS stream.\n%v", err) + return nil, fmt.Errorf("Error creating the DOGS stream.\n%v", err) } // This is a bit lame but without doing the cool stuff with CommandId etc @@ -47,17 +48,17 @@ func setup() (err error) { time.Sleep(5 * time.Second) // Create the DOGS_BY_SIZE table - if err := ksqldb.Execute(ksqlDBServer, ` + if err := client.Execute(` CREATE TABLE DOGS_BY_SIZE AS SELECT DOGSIZE AS DOG_SIZE, COUNT(*) AS DOGS_CT FROM DOGS WINDOW TUMBLING (SIZE 15 MINUTE) GROUP BY DOGSIZE; `); err != nil { - return fmt.Errorf("Error creating the DOGS stream.\n%v", err) + return nil, fmt.Errorf("Error creating the DOGS stream.\n%v", err) } // This is a bit lame but without doing the cool stuff with CommandId etc // it's the easiest way to make sure the table exists before continuing time.Sleep(10 * time.Second) - return nil + return client, nil } diff --git a/types.go b/types.go index 4b83824..a81e526 100644 --- a/types.go +++ b/types.go @@ -17,3 +17,9 @@ type Column struct { Name string Type string } + +type Client struct { + url string + isDebug bool + logf func(format string, v ...interface{}) +}