Skip to content

Commit

Permalink
create default pool and session for nebula
Browse files Browse the repository at this point in the history
  • Loading branch information
khareyash05 committed Oct 22, 2023
1 parent 8464b60 commit 12538f7
Show file tree
Hide file tree
Showing 5 changed files with 49 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,6 @@ data:
internalstorage-config.yaml: |
type: "nebula"
host: "clusterpedia-internalstorage-nebula"
port: 3699
port: 9669
user: nebula
database: "clusterpedia"
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ metadata:
namespace: clusterpedia-system
spec:
ports:
- port: 3699
- port: 9669
selector:
app: clusterpedia-internalstorage
internalstorage.clusterpedia.io/type: nebula
Expand Down Expand Up @@ -59,7 +59,7 @@ spec:
key: password
ports:
- name: nebula
containerPort: 3699
containerPort: 9669
volumeMounts:
- name: data
mountPath: /var/lib/nebulaql/data
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,6 @@ data:
internalstorage-config.yaml: |
type: "nebula"
host: "clusterpedia-internalstorage-nebula"
port: 3699
port: 9669
user: nebula
database: "clusterpedia"
33 changes: 14 additions & 19 deletions pkg/storage/internalstorage/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (

"github.com/go-sql-driver/mysql"
"github.com/jackc/pgx/v4"
gnebula "github.com/vesoft-inc/nebula-go/v3"
"gopkg.in/natefinch/lumberjack.v2"
"gorm.io/gorm/logger"
"k8s.io/klog/v2"
Expand Down Expand Up @@ -94,7 +95,6 @@ type NebulaConfig struct {
IdleTime time.Duration `yaml:"idleTime"`
MaxConnPoolSize int `yaml:"maxConnPoolSize"`
MinConnPoolSize int `yaml:"minConnPoolSize"`
UseHTTP2 bool `yaml:"useHTTP2"`
}

type ConnPoolConfig struct {
Expand Down Expand Up @@ -299,26 +299,21 @@ func (cfg *Config) genSQLiteDSN() (string, error) {
return cfg.DSN, nil
}

func (cfg *Config) genNebulaDSN() (string, error) {
// Initialize logger for nebula
var nebulalog = gnebula.DefaultLogger{}

func (cfg *Config) genNebulaConfig() (*gnebula.PoolConfig, error) {
if cfg.DSN == "" {
return "", errors.New("nebula: dsn is required")
}
if cfg.Nebula.IdleTime <= 0 {
return "", errors.New("nebula:Idle Time can't be less than or equal to zero")
}
if cfg.Nebula.MaxConnPoolSize <= 0 {
return "", errors.New("nebula: Max connection pool size can't be less than or equal to zero")
}
if cfg.Nebula.MinConnPoolSize <= 0 {
return "", errors.New("nebula: Min connection pool size can't be less than or equal to zero")
}
if cfg.Nebula.TimeOut <= 0 {
return "", errors.New("nebula:Time Out can't be less than or equal to zero")
return nil, errors.New("nebula: dsn is required")
}
if !cfg.Nebula.UseHTTP2 {
return "", errors.New("nebula:UseHTTP2 can't be false")
}
return cfg.DSN, nil

nebulaPoolConfig := gnebula.GetDefaultConf()
nebulaPoolConfig.IdleTime = cfg.Nebula.IdleTime
nebulaPoolConfig.MaxConnPoolSize = cfg.Nebula.MaxConnPoolSize
nebulaPoolConfig.MinConnPoolSize = cfg.Nebula.MinConnPoolSize
nebulaPoolConfig.TimeOut = cfg.Nebula.TimeOut

return &nebulaPoolConfig, nil
}

func (cfg *Config) addMysqlErrorNumbers() {
Expand Down
37 changes: 31 additions & 6 deletions pkg/storage/internalstorage/register.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,12 @@ import (
"io"
"log"
"os"
"strconv"

"github.com/go-sql-driver/mysql"
"github.com/jackc/pgx/v4/stdlib"
"github.com/jinzhu/configor"
gnebula "github.com/vesoft-inc/nebula-go/v3"
"gopkg.in/natefinch/lumberjack.v2"
gmysql "gorm.io/driver/mysql"
gpostgres "gorm.io/driver/postgres"
Expand Down Expand Up @@ -68,12 +70,35 @@ func NewStorageFactory(configPath string) (storage.StorageFactory, error) {
return nil, err
}
dialector = gsqlite.Open(dsn)
// case "nebula":
// dsn, err := cfg.genNebulaDSN()
// if err != nil {
// return nil, err
// }
// conn,err := gnebula.NewConnectionPool()
case "nebula":
nebulaconfig, err := cfg.genNebulaConfig()
if err != nil {
return nil, err
}
// TODO : dialector like for nebula
port, err := strconv.Atoi(cfg.Port)
if err != nil {
return nil, err
}

hostAddress := gnebula.HostAddress{Host: cfg.Host, Port: port}
hostList := []gnebula.HostAddress{hostAddress}
// Initialize connection pool
pool, err := gnebula.NewConnectionPool(hostList, *nebulaconfig, nebulalog)
if err != nil {
nebulalog.Fatal(fmt.Sprintf("Fail to initialize the connection pool, host: %s, port: %d, %s", cfg.Host, port, err.Error()))
}
// Close all connections in the pool
defer pool.Close()

// Create session
session, err := pool.GetSession(cfg.User, cfg.Password)
if err != nil {
nebulalog.Fatal(fmt.Sprintf("Fail to create a new session from connection pool, username: %s, password: %s, %s",
cfg.User, cfg.Password, err.Error()))
}
// Release session and return connection back to connection pool
defer session.Release()
default:
return nil, fmt.Errorf("not support storage type: %s", cfg.Type)
}
Expand Down

0 comments on commit 12538f7

Please sign in to comment.