Skip to content

Commit

Permalink
pinot controller/broker support (#6582)
Browse files Browse the repository at this point in the history
* pinot controller/broker support

* pinot driver changes

* fix
  • Loading branch information
pjain1 authored Feb 6, 2025
1 parent a441f31 commit 9eb6fc9
Show file tree
Hide file tree
Showing 6 changed files with 98 additions and 128 deletions.
4 changes: 2 additions & 2 deletions docs/docs/reference/olap-engines/pinot.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,13 @@ Rill supports connecting to an existing Pinot cluster and using it as an OLAP en

## Connection string (DSN)

Rill connects to Pinot using the [Pinot Golang Client](https://docs.pinot.apache.org/users/clients/golang) and requires a connection string of the following format: `http://<user>:<password>@<host>:<port>`.
Rill connects to Pinot using the [Pinot Golang Client](https://docs.pinot.apache.org/users/clients/golang) and requires a connection string of the following format: `http://<user>:<password>@<broker_host>:<port>?controller=<controller_host>:<port>`.
`host`and `port` should be of the Pinot Controller server. If `user` or `password` contain special characters they should be URL encoded (i.e. `p@ssword` -> `p%40ssword`). This should be set in the `connector.pinot.dsn` property in Rill.

As an example, this typically looks something like:

```bash
connector.pinot.dsn="https://username:password@localhost:9000"
connector.pinot.dsn="http(s)://username:password@localhost:8000?controller=localhost:9000"
```

:::info Need help connecting to Pinot?
Expand Down
2 changes: 2 additions & 0 deletions runtime/drivers/olap.go
Original file line number Diff line number Diff line change
Expand Up @@ -583,6 +583,8 @@ func (d Dialect) SelectInlineResults(result *Result) (string, []any, []any, erro
prefix += ", "
}
}
} else if i > 0 {
prefix += ", "
}

if d == DialectDuckDB {
Expand Down
11 changes: 9 additions & 2 deletions runtime/drivers/pinot/olap.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/jmoiron/sqlx"
runtimev1 "github.com/rilldata/rill/proto/gen/rill/runtime/v1"
"github.com/rilldata/rill/runtime/drivers"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
)

Expand Down Expand Up @@ -55,6 +56,9 @@ func (c *connection) WithConnection(ctx context.Context, priority int, longRunni
}

func (c *connection) Exec(ctx context.Context, stmt *drivers.Statement) error {
if c.logQueries {
c.logger.Info("pinot query", zap.String("sql", stmt.Query), zap.Any("args", stmt.Args))
}
res, err := c.Execute(ctx, stmt)
if err != nil {
return err
Expand All @@ -66,6 +70,9 @@ func (c *connection) Exec(ctx context.Context, stmt *drivers.Statement) error {
}

func (c *connection) Execute(ctx context.Context, stmt *drivers.Statement) (*drivers.Result, error) {
if c.logQueries {
c.logger.Info("pinot query", zap.String("sql", stmt.Query), zap.Any("args", stmt.Args))
}
if stmt.DryRun {
rows, err := c.db.QueryxContext(ctx, "EXPLAIN PLAN FOR "+stmt.Query, stmt.Args...)
if err != nil {
Expand Down Expand Up @@ -122,7 +129,7 @@ func (c *connection) InformationSchema() drivers.InformationSchema {

func (i informationSchema) All(ctx context.Context, like string) ([]*drivers.Table, error) {
// query /tables endpoint, for each table name, query /tables/{tableName}/schema
req, _ := http.NewRequestWithContext(ctx, http.MethodGet, i.c.baseURL+"/tables", http.NoBody)
req, _ := http.NewRequestWithContext(ctx, http.MethodGet, i.c.schemaURL+"/tables", http.NoBody)
for k, v := range i.c.headers {
req.Header.Set(k, v)
}
Expand Down Expand Up @@ -183,7 +190,7 @@ func (i informationSchema) All(ctx context.Context, like string) ([]*drivers.Tab
}

func (i informationSchema) Lookup(ctx context.Context, db, schema, name string) (*drivers.Table, error) {
req, _ := http.NewRequestWithContext(ctx, http.MethodGet, i.c.baseURL+"/tables/"+name+"/schema", http.NoBody)
req, _ := http.NewRequestWithContext(ctx, http.MethodGet, i.c.schemaURL+"/tables/"+name+"/schema", http.NoBody)
for k, v := range i.c.headers {
req.Header.Set(k, v)
}
Expand Down
87 changes: 62 additions & 25 deletions runtime/drivers/pinot/pinot.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,24 +31,40 @@ var spec = drivers.Spec{
Type: drivers.StringPropertyType,
Required: true,
DisplayName: "Connection string",
Placeholder: "http(s)://username:password@localhost:9000",
Placeholder: "http(s)://username:password@localhost:8000?controller=localhost:9000",
Secret: true,
NoPrompt: true,
},
{
Key: "host",
Key: "broker_host",
Type: drivers.StringPropertyType,
Required: true,
DisplayName: "Host",
Description: "Hostname or IP address of the Pinot server",
DisplayName: "Broker Host",
Description: "Hostname or IP address of the Pinot broker server",
Placeholder: "localhost",
},
{
Key: "port",
Key: "broker_port",
Type: drivers.NumberPropertyType,
Required: false,
DisplayName: "Port",
Description: "Port number of the Pinot server",
DisplayName: "Broker Port",
Description: "Port number of the broker Pinot broker server",
Placeholder: "8000",
},
{
Key: "controller_host",
Type: drivers.StringPropertyType,
Required: true,
DisplayName: "Controller Host",
Description: "Hostname or IP address of the Pinot controller server",
Placeholder: "localhost",
},
{
Key: "controller_port",
Type: drivers.NumberPropertyType,
Required: false,
DisplayName: "Controller Port",
Description: "Port number of the Pinot controller server",
Placeholder: "9000",
},
{
Expand Down Expand Up @@ -83,11 +99,13 @@ type driver struct{}

type configProperties struct {
// DSN is the connection string. Set either DSN or properties below.
DSN string `mapstructure:"dsn"`
Username string `mapstructure:"username"`
Password string `mapstructure:"password"`
Host string `mapstructure:"host"`
Port int `mapstructure:"port"`
DSN string `mapstructure:"dsn"`
Username string `mapstructure:"username"`
Password string `mapstructure:"password"`
BrokerHost string `mapstructure:"broker_host"`
BrokerPort int `mapstructure:"broker_port"`
ControllerHost string `mapstructure:"controller_host"`
ControllerPort int `mapstructure:"controller_port"`
// SSL determines whether secured connection need to be established. To be set when setting individual fields.
SSL bool `mapstructure:"ssl"`
// LogQueries controls whether to log the raw SQL passed to OLAP.Execute.
Expand All @@ -109,12 +127,24 @@ func (d driver) Open(instanceID string, config map[string]any, st *storage.Clien
var dsn string
if conf.DSN != "" {
dsn = conf.DSN
} else if conf.Host != "" {
} else if conf.ControllerHost != "" && conf.BrokerHost != "" {
var controllerURL url.URL
if conf.ControllerPort == 0 {
controllerURL.Host = conf.ControllerHost
} else {
controllerURL.Host = fmt.Sprintf("%v:%v", conf.ControllerHost, conf.ControllerPort)
}
if conf.SSL {
controllerURL.Scheme = "https"
} else {
controllerURL.Scheme = "http"
}

var dsnURL url.URL
dsnURL.Host = conf.Host
dsnURL.Host = conf.BrokerHost
// set port
if conf.Port != 0 {
dsnURL.Host = fmt.Sprintf("%v:%v", conf.Host, conf.Port)
if conf.BrokerPort != 0 {
dsnURL.Host = fmt.Sprintf("%v:%v", conf.BrokerHost, conf.BrokerPort)
}

// set scheme
Expand All @@ -131,6 +161,7 @@ func (d driver) Open(instanceID string, config map[string]any, st *storage.Clien
dsnURL.User = url.User(conf.Username)
}

dsnURL.RawQuery = "controller=" + controllerURL.String()
dsn = dsnURL.String()
} else {
return nil, fmt.Errorf("pinot connection parameters not set. Set `dsn` or individual properties")
Expand All @@ -154,16 +185,19 @@ func (d driver) Open(instanceID string, config map[string]any, st *storage.Clien
return nil, fmt.Errorf("pinot: %w", err)
}

controller, headers, err := sqldriver.ParseDSN(dsn)
broker, controller, headers, err := sqldriver.ParseDSN(dsn)
if err != nil {
return nil, err
}

conn := &connection{
db: dbx,
config: config,
baseURL: controller,
headers: headers,
db: dbx,
config: config,
queryURL: broker,
schemaURL: controller,
headers: headers,
logQueries: conf.LogQueries,
logger: logger,
}
return conn, nil
}
Expand All @@ -181,10 +215,13 @@ func (d driver) TertiarySourceConnectors(ctx context.Context, src map[string]any
}

type connection struct {
db *sqlx.DB
config map[string]any
baseURL string
headers map[string]string
db *sqlx.DB
config map[string]any
queryURL string
schemaURL string
headers map[string]string
logQueries bool
logger *zap.Logger
}

// Ping implements drivers.Handle.
Expand Down
85 changes: 23 additions & 62 deletions runtime/drivers/pinot/sqldriver/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
"math/big"
"net/url"
"reflect"
"strings"
"time"

"github.com/startreedata/pinot-client-go/pinot"
Expand All @@ -21,22 +20,19 @@ import (
type pinotDriver struct{}

func (d *pinotDriver) Open(dsn string) (sqlDriver.Conn, error) {
address, headers, err := ParseDSN(dsn)
broker, _, headers, err := ParseDSN(dsn)
if err != nil {
return nil, err
}
pinotConn, err := pinot.NewWithConfig(&pinot.ClientConfig{
ExtraHTTPHeader: headers,
ControllerConfig: &pinot.ControllerConfig{
ExtraControllerAPIHeaders: headers,
ControllerAddress: address,
},
ExtraHTTPHeader: headers,
BrokerList: []string{broker},
UseMultistageEngine: true, // We have joins and nested queries which are supported by multistage engine
})
if err != nil {
return nil, err
}
// We have joins and nested queries which are supported by multistage engine
pinotConn.UseMultistageEngine(true)

return &conn{pinotConn: pinotConn}, nil
}

Expand All @@ -61,15 +57,18 @@ func (c *conn) Begin() (sqlDriver.Tx, error) {
}

func (c *conn) QueryContext(ctx context.Context, query string, args []sqlDriver.NamedValue) (sqlDriver.Rows, error) {
var resp *pinot.BrokerResponse
var err error
if len(args) > 0 {
q, err := completeQuery(query, args)
if err != nil {
return nil, err
var params []interface{}
for _, arg := range args {
params = append(params, arg.Value)
}
query = q
// TODO: cancel the query if ctx is done
resp, err = c.pinotConn.ExecuteSQLWithParams("", query, params)
} else {
resp, err = c.pinotConn.ExecuteSQL("", query)
}
// TODO: cancel the query if ctx is done
resp, err := c.pinotConn.ExecuteSQL("", query)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -227,19 +226,20 @@ func (r *rows) goValue(rowIdx, coldIdx int, pinotType string) interface{} {
}

// ParseDSN parses the DSN string to extract the controller address and basic auth credentials
func ParseDSN(dsn string) (string, map[string]string, error) {
func ParseDSN(dsn string) (string, string, map[string]string, error) {
// DSN format: http(s)://username:password@broker:port?controller=http(s)://controller:port
// validate dsn - it should be a valid URL, may contain basic auth credentials
u, err := url.Parse(dsn)
if err != nil {
return "", nil, fmt.Errorf("invalid DSN: %w", err)
return "", "", nil, fmt.Errorf("invalid DSN: %w", err)
}

var authHeader map[string]string
if u.User != nil {
uname := u.User.Username()
pwd, passwordSet := u.User.Password()
if uname == "" || !passwordSet {
return "", nil, fmt.Errorf("DSN should contain valid basic auth credentials")
return "", "", nil, fmt.Errorf("DSN should contain valid basic auth credentials")
}
// clear user info from URL so that u.String() doesn't include it
u.User = nil
Expand All @@ -248,51 +248,12 @@ func ParseDSN(dsn string) (string, map[string]string, error) {
"Authorization": fmt.Sprintf("Basic %s", base64.StdEncoding.EncodeToString([]byte(authString))),
}
}
return u.String(), authHeader, nil
}

func completeQuery(query string, args []sqlDriver.NamedValue) (string, error) {
parts := strings.Split(query, "?")
if len(parts)-1 != len(args) {
return "", fmt.Errorf("mismatch in the number of placeholders and arguments")
}

var sb strings.Builder
for i, part := range parts {
sb.WriteString(part)
if i < len(args) {
argStr, err := formatArg(args[i].Value)
if err != nil {
return "", err
}
sb.WriteString(argStr)
}
controllerURL := u.Query().Get("controller")
if controllerURL == "" {
return "", "", nil, fmt.Errorf("controller URL not provided, dsn is form http(s)://username:password@broker:port?controller=http(s)://controller:port")
}

return sb.String(), nil
}

func formatArg(value sqlDriver.Value) (string, error) {
switch v := value.(type) {
case string:
// Escape any single quotes in the string
escaped := strings.ReplaceAll(v, "'", "''")
return fmt.Sprintf("'%s'", escaped), nil
case *big.Int, *big.Float:
// For pinot types - BIG_INT and BIG_DECIMAL - enclose in single quotes
return fmt.Sprintf("'%v'", v), nil
case []byte:
// For pinot type - BYTES - convert to Hex string and enclose in single quotes
hexString := fmt.Sprintf("%x", v)
return fmt.Sprintf("'%s'", hexString), nil
case time.Time:
// For pinot type - TIMESTAMP - convert to below ISO8601 format that it expects and enclose in single quotes
return fmt.Sprintf("'%s'", v.Format("2006-01-02 15:04:05.000Z")), nil
case int, int8, int16, int32, int64, uint, uint8, uint16, uint32, uint64, float32, float64, bool:
// For types - INT, LONG, FLOAT, DOUBLE and BOOLEAN use as-is
return fmt.Sprintf("%v", v), nil
default:
// Throw error for unsupported types
return "", fmt.Errorf("unsupported type: %T", v)
}
u.RawQuery = ""
return u.String(), controllerURL, authHeader, nil
}
37 changes: 0 additions & 37 deletions runtime/drivers/pinot/sqldriver/sql_param_test.go

This file was deleted.

0 comments on commit 9eb6fc9

Please sign in to comment.