Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[8.x] [Streams] Dashboard linking (#204309) #205842

Merged
merged 7 commits into from
Jan 10, 2025
Merged
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .github/CODEOWNERS
Original file line number Diff line number Diff line change
@@ -1222,6 +1222,9 @@ x-pack/test_serverless/**/test_suites/observability/ai_assistant @elastic/obs-ai
/x-pack/solutions/observability/plugins/infra/server/routes/log_alerts @elastic/obs-ux-logs-team
/x-pack/solutions/observability/plugins/infra/server/routes/log_analysis @elastic/obs-ux-logs-team
/x-pack/solutions/observability/plugins/infra/server/services/rules @elastic/obs-ux-infra_services-team @elastic/obs-ux-logs-team
/x-pack/test/common/utils/synthtrace @elastic/obs-ux-infra_services-team @elastic/obs-ux-logs-team # Assigned per https://github.com/elastic/kibana/blob/main/packages/kbn-apm-synthtrace/kibana.jsonc#L5
/x-pack/test/common/utils/server_route_repository @elastic/obs-knowledge-team

# Infra Monitoring tests
/x-pack/test/api_integration/apis/infra @elastic/obs-ux-infra_services-team
/x-pack/test/functional/apps/infra @elastic/obs-ux-infra_services-team
Original file line number Diff line number Diff line change
@@ -9,6 +9,6 @@

import { get } from 'lodash';

export function isRequestAbortedError(error: unknown): error is Error {
export function isRequestAbortedError(error: unknown): error is Error & { name: 'AbortError' } {
return get(error, 'name') === 'AbortError';
}
Original file line number Diff line number Diff line change
@@ -213,7 +213,7 @@ type DecodedRequestParamsOfType<TRouteParamsRT extends RouteParamsRT> =
: never;

export type EndpointOf<TServerRouteRepository extends ServerRouteRepository> =
keyof TServerRouteRepository;
keyof TServerRouteRepository & string;

export type ReturnOf<
TServerRouteRepository extends ServerRouteRepository,
Original file line number Diff line number Diff line change
@@ -63,15 +63,11 @@ export function isStream(subject: any): subject is StreamDefinition {
return isSchema(streamDefintionSchema, subject);
}

export function isIngestStream(
subject: IngestStreamDefinition | WiredStreamDefinition
): subject is IngestStreamDefinition {
export function isIngestStream(subject: StreamDefinition): subject is IngestStreamDefinition {
return isSchema(ingestStreamDefinitonSchema, subject);
}

export function isWiredStream(
subject: IngestStreamDefinition | WiredStreamDefinition
): subject is WiredStreamDefinition {
export function isWiredStream(subject: StreamDefinition): subject is WiredStreamDefinition {
return isSchema(wiredStreamDefinitonSchema, subject);
}

Original file line number Diff line number Diff line change
@@ -14,6 +14,7 @@ export const ingestStreamDefinitonSchema = z
name: z.string(),
elasticsearch_assets: z.optional(elasticsearchAssetSchema),
stream: ingestStreamConfigDefinitonSchema,
dashboards: z.optional(z.array(z.string())),
})
.strict();

Original file line number Diff line number Diff line change
@@ -14,6 +14,7 @@ export const wiredStreamDefinitonSchema = z
name: z.string(),
elasticsearch_assets: z.optional(elasticsearchAssetSchema),
stream: wiredStreamConfigDefinitonSchema,
dashboards: z.optional(z.array(z.string())),
})
.strict();

Original file line number Diff line number Diff line change
@@ -7,6 +7,11 @@

import { schema } from '@kbn/config-schema';

const savedObjectReferenceSchema = schema.object({
type: schema.string(),
id: schema.string(),
});

export const findRulesOptionsSchema = schema.object(
{
perPage: schema.maybe(schema.number()),
@@ -19,10 +24,7 @@ export const findRulesOptionsSchema = schema.object(
sortField: schema.maybe(schema.string()),
sortOrder: schema.maybe(schema.oneOf([schema.literal('asc'), schema.literal('desc')])),
hasReference: schema.maybe(
schema.object({
type: schema.string(),
id: schema.string(),
})
schema.oneOf([savedObjectReferenceSchema, schema.arrayOf(savedObjectReferenceSchema)])
),
fields: schema.maybe(schema.arrayOf(schema.string())),
filter: schema.maybe(
Original file line number Diff line number Diff line change
@@ -24,7 +24,7 @@ import { esqlResultToPlainObjects } from '../esql_result_to_plain_objects';
type SearchRequest = ESSearchRequest & {
index: string | string[];
track_total_hits: number | boolean;
size: number | boolean;
size: number;
};

export interface EsqlOptions {
@@ -112,10 +112,12 @@ export function createObservabilityEsClient({
client,
logger,
plugin,
labels,
}: {
client: ElasticsearchClient;
logger: Logger;
plugin: string;
plugin?: string;
labels?: Record<string, string>;
}): ObservabilityElasticsearchClient {
// wraps the ES calls in a named APM span for better analysis
// (otherwise it would just eg be a _search span)
@@ -129,7 +131,8 @@ export function createObservabilityEsClient({
{
name: operationName,
labels: {
plugin,
...labels,
...(plugin ? { plugin } : {}),
},
},
callback,
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
# Storage adapter

Storage adapters are an abstraction for managing & writing data into Elasticsearch, from Kibana plugins.

There are several ways one can use Elasticsearch in Kibana, for instance:

- a simple id-based CRUD table
- timeseries data with regular indices
- timeseries data with data streams

But then there are many choices to be made that make this a very complex problem:

- Elasticsearch asset managmeent
- Authentication
- Schema changes
- Kibana's distributed nature
- Stateful versus serverless

The intent of storage adapters is to come up with an abstraction that allows Kibana developers to have a common interface for writing to and reading data from Elasticsearch. For instance, for setting up your data store, it should not matter how you authenticate (internal user? current user? API keys?).

## Saved objects

Some of these problems are solved by Saved Objects. But Saved Objects come with a lot of baggage - Kibana RBAC, relationships, spaces, all of which might not be
needed for your use case but are still restrictive. One could consider Saved Objects to be the target of an adapter, but Storage Adapters aim to address a wider set of use-cases.

## Philosophy

Storage adapters should largely adhere to the following principles:

- Interfaces are as close to Elasticsearch as possible. Meaning, the `search` method is practically a pass-through for `_search`.
- Strongly-typed. TypeScript types are inferred from the schema. This makes it easy to create fully-typed clients for any storage.
- Lazy writes. No Elasticsearch assets (templates, indices, aliases) get installed unless necessary. Anything that gets persisted to Elasticsearch raises questions (in SDHs, UIs, APIs) and should be avoided when possible. This also helps avoidable upgrade issues (e.g. conflicting mappings for something that never got used).
- Recoverable. If somehow Elasticsearch assets get borked, the adapters should make a best-effort attempt to recover, or log warnings with clear remediation steps.

## Future goals

Currently, we only have the StorageIndexAdapter which writes to plain indices. In the future, we'll want more:

- A StorageDataStreamAdapter or StorageSavedObjectAdapter
- Federated search
- Data/Index Lifecycle Management
- Migration scripts
- Runtime mappings for older versions

## Usage

### Storage index adapter

To use the storage index adapter, instantiate it with an authenticated Elasticsearch client:

```ts
const storageSettings = {
name: '.kibana_streams_assets',
schema: {
properties: {
[ASSET_ASSET_ID]: types.keyword({ required: true }),
[ASSET_TYPE]: types.enum(Object.values(ASSET_TYPES), { required: true }),
},
},
} satisfies IndexStorageSettings;

// create and configure the adapter
const adapter = new StorageIndexAdapter(
esClient: coreStart.elasticsearch.client.asInternalUser,
this.logger.get('assets'),
storageSettings
);

// get the client (its interface is shared across all adapters)
const client = adapter.getClient();

const response = await client.search('operation_name', {
track_total_hits: true
});

```
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

import stringify from 'json-stable-stringify';
import objectHash from 'object-hash';
import { IndexStorageSettings } from '.';

export function getSchemaVersion(storage: IndexStorageSettings): string {
const version = objectHash(stringify(storage.schema.properties));
return version;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import type {
BulkOperationContainer,
BulkRequest,
BulkResponse,
DeleteRequest,
DeleteResponse,
IndexRequest,
IndexResponse,
SearchRequest,
} from '@elastic/elasticsearch/lib/api/types';
import { InferSearchResponseOf } from '@kbn/es-types';
import { StorageFieldTypeOf, StorageMappingProperty } from './types';

interface StorageSchemaProperties {
[x: string]: StorageMappingProperty;
}

export interface StorageSchema {
properties: StorageSchemaProperties;
}

interface StorageSettingsBase {
schema: StorageSchema;
}

export interface IndexStorageSettings extends StorageSettingsBase {
name: string;
}

export type StorageSettings = IndexStorageSettings;

export type StorageAdapterSearchRequest = Omit<SearchRequest, 'index'>;
export type StorageAdapterSearchResponse<
TDocument,
TSearchRequest extends Omit<SearchRequest, 'index'>
> = InferSearchResponseOf<TDocument, TSearchRequest>;

export type StorageAdapterBulkOperation = Pick<BulkOperationContainer, 'delete' | 'index'>;

export type StorageAdapterBulkRequest<TDocument extends Record<string, any>> = Omit<
BulkRequest,
'operations' | 'index'
> & {
operations: Array<StorageAdapterBulkOperation | TDocument>;
};
export type StorageAdapterBulkResponse = BulkResponse;

export type StorageAdapterDeleteRequest = DeleteRequest;
export type StorageAdapterDeleteResponse = DeleteResponse;

export type StorageAdapterIndexRequest<TDocument = unknown> = Omit<
IndexRequest<TDocument>,
'index'
>;
export type StorageAdapterIndexResponse = IndexResponse;

export interface IStorageAdapter<TStorageSettings extends StorageSettings = never> {
bulk<TDocument extends Record<string, any>>(
request: StorageAdapterBulkRequest<TDocument>
): Promise<StorageAdapterBulkResponse>;
search<TDocument, TSearchRequest extends Omit<SearchRequest, 'index'>>(
request: StorageAdapterSearchRequest
): Promise<StorageAdapterSearchResponse<TDocument, TSearchRequest>>;
index<TDocument>(
request: StorageAdapterIndexRequest<TDocument>
): Promise<StorageAdapterIndexResponse>;
delete(request: StorageAdapterDeleteRequest): Promise<StorageAdapterDeleteResponse>;
}

export type StorageSettingsOf<TStorageAdapter extends IStorageAdapter<StorageSettings>> =
TStorageAdapter extends IStorageAdapter<infer TStorageSettings>
? TStorageSettings extends StorageSettings
? TStorageSettings
: never
: never;

export type StorageDocumentOf<TStorageSettings extends StorageSettings> = {
[TKey in keyof TStorageSettings['schema']['properties']]: StorageFieldTypeOf<
TStorageSettings['schema']['properties'][TKey]
>;
} & { _id: string };

export { StorageIndexAdapter } from './index_adapter';
export { StorageClient } from './storage_client';
export { types } from './types';
Loading