Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add system monitor support #32

Merged
merged 8 commits into from
Feb 11, 2025
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions config/test.exs
Original file line number Diff line number Diff line change
@@ -14,3 +14,7 @@ config :logger,
]

config :statix, port: 15310

# The sysmon processes are left out of the default supervision tree in the
# test environment; the tests start the system monitor themselves.
config :instruments, enable_sysmon: false
6 changes: 6 additions & 0 deletions lib/instruments/application.ex
Original file line number Diff line number Diff line change
@@ -18,6 +18,12 @@ defmodule Instruments.Application do
Probe.Supervisor,
]

children = if Application.get_env(:instruments, :enable_sysmon, false) do
[Instruments.Sysmon.Supervisor | children]
else
children
end

Supervisor.start_link(children, strategy: :one_for_one, name: Instruments.Supervisor)
end
end
81 changes: 81 additions & 0 deletions lib/sysmon/emitter.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
defmodule Instruments.Sysmon.Emitter do
  @moduledoc """
  A GenServer that subscribes to the `Instruments.Sysmon.Reporter` when it
  starts and relays every event it receives to the matching callback of the
  currently configured receiver module.

  The initial receiver is read from the `:sysmon_receiver` key of the
  `:instruments` application environment, falling back to
  `Instruments.Sysmon.Receiver.Metrics`. It can be replaced at runtime with
  `set_receiver/1`.
  """

  use GenServer

  require Logger

  alias Instruments.Sysmon.Reporter

  defstruct receiver_module: nil

  @doc """
  Starts the Emitter and registers it under this module's name.

  `opts` is handed to `init/1`, which ignores it.
  """
  def start_link(opts \\ []), do: GenServer.start_link(__MODULE__, opts, name: __MODULE__)

  @doc """
  Replaces the module that handles system monitor events.

  The module is expected to implement the `Instruments.Sysmon.Receiver`
  behaviour. Always returns `:ok`.
  """
  @spec set_receiver(term()) :: :ok
  def set_receiver(receiver_module) do
    GenServer.call(__MODULE__, {:set_receiver, receiver_module})
  end

  @impl true
  def init(_opts) do
    # Register with the Reporter first so events start flowing to this process.
    Reporter.subscribe()

    receiver =
      Application.get_env(:instruments, :sysmon_receiver, Instruments.Sysmon.Receiver.Metrics)

    {:ok, %__MODULE__{receiver_module: receiver}}
  end

  @impl true
  def handle_call({:set_receiver, receiver_module}, _from, %__MODULE__{} = state) do
    updated = %__MODULE__{state | receiver_module: receiver_module}
    {:reply, :ok, updated}
  end

  @impl true
  def handle_info({Reporter, event, data}, state) do
    dispatch(state, event, data)
    {:noreply, state}
  end

  def handle_info(unknown, state) do
    Logger.error("Emitter received unknown message: #{inspect(unknown)}")
    {:noreply, state}
  end

  # Routes one Reporter event to the receiver callback that matches its name.
  # Events without a dedicated clause are logged rather than dispatched.
  defp dispatch(%__MODULE__{receiver_module: receiver}, :busy_dist_port, %{pid: pid, port: port}) do
    receiver.handle_busy_dist_port(pid, port)
  end

  defp dispatch(%__MODULE__{receiver_module: receiver}, :busy_port, %{pid: pid, port: port}) do
    receiver.handle_busy_port(pid, port)
  end

  defp dispatch(%__MODULE__{receiver_module: receiver}, :long_gc, %{pid: pid, info: info}) do
    receiver.handle_long_gc(pid, info)
  end

  defp dispatch(%__MODULE__{receiver_module: receiver}, :long_message_queue, %{
         pid: pid,
         info: long
       }) do
    receiver.handle_long_message_queue(pid, long)
  end

  defp dispatch(%__MODULE__{receiver_module: receiver}, :long_schedule, %{pid: pid, info: info}) do
    receiver.handle_long_schedule(pid, info)
  end

  defp dispatch(%__MODULE__{receiver_module: receiver}, :large_heap, %{pid: pid, info: info}) do
    receiver.handle_large_heap(pid, info)
  end

  defp dispatch(%__MODULE__{}, event, data) do
    Logger.warn("Emitter received unknown event #{inspect(event)} with data #{inspect(data)}")
  end
end
20 changes: 20 additions & 0 deletions lib/sysmon/receiver.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
defmodule Instruments.Sysmon.Receiver do
  @moduledoc """
  The Receiver behaviour defines the callbacks that are invoked by the Emitter
  in response to system monitor events.

  Each callback receives the process (or port) the event is about as its first
  argument and the event-specific payload as its second, and returns `:ok`.
  """

  @typedoc """
  Event details as a list of two-element tuples.
  """
  # `List.t/1` is not a type defined by Elixir; the list literal form is the
  # valid way to spell "list of 2-tuples" and keeps Dialyzer from reporting an
  # unknown type.
  @type info :: [{term(), term()}]

  @doc """
  Invoked for a `:busy_port` event with the affected pid and the port.
  """
  @callback handle_busy_port(pid(), port()) :: :ok

  @doc """
  Invoked for a `:busy_dist_port` event with the affected pid and the port.
  """
  @callback handle_busy_dist_port(pid(), port()) :: :ok

  @doc """
  Invoked for a `:long_gc` event with the pid and the event details.
  """
  @callback handle_long_gc(pid(), info()) :: :ok

  @doc """
  Invoked for a `:long_message_queue` event with the pid and a boolean flag
  carried by the event.
  """
  @callback handle_long_message_queue(pid(), boolean()) :: :ok

  @doc """
  Invoked for a `:long_schedule` event with the event details.

  Per the `:erlang.system_monitor/2` documentation, the subject of a
  `long_schedule` message may be either a pid or a port.
  """
  @callback handle_long_schedule(pid() | port(), info()) :: :ok

  @doc """
  Invoked for a `:large_heap` event with the pid and the event details.
  """
  @callback handle_large_heap(pid(), info()) :: :ok
end
43 changes: 43 additions & 0 deletions lib/sysmon/receiver/log.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
defmodule Instruments.Sysmon.Receiver.Log do
  @moduledoc """
  An `Instruments.Sysmon.Receiver` implementation that writes every system
  monitor event to the `Logger`.

  All events are logged as warnings, except the "resolved" form of the long
  message queue event, which is logged at the info level.
  """

  @behaviour Instruments.Sysmon.Receiver

  require Logger

  @impl true
  def handle_busy_dist_port(pid, port),
    do: Logger.warn("Busy dist port: #{inspect(pid)} #{inspect(port)}")

  @impl true
  def handle_busy_port(pid, port),
    do: Logger.warn("Busy port: #{inspect(pid)} #{inspect(port)}")

  @impl true
  def handle_large_heap(pid, info),
    do: Logger.warn("Large heap: #{inspect(pid)} #{inspect(info)}")

  @impl true
  def handle_long_gc(pid, info),
    do: Logger.warn("Long GC: #{inspect(pid)} #{inspect(info)}")

  @impl true
  def handle_long_schedule(pid, info),
    do: Logger.warn("Long schedule: #{inspect(pid)} #{inspect(info)}")

  @impl true
  def handle_long_message_queue(pid, long) do
    # Any truthy `long` means the queue is currently long; `nil`/`false`
    # means the condition has cleared.
    case long do
      cleared when cleared in [nil, false] ->
        Logger.info("Long message queue resolved: #{inspect(pid)}")

      _still_long ->
        Logger.warn("Long message queue: #{inspect(pid)}")
    end
  end
end
38 changes: 38 additions & 0 deletions lib/sysmon/receiver/metrics.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
defmodule Instruments.Sysmon.Receiver.Metrics do
  @moduledoc """
  An `Instruments.Sysmon.Receiver` implementation that reports each system
  monitor event by calling `Instruments.increment` on an
  `erlang.sysmon.<event>` key.

  The pid, port and info arguments of the events are ignored; only the long
  message queue flag is passed along, as a `long:<value>` tag.
  """

  @behaviour Instruments.Sysmon.Receiver

  require Instruments

  @impl true
  def handle_busy_port(_pid, _port), do: Instruments.increment("erlang.sysmon.busy_port")

  @impl true
  def handle_busy_dist_port(_pid, _port),
    do: Instruments.increment("erlang.sysmon.busy_dist_port")

  @impl true
  def handle_large_heap(_pid, _info), do: Instruments.increment("erlang.sysmon.large_heap")

  @impl true
  def handle_long_gc(_pid, _info), do: Instruments.increment("erlang.sysmon.long_gc")

  @impl true
  def handle_long_schedule(_pid, _info),
    do: Instruments.increment("erlang.sysmon.long_schedule")

  @impl true
  def handle_long_message_queue(_pid, long) do
    Instruments.increment("erlang.sysmon.long_message_queue", tags: ["long:#{long}"])
  end
end
187 changes: 187 additions & 0 deletions lib/sysmon/reporter.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,187 @@
defmodule Instruments.Sysmon.Reporter do
  @moduledoc """
  Since only one process can subscribe to system monitor events, the Reporter
  acts as a relay for system monitor events, allowing multiple subscribers to
  receive system monitor events.

  On startup, the Reporter will subscribe to the system monitor events
  configured in `:sysmon_events` in the `:instruments` application environment.
  If no events are configured, the Reporter will not subscribe to any events.

  Subscribers receive messages shaped as `{Instruments.Sysmon.Reporter, event, data}`:

    * for `:busy_port` and `:busy_dist_port`, `data` is `%{pid: pid, port: port}`
    * for every other system monitor event, `data` is `%{pid: pid, info: info}`
    * any other message delivered to the Reporter is relayed as
      `{Instruments.Sysmon.Reporter, :unknown, message}`
  """
  use GenServer

  require Logger

  @type sysmon_event ::
          {:long_gc, pos_integer()}
          | {:long_schedule, pos_integer()}
          | {:large_heap, pos_integer()}
          | {:long_message_queue, {non_neg_integer(), pos_integer()}}
          | :busy_port
          | :busy_dist_port

  @type t :: %__MODULE__{
          subscribers: %{reference() => pid()},
          events: [sysmon_event()]
        }

  defstruct subscribers: %{}, events: []

  @doc """
  Starts the Reporter and registers it under this module's name.

  `opts` is handed to `init/1`, which ignores it.
  """
  def start_link(opts \\ []) do
    GenServer.start_link(__MODULE__, opts, name: __MODULE__)
  end

  @doc """
  Subscribes the provided pid to configured system monitor events.

  Subscribing a pid that is already subscribed is a no-op.
  """
  @spec subscribe(pid()) :: :ok
  def subscribe(pid \\ self()) do
    GenServer.call(__MODULE__, {:subscribe, pid})
  end

  @doc """
  Unsubscribes the provided pid from system monitor events.

  Unsubscribing a pid that is not subscribed is a no-op.
  """
  @spec unsubscribe(pid()) :: :ok
  def unsubscribe(pid \\ self()) do
    GenServer.call(__MODULE__, {:unsubscribe, pid})
  end

  @doc """
  Sets the system monitor events to subscribe to.

  With a non-empty list, the Reporter registers itself as the system monitor
  process for exactly those events (logging a warning if it replaces another
  process). With an empty list (or `nil`), the Reporter does not register
  itself, and if it currently is the system monitor process the monitoring
  settings are cleared so that no stale events keep arriving.
  """
  @spec set_events([sysmon_event()]) :: :ok
  def set_events(events) do
    GenServer.call(__MODULE__, {:set_events, events})
  end

  @doc """
  Returns the system monitor events the Reporter is subscribed to.
  """
  @spec get_events() :: [sysmon_event()]
  def get_events() do
    GenServer.call(__MODULE__, :get_events)
  end

  @impl true
  def init(_opts) do
    events =
      :instruments
      |> Application.get_env(:sysmon_events, [])
      |> normalize_events()

    enable_sysmon(events)

    {:ok, %__MODULE__{events: events}}
  end

  @impl true
  def handle_call({:subscribe, pid}, _from, %__MODULE__{} = state) do
    already_subscribed? = pid in Map.values(state.subscribers)

    state =
      if already_subscribed? do
        state
      else
        # Monitor the subscriber so it is dropped automatically when it exits.
        ref = Process.monitor(pid)
        %__MODULE__{state | subscribers: Map.put(state.subscribers, ref, pid)}
      end

    {:reply, :ok, state}
  end

  def handle_call({:unsubscribe, pid}, _from, %__MODULE__{} = state) do
    state =
      case Enum.find(state.subscribers, fn {_ref, subscriber} -> subscriber == pid end) do
        {ref, _pid} ->
          # `:flush` also removes a `:DOWN` message that may already be queued
          # for this monitor, so nothing stale is processed later.
          Process.demonitor(ref, [:flush])
          %__MODULE__{state | subscribers: Map.delete(state.subscribers, ref)}

        nil ->
          state
      end

    {:reply, :ok, state}
  end

  def handle_call({:set_events, events}, _from, %__MODULE__{} = state) do
    events = normalize_events(events)
    enable_sysmon(events)
    {:reply, :ok, %__MODULE__{state | events: events}}
  end

  def handle_call(:get_events, _from, %__MODULE__{} = state) do
    {:reply, state.events, state}
  end

  @impl true
  def handle_info({:DOWN, ref, :process, _pid, _reason}, %__MODULE__{} = state) do
    # A subscriber exited; forget it.
    {:noreply, %__MODULE__{state | subscribers: Map.delete(state.subscribers, ref)}}
  end

  def handle_info(msg, %__MODULE__{} = state) do
    to_forward = wrap(msg)
    Enum.each(state.subscribers, fn {_ref, pid} -> send(pid, to_forward) end)
    {:noreply, state}
  end

  # Converts a raw message into the tuple that is relayed to subscribers.
  defp wrap({:monitor, pid, event, port}) when event in [:busy_dist_port, :busy_port] do
    {__MODULE__, event, %{pid: pid, port: port}}
  end

  defp wrap({:monitor, pid, event, info}) do
    {__MODULE__, event, %{pid: pid, info: info}}
  end

  defp wrap(unknown) do
    {__MODULE__, :unknown, unknown}
  end

  # Treats a missing (`nil`) event configuration as "no events" so the state
  # always holds a list, as promised by the `get_events/0` spec.
  defp normalize_events(nil), do: []
  defp normalize_events(events), do: events

  # Applies `events` to the VM system monitor.
  #
  # With no events, the Reporter never installs itself; it only clears the
  # monitor when it is the current owner, so that a previous non-empty
  # configuration does not keep delivering events. Another process's monitor
  # is left untouched.
  defp enable_sysmon([]) do
    our_pid = self()

    case :erlang.system_monitor() do
      {^our_pid, _options} ->
        :erlang.system_monitor(:undefined)
        :ok

      _not_ours ->
        :ok
    end
  end

  defp enable_sysmon(events) do
    our_pid = self()

    case :erlang.system_monitor() do
      :undefined ->
        # No system monitor is configured
        :ok

      {^our_pid, _options} ->
        # We are already receiving system monitor events
        :ok

      {pid, _options} ->
        # Another process is already receiving system monitor events, log a warning
        Logger.warn("Overwriting system monitor process: #{inspect(pid)}")
    end

    :erlang.system_monitor(our_pid, events)
    :ok
  end
end
Loading