Skip to content

Commit

Permalink
metrics: optimize tag usage counting
Browse files Browse the repository at this point in the history
from upwards of a day to my db to 1 minute with this one simple trick

instead of seq scan over tag_files for each tag, iterate over files and
fetch its tags (not seq scan, because file hash is part of the key),
increment on in-memory hashmap
  • Loading branch information
lun-4 committed Oct 6, 2023
1 parent 6427d91 commit 467b9c0
Showing 1 changed file with 69 additions and 14 deletions.
83 changes: 69 additions & 14 deletions src/metrics_main.zig
Original file line number Diff line number Diff line change
Expand Up @@ -122,29 +122,84 @@ fn runMetricsTagUsage(ctx: *Context, metrics_timestamp: Timestamp) !void {
.{metrics_timestamp},
);

// for every tag core, run count(*) over tag_files

var stmt = try ctx.db.prepare(
\\ select distinct tag_names.core_hash,
\\ (select count(*) from tag_files where core_hash = tag_names.core_hash) AS relationship_count
\\ from tag_names;
// for every file, count tag cores
var files_stmt = try ctx.db.prepare(
\\ select distinct file_hash
\\ from files;
);
defer stmt.deinit();
defer files_stmt.deinit();

var string_arena = std.heap.ArenaAllocator.init(ctx.allocator);
var string_allocator = string_arena.allocator();
defer string_arena.deinit();

var core_counts = std.StringHashMap(usize).init(ctx.allocator);
defer core_counts.deinit();

var files_it = try files_stmt.iterator(struct { file_hash: ID.SQL }, .{});
var files_timer = try std.time.Timer.start();
var files_all_timer = try std.time.Timer.start();
var files_counter: usize = 0;

var buffer: [512 * 1024]u8 = undefined;
while (try files_it.next(.{})) |row| {
var fba = std.heap.FixedBufferAllocator{
.buffer = &buffer,
.end_index = 0,
};
defer fba.reset();
var tags_alloc = fba.allocator();

const file_ref = Context.File{
.ctx = ctx,
.local_path = undefined,
.hash = Context.Hash{
.id = ID.new(row.file_hash),
.hash_data = undefined,
},
};

const tags = try file_ref.fetchTags(tags_alloc);

for (tags) |file_tag| {
var owned_data = try string_allocator.dupe(u8, &file_tag.core.id.data);
var entry = try core_counts.getOrPut(owned_data);

if (entry.found_existing) {
entry.value_ptr.* += 1;
} else {
entry.value_ptr.* = 1;
}
}

var it = try stmt.iterator(struct { core_hash: ID.SQL, relationship_count: usize }, .{});
var timer = try std.time.Timer.start();
while (try it.next(.{})) |row| {
const exec_time_ns = timer.lap();
logger.info("{} took {:.2}ms to fetch", .{ row, exec_time_ns / std.time.ns_per_ms });
if (files_counter > 0 and files_counter % 10000 == 0) {
const exec_time_ns = files_timer.lap();
logger.info("up to {d} took {:.2}ms to fetch", .{ files_counter, exec_time_ns / std.time.ns_per_ms });
}
files_counter += 1;
}
const exec_all_time_ns = files_all_timer.lap();
logger.info("all took {:.2}ms to fetch", .{exec_all_time_ns / std.time.ns_per_ms});

const core_hash = ID.new(row.core_hash);
var core_it = core_counts.iterator();
var core_insert_timer = try std.time.Timer.start();
var core_insert_counter: usize = 0;
while (core_it.next()) |entry| {
const core_hash: [26]u8 = entry.key_ptr.*[0..26].*;
const relationship_count: usize = entry.value_ptr.*;

try ctx.db.exec(
"insert into metrics_tag_usage_values (timestamp, core_hash, relationship_count) values (?, ?, ?)",
.{},
.{ metrics_timestamp, core_hash.sql(), row.relationship_count },
.{ metrics_timestamp, core_hash, relationship_count },
);
core_insert_counter += 1;
}
const insert_all_time_ns = core_insert_timer.lap();
logger.info(
"insert all {d} cores took {:.2}ms",
.{ core_insert_counter, insert_all_time_ns / std.time.ns_per_ms },
);
}

fn runTagSourceCounters(ctx: *Context, metrics_timestamp: Timestamp) !void {
Expand Down

0 comments on commit 467b9c0

Please sign in to comment.