Skip to content

Commit

Permalink
[Fleet] Chunk asset installation during package install (elastic#189045)
Browse files Browse the repository at this point in the history
**Resolves: elastic#189043

## Summary

This PR limits the number of saved objects installed in a single
request. During saved object installation a lot of auxiliary objects are
created in memory. Chunking allows for the garbage collection of memory
objects that are not needed for response.

**Memory consumption before**

![Screenshot 2024-07-12 at 12 44
58](https://github.com/user-attachments/assets/2c7bb609-f107-46bd-bd98-43a0c58107e8)


**After**
![Screenshot 2024-07-18 at 11 53
30](https://github.com/user-attachments/assets/ff6529dd-033e-4c2b-9630-0a2ea3927c21)
  • Loading branch information
xcrzx authored Jul 25, 2024
1 parent 425d6b1 commit b7e66eb
Showing 1 changed file with 103 additions and 76 deletions.
179 changes: 103 additions & 76 deletions x-pack/plugins/fleet/server/services/epm/kibana/assets/install.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import type {
Logger,
} from '@kbn/core/server';
import { createListStream } from '@kbn/utils';
import { partition } from 'lodash';
import { partition, chunk } from 'lodash';

import { getAssetFromAssetsMap, getPathParts } from '../../archive';
import { KibanaAssetType, KibanaSavedObjectType } from '../../../../types';
Expand All @@ -36,6 +36,8 @@ import { withPackageSpan } from '../../packages/utils';
import { tagKibanaAssets } from './tag_assets';
import { getSpaceAwareSaveobjectsClients } from './saved_objects';

const MAX_ASSETS_TO_INSTALL_IN_PARALLEL = 1000;

type SavedObjectsImporterContract = Pick<ISavedObjectsImporter, 'import' | 'resolveImportErrors'>;
const formatImportErrorsForLog = (errors: SavedObjectsImportFailure[]) =>
JSON.stringify(
Expand Down Expand Up @@ -144,11 +146,33 @@ export async function installKibanaAssets(options: {

await makeManagedIndexPatternsGlobal(savedObjectsClient);

const installedAssets = await installKibanaSavedObjects({
logger,
savedObjectsImporter,
kibanaAssets: assetsToInstall,
});
let installedAssets: SavedObjectsImportSuccess[] = [];

if (
assetsToInstall.length > MAX_ASSETS_TO_INSTALL_IN_PARALLEL &&
!hasReferences(assetsToInstall)
) {
// If the package size is too large, we need to install in chunks to avoid
// memory issues as the SO import creates a lot of objects in memory

// NOTE: if there are references, we can't chunk the install because
// referenced objects might end up in different chunks leading to import
// errors.
for (const assetChunk of chunk(assetsToInstall, MAX_ASSETS_TO_INSTALL_IN_PARALLEL)) {
const result = await installKibanaSavedObjects({
logger,
savedObjectsImporter,
kibanaAssets: assetChunk,
});
installedAssets = installedAssets.concat(result);
}
} else {
installedAssets = await installKibanaSavedObjects({
logger,
savedObjectsImporter,
kibanaAssets: assetsToInstall,
});
}

return installedAssets;
}
Expand Down Expand Up @@ -395,95 +419,94 @@ export async function installKibanaSavedObjects({
savedObjectsImporter: SavedObjectsImporterContract;
logger: Logger;
}) {
const toBeSavedObjects = await Promise.all(
kibanaAssets.map((asset) => createSavedObjectKibanaAsset(asset))
);
if (!kibanaAssets.length) {
return [];
}

const toBeSavedObjects = kibanaAssets.map((asset) => createSavedObjectKibanaAsset(asset));

let allSuccessResults: SavedObjectsImportSuccess[] = [];

if (toBeSavedObjects.length === 0) {
return [];
} else {
const {
successResults: importSuccessResults = [],
errors: importErrors = [],
success,
} = await retryImportOnConflictError(() =>
savedObjectsImporter.import({
overwrite: true,
readStream: createListStream(toBeSavedObjects),
createNewCopies: false,
refresh: false,
managed: true,
})
);
const {
successResults: importSuccessResults = [],
errors: importErrors = [],
success,
} = await retryImportOnConflictError(() => {
const readStream = createListStream(toBeSavedObjects);
return savedObjectsImporter.import({
overwrite: true,
readStream,
createNewCopies: false,
refresh: false,
managed: true,
});
});

if (success) {
allSuccessResults = importSuccessResults;
}
if (success) {
allSuccessResults = importSuccessResults;
}

const [referenceErrors, otherErrors] = partition(
importErrors,
(e) => e?.error?.type === 'missing_references'
);
const [referenceErrors, otherErrors] = partition(
importErrors,
(e) => e?.error?.type === 'missing_references'
);

if (otherErrors?.length) {
throw new KibanaSOReferenceError(
`Encountered ${
otherErrors.length
} errors creating saved objects: ${formatImportErrorsForLog(otherErrors)}`
);
}
if (otherErrors?.length) {
throw new KibanaSOReferenceError(
`Encountered ${otherErrors.length} errors creating saved objects: ${formatImportErrorsForLog(
otherErrors
)}`
);
}

/*
/*
A reference error here means that a saved object reference in the references
array cannot be found. This is an error in the package its-self but not a fatal
one. For example a dashboard may still refer to the legacy `metricbeat-*` index
pattern. We ignore reference errors here so that legacy version of a package
can still be installed, but if a warning is logged it should be reported to
the integrations team. */
if (referenceErrors.length) {
logger.debug(
() =>
`Resolving ${
referenceErrors.length
} reference errors creating saved objects: ${formatImportErrorsForLog(referenceErrors)}`
);

const retries = toBeSavedObjects.map(({ id, type }) => {
if (referenceErrors.find(({ id: idToSearch }) => idToSearch === id)) {
return {
id,
type,
ignoreMissingReferences: true,
replaceReferences: [],
overwrite: true,
};
}
return { id, type, overwrite: true, replaceReferences: [] };
});
if (referenceErrors.length) {
logger.debug(
() =>
`Resolving ${
referenceErrors.length
} reference errors creating saved objects: ${formatImportErrorsForLog(referenceErrors)}`
);

const { successResults: resolveSuccessResults = [], errors: resolveErrors = [] } =
await savedObjectsImporter.resolveImportErrors({
readStream: createListStream(toBeSavedObjects),
createNewCopies: false,
managed: true,
retries,
});

if (resolveErrors?.length) {
throw new KibanaSOReferenceError(
`Encountered ${
resolveErrors.length
} errors resolving reference errors: ${formatImportErrorsForLog(resolveErrors)}`
);
const retries = toBeSavedObjects.map(({ id, type }) => {
if (referenceErrors.find(({ id: idToSearch }) => idToSearch === id)) {
return {
id,
type,
ignoreMissingReferences: true,
replaceReferences: [],
overwrite: true,
};
}
return { id, type, overwrite: true, replaceReferences: [] };
});

const { successResults: resolveSuccessResults = [], errors: resolveErrors = [] } =
await savedObjectsImporter.resolveImportErrors({
readStream: createListStream(toBeSavedObjects),
createNewCopies: false,
managed: true,
retries,
});

allSuccessResults = allSuccessResults.concat(resolveSuccessResults);
if (resolveErrors?.length) {
throw new KibanaSOReferenceError(
`Encountered ${
resolveErrors.length
} errors resolving reference errors: ${formatImportErrorsForLog(resolveErrors)}`
);
}

return allSuccessResults;
allSuccessResults = allSuccessResults.concat(resolveSuccessResults);
}

return allSuccessResults;
}

// Filter out any reserved index patterns
Expand All @@ -498,3 +521,7 @@ export function toAssetReference({ id, type }: SavedObject) {

return reference;
}

function hasReferences(assetsToInstall: ArchiveAsset[]) {
return assetsToInstall.some((asset) => asset.references?.length);
}

0 comments on commit b7e66eb

Please sign in to comment.