Skip to content

Commit

Permalink
Add AddTagTTL and AddEdgeTTL for Session Pool (#323)
Browse files Browse the repository at this point in the history
  • Loading branch information
haoxins authored Feb 29, 2024
1 parent 5bfccb7 commit cefdede
Show file tree
Hide file tree
Showing 2 changed files with 186 additions and 15 deletions.
20 changes: 20 additions & 0 deletions session_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -319,6 +319,16 @@ func (pool *SessionPool) CreateTag(tag LabelSchema) (*ResultSet, error) {
return rs, nil
}

func (pool *SessionPool) AddTagTTL(tagName string, colName string, duration uint) (*ResultSet, error) {
q := fmt.Sprintf(`ALTER TAG %s TTL_DURATION = %d, TTL_COL = "%s";`, tagName, duration, colName)
rs, err := pool.ExecuteAndCheck(q)
if err != nil {
return nil, err
}

return rs, nil
}

func (pool *SessionPool) DescTag(tagName string) ([]Label, error) {
q := fmt.Sprintf("DESC TAG %s;", tagName)
rs, err := pool.ExecuteAndCheck(q)
Expand Down Expand Up @@ -353,6 +363,16 @@ func (pool *SessionPool) CreateEdge(edge LabelSchema) (*ResultSet, error) {
return rs, nil
}

func (pool *SessionPool) AddEdgeTTL(tagName string, colName string, duration uint) (*ResultSet, error) {
q := fmt.Sprintf(`ALTER EDGE %s TTL_DURATION = %d, TTL_COL = "%s";`, tagName, duration, colName)
rs, err := pool.ExecuteAndCheck(q)
if err != nil {
return nil, err
}

return rs, nil
}

func (pool *SessionPool) DescEdge(edgeName string) ([]Label, error) {
q := fmt.Sprintf("DESC EDGE %s;", edgeName)
rs, err := pool.ExecuteAndCheck(q)
Expand Down
181 changes: 166 additions & 15 deletions session_pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -358,7 +358,7 @@ func TestSessionPoolSpaceChange(t *testing.T) {
assert.Equal(t, resultSet.GetSpaceName(), "test_space_1", "space name should be test_space_1")
}

func TestSessionPoolApplySchema(t *testing.T) {
func TestSessionPoolCreateTagAndEdge(t *testing.T) {
spaceName := "test_space_schema"
err := prepareSpace(spaceName)
if err != nil {
Expand Down Expand Up @@ -415,27 +415,29 @@ func TestSessionPoolApplySchema(t *testing.T) {
},
},
}

_, err = sessionPool.CreateTag(tagSchema)
if err != nil {
t.Fatal(err)
}

tags, err := sessionPool.ShowTags()
if err != nil {
t.Fatal(err)
}
assert.Equal(t, 1, len(tags), "should have 1 tags")
assert.Equal(t, "account", tags[0].Name, "tag name should be account")
assert.Equal(t, 1, len(tags))
assert.Equal(t, "account", tags[0].Name)
labels, err := sessionPool.DescTag("account")
if err != nil {
t.Fatal(err)
}
assert.Equal(t, 3, len(labels), "should have 3 labels")
assert.Equal(t, "name", labels[0].Field, "field name should be name")
assert.Equal(t, "string", labels[0].Type, "field type should be string")
assert.Equal(t, "email", labels[1].Field, "field name should be email")
assert.Equal(t, "string", labels[1].Type, "field type should be string")
assert.Equal(t, "phone", labels[2].Field, "field name should be phone")
assert.Equal(t, "int64", labels[2].Type, "field type should be int64")
assert.Equal(t, 3, len(labels))
assert.Equal(t, "name", labels[0].Field)
assert.Equal(t, "string", labels[0].Type)
assert.Equal(t, "email", labels[1].Field)
assert.Equal(t, "string", labels[1].Type)
assert.Equal(t, "phone", labels[2].Field)
assert.Equal(t, "int64", labels[2].Type)

edgeSchema := LabelSchema{
Name: "account_email",
Expand All @@ -446,23 +448,172 @@ func TestSessionPoolApplySchema(t *testing.T) {
},
},
}

_, err = sessionPool.CreateEdge(edgeSchema)
if err != nil {
t.Fatal(err)
}

edges, err := sessionPool.ShowEdges()
if err != nil {
t.Fatal(err)
}
assert.Equal(t, 1, len(edges), "should have 1 edges")
assert.Equal(t, "account_email", edges[0].Name, "edge name should be account_email")
assert.Equal(t, 1, len(edges))
assert.Equal(t, "account_email", edges[0].Name)
labels, err = sessionPool.DescEdge("account_email")
if err != nil {
t.Fatal(err)
}
assert.Equal(t, 1, len(labels), "should have 1 labels")
assert.Equal(t, "email", labels[0].Field, "field name should be email")
assert.Equal(t, "string", labels[0].Type, "field type should be string")
assert.Equal(t, 1, len(labels))
assert.Equal(t, "email", labels[0].Field)
assert.Equal(t, "string", labels[0].Type)
}

func TestSessionPoolAddTTL(t *testing.T) {
spaceName := "test_space_ttl"
err := prepareSpace(spaceName)
if err != nil {
t.Fatal(err)
}
defer dropSpace(spaceName)

hostAddress := HostAddress{Host: address, Port: port}
config, err := NewSessionPoolConf(
"root",
"nebula",
[]HostAddress{hostAddress},
spaceName)
if err != nil {
t.Errorf("failed to create session pool config, %s", err.Error())
}

// allow only one session in the pool so it is easier to test
config.maxSize = 1

// create session pool
sessionPool, err := NewSessionPool(*config, DefaultLogger{})
if err != nil {
t.Fatal(err)
}
defer sessionPool.Close()

spaces, err := sessionPool.ShowSpaces()
if err != nil {
t.Fatal(err)
}
assert.LessOrEqual(t, 1, len(spaces), "should have at least 1 space")
var spaceNames []string
for _, space := range spaces {
spaceNames = append(spaceNames, space.Name)
}
assert.Contains(t, spaceNames, spaceName)

tagSchema := LabelSchema{
Name: "user",
Fields: []LabelFieldSchema{
{
Field: "created_at",
Type: "int64",
Nullable: false,
},
},
}

_, err = sessionPool.CreateTag(tagSchema)
if err != nil {
t.Fatal(err)
}

// Add TTL to tag
_, err = sessionPool.AddTagTTL("user", "created_at", 5)
if err != nil {
t.Fatal(err)
}

tags, err := sessionPool.ShowTags()
if err != nil {
t.Fatal(err)
}
assert.Equal(t, 1, len(tags))
assert.Equal(t, "user", tags[0].Name)
labels, err := sessionPool.DescTag("user")
if err != nil {
t.Fatal(err)
}
assert.Equal(t, 1, len(labels))
assert.Equal(t, "created_at", labels[0].Field)
assert.Equal(t, "int64", labels[0].Type)

edgeSchema := LabelSchema{
Name: "friend",
Fields: []LabelFieldSchema{
{
Field: "created_at",
Type: "int64",
Nullable: false,
},
},
}

_, err = sessionPool.CreateEdge(edgeSchema)
if err != nil {
t.Fatal(err)
}

// Add TTL to edge
_, err = sessionPool.AddEdgeTTL("friend", "created_at", 3)
if err != nil {
t.Fatal(err)
}

edges, err := sessionPool.ShowEdges()
if err != nil {
t.Fatal(err)
}
assert.Equal(t, 1, len(edges))
assert.Equal(t, "friend", edges[0].Name)
labels, err = sessionPool.DescEdge("friend")
if err != nil {
t.Fatal(err)
}
assert.Equal(t, 1, len(labels))
assert.Equal(t, "created_at", labels[0].Field)
assert.Equal(t, "int64", labels[0].Type)

// Wait for 5 seconds to wait for the schema is ready
time.Sleep(5 * time.Second)

now := time.Now().Unix()
// Insert vertices and edges
q := fmt.Sprintf(`INSERT VERTEX user(created_at) VALUES "test1":(%d);`, now)
_, err = sessionPool.ExecuteAndCheck(q)
if err != nil {
t.Fatal(err)
}
q = fmt.Sprintf(`INSERT VERTEX user(created_at) VALUES "test2":(%d);`, now)
_, err = sessionPool.ExecuteAndCheck(q)
if err != nil {
t.Fatal(err)

}
q = fmt.Sprintf(`INSERT EDGE friend(created_at) VALUES "test1" -> "test2":(%d);`, now)
_, err = sessionPool.ExecuteAndCheck(q)
if err != nil {
t.Fatal(err)
}

rs, err := sessionPool.ExecuteAndCheck(`FETCH PROP ON friend "test1" -> "test2" YIELD edge AS e;`)
if err != nil {
t.Fatal(err)
}
assert.Equal(t, 1, len(rs.GetRows()))
// Sleep for 5 seconds to wait for the tag to be expired
time.Sleep(5 * time.Second)
rs, err = sessionPool.ExecuteAndCheck(`FETCH PROP ON friend "test1" -> "test2" YIELD edge AS e;`)
if err != nil {
t.Fatal(err)
}
assert.Equal(t, 0, len(rs.GetRows()))
}

func TestIdleSessionCleaner(t *testing.T) {
Expand Down

0 comments on commit cefdede

Please sign in to comment.