Skip to content

Commit

Permalink
data: optimize ClickHouse queries via materialized views (#875)
Browse files Browse the repository at this point in the history
  • Loading branch information
zhenghaoz authored Oct 26, 2024
1 parent 76a3ca5 commit b959e38
Show file tree
Hide file tree
Showing 7 changed files with 117 additions and 29 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/build_test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ jobs:
--health-retries 5
clickhouse:
image: clickhouse/clickhouse-server:21.10
image: clickhouse/clickhouse-server:22
ports:
- 8123
options: >-
Expand Down
2 changes: 1 addition & 1 deletion client/docker-compose.yml.j2
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ services:
{% elif database == 'clickhouse' %}

clickhouse:
image: clickhouse/clickhouse-server:21.10
image: clickhouse/clickhouse-server:22
ports:
- 8123:8123
environment:
Expand Down
2 changes: 1 addition & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ services:
# - mongo_data:/data/db

# clickhouse:
# image: clickhouse/clickhouse-server:21.10
# image: clickhouse/clickhouse-server:22
# ports:
# - 8123:8123
# environment:
Expand Down
6 changes: 5 additions & 1 deletion storage/data/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ package data
import (
"context"
"encoding/json"
"reflect"
"net/url"
"reflect"
"sort"
"strings"
"time"
Expand Down Expand Up @@ -147,6 +147,10 @@ type Feedback struct {
Comment string `gorm:"column:comment" mapsstructure:"comment"`
}

type UserFeedback Feedback

type ItemFeedback Feedback

// SortFeedbacks sorts feedback from latest to oldest.
func SortFeedbacks(feedback []Feedback) {
sort.Sort(feedbackSorter(feedback))
Expand Down
14 changes: 9 additions & 5 deletions storage/data/database_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ var (
positiveFeedbackType = "positiveFeedbackType"
negativeFeedbackType = "negativeFeedbackType"
duplicateFeedbackType = "duplicateFeedbackType"
dateTime64Zero = time.Date(1900, 1, 1, 0, 0, 0, 0, time.UTC)
)

type baseTestSuite struct {
Expand Down Expand Up @@ -279,8 +280,8 @@ func (suite *baseTestSuite) TestFeedback() {
suite.Equal(strconv.Itoa(i*2), item.ItemId)
if item.ItemId != "0" {
if suite.isClickHouse() {
// ClickHouse returns 1970-01-01 as zero date.
suite.Zero(item.Timestamp.Unix())
// ClickHouse returns 1900-01-01 00:00:00 +0000 UTC as zero date.
suite.Equal(dateTime64Zero, item.Timestamp)
} else {
suite.Zero(item.Timestamp)
}
Expand Down Expand Up @@ -310,9 +311,10 @@ func (suite *baseTestSuite) TestFeedback() {
// Get typed feedback by user
ret, err = suite.Database.GetUserFeedback(ctx, "2", lo.ToPtr(time.Now()), positiveFeedbackType)
suite.NoError(err)
suite.Equal(1, len(ret))
suite.Equal("2", ret[0].UserId)
suite.Equal("4", ret[0].ItemId)
if suite.Equal(1, len(ret)) {
suite.Equal("2", ret[0].UserId)
suite.Equal("4", ret[0].ItemId)
}
// Get all feedback by user
ret, err = suite.Database.GetUserFeedback(ctx, "2", lo.ToPtr(time.Now()))
suite.NoError(err)
Expand Down Expand Up @@ -579,6 +581,8 @@ func (suite *baseTestSuite) TestDeleteFeedback() {
// RowAffected isn't supported by ClickHouse,
suite.Equal(3, deleteCount)
}
err = suite.Database.Optimize()
suite.NoError(err)
ret, err = suite.Database.GetUserItemFeedback(ctx, "2", "3")
suite.NoError(err)
suite.Empty(ret)
Expand Down
110 changes: 90 additions & 20 deletions storage/data/sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ type SQLDatabase struct {
// Optimize is used by ClickHouse only.
func (d *SQLDatabase) Optimize() error {
if d.driver == ClickHouse {
for _, tableName := range []string{d.UsersTable(), d.ItemsTable(), d.FeedbackTable()} {
for _, tableName := range []string{d.UsersTable(), d.ItemsTable(), d.FeedbackTable(), d.UserFeedbackTable(), d.ItemFeedbackTable()} {
_, err := d.client.Exec("OPTIMIZE TABLE " + tableName)
if err != nil {
return errors.Trace(err)
Expand Down Expand Up @@ -225,7 +225,7 @@ func (d *SQLDatabase) Init() error {
ItemId string `gorm:"column:item_id;type:String"`
IsHidden int `gorm:"column:is_hidden;type:Boolean;default:0"`
Categories string `gorm:"column:categories;type:String;default:'[]'"`
Timestamp time.Time `gorm:"column:time_stamp;type:Datetime"`
Timestamp time.Time `gorm:"column:time_stamp;type:Datetime64(9,'UTC')"`
Labels string `gorm:"column:labels;type:String;default:'[]'"`
Comment string `gorm:"column:comment;type:String"`
Version struct{} `gorm:"column:version;type:DateTime"`
Expand All @@ -247,16 +247,37 @@ func (d *SQLDatabase) Init() error {
}
type Feedback struct {
FeedbackType string `gorm:"column:feedback_type;type:String"`
UserId string `gorm:"column:user_id;type:String;index:user_index,type:bloom_filter(0.01),granularity:1"`
ItemId string `gorm:"column:item_id;type:String;index:item_index,type:bloom_filter(0.01),granularity:1"`
Timestamp time.Time `gorm:"column:time_stamp;type:DateTime"`
UserId string `gorm:"column:user_id;type:String"`
ItemId string `gorm:"column:item_id;type:String"`
Timestamp time.Time `gorm:"column:time_stamp;type:DateTime64(9,'UTC')"`
Comment string `gorm:"column:comment;type:String"`
Version struct{} `gorm:"column:version;type:DateTime"`
}
err = d.gormDB.Set("gorm:table_options", "ENGINE = ReplacingMergeTree(version) ORDER BY (feedback_type, user_id, item_id)").AutoMigrate(Feedback{})
if err != nil {
return errors.Trace(err)
}
// create materialized views
type UserFeedback Feedback
err = d.gormDB.Set("gorm:table_options", "ENGINE = ReplacingMergeTree(version) ORDER BY (user_id, item_id, feedback_type)").AutoMigrate(UserFeedback{})
if err != nil {
return errors.Trace(err)
}
err = d.gormDB.Exec(fmt.Sprintf("CREATE MATERIALIZED VIEW IF NOT EXISTS %s_mv TO %s AS SELECT * FROM %s",
d.UserFeedbackTable(), d.UserFeedbackTable(), d.FeedbackTable())).Error
if err != nil {
return errors.Trace(err)
}
type ItemFeedback Feedback
err = d.gormDB.Set("gorm:table_options", "ENGINE = ReplacingMergeTree(version) ORDER BY (item_id, user_id, feedback_type)").AutoMigrate(ItemFeedback{})
if err != nil {
return errors.Trace(err)
}
err = d.gormDB.Exec(fmt.Sprintf("CREATE MATERIALIZED VIEW IF NOT EXISTS %s_mv TO %s AS SELECT * FROM %s",
d.ItemFeedbackTable(), d.ItemFeedbackTable(), d.FeedbackTable())).Error
if err != nil {
return errors.Trace(err)
}
}
return nil
}
Expand All @@ -271,15 +292,16 @@ func (d *SQLDatabase) Close() error {
}

func (d *SQLDatabase) Purge() error {
tables := []string{d.ItemsTable(), d.FeedbackTable(), d.UsersTable()}
if d.driver == ClickHouse {
tables := []string{d.ItemsTable(), d.FeedbackTable(), d.UsersTable(), d.UserFeedbackTable(), d.ItemFeedbackTable()}
for _, tableName := range tables {
err := d.gormDB.Exec(fmt.Sprintf("alter table %s delete where 1=1", tableName)).Error
if err != nil {
return errors.Trace(err)
}
}
} else {
tables := []string{d.ItemsTable(), d.FeedbackTable(), d.UsersTable()}
for _, tableName := range tables {
err := d.gormDB.Exec(fmt.Sprintf("DELETE FROM %s", tableName)).Error
if err != nil {
Expand Down Expand Up @@ -357,6 +379,14 @@ func (d *SQLDatabase) DeleteItem(ctx context.Context, itemId string) error {
if err := d.gormDB.WithContext(ctx).Delete(&Feedback{}, "item_id = ?", itemId).Error; err != nil {
return errors.Trace(err)
}
if d.driver == ClickHouse {
if err := d.gormDB.WithContext(ctx).Delete(&ItemFeedback{}, "item_id = ?", itemId).Error; err != nil {
return errors.Trace(err)
}
if err := d.gormDB.WithContext(ctx).Delete(&UserFeedback{}, "item_id = ?", itemId).Error; err != nil {
return errors.Trace(err)
}
}
return nil
}

Expand Down Expand Up @@ -492,10 +522,18 @@ func (d *SQLDatabase) GetItemStream(ctx context.Context, batchSize int, timeLimi

// GetItemFeedback returns feedback of a item from MySQL.
func (d *SQLDatabase) GetItemFeedback(ctx context.Context, itemId string, feedbackTypes ...string) ([]Feedback, error) {
tx := d.gormDB.WithContext(ctx).Table(d.FeedbackTable()).Select("user_id, item_id, feedback_type, time_stamp")
tx := d.gormDB.WithContext(ctx)
if d.driver == ClickHouse {
tx = tx.Table(d.ItemFeedbackTable())
} else {
tx = tx.Table(d.FeedbackTable())
}
tx = tx.Select("user_id, item_id, feedback_type, time_stamp")
switch d.driver {
case SQLite:
tx.Where("time_stamp <= DATETIME() AND item_id = ?", itemId)
case ClickHouse:
tx.Where("time_stamp <= NOW('UTC') AND item_id = ?", itemId)
default:
tx.Where("time_stamp <= NOW() AND item_id = ?", itemId)
}
Expand Down Expand Up @@ -559,6 +597,14 @@ func (d *SQLDatabase) DeleteUser(ctx context.Context, userId string) error {
if err := d.gormDB.WithContext(ctx).Delete(&Feedback{}, "user_id = ?", userId).Error; err != nil {
return errors.Trace(err)
}
if d.driver == ClickHouse {
if err := d.gormDB.WithContext(ctx).Delete(&ItemFeedback{}, "user_id = ?", userId).Error; err != nil {
return errors.Trace(err)
}
if err := d.gormDB.WithContext(ctx).Delete(&UserFeedback{}, "user_id = ?", userId).Error; err != nil {
return errors.Trace(err)
}
}
return nil
}

Expand Down Expand Up @@ -674,8 +720,13 @@ func (d *SQLDatabase) GetUserStream(ctx context.Context, batchSize int) (chan []

// GetUserFeedback returns feedback of a user from MySQL.
func (d *SQLDatabase) GetUserFeedback(ctx context.Context, userId string, endTime *time.Time, feedbackTypes ...string) ([]Feedback, error) {
tx := d.gormDB.WithContext(ctx).Table(d.FeedbackTable()).
Select("feedback_type, user_id, item_id, time_stamp, comment").
tx := d.gormDB.WithContext(ctx)
if d.driver == ClickHouse {
tx = tx.Table(d.UserFeedbackTable())
} else {
tx = tx.Table(d.FeedbackTable())
}
tx = tx.Select("feedback_type, user_id, item_id, time_stamp, comment").
Where("user_id = ?", userId)
if endTime != nil {
tx.Where("time_stamp <= ?", d.convertTimeZone(endTime))
Expand Down Expand Up @@ -955,8 +1006,13 @@ func (d *SQLDatabase) GetFeedbackStream(ctx context.Context, batchSize int, scan

// GetUserItemFeedback gets a feedback by user id and item id from MySQL.
func (d *SQLDatabase) GetUserItemFeedback(ctx context.Context, userId, itemId string, feedbackTypes ...string) ([]Feedback, error) {
tx := d.gormDB.WithContext(ctx).Table(d.FeedbackTable()).
Select("feedback_type, user_id, item_id, time_stamp, comment").
tx := d.gormDB.WithContext(ctx)
if d.driver == ClickHouse {
tx = tx.Table(d.UserFeedbackTable())
} else {
tx = tx.Table(d.FeedbackTable())
}
tx = tx.Select("feedback_type, user_id, item_id, time_stamp, comment").
Where("user_id = ? AND item_id = ?", userId, itemId)
if len(feedbackTypes) > 0 {
tx.Where("feedback_type IN ?", feedbackTypes)
Expand All @@ -979,18 +1035,32 @@ func (d *SQLDatabase) GetUserItemFeedback(ctx context.Context, userId, itemId st

// DeleteUserItemFeedback deletes a feedback by user id and item id from MySQL.
func (d *SQLDatabase) DeleteUserItemFeedback(ctx context.Context, userId, itemId string, feedbackTypes ...string) (int, error) {
tx := d.gormDB.WithContext(ctx).Where("user_id = ? AND item_id = ?", userId, itemId)
if len(feedbackTypes) > 0 {
tx.Where("feedback_type IN ?", feedbackTypes)
deleteUserItemFeedback := func(value any) (int, error) {
tx := d.gormDB.WithContext(ctx).Where("user_id = ? AND item_id = ?", userId, itemId)
if len(feedbackTypes) > 0 {
tx.Where("feedback_type IN ?", feedbackTypes)
}
tx.Delete(value)
if tx.Error != nil {
return 0, errors.Trace(tx.Error)
}
return int(tx.RowsAffected), nil
}
tx.Delete(&Feedback{})
if tx.Error != nil {
return 0, errors.Trace(tx.Error)
rowAffected, err := deleteUserItemFeedback(&Feedback{})
if err != nil {
return 0, errors.Trace(err)
}
if tx.Error != nil && d.driver != ClickHouse {
return 0, errors.Trace(tx.Error)
if d.driver == ClickHouse {
_, err = deleteUserItemFeedback(&UserFeedback{})
if err != nil {
return 0, errors.Trace(err)
}
_, err = deleteUserItemFeedback(&ItemFeedback{})
if err != nil {
return 0, errors.Trace(err)
}
}
return int(tx.RowsAffected), nil
return rowAffected, nil
}

func (d *SQLDatabase) convertTimeZone(timestamp *time.Time) time.Time {
Expand Down
10 changes: 10 additions & 0 deletions storage/scheme.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,16 @@ func (tp TablePrefix) FeedbackTable() string {
return string(tp) + "feedback"
}

// UserFeedbackTable returns the materialized view of user feedback.
func (tp TablePrefix) UserFeedbackTable() string {
return string(tp) + "user_feedback"
}

// ItemFeedbackTable returns the materialized view of item feedback.
func (tp TablePrefix) ItemFeedbackTable() string {
return string(tp) + "item_feedback"
}

func (tp TablePrefix) Key(key string) string {
return string(tp) + key
}
Expand Down

0 comments on commit b959e38

Please sign in to comment.