diff --git a/pkg/data/postgres.go b/pkg/data/postgres.go index 61c3af862..b6d5e1b4a 100644 --- a/pkg/data/postgres.go +++ b/pkg/data/postgres.go @@ -21,16 +21,14 @@ import ( "context" "fmt" "log" - "os" + "os/exec" "strconv" "strings" api "kubedb.dev/apimachinery/apis/kubedb/v1alpha2" cs "kubedb.dev/apimachinery/client/clientset/versioned" - "kubedb.dev/cli/pkg/lib" "github.com/spf13/cobra" - shell "gomodules.xyz/go-sh" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes" @@ -38,15 +36,65 @@ import ( "k8s.io/client-go/rest" "k8s.io/klog/v2" cmdutil "k8s.io/kubectl/pkg/cmd/util" - "kmodules.xyz/client-go/tools/portforward" ) const ( - pgCaFile = "/tmp/root.crt" - pgCertFile = "/tmp/postgresql.crt" - pgKeyFile = "/tmp/postgresql.key" + pgCaFile = "/tls/certs/client/ca.crt" + pgCertFile = "/tls/certs/client/client.crt" + pgKeyFile = "/tls/certs/client/client.key" + rowLimit = 100000 ) +type postgresOpts struct { + db *api.Postgres + config *rest.Config + client *kubernetes.Clientset + dbClient *cs.Clientset + + username string + pass string +} + +func newPostgresOpts(f cmdutil.Factory, dbName, namespace string) (*postgresOpts, error) { + config, err := f.ToRESTConfig() + if err != nil { + return nil, err + } + + client, err := kubernetes.NewForConfig(config) + if err != nil { + return nil, err + } + + dbClient, err := cs.NewForConfig(config) + if err != nil { + return nil, err + } + + db, err := dbClient.KubedbV1alpha2().Postgreses(namespace).Get(context.TODO(), dbName, metav1.GetOptions{}) + if err != nil { + return nil, err + } + + if db.Status.Phase != api.DatabasePhaseReady { + return nil, fmt.Errorf("postgres %s/%s is not ready", namespace, dbName) + } + + secret, err := client.CoreV1().Secrets(db.Namespace).Get(context.TODO(), db.Spec.AuthSecret.Name, metav1.GetOptions{}) + if err != nil { + return nil, err + } + + return &postgresOpts{ + db: db, + config: config, + client: client, + dbClient: dbClient, + username: string(secret.Data[corev1.BasicAuthUsernameKey]), + pass: string(secret.Data[corev1.BasicAuthPasswordKey]), + }, nil +} + func InsertPostgresDataCMD(f cmdutil.Factory) *cobra.Command { var ( dbName string @@ -79,14 +127,17 @@ func InsertPostgresDataCMD(f cmdutil.Factory) *cobra.Command { log.Fatalln(err) } - tunnel, err := lib.TunnelToDBService(opts.config, dbName, namespace, api.PostgresDatabasePort) - if err != nil { - log.Fatal("couldn't create tunnel, error: ", err) + if rows <= 0 { + log.Fatal("rows need to be greater than 0") } - defer tunnel.Close() - err = opts.insertDataExecCmd(tunnel, rows) - if err != nil { - log.Fatal(err) + + if rows <= rowLimit { + err = opts.insertDataExecCmd(rows) + if err != nil { + log.Fatal(err) + } + } else { + log.Printf("At most %v rows can be inserted per operation", rowLimit) } }, } @@ -96,21 +147,12 @@ func InsertPostgresDataCMD(f cmdutil.Factory) *cobra.Command { return pgInsertCmd } -func (opts *postgresOpts) insertDataExecCmd(tunnel *portforward.Tunnel, rows int) error { +func (opts *postgresOpts) insertDataExecCmd(rows int) error { if rows <= 0 { return fmt.Errorf("rows need to be greater than 0") } - - command := ` - DO $$ BEGIN - IF NOT EXISTS (SELECT 1 FROM information_schema.tables WHERE table_schema = 'public' AND table_name = 'appscode_kubedb_postgres_test_table') THEN - CREATE TABLE appscode_kubedb_postgres_test_table (value int not null); - END IF; - END $$; - ` + "\n" + - fmt.Sprintf("INSERT INTO appscode_kubedb_postgres_test_table (value) values (generate_series(1,%v));", rows) - - out, err := opts.executeCommand(tunnel.Local, command) + command := fmt.Sprintf(`create table if not exists appscode_kubedb_postgres_test_table (values int not null);insert into appscode_kubedb_postgres_test_table (values) values(generate_series(1,%v))`, rows) + out, err := opts.execCommand(command) if err != nil { return err } @@ -154,13 +196,7 @@ func VerifyPostgresDataCMD(f cmdutil.Factory) *cobra.Command { log.Fatalln(err) } - tunnel, err := lib.TunnelToDBService(opts.config, dbName, namespace, api.PostgresDatabasePort) - if err != nil { - log.Fatal("couldn't create tunnel, error: ", err) - } - defer tunnel.Close() - - err = opts.verifyDataExecCmd(tunnel, rows) + err = opts.verifyDataExecCmd(rows) if err != nil { log.Fatal(err) } @@ -171,17 +207,15 @@ func VerifyPostgresDataCMD(f cmdutil.Factory) *cobra.Command { return pgVerifyCmd } -func (opts *postgresOpts) verifyDataExecCmd(tunnel *portforward.Tunnel, rows int) error { +func (opts *postgresOpts) verifyDataExecCmd(rows int) error { if rows <= 0 { return fmt.Errorf("rows need to be greater than 0") } - - command := "SELECT COUNT(*) FROM appscode_kubedb_postgres_test_table;" - out, err := opts.executeCommand(tunnel.Local, command) + command := `SELECT COUNT(*) FROM appscode_kubedb_postgres_test_table` + out, err := opts.execCommand(command) if err != nil { return err } - output := strings.Split(out, "\n") found := strings.TrimSpace(output[2]) @@ -226,13 +260,7 @@ func DropPostgresDataCMD(f cmdutil.Factory) *cobra.Command { log.Fatalln(err) } - tunnel, err := lib.TunnelToDBService(opts.config, dbName, namespace, api.PostgresDatabasePort) - if err != nil { - log.Fatal("couldn't creat tunnel, error: ", err) - } - defer tunnel.Close() - - err = opts.dropDataExecCmd(tunnel) + err = opts.dropDataExecCmd() if err != nil { log.Fatal(err) } @@ -242,169 +270,59 @@ func DropPostgresDataCMD(f cmdutil.Factory) *cobra.Command { return pgDropCmd } -func (opts *postgresOpts) dropDataExecCmd(tunnel *portforward.Tunnel) error { - command := ` - DO $$ BEGIN - IF EXISTS (SELECT 1 FROM information_schema.tables WHERE table_schema = 'public' AND table_name = 'appscode_kubedb_postgres_test_table') THEN - DROP TABLE appscode_kubedb_postgres_test_table; - END IF; - END $$; - ` - - _, err := opts.executeCommand(tunnel.Local, command) +func (opts *postgresOpts) dropDataExecCmd() error { + command := `DROP TABLE if exists appscode_kubedb_postgres_test_table` + _, err := opts.execCommand(command) if err != nil { return err } fmt.Printf("\nSuccess: All the CLI inserted rows DELETED from postgres database %s/%s \n", opts.db.Namespace, opts.db.Name) - return nil } -type postgresOpts struct { - db *api.Postgres - dbImage string - config *rest.Config - client *kubernetes.Clientset - dbClient *cs.Clientset - - username string - pass string - - errWriter *bytes.Buffer -} - -func newPostgresOpts(f cmdutil.Factory, dbName, namespace string) (*postgresOpts, error) { - config, err := f.ToRESTConfig() - if err != nil { - return nil, err - } - - client, err := kubernetes.NewForConfig(config) - if err != nil { - return nil, err - } - - dbClient, err := cs.NewForConfig(config) - if err != nil { - return nil, err - } - - db, err := dbClient.KubedbV1alpha2().Postgreses(namespace).Get(context.TODO(), dbName, metav1.GetOptions{}) - if err != nil { - return nil, err - } - - if db.Status.Phase != api.DatabasePhaseReady { - return nil, fmt.Errorf("postgres %s/%s is not ready", namespace, dbName) - } - - dbVersion, err := dbClient.CatalogV1alpha1().PostgresVersions().Get(context.TODO(), db.Spec.Version, metav1.GetOptions{}) - if err != nil { - return nil, err - } +func (opts *postgresOpts) execCommand(command string) (string, error) { + cmd := opts.getShellCommand(command) - secret, err := client.CoreV1().Secrets(db.Namespace).Get(context.TODO(), db.Spec.AuthSecret.Name, metav1.GetOptions{}) + output, err := opts.runCMD(cmd) if err != nil { - return nil, err + return "", err } - - return &postgresOpts{ - db: db, - dbImage: dbVersion.Spec.DB.Image, - config: config, - client: client, - dbClient: dbClient, - username: string(secret.Data[corev1.BasicAuthUsernameKey]), - pass: string(secret.Data[corev1.BasicAuthPasswordKey]), - errWriter: &bytes.Buffer{}, - }, nil + return string(output), nil } -func (opts *postgresOpts) getDockerShellCommand(localPort int, dockerFlags, postgresExtraFlags []interface{}) (*shell.Session, error) { - sh := shell.NewSession() - sh.ShowCMD = false - sh.Stderr = opts.errWriter - +func (opts *postgresOpts) getShellCommand(command string) string { db := opts.db - dockerCommand := []interface{}{ - "run", "--network=host", - "-e", fmt.Sprintf("PGPASSWORD=%s", opts.pass), - } - dockerCommand = append(dockerCommand, dockerFlags...) - - postgresCommand := []interface{}{ - "psql", - "--host=127.0.0.1", fmt.Sprintf("--port=%d", localPort), - fmt.Sprintf("--username=%s", opts.username), - } + svcName := fmt.Sprintf("svc/%s", db.Name) + cmd := "" if db.Spec.TLS != nil { - secretName := db.CertificateName(api.PostgresClientCert) - certSecret, err := opts.client.CoreV1().Secrets(db.Namespace).Get(context.TODO(), secretName, metav1.GetOptions{}) - if err != nil { - return nil, err - } - - caCrt, ok := certSecret.Data[corev1.ServiceAccountRootCAKey] - if !ok { - return nil, fmt.Errorf("missing %s in secret %s/%s", corev1.ServiceAccountRootCAKey, certSecret.Namespace, certSecret.Name) - } - err = os.WriteFile(pgCaFile, caCrt, 0o644) - if err != nil { - return nil, err + if db.Spec.ClientAuthMode == api.ClientAuthModeCert { + cmd = fmt.Sprintf("kubectl exec -n %s %s -c postgres -- env PGSSLMODE='%s' PGSSLROOTCERT='%s' PGSSLCERT='%s' PGSSLKEY='%s' PGPASSWORD='%s' psql -d postgres -U %s -c '%s'", db.Namespace, svcName, db.Spec.SSLMode, pgCaFile, pgCertFile, pgKeyFile, opts.pass, opts.username, command) + } else { + cmd = fmt.Sprintf("kubectl exec -n %s %s -c postgres -- env PGSSLMODE='%s' PGSSLROOTCERT='%s' PGPASSWORD='%s' psql -d postgres -U %s -c '%s'", db.Namespace, svcName, db.Spec.SSLMode, pgCaFile, opts.pass, opts.username, command) } - - crt, ok := certSecret.Data[corev1.TLSCertKey] - if !ok { - return nil, fmt.Errorf("missing %s in secret %s/%s", corev1.TLSCertKey, certSecret.Namespace, certSecret.Name) - } - err = os.WriteFile(pgCertFile, crt, 0o644) - if err != nil { - return nil, err - } - - key, ok := certSecret.Data[corev1.TLSPrivateKeyKey] - if !ok { - return nil, fmt.Errorf("missing %s in secret %s/%s", corev1.TLSPrivateKeyKey, certSecret.Namespace, certSecret.Name) - } - err = os.WriteFile(pgKeyFile, key, 0o600) - if err != nil { - return nil, err - } - - dockerCommand = append(dockerCommand, - "-v", fmt.Sprintf("%s:%s", "/tmp/", "/root/.postgresql/"), - ) + } else { + cmd = fmt.Sprintf("kubectl exec -n %s %s -c postgres -- env PGSSLMODE=%s PGPASSWORD='%s' psql -d postgres -U %s -c '%s'", db.Namespace, svcName, db.Spec.SSLMode, opts.pass, opts.username, command) } - dockerCommand = append(dockerCommand, opts.dbImage) - finalCommand := append(dockerCommand, postgresCommand...) - if postgresExtraFlags != nil { - finalCommand = append(finalCommand, postgresExtraFlags...) - } - return sh.Command("docker", finalCommand...).SetStdin(os.Stdin), nil + return cmd } -func (opts *postgresOpts) executeCommand(localPort int, command string) (string, error) { - postgresExtraFlags := []interface{}{ - fmt.Sprintf("--command=%s", command), - } - shSession, err := opts.getDockerShellCommand(localPort, nil, postgresExtraFlags) - if err != nil { - return "", err +func (opts *postgresOpts) runCMD(cmd string) ([]byte, error) { + sh := exec.Command("/bin/sh", "-c", cmd) + stdout := bytes.NewBuffer(nil) + stderr := bytes.NewBuffer(nil) + sh.Stdout = stdout + sh.Stderr = stderr + err := sh.Run() + out := stdout.Bytes() + errOut := stderr.Bytes() + errOutput := string(errOut) + if errOutput != "" && !strings.Contains(errOutput, "NOTICE") { + return nil, fmt.Errorf("failed to execute command, stderr: %s", errOutput) } - out, err := shSession.Output() if err != nil { - return "", fmt.Errorf("failed to execute command, error: %s, output: %s\n", err, out) - } - output := "" - if string(out) != "" { - output = string(out) - } - errOutput := opts.errWriter.String() - if errOutput != "" { - return "", fmt.Errorf("failed to execute command, stderr: %s%s", errOutput, output) + return nil, err } - - return string(out), nil + return out, nil }