diff --git a/README.adoc b/README.adoc index 6113d9b..d2d2087 100644 --- a/README.adoc +++ b/README.adoc @@ -1,6 +1,6 @@ = ksqlDB Go library Robin Moffatt -v0.02, 7 August 2020 +v0.03, 8 October 2020 :toc: @@ -45,8 +45,11 @@ go run ./test/ [source,go] ---- +ctx, ctxCancel := context.WithTimeout(context.Background(), 10 * time.Second) +defer ctxCancel() + k := "SELECT TIMESTAMPTOSTRING(WINDOWSTART,'yyyy-MM-dd HH:mm:ss','Europe/London') AS WINDOW_START, TIMESTAMPTOSTRING(WINDOWEND,'HH:mm:ss','Europe/London') AS WINDOW_END, DOG_SIZE, DOGS_CT FROM DOGS_BY_SIZE WHERE DOG_SIZE='" + s + "';" -_, r, e := ksqldb.Pull(ksqlDBServer, k) +_, r, e := ksqldb.Pull(ctx, ksqlDBServer, k) if e != nil { // handle the error better here, e.g. check for no rows returned @@ -71,7 +74,6 @@ for _, row := range r { ---- rc := make(chan ksqldb.Row) hc := make(chan ksqldb.Header, 1) -cc := make(chan bool) k := "SELECT ROWTIME, ID, NAME, DOGSIZE, AGE FROM DOGS EMIT CHANGES;" @@ -89,17 +91,12 @@ go func() { fmt.Printf("🐾%v: %v\n", NAME, DOG_SIZE) } } - }() -// This Go routine shows how you can cancel a Push query as and when required -go func() { - time.Sleep(10 * time.Second) - log.Println("⏱️ Terminating the continuous query now, we've seen enough") - cc <- true -}() +ctx, ctxCancel := context.WithTimeout(context.Background(), 10 * time.Second) +defer ctxCancel() -e := ksqldb.Push(ksqlDBServer, k, rc, hc, cc) +e := ksqldb.Push(ctx, ksqlDBServer, k, rc, hc) if e != nil { // handle the error better here, e.g. check for no rows returned @@ -111,7 +108,7 @@ if e != nil { [source,go] ---- -if err := ksqldb.Execute(ksqlDBServer, ` +if err := ksqldb.Execute(ctx, ksqlDBServer, ` CREATE STREAM DOGS (ID STRING KEY, NAME STRING, DOGSIZE STRING, @@ -125,16 +122,4 @@ if err := ksqldb.Execute(ksqlDBServer, ` == TODO -☑️ Add support for the newer https://docs.ksqldb.io/en/latest/developer-guide/ksqldb-rest-api/streaming-endpoint/[`query-stream`] endpoint (N.B. HTTP2 only) - -☑️ Add support for https://docs.ksqldb.io/en/latest/developer-guide/ksqldb-rest-api/ksql-endpoint/[`ksql`] endpoint so that applications can create their own materialised views etc natively - -🔲 Check error handling is sufficient - -🔲 Add custom error types for different conditions (e.g. `no rows returned`) - -🔲 Add support for passing parameters in queries - -🔲 Add support for streaming inserts (https://docs.ksqldb.io/en/latest/developer-guide/ksqldb-rest-api/streaming-endpoint/#inserting-rows-into-an-existing-stream) - -🔲 Add support for CommandId etc in executing commands so that they can be executed sequentially +See https://github.com/rmoff/ksqldb-go/issues?q=is%3Aissue+is%3Aopen+label%3A%22help+wanted%22 diff --git a/pull.go b/pull.go index 763eb39..f191367 100644 --- a/pull.go +++ b/pull.go @@ -1,6 +1,7 @@ package ksqldb import ( + "context" "crypto/tls" "encoding/json" "fmt" @@ -15,8 +16,7 @@ import ( // Pull queries are like "traditional" RDBMS queries in which // the query terminates once the state has been queried. // -// To use this function pass in the base URL of your -// ksqlDB server, and the SQL query statement. +// To use this function pass in the the SQL query statement. // // The function returns a ksqldb.Header and ksqldb.Payload // which will hold one or more rows of data. You will need to @@ -30,7 +30,7 @@ import ( // // Do other stuff with the data here // } // } -func (cl *Client) Pull(q string) (h Header, r Payload, err error) { +func (cl *Client) Pull(ctx context.Context, q string) (h Header, r Payload, err error) { // Create the client, force it to use HTTP2 (to avoid `http2: unsupported scheme`) client := http.Client{ @@ -47,7 +47,7 @@ func (cl *Client) Pull(q string) (h Header, r Payload, err error) { // Create the request payload := strings.NewReader("{\"sql\":\"" + q + "\"}") - req, err := http.NewRequest("POST", cl.url+"/query-stream", payload) + req, err := http.NewRequestWithContext(ctx, "POST", cl.url+"/query-stream", payload) if err != nil { return h, r, err diff --git a/push.go b/push.go index ac90aad..0e30b51 100644 --- a/push.go +++ b/push.go @@ -2,6 +2,7 @@ package ksqldb import ( "bufio" + "context" "crypto/tls" "encoding/json" "fmt" @@ -21,14 +22,13 @@ import ( // to which it can write new rows of data as and when they are // received. // -// To use this function pass in the base URL of your -// ksqlDB server, the SQL query statement, and three channels: +// To use this function pass in a context, the SQL query statement, +// and two channels: // // * ksqldb.Row - rows of data // * ksqldb.Header - header (including column definitions). // If you don't want to block before receiving // row data then make this channel buffered. -// * boolean - write true to this channel to cancel the push query // // The channel is populated with ksqldb.Row which represents // one row of data. You will need to define variables to hold @@ -39,7 +39,7 @@ import ( // if row != nil { // DATA_TS = row[0].(float64) // ID = row[1].(string) -func (cl *Client) Push(q string, rc chan<- Row, hc chan<- Header, cc <-chan bool) (err error) { +func (cl *Client) Push(ctx context.Context, q string, rc chan<- Row, hc chan<- Header) (err error) { // Create the client, force it to use HTTP2 (to avoid `http2: unsupported scheme`) client := http.Client{ @@ -55,7 +55,7 @@ func (cl *Client) Push(q string, rc chan<- Row, hc chan<- Header, cc <-chan bool } // make the request payload := strings.NewReader("{\"properties\":{\"ksql.streams.auto.offset.reset\": \"latest\"},\"sql\":\"" + q + "\"}") - req, err := http.NewRequest("POST", cl.url+"/query-stream", payload) + req, err := http.NewRequestWithContext(ctx, "POST", cl.url+"/query-stream", payload) cl.log("Sending ksqlDB request:\n\t%v", q) if err != nil { return err @@ -75,14 +75,14 @@ func (cl *Client) Push(q string, rc chan<- Row, hc chan<- Header, cc <-chan bool for doThis { select { - case <-cc: + case <-ctx.Done(): // close the channels and terminate the loop regardless defer close(rc) defer close(hc) defer func() { doThis = false }() // Try to close the query payload := strings.NewReader("{\"queryId\":\"" + h.queryId + "\"}") - req, err := http.NewRequest("POST", cl.url+"/close-query", payload) + req, err := http.NewRequestWithContext(ctx, "POST", cl.url+"/close-query", payload) cl.log("Closing ksqlDB query\t%v", h.queryId) if err != nil { return fmt.Errorf("Failed to construct HTTP request to cancel query\n%v", err) diff --git a/test/pull.go b/test/pull.go index 7591f98..f040dc8 100644 --- a/test/pull.go +++ b/test/pull.go @@ -1,7 +1,9 @@ package main import ( + "context" "fmt" + "time" "github.com/rmoff/ksqldb-go" ) @@ -9,7 +11,10 @@ import ( func getDogStats(client *ksqldb.Client, s string) (e error) { k := "SELECT TIMESTAMPTOSTRING(WINDOWSTART,'yyyy-MM-dd HH:mm:ss','Europe/London') AS WINDOW_START, TIMESTAMPTOSTRING(WINDOWEND,'HH:mm:ss','Europe/London') AS WINDOW_END, DOG_SIZE, DOGS_CT FROM DOGS_BY_SIZE WHERE DOG_SIZE='" + s + "';" - _, r, e := client.Pull(k) + + ctx, cancel := context.WithTimeout(context.TODO(), 10*time.Second) + defer cancel() + _, r, e := client.Pull(ctx, k) if e != nil { // handle the error better here, e.g. check for no rows returned diff --git a/test/push.go b/test/push.go index 449903c..046052e 100644 --- a/test/push.go +++ b/test/push.go @@ -1,8 +1,8 @@ package main import ( + "context" "fmt" - "log" "time" "github.com/rmoff/ksqldb-go" @@ -12,7 +12,6 @@ func getDogUpdates(client *ksqldb.Client) (err error) { rc := make(chan ksqldb.Row) hc := make(chan ksqldb.Header, 1) - cc := make(chan bool) k := "SELECT ROWTIME, ID, NAME, DOGSIZE, AGE FROM DOGS EMIT CHANGES;" @@ -43,14 +42,10 @@ func getDogUpdates(client *ksqldb.Client) (err error) { }() - // This Go routine shows how you can cancel a Push query as and when required - go func() { - time.Sleep(10 * time.Second) - log.Println("⏱️ Terminating the continuous query now, we've seen enough") - cc <- true - }() + ctx, cancel := context.WithTimeout(context.TODO(), 10*time.Second) + defer cancel() - e := client.Push(k, rc, hc, cc) + e := client.Push(ctx, k, rc, hc) if e != nil { // handle the error better here, e.g. check for no rows returned diff --git a/types.go b/types.go index 6f4ffb6..a81e526 100644 --- a/types.go +++ b/types.go @@ -1,14 +1,18 @@ package ksqldb +// Row represents a row returned from a query type Row []interface{} +// Payload represents multiple rows type Payload []Row +// Header represents a header returned from a query type Header struct { queryId string columns []Column } +// Column represents the metadata for a column in a Row type Column struct { Name string Type string