Skip to content

Commit 1ce643d

Browse files
committed
Reindexing fixes
Signed-off-by: Andrey Sobolev <[email protected]>
1 parent cc1cd74 commit 1ce643d

File tree

19 files changed

+203
-114
lines changed

19 files changed

+203
-114
lines changed

.vscode/launch.json

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -158,17 +158,17 @@
158158
// "PORT": "4700",// For mongo
159159
"PORT": "4710", // for cockroach
160160
"FULLTEXT_DB_URL": "http://localhost:9201",
161-
"DB_URL": "mongodb://localhost:27018",
161+
// "DB_URL": "mongodb://localhost:27018",
162162
// "DB_URL": "postgresql://postgres:example@localhost:5432",
163-
// "DB_URL": "postgresql://[email protected]:26257/defaultdb?sslmode=disable",
163+
"DB_URL": "postgresql://[email protected]:26258/defaultdb?sslmode=disable",
164164
"STORAGE_CONFIG": "minio|localhost?accessKey=minioadmin&secretKey=minioadmin",
165165
"SERVER_SECRET": "secret",
166166
"REKONI_URL": "http://localhost:4004",
167167
"MODEL_JSON": "${workspaceRoot}/models/all/bundle/model.json",
168168
"ELASTIC_INDEX_NAME": "local_storage_index",
169169
"REGION": "",
170170
"STATS_URL": "http://huly.local:4901",
171-
"ACCOUNTS_URL": "http://localhost:3003",
171+
"ACCOUNTS_URL": "http://huly.local:3003",
172172
"QUEUE_CONFIG": "localhost:19093;-staging"
173173
},
174174
"runtimeArgs": ["--nolazy", "-r", "ts-node/register"],

dev/tool/src/index.ts

Lines changed: 64 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@ import accountPlugin, {
1919
createWorkspaceRecord,
2020
flattenStatus,
2121
getAccountDB,
22-
getWorkspaceById,
2322
getWorkspaceInfoWithStatusById,
2423
getWorkspaces,
2524
getWorkspacesInfoWithStatusByIds,
@@ -117,9 +116,9 @@ import { performGmailAccountMigrations } from './gmail'
117116
import { getToolToken, getWorkspace, getWorkspaceTransactorEndpoint } from './utils'
118117

119118
import { createRestClient } from '@hcengineering/api-client'
120-
import { mkdir, writeFile } from 'fs/promises'
121-
import { basename, dirname } from 'path'
122119
import { existsSync } from 'fs'
120+
import { mkdir, writeFile } from 'fs/promises'
121+
import { dirname } from 'path'
123122
import { restoreMarkupRefs } from './markup'
124123

125124
const colorConstants = {
@@ -438,58 +437,67 @@ export function devTool (
438437
// })
439438
// })
440439

441-
program
442-
.command('upgrade-workspace <name>')
443-
.description('upgrade workspace')
444-
.option('-f|--force [force]', 'Force update', true)
445-
.option('-i|--indexes [indexes]', 'Force indexes rebuild', false)
446-
.action(async (workspace, cmd: { force: boolean, indexes: boolean }) => {
447-
const { version, txes, migrateOperations } = prepareTools()
440+
async function doUpgrade (
441+
toolCtx: MeasureMetricsContext,
442+
workspace: WorkspaceUuid,
443+
forceUpdate: boolean,
444+
forceIndexes: boolean
445+
): Promise<void> {
446+
const { version, txes, migrateOperations } = prepareTools()
448447

449-
await withAccountDatabase(async (db) => {
450-
const info = await getWorkspace(db, workspace)
451-
if (info === null) {
452-
throw new Error(`workspace ${workspace} not found`)
453-
}
448+
await withAccountDatabase(async (db) => {
449+
const info = await getWorkspace(db, workspace)
450+
if (info === null) {
451+
throw new Error(`workspace ${workspace} not found`)
452+
}
454453

455-
const wsInfo = await getWorkspaceInfoWithStatusById(db, info.uuid)
456-
if (wsInfo === null) {
457-
throw new Error(`workspace ${workspace} not found`)
458-
}
454+
const wsInfo = await getWorkspaceInfoWithStatusById(db, info.uuid)
455+
if (wsInfo === null) {
456+
throw new Error(`workspace ${workspace} not found`)
457+
}
459458

460-
const coreWsInfo = flattenStatus(wsInfo)
461-
const measureCtx = new MeasureMetricsContext('upgrade-workspace', {})
462-
const accountClient = getAccountClient(getToolToken(wsInfo.uuid))
463-
const queue = getPlatformQueue('tool', info.region)
464-
const wsProducer = queue.getProducer<QueueWorkspaceMessage>(toolCtx, QueueTopic.Workspace)
465-
await upgradeWorkspace(
466-
measureCtx,
467-
version,
468-
txes,
469-
migrateOperations,
470-
accountClient,
471-
coreWsInfo,
472-
consoleModelLogger,
473-
wsProducer,
474-
async () => {},
475-
cmd.force,
476-
cmd.indexes,
477-
true
478-
)
459+
const coreWsInfo = flattenStatus(wsInfo)
460+
const measureCtx = new MeasureMetricsContext('upgrade-workspace', {})
461+
const accountClient = getAccountClient(getToolToken(wsInfo.uuid))
462+
const queue = getPlatformQueue('tool', info.region)
463+
const wsProducer = queue.getProducer<QueueWorkspaceMessage>(toolCtx, QueueTopic.Workspace)
464+
await upgradeWorkspace(
465+
measureCtx,
466+
version,
467+
txes,
468+
migrateOperations,
469+
accountClient,
470+
coreWsInfo,
471+
consoleModelLogger,
472+
wsProducer,
473+
async () => {},
474+
forceUpdate,
475+
forceIndexes,
476+
true
477+
)
479478

480-
await updateWorkspaceInfo(measureCtx, db, null, getToolToken(), {
481-
workspaceUuid: info.uuid,
482-
event: 'upgrade-done',
483-
version,
484-
progress: 100
485-
})
479+
await updateWorkspaceInfo(measureCtx, db, null, getToolToken(), {
480+
workspaceUuid: info.uuid,
481+
event: 'upgrade-done',
482+
version,
483+
progress: 100
484+
})
486485

487-
console.log(metricsToString(measureCtx.metrics, 'upgrade', 60))
486+
console.log(metricsToString(measureCtx.metrics, 'upgrade', 60))
488487

489-
await wsProducer.send(info.uuid, [workspaceEvents.upgraded()])
490-
await queue.shutdown()
491-
console.log('upgrade-workspace done')
492-
})
488+
await wsProducer.send(info.uuid, [workspaceEvents.upgraded()])
489+
await queue.shutdown()
490+
console.log('upgrade-workspace done')
491+
})
492+
}
493+
494+
program
495+
.command('upgrade-workspace <name>')
496+
.description('upgrade workspace')
497+
.option('-f|--force [force]', 'Force update', true)
498+
.option('-i|--indexes [indexes]', 'Force indexes rebuild', false)
499+
.action(async (workspace, cmd: { force: boolean, indexes: boolean }) => {
500+
await doUpgrade(toolCtx, workspace, cmd.force, cmd.indexes)
493501
})
494502

495503
// program
@@ -1133,6 +1141,7 @@ export function devTool (
11331141
.option('-i, --include <include>', 'A list of ; separated domain names to include during backup', '*')
11341142
.option('-s, --skip <skip>', 'A list of ; separated domain names to skip during backup', '')
11351143
.option('--use-storage <useStorage>', 'Use workspace storage adapter from env variable', '')
1144+
.option('--upgrade', 'Upgrade workspace', false)
11361145
.option(
11371146
'--history-file <historyFile>',
11381147
'Store blob send info into file. Will skip already send documents.',
@@ -1152,6 +1161,7 @@ export function devTool (
11521161
skip: string
11531162
useStorage: string
11541163
historyFile: string
1164+
upgrade: boolean
11551165
}
11561166
) => {
11571167
await withAccountDatabase(async (db) => {
@@ -1181,6 +1191,11 @@ export function devTool (
11811191
storageAdapter: workspaceStorage,
11821192
historyFile: cmd.historyFile
11831193
})
1194+
1195+
if (cmd.upgrade) {
1196+
await doUpgrade(toolCtx, workspace, true, true)
1197+
}
1198+
11841199
const queue = getPlatformQueue('tool', ws.region)
11851200
const wsProducer = queue.getProducer<QueueWorkspaceMessage>(toolCtx, QueueTopic.Workspace)
11861201
await wsProducer.send(ws.uuid, [workspaceEvents.fullReindex()])

models/core/src/migration.ts

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -236,8 +236,13 @@ async function processMigrateContentFor (
236236
try {
237237
const buffer = Buffer.from(value)
238238
await storageAdapter.put(client.ctx, client.wsIds, blobId, buffer, 'application/json', buffer.length)
239-
} catch (err) {
240-
client.logger.error('failed to process document', { _class: doc._class, _id: doc._id, err })
239+
} catch (err: any) {
240+
client.logger.error('failed to process document', {
241+
_class: doc._class,
242+
_id: doc._id,
243+
err: err.message,
244+
stack: err.stack
245+
})
241246
}
242247

243248
update[attributeName] = blobId

packages/account-client/src/client.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -267,7 +267,7 @@ class AccountClientImpl implements AccountClient {
267267
return ws
268268
}
269269

270-
const result = { ...ws, ...status }
270+
const result = { ...ws, ...status, processingAttemps: status.processingAttempts ?? 0 }
271271
delete result.status
272272

273273
return result

packages/core/src/classes.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -839,6 +839,7 @@ export interface WorkspaceInfoWithStatus extends WorkspaceInfo {
839839
mode: WorkspaceMode
840840
processingProgress?: number
841841
backupInfo?: BackupStatus
842+
processingAttemps: number
842843
}
843844

844845
export interface WorkspaceMemberInfo {

pods/fulltext/src/__tests__/indexing.spec.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ class TestWorkspaceManager extends WorkspaceManager {
4545
dataId: decodedToken.workspace as unknown as WorkspaceDataId,
4646
mode: 'active',
4747
processingProgress: 0,
48+
processingAttemps: 0,
4849
backupInfo: {
4950
dataSize: 0,
5051
blobsSize: 0,

pods/fulltext/src/manager.ts

Lines changed: 81 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,16 @@
11
/* eslint-disable @typescript-eslint/unbound-method */
2-
import type { Doc, MeasureContext, Space, Tx, TxCUD, WorkspaceInfoWithStatus, WorkspaceUuid } from '@hcengineering/core'
3-
import { Hierarchy, systemAccountUuid } from '@hcengineering/core'
2+
import type {
3+
Doc,
4+
MeasureContext,
5+
Space,
6+
Tx,
7+
TxCreateDoc,
8+
TxCUD,
9+
Version,
10+
WorkspaceInfoWithStatus,
11+
WorkspaceUuid
12+
} from '@hcengineering/core'
13+
import core, { Hierarchy, systemAccountUuid, TxProcessor, versionToString } from '@hcengineering/core'
414
import { getAccountClient, getTransactorEndpoint } from '@hcengineering/server-client'
515
import {
616
createContentAdapter,
@@ -50,6 +60,11 @@ export class WorkspaceManager {
5060
for (const tx of model) {
5161
this.sysHierarchy.tx(tx)
5262
}
63+
this.supportedVersion = TxProcessor.createDoc2Doc(
64+
model.find(
65+
(it) => TxProcessor.isExtendsCUD(it._class) && (it as TxCUD<Doc>).objectClass === core.class.Version
66+
) as TxCreateDoc<Version>
67+
)
5368

5469
this.workspaceProducer = this.opt.queue.getProducer<QueueWorkspaceMessage>(this.ctx, QueueTopic.Workspace)
5570
}
@@ -64,6 +79,8 @@ export class WorkspaceManager {
6479

6580
txInformer: any
6681

82+
supportedVersion: Version
83+
6784
async startIndexer (): Promise<void> {
6885
this.contentAdapter = await this.ctx.with('create content adapter', {}, (ctx) =>
6986
createContentAdapter(this.opt.config.contentAdapters, this.opt.config.defaultContentAdapter)
@@ -125,7 +142,12 @@ export class WorkspaceManager {
125142
}
126143

127144
const indexer = await this.getIndexer(this.ctx, ws, token, true)
128-
await indexer?.fulltext.processDocuments(this.ctx, m.value, control)
145+
// TODO: If workspace is not upgraded, we will loose the fulltext index
146+
if (indexer === undefined) {
147+
// this.workspaceProducer
148+
} else {
149+
await indexer?.fulltext.processDocuments(this.ctx, m.value, control)
150+
}
129151
}
130152
}
131153

@@ -159,6 +181,10 @@ export class WorkspaceManager {
159181
ws,
160182
classes.map((it) => workspaceEvents.reindex(it.domain, it.classes))
161183
)
184+
} else {
185+
await this.workspaceProducer.send(ws, [workspaceEvents.fullReindex()]).catch((err) => {
186+
this.ctx.error('error', { err })
187+
})
162188
}
163189
} else if (
164190
mm.type === QueueWorkspaceEvent.Deleted ||
@@ -176,6 +202,9 @@ export class WorkspaceManager {
176202
const indexer = await this.getIndexer(this.ctx, ws, token, true)
177203
const mmd = mm as QueueWorkspaceReindexMessage
178204
await indexer?.reindex(this.ctx, mmd.domain, mmd.classes, control)
205+
} else if (mm.type === QueueWorkspaceEvent.Upgraded) {
206+
console.error('Upgraded', this.supportedVersion)
207+
await this.closeWorkspace(ws)
179208
}
180209
}
181210
}
@@ -195,20 +224,38 @@ export class WorkspaceManager {
195224
return (await getTransactorEndpoint(token, 'internal')).replace('wss://', 'https://').replace('ws://', 'http://')
196225
}
197226

198-
async getIndexer (
227+
async createIndexer (
199228
ctx: MeasureContext,
200229
workspace: WorkspaceUuid,
201-
token: string | undefined,
202-
create: boolean = false
203-
): Promise<WorkspaceIndexer | undefined> {
204-
let idx = this.indexers.get(workspace)
205-
if (idx === undefined && create) {
230+
token: string | undefined
231+
): Promise<WorkspaceIndexer> {
232+
while (true) {
206233
const workspaceInfo = await this.getWorkspaceInfo(ctx, token)
207234
if (workspaceInfo === undefined) {
208-
return
235+
throw new Error('Workspace not found')
236+
}
237+
if (
238+
workspaceInfo.versionMajor !== this.supportedVersion.major ||
239+
workspaceInfo.versionMinor !== this.supportedVersion.minor ||
240+
workspaceInfo.versionPatch !== this.supportedVersion.patch
241+
) {
242+
ctx.warn('wrong version', {
243+
workspace,
244+
version: versionToString({
245+
major: workspaceInfo.versionMajor,
246+
minor: workspaceInfo.versionMinor,
247+
patch: workspaceInfo.versionPatch
248+
}),
249+
supportedVersion: this.supportedVersion
250+
})
251+
if (workspaceInfo.processingAttemps < 4) {
252+
await new Promise((resolve) => setTimeout(resolve, 10000))
253+
continue
254+
}
255+
throw new Error('Workspace limit reached')
209256
}
210257
ctx.warn('indexer created', { workspace })
211-
idx = WorkspaceIndexer.create(
258+
return await WorkspaceIndexer.create(
212259
ctx,
213260
this.model,
214261
{
@@ -223,12 +270,32 @@ export class WorkspaceManager {
223270
(token) => this.getTransactorAPIEndpoint(token),
224271
this.opt.listener
225272
)
226-
this.indexers.set(workspace, idx)
227273
}
228-
if (idx instanceof Promise) {
229-
idx = await idx
274+
}
275+
276+
async getIndexer (
277+
ctx: MeasureContext,
278+
workspace: WorkspaceUuid,
279+
token: string | undefined,
280+
create: boolean = false
281+
): Promise<WorkspaceIndexer | undefined> {
282+
let idx = this.indexers.get(workspace)
283+
if (idx === undefined && create) {
284+
idx = this.createIndexer(ctx, workspace, token)
230285
this.indexers.set(workspace, idx)
231286
}
287+
if (idx === undefined) {
288+
return undefined
289+
}
290+
try {
291+
if (idx instanceof Promise) {
292+
idx = await idx
293+
this.indexers.set(workspace, idx)
294+
}
295+
} catch (err: any) {
296+
this.indexers.delete(workspace)
297+
return undefined
298+
}
232299
return idx
233300
}
234301

pods/fulltext/src/server.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -221,6 +221,7 @@ export async function startIndexer (
221221
req.body = {}
222222

223223
ctx.info('reindex', { workspace: decoded.workspace })
224+
await manager.closeWorkspace(decoded.workspace)
224225
const indexer = await manager.getIndexer(ctx, decoded.workspace, token, true)
225226
if (indexer !== undefined) {
226227
indexer.lastUpdate = Date.now()

0 commit comments

Comments
 (0)