Skip to content

Commit

Permalink
Merge pull request #12 from rmoff/add_auth
Browse files Browse the repository at this point in the history
Add auth
  • Loading branch information
rmoff committed Oct 8, 2020
2 parents ab08602 + 4e3b321 commit 7461281
Show file tree
Hide file tree
Showing 7 changed files with 59 additions and 33 deletions.
4 changes: 3 additions & 1 deletion README.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,11 @@ Create a ksqlDB Client

[source,go]
----
client := ksqldb.NewClient("http://ksqldb:8088").Debug()
client := ksqldb.NewClient("http://ksqldb:8088","username","password").Debug()
----

For no authentication just use blank username and password values.

=== Pull query

[source,go]
Expand Down
8 changes: 5 additions & 3 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,11 @@ package ksqldb
import "log"

//NewClient creates new ksqldb client with log.Println default logging
func NewClient(url string) *Client {
func NewClient(url string, u string, p string) *Client {
return &Client{
url: url,
logf: log.Printf,
url: url,
username: u,
password: p,
logf: log.Printf,
}
}
33 changes: 20 additions & 13 deletions pull.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,19 +32,6 @@ import (
// }
func (cl *Client) Pull(ctx context.Context, q string) (h Header, r Payload, err error) {

// Create the client, force it to use HTTP2 (to avoid `http2: unsupported scheme`)
client := http.Client{
Transport: &http2.Transport{
// So http2.Transport doesn't complain the URL scheme isn't 'https'
AllowHTTP: true,
// Pretend we are dialing a TLS endpoint.
// Note, we ignore the passed tls.Config
DialTLS: func(network, addr string, cfg *tls.Config) (net.Conn, error) {
return net.Dial(network, addr)
},
},
}

// Create the request
payload := strings.NewReader("{\"sql\":\"" + q + "\"}")
req, err := http.NewRequestWithContext(ctx, "POST", cl.url+"/query-stream", payload)
Expand All @@ -54,6 +41,26 @@ func (cl *Client) Pull(ctx context.Context, q string) (h Header, r Payload, err
}
req.Header.Add("Accept", "application/json; charset=utf-8")

// If we've got creds to pass, let's pass them
if cl.username != "" {
req.SetBasicAuth(cl.username, cl.password)
}

client := &http.Client{}
if req.URL.Scheme == "http" {
// ksqlDB uses HTTP2 and if the server is on HTTP then Golang will not
// use HTTP2 unless we force it to, thus.
// Without this you get the error `http2: unsupported scheme`
client.Transport = &http2.Transport{
AllowHTTP: true,
// Pretend we are dialing a TLS endpoint.
// Note, we ignore the passed tls.Config
DialTLS: func(network, addr string, cfg *tls.Config) (net.Conn, error) {
return net.Dial(network, addr)
},
}
}

res, err := client.Do(req)
if err != nil {
return h, r, err
Expand Down
30 changes: 19 additions & 11 deletions push.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,26 +41,34 @@ import (
// ID = row[1].(string)
func (cl *Client) Push(ctx context.Context, q string, rc chan<- Row, hc chan<- Header) (err error) {

// Create the client, force it to use HTTP2 (to avoid `http2: unsupported scheme`)
client := http.Client{
Transport: &http2.Transport{
// So http2.Transport doesn't complain the URL scheme isn't 'https'
payload := strings.NewReader("{\"properties\":{\"ksql.streams.auto.offset.reset\": \"latest\"},\"sql\":\"" + q + "\"}")
req, err := http.NewRequestWithContext(ctx, "POST", cl.url+"/query-stream", payload)
if err != nil {
return err
}

// If we've got creds to pass, let's pass them
if cl.username != "" {
req.SetBasicAuth(cl.username, cl.password)
}

client := &http.Client{}
if req.URL.Scheme == "http" {
// ksqlDB uses HTTP2 and if the server is on HTTP then Golang will not
// use HTTP2 unless we force it to, thus.
// Without this you get the error `http2: unsupported scheme`
client.Transport = &http2.Transport{
AllowHTTP: true,
// Pretend we are dialing a TLS endpoint.
// Note, we ignore the passed tls.Config
DialTLS: func(network, addr string, cfg *tls.Config) (net.Conn, error) {
return net.Dial(network, addr)
},
},
}
}

// make the request
payload := strings.NewReader("{\"properties\":{\"ksql.streams.auto.offset.reset\": \"latest\"},\"sql\":\"" + q + "\"}")
req, err := http.NewRequestWithContext(ctx, "POST", cl.url+"/query-stream", payload)
cl.log("Sending ksqlDB request:\n\t%v", q)
if err != nil {
return err
}

res, err := client.Do(req)
if err != nil {
return err
Expand Down
2 changes: 2 additions & 0 deletions test/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import (
)

const ksqlDBServer string = "http://localhost:8088"
const ksqlDBUser string = ""
const ksqlDBPW string = ""

func main() {

Expand Down
6 changes: 4 additions & 2 deletions test/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,10 @@ import (
)

func setup() (*ksqldb.Client, error) {
//create ksqldb client
client := ksqldb.NewClient(ksqlDBServer).Debug()

//create ksqlDB client
client := ksqldb.NewClient(ksqlDBServer, ksqlDBUser, ksqlDBPW).Debug()

// Create the dummy data connector
if err := client.Execute(`
CREATE SOURCE CONNECTOR DOGS WITH (
Expand Down
9 changes: 6 additions & 3 deletions types.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,11 @@ type Column struct {
Type string
}

// The ksqlDB client
type Client struct {
url string
isDebug bool
logf func(format string, v ...interface{})
url string
username string
password string
isDebug bool
logf func(format string, v ...interface{})
}

0 comments on commit 7461281

Please sign in to comment.