From a0f5644d3adf5097a30f927e81c60a5f7caa08ae Mon Sep 17 00:00:00 2001 From: Marco Manino Date: Mon, 22 Jul 2024 17:49:34 +0200 Subject: [PATCH] Improve list query --- pkg/kine/drivers/generic/generic.go | 149 +++++++++++++++------------- pkg/kine/drivers/sqlite/schema.go | 15 ++- pkg/kine/drivers/sqlite/sqlite.go | 7 +- test/list_test.go | 3 - test/util_test.go | 46 +++++---- 5 files changed, 121 insertions(+), 99 deletions(-) diff --git a/pkg/kine/drivers/generic/generic.go b/pkg/kine/drivers/generic/generic.go index 360efb9d..0e39c843 100644 --- a/pkg/kine/drivers/generic/generic.go +++ b/pkg/kine/drivers/generic/generic.go @@ -62,56 +62,56 @@ var ( columns = "kv.id as theid, kv.name, kv.created, kv.deleted, kv.create_revision, kv.prev_revision, kv.lease, kv.value, kv.old_value" revSQL = ` - SELECT MAX(rkv.id) AS id - FROM kine AS rkv` - + SELECT MAX(id) AS id + FROM kine` + + // listSQL query looks for the latest version of every row in + // the range and returns all columns from it. + // The search for the "latest id" (table `maxkv` in the query) + // can be carried on quickly with a covering index (kine_name_index). + // The `deleted <= ?` is used to select deleted rows: + // - when the argument is 0 (false), the only rows selected are + // those with deleted = 0 (i.e. alive) + // - when the argument is 1 (true), all rows will be selected, + // including deleted ones. + // Unfortunately, using a normal JOIN operation will confuse + // SQLite planner and insert a SORT temp table at the end of + // the plan, forcing SQLite to load and sort the entire set + // before returning it (and making the cost of a paginated + // query very high) and returning an unsorted set would make + // pagination impossible. + // To workaround this silly misplan, a ORDER by in the first + // table forces ordering of `maxkv` (without paying for it + // as it is the same order as the index) and CROSS JOIN is + // used as it forces SQLite to keep the outer-loop order + // when joining tables. See https://www.sqlite.org/optoverview.html#crossjoin + // for more details. listSQL = fmt.Sprintf(` - SELECT %s - FROM kine kv - JOIN ( - SELECT MAX(mkv.id) as id - FROM kine mkv + WITH maxkv AS ( + SELECT MAX(id) AS id + FROM kine WHERE - mkv.name >= ? AND mkv.name < ? + name >= ? AND name < ? %%s - GROUP BY mkv.name) maxkv + GROUP BY name + HAVING deleted <= ? + ORDER BY name + ) + SELECT %s + FROM maxkv CROSS JOIN kine kv ON maxkv.id = kv.id - WHERE - (kv.deleted = 0 OR ?) - ORDER BY kv.name ASC, kv.id ASC - `, columns) - - revisionAfterSQL = fmt.Sprintf(` - SELECT * - FROM ( - SELECT %s - FROM kine AS kv - JOIN ( - SELECT MAX(mkv.id) AS id - FROM kine AS mkv - WHERE mkv.name >= ? AND mkv.name < ? - AND mkv.id <= ? - GROUP BY mkv.name - ) AS maxkv - ON maxkv.id = kv.id - WHERE - ? OR kv.deleted = 0 - ) AS lkv - ORDER BY lkv.name ASC, lkv.theid ASC `, columns) revisionIntervalSQL = ` SELECT ( - SELECT crkv.prev_revision - FROM kine AS crkv - WHERE crkv.name = 'compact_rev_key' + SELECT prev_revision + FROM kine + WHERE name = 'compact_rev_key' ORDER BY prev_revision DESC LIMIT 1 ) AS low, ( - SELECT id + SELECT MAX(id) FROM kine - ORDER BY id - DESC LIMIT 1 ) AS high` ) @@ -138,7 +138,6 @@ type Generic struct { GetRevisionSQL string RevisionSQL string ListRevisionStartSQL string - GetRevisionAfterSQL string CountCurrentSQL string CountRevisionSQL string AfterSQLPrefix string @@ -224,40 +223,54 @@ func Open(ctx context.Context, driverName, dataSourceName string, paramCharacter DB: prepared.New(db, 100), GetRevisionSQL: q(fmt.Sprintf(` - SELECT - %s - FROM kine kv - WHERE kv.id = ?`, columns), paramCharacter, numbered), + SELECT %s + FROM kine AS kv + WHERE id = ?`, columns), paramCharacter, numbered), GetCurrentSQL: q(fmt.Sprintf(listSQL, ""), paramCharacter, numbered), - ListRevisionStartSQL: q(fmt.Sprintf(listSQL, "AND mkv.id <= ?"), paramCharacter, numbered), - GetRevisionAfterSQL: q(revisionAfterSQL, paramCharacter, numbered), + ListRevisionStartSQL: q(fmt.Sprintf(listSQL, "AND id <= ?"), paramCharacter, numbered), - CountCurrentSQL: q(fmt.Sprintf(` - SELECT (%s), COUNT(*) + CountCurrentSQL: q(` + SELECT ( + SELECT COALESCE(MAX(id), 0) AS id + FROM kine + ), COUNT(*) FROM ( - %s - ) c`, revSQL, fmt.Sprintf(listSQL, "")), paramCharacter, numbered), - - CountRevisionSQL: q(fmt.Sprintf(` - SELECT (%s), COUNT(c.theid) + SELECT MAX(id) AS id + FROM kine + WHERE + name >= ? AND name < ? + GROUP BY name + HAVING deleted = 0 + ) c`, paramCharacter, numbered), + + CountRevisionSQL: q(` + SELECT ( + SELECT COALESCE(MAX(id), 0) AS id + FROM kine + ), COUNT(*) FROM ( - %s - ) c`, revSQL, fmt.Sprintf(listSQL, "AND mkv.id <= ?")), paramCharacter, numbered), + SELECT MAX(id) AS id + FROM kine + WHERE + name >= ? AND name < ? + AND id <= ? + GROUP BY name + HAVING deleted = 0 + ) c`, paramCharacter, numbered), AfterSQLPrefix: q(fmt.Sprintf(` SELECT %s FROM kine AS kv - WHERE - kv.name >= ? AND kv.name < ? - AND kv.id > ? - ORDER BY kv.id ASC`, columns), paramCharacter, numbered), + WHERE name >= ? AND name < ? + AND id > ? + ORDER BY id ASC`, columns), paramCharacter, numbered), AfterSQL: q(fmt.Sprintf(` SELECT %s - FROM kine AS kv - WHERE kv.id > ? - ORDER BY kv.id ASC + FROM kine AS kv + WHERE id > ? + ORDER BY id ASC `, columns), paramCharacter, numbered), DeleteSQL: q(` @@ -408,7 +421,7 @@ func (d *Generic) Count(ctx context.Context, prefix, startKey string, revision i if startKey != "" { start = startKey + "\x01" } - rows, err := d.query(ctx, "count_revision", d.CountRevisionSQL, start, end, revision, false) + rows, err := d.query(ctx, "count_revision", d.CountRevisionSQL, start, end, revision) if err != nil { return 0, 0, err } @@ -528,19 +541,15 @@ func (d *Generic) ListCurrent(ctx context.Context, prefix, startKey string, limi func (d *Generic) List(ctx context.Context, prefix, startKey string, limit, revision int64, includeDeleted bool) (*sql.Rows, error) { start, end := getPrefixRange(prefix) - if startKey == "" { - sql := d.ListRevisionStartSQL - if limit > 0 { - sql = fmt.Sprintf("%s LIMIT %d", sql, limit) - } - return d.query(ctx, "list_revision_start_sql", sql, start, end, revision, includeDeleted) + if startKey != "" { + start = startKey + "\x01" } - sql := d.GetRevisionAfterSQL + sql := d.ListRevisionStartSQL if limit > 0 { sql = fmt.Sprintf("%s LIMIT %d", sql, limit) } - return d.query(ctx, "get_revision_after_sql", sql, startKey+"\x01", end, revision, includeDeleted) + return d.query(ctx, "list_revision_start_sql", sql, start, end, revision, includeDeleted) } func (d *Generic) CurrentRevision(ctx context.Context) (int64, error) { diff --git a/pkg/kine/drivers/sqlite/schema.go b/pkg/kine/drivers/sqlite/schema.go index 28b4d0e1..ee14d7db 100644 --- a/pkg/kine/drivers/sqlite/schema.go +++ b/pkg/kine/drivers/sqlite/schema.go @@ -11,7 +11,7 @@ import ( // present anymore, and unexpired rows of the key_value table with // the latest revisions must have been recorded in the Kine table // already -const databaseSchemaVersion = 1 +const databaseSchemaVersion = 2 // applySchemaV1 moves the schema from version 0 to version 1, // taking into account the possible unversioned schema from @@ -61,6 +61,19 @@ CREATE TABLE kine return nil } +// applySchemaV2 moves the schema from version 1 to version 2 +func applySchemaV2(ctx context.Context, txn *sql.Tx) error { + if _, err := txn.ExecContext(ctx, `DROP INDEX kine_name_index`); err != nil { + return err + } + + if _, err := txn.ExecContext(ctx, `CREATE UNIQUE INDEX kine_name_index ON kine(name, id, deleted)`); err != nil { + return err + } + + return nil +} + // hasTable checks if a table exists. func hasTable(ctx context.Context, txn *sql.Tx, tableName string) (bool, error) { // FIXME: why we can't use `pragma_table_list()`? Is dqlite/sqlite using diff --git a/pkg/kine/drivers/sqlite/sqlite.go b/pkg/kine/drivers/sqlite/sqlite.go index 5e31ad0d..9eadae3a 100644 --- a/pkg/kine/drivers/sqlite/sqlite.go +++ b/pkg/kine/drivers/sqlite/sqlite.go @@ -157,11 +157,12 @@ func migrate(ctx context.Context, txn *sql.Tx) error { return err } fallthrough + case 1: + if err := applySchemaV2(ctx, txn); err != nil { + return err + } case databaseSchemaVersion: break - default: - // FIXME this needs better handling - return errors.Errorf("unsupported version: %d", userVersion) } setUserVersionSQL := fmt.Sprintf(`PRAGMA user_version = %d`, databaseSchemaVersion) diff --git a/test/list_test.go b/test/list_test.go index d127bb24..d3006c21 100644 --- a/test/list_test.go +++ b/test/list_test.go @@ -183,17 +183,14 @@ func BenchmarkList(b *testing.B) { if err := insertMany(ctx, tx, "key", payloadSize, n); err != nil { return err } - b.Log("insert", n) if err := updateMany(ctx, tx, "key", payloadSize, n/2); err != nil { return err } - b.Log("update", n) if err := deleteMany(ctx, tx, "key", n/2); err != nil { return err } - b.Log("delete", n) return nil } backends := []string{endpoint.SQLiteBackend, endpoint.DQLiteBackend} diff --git a/test/util_test.go b/test/util_test.go index 6bee3970..442d3393 100644 --- a/test/util_test.go +++ b/test/util_test.go @@ -212,42 +212,44 @@ FROM gen_id, revision` func updateMany(ctx context.Context, tx *sql.Tx, prefix string, valueSize, n int) error { updateManyQuery := fmt.Sprintf(` +WITH maxkv AS ( + SELECT MAX(id) AS id + FROM kine + WHERE + ?||'/' <= name AND name < ?||'0' + GROUP BY name + HAVING deleted = 0 + ORDER BY name +) INSERT INTO kine( name, created, deleted, create_revision, prev_revision, lease, value, old_value ) SELECT kv.name, 0, 0, kv.create_revision, kv.id, 0, randomblob(?), kv.value -FROM kine AS kv -JOIN ( - SELECT MAX(mkv.id) as id - FROM kine mkv - WHERE ?||'/' <= mkv.name AND mkv.name < ?||'0' - GROUP BY mkv.name -) maxkv ON maxkv.id = kv.id -WHERE kv.deleted = 0 -ORDER BY kv.name -LIMIT %d - `, n) +FROM maxkv CROSS JOIN kine kv + ON maxkv.id = kv.id +LIMIT %d`, n) _, err := tx.ExecContext(ctx, updateManyQuery, valueSize, prefix, prefix) return err } func deleteMany(ctx context.Context, tx *sql.Tx, prefix string, n int) error { deleteManyQuery := fmt.Sprintf(` +WITH maxkv AS ( + SELECT MAX(id) AS id + FROM kine + WHERE + ?||'/' <= name AND name < ?||'0' + GROUP BY name + HAVING deleted = 0 + ORDER BY name +) INSERT INTO kine( name, created, deleted, create_revision, prev_revision, lease, value, old_value ) SELECT kv.name, 0, 1, kv.create_revision, kv.id, 0, kv.value, kv.value -FROM kine AS kv -JOIN ( - SELECT MAX(mkv.id) as id - FROM kine mkv - WHERE ?||'/' <= mkv.name AND mkv.name < ?||'0' - GROUP BY mkv.name -) maxkv ON maxkv.id = kv.id -WHERE kv.deleted = 0 -ORDER BY kv.name -LIMIT %d - `, n) +FROM maxkv CROSS JOIN kine kv + ON maxkv.id = kv.id +LIMIT %d`, n) _, err := tx.ExecContext(ctx, deleteManyQuery, prefix, prefix) return err }