Skip to content

Commit

Permalink
Add Alert insights (#1878)
Browse files Browse the repository at this point in the history
  • Loading branch information
michaeljguarino authored Feb 10, 2025
1 parent f941814 commit 0989d18
Show file tree
Hide file tree
Showing 31 changed files with 626 additions and 30 deletions.
3 changes: 3 additions & 0 deletions assets/src/generated/graphql.ts
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,7 @@ export type AiDelta = {
/** A representation of a LLM-derived insight */
export type AiInsight = {
__typename?: 'AiInsight';
alert?: Maybe<Alert>;
cluster?: Maybe<Cluster>;
clusterInsightComponent?: Maybe<ClusterInsightComponent>;
/** any errors generated when compiling this insight */
Expand Down Expand Up @@ -299,6 +300,8 @@ export type Alert = {
fingerprint?: Maybe<Scalars['String']['output']>;
id: Scalars['ID']['output'];
insertedAt?: Maybe<Scalars['DateTime']['output']>;
/** an insight explaining the state of this alert */
insight?: Maybe<AiInsight>;
message?: Maybe<Scalars['String']['output']>;
/** the project this alert was associated with */
project?: Maybe<Project>;
Expand Down
1 change: 1 addition & 0 deletions config/prod.exs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ config :console, Console.Cron.Scheduler,
{"*/5 * * * *", {Console.AI.Cron, :services, []}},
{"*/5 * * * *", {Console.AI.Cron, :stacks, []}},
{"*/5 * * * *", {Console.AI.Cron, :clusters, []}},
{"*/5 * * * *", {Console.AI.Cron, :alerts, []}},
{"30 * * * *", {Console.AI.Cron, :threads, []}},
{"0 0 1-31/2 * *", {Console.Deployments.Cron, :backfill_deprecations, []}},
{"*/15 * * * *", {Console.Deployments.Cron, :backfill_global_services, []}},
Expand Down
7 changes: 5 additions & 2 deletions go/client/models_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

13 changes: 13 additions & 0 deletions lib/console/ai/cron.ex
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ defmodule Console.AI.Cron do
alias Console.AI.{Worker, Chat}
alias Console.Deployments.Settings
alias Console.Schema.{
Alert,
AiInsight,
Stack,
Service,
Expand Down Expand Up @@ -64,6 +65,18 @@ defmodule Console.AI.Cron do
end)
end

def alerts() do
if_enabled(fn ->
Alert.firing()
|> Alert.ordered(asc: :id)
|> Repo.stream(method: :keyset)
|> Console.throttle()
|> Stream.chunk_every(@chunk)
|> Stream.map(&batch_insight(PubSub.AlertInsight, &1))
|> Stream.run()
end)
end

def chats() do
if_enabled(fn ->
ChatThread.with_expired_chats()
Expand Down
36 changes: 36 additions & 0 deletions lib/console/ai/evidence/alert.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
defimpl Console.AI.Evidence, for: Console.Schema.Alert do
use Console.AI.Evidence.Base
alias Console.AI.Evidence.{Logs, Context}
alias Console.Schema.{Alert, Service, Cluster}
alias Console.Repo

def generate(%Alert{state: :firing, service: %Service{} = service} = alert) do
[{:user, alert_prompt(alert)}]
|> Logs.with_logging(service, force: true, lines: 100)
|> Context.prompt({:user, "Please use the data I've listed above to give a clear root cause analysis of this issue."})
|> Context.result()
end
def generate(%Alert{state: :resolved}), do: {:error, "alert is already resolved"}
def generate(_), do: {:error, "insights only supported for service bound alerts"}

def preload(%Alert{} = alert), do: Repo.preload(alert, [:insight, service: :cluster])

def insight(%Alert{insight: insight}), do: insight

def custom(_), do: false

defp alert_prompt(%Alert{title: t, message: msg, service: %Service{name: s, cluster: %Cluster{name: n, distro: d}}}) do
"""
There is an alert firing on a workload within the #{distro(d)} kubernetes cluster named: #{n}. The workload itself
is deployed using the Plural service #{s}.
The title of the alert is: #{t}
The message of the alert is:
#{msg}
I'll list some of the logs related to this workload to help analyze the root cause.
"""
end
end
15 changes: 15 additions & 0 deletions lib/console/ai/evidence/component/certificate.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
defmodule Console.AI.Evidence.Component.Certificate do
use Console.AI.Evidence.Base

def hydrate(%Kube.Certificate{metadata: %MetaV1.ObjectMeta{namespace: ns, name: n}}) when is_binary(ns) do
Kube.Client.list_certificate_requests(ns)
|> default_empty(fn %Kube.CertificateRequest.List{items: requests} ->
Enum.filter(requests, fn
%Kube.CertificateRequest{metadata: %MetaV1.ObjectMeta{owner_references: [%{name: ^n} | _]}} -> true
_ -> false
end)
|> Enum.map(& {:user, "the certificate manages a set of certificate requests #{component(&1)} with current state:\n#{encode(&1)}"})
end)
end
def hydrate(_), do: {:ok, []}
end
4 changes: 3 additions & 1 deletion lib/console/ai/evidence/component/resource.ex
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ defmodule Console.AI.Evidence.Component.Resource do
Ingress,
CronJob,
Job,
Raw
Raw,
Certificate
}
alias Console.Schema.ServiceComponent

Expand Down Expand Up @@ -81,6 +82,7 @@ defmodule Console.AI.Evidence.Component.Resource do
def hydrate(%NetworkingV1.Ingress{} = ing), do: Ingress.hydrate(ing)
def hydrate(%BatchV1.CronJob{} = cj), do: CronJob.hydrate(cj)
def hydrate(%BatchV1.Job{} = cj), do: Job.hydrate(cj)
def hydrate(%Kube.Certificate{} = cert), do: Certificate.hydrate(cert)
def hydrate(%{"metadata" => _} = raw), do: Raw.hydrate(raw)
def hydrate(_), do: {:ok, []}

Expand Down
26 changes: 18 additions & 8 deletions lib/console/ai/evidence/logs.ex
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ defmodule Console.AI.Evidence.Logs do

require Logger

@base [query: "error fatal", limit: 10]
@base [query: "error fatal exception", limit: 10]
@format ~s({"timestamp": datetime, "log": string})

@preface """
Expand All @@ -19,10 +19,12 @@ defmodule Console.AI.Evidence.Logs do
"""

@spec with_logging(Provider.history, Service.t | ClusterInsightComponent.t) :: Context.t
def with_logging(history, parent) do
def with_logging(history, parent, opts \\ []) do
force = Keyword.get(opts, :force, false)
args = Keyword.take(opts, ~w(lines q)a)
with %DeploymentSettings{logging: %{enabled: true}} <- Settings.cached(),
{:ok, [%{logging: %{result: %Logging{required: true}}} | _]} <- Provider.tool_call(history, [Logging], preface: @preface),
{:ok, query} <- query(parent),
true <- use_logs?(history, force),
{:ok, query} <- query(parent, args),
{:ok, [_ | _] = logs} <- LogEngine.query(query) do
history
|> Context.new()
Expand All @@ -36,17 +38,25 @@ defmodule Console.AI.Evidence.Logs do
end
end

defp query(%Service{} = svc), do: build_query(svc, @base ++ [service_id: svc.id])
defp query(%ClusterInsightComponent{namespace: ns} = comp) when is_binary(ns) do
defp query(%Service{} = svc, args), do: build_query(svc, args ++ @base ++ [service_id: svc.id])
defp query(%ClusterInsightComponent{namespace: ns} = comp, args) when is_binary(ns) do
%{cluster: cluster} = Repo.preload(comp, [:cluster])
build_query(cluster, @base ++ [cluster_id: cluster.id, namespaces: [comp.namespace]])
build_query(cluster, args ++ @base ++ [cluster_id: cluster.id, namespaces: [comp.namespace]])
end
defp query(_), do: {:error, :invalid_parent}
defp query(_, _), do: {:error, :invalid_parent}

defp build_query(resource, args), do: {:ok, %{Query.new(args) | resource: resource}}

defp log_attrs(%Query{} = q, lines) do
Map.take(q, ~w(service_id cluster_id)a)
|> Map.put(:lines, Enum.map(lines, &Map.take(&1, ~w(timestamp log)a)))
end

defp use_logs?(_, true), do: true
defp use_logs?(history, _) do
case Provider.tool_call(history, [Logging], preface: @preface) do
{:ok, [%{logging: %{result: %Logging{required: true}}} | _]} -> true
_ -> false
end
end
end
6 changes: 4 additions & 2 deletions lib/console/ai/pubsub/consumer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ defmodule Console.AI.PubSub.Consumer do
max_demand: 10
import Console.Services.Base, only: [handle_notify: 2]
alias Console.PubSub
alias Console.Schema.{AiInsight, Service, Stack, StackState}
alias Console.Schema.{AiInsight, Service, Stack, StackState, Alert}
alias Console.AI.{PubSub.Insightful, Cron}
require Logger

Expand All @@ -19,13 +19,15 @@ defmodule Console.AI.PubSub.Consumer do
end

def maybe_send_event({:ok, insight} = res) do
case Console.Repo.preload(insight, [:stack, :service, :stack_state]) do
case Console.Repo.preload(insight, [:stack, :service, :stack_state, :alert]) do
%AiInsight{service: %Service{} = svc} ->
handle_notify(PubSub.ServiceInsight, {svc, insight})
%AiInsight{stack: %Stack{} = stack} ->
handle_notify(PubSub.StackInsight, {stack, insight})
%AiInsight{stack_state: %StackState{} = state} ->
handle_notify(PubSub.StackStateInsight, {state, insight})
%AiInsight{alert: %Alert{} = alert} ->
handle_notify(PubSub.AlertInsight, {alert, insight})
_ -> :ok
end
res
Expand Down
6 changes: 6 additions & 0 deletions lib/console/ai/pubsub/protocol.ex
Original file line number Diff line number Diff line change
Expand Up @@ -63,3 +63,9 @@ defimpl Console.AI.PubSub.Insightful, for: Console.PubSub.StackUpdated do
def resource(%@for{item: %Stack{status: :failed} = stack}), do: {:ok, stack}
def resource(_), do: :ok
end

defimpl Console.AI.PubSub.Insightful, for: Console.PubSub.AlertCreated do
alias Console.Schema.Alert
def resource(%@for{item: %Alert{state: :firing} = alert}), do: {:ok, alert}
def resource(_), do: :ok
end
8 changes: 7 additions & 1 deletion lib/console/deployments/cron.ex
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,12 @@ defmodule Console.Deployments.Cron do
|> Repo.delete_all()
end

@concurrency [
ordered: false,
timeout: :timer.seconds(120),
on_timeout: :kill_task
]

def cache_warm() do
Task.async(fn -> Git.warm_helm_cache() end)
Cluster.stream()
Expand All @@ -79,7 +85,7 @@ defmodule Console.Deployments.Cron do
Logger.error "hit error trying to warm node caches for cluster=#{cluster.handle}"
Logger.error(Exception.format(:error, e, __STACKTRACE__))
end
end, max_concurrency: clamp(Clusters.count()), ordered: false, timeout: :timer.seconds(30), on_timeout: :kill_task)
end, [max_concurrency: clamp(Clusters.count())] ++ @concurrency)
|> Stream.run()
end

Expand Down
2 changes: 2 additions & 0 deletions lib/console/deployments/events.ex
Original file line number Diff line number Diff line change
Expand Up @@ -74,5 +74,7 @@ defmodule Console.PubSub.ServiceInsight, do: use Piazza.PubSub.Event
defmodule Console.PubSub.StackInsight, do: use Piazza.PubSub.Event
defmodule Console.PubSub.ClusterInsight, do: use Piazza.PubSub.Event
defmodule Console.PubSub.StackStateInsight, do: use Piazza.PubSub.Event
defmodule Console.PubSub.AlertInsight, do: use Piazza.PubSub.Event


defmodule Console.PubSub.AlertCreated, do: use Piazza.PubSub.Event
1 change: 0 additions & 1 deletion lib/console/deployments/observability.ex
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,6 @@ defmodule Console.Deployments.Observability do
"""
@spec persist_alert([map]) :: {:ok, Alert.t} | error
def persist_alert(alerts) when is_list(alerts) do
IO.inspect(alerts)
Enum.with_index(alerts)
|> Enum.reduce(start_transaction(), fn
{%{fingerprint: fp} = attrs, ind}, acc when is_binary(fp) ->
Expand Down
17 changes: 17 additions & 0 deletions lib/console/deployments/pubsub/protocols/notifiable.ex
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,23 @@ defimpl Console.Deployments.PubSub.Notifiable, for: Console.PubSub.ClusterInsigh
def message(_), do: :ok
end

defimpl Console.Deployments.PubSub.Notifiable, for: Console.PubSub.AlertInsight do
alias Console.Deployments.Notifications.Utils
alias Console.Schema.AiInsight
require Logger

def message(%{item: {alert, %AiInsight{text: t, sha: sha} = insight}}) when byte_size(t) > 0 do
Utils.nested_dedupe([
{{:insight_sha, sha}, [ttl: :timer.hours(24)]},
{:alert_insight, alert.id}
], fn ->
alert = Console.Repo.preload(alert, [:service])
{"alert.insight", Utils.filters(alert), %{alert: alert, insight: insight, text: Utils.insight(insight)}}
end)
end
def message(_), do: :ok
end

defimpl Console.Deployments.PubSub.Notifiable, for: Console.PubSub.AlertCreated do
alias Console.Deployments.Notifications.Utils

Expand Down
6 changes: 5 additions & 1 deletion lib/console/graphql/ai.ex
Original file line number Diff line number Diff line change
Expand Up @@ -106,9 +106,13 @@ defmodule Console.GraphQl.AI do
field :sha, :string, description: "a deduplication sha for this insight"
field :text, :string, description: "the text of this insight"
field :summary, :string, description: "a shortish summary of this insight"
field :freshness, :insight_freshness, resolve: fn insight, _, _ -> {:ok, Console.Schema.AiInsight.freshness(insight)} end
field :error, list_of(:service_error), description: "any errors generated when compiling this insight"

field :freshness, :insight_freshness, resolve: fn insight, _, _ ->
{:ok, Console.Schema.AiInsight.freshness(insight)}
end

field :alert, :alert, resolve: dataloader(Deployments)
field :service, :service_deployment, resolve: dataloader(Deployments)
field :stack, :infrastructure_stack, resolve: dataloader(Deployments)
field :cluster, :cluster, resolve: dataloader(Deployments)
Expand Down
7 changes: 4 additions & 3 deletions lib/console/graphql/deployments/observability.ex
Original file line number Diff line number Diff line change
Expand Up @@ -83,9 +83,10 @@ defmodule Console.GraphQl.Deployments.Observability do

field :tags, list_of(:tag), resolve: dataloader(Deployments), description: "key/value tags to filter clusters"

field :cluster, :cluster, resolve: dataloader(Deployments), description: "the cluster this alert was associated with"
field :service, :service, resolve: dataloader(Deployments), description: "the service this alert was associated with"
field :project, :project, resolve: dataloader(Deployments), description: "the project this alert was associated with"
field :insight, :ai_insight, resolve: dataloader(Deployments), description: "an insight explaining the state of this alert"
field :cluster, :cluster, resolve: dataloader(Deployments), description: "the cluster this alert was associated with"
field :service, :service, resolve: dataloader(Deployments), description: "the service this alert was associated with"
field :project, :project, resolve: dataloader(Deployments), description: "the project this alert was associated with"

timestamps()
end
Expand Down
2 changes: 2 additions & 0 deletions lib/console/schema/ai_insight.ex
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
defmodule Console.Schema.AiInsight do
use Piazza.Ecto.Schema
alias Console.Schema.{
Alert,
Service,
Stack,
Cluster,
Expand All @@ -24,6 +25,7 @@ defmodule Console.Schema.AiInsight do
field :message, :string
end

has_one :alert, Alert, foreign_key: :insight_id
has_one :service, Service, foreign_key: :insight_id
has_one :stack, Stack, foreign_key: :insight_id
has_one :cluster, Cluster, foreign_key: :insight_id
Expand Down
18 changes: 16 additions & 2 deletions lib/console/schema/alert.ex
Original file line number Diff line number Diff line change
@@ -1,6 +1,13 @@
defmodule Console.Schema.Alert do
use Piazza.Ecto.Schema
alias Console.Schema.{Project, Cluster, Service, Tag, ObservabilityWebhook}
alias Console.Schema.{
Project,
Cluster,
Service,
Tag,
ObservabilityWebhook,
AiInsight
}

defenum Severity, low: 0, medium: 1, high: 2, critical: 3, undefined: 4
defenum State, firing: 0, resolved: 1
Expand All @@ -18,6 +25,7 @@ defmodule Console.Schema.Alert do
field :annotations, :map
field :url, :string

belongs_to :insight, AiInsight
belongs_to :project, Project
belongs_to :cluster, Cluster
belongs_to :service, Service
Expand All @@ -27,6 +35,10 @@ defmodule Console.Schema.Alert do
timestamps()
end

def firing(query \\ __MODULE__) do
from(a in query, where: a.state == :firing)
end

def for_service(query \\ __MODULE__, id) do
from(a in query, where: a.service_id == ^id)
end
Expand All @@ -50,15 +62,17 @@ defmodule Console.Schema.Alert do
end
def ordered(query, order), do: from(a in query, order_by: ^order)

@valid ~w(type severity state title message fingerprint annotations url project_id cluster_id service_id)a
@valid ~w(type severity state title message fingerprint annotations url project_id cluster_id insight_id service_id)a

def changeset(model, attrs) do
model
|> cast(attrs, @valid)
|> cast_assoc(:tags)
|> cast_assoc(:insight)
|> foreign_key_constraint(:project_id)
|> foreign_key_constraint(:cluster_id)
|> foreign_key_constraint(:service_id)
|> foreign_key_constraint(:insight_id)
|> validate_required(~w(type title state severity message fingerprint)a)
end
end
Loading

0 comments on commit 0989d18

Please sign in to comment.