From e34ddf0a266bc7fbcc8e1f6aab45e3e3ba547558 Mon Sep 17 00:00:00 2001 From: Blake Jackson Date: Tue, 11 Aug 2020 23:12:11 -0400 Subject: [PATCH 1/2] using context --- README.adoc | 19 ++++++++----------- pull.go | 5 +++-- push.go | 14 +++++++------- test/pull.go | 7 ++++++- test/push.go | 13 ++++--------- types.go | 4 ++++ 6 files changed, 32 insertions(+), 30 deletions(-) diff --git a/README.adoc b/README.adoc index 6113d9b..aba6040 100644 --- a/README.adoc +++ b/README.adoc @@ -45,8 +45,11 @@ go run ./test/ [source,go] ---- +ctx, ctxCancel := context.WithTimeout(context.Background(), 10 * time.Second) +defer ctxCancel() + k := "SELECT TIMESTAMPTOSTRING(WINDOWSTART,'yyyy-MM-dd HH:mm:ss','Europe/London') AS WINDOW_START, TIMESTAMPTOSTRING(WINDOWEND,'HH:mm:ss','Europe/London') AS WINDOW_END, DOG_SIZE, DOGS_CT FROM DOGS_BY_SIZE WHERE DOG_SIZE='" + s + "';" -_, r, e := ksqldb.Pull(ksqlDBServer, k) +_, r, e := ksqldb.Pull(ctx, ksqlDBServer, k) if e != nil { // handle the error better here, e.g. check for no rows returned @@ -71,7 +74,6 @@ for _, row := range r { ---- rc := make(chan ksqldb.Row) hc := make(chan ksqldb.Header, 1) -cc := make(chan bool) k := "SELECT ROWTIME, ID, NAME, DOGSIZE, AGE FROM DOGS EMIT CHANGES;" @@ -89,17 +91,12 @@ go func() { fmt.Printf("🐾%v: %v\n", NAME, DOG_SIZE) } } - }() -// This Go routine shows how you can cancel a Push query as and when required -go func() { - time.Sleep(10 * time.Second) - log.Println("⏱️ Terminating the continuous query now, we've seen enough") - cc <- true -}() +ctx, ctxCancel := context.WithTimeout(context.Background(), 10 * time.Second) +defer ctxCancel() -e := ksqldb.Push(ksqlDBServer, k, rc, hc, cc) +e := ksqldb.Push(ctx, ksqlDBServer, k, rc, hc) if e != nil { // handle the error better here, e.g. check for no rows returned @@ -111,7 +108,7 @@ if e != nil { [source,go] ---- -if err := ksqldb.Execute(ksqlDBServer, ` +if err := ksqldb.Execute(ctx, ksqlDBServer, ` CREATE STREAM DOGS (ID STRING KEY, NAME STRING, DOGSIZE STRING, diff --git a/pull.go b/pull.go index e38a16d..ce5612d 100644 --- a/pull.go +++ b/pull.go @@ -1,6 +1,7 @@ package ksqldb import ( + "context" "crypto/tls" "encoding/json" "fmt" @@ -31,7 +32,7 @@ import ( // // Do other stuff with the data here // } // } -func Pull(u string, q string) (h Header, r Payload, err error) { +func Pull(ctx context.Context, u string, q string) (h Header, r Payload, err error) { // Create the client, force it to use HTTP2 (to avoid `http2: unsupported scheme`) client := http.Client{ @@ -48,7 +49,7 @@ func Pull(u string, q string) (h Header, r Payload, err error) { // Create the request payload := strings.NewReader("{\"sql\":\"" + q + "\"}") - req, err := http.NewRequest("POST", u+"/query-stream", payload) + req, err := http.NewRequestWithContext(ctx, "POST", u+"/query-stream", payload) if err != nil { return h, r, err diff --git a/push.go b/push.go index 0b072e3..112e950 100644 --- a/push.go +++ b/push.go @@ -2,6 +2,7 @@ package ksqldb import ( "bufio" + "context" "crypto/tls" "encoding/json" "fmt" @@ -21,14 +22,13 @@ import ( // to which it can write new rows of data as and when they are // received. // -// To use this function pass in the base URL of your -// ksqlDB server, the SQL query statement, and three channels: +// To use this function pass in a context, the base URL of your +// ksqlDB server, the SQL query statement, and two channels: // // * ksqldb.Row - rows of data // * ksqldb.Header - header (including column definitions). // If you don't want to block before receiving // row data then make this channel buffered. -// * boolean - write true to this channel to cancel the push query // // The channel is populated with ksqldb.Row which represents // one row of data. You will need to define variables to hold @@ -39,7 +39,7 @@ import ( // if row != nil { // DATA_TS = row[0].(float64) // ID = row[1].(string) -func Push(u string, q string, rc chan<- Row, hc chan<- Header, cc <-chan bool) (err error) { +func Push(ctx context.Context, u string, q string, rc chan<- Row, hc chan<- Header) (err error) { // Create the client, force it to use HTTP2 (to avoid `http2: unsupported scheme`) client := http.Client{ @@ -55,7 +55,7 @@ func Push(u string, q string, rc chan<- Row, hc chan<- Header, cc <-chan bool) ( } // make the request payload := strings.NewReader("{\"properties\":{\"ksql.streams.auto.offset.reset\": \"latest\"},\"sql\":\"" + q + "\"}") - req, err := http.NewRequest("POST", u+"/query-stream", payload) + req, err := http.NewRequestWithContext(ctx, "POST", u+"/query-stream", payload) log.Printf("Sending ksqlDB request:\n\t%v", q) if err != nil { return err @@ -75,14 +75,14 @@ func Push(u string, q string, rc chan<- Row, hc chan<- Header, cc <-chan bool) ( for doThis { select { - case <-cc: + case <-ctx.Done(): // close the channels and terminate the loop regardless defer close(rc) defer close(hc) defer func() { doThis = false }() // Try to close the query payload := strings.NewReader("{\"queryId\":\"" + h.queryId + "\"}") - req, err := http.NewRequest("POST", u+"/close-query", payload) + req, err := http.NewRequestWithContext(ctx, "POST", u+"/close-query", payload) log.Printf("Closing ksqlDB query\t%v", h.queryId) if err != nil { return fmt.Errorf("Failed to construct HTTP request to cancel query\n%v", err) diff --git a/test/pull.go b/test/pull.go index d490150..f38d4dc 100644 --- a/test/pull.go +++ b/test/pull.go @@ -1,7 +1,9 @@ package main import ( + "context" "fmt" + "time" "github.com/rmoff/ksqldb-go" ) @@ -9,7 +11,10 @@ import ( func getDogStats(s string) (e error) { k := "SELECT TIMESTAMPTOSTRING(WINDOWSTART,'yyyy-MM-dd HH:mm:ss','Europe/London') AS WINDOW_START, TIMESTAMPTOSTRING(WINDOWEND,'HH:mm:ss','Europe/London') AS WINDOW_END, DOG_SIZE, DOGS_CT FROM DOGS_BY_SIZE WHERE DOG_SIZE='" + s + "';" - _, r, e := ksqldb.Pull(ksqlDBServer, k) + + ctx, cancel := context.WithTimeout(context.TODO(), 10*time.Second) + defer cancel() + _, r, e := ksqldb.Pull(ctx, ksqlDBServer, k) if e != nil { // handle the error better here, e.g. check for no rows returned diff --git a/test/push.go b/test/push.go index 22b08fe..16ad52e 100644 --- a/test/push.go +++ b/test/push.go @@ -1,8 +1,8 @@ package main import ( + "context" "fmt" - "log" "time" "github.com/rmoff/ksqldb-go" @@ -12,7 +12,6 @@ func getDogUpdates() (err error) { rc := make(chan ksqldb.Row) hc := make(chan ksqldb.Header, 1) - cc := make(chan bool) k := "SELECT ROWTIME, ID, NAME, DOGSIZE, AGE FROM DOGS EMIT CHANGES;" @@ -43,14 +42,10 @@ func getDogUpdates() (err error) { }() - // This Go routine shows how you can cancel a Push query as and when required - go func() { - time.Sleep(10 * time.Second) - log.Println("⏱️ Terminating the continuous query now, we've seen enough") - cc <- true - }() + ctx, cancel := context.WithTimeout(context.TODO(), 10*time.Second) + defer cancel() - e := ksqldb.Push(ksqlDBServer, k, rc, hc, cc) + e := ksqldb.Push(ctx, ksqlDBServer, k, rc, hc) if e != nil { // handle the error better here, e.g. check for no rows returned diff --git a/types.go b/types.go index f9a897e..4b83824 100644 --- a/types.go +++ b/types.go @@ -1,14 +1,18 @@ package ksqldb +// Row represents a row returned from a query type Row []interface{} +// Payload represents multiple rows type Payload []Row +// Header represents a header returned from a query type Header struct { queryId string columns []Column } +// Column represents the metadata for a column in a Row type Column struct { Name string Type string From 46ac504c35ebc2bc643d0d993636f3002bc97b97 Mon Sep 17 00:00:00 2001 From: Robin Moffatt Date: Thu, 8 Oct 2020 11:03:44 +0100 Subject: [PATCH 2/2] Move hardcoded TODO list to GH issues --- README.adoc | 16 ++-------------- 1 file changed, 2 insertions(+), 14 deletions(-) diff --git a/README.adoc b/README.adoc index aba6040..d2d2087 100644 --- a/README.adoc +++ b/README.adoc @@ -1,6 +1,6 @@ = ksqlDB Go library Robin Moffatt -v0.02, 7 August 2020 +v0.03, 8 October 2020 :toc: @@ -122,16 +122,4 @@ if err := ksqldb.Execute(ctx, ksqlDBServer, ` == TODO -☑️ Add support for the newer https://docs.ksqldb.io/en/latest/developer-guide/ksqldb-rest-api/streaming-endpoint/[`query-stream`] endpoint (N.B. HTTP2 only) - -☑️ Add support for https://docs.ksqldb.io/en/latest/developer-guide/ksqldb-rest-api/ksql-endpoint/[`ksql`] endpoint so that applications can create their own materialised views etc natively - -🔲 Check error handling is sufficient - -🔲 Add custom error types for different conditions (e.g. `no rows returned`) - -🔲 Add support for passing parameters in queries - -🔲 Add support for streaming inserts (https://docs.ksqldb.io/en/latest/developer-guide/ksqldb-rest-api/streaming-endpoint/#inserting-rows-into-an-existing-stream) - -🔲 Add support for CommandId etc in executing commands so that they can be executed sequentially +See https://github.com/rmoff/ksqldb-go/issues?q=is%3Aissue+is%3Aopen+label%3A%22help+wanted%22