Skip to content

Commit

Permalink
client for health check & multiple user reload for pgBouncer (#107)
Browse files Browse the repository at this point in the history
Signed-off-by: MobarakHsn <[email protected]>
  • Loading branch information
HiranmoyChowdhury authored Jun 4, 2024
1 parent 277b3fb commit 1942b7b
Show file tree
Hide file tree
Showing 2 changed files with 147 additions and 84 deletions.
15 changes: 15 additions & 0 deletions pgbouncer/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,11 @@ limitations under the License.
package pgbouncer

import (
"context"
"database/sql"
"sync"

api "kubedb.dev/apimachinery/apis/kubedb/v1alpha2"

"xorm.io/xorm"
)
Expand All @@ -29,3 +33,14 @@ type Client struct {
type XormClient struct {
*xorm.Engine
}

type XormClientList struct {
List []*XormClient
Mutex sync.Mutex
WG sync.WaitGroup

context context.Context
pb *api.PgBouncer
auth *Auth
dbName string
}
216 changes: 132 additions & 84 deletions pgbouncer/kubedb_client_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,162 +18,210 @@ package pgbouncer

import (
"context"
"database/sql"
"errors"
"fmt"

api "kubedb.dev/apimachinery/apis/kubedb/v1alpha2"

_ "github.com/lib/pq"
core "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/klog/v2"
"kmodules.xyz/client-go/tools/certholder"
appbinding "kmodules.xyz/custom-resources/apis/appcatalog/v1alpha1"
"sigs.k8s.io/controller-runtime/pkg/client"
"xorm.io/xorm"
)

const (
DefaultPgBouncerDB = "pgbouncer"
DefaultBackendDBType = "postgres"
TLSModeDisable = "disable"
)

type Auth struct {
UserName string
Password string
}

type KubeDBClientBuilder struct {
kc client.Client
db *api.PgBouncer
url string
podName string
pgBouncerDB string
kc client.Client
pgbouncer *api.PgBouncer
url string
podName string
backendDBName string
ctx context.Context
databaseRef *api.Database
auth *Auth
}

func NewKubeDBClientBuilder(kc client.Client, db *api.PgBouncer) *KubeDBClientBuilder {
func NewKubeDBClientBuilder(kc client.Client, pb *api.PgBouncer) *KubeDBClientBuilder {
return &KubeDBClientBuilder{
kc: kc,
db: db,
kc: kc,
pgbouncer: pb,
}
}

func (o *KubeDBClientBuilder) WithURL(url string) *KubeDBClientBuilder {
o.url = url
return o
}

func (o *KubeDBClientBuilder) WithAuth(auth *Auth) *KubeDBClientBuilder {
if auth != nil && auth.UserName != "" && auth.Password != "" {
o.auth = auth
}
return o
}

func (o *KubeDBClientBuilder) WithPod(podName string) *KubeDBClientBuilder {
o.podName = podName
return o
}

func (o *KubeDBClientBuilder) WithURL(url string) *KubeDBClientBuilder {
o.url = url
func (o *KubeDBClientBuilder) WithDatabaseRef(db *api.Database) *KubeDBClientBuilder {
o.databaseRef = db
return o
}

func (o *KubeDBClientBuilder) WithPgBouncerDB(pgDB string) *KubeDBClientBuilder {
o.pgBouncerDB = pgDB
func (o *KubeDBClientBuilder) WithPostgresDBName(dbName string) *KubeDBClientBuilder {
if dbName == "" {
o.backendDBName = o.databaseRef.DatabaseName
} else {
o.backendDBName = dbName
}
return o
}

func (o *KubeDBClientBuilder) GetPgbouncerXormClient(ctx context.Context) (*XormClient, error) {
connector, err := o.getConnectionString(ctx)
func (o *KubeDBClientBuilder) WithContext(ctx context.Context) *KubeDBClientBuilder {
o.ctx = ctx
return o
}

func (o *KubeDBClientBuilder) GetPgBouncerXormClient() (*XormClient, error) {
if o.ctx == nil {
o.ctx = context.Background()
}

connector, err := o.getConnectionString()
if err != nil {
return nil, err
}

engine, err := xorm.NewEngine("postgres", connector)
engine, err := xorm.NewEngine(DefaultBackendDBType, connector)
if err != nil {
return nil, fmt.Errorf("failed to generate pgbouncer client using connection string: %v", err)
return nil, err
}
_, err = engine.Query("SHOW HELP")
if err != nil {
return nil, fmt.Errorf("failed to run query: %v", err)
if engine == nil {
return nil, fmt.Errorf("Xorm Engine can't be build for pgbouncer")
}
return &XormClient{engine}, nil

engine.SetDefaultContext(o.ctx)
return &XormClient{
engine,
}, nil
}

func (o *KubeDBClientBuilder) getURL() string {
return fmt.Sprintf("%s.%s.%s.svc", o.podName, o.db.GoverningServiceName(), o.db.Namespace)
return fmt.Sprintf("%s.%s.%s.svc", o.podName, o.pgbouncer.GoverningServiceName(), o.pgbouncer.Namespace)
}

func (o *KubeDBClientBuilder) getPgBouncerAuthCredentials(ctx context.Context) (string, string, error) {
if o.db.Spec.AuthSecret == nil {
return "", "", errors.New("no database secret")
func (o *KubeDBClientBuilder) getBackendAuth() (string, string, error) {
if o.auth != nil {
return o.auth.UserName, o.auth.Password, nil
}
var secret core.Secret
err := o.kc.Get(ctx, client.ObjectKey{Namespace: o.db.Namespace, Name: o.db.Spec.AuthSecret.Name}, &secret)

db := o.databaseRef

if db == nil {
return "", "", fmt.Errorf("there is no DatabaseReference found for pgBouncer %s/%s", o.pgbouncer.Namespace, o.pgbouncer.Name)
}
appBinding := &appbinding.AppBinding{}
err := o.kc.Get(o.ctx, types.NamespacedName{
Name: db.DatabaseRef.Name,
Namespace: db.DatabaseRef.Namespace,
}, appBinding)
if err != nil {
return "", "", err
}
return string(secret.Data[core.BasicAuthUsernameKey]), string(secret.Data[core.BasicAuthPasswordKey]), nil
}
if appBinding.Spec.Secret == nil {
return "", "", fmt.Errorf("backend postgres auth secret unspecified for pgBouncer %s/%s", o.pgbouncer.Namespace, o.pgbouncer.Name)
}

func (o *KubeDBClientBuilder) GetPgBouncerClient(ctx context.Context) (*Client, error) {
connector, err := o.getConnectionString(ctx)
var secret core.Secret
err = o.kc.Get(o.ctx, client.ObjectKey{Namespace: appBinding.Namespace, Name: appBinding.Spec.Secret.Name}, &secret)
if err != nil {
return nil, err
return "", "", err
}
// connect to database
db, err := sql.Open("postgres", connector)
if err != nil {
return nil, err

user, present := secret.Data[core.BasicAuthUsernameKey]
if !present {
return "", "", fmt.Errorf("error getting backend username")
}

// ping to database to check the connection
if _, err := db.QueryContext(ctx, "SHOW HELP;"); err != nil {
closeErr := db.Close()
if closeErr != nil {
klog.Errorf("Failed to close client. error: %v", closeErr)
}
return nil, err
pass, present := secret.Data[core.BasicAuthPasswordKey]
if !present {
return "", "", fmt.Errorf("error getting backend password")
}

return &Client{db}, nil
return string(user), string(pass), nil
}

func (o *KubeDBClientBuilder) getConnectionString(ctx context.Context) (string, error) {
func (o *KubeDBClientBuilder) getConnectionString() (string, error) {
user, pass, err := o.getBackendAuth()
if err != nil {
return "", err
}

if o.podName != "" {
o.url = o.getURL()
}
dnsName := o.url
port := api.PgBouncerDatabasePort

if o.pgBouncerDB == "" {
o.pgBouncerDB = DefaultPgBouncerDB
var listeningPort int = api.PgBouncerDatabasePort
if o.pgbouncer.Spec.ConnectionPool.Port != nil {
listeningPort = int(*o.pgbouncer.Spec.ConnectionPool.Port)
}
// TODO ssl mode is disable now need to work on this after adding tls support
connector := fmt.Sprintf("user=%s password=%s host=%s port=%d connect_timeout=10 dbname=%s sslmode=%s", user, pass, o.url, listeningPort, o.backendDBName, TLSModeDisable)
return connector, nil
}

user, pass, err := o.getPgBouncerAuthCredentials(ctx)
if err != nil {
return "", fmt.Errorf("DB basic auth is not found for PgBouncer %v/%v", o.db.Namespace, o.db.Name)
}
cnnstr := ""
sslMode := o.db.Spec.SSLMode
if sslMode == "" {
sslMode = api.PgBouncerSSLModeDisable
func GetXormClientList(kc client.Client, pb *api.PgBouncer, ctx context.Context, auth *Auth, dbName string) (*XormClientList, error) {
clientlist := &XormClientList{
List: []*XormClient{},
}
clientlist.context = ctx
clientlist.pb = pb
clientlist.auth = auth
clientlist.dbName = dbName

if o.db.Spec.TLS != nil {
paths, err := o.getTLSConfig(ctx)
for i := 0; int32(i) < *pb.Spec.Replicas; i++ {
podName := fmt.Sprintf("%s-%d", pb.OffshootName(), i)
pod := core.Pod{}
err := kc.Get(ctx, types.NamespacedName{Name: podName, Namespace: pb.Namespace}, &pod)
if err != nil {
return "", err
return clientlist, err
}
if o.db.Spec.ConnectionPool.AuthType == api.PgBouncerClientAuthModeCert || o.db.Spec.SSLMode == api.PgBouncerSSLModeVerifyCA || o.db.Spec.SSLMode == api.PgBouncerSSLModeVerifyFull {
cnnstr = fmt.Sprintf("user=%s password=%s host=%s port=%d connect_timeout=15 dbname=%s sslmode=%s sslrootcert=%s sslcert=%s sslkey=%s", user, pass, dnsName, port, o.pgBouncerDB, sslMode, paths.CACert, paths.Cert, paths.Key)
} else {
cnnstr = fmt.Sprintf("user=%s password=%s host=%s port=%d connect_timeout=15 dbname=%s sslmode=%s sslrootcert=%s", user, pass, dnsName, port, o.pgBouncerDB, sslMode, paths.CACert)
}
} else {
cnnstr = fmt.Sprintf("user=%s password=%s host=%s port=%d connect_timeout=15 dbname=%s sslmode=%s", user, pass, dnsName, port, o.pgBouncerDB, sslMode)
clientlist.Mutex.Lock()
clientlist.WG.Add(1)
clientlist.Mutex.Unlock()
go clientlist.addXormClient(kc, podName)
}
return cnnstr, nil
}

func (o *KubeDBClientBuilder) getTLSConfig(ctx context.Context) (*certholder.Paths, error) {
secretName := o.db.GetCertSecretName(api.PgBouncerClientCert)
clientlist.WG.Wait()

var certSecret core.Secret
err := o.kc.Get(ctx, client.ObjectKey{Namespace: o.db.Namespace, Name: secretName}, &certSecret)
if err != nil {
klog.Error(err, "failed to get certificate secret.", secretName)
return nil, err
if len(clientlist.List) != int(*pb.Spec.Replicas) {
return clientlist, fmt.Errorf("Failed to generate Xorm Client List")
}

certs, _ := certholder.DefaultHolder.ForResource(api.SchemeGroupVersion.WithResource(api.ResourcePluralPgBouncer), o.db.ObjectMeta)
paths, err := certs.Save(&certSecret)
return clientlist, nil
}

func (l *XormClientList) addXormClient(kc client.Client, podName string) {
xormClient, err := NewKubeDBClientBuilder(kc, l.pb).WithContext(l.context).WithDatabaseRef(&l.pb.Spec.Database).WithPod(podName).WithAuth(l.auth).WithPostgresDBName(l.dbName).GetPgBouncerXormClient()
l.Mutex.Lock()
defer l.Mutex.Unlock()
if err != nil {
klog.Error(err, "failed to save certificate")
return nil, err
klog.V(5).ErrorS(err, fmt.Sprintf("failed to create xorm client for pgbouncer %s/%s ", l.pb.Namespace, l.pb.Name))
} else {
l.List = append(l.List, xormClient)
}
return paths, nil
l.WG.Done()
}

0 comments on commit 1942b7b

Please sign in to comment.