parallelize repo indexing (#163)
* hacked together an example of using the zoekt grpc api

* provide tenant id to zoekt git indexer

* update zoekt version to point to multitenant branch

* pipe tenant id through header to zoekt

* remove incorrect submodule reference and settings typo

* update zoekt commit

* remove unused yarn script

* remove unused grpc client in web server

* remove unneeded deps and improve tenant id log

* pass tenant id when creating repo in db

* add mt yarn script

* add PoC of bullmq into backend

* add better error handling and concurrency setting

* spin up redis instance in dockerfile

* cleanup transaction logic when adding repos to index queue

* add NEW index status fetch condition

* move bullmq deps to backend

---------

Co-authored-by: bkellam <[email protected]>
msukkari and brendan-kellam authored Jan 15, 2025
1 parent bd027f7 commit 7029aa7
Showing 10 changed files with 306 additions and 43 deletions.
2 changes: 1 addition & 1 deletion Dockerfile
@@ -84,7 +84,7 @@ ENV POSTHOG_PAPIK=$POSTHOG_PAPIK
# ENV SOURCEBOT_TELEMETRY_DISABLED=1

# Configure dependencies
-RUN apk add --no-cache git ca-certificates bind-tools tini jansson wget supervisor uuidgen curl perl jq
+RUN apk add --no-cache git ca-certificates bind-tools tini jansson wget supervisor uuidgen curl perl jq redis

# Configure zoekt
COPY vendor/zoekt/install-ctags-alpine.sh .
8 changes: 5 additions & 3 deletions package.json
@@ -6,12 +6,14 @@
"scripts": {
"build": "yarn workspaces run build",
"test": "yarn workspaces run test",
"dev": "npm-run-all --print-label --parallel dev:zoekt dev:backend dev:web",
"dev:mt": "npm-run-all --print-label --parallel dev:zoekt:mt dev:backend dev:web",
"dev": "npm-run-all --print-label --parallel dev:zoekt dev:backend dev:web dev:redis",
"dev:mt": "npm-run-all --print-label --parallel dev:zoekt:mt dev:backend dev:web dev:redis",
"dev:zoekt": "export PATH=\"$PWD/bin:$PATH\" && export SRC_TENANT_ENFORCEMENT_MODE=none && zoekt-webserver -index .sourcebot/index -rpc",
"dev:zoekt:mt": "export PATH=\"$PWD/bin:$PATH\" && export SRC_TENANT_ENFORCEMENT_MODE=strict && zoekt-webserver -index .sourcebot/index -rpc",
"dev:backend": "yarn workspace @sourcebot/backend dev:watch",
"dev:web": "yarn workspace @sourcebot/web dev"
"dev:web": "yarn workspace @sourcebot/web dev",
"dev:redis": "docker ps --filter \"name=redis\" --format \"{{.Names}}\" | grep -q \"^redis$\" && docker rm -f redis; docker run -d --name redis -p 6379:6379 redis"

},
"devDependencies": {
"npm-run-all": "^4.1.5"
4 changes: 3 additions & 1 deletion packages/backend/package.json
@@ -34,6 +34,8 @@
"posthog-node": "^4.2.1",
"simple-git": "^3.27.0",
"strip-json-comments": "^5.0.1",
"winston": "^3.15.0"
"winston": "^3.15.0",
"bullmq": "^5.34.10",
"ioredis": "^5.4.2"
}
}
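
A note on this dependency pair: bullmq is the queue library, and ioredis is the Redis client it drives. BullMQ workers hold blocking Redis connections and require maxRetriesPerRequest: null on that connection, which is why main.ts below constructs its client with exactly that option. A minimal sketch of how the two fit together (the queue name and host/port are illustrative, mirroring this commit's defaults):

import { Queue, Worker } from 'bullmq';
import { Redis } from 'ioredis';

// BullMQ needs maxRetriesPerRequest: null so the blocking commands
// workers use to wait for jobs are never abandoned mid-wait.
const connection = new Redis({ host: 'localhost', port: 6379, maxRetriesPerRequest: null });

const queue = new Queue('demoQueue', { connection });
const worker = new Worker('demoQueue', async (job) => {
    console.log(`processing job ${job.id}`);
}, { connection });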
3 changes: 2 additions & 1 deletion packages/backend/src/constants.ts
@@ -6,6 +6,7 @@ import { Settings } from "./types.js";
export const DEFAULT_SETTINGS: Settings = {
maxFileSize: 2 * 1024 * 1024, // 2MB in bytes
autoDeleteStaleRepos: true,
-    reindexIntervalMs: 1000 * 60 * 60, // 1 hour in milliseconds
+    reindexIntervalMs: 1000 * 60,
resyncIntervalMs: 1000 * 60 * 60 * 24, // 1 day in milliseconds
indexConcurrencyMultiple: 3,
}
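
Note the old "1 hour in milliseconds" comment disappears along with the old value: the reindex interval drops from one hour to one minute. Written out (a sketch, not part of the diff):

const reindexIntervalMs = 1000 * 60;            // 60,000 ms = 1 minute (was 1000 * 60 * 60 = 1 hour)
const resyncIntervalMs  = 1000 * 60 * 60 * 24;  // 86,400,000 ms = 1 day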
132 changes: 96 additions & 36 deletions packages/backend/src/main.ts
@@ -1,4 +1,4 @@
-import { PrismaClient, Repo } from '@sourcebot/db';
+import { PrismaClient, Repo, RepoIndexingStatus } from '@sourcebot/db';
import { existsSync, watch } from 'fs';
import { syncConfig } from "./config.js";
import { cloneRepository, fetchRepository } from "./git.js";
@@ -8,6 +8,9 @@ import { AppContext } from "./types.js";
import { getRepoPath, isRemotePath, measure } from "./utils.js";
import { indexGitRepository } from "./zoekt.js";
import { DEFAULT_SETTINGS } from './constants.js';
+import { Queue, Worker, Job } from 'bullmq';
+import { Redis } from 'ioredis';
+import * as os from 'os';

const logger = createLogger('main');

@@ -53,6 +56,23 @@ const syncGitRepository = async (repo: Repo, ctx: AppContext) => {
    }
}

+async function addReposToQueue(db: PrismaClient, queue: Queue, repos: Repo[]) {
+    for (const repo of repos) {
+        await db.$transaction(async (tx) => {
+            await tx.repo.update({
+                where: { id: repo.id },
+                data: { repoIndexingStatus: RepoIndexingStatus.IN_INDEX_QUEUE },
+            });
+
+            // Add the job to the queue
+            await queue.add('indexJob', repo);
+            logger.info(`Added job to queue for repo ${repo.id}`);
+        }).catch((err) => {
+            logger.error(`Failed to add job to queue for repo ${repo.id}: ${err}`);
+        });
+    }
+}
+
export const main = async (db: PrismaClient, context: AppContext) => {
    let abortController = new AbortController();
    let isSyncing = false;
@@ -97,50 +117,90 @@ export const main = async (db: PrismaClient, context: AppContext) => {
    // Sync immediately on startup
    await _syncConfig();

-    while (true) {
-        const repos = await db.repo.findMany();
-
-        for (const repo of repos) {
-            const lastIndexed = repo.indexedAt ?? new Date(0);
-
-            if (lastIndexed.getTime() > (Date.now() - DEFAULT_SETTINGS.reindexIntervalMs)) {
-                continue;
-            }
-
-            try {
-                let indexDuration_s: number | undefined;
-                let fetchDuration_s: number | undefined;
-                let cloneDuration_s: number | undefined;
-
-                const stats = await syncGitRepository(repo, context);
-                indexDuration_s = stats.indexDuration_s;
-                fetchDuration_s = stats.fetchDuration_s;
-                cloneDuration_s = stats.cloneDuration_s;
-
-                captureEvent('repo_synced', {
-                    vcs: 'git',
-                    codeHost: repo.external_codeHostType,
-                    indexDuration_s,
-                    fetchDuration_s,
-                    cloneDuration_s,
-                });
-            } catch (err: any) {
-                // @todo : better error handling here..
-                logger.error(err);
-                continue;
-            }
-
-            await db.repo.update({
-                where: {
-                    id: repo.id,
-                },
-                data: {
-                    indexedAt: new Date(),
-                }
-            });
-        }
-
-        await new Promise(resolve => setTimeout(resolve, 1000));
+    const redis = new Redis({
+        host: 'localhost',
+        port: 6379,
+        maxRetriesPerRequest: null
+    });
+    redis.ping().then(() => {
+        logger.info('Connected to redis');
+    }).catch((err) => {
+        logger.error('Failed to connect to redis');
+        console.error(err);
+        process.exit(1);
+    });
+
+    const indexQueue = new Queue('indexQueue');
+
+    const numCores = os.cpus().length;
+    const numWorkers = numCores * DEFAULT_SETTINGS.indexConcurrencyMultiple;
+    logger.info(`Detected ${numCores} cores. Setting max concurrency to ${numWorkers}`);
+    const worker = new Worker('indexQueue', async (job) => {
+        const repo = job.data as Repo;
+
+        let indexDuration_s: number | undefined;
+        let fetchDuration_s: number | undefined;
+        let cloneDuration_s: number | undefined;
+
+        const stats = await syncGitRepository(repo, context);
+        indexDuration_s = stats.indexDuration_s;
+        fetchDuration_s = stats.fetchDuration_s;
+        cloneDuration_s = stats.cloneDuration_s;
+
+        captureEvent('repo_synced', {
+            vcs: 'git',
+            codeHost: repo.external_codeHostType,
+            indexDuration_s,
+            fetchDuration_s,
+            cloneDuration_s,
+        });
+
+        await db.repo.update({
+            where: {
+                id: repo.id,
+            },
+            data: {
+                indexedAt: new Date(),
+                repoIndexingStatus: RepoIndexingStatus.INDEXED,
+            }
+        });
+    }, { connection: redis, concurrency: numWorkers });
+
+    worker.on('completed', (job) => {
+        logger.info(`Job ${job.id} completed`);
+    });
+    worker.on('failed', async (job: Job | undefined, err) => {
+        logger.info(`Job failed with error: ${err}`);
+        if (job) {
+            await db.repo.update({
+                where: {
+                    id: job.data.id,
+                },
+                data: {
+                    repoIndexingStatus: RepoIndexingStatus.FAILED,
+                }
+            });
+        }
+    });
+
+    while (true) {
+        const thresholdDate = new Date(Date.now() - DEFAULT_SETTINGS.reindexIntervalMs);
+        const repos = await db.repo.findMany({
+            where: {
+                repoIndexingStatus: {
+                    notIn: [RepoIndexingStatus.IN_INDEX_QUEUE, RepoIndexingStatus.FAILED]
+                },
+                OR: [
+                    { indexedAt: null },
+                    { indexedAt: { lt: thresholdDate } },
+                    { repoIndexingStatus: RepoIndexingStatus.NEW }
+                ]
+            }
+        });
+        logger.info(`Found ${repos.length} repos to index...`);
+        addReposToQueue(db, indexQueue, repos);
+
+        await new Promise(resolve => setTimeout(resolve, 1000));
    }
}
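
The findMany filter above is the heart of the new scheduling loop, so it is worth spelling out: a repo is (re)queued only if it is not already sitting in the queue and has not failed, and it is due (brand new, never indexed, or last indexed before the reindex threshold). An equivalent in-memory predicate, as a sketch for illustration only:

import { Repo, RepoIndexingStatus } from '@sourcebot/db';

// Mirrors the Prisma `where` clause in the polling loop above.
const shouldQueue = (repo: Repo, reindexIntervalMs: number): boolean => {
    const thresholdDate = new Date(Date.now() - reindexIntervalMs);
    const notInFlight =
        repo.repoIndexingStatus !== RepoIndexingStatus.IN_INDEX_QUEUE &&
        repo.repoIndexingStatus !== RepoIndexingStatus.FAILED;
    const due =
        repo.indexedAt === null ||
        repo.indexedAt < thresholdDate ||
        repo.repoIndexingStatus === RepoIndexingStatus.NEW;
    return notInFlight && due;
};

One consequence worth noting: because FAILED is in the notIn list, a failed repo is never picked up again by this loop; its status has to be reset elsewhere before it can be re-indexed.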
4 changes: 4 additions & 0 deletions packages/backend/src/types.ts
@@ -74,6 +74,10 @@
* The interval (in milliseconds) at which the configuration file should be re-synced.
*/
resyncIntervalMs: number;
+    /**
+     * The multiple of the number of CPUs to use for indexing.
+     */
+    indexConcurrencyMultiple: number;
}

// @see : https://stackoverflow.com/a/61132308
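
A worked example of how the multiple is applied (this mirrors the worker setup in main.ts; the core count is illustrative):

import * as os from 'os';

const indexConcurrencyMultiple = 3;  // DEFAULT_SETTINGS value
const numWorkers = os.cpus().length * indexConcurrencyMultiple;
// e.g. 8 cores -> up to 24 index jobs in flight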
25 changes: 25 additions & 0 deletions packages/db/prisma/migrations/…/migration.sql
@@ -0,0 +1,25 @@
-- RedefineTables
PRAGMA defer_foreign_keys=ON;
PRAGMA foreign_keys=OFF;
CREATE TABLE "new_Repo" (
"id" INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
"name" TEXT NOT NULL,
"createdAt" DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
"updatedAt" DATETIME NOT NULL,
"indexedAt" DATETIME,
"isFork" BOOLEAN NOT NULL,
"isArchived" BOOLEAN NOT NULL,
"metadata" JSONB NOT NULL,
"cloneUrl" TEXT NOT NULL,
"tenantId" INTEGER NOT NULL,
"repoIndexingStatus" TEXT NOT NULL DEFAULT 'NEW',
"external_id" TEXT NOT NULL,
"external_codeHostType" TEXT NOT NULL,
"external_codeHostUrl" TEXT NOT NULL
);
INSERT INTO "new_Repo" ("cloneUrl", "createdAt", "external_codeHostType", "external_codeHostUrl", "external_id", "id", "indexedAt", "isArchived", "isFork", "metadata", "name", "tenantId", "updatedAt") SELECT "cloneUrl", "createdAt", "external_codeHostType", "external_codeHostUrl", "external_id", "id", "indexedAt", "isArchived", "isFork", "metadata", "name", "tenantId", "updatedAt" FROM "Repo";
DROP TABLE "Repo";
ALTER TABLE "new_Repo" RENAME TO "Repo";
CREATE UNIQUE INDEX "Repo_external_id_external_codeHostUrl_key" ON "Repo"("external_id", "external_codeHostUrl");
PRAGMA foreign_keys=ON;
PRAGMA defer_foreign_keys=OFF;
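
The rebuild dance above (create new_Repo, copy rows, drop, rename) is how Prisma adds a NOT NULL column on SQLite, whose ALTER TABLE support is limited; the enum is stored as TEXT with a 'NEW' default, so every pre-existing row starts out as NEW. The practical effect, sketched against the generated client (field values are illustrative):

import { PrismaClient, RepoIndexingStatus } from '@sourcebot/db';

const db = new PrismaClient();

async function example() {
    // A newly created repo picks up the NEW default from this migration,
    // which is exactly the state the scheduling loop in main.ts looks for.
    const repo = await db.repo.create({
        data: {
            name: 'example/repo',
            isFork: false,
            isArchived: false,
            metadata: {},
            cloneUrl: 'https://github.com/example/repo.git',
            tenantId: 0,
            external_id: '42',
            external_codeHostType: 'github',
            external_codeHostUrl: 'https://github.com',
        },
    });
    console.log(repo.repoIndexingStatus === RepoIndexingStatus.NEW); // true
}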
10 changes: 10 additions & 0 deletions packages/db/prisma/schema.prisma
@@ -10,6 +10,14 @@ datasource db {
url = env("DATABASE_URL")
}

+enum RepoIndexingStatus {
+  NEW
+  IN_INDEX_QUEUE
+  INDEXING
+  INDEXED
+  FAILED
+}

model Repo {
id Int @id @default(autoincrement())
name String
@@ -22,6 +30,8 @@ model Repo {
cloneUrl String
tenantId Int
+  repoIndexingStatus RepoIndexingStatus @default(NEW)
// The id of the repo in the external service
external_id String
// The type of the external service (e.g., github, gitlab, etc.)
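
Taken together with main.ts, the enum describes a small state machine: repos move NEW -> IN_INDEX_QUEUE -> INDEXED or FAILED, and INDEXED repos cycle back into the queue once stale. INDEXING is declared but not yet assigned anywhere in this diff. Sketched as a transition table:

import { RepoIndexingStatus } from '@sourcebot/db';

// Transitions wired up in this commit (sketch; INDEXING is reserved but unused).
const transitions: Partial<Record<RepoIndexingStatus, RepoIndexingStatus[]>> = {
    [RepoIndexingStatus.NEW]: [RepoIndexingStatus.IN_INDEX_QUEUE],     // addReposToQueue
    [RepoIndexingStatus.IN_INDEX_QUEUE]: [
        RepoIndexingStatus.INDEXED,                                    // worker success
        RepoIndexingStatus.FAILED,                                     // worker 'failed' handler
    ],
    [RepoIndexingStatus.INDEXED]: [RepoIndexingStatus.IN_INDEX_QUEUE], // re-queued once stale
};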
9 changes: 9 additions & 0 deletions supervisord.conf
@@ -29,4 +29,13 @@ autorestart=true
startretries=3
stdout_logfile=/dev/fd/1
stdout_logfile_maxbytes=0
redirect_stderr=true

+[program:redis]
+command=redis-server
+autostart=true
+autorestart=true
+startretries=3
+stdout_logfile=/dev/fd/1
+stdout_logfile_maxbytes=0
+redirect_stderr=true