@@ -81,50 +81,61 @@ async function handler(
   });

   // get 10 init dataset.data
-  const arr = new Array(10).fill(0);
+  const max = global.systemEnv?.vectorMaxProcess || 10;
+  const arr = new Array(max * 2).fill(0);

   for await (const _ of arr) {
-    await mongoSessionRun(async (session) => {
-      const data = await MongoDatasetData.findOneAndUpdate(
-        {
-          teamId,
-          datasetId,
-          rebuilding: true
-        },
-        {
-          $unset: {
-            rebuilding: null
-          },
-          updateTime: new Date()
-        },
-        {
-          session
-        }
-      ).select({
-        _id: 1,
-        collectionId: 1
-      });
-
-      if (data) {
-        await MongoDatasetTraining.create(
-          [
-            {
-              teamId,
-              tmbId,
-              datasetId,
-              collectionId: data.collectionId,
-              billId,
-              mode: TrainingModeEnum.chunk,
-              model: vectorModel,
-              q: '1',
-              dataId: data._id
-            }
-          ],
-          {
-            session
-          }
-        );
-      }
-    });
+    try {
+      const hasNext = await mongoSessionRun(async (session) => {
+        // get next dataset.data
+        const data = await MongoDatasetData.findOneAndUpdate(
+          {
+            rebuilding: true,
+            teamId,
+            datasetId
+          },
+          {
+            $unset: {
+              rebuilding: null
+            },
+            updateTime: new Date()
+          },
+          {
+            session
+          }
+        ).select({
+          _id: 1,
+          collectionId: 1
+        });
+
+        if (data) {
+          await MongoDatasetTraining.create(
+            [
+              {
+                teamId,
+                tmbId,
+                datasetId,
+                collectionId: data.collectionId,
+                billId,
+                mode: TrainingModeEnum.chunk,
+                model: vectorModel,
+                q: '1',
+                dataId: data._id
+              }
+            ],
+            {
+              session
+            }
+          );
+        }
+
+        return !!data;
+      });
+
+      if (!hasNext) {
+        break;
+      }
+    } catch (error) {}
   }

   return {};
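Reading note: the hunk above replaces the fixed batch of 10 with a loop bounded by `vectorMaxProcess * 2`, and each pass now runs inside a try/catch, atomically claims one `rebuilding: true` dataset.data row in a Mongo session (clearing the flag with `$unset`), enqueues a chunk-mode training job for it, and breaks as soon as nothing is claimed. A minimal sketch of that claim-and-enqueue pattern follows; `withSession`, `claimNextRebuilding`, and `enqueueChunkTraining` are placeholders standing in for `mongoSessionRun`, `MongoDatasetData.findOneAndUpdate`, and `MongoDatasetTraining.create`, not real exports from this codebase.

```ts
// Minimal sketch of the claim-and-enqueue loop in the hunk above. The helpers
// below are placeholders for the calls in the diff (mongoSessionRun,
// MongoDatasetData.findOneAndUpdate, MongoDatasetTraining.create), not real exports.

type Session = unknown;

interface ClaimedData {
  _id: string;
  collectionId: string;
}

interface SeedDeps {
  // Runs fn inside a MongoDB session/transaction.
  withSession<T>(fn: (session: Session) => Promise<T>): Promise<T>;
  // Atomically claims one { rebuilding: true } row and clears the flag via $unset.
  claimNextRebuilding(session: Session): Promise<ClaimedData | null>;
  // Pushes a chunk-mode training job for the claimed row.
  enqueueChunkTraining(data: ClaimedData, session: Session): Promise<void>;
}

export async function seedRebuildQueue(deps: SeedDeps, vectorMaxProcess = 10): Promise<void> {
  // Seed up to twice the vector worker concurrency so the queue never starves.
  const maxSeeds = vectorMaxProcess * 2;

  for (let i = 0; i < maxSeeds; i++) {
    try {
      const hasNext = await deps.withSession(async (session) => {
        const data = await deps.claimNextRebuilding(session);
        if (data) {
          await deps.enqueueChunkTraining(data, session);
        }
        return Boolean(data);
      });
      if (!hasNext) break; // nothing left marked rebuilding: true
    } catch {
      // Mirrors the diff: one failed claim should not abort the whole seeding pass.
    }
  }
}
```

Seeding twice the worker concurrency keeps the training queue full without pulling every rebuilding row at once, and swallowing the per-iteration error matches the diff's intent that one failed claim should not stop the rest of the seeding.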
projects/app/src/service/events/generateVector.ts (129 changes: 59 additions & 70 deletions)
@@ -158,27 +158,69 @@ const rebuildData = async ({

   const deleteVectorIdList = mongoData.indexes.map((index) => index.dataId);

-  const { tokens } = await mongoSessionRun(async (session) => {
-    // update vector, update dataset.data rebuilding status, delete data from training
-    const updateResult = await Promise.all(
-      mongoData.indexes.map(async (index, i) => {
-        const result = await insertDatasetDataVector({
-          query: index.text,
-          model: getVectorModel(trainingData.model),
-          teamId: mongoData.teamId,
-          datasetId: mongoData.datasetId,
-          collectionId: mongoData.collectionId
-        });
-        mongoData.indexes[i].dataId = result.insertId;
-        return result;
-      })
-    );
+  // Find next rebuilding data to insert training queue
+  await mongoSessionRun(async (session) => {
+    // get new mongoData insert to training
+    const newRebuildingData = await MongoDatasetData.findOneAndUpdate(
+      {
+        teamId: mongoData.teamId,
+        datasetId: mongoData.datasetId,
+        rebuilding: true
+      },
+      {
+        $unset: {
+          rebuilding: null
+        },
+        updateTime: new Date()
+      },
+      { session }
+    ).select({
+      _id: 1,
+      collectionId: 1
+    });
+
+    // Ensure that the training data is deleted after the Mongo update is successful
+    if (newRebuildingData) {
+      await MongoDatasetTraining.create(
+        [
+          {
+            teamId: mongoData.teamId,
+            tmbId: trainingData.tmbId,
+            datasetId: mongoData.datasetId,
+            collectionId: newRebuildingData.collectionId,
+            billId: trainingData.billId,
+            mode: TrainingModeEnum.chunk,
+            model: trainingData.model,
+            q: '1',
+            dataId: newRebuildingData._id
+          }
+        ],
+        { session }
+      );
+    }
+  });
+
+  // update vector, update dataset_data rebuilding status, delete data from training
+  // 1. Insert new vector to dataset_data
+  const updateResult = await Promise.all(
+    mongoData.indexes.map(async (index, i) => {
+      const result = await insertDatasetDataVector({
+        query: index.text,
+        model: getVectorModel(trainingData.model),
+        teamId: mongoData.teamId,
+        datasetId: mongoData.datasetId,
+        collectionId: mongoData.collectionId
+      });
+      mongoData.indexes[i].dataId = result.insertId;
+      return result;
+    })
+  );
+  const { tokens } = await mongoSessionRun(async (session) => {
+    // 2. Ensure that the training data is deleted after the Mongo update is successful
     await mongoData.save({ session });
+    // 3. Delete the training data
     await trainingData.deleteOne({ session });

-    // delete old vector
+    // 4. Delete old vector
     await deleteDatasetDataVector({
       teamId: mongoData.teamId,
       idList: deleteVectorIdList
@@ -189,59 +231,6 @@
     };
   });

-  // find next data insert to training queue
-  const arr = new Array(5).fill(0);
-
-  for await (const _ of arr) {
-    try {
-      const hasNextData = await mongoSessionRun(async (session) => {
-        // get new mongoData insert to training
-        const newRebuildingData = await MongoDatasetData.findOneAndUpdate(
-          {
-            teamId: mongoData.teamId,
-            datasetId: mongoData.datasetId,
-            rebuilding: true
-          },
-          {
-            $unset: {
-              rebuilding: null
-            },
-            updateTime: new Date()
-          },
-          { session }
-        ).select({
-          _id: 1,
-          collectionId: 1
-        });
-
-        if (newRebuildingData) {
-          await MongoDatasetTraining.create(
-            [
-              {
-                teamId: mongoData.teamId,
-                tmbId: trainingData.tmbId,
-                datasetId: mongoData.datasetId,
-                collectionId: newRebuildingData.collectionId,
-                billId: trainingData.billId,
-                mode: TrainingModeEnum.chunk,
-                model: trainingData.model,
-                q: '1',
-                dataId: newRebuildingData._id
-              }
-            ],
-            { session }
-          );
-        }
-
-        return !!newRebuildingData;
-      });
-
-      if (!hasNextData) {
-        break;
-      }
-    } catch (error) {}
-  }
-
   return { tokens };
 };
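Taken together, the two hunks in this file reorder `rebuildData`: the claim-and-enqueue step for the next `rebuilding: true` row now happens up front in its own session (instead of in the trailing retry loop deleted above), the re-embedding via `insertDatasetDataVector` runs outside any session, and only then are the updated index ids saved, the training record deleted, and the old vectors removed within one `mongoSessionRun`. A rough sketch of that ordering, assuming placeholder helpers for the calls shown in the diff:

```ts
// Rough sketch of the reordered rebuild flow for a single row. Every helper in
// SingleRebuildDeps is a placeholder for a call that appears in the diff, not a
// real export from this codebase.

type Session = unknown;

interface SingleRebuildDeps {
  withSession<T>(fn: (session: Session) => Promise<T>): Promise<T>; // mongoSessionRun
  claimAndEnqueueNext(session: Session): Promise<void>; // findOneAndUpdate + MongoDatasetTraining.create
  insertNewVectors(): Promise<{ tokens: number }>; // insertDatasetDataVector over all indexes
  saveUpdatedData(session: Session): Promise<void>; // mongoData.save with the new vector ids
  deleteTrainingRecord(session: Session): Promise<void>; // trainingData.deleteOne
  deleteOldVectors(oldVectorIds: string[]): Promise<void>; // deleteDatasetDataVector
}

export async function rebuildOne(
  deps: SingleRebuildDeps,
  oldVectorIds: string[]
): Promise<{ tokens: number }> {
  // 0. Keep the queue fed: claim the next rebuilding row and enqueue it first,
  //    so other workers stay busy while this row is re-embedded.
  await deps.withSession((session) => deps.claimAndEnqueueNext(session));

  // 1. Re-embed outside any transaction (slow external model call).
  const { tokens } = await deps.insertNewVectors();

  // 2-4. Save the new index ids, drop the training record, then remove the old
  // vectors, grouped in one session so the Mongo writes succeed or fail together.
  await deps.withSession(async (session) => {
    await deps.saveUpdatedData(session);
    await deps.deleteTrainingRecord(session);
    await deps.deleteOldVectors(oldVectorIds);
  });

  return { tokens };
}
```

Running the slow embedding call outside the session keeps the Mongo transaction short, and the up-front enqueue means the next row is already queued before the slow step, so a failure here does not stall the rebuild pipeline.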
