Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

🌊 Streams: Show data retention on stream #204125

Merged
merged 20 commits into from
Jan 9, 2025
Merged
Show file tree
Hide file tree
Changes from 17 commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
aa49c96
show data retention on stream
flash1293 Dec 12, 2024
b4cc7a2
[CI] Auto-commit changed files from 'node scripts/lint_ts_projects --…
kibanamachine Dec 12, 2024
7a4dcd2
Merge remote-tracking branch 'upstream/main' into flash1293/streams-d…
flash1293 Dec 17, 2024
818793d
maybe fix things
flash1293 Dec 17, 2024
de1c7e7
maybe fix things
flash1293 Dec 17, 2024
67edb2b
Merge remote-tracking branch 'upstream/main' into flash1293/streams-d…
flash1293 Dec 19, 2024
5623c43
clean up stuff
flash1293 Dec 19, 2024
5948b4f
[CI] Auto-commit changed files from 'node scripts/notice'
kibanamachine Dec 19, 2024
3614b60
[CI] Auto-commit changed files from 'node scripts/yarn_deduplicate'
kibanamachine Dec 19, 2024
59c24f2
[CI] Auto-commit changed files from 'node scripts/eslint --no-cache -…
kibanamachine Dec 19, 2024
0314451
some fixes
flash1293 Dec 19, 2024
ff8e23a
Merge branch 'flash1293/streams-data-retention' of github.com:flash12…
flash1293 Dec 19, 2024
74a1355
fix tests
flash1293 Dec 20, 2024
2bd1785
Update kibana.jsonc
flash1293 Dec 29, 2024
f39cf0b
Merge remote-tracking branch 'upstream/main' into flash1293/streams-d…
flash1293 Dec 29, 2024
86b1c52
fix things
flash1293 Dec 29, 2024
d9334c3
Merge branch 'flash1293/streams-data-retention' of github.com:flash12…
flash1293 Dec 29, 2024
1f88212
Merge branch 'main' into flash1293/streams-data-retention
elasticmachine Dec 30, 2024
db287c7
Merge remote-tracking branch 'upstream/main' into flash1293/streams-d…
flash1293 Jan 8, 2025
5bb5bf8
fix tests
flash1293 Jan 8, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions x-pack/packages/kbn-streams-schema/src/models/common.ts
Original file line number Diff line number Diff line change
Expand Up @@ -116,3 +116,10 @@ export const elasticsearchAssetSchema = z.array(
);

export type ElasticsearchAsset = z.infer<typeof elasticsearchAssetSchema>;

export const lifecycleSchema = z.discriminatedUnion('type', [
z.object({ type: z.literal('dlm'), data_retention: z.optional(z.string()) }),
z.object({ type: z.literal('ilm'), policy: z.string() }),
]);

export type StreamLifecycle = z.infer<typeof lifecycleSchema>;
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,12 @@

import { z } from '@kbn/zod';
import { ingestStreamDefinitonSchema } from '../streams';
import { inheritedFieldDefinitionSchema } from '../common';
import { inheritedFieldDefinitionSchema, lifecycleSchema } from '../common';

export const ingestReadStreamDefinitonSchema = ingestStreamDefinitonSchema
.extend({
inherited_fields: inheritedFieldDefinitionSchema.default({}),
lifecycle: lifecycleSchema,
})
.strict();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,12 @@

import { z } from '@kbn/zod';
import { wiredStreamDefinitonSchema } from '../streams';
import { inheritedFieldDefinitionSchema } from '../common';
import { inheritedFieldDefinitionSchema, lifecycleSchema } from '../common';

export const wiredReadStreamDefinitonSchema = wiredStreamDefinitonSchema
.extend({
inherited_fields: inheritedFieldDefinitionSchema.default({}),
lifecycle: lifecycleSchema,
})
.strict();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,4 @@

export const ILM_LOCATOR_ID = 'ILM_LOCATOR_ID';
export * from './src/policies';
export * from './src/locator';
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import type { SerializableRecord } from '@kbn/utility-types';

export interface IlmLocatorParams extends SerializableRecord {
page: 'policies_list' | 'policy_edit' | 'policy_create';
policyName?: string;
}
Comment on lines +9 to +12
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion: should this go into src/platform/packages/shared/deeplinks/management as other many other apps hold there the locator params? That would let you revert the visibility change for index-lifecycle-management plugin.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure that's the right place - it looks like this is just for the top level app routing, not for specific locators within these apps.

As not all management apps are owned by the same team, I would like to keep this in a place owned by stack management as they will probably change the locator and the app itself at the same time.

But happy to go with what @elastic/kibana-management prefers here. If you want to go with a more elaborate solution I can also duplicate and inline the type for now and leave a todo, as it's more of a side quest of this PR

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agree, the comment was not meant to be a blocker but to avoid switching visibility on a plugin that would then be mistakenly located in the new dir structure. I'll defer to what the Kibana team feels is best for this case, not a blocker on my end.

Copy link
Contributor

@mattkime mattkime Dec 24, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd like to avoid changing the visibility of this plugin although at the moment I'm not sure of the best way to do that. Is the locator API a good option?


After a bit more research, I think moving the plugin is the right call. Its probably best to use the locator API but I haven't fully wrapped my head around it.

If the plugin needs to be moved, that could happen in its own PR.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, I think there is an easier way - check the latest version of this. There is a shared package under x-pack/platform/packages/shared/index-lifecycle-management already, I moved the type over there.

Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,6 @@
"target/**/*"
],
"kbn_references": [
"@kbn/utility-types",
]
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,3 @@ import { IndexLifecycleManagementPlugin } from './plugin';
export const plugin = (initializerContext: PluginInitializerContext) => {
return new IndexLifecycleManagementPlugin(initializerContext);
};

export type { IlmLocatorParams } from './locator';
export { ILM_LOCATOR_ID } from './locator';
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,9 @@
* 2.0.
*/

import type { SerializableRecord } from '@kbn/utility-types';
import { ManagementAppLocator } from '@kbn/management-plugin/common';
import { LocatorDefinition } from '@kbn/share-plugin/public';
import { ILM_LOCATOR_ID } from '@kbn/index-lifecycle-management-common-shared';
import { ILM_LOCATOR_ID, IlmLocatorParams } from '@kbn/index-lifecycle-management-common-shared';
import {
getPoliciesListPath,
getPolicyCreatePath,
Expand All @@ -18,11 +17,6 @@ import { PLUGIN } from '../common/constants';

export { ILM_LOCATOR_ID };

export interface IlmLocatorParams extends SerializableRecord {
page: 'policies_list' | 'policy_edit' | 'policy_create';
policyName?: string;
}

export interface IlmLocatorDefinitionDependencies {
managementAppLocator: ManagementAppLocator;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
"@kbn/test-jest-helpers",
"@kbn/core-http-browser-mocks",
"@kbn/i18n",
"@kbn/utility-types",
"@kbn/analytics",
"@kbn/es-ui-shared-plugin",
"@kbn/i18n-react",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import {
} from '@elastic/elasticsearch/lib/api/types';
import { WiredStreamDefinition } from '@kbn/streams-schema';
import { ASSET_VERSION } from '../../../../common/constants';
import { logsSettings } from './logs_layer';
import { logsSettings, logsLifecycle } from './logs_layer';
import { isRoot } from '../helpers/hierarchy';
import { getComponentTemplateName } from './name';

Expand All @@ -38,6 +38,7 @@ export function generateLayer(
name: getComponentTemplateName(id),
template: {
settings: isRoot(definition.name) ? logsSettings : {},
lifecycle: isRoot(definition.name) ? logsLifecycle : undefined,
mappings: {
subobjects: false,
dynamic: false,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,14 @@
* 2.0.
*/

import { IndicesIndexSettings } from '@elastic/elasticsearch/lib/api/types';
import {
IndicesIndexSettings,
IndicesDataStreamLifecycle,
} from '@elastic/elasticsearch/lib/api/types';

export const logsSettings: IndicesIndexSettings = {
index: {
lifecycle: {
name: 'logs',
},
mode: 'logsdb',
codec: 'best_compression',
mapping: {
total_fields: {
Expand All @@ -21,3 +22,5 @@ export const logsSettings: IndicesIndexSettings = {
},
},
};

export const logsLifecycle: IndicesDataStreamLifecycle = {};
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@ import {
ListStreamsResponse,
isWiredStream,
FieldDefinition,
StreamLifecycle,
ReadStreamDefinition,
IngestReadStreamDefinition,
} from '@kbn/streams-schema';
import { omit } from 'lodash';
import { STREAMS_INDEX } from '../../../common/constants';
Expand Down Expand Up @@ -63,8 +66,9 @@ export async function deleteUnmanagedStreamObjects({
scopedClusterClient,
logger,
}: DeleteStreamParams) {
const dataStream = await getDataStream({ name: id, scopedClusterClient });
const unmanagedAssets = await getUnmanagedElasticsearchAssets({
name: id,
dataStream,
scopedClusterClient,
});
const pipelineName = unmanagedAssets.find((asset) => asset.type === 'ingest_pipeline')?.id;
Expand Down Expand Up @@ -152,7 +156,7 @@ async function upsertInternalStream({ definition, scopedClusterClient }: BasePar
return scopedClusterClient.asInternalUser.index({
id: definition.name,
index: STREAMS_INDEX,
document: { ...omit(definition, 'elasticsearch_assets') },
document: { ...omit(definition, 'elasticsearch_assets', 'inherited_fields') },
refresh: 'wait_for',
});
}
Expand All @@ -169,7 +173,9 @@ export async function listStreams({
});

const dataStreams = await listDataStreamsAsStreams({ scopedClusterClient });
let definitions = response.hits.hits.map((hit) => ({ ...hit._source! }));
let definitions: StreamDefinition[] = response.hits.hits.map((hit) => ({
...hit._source!,
}));
const hasAccess = await Promise.all(
definitions.map((definition) => checkReadAccess({ id: definition.name, scopedClusterClient }))
);
Expand All @@ -188,10 +194,28 @@ export async function listStreams({
};
}

function getDataStreamLifecycle(dataStream: IndicesDataStream): StreamLifecycle {
if (
dataStream.ilm_policy &&
(!dataStream.lifecycle || typeof dataStream.prefer_ilm === 'undefined' || dataStream.prefer_ilm)
) {
return {
type: 'ilm',
policy: dataStream.ilm_policy,
};
}
return {
type: 'dlm',
data_retention: dataStream.lifecycle?.data_retention
? String(dataStream.lifecycle.data_retention)
: undefined,
};
}

export async function listDataStreamsAsStreams({
scopedClusterClient,
}: ListStreamsParams): Promise<IngestStreamDefinition[]> {
const response = await scopedClusterClient.asInternalUser.indices.getDataStream();
const response = await scopedClusterClient.asCurrentUser.indices.getDataStream();
return response.data_streams
.filter((dataStream) => dataStream.template.endsWith('@stream') === false)
.map((dataStream) => ({
Expand All @@ -214,7 +238,7 @@ export async function readStream({
id,
scopedClusterClient,
skipAccessCheck,
}: ReadStreamParams): Promise<StreamDefinition> {
}: ReadStreamParams): Promise<ReadStreamDefinition> {
try {
const response = await scopedClusterClient.asInternalUser.get<StreamDefinition>({
id,
Expand All @@ -227,7 +251,12 @@ export async function readStream({
throw new DefinitionNotFound(`Stream definition for ${id} not found.`);
}
}
return definition;
const dataStream = await getDataStream({ name: id, scopedClusterClient });
return {
...definition,
inherited_fields: {},
lifecycle: getDataStreamLifecycle(dataStream),
};
} catch (e) {
if (e.meta?.statusCode === 404) {
return readDataStreamAsStream({ id, scopedClusterClient, skipAccessCheck });
Expand All @@ -237,8 +266,11 @@ export async function readStream({
}

export async function readDataStreamAsStream({ id, scopedClusterClient }: ReadStreamParams) {
const definition: IngestStreamDefinition = {
const dataStream = await getDataStream({ name: id, scopedClusterClient });
const definition: IngestReadStreamDefinition = {
name: id,
lifecycle: getDataStreamLifecycle(dataStream),
inherited_fields: {},
stream: {
ingest: {
routing: [],
Expand All @@ -248,36 +280,43 @@ export async function readDataStreamAsStream({ id, scopedClusterClient }: ReadSt
};

definition.elasticsearch_assets = await getUnmanagedElasticsearchAssets({
name: id,
dataStream,
scopedClusterClient,
});

return definition;
}

interface ReadUnmanagedAssetsParams extends BaseParams {
name: string;
dataStream: IndicesDataStream;
}

async function getUnmanagedElasticsearchAssets({
async function getDataStream({
name,
scopedClusterClient,
}: ReadUnmanagedAssetsParams) {
let dataStream: IndicesDataStream;
}: {
name: string;
scopedClusterClient: IScopedClusterClient;
}) {
try {
const response = await scopedClusterClient.asInternalUser.indices.getDataStream({ name });
dataStream = response.data_streams[0];
const response = await scopedClusterClient.asCurrentUser.indices.getDataStream({ name });
return response.data_streams[0];
} catch (e) {
if (e.meta?.statusCode === 404) {
throw new DefinitionNotFound(`Stream definition for ${name} not found.`);
}
throw e;
}
}

async function getUnmanagedElasticsearchAssets({
dataStream,
scopedClusterClient,
}: ReadUnmanagedAssetsParams) {
// retrieve linked index template, component template and ingest pipeline
const templateName = dataStream.template;
const componentTemplates: string[] = [];
const template = await scopedClusterClient.asInternalUser.indices.getIndexTemplate({
const template = await scopedClusterClient.asCurrentUser.indices.getIndexTemplate({
name: templateName,
});
if (template.index_templates.length) {
Expand All @@ -286,7 +325,7 @@ async function getUnmanagedElasticsearchAssets({
});
}
const writeIndexName = dataStream.indices.at(-1)?.index_name!;
const currentIndex = await scopedClusterClient.asInternalUser.indices.get({
const currentIndex = await scopedClusterClient.asCurrentUser.indices.get({
index: writeIndexName,
});
const ingestPipelineId = currentIndex[writeIndexName].settings?.index?.default_pipeline!;
Expand All @@ -306,7 +345,7 @@ async function getUnmanagedElasticsearchAssets({
},
{
type: 'data_stream' as const,
id: name,
id: dataStream.name,
},
];
}
Expand Down Expand Up @@ -533,8 +572,9 @@ async function syncUnmanagedStream({ scopedClusterClient, definition }: SyncStre
if (definition.stream.ingest.routing.length) {
throw new Error('Unmanaged streams cannot have managed children, coming soon');
}
const dataStream = await getDataStream({ name: definition.name, scopedClusterClient });
const unmanagedAssets = await getUnmanagedElasticsearchAssets({
name: definition.name,
dataStream,
scopedClusterClient,
});
const executionPlan: ExecutionPlanStep[] = [];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import { z } from '@kbn/zod';
import { IScopedClusterClient } from '@kbn/core-elasticsearch-server';
import { Logger } from '@kbn/logging';
import { badRequest, internal, notFound } from '@hapi/boom';
import { isWiredStream } from '@kbn/streams-schema';
import { isWiredReadStream } from '@kbn/streams-schema';
import {
DefinitionNotFound,
ForkConditionMissing,
Expand Down Expand Up @@ -80,7 +80,7 @@ export async function deleteStream(
) {
try {
const definition = await readStream({ scopedClusterClient, id });
if (!isWiredStream(definition)) {
if (!isWiredReadStream(definition)) {
await deleteUnmanagedStreamObjects({ scopedClusterClient, id, logger });
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

import { z } from '@kbn/zod';
import { badRequest, internal, notFound } from '@hapi/boom';
import { conditionSchema, isWiredStream, WiredStreamDefinition } from '@kbn/streams-schema';
import { conditionSchema, isWiredReadStream, WiredStreamDefinition } from '@kbn/streams-schema';
import {
DefinitionNotFound,
ForkConditionMissing,
Expand Down Expand Up @@ -58,7 +58,7 @@ export const forkStreamsRoute = createServerRoute({
id: params.path.id,
});

if (!isWiredStream(rootDefinition)) {
if (!isWiredReadStream(rootDefinition)) {
throw new MalformedStreamId('Cannot fork a stream that is not managed');
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ import { z } from '@kbn/zod';
import { notFound, internal } from '@hapi/boom';
import {
FieldDefinitionConfig,
isIngestStream,
isWiredStream,
isIngestReadStream,
isWiredReadStream,
ReadStreamDefinition,
} from '@kbn/streams-schema';
import { createServerRoute } from '../create_server_route';
Expand Down Expand Up @@ -42,7 +42,7 @@ export const readStreamRoute = createServerRoute({

// TODO: I have no idea why I can just do `isIngestStream` here but when I do,
// streamEntity becomes `streamEntity: never` in the statements afterwards
if (!isWiredStream(streamEntity) && isIngestStream(streamEntity)) {
if (!isWiredReadStream(streamEntity) && isIngestReadStream(streamEntity)) {
return {
...streamEntity,
inherited_fields: {},
Expand Down
Loading