Skip to content

Commit

Permalink
Merge pull request #7 from ganganxiaojiu/main
Browse files Browse the repository at this point in the history
NINE-9 support interactive sql
  • Loading branch information
nineinfra authored Dec 17, 2023
2 parents badd8f3 + 9934a8e commit 67dc9c6
Show file tree
Hide file tree
Showing 2 changed files with 90 additions and 19 deletions.
72 changes: 56 additions & 16 deletions cmd/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
corev1 "k8s.io/api/core/v1"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/util/duration"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/tools/remotecommand"
Expand Down Expand Up @@ -73,20 +74,32 @@ func runExecCommand(pdName string, namespace string, tty bool, cmd []string) err
return err
}

var stdout bytes.Buffer
var stderr bytes.Buffer

err = executor.StreamWithContext(context.TODO(), remotecommand.StreamOptions{
Stdin: os.Stdin,
Stdout: &stdout,
Stderr: &stderr,
Tty: tty,
})
if DEBUG {
fmt.Printf("runExecCommand command output:%s,command err:%s,exec err:%s\n", stdout.String(), stderr.String(), err.Error())
}
if err != nil {
return errors.New(stderr.String())
if !tty {
var stdout bytes.Buffer
var stderr bytes.Buffer

err = executor.StreamWithContext(context.TODO(), remotecommand.StreamOptions{
Stdin: os.Stdin,
Stdout: &stdout,
Stderr: &stderr,
Tty: false,
})
if DEBUG {
fmt.Printf("runExecCommand command output:%s,command err:%s,exec err:%s\n", stdout.String(), stderr.String(), err.Error())
}
if err != nil {
return errors.New(stderr.String())
}
} else {
err = executor.StreamWithContext(context.TODO(), remotecommand.StreamOptions{
Stdin: os.Stdin,
Stdout: os.Stdout,
Stderr: os.Stderr,
Tty: true,
})
if err != nil {
return err
}
}
return nil
}
Expand Down Expand Up @@ -378,7 +391,34 @@ func GetSvcAccessInfo(svcName string, portName string, ns string) (string, int32
return accessIP, accessPort
}

func GenThriftSvcName(name string) string {
return name + DefaultNineSuffix + "-kyuubi"
}
func GetThriftIpAndPort(name string, ns string) (string, int32) {
svcName := name + DefaultNineSuffix + "-kyuubi"
return GetSvcAccessInfo(svcName, DefaultThriftPortName, ns)
return GetSvcAccessInfo(GenThriftSvcName(name), DefaultThriftPortName, ns)
}

func GetThriftPodName(name string, ns string) (string, error) {
path, _ := rootCmd.Flags().GetString(kubeconfig)
client, err := GetKubeClient(path)
if err != nil {
return "", err
}
svc, err := client.CoreV1().Services(ns).Get(context.TODO(), GenThriftSvcName(name), metav1.GetOptions{})
if err != nil {
return "", err
}
selector := labels.Set(svc.Spec.Selector).AsSelector()

pods, err := client.CoreV1().Pods(ns).List(context.TODO(), metav1.ListOptions{
LabelSelector: selector.String()})

if err != nil {
return "", err
}

if len(pods.Items) == 0 {
return "", errors.New("pod not found")
}
return pods.Items[0].Name, nil
}
37 changes: 34 additions & 3 deletions cmd/sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@ package cmd

import (
"context"
"errors"
"fmt"
"github.com/beltran/gohive"
"github.com/olekukonko/tablewriter"
"github.com/pkg/errors"
"github.com/spf13/cobra"
"io"
"os"
Expand All @@ -32,6 +32,8 @@ const (
type SqlOptions struct {
Name string
NS string
TTY bool
Silent bool
Statement string
}

Expand Down Expand Up @@ -65,6 +67,8 @@ func newClusterSqlCmd(out io.Writer, errOut io.Writer) *cobra.Command {
cmd = DisableHelp(cmd)
f := cmd.Flags()
f.StringVarP(&c.sqlOpts.NS, "namespace", "n", "", "k8s namespace for this ninecluster")
f.BoolVar(&c.sqlOpts.TTY, "tty", false, "interactive SQL operation")
f.BoolVar(&c.sqlOpts.Silent, "silent", true, "be more silent")
f.StringVarP(&c.sqlOpts.Statement, "statement", "s", "show databases", "simple sql statement")
return cmd
}
Expand All @@ -77,10 +81,29 @@ func (s *sqlCmd) validate(args []string) error {
return ValidateClusterArgs("sql", args)
}

func (s *sqlCmd) run(_ []string) error {
func (s *sqlCmd) interactiveSQL() error {
podName, err := GetThriftPodName(s.sqlOpts.Name, s.sqlOpts.NS)
if err != nil {
return err
}
thriftIP, thriftPort := GetThriftIpAndPort(s.sqlOpts.Name, s.sqlOpts.NS)
if thriftIP == "" || thriftPort == 0 {
return errors.New("Invalid Thrift Access Info!")
return errors.New("invalid Thrift Access Info")
}
pBeelineCmd := []string{"/opt/kyuubi/bin/beeline",
"-u", fmt.Sprintf("jdbc:hive2://%s:%d", thriftIP, thriftPort),
"--silent", fmt.Sprintf("%v", s.sqlOpts.Silent)}
err = runExecCommand(podName, s.sqlOpts.NS, true, pBeelineCmd)
if err != nil {
return err
}
return nil
}

func (s *sqlCmd) directSQL() error {
thriftIP, thriftPort := GetThriftIpAndPort(s.sqlOpts.Name, s.sqlOpts.NS)
if thriftIP == "" || thriftPort == 0 {
return errors.New("invalid Thrift Access Info")
}
conf := gohive.NewConnectConfiguration()
conn, err := gohive.Connect(thriftIP, int(thriftPort), "NONE", conf)
Expand Down Expand Up @@ -129,3 +152,11 @@ func (s *sqlCmd) run(_ []string) error {
}
return nil
}

func (s *sqlCmd) run(_ []string) error {
if !s.sqlOpts.TTY {
return s.directSQL()
} else {
return s.interactiveSQL()
}
}

0 comments on commit 67dc9c6

Please sign in to comment.