make assigning rowid safe for concurrent access
dgllghr committed Feb 15, 2024
1 parent 18a0915 commit 58ddc35
Showing 4 changed files with 81 additions and 94 deletions.
36 changes: 3 additions & 33 deletions src/PendingInserts.zig
@@ -28,9 +28,6 @@ const SortKeyRange = index_mod.SortKeyRange;

ctx: *const VtabCtx,

table_data: *TableData,
next_rowid: i64,

insert_stmt: StmtCell,
cursor_from_start: StmtPool,
cursor_from_key: StmtPool,
@@ -42,29 +39,15 @@ const Self = @This();
const StmtCell = prep_stmt.Cell(VtabCtx);
const StmtPool = prep_stmt.Pool(VtabCtx);

pub fn init(
allocator: Allocator,
tmp_arena: *ArenaAllocator,
ctx: *const VtabCtx,
table_data: *TableData,
) !Self {
var self = Self{
pub fn init(allocator: Allocator, ctx: *const VtabCtx) !Self {
return Self{
.ctx = ctx,
.table_data = table_data,
.next_rowid = 1,
.insert_stmt = StmtCell.init(&insertDml),
.cursor_from_start = StmtPool.init(allocator, &cursorFromStartQuery),
.cursor_from_key = StmtPool.init(allocator, &cursorFromKeyQuery),
.delete_from = StmtCell.init(&deleteFromQuery),
.delete_range = StmtCell.init(&deleteRangeQuery),
};

// Initialize the rowid to the proper value if the table exists
if (try self.table().checkExists(tmp_arena)) {
try self.loadNextRowid(tmp_arena);
}

return self;
}

pub fn deinit(self: *Self) void {
@@ -143,10 +126,7 @@ pub fn table(self: Self) ShadowTable {
return .{ .ctx = self.ctx };
}

pub fn insert(self: *Self, tmp_arena: *ArenaAllocator, values: anytype) !i64 {
const rowid = self.next_rowid;
self.next_rowid += 1;

pub fn insert(self: *Self, tmp_arena: *ArenaAllocator, rowid: i64, values: anytype) !void {
const stmt = try self.insert_stmt.acquire(tmp_arena, self.ctx.*);
defer self.insert_stmt.release();

@@ -157,8 +137,6 @@ pub fn insert(self: *Self, tmp_arena: *ArenaAllocator, values: anytype) !i64 {
}

try stmt.exec();

return rowid;
}

fn insertDml(ctx: VtabCtx, arena: *ArenaAllocator) ![]const u8 {
@@ -193,14 +171,6 @@ test "pending inserts: insert dml" {
try testing.expectEqualSlices(u8, expected, result);
}

pub fn persistNextRowid(self: *Self, tmp_arena: *ArenaAllocator) !void {
try self.table_data.writeInt(tmp_arena, .next_rowid, self.next_rowid);
}

pub fn loadNextRowid(self: *Self, tmp_arena: *ArenaAllocator) !void {
self.next_rowid = (try self.table_data.readInt(tmp_arena, .next_rowid)) orelse 1;
}

pub const Cursor = struct {
stmt: Stmt,
pool: ?*StmtPool,
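
Note on the API change in this file: `insert` previously generated the rowid itself (from the `next_rowid` field removed above) and returned it; it now receives the rowid from the caller and returns nothing, and the `persistNextRowid`/`loadNextRowid` helpers move out of `PendingInserts` entirely. A rough call-site sketch of the difference (a fragment only; `tmp_arena` and `values` stand in for whatever the caller already has in scope):

// Before: PendingInserts owned the counter and handed back the assigned rowid.
const rowid = try pending_inserts.insert(tmp_arena, values);

// After: the caller (Table.update in this commit) chooses the rowid and passes it in.
try pending_inserts.insert(tmp_arena, rowid, values);
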
111 changes: 65 additions & 46 deletions src/Table.zig
@@ -55,7 +55,15 @@ pending_inserts: PendingInserts,

row_group_creator: RowGroupCreator,

dirty: bool,
/// In order to synchronize `next_rowid` across multiple connections to the same SQLite database,
/// it is loaded from the table data when the first insert in a transaction occurs. This is safe
/// because only one write transaction can be active in a SQLite database at a time. The in-memory
/// value of `next_rowid` is then incremented as additional inserts happen within the transaction,
/// so rowid generation stays fast for subsequent inserts. When the transaction is about to
/// commit, `next_rowid` is persisted to the table data and cleared from memory. This also allows
/// `next_rowid` to serve as a "dirty" flag indicating whether any writes occurred in the
/// transaction.
next_rowid: ?i64,
warned_update_delete_not_supported: bool = false,

pub const InitError = error{
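
The doc comment above is the core of this commit: the rowid counter lives in the shadow table data, is loaded lazily on the first insert of a write transaction, advances in memory, and is written back when the transaction syncs. Below is a minimal toy model of that lifecycle, with the shadow-table read/write reduced to a plain stored integer (an illustration only; the real code goes through `table_data.readInt`/`writeInt` under SQLite's single-writer lock):

const std = @import("std");

const RowidCounter = struct {
    persisted: i64 = 1, // stands in for the `next_rowid` entry in the table data
    next_rowid: ?i64 = null, // in-memory counter, present only inside a write transaction

    fn assign(self: *RowidCounter) i64 {
        if (self.next_rowid == null) self.next_rowid = self.persisted; // lazy load on first insert
        const rowid = self.next_rowid.?;
        self.next_rowid.? += 1;
        return rowid;
    }

    fn sync(self: *RowidCounter) void {
        if (self.next_rowid) |v| {
            self.persisted = v; // persist at sync time, then clear from memory
            self.next_rowid = null;
        }
    }

    fn rollback(self: *RowidCounter) void {
        self.next_rowid = null; // discard without persisting
    }
};

test "rowid counter lifecycle" {
    var counter = RowidCounter{};
    try std.testing.expectEqual(@as(i64, 1), counter.assign());
    try std.testing.expectEqual(@as(i64, 2), counter.assign());
    counter.sync(); // commit path: 3 becomes the persisted next rowid
    try std.testing.expectEqual(@as(i64, 3), counter.assign());
    counter.rollback(); // abandoned transaction: 3 is never persisted
    try std.testing.expectEqual(@as(i64, 3), counter.assign());
}
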
@@ -138,12 +146,7 @@ pub fn create(
);
};

self.pending_inserts = PendingInserts.init(
allocator,
cb_ctx.arena,
&self.ctx,
&self.table_data,
) catch |e| {
self.pending_inserts = PendingInserts.init(allocator, &self.ctx) catch |e| {
return cb_ctx.captureErrMsg(e, "failed to init pending inserts", .{});
};
errdefer self.pending_inserts.deinit();
@@ -167,7 +170,7 @@ pub fn create(
return cb_ctx.captureErrMsg(e, "failed to init row group creator", .{});
};

self.dirty = false;
self.next_rowid = null;
}

test "create table" {
@@ -253,12 +256,7 @@ pub fn connect(
return cb_ctx.captureErrMsg(e, "`{s}_rowgroupindex` shadow table does not exist", .{name});
};

self.pending_inserts = PendingInserts.init(
allocator,
cb_ctx.arena,
&self.ctx,
&self.table_data,
) catch |e| {
self.pending_inserts = PendingInserts.init(allocator, &self.ctx) catch |e| {
return cb_ctx.captureErrMsg(e, "failed to init pending inserts", .{});
};
errdefer self.pending_inserts.deinit();
@@ -282,7 +280,7 @@ pub fn connect(
return cb_ctx.captureErrMsg(e, "failed to init row group creator", .{});
};

self.dirty = false;
self.next_rowid = null;
}

pub fn disconnect(self: *Self) void {
@@ -358,6 +356,18 @@ pub fn rename(self: *Self, cb_ctx: *vtab.CallbackContext, new_name: [:0]const u8
return;
}

pub fn bestIndex(
self: *Self,
cb_ctx: *vtab.CallbackContext,
best_index_info: vtab.BestIndexInfo,
) !bool {
index.chooseBestIndex(cb_ctx.arena, self.ctx.sortKey(), best_index_info) catch |e| {
return cb_ctx.captureErrMsg(e, "error occurred while choosing the best index", .{});
};
// There is always a query solution because a table scan always works
return true;
}

pub fn update(
self: *Self,
cb_ctx: *vtab.CallbackContext,
@@ -366,10 +376,18 @@ pub fn update(
) !void {
const change_type = change_set.changeType();
if (change_type == .Insert) {
rowid.* = self.pending_inserts.insert(cb_ctx.arena, change_set) catch |e| {
if (self.next_rowid == null) {
self.loadNextRowid(cb_ctx.arena) catch |e| {
return cb_ctx.captureErrMsg(e, "error loading the next rowid", .{});
};
}
rowid.* = self.next_rowid.?;
self.next_rowid.? += 1;

self.pending_inserts.insert(cb_ctx.arena, rowid.*, change_set) catch |e| {
return cb_ctx.captureErrMsg(e, "error inserting into pending inserts", .{});
};
self.dirty = true;

return;
}

@@ -379,42 +397,27 @@
}
}

pub fn bestIndex(
self: *Self,
cb_ctx: *vtab.CallbackContext,
best_index_info: vtab.BestIndexInfo,
) !bool {
index.chooseBestIndex(cb_ctx.arena, self.ctx.sortKey(), best_index_info) catch |e| {
return cb_ctx.captureErrMsg(e, "error occurred while choosing the best index", .{});
};
// There is always a query solution because a table scan always works
return true;
}

pub fn begin(_: *Self, _: *vtab.CallbackContext) !void {
log.debug("txn begin", .{});
}

pub fn sync(self: *Self, cb_ctx: *vtab.CallbackContext) !void {
_ = cb_ctx;
_ = self;
if (self.next_rowid) |_| {
try self.row_group_creator.createAll(cb_ctx.arena);
try self.unloadNextRowid(cb_ctx.arena);
}
}

pub fn commit(self: *Self, cb_ctx: *vtab.CallbackContext) !void {
_ = cb_ctx;
_ = self;
log.debug("txn commit", .{});
if (self.dirty) {
try self.pending_inserts.persistNextRowid(cb_ctx.arena);
// TODO should this be called in sync so that an error causes the transaction to be
// aborted?
try self.row_group_creator.createAll(cb_ctx.arena);
self.dirty = false;
}
}

pub fn rollback(self: *Self, cb_ctx: *vtab.CallbackContext) !void {
pub fn rollback(self: *Self, _: *vtab.CallbackContext) !void {
log.debug("txn rollback", .{});
if (self.dirty) {
try self.pending_inserts.loadNextRowid(cb_ctx.arena);
if (self.next_rowid) |_| {
self.clearNextRowid();
}
}

@@ -424,18 +427,34 @@ pub fn savepoint(_: *Self, _: *vtab.CallbackContext, savepoint_id: i32) !void {

pub fn release(self: *Self, cb_ctx: *vtab.CallbackContext, savepoint_id: i32) !void {
log.debug("txn savepoint {d} release", .{savepoint_id});
if (self.dirty) {
try self.pending_inserts.persistNextRowid(cb_ctx.arena);
if (self.next_rowid) |_| {
// TODO Is this necessary? Releasing the savepoint does not end the transaction so I don't
// think it is necessary to persist and clear the next rowid.
try self.unloadNextRowid(cb_ctx.arena);
}
}

pub fn rollbackTo(self: *Self, cb_ctx: *vtab.CallbackContext, savepoint_id: i32) !void {
pub fn rollbackTo(self: *Self, _: *vtab.CallbackContext, savepoint_id: i32) !void {
log.debug("txn savepoint {d} rollback", .{savepoint_id});
if (self.dirty) {
try self.pending_inserts.loadNextRowid(cb_ctx.arena);
if (self.next_rowid) |_| {
self.clearNextRowid();
}
}

fn loadNextRowid(self: *Self, tmp_arena: *ArenaAllocator) !void {
self.next_rowid = (try self.table_data.readInt(tmp_arena, .next_rowid)) orelse 1;
}

/// Persists the next rowid and removes it from memory
fn unloadNextRowid(self: *Self, tmp_arena: *ArenaAllocator) !void {
try self.table_data.writeInt(tmp_arena, .next_rowid, self.next_rowid.?);
self.next_rowid = null;
}

fn clearNextRowid(self: *Self) void {
self.next_rowid = null;
}

pub fn isShadowName(suffix: [:0]const u8) bool {
log.debug("checking shadow name: {s}", .{suffix});
inline for (.{ TableData, BlobManager, SchemaManager, RowGroupIndex, PendingInserts }) |st| {
22 changes: 13 additions & 9 deletions src/row_group/Creator.zig
@@ -615,15 +615,15 @@ test "row group: create single from pending inserts" {
var row_group_index = Index.init(&ctx);
defer row_group_index.deinit();
try row_group_index.table().create(&arena);
var pending_inserts = try PendingInserts.init(arena.allocator(), &arena, &ctx, &table_data);
var pending_inserts = try PendingInserts.init(arena.allocator(), &ctx);
errdefer pending_inserts.deinit();
try pending_inserts.table().create(&arena);

const table_values = datasets.planets.fixed_data[0..4];
const rowids = [_]i64{ 1, 2, 4, 3 };

for (table_values) |*row| {
_ = try pending_inserts.insert(&arena, MemoryTuple{ .values = row });
for (table_values, 1..) |*row, rowid| {
_ = try pending_inserts.insert(&arena, @intCast(rowid), MemoryTuple{ .values = row });
}

var new_row_group: Self.NewRowGroup = undefined;
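
Side note on the test changes in this file: `for (table_values, 1..) |*row, rowid|` is Zig's multi-object loop, where the unbounded range `1..` yields a `usize` counter alongside each element; that is why the call sites wrap it in `@intCast` to produce the `i64` the new `insert` signature expects. A tiny standalone illustration (hypothetical values, not part of this diff):

const std = @import("std");

test "indexed iteration with a counting range" {
    const rows = [_]u8{ 10, 20, 30 };
    // The range `1..` counts 1, 2, 3 alongside the elements of `rows`.
    for (rows, 1..) |row, rowid| {
        try std.testing.expectEqual(row / 10, @as(u8, @intCast(rowid)));
    }
}
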
@@ -698,15 +698,15 @@ test "row group: create all" {
var row_group_index = Index.init(&ctx);
defer row_group_index.deinit();
try row_group_index.table().create(&arena);
var pending_inserts = try PendingInserts.init(arena.allocator(), &arena, &ctx, &table_data);
var pending_inserts = try PendingInserts.init(arena.allocator(), &ctx);
errdefer pending_inserts.deinit();
try pending_inserts.table().create(&arena);

const table_values = datasets.planets.fixed_data[0..4];
const rowids = [_]i64{ 1, 2, 4, 3 };

for (table_values) |*row| {
_ = try pending_inserts.insert(&arena, MemoryTuple{ .values = row });
for (table_values, 1..) |*row, rowid| {
_ = try pending_inserts.insert(&arena, @intCast(rowid), MemoryTuple{ .values = row });
}

{
@@ -776,7 +776,7 @@ pub fn benchRowGroupCreate() !void {
var row_group_index = Index.init(&ctx);
defer row_group_index.deinit();
try row_group_index.table().create(&arena);
var pending_inserts = try PendingInserts.init(arena.allocator(), &arena, &ctx, &table_data);
var pending_inserts = try PendingInserts.init(arena.allocator(), &ctx);
errdefer pending_inserts.deinit();
try pending_inserts.table().create(&arena);

@@ -785,11 +785,14 @@

// Create a row group from pending inserts only (no merge)

var rowid: i64 = 1;

try conn.exec("BEGIN");
const start_insert = std.time.microTimestamp();
for (0..row_group_len) |_| {
var row = datasets.planets.randomRecord(&prng);
_ = try pending_inserts.insert(&arena, MemoryTuple{ .values = &row });
_ = try pending_inserts.insert(&arena, @intCast(rowid), MemoryTuple{ .values = &row });
rowid += 1;
}
try conn.exec("COMMIT");
const end_insert = std.time.microTimestamp();
@@ -826,7 +829,8 @@ pub fn benchRowGroupCreate() !void {
try conn.exec("BEGIN");
for (0..(row_group_len * 3)) |_| {
var row = datasets.planets.randomRecord(&prng);
_ = try pending_inserts.insert(&arena, MemoryTuple{ .values = &row });
_ = try pending_inserts.insert(&arena, rowid, MemoryTuple{ .values = &row });
rowid += 1;
}
try conn.exec("COMMIT");

6 changes: 0 additions & 6 deletions src/sqlite3/ChangeSet.zig
@@ -35,12 +35,6 @@ pub fn changeType(self: Self) ChangeType {
return .Update;
}

pub fn readRowid(self: Self) ValueRef {
return .{
.value = self.values[1],
};
}

/// Number of values in this change set (not including rowid). Should not be called when
/// change type is `.Delete`
pub fn valuesLen(self: Self) usize {