diff --git a/lib/statix.ex b/lib/statix.ex index 03a7bc2..818101f 100644 --- a/lib/statix.ex +++ b/lib/statix.ex @@ -254,12 +254,12 @@ defmodule Statix do "123" """ - @callback measure(key, options, function :: (() -> result)) :: result when result: var + @callback measure(key, options, function :: (-> result)) :: result when result: var @doc """ Same as `measure(key, [], function)`. """ - @callback measure(key, function :: (() -> result)) :: result when result: var + @callback measure(key, function :: (-> result)) :: result when result: var defmacro __using__(opts) do current_statix = @@ -359,11 +359,10 @@ defmodule Statix do @doc false def new(module, options) do config = get_config(module, options) - conn = Conn.new(config.host, config.port) - header = IO.iodata_to_binary([conn.header | config.prefix]) + conn = Conn.new(config.host, config.port, config.prefix) %__MODULE__{ - conn: %{conn | header: header}, + conn: conn, pool: build_pool(module, config.pool_size), tags: config.tags } diff --git a/lib/statix/conn.ex b/lib/statix/conn.ex index 757d572..6a7a425 100644 --- a/lib/statix/conn.ex +++ b/lib/statix/conn.ex @@ -1,21 +1,23 @@ defmodule Statix.Conn do @moduledoc false - defstruct [:sock, :header] + defstruct [:sock, :address, :port, :prefix] alias Statix.Packet require Logger - def new(host, port) when is_binary(host) do - new(String.to_charlist(host), port) + otp_release = :erlang.system_info(:otp_release) + @otp_gte_26 otp_release >= ~c"26" + + def new(host, port, prefix) when is_binary(host) do + new(String.to_charlist(host), port, prefix) end - def new(host, port) when is_list(host) or is_tuple(host) do + def new(host, port, prefix) when is_list(host) or is_tuple(host) do case :inet.getaddr(host, :inet) do {:ok, address} -> - header = Packet.header(address, port) - %__MODULE__{header: header} + %__MODULE__{address: address, port: port, prefix: prefix} {:error, reason} -> raise( @@ -30,34 +32,51 @@ defmodule Statix.Conn do %__MODULE__{conn | sock: sock} end - def transmit(%__MODULE__{header: header, sock: sock}, type, key, val, options) + def transmit(%__MODULE__{sock: sock, prefix: prefix} = conn, type, key, val, options) when is_binary(val) and is_list(options) do result = - header + prefix |> Packet.build(type, key, val, options) - |> transmit(sock) + |> transmit(conn) - if result == {:error, :port_closed} do + with {:error, error} <- result do Logger.error(fn -> if(is_atom(sock), do: "", else: "Statix ") <> - "#{inspect(sock)} #{type} metric \"#{key}\" lost value #{val}" <> " due to port closure" + "#{inspect(sock)} #{type} metric \"#{key}\" lost value #{val}" <> + " error=#{inspect(error)}" end) end result end - defp transmit(packet, sock) do - try do - Port.command(sock, packet) - rescue - ArgumentError -> + defp transmit(packet, %__MODULE__{address: address, port: port, sock: sock_name}) do + sock = Process.whereis(sock_name) + + # The check below was implemented for backwards compatibility to avoid a performance issue on + # OTP on versiones older than 26. If the OTP version is 26 or above, we use gen_udp.send/4. If + # the version is older than that, we use the existing performance issue workaround. + if @otp_gte_26 do + if sock do + :gen_udp.send(sock, address, port, packet) + else {:error, :port_closed} + end else - true -> - receive do - {:inet_reply, _port, status} -> status - end + # This branch will only be executed on older version of OTP and will be eventually removed. + packet_with_header = Packet.add_header(packet, address, port) + + try do + Port.command(sock, packet_with_header) + rescue + ArgumentError -> + {:error, :port_closed} + else + true -> + receive do + {:inet_reply, _port, status} -> status + end + end end end end diff --git a/lib/statix/packet.ex b/lib/statix/packet.ex index c2cdd6d..9518d91 100644 --- a/lib/statix/packet.ex +++ b/lib/statix/packet.ex @@ -3,7 +3,12 @@ defmodule Statix.Packet do import Bitwise - def header({n1, n2, n3, n4}, port) do + @doc """ + Adds header to a built packet. + This is implemented to keep backwards compatibility with older OTP versions + (< 26). Will be eventually removed. + """ + def add_header(built_packet, {n1, n2, n3, n4}, port) do true = Code.ensure_loaded?(:gen_udp) anc_data_part = @@ -13,19 +18,23 @@ defmodule Statix.Packet do [] end - [ - _addr_family = 1, - band(bsr(port, 8), 0xFF), - band(port, 0xFF), - band(n1, 0xFF), - band(n2, 0xFF), - band(n3, 0xFF), - band(n4, 0xFF) - ] ++ anc_data_part + header = + [ + _addr_family = 1, + band(bsr(port, 8), 0xFF), + band(port, 0xFF), + band(n1, 0xFF), + band(n2, 0xFF), + band(n3, 0xFF), + band(n4, 0xFF) + ] ++ anc_data_part + + header_as_bytes = Enum.into(header, <<>>, fn byte -> <> end) + [header_as_bytes | built_packet] end - def build(header, name, key, val, options) do - [header, key, ?:, val, ?|, metric_type(name)] + def build(prefix, name, key, val, options) do + [prefix, key, ?:, val, ?|, metric_type(name)] |> set_option(:sample_rate, options[:sample_rate]) |> set_option(:tags, options[:tags]) end diff --git a/test/statix_test.exs b/test/statix_test.exs index 6528f1a..077f0d9 100644 --- a/test/statix_test.exs +++ b/test/statix_test.exs @@ -9,7 +9,11 @@ defmodule StatixTest do defp close_port() do %{pool: pool} = current_statix() - Enum.each(pool, &Port.close/1) + + Enum.each(pool, fn module_name -> + sock = Process.whereis(module_name) + :gen_udp.close(sock) + end) end setup do @@ -156,22 +160,22 @@ defmodule StatixTest do assert capture_log(fn -> assert {:error, :port_closed} == increment("sample") - end) =~ "counter metric \"sample\" lost value 1 due to port closure" + end) =~ "counter metric \"sample\" lost value 1 error=:port_closed\n\e[0m" assert capture_log(fn -> assert {:error, :port_closed} == decrement("sample") - end) =~ "counter metric \"sample\" lost value -1 due to port closure" + end) =~ "counter metric \"sample\" lost value -1 error=:port_closed\n\e[0m" assert capture_log(fn -> assert {:error, :port_closed} == gauge("sample", 2) - end) =~ "gauge metric \"sample\" lost value 2 due to port closure" + end) =~ "gauge metric \"sample\" lost value 2 error=:port_closed\n\e[0m" assert capture_log(fn -> assert {:error, :port_closed} == histogram("sample", 3) - end) =~ "histogram metric \"sample\" lost value 3 due to port closure" + end) =~ "histogram metric \"sample\" lost value 3 error=:port_closed\n\e[0m" assert capture_log(fn -> assert {:error, :port_closed} == timing("sample", 2.5) - end) =~ "timing metric \"sample\" lost value 2.5 due to port closure" + end) =~ "timing metric \"sample\" lost value 2.5 error=:port_closed\n\e[0m" end end diff --git a/test/support/test_server.exs b/test/support/test_server.exs index d175bf8..a51df16 100644 --- a/test/support/test_server.exs +++ b/test/support/test_server.exs @@ -8,6 +8,7 @@ defmodule Statix.TestServer do @impl true def init(port) do {:ok, socket} = :gen_udp.open(port, [:binary, active: true]) + Process.flag(:trap_exit, true) {:ok, %{socket: socket, test: nil}} end @@ -20,6 +21,11 @@ defmodule Statix.TestServer do end end + @impl true + def handle_info({:EXIT, _pid, reason}, state) do + {:stop, reason, state} + end + @impl true def handle_info({:udp, socket, host, port, packet}, %{socket: socket, test: test} = state) do metadata = %{host: host, port: port, socket: socket} @@ -27,6 +33,11 @@ defmodule Statix.TestServer do {:noreply, state} end + @impl true + def terminate(_reason, %{socket: socket}) do + :gen_udp.close(socket) + end + def setup(test_module) do :ok = set_current_test(test_module, self()) ExUnit.Callbacks.on_exit(fn -> set_current_test(test_module, nil) end)