Skip to content

Commit

Permalink
Merge in context changes (#3)
Browse files Browse the repository at this point in the history
  • Loading branch information
rmoff committed Oct 8, 2020
2 parents a6bada4 + 46ac504 commit 6b26fb2
Show file tree
Hide file tree
Showing 6 changed files with 35 additions and 46 deletions.
35 changes: 10 additions & 25 deletions README.adoc
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
= ksqlDB Go library
Robin Moffatt <robin@moffatt.me>
v0.02, 7 August 2020
v0.03, 8 October 2020

:toc:

Expand Down Expand Up @@ -45,8 +45,11 @@ go run ./test/

[source,go]
----
ctx, ctxCancel := context.WithTimeout(context.Background(), 10 * time.Second)
defer ctxCancel()
k := "SELECT TIMESTAMPTOSTRING(WINDOWSTART,'yyyy-MM-dd HH:mm:ss','Europe/London') AS WINDOW_START, TIMESTAMPTOSTRING(WINDOWEND,'HH:mm:ss','Europe/London') AS WINDOW_END, DOG_SIZE, DOGS_CT FROM DOGS_BY_SIZE WHERE DOG_SIZE='" + s + "';"
_, r, e := ksqldb.Pull(ksqlDBServer, k)
_, r, e := ksqldb.Pull(ctx, ksqlDBServer, k)
if e != nil {
// handle the error better here, e.g. check for no rows returned
Expand All @@ -71,7 +74,6 @@ for _, row := range r {
----
rc := make(chan ksqldb.Row)
hc := make(chan ksqldb.Header, 1)
cc := make(chan bool)
k := "SELECT ROWTIME, ID, NAME, DOGSIZE, AGE FROM DOGS EMIT CHANGES;"
Expand All @@ -89,17 +91,12 @@ go func() {
fmt.Printf("🐾%v: %v\n", NAME, DOG_SIZE)
}
}
}()
// This Go routine shows how you can cancel a Push query as and when required
go func() {
time.Sleep(10 * time.Second)
log.Println("⏱️ Terminating the continuous query now, we've seen enough")
cc <- true
}()
ctx, ctxCancel := context.WithTimeout(context.Background(), 10 * time.Second)
defer ctxCancel()
e := ksqldb.Push(ksqlDBServer, k, rc, hc, cc)
e := ksqldb.Push(ctx, ksqlDBServer, k, rc, hc)
if e != nil {
// handle the error better here, e.g. check for no rows returned
Expand All @@ -111,7 +108,7 @@ if e != nil {

[source,go]
----
if err := ksqldb.Execute(ksqlDBServer, `
if err := ksqldb.Execute(ctx, ksqlDBServer, `
CREATE STREAM DOGS (ID STRING KEY,
NAME STRING,
DOGSIZE STRING,
Expand All @@ -125,16 +122,4 @@ if err := ksqldb.Execute(ksqlDBServer, `

== TODO

☑️ Add support for the newer https://docs.ksqldb.io/en/latest/developer-guide/ksqldb-rest-api/streaming-endpoint/[`query-stream`] endpoint (N.B. HTTP2 only)

☑️ Add support for https://docs.ksqldb.io/en/latest/developer-guide/ksqldb-rest-api/ksql-endpoint/[`ksql`] endpoint so that applications can create their own materialised views etc natively

🔲 Check error handling is sufficient

🔲 Add custom error types for different conditions (e.g. `no rows returned`)

🔲 Add support for passing parameters in queries

🔲 Add support for streaming inserts (https://docs.ksqldb.io/en/latest/developer-guide/ksqldb-rest-api/streaming-endpoint/#inserting-rows-into-an-existing-stream)

🔲 Add support for CommandId etc in executing commands so that they can be executed sequentially
See https://github.com/rmoff/ksqldb-go/issues?q=is%3Aissue+is%3Aopen+label%3A%22help+wanted%22
8 changes: 4 additions & 4 deletions pull.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package ksqldb

import (
"context"
"crypto/tls"
"encoding/json"
"fmt"
Expand All @@ -15,8 +16,7 @@ import (
// Pull queries are like "traditional" RDBMS queries in which
// the query terminates once the state has been queried.
//
// To use this function pass in the base URL of your
// ksqlDB server, and the SQL query statement.
// To use this function pass in the the SQL query statement.
//
// The function returns a ksqldb.Header and ksqldb.Payload
// which will hold one or more rows of data. You will need to
Expand All @@ -30,7 +30,7 @@ import (
// // Do other stuff with the data here
// }
// }
func (cl *Client) Pull(q string) (h Header, r Payload, err error) {
func (cl *Client) Pull(ctx context.Context, q string) (h Header, r Payload, err error) {

// Create the client, force it to use HTTP2 (to avoid `http2: unsupported scheme`)
client := http.Client{
Expand All @@ -47,7 +47,7 @@ func (cl *Client) Pull(q string) (h Header, r Payload, err error) {

// Create the request
payload := strings.NewReader("{\"sql\":\"" + q + "\"}")
req, err := http.NewRequest("POST", cl.url+"/query-stream", payload)
req, err := http.NewRequestWithContext(ctx, "POST", cl.url+"/query-stream", payload)

if err != nil {
return h, r, err
Expand Down
14 changes: 7 additions & 7 deletions push.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package ksqldb

import (
"bufio"
"context"
"crypto/tls"
"encoding/json"
"fmt"
Expand All @@ -21,14 +22,13 @@ import (
// to which it can write new rows of data as and when they are
// received.
//
// To use this function pass in the base URL of your
// ksqlDB server, the SQL query statement, and three channels:
// To use this function pass in a context, the SQL query statement,
// and two channels:
//
// * ksqldb.Row - rows of data
// * ksqldb.Header - header (including column definitions).
// If you don't want to block before receiving
// row data then make this channel buffered.
// * boolean - write true to this channel to cancel the push query
//
// The channel is populated with ksqldb.Row which represents
// one row of data. You will need to define variables to hold
Expand All @@ -39,7 +39,7 @@ import (
// if row != nil {
// DATA_TS = row[0].(float64)
// ID = row[1].(string)
func (cl *Client) Push(q string, rc chan<- Row, hc chan<- Header, cc <-chan bool) (err error) {
func (cl *Client) Push(ctx context.Context, q string, rc chan<- Row, hc chan<- Header) (err error) {

// Create the client, force it to use HTTP2 (to avoid `http2: unsupported scheme`)
client := http.Client{
Expand All @@ -55,7 +55,7 @@ func (cl *Client) Push(q string, rc chan<- Row, hc chan<- Header, cc <-chan bool
}
// make the request
payload := strings.NewReader("{\"properties\":{\"ksql.streams.auto.offset.reset\": \"latest\"},\"sql\":\"" + q + "\"}")
req, err := http.NewRequest("POST", cl.url+"/query-stream", payload)
req, err := http.NewRequestWithContext(ctx, "POST", cl.url+"/query-stream", payload)
cl.log("Sending ksqlDB request:\n\t%v", q)
if err != nil {
return err
Expand All @@ -75,14 +75,14 @@ func (cl *Client) Push(q string, rc chan<- Row, hc chan<- Header, cc <-chan bool

for doThis {
select {
case <-cc:
case <-ctx.Done():
// close the channels and terminate the loop regardless
defer close(rc)
defer close(hc)
defer func() { doThis = false }()
// Try to close the query
payload := strings.NewReader("{\"queryId\":\"" + h.queryId + "\"}")
req, err := http.NewRequest("POST", cl.url+"/close-query", payload)
req, err := http.NewRequestWithContext(ctx, "POST", cl.url+"/close-query", payload)
cl.log("Closing ksqlDB query\t%v", h.queryId)
if err != nil {
return fmt.Errorf("Failed to construct HTTP request to cancel query\n%v", err)
Expand Down
7 changes: 6 additions & 1 deletion test/pull.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,20 @@
package main

import (
"context"
"fmt"
"time"

"github.com/rmoff/ksqldb-go"
)

func getDogStats(client *ksqldb.Client, s string) (e error) {

k := "SELECT TIMESTAMPTOSTRING(WINDOWSTART,'yyyy-MM-dd HH:mm:ss','Europe/London') AS WINDOW_START, TIMESTAMPTOSTRING(WINDOWEND,'HH:mm:ss','Europe/London') AS WINDOW_END, DOG_SIZE, DOGS_CT FROM DOGS_BY_SIZE WHERE DOG_SIZE='" + s + "';"
_, r, e := client.Pull(k)

ctx, cancel := context.WithTimeout(context.TODO(), 10*time.Second)
defer cancel()
_, r, e := client.Pull(ctx, k)

if e != nil {
// handle the error better here, e.g. check for no rows returned
Expand Down
13 changes: 4 additions & 9 deletions test/push.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
package main

import (
"context"
"fmt"
"log"
"time"

"github.com/rmoff/ksqldb-go"
Expand All @@ -12,7 +12,6 @@ func getDogUpdates(client *ksqldb.Client) (err error) {

rc := make(chan ksqldb.Row)
hc := make(chan ksqldb.Header, 1)
cc := make(chan bool)

k := "SELECT ROWTIME, ID, NAME, DOGSIZE, AGE FROM DOGS EMIT CHANGES;"

Expand Down Expand Up @@ -43,14 +42,10 @@ func getDogUpdates(client *ksqldb.Client) (err error) {

}()

// This Go routine shows how you can cancel a Push query as and when required
go func() {
time.Sleep(10 * time.Second)
log.Println("⏱️ Terminating the continuous query now, we've seen enough")
cc <- true
}()
ctx, cancel := context.WithTimeout(context.TODO(), 10*time.Second)
defer cancel()

e := client.Push(k, rc, hc, cc)
e := client.Push(ctx, k, rc, hc)

if e != nil {
// handle the error better here, e.g. check for no rows returned
Expand Down
4 changes: 4 additions & 0 deletions types.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,18 @@
package ksqldb

// Row represents a row returned from a query
type Row []interface{}

// Payload represents multiple rows
type Payload []Row

// Header represents a header returned from a query
type Header struct {
queryId string
columns []Column
}

// Column represents the metadata for a column in a Row
type Column struct {
Name string
Type string
Expand Down

0 comments on commit 6b26fb2

Please sign in to comment.