diff --git a/README.adoc b/README.adoc index 2448f1c..dd83713 100644 --- a/README.adoc +++ b/README.adoc @@ -45,9 +45,11 @@ Create a ksqlDB Client [source,go] ---- -client := ksqldb.NewClient("http://ksqldb:8088").Debug() +client := ksqldb.NewClient("http://ksqldb:8088","username","password").Debug() ---- +For no authentication just use blank username and password values. + === Pull query [source,go] diff --git a/client.go b/client.go index 3bae2be..54b1f8c 100644 --- a/client.go +++ b/client.go @@ -3,9 +3,11 @@ package ksqldb import "log" //NewClient creates new ksqldb client with log.Println default logging -func NewClient(url string) *Client { +func NewClient(url string, u string, p string) *Client { return &Client{ - url: url, - logf: log.Printf, + url: url, + username: u, + password: p, + logf: log.Printf, } } diff --git a/pull.go b/pull.go index f191367..a58bf0e 100644 --- a/pull.go +++ b/pull.go @@ -32,19 +32,6 @@ import ( // } func (cl *Client) Pull(ctx context.Context, q string) (h Header, r Payload, err error) { - // Create the client, force it to use HTTP2 (to avoid `http2: unsupported scheme`) - client := http.Client{ - Transport: &http2.Transport{ - // So http2.Transport doesn't complain the URL scheme isn't 'https' - AllowHTTP: true, - // Pretend we are dialing a TLS endpoint. - // Note, we ignore the passed tls.Config - DialTLS: func(network, addr string, cfg *tls.Config) (net.Conn, error) { - return net.Dial(network, addr) - }, - }, - } - // Create the request payload := strings.NewReader("{\"sql\":\"" + q + "\"}") req, err := http.NewRequestWithContext(ctx, "POST", cl.url+"/query-stream", payload) @@ -54,6 +41,26 @@ func (cl *Client) Pull(ctx context.Context, q string) (h Header, r Payload, err } req.Header.Add("Accept", "application/json; charset=utf-8") + // If we've got creds to pass, let's pass them + if cl.username != "" { + req.SetBasicAuth(cl.username, cl.password) + } + + client := &http.Client{} + if req.URL.Scheme == "http" { + // ksqlDB uses HTTP2 and if the server is on HTTP then Golang will not + // use HTTP2 unless we force it to, thus. + // Without this you get the error `http2: unsupported scheme` + client.Transport = &http2.Transport{ + AllowHTTP: true, + // Pretend we are dialing a TLS endpoint. + // Note, we ignore the passed tls.Config + DialTLS: func(network, addr string, cfg *tls.Config) (net.Conn, error) { + return net.Dial(network, addr) + }, + } + } + res, err := client.Do(req) if err != nil { return h, r, err diff --git a/push.go b/push.go index 0e30b51..2e00ade 100644 --- a/push.go +++ b/push.go @@ -41,26 +41,34 @@ import ( // ID = row[1].(string) func (cl *Client) Push(ctx context.Context, q string, rc chan<- Row, hc chan<- Header) (err error) { - // Create the client, force it to use HTTP2 (to avoid `http2: unsupported scheme`) - client := http.Client{ - Transport: &http2.Transport{ - // So http2.Transport doesn't complain the URL scheme isn't 'https' + payload := strings.NewReader("{\"properties\":{\"ksql.streams.auto.offset.reset\": \"latest\"},\"sql\":\"" + q + "\"}") + req, err := http.NewRequestWithContext(ctx, "POST", cl.url+"/query-stream", payload) + if err != nil { + return err + } + + // If we've got creds to pass, let's pass them + if cl.username != "" { + req.SetBasicAuth(cl.username, cl.password) + } + + client := &http.Client{} + if req.URL.Scheme == "http" { + // ksqlDB uses HTTP2 and if the server is on HTTP then Golang will not + // use HTTP2 unless we force it to, thus. + // Without this you get the error `http2: unsupported scheme` + client.Transport = &http2.Transport{ AllowHTTP: true, // Pretend we are dialing a TLS endpoint. // Note, we ignore the passed tls.Config DialTLS: func(network, addr string, cfg *tls.Config) (net.Conn, error) { return net.Dial(network, addr) }, - }, + } } + // make the request - payload := strings.NewReader("{\"properties\":{\"ksql.streams.auto.offset.reset\": \"latest\"},\"sql\":\"" + q + "\"}") - req, err := http.NewRequestWithContext(ctx, "POST", cl.url+"/query-stream", payload) cl.log("Sending ksqlDB request:\n\t%v", q) - if err != nil { - return err - } - res, err := client.Do(req) if err != nil { return err diff --git a/types.go b/types.go index a81e526..805dbd5 100644 --- a/types.go +++ b/types.go @@ -18,8 +18,11 @@ type Column struct { Type string } +// The ksqlDB client type Client struct { - url string - isDebug bool - logf func(format string, v ...interface{}) + url string + username string + password string + isDebug bool + logf func(format string, v ...interface{}) }