Skip to content

Commit

Permalink
Merge pull request #1 from knocklabs/jazambuja_update_to_elixir_1.15
Browse files Browse the repository at this point in the history
PR to add the fix for > OTP26 lexmag#72
  • Loading branch information
christianjgreen authored Aug 14, 2024
2 parents cc3f93b + 3ec98c6 commit 98e2843
Show file tree
Hide file tree
Showing 5 changed files with 85 additions and 43 deletions.
9 changes: 4 additions & 5 deletions lib/statix.ex
Original file line number Diff line number Diff line change
Expand Up @@ -254,12 +254,12 @@ defmodule Statix do
"123"
"""
@callback measure(key, options, function :: (() -> result)) :: result when result: var
@callback measure(key, options, function :: (-> result)) :: result when result: var

@doc """
Same as `measure(key, [], function)`.
"""
@callback measure(key, function :: (() -> result)) :: result when result: var
@callback measure(key, function :: (-> result)) :: result when result: var

defmacro __using__(opts) do
current_statix =
Expand Down Expand Up @@ -359,11 +359,10 @@ defmodule Statix do
@doc false
def new(module, options) do
config = get_config(module, options)
conn = Conn.new(config.host, config.port)
header = IO.iodata_to_binary([conn.header | config.prefix])
conn = Conn.new(config.host, config.port, config.prefix)

%__MODULE__{
conn: %{conn | header: header},
conn: conn,
pool: build_pool(module, config.pool_size),
tags: config.tags
}
Expand Down
59 changes: 39 additions & 20 deletions lib/statix/conn.ex
Original file line number Diff line number Diff line change
@@ -1,21 +1,23 @@
defmodule Statix.Conn do
@moduledoc false

defstruct [:sock, :header]
defstruct [:sock, :address, :port, :prefix]

alias Statix.Packet

require Logger

def new(host, port) when is_binary(host) do
new(String.to_charlist(host), port)
otp_release = :erlang.system_info(:otp_release)
@otp_gte_26 otp_release >= ~c"26"

def new(host, port, prefix) when is_binary(host) do
new(String.to_charlist(host), port, prefix)
end

def new(host, port) when is_list(host) or is_tuple(host) do
def new(host, port, prefix) when is_list(host) or is_tuple(host) do
case :inet.getaddr(host, :inet) do
{:ok, address} ->
header = Packet.header(address, port)
%__MODULE__{header: header}
%__MODULE__{address: address, port: port, prefix: prefix}

{:error, reason} ->
raise(
Expand All @@ -30,34 +32,51 @@ defmodule Statix.Conn do
%__MODULE__{conn | sock: sock}
end

def transmit(%__MODULE__{header: header, sock: sock}, type, key, val, options)
def transmit(%__MODULE__{sock: sock, prefix: prefix} = conn, type, key, val, options)
when is_binary(val) and is_list(options) do
result =
header
prefix
|> Packet.build(type, key, val, options)
|> transmit(sock)
|> transmit(conn)

if result == {:error, :port_closed} do
with {:error, error} <- result do
Logger.error(fn ->
if(is_atom(sock), do: "", else: "Statix ") <>
"#{inspect(sock)} #{type} metric \"#{key}\" lost value #{val}" <> " due to port closure"
"#{inspect(sock)} #{type} metric \"#{key}\" lost value #{val}" <>
" error=#{inspect(error)}"
end)
end

result
end

defp transmit(packet, sock) do
try do
Port.command(sock, packet)
rescue
ArgumentError ->
defp transmit(packet, %__MODULE__{address: address, port: port, sock: sock_name}) do
sock = Process.whereis(sock_name)

# The check below was implemented for backwards compatibility to avoid a performance issue on
# OTP on versiones older than 26. If the OTP version is 26 or above, we use gen_udp.send/4. If
# the version is older than that, we use the existing performance issue workaround.
if @otp_gte_26 do
if sock do
:gen_udp.send(sock, address, port, packet)
else
{:error, :port_closed}
end
else
true ->
receive do
{:inet_reply, _port, status} -> status
end
# This branch will only be executed on older version of OTP and will be eventually removed.
packet_with_header = Packet.add_header(packet, address, port)

try do
Port.command(sock, packet_with_header)
rescue
ArgumentError ->
{:error, :port_closed}
else
true ->
receive do
{:inet_reply, _port, status} -> status
end
end
end
end
end
33 changes: 21 additions & 12 deletions lib/statix/packet.ex
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,12 @@ defmodule Statix.Packet do

import Bitwise

def header({n1, n2, n3, n4}, port) do
@doc """
Adds header to a built packet.
This is implemented to keep backwards compatibility with older OTP versions
(< 26). Will be eventually removed.
"""
def add_header(built_packet, {n1, n2, n3, n4}, port) do
true = Code.ensure_loaded?(:gen_udp)

anc_data_part =
Expand All @@ -13,19 +18,23 @@ defmodule Statix.Packet do
[]
end

[
_addr_family = 1,
band(bsr(port, 8), 0xFF),
band(port, 0xFF),
band(n1, 0xFF),
band(n2, 0xFF),
band(n3, 0xFF),
band(n4, 0xFF)
] ++ anc_data_part
header =
[
_addr_family = 1,
band(bsr(port, 8), 0xFF),
band(port, 0xFF),
band(n1, 0xFF),
band(n2, 0xFF),
band(n3, 0xFF),
band(n4, 0xFF)
] ++ anc_data_part

header_as_bytes = Enum.into(header, <<>>, fn byte -> <<byte>> end)
[header_as_bytes | built_packet]
end

def build(header, name, key, val, options) do
[header, key, ?:, val, ?|, metric_type(name)]
def build(prefix, name, key, val, options) do
[prefix, key, ?:, val, ?|, metric_type(name)]
|> set_option(:sample_rate, options[:sample_rate])
|> set_option(:tags, options[:tags])
end
Expand Down
16 changes: 10 additions & 6 deletions test/statix_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,11 @@ defmodule StatixTest do

defp close_port() do
%{pool: pool} = current_statix()
Enum.each(pool, &Port.close/1)

Enum.each(pool, fn module_name ->
sock = Process.whereis(module_name)
:gen_udp.close(sock)
end)
end

setup do
Expand Down Expand Up @@ -156,22 +160,22 @@ defmodule StatixTest do

assert capture_log(fn ->
assert {:error, :port_closed} == increment("sample")
end) =~ "counter metric \"sample\" lost value 1 due to port closure"
end) =~ "counter metric \"sample\" lost value 1 error=:port_closed\n\e[0m"

assert capture_log(fn ->
assert {:error, :port_closed} == decrement("sample")
end) =~ "counter metric \"sample\" lost value -1 due to port closure"
end) =~ "counter metric \"sample\" lost value -1 error=:port_closed\n\e[0m"

assert capture_log(fn ->
assert {:error, :port_closed} == gauge("sample", 2)
end) =~ "gauge metric \"sample\" lost value 2 due to port closure"
end) =~ "gauge metric \"sample\" lost value 2 error=:port_closed\n\e[0m"

assert capture_log(fn ->
assert {:error, :port_closed} == histogram("sample", 3)
end) =~ "histogram metric \"sample\" lost value 3 due to port closure"
end) =~ "histogram metric \"sample\" lost value 3 error=:port_closed\n\e[0m"

assert capture_log(fn ->
assert {:error, :port_closed} == timing("sample", 2.5)
end) =~ "timing metric \"sample\" lost value 2.5 due to port closure"
end) =~ "timing metric \"sample\" lost value 2.5 error=:port_closed\n\e[0m"
end
end
11 changes: 11 additions & 0 deletions test/support/test_server.exs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ defmodule Statix.TestServer do
@impl true
def init(port) do
{:ok, socket} = :gen_udp.open(port, [:binary, active: true])
Process.flag(:trap_exit, true)
{:ok, %{socket: socket, test: nil}}
end

Expand All @@ -20,13 +21,23 @@ defmodule Statix.TestServer do
end
end

@impl true
def handle_info({:EXIT, _pid, reason}, state) do
{:stop, reason, state}
end

@impl true
def handle_info({:udp, socket, host, port, packet}, %{socket: socket, test: test} = state) do
metadata = %{host: host, port: port, socket: socket}
send(test, {:test_server, metadata, packet})
{:noreply, state}
end

@impl true
def terminate(_reason, %{socket: socket}) do
:gen_udp.close(socket)
end

def setup(test_module) do
:ok = set_current_test(test_module, self())
ExUnit.Callbacks.on_exit(fn -> set_current_test(test_module, nil) end)
Expand Down

0 comments on commit 98e2843

Please sign in to comment.