Skip to content

Commit

Permalink
Merge pull request #11 from rmoff/merge3-into-2
Browse files Browse the repository at this point in the history
Added disable logging, typed 'Not found' error
  • Loading branch information
rmoff committed Oct 8, 2020
2 parents 46ac504 + 636025b commit ab08602
Show file tree
Hide file tree
Showing 12 changed files with 102 additions and 50 deletions.
13 changes: 10 additions & 3 deletions README.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,13 @@ See the link:test/environment.adoc[test environment here], and link:test/main.go
go run ./test/
----

Create a ksqlDB Client

[source,go]
----
client := ksqldb.NewClient("http://ksqldb:8088").Debug()
----

=== Pull query

[source,go]
Expand All @@ -49,7 +56,7 @@ ctx, ctxCancel := context.WithTimeout(context.Background(), 10 * time.Second)
defer ctxCancel()
k := "SELECT TIMESTAMPTOSTRING(WINDOWSTART,'yyyy-MM-dd HH:mm:ss','Europe/London') AS WINDOW_START, TIMESTAMPTOSTRING(WINDOWEND,'HH:mm:ss','Europe/London') AS WINDOW_END, DOG_SIZE, DOGS_CT FROM DOGS_BY_SIZE WHERE DOG_SIZE='" + s + "';"
_, r, e := ksqldb.Pull(ctx, ksqlDBServer, k)
_, r, e := client.Pull(ctx, k)
if e != nil {
// handle the error better here, e.g. check for no rows returned
Expand Down Expand Up @@ -96,7 +103,7 @@ go func() {
ctx, ctxCancel := context.WithTimeout(context.Background(), 10 * time.Second)
defer ctxCancel()
e := ksqldb.Push(ctx, ksqlDBServer, k, rc, hc)
e := client.Push(ctx, k, rc, hc)
if e != nil {
// handle the error better here, e.g. check for no rows returned
Expand All @@ -108,7 +115,7 @@ if e != nil {

[source,go]
----
if err := ksqldb.Execute(ctx, ksqlDBServer, `
if err := client.Execute(ctx, ksqlDBServer, `
CREATE STREAM DOGS (ID STRING KEY,
NAME STRING,
DOGSIZE STRING,
Expand Down
11 changes: 11 additions & 0 deletions client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package ksqldb

import "log"

//NewClient creates new ksqldb client with log.Println default logging
func NewClient(url string) *Client {
return &Client{
url: url,
logf: log.Printf,
}
}
22 changes: 22 additions & 0 deletions debug.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package ksqldb

//log logging data
func (cl *Client) log(format string, v ...interface{}) {
if cl.isDebug {
cl.logf(format, v)
}
}

//SetLogFunc sets custom logging function with func(format string, v ...interface{}) profile
func (cl *Client) SetLogFunc(fn func(format string, v ...interface{})) *Client {
if fn != nil {
cl.logf = fn
}
return cl
}

//Debug sets debug mode for logging
func (cl *Client) Debug() *Client {
cl.isDebug = true
return cl
}
7 changes: 7 additions & 0 deletions errors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package ksqldb

import "errors"

var (
ErrNotFound = errors.New("No result found")
)
7 changes: 3 additions & 4 deletions execute.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package ksqldb
import (
"fmt"
"io/ioutil"
"log"
"net/http"
"strings"
)
Expand All @@ -19,7 +18,7 @@ import (
//
// TODO Add support for commandSequenceNumber and streamsProperties
// TODO Add better support for responses to CREATE/DROP/TERMINATE (e.g. commandID, commandStatus.status, etc)
func Execute(u string, q string) (err error) {
func (cl *Client) Execute(q string) (err error) {

// Create the client
client := &http.Client{}
Expand All @@ -29,8 +28,8 @@ func Execute(u string, q string) (err error) {
q = strings.ReplaceAll(q, "\n", "")
// make the request
payload := strings.NewReader("{\"ksql\":\"" + q + "\"}")
req, err := http.NewRequest("POST", u+"/ksql", payload)
log.Printf("Sending ksqlDB request:\n\t%v", q)
req, err := http.NewRequest("POST", cl.url+"/ksql", payload)
cl.log("Sending ksqlDB request:\n\t%v", q)
if err != nil {
return err
}
Expand Down
24 changes: 11 additions & 13 deletions pull.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"encoding/json"
"fmt"
"io/ioutil"
"log"
"net"
"net/http"
"strings"
Expand All @@ -17,8 +16,7 @@ import (
// Pull queries are like "traditional" RDBMS queries in which
// the query terminates once the state has been queried.
//
// To use this function pass in the base URL of your
// ksqlDB server, and the SQL query statement.
// To use this function pass in the the SQL query statement.
//
// The function returns a ksqldb.Header and ksqldb.Payload
// which will hold one or more rows of data. You will need to
Expand All @@ -32,7 +30,7 @@ import (
// // Do other stuff with the data here
// }
// }
func Pull(ctx context.Context, u string, q string) (h Header, r Payload, err error) {
func (cl *Client) Pull(ctx context.Context, q string) (h Header, r Payload, err error) {

// Create the client, force it to use HTTP2 (to avoid `http2: unsupported scheme`)
client := http.Client{
Expand All @@ -49,7 +47,7 @@ func Pull(ctx context.Context, u string, q string) (h Header, r Payload, err err

// Create the request
payload := strings.NewReader("{\"sql\":\"" + q + "\"}")
req, err := http.NewRequestWithContext(ctx, "POST", u+"/query-stream", payload)
req, err := http.NewRequestWithContext(ctx, "POST", cl.url+"/query-stream", payload)

if err != nil {
return h, r, err
Expand Down Expand Up @@ -80,12 +78,12 @@ func Pull(ctx context.Context, u string, q string) (h Header, r Payload, err err

switch len(x) {
case 0:
return h, r, fmt.Errorf("No results (not even a header row) returned from lookup. Maybe we got an error:%v", err)
return h, r, fmt.Errorf("%w (not even a header row) returned from lookup. Maybe we got an error:%v", ErrNotFound, err)
case 1:
// len 1 means we just got a header, no rows
// Should we define our own error types here so we can return more clearly
// an indicator that no rows were found?
return h, r, fmt.Errorf("No result found")
return h, r, ErrNotFound
default:
for _, z := range x {
switch zz := z.(type) {
Expand All @@ -95,27 +93,27 @@ func Pull(ctx context.Context, u string, q string) (h Header, r Payload, err err
if _, ok := zz["queryId"].(string); ok {
h.queryId = zz["queryId"].(string)
} else {
log.Println("(Query ID not found - this is expected for a pull query)")
cl.log("(Query ID not found - this is expected for a pull query)")
}

names, okn := zz["columnNames"].([]interface{})
types, okt := zz["columnTypes"].([]interface{})
if okn && okt {
for col := range names {
if n, o := names[col].(string); n != "" && o == true {
if t, o := types[col].(string); t != "" && o == true {
if n, ok := names[col].(string); n != "" && ok {
if t, ok := types[col].(string); t != "" && ok {
a := Column{Name: n, Type: t}
h.columns = append(h.columns, a)

} else {
log.Printf("Nil type found for column %v", col)
cl.log("Nil type found for column %v", col)
}
} else {
log.Printf("Nil name found for column %v", col)
cl.log("Nil name found for column %v", col)
}
}
} else {
log.Printf("Column names/types not found in header:\n%v", zz)
cl.log("Column names/types not found in header:\n%v", zz)
}

case []interface{}:
Expand Down
26 changes: 13 additions & 13 deletions push.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ import (
// to which it can write new rows of data as and when they are
// received.
//
// To use this function pass in a context, the base URL of your
// ksqlDB server, the SQL query statement, and two channels:
// To use this function pass in a context, the SQL query statement,
// and two channels:
//
// * ksqldb.Row - rows of data
// * ksqldb.Header - header (including column definitions).
Expand All @@ -39,7 +39,7 @@ import (
// if row != nil {
// DATA_TS = row[0].(float64)
// ID = row[1].(string)
func Push(ctx context.Context, u string, q string, rc chan<- Row, hc chan<- Header) (err error) {
func (cl *Client) Push(ctx context.Context, q string, rc chan<- Row, hc chan<- Header) (err error) {

// Create the client, force it to use HTTP2 (to avoid `http2: unsupported scheme`)
client := http.Client{
Expand All @@ -55,8 +55,8 @@ func Push(ctx context.Context, u string, q string, rc chan<- Row, hc chan<- Head
}
// make the request
payload := strings.NewReader("{\"properties\":{\"ksql.streams.auto.offset.reset\": \"latest\"},\"sql\":\"" + q + "\"}")
req, err := http.NewRequestWithContext(ctx, "POST", u+"/query-stream", payload)
log.Printf("Sending ksqlDB request:\n\t%v", q)
req, err := http.NewRequestWithContext(ctx, "POST", cl.url+"/query-stream", payload)
cl.log("Sending ksqlDB request:\n\t%v", q)
if err != nil {
return err
}
Expand All @@ -82,8 +82,8 @@ func Push(ctx context.Context, u string, q string, rc chan<- Row, hc chan<- Head
defer func() { doThis = false }()
// Try to close the query
payload := strings.NewReader("{\"queryId\":\"" + h.queryId + "\"}")
req, err := http.NewRequestWithContext(ctx, "POST", u+"/close-query", payload)
log.Printf("Closing ksqlDB query\t%v", h.queryId)
req, err := http.NewRequestWithContext(ctx, "POST", cl.url+"/close-query", payload)
cl.log("Closing ksqlDB query\t%v", h.queryId)
if err != nil {
return fmt.Errorf("Failed to construct HTTP request to cancel query\n%v", err)
}
Expand Down Expand Up @@ -121,27 +121,27 @@ func Push(ctx context.Context, u string, q string, rc chan<- Row, hc chan<- Head
if _, ok := zz["queryId"].(string); ok {
h.queryId = zz["queryId"].(string)
} else {
log.Println("Query ID not found - this is expected for a pull query")
cl.log("Query ID not found - this is expected for a pull query")
}

names, okn := zz["columnNames"].([]interface{})
types, okt := zz["columnTypes"].([]interface{})
if okn && okt {
for col := range names {
if n, o := names[col].(string); n != "" && o == true {
if t, o := types[col].(string); t != "" && o == true {
if n, ok := names[col].(string); n != "" && ok {
if t, ok := types[col].(string); t != "" && ok {
a := Column{Name: n, Type: t}
h.columns = append(h.columns, a)

} else {
log.Printf("Nil type found for column %v", col)
cl.log("Nil type found for column %v", col)
}
} else {
log.Printf("Nil name found for column %v", col)
cl.log("Nil name found for column %v", col)
}
}
} else {
log.Printf("Column names/types not found in header:\n%v", zz)
cl.log("Column names/types not found in header:\n%v", zz)
}
// log.Println("Header:", h)
hc <- h
Expand Down
9 changes: 5 additions & 4 deletions test/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,9 @@ const ksqlDBServer string = "http://localhost:8088"

func main() {

if e := setup(); e != nil {
log.Printf("Failed to run setup statements.\n%v\nExiting.", e)
client, err := setup()
if err != nil {
log.Printf("Failed to run setup statements.\n%v\nExiting.", err)
os.Exit(1)
}

Expand All @@ -26,7 +27,7 @@ func main() {
Check this out, we can do pull queries, which are like K/V lookups
against materialised views of state built from streams of events in Kafka:` + "\n\n")
if e := getDogStats("medium"); e != nil {
if e := getDogStats(client, "medium"); e != nil {
fmt.Printf("error calling getDogStats:\n%v", e)
}

Expand All @@ -42,7 +43,7 @@ func main() {
to terminate it after 10 seconds, but by default it will run until the program
is killed.` + "\n\n\n")
time.Sleep(2 * time.Second)
if e := getDogUpdates(); e != nil {
if e := getDogUpdates(client); e != nil {
fmt.Printf("error calling getDogUpdates:\n%v", e)
}
}
4 changes: 2 additions & 2 deletions test/pull.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,13 @@ import (
"github.com/rmoff/ksqldb-go"
)

func getDogStats(s string) (e error) {
func getDogStats(client *ksqldb.Client, s string) (e error) {

k := "SELECT TIMESTAMPTOSTRING(WINDOWSTART,'yyyy-MM-dd HH:mm:ss','Europe/London') AS WINDOW_START, TIMESTAMPTOSTRING(WINDOWEND,'HH:mm:ss','Europe/London') AS WINDOW_END, DOG_SIZE, DOGS_CT FROM DOGS_BY_SIZE WHERE DOG_SIZE='" + s + "';"

ctx, cancel := context.WithTimeout(context.TODO(), 10*time.Second)
defer cancel()
_, r, e := ksqldb.Pull(ctx, ksqlDBServer, k)
_, r, e := client.Pull(ctx, k)

if e != nil {
// handle the error better here, e.g. check for no rows returned
Expand Down
4 changes: 2 additions & 2 deletions test/push.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
"github.com/rmoff/ksqldb-go"
)

func getDogUpdates() (err error) {
func getDogUpdates(client *ksqldb.Client) (err error) {

rc := make(chan ksqldb.Row)
hc := make(chan ksqldb.Header, 1)
Expand Down Expand Up @@ -45,7 +45,7 @@ func getDogUpdates() (err error) {
ctx, cancel := context.WithTimeout(context.TODO(), 10*time.Second)
defer cancel()

e := ksqldb.Push(ctx, ksqlDBServer, k, rc, hc)
e := client.Push(ctx, k, rc, hc)

if e != nil {
// handle the error better here, e.g. check for no rows returned
Expand Down
19 changes: 10 additions & 9 deletions test/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,11 @@ import (
"github.com/rmoff/ksqldb-go"
)

func setup() (err error) {

func setup() (*ksqldb.Client, error) {
//create ksqldb client
client := ksqldb.NewClient(ksqlDBServer).Debug()
// Create the dummy data connector
if err := ksqldb.Execute(ksqlDBServer, `
if err := client.Execute(`
CREATE SOURCE CONNECTOR DOGS WITH (
'connector.class' = 'io.mdrogalis.voluble.VolubleSourceConnector',
'key.converter' = 'org.apache.kafka.connect.storage.StringConverter',
Expand All @@ -23,41 +24,41 @@ func setup() (err error) {
'topic.dogs.throttle.ms' = 1000
);
`); err != nil {
return fmt.Errorf("Error creating the source connector.\n%v", err)
return nil, fmt.Errorf("Error creating the source connector.\n%v", err)
}

// This is a bit lame but without doing the cool stuff with CommandId etc
// it's the easiest way to make sure the topic exists before continuing
time.Sleep(5 * time.Second)

// Create the DOGS stream
if err := ksqldb.Execute(ksqlDBServer, `
if err := client.Execute(`
CREATE STREAM DOGS (ID STRING KEY,
NAME STRING,
DOGSIZE STRING,
AGE STRING)
WITH (KAFKA_TOPIC='dogs',
VALUE_FORMAT='JSON');
`); err != nil {
return fmt.Errorf("Error creating the DOGS stream.\n%v", err)
return nil, fmt.Errorf("Error creating the DOGS stream.\n%v", err)
}

// This is a bit lame but without doing the cool stuff with CommandId etc
// it's the easiest way to make sure the stream exists before continuing
time.Sleep(5 * time.Second)

// Create the DOGS_BY_SIZE table
if err := ksqldb.Execute(ksqlDBServer, `
if err := client.Execute(`
CREATE TABLE DOGS_BY_SIZE AS
SELECT DOGSIZE AS DOG_SIZE, COUNT(*) AS DOGS_CT
FROM DOGS WINDOW TUMBLING (SIZE 15 MINUTE)
GROUP BY DOGSIZE;
`); err != nil {
return fmt.Errorf("Error creating the DOGS stream.\n%v", err)
return nil, fmt.Errorf("Error creating the DOGS stream.\n%v", err)
}
// This is a bit lame but without doing the cool stuff with CommandId etc
// it's the easiest way to make sure the table exists before continuing
time.Sleep(10 * time.Second)

return nil
return client, nil
}
Loading

0 comments on commit ab08602

Please sign in to comment.