From 12538f7dcabe20640ff69c9f3189468b7a281230 Mon Sep 17 00:00:00 2001 From: Yash Khare Date: Sun, 22 Oct 2023 23:51:34 +0530 Subject: [PATCH] create default pool and session for nebula --- ...lusterpedia_internalstorage_configmap.yaml | 2 +- ...usterpedia_internalstorage_deployment.yaml | 4 +- ...lusterpedia_internalstorage_configmap.yaml | 2 +- pkg/storage/internalstorage/config.go | 33 +++++++---------- pkg/storage/internalstorage/register.go | 37 ++++++++++++++++--- 5 files changed, 49 insertions(+), 29 deletions(-) diff --git a/deploy/internalstorage/nebula/clusterpedia_internalstorage_configmap.yaml b/deploy/internalstorage/nebula/clusterpedia_internalstorage_configmap.yaml index 2fa1de048..8a1ea01ef 100644 --- a/deploy/internalstorage/nebula/clusterpedia_internalstorage_configmap.yaml +++ b/deploy/internalstorage/nebula/clusterpedia_internalstorage_configmap.yaml @@ -12,6 +12,6 @@ data: internalstorage-config.yaml: | type: "nebula" host: "clusterpedia-internalstorage-nebula" - port: 3699 + port: 9669 user: nebula database: "clusterpedia" diff --git a/deploy/internalstorage/nebula/clusterpedia_internalstorage_deployment.yaml b/deploy/internalstorage/nebula/clusterpedia_internalstorage_deployment.yaml index 4383373f5..75ccd72ef 100644 --- a/deploy/internalstorage/nebula/clusterpedia_internalstorage_deployment.yaml +++ b/deploy/internalstorage/nebula/clusterpedia_internalstorage_deployment.yaml @@ -5,7 +5,7 @@ metadata: namespace: clusterpedia-system spec: ports: - - port: 3699 + - port: 9669 selector: app: clusterpedia-internalstorage internalstorage.clusterpedia.io/type: nebula @@ -59,7 +59,7 @@ spec: key: password ports: - name: nebula - containerPort: 3699 + containerPort: 9669 volumeMounts: - name: data mountPath: /var/lib/nebulaql/data diff --git a/kustomize/internalstorage/nebula/clusterpedia_internalstorage_configmap.yaml b/kustomize/internalstorage/nebula/clusterpedia_internalstorage_configmap.yaml index 485dadf36..8abbbf636 100644 --- a/kustomize/internalstorage/nebula/clusterpedia_internalstorage_configmap.yaml +++ b/kustomize/internalstorage/nebula/clusterpedia_internalstorage_configmap.yaml @@ -7,6 +7,6 @@ data: internalstorage-config.yaml: | type: "nebula" host: "clusterpedia-internalstorage-nebula" - port: 3699 + port: 9669 user: nebula database: "clusterpedia" diff --git a/pkg/storage/internalstorage/config.go b/pkg/storage/internalstorage/config.go index 29d062512..60965c4ee 100644 --- a/pkg/storage/internalstorage/config.go +++ b/pkg/storage/internalstorage/config.go @@ -12,6 +12,7 @@ import ( "github.com/go-sql-driver/mysql" "github.com/jackc/pgx/v4" + gnebula "github.com/vesoft-inc/nebula-go/v3" "gopkg.in/natefinch/lumberjack.v2" "gorm.io/gorm/logger" "k8s.io/klog/v2" @@ -94,7 +95,6 @@ type NebulaConfig struct { IdleTime time.Duration `yaml:"idleTime"` MaxConnPoolSize int `yaml:"maxConnPoolSize"` MinConnPoolSize int `yaml:"minConnPoolSize"` - UseHTTP2 bool `yaml:"useHTTP2"` } type ConnPoolConfig struct { @@ -299,26 +299,21 @@ func (cfg *Config) genSQLiteDSN() (string, error) { return cfg.DSN, nil } -func (cfg *Config) genNebulaDSN() (string, error) { +// Initialize logger for nebula +var nebulalog = gnebula.DefaultLogger{} + +func (cfg *Config) genNebulaConfig() (*gnebula.PoolConfig, error) { if cfg.DSN == "" { - return "", errors.New("nebula: dsn is required") - } - if cfg.Nebula.IdleTime <= 0 { - return "", errors.New("nebula:Idle Time can't be less than or equal to zero") - } - if cfg.Nebula.MaxConnPoolSize <= 0 { - return "", errors.New("nebula: Max connection pool size can't be less than or equal to zero") - } - if cfg.Nebula.MinConnPoolSize <= 0 { - return "", errors.New("nebula: Min connection pool size can't be less than or equal to zero") - } - if cfg.Nebula.TimeOut <= 0 { - return "", errors.New("nebula:Time Out can't be less than or equal to zero") + return nil, errors.New("nebula: dsn is required") } - if !cfg.Nebula.UseHTTP2 { - return "", errors.New("nebula:UseHTTP2 can't be false") - } - return cfg.DSN, nil + + nebulaPoolConfig := gnebula.GetDefaultConf() + nebulaPoolConfig.IdleTime = cfg.Nebula.IdleTime + nebulaPoolConfig.MaxConnPoolSize = cfg.Nebula.MaxConnPoolSize + nebulaPoolConfig.MinConnPoolSize = cfg.Nebula.MinConnPoolSize + nebulaPoolConfig.TimeOut = cfg.Nebula.TimeOut + + return &nebulaPoolConfig, nil } func (cfg *Config) addMysqlErrorNumbers() { diff --git a/pkg/storage/internalstorage/register.go b/pkg/storage/internalstorage/register.go index 932c0ed70..99c3312de 100644 --- a/pkg/storage/internalstorage/register.go +++ b/pkg/storage/internalstorage/register.go @@ -6,10 +6,12 @@ import ( "io" "log" "os" + "strconv" "github.com/go-sql-driver/mysql" "github.com/jackc/pgx/v4/stdlib" "github.com/jinzhu/configor" + gnebula "github.com/vesoft-inc/nebula-go/v3" "gopkg.in/natefinch/lumberjack.v2" gmysql "gorm.io/driver/mysql" gpostgres "gorm.io/driver/postgres" @@ -68,12 +70,35 @@ func NewStorageFactory(configPath string) (storage.StorageFactory, error) { return nil, err } dialector = gsqlite.Open(dsn) - // case "nebula": - // dsn, err := cfg.genNebulaDSN() - // if err != nil { - // return nil, err - // } - // conn,err := gnebula.NewConnectionPool() + case "nebula": + nebulaconfig, err := cfg.genNebulaConfig() + if err != nil { + return nil, err + } + // TODO : dialector like for nebula + port, err := strconv.Atoi(cfg.Port) + if err != nil { + return nil, err + } + + hostAddress := gnebula.HostAddress{Host: cfg.Host, Port: port} + hostList := []gnebula.HostAddress{hostAddress} + // Initialize connection pool + pool, err := gnebula.NewConnectionPool(hostList, *nebulaconfig, nebulalog) + if err != nil { + nebulalog.Fatal(fmt.Sprintf("Fail to initialize the connection pool, host: %s, port: %d, %s", cfg.Host, port, err.Error())) + } + // Close all connections in the pool + defer pool.Close() + + // Create session + session, err := pool.GetSession(cfg.User, cfg.Password) + if err != nil { + nebulalog.Fatal(fmt.Sprintf("Fail to create a new session from connection pool, username: %s, password: %s, %s", + cfg.User, cfg.Password, err.Error())) + } + // Release session and return connection back to connection pool + defer session.Release() default: return nil, fmt.Errorf("not support storage type: %s", cfg.Type) }