From 76f9fb1a829f161defe1c2ffc3c674930fd0b2a7 Mon Sep 17 00:00:00 2001 From: Robin Moffatt Date: Thu, 8 Oct 2020 16:10:48 +0100 Subject: [PATCH 1/2] #7 add auth support --- README.adoc | 4 +++- client.go | 8 +++++--- pull.go | 33 ++++++++++++++++++++------------- push.go | 30 +++++++++++++++++++----------- types.go | 9 ++++++--- 5 files changed, 53 insertions(+), 31 deletions(-) diff --git a/README.adoc b/README.adoc index 2448f1c..dd83713 100644 --- a/README.adoc +++ b/README.adoc @@ -45,9 +45,11 @@ Create a ksqlDB Client [source,go] ---- -client := ksqldb.NewClient("http://ksqldb:8088").Debug() +client := ksqldb.NewClient("http://ksqldb:8088","username","password").Debug() ---- +For no authentication just use blank username and password values. + === Pull query [source,go] diff --git a/client.go b/client.go index 3bae2be..54b1f8c 100644 --- a/client.go +++ b/client.go @@ -3,9 +3,11 @@ package ksqldb import "log" //NewClient creates new ksqldb client with log.Println default logging -func NewClient(url string) *Client { +func NewClient(url string, u string, p string) *Client { return &Client{ - url: url, - logf: log.Printf, + url: url, + username: u, + password: p, + logf: log.Printf, } } diff --git a/pull.go b/pull.go index f191367..a58bf0e 100644 --- a/pull.go +++ b/pull.go @@ -32,19 +32,6 @@ import ( // } func (cl *Client) Pull(ctx context.Context, q string) (h Header, r Payload, err error) { - // Create the client, force it to use HTTP2 (to avoid `http2: unsupported scheme`) - client := http.Client{ - Transport: &http2.Transport{ - // So http2.Transport doesn't complain the URL scheme isn't 'https' - AllowHTTP: true, - // Pretend we are dialing a TLS endpoint. - // Note, we ignore the passed tls.Config - DialTLS: func(network, addr string, cfg *tls.Config) (net.Conn, error) { - return net.Dial(network, addr) - }, - }, - } - // Create the request payload := strings.NewReader("{\"sql\":\"" + q + "\"}") req, err := http.NewRequestWithContext(ctx, "POST", cl.url+"/query-stream", payload) @@ -54,6 +41,26 @@ func (cl *Client) Pull(ctx context.Context, q string) (h Header, r Payload, err } req.Header.Add("Accept", "application/json; charset=utf-8") + // If we've got creds to pass, let's pass them + if cl.username != "" { + req.SetBasicAuth(cl.username, cl.password) + } + + client := &http.Client{} + if req.URL.Scheme == "http" { + // ksqlDB uses HTTP2 and if the server is on HTTP then Golang will not + // use HTTP2 unless we force it to, thus. + // Without this you get the error `http2: unsupported scheme` + client.Transport = &http2.Transport{ + AllowHTTP: true, + // Pretend we are dialing a TLS endpoint. + // Note, we ignore the passed tls.Config + DialTLS: func(network, addr string, cfg *tls.Config) (net.Conn, error) { + return net.Dial(network, addr) + }, + } + } + res, err := client.Do(req) if err != nil { return h, r, err diff --git a/push.go b/push.go index 0e30b51..2e00ade 100644 --- a/push.go +++ b/push.go @@ -41,26 +41,34 @@ import ( // ID = row[1].(string) func (cl *Client) Push(ctx context.Context, q string, rc chan<- Row, hc chan<- Header) (err error) { - // Create the client, force it to use HTTP2 (to avoid `http2: unsupported scheme`) - client := http.Client{ - Transport: &http2.Transport{ - // So http2.Transport doesn't complain the URL scheme isn't 'https' + payload := strings.NewReader("{\"properties\":{\"ksql.streams.auto.offset.reset\": \"latest\"},\"sql\":\"" + q + "\"}") + req, err := http.NewRequestWithContext(ctx, "POST", cl.url+"/query-stream", payload) + if err != nil { + return err + } + + // If we've got creds to pass, let's pass them + if cl.username != "" { + req.SetBasicAuth(cl.username, cl.password) + } + + client := &http.Client{} + if req.URL.Scheme == "http" { + // ksqlDB uses HTTP2 and if the server is on HTTP then Golang will not + // use HTTP2 unless we force it to, thus. + // Without this you get the error `http2: unsupported scheme` + client.Transport = &http2.Transport{ AllowHTTP: true, // Pretend we are dialing a TLS endpoint. // Note, we ignore the passed tls.Config DialTLS: func(network, addr string, cfg *tls.Config) (net.Conn, error) { return net.Dial(network, addr) }, - }, + } } + // make the request - payload := strings.NewReader("{\"properties\":{\"ksql.streams.auto.offset.reset\": \"latest\"},\"sql\":\"" + q + "\"}") - req, err := http.NewRequestWithContext(ctx, "POST", cl.url+"/query-stream", payload) cl.log("Sending ksqlDB request:\n\t%v", q) - if err != nil { - return err - } - res, err := client.Do(req) if err != nil { return err diff --git a/types.go b/types.go index a81e526..805dbd5 100644 --- a/types.go +++ b/types.go @@ -18,8 +18,11 @@ type Column struct { Type string } +// The ksqlDB client type Client struct { - url string - isDebug bool - logf func(format string, v ...interface{}) + url string + username string + password string + isDebug bool + logf func(format string, v ...interface{}) } From 4e3b32146b66079fa577cda6741cecb360b1cbe2 Mon Sep 17 00:00:00 2001 From: Robin Moffatt Date: Thu, 8 Oct 2020 16:11:23 +0100 Subject: [PATCH 2/2] Update test --- test/main.go | 2 ++ test/setup.go | 6 ++++-- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/test/main.go b/test/main.go index 028e7cb..7b47f2c 100644 --- a/test/main.go +++ b/test/main.go @@ -10,6 +10,8 @@ import ( ) const ksqlDBServer string = "http://localhost:8088" +const ksqlDBUser string = "" +const ksqlDBPW string = "" func main() { diff --git a/test/setup.go b/test/setup.go index bccafac..d75e950 100644 --- a/test/setup.go +++ b/test/setup.go @@ -8,8 +8,10 @@ import ( ) func setup() (*ksqldb.Client, error) { - //create ksqldb client - client := ksqldb.NewClient(ksqlDBServer).Debug() + + //create ksqlDB client + client := ksqldb.NewClient(ksqlDBServer, ksqlDBUser, ksqlDBPW).Debug() + // Create the dummy data connector if err := client.Execute(` CREATE SOURCE CONNECTOR DOGS WITH (