Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Extra formatting of logstash reports as json data. #13

Merged
merged 4 commits into from
Jul 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ jobs:
runs-on: ubuntu-latest
strategy:
matrix:
otp: [22.3, 23.3, 24.2]
otp: [24.3, 25, 26]
rebar: [3.18.0]
steps:
- uses: actions/checkout@v2
Expand Down
27 changes: 0 additions & 27 deletions .github/workflows/hex.yaml

This file was deleted.

31 changes: 19 additions & 12 deletions src/logstasher.erl
Original file line number Diff line number Diff line change
Expand Up @@ -59,26 +59,33 @@ start_link() ->
%% Supervisor callbacks
%%==============================================================================

-spec init(term()) -> {ok, maps:map()} | {stop, maps:map()}.
-type state() :: #{
transport := module(),
host := string() | binary() | tuple(),
port := pos_integer(),
socket => gen_udp:socket() | gen_tcp:socket() | undefined
}.

-spec init(_) -> {ok, state()}.
init(_) ->
Transport = application:get_env(?MODULE, transport, ?LOGSTASH_TRANSPORT),
Host = application:get_env(?MODULE, host, ?LOGSTASH_HOST),
Port = application:get_env(?MODULE, port, ?LOGSTASH_PORT),
Opts = #{transport => Transport, host => Host, port => Port},
State = Opts#{socket => connect(Opts)},
Opts = #{ transport => Transport, host => Host, port => Port },
State = Opts#{ socket => connect(Opts) },
{ok, State}.

-spec handle_call({send, binary()}, any(), maps:map()) ->
{reply, ok | {error, atom() | {timeout, binary()}}, maps:map()}.
-spec handle_call({send, binary()}, _, state()) ->
{reply, ok | {error, atom() | {timeout, binary()}}, state()}.
handle_call({send, Data}, _, State) ->
Result = maybe_send(Data, State),
{reply, Result, State}.

-spec handle_cast(term(), maps:map()) -> {noreply, maps:map()}.
-spec handle_cast(_, state()) -> {noreply, state()}.
handle_cast(_, State) ->
{noreply, State}.

-spec terminate(term(), maps:map()) -> ok.
-spec terminate(_, state()) -> ok.
terminate(_, #{transport := tcp, socket := Socket}) ->
gen_tcp:close(Socket);
terminate(_, #{transport := udp, socket := Socket}) ->
Expand All @@ -90,7 +97,7 @@ terminate(_, #{transport := console}) ->
%% Internal functions
%%==============================================================================

-spec connect(maps:map()) -> gen_udp:socket() | gen_tcp:socket() | undefined.
-spec connect(state()) -> gen_udp:socket() | gen_tcp:socket() | undefined.
connect(#{transport := tcp, host := Host, port := Port}) ->
Opts = [binary, {active, false}, {keepalive, true}],
case gen_tcp:connect(Host, Port, Opts, ?TCP_CONNECT_TIMEOUT) of
Expand All @@ -112,7 +119,7 @@ connect(#{transport := udp}) ->
connect(#{transport := console}) ->
undefined.

-spec maybe_send(binary(), maps:map()) -> ok | {error, atom()}.
-spec maybe_send(binary(), map()) -> ok | {error, atom()}.
maybe_send(Data, #{transport := console} = State) ->
send(Data, State);
maybe_send(Data, #{socket := undefined} = State) ->
Expand All @@ -124,12 +131,12 @@ maybe_send(Data, State) ->
{error, _} = Error -> Error
end.

-spec send(binary(), maps:map()) -> ok | {error, atom()}.
-spec send(binary(), map()) -> ok | {error, atom()}.
send(Data, #{transport := console}) ->
io:put_chars([ Data, "\n"]);
send(_Data, #{socket := undefined}) ->
{error, closed};
send(Data, #{transport := tcp, socket := Socket}) ->
gen_tcp:send(Socket, Data);
gen_tcp:send(Socket, [ Data, "\n" ]);
send(Data, #{transport := udp, socket := Socket, host := Host, port := Port}) ->
gen_udp:send(Socket, Host, Port, Data).
gen_udp:send(Socket, Host, Port, [ Data, "\n" ]).
186 changes: 156 additions & 30 deletions src/logstasher_h.erl
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,14 @@
%% logger callbacks
-export([log/2]).

%% Testing
-export([log_data/1]).

%% Xref ignores
-ignore_xref([log/2]).
-ignore_xref([log/2, log_data/1]).

%% Truncate binary values beyond this size.
-define(LOG_BINARY_SIZE, 2000).

%%==============================================================================
%% API
Expand All @@ -14,61 +20,181 @@
log(#{level := info, meta := #{error_logger := #{type := progress}}}, _Config) ->
% Ignore supervisor progress reports
ok;
log(#{level := Level, msg := EventData, meta := Meta}, _Config) ->
{Msg, MsgFields} = format_msg(EventData),
Fields = [{severity, Level}] ++ safe_meta(Meta) ++ MsgFields,
Data = #{fields => Fields, '@timestamp' => format_timestamp(Meta), message => Msg},
_ = logstasher:send(jsx:encode(Data)),
ok.
log(LogEvent, _Config) ->
try
Data = log_data(LogEvent),
_ = logstasher:send(jsx:encode(Data)),
ok
catch
_:_ ->
% Ignore crashes on unexpected data, as that would remove the log
% handler from the logger and stop logging.
ok
end.

%%==============================================================================
%% Internal functions
%%==============================================================================

-spec format_msg(Data) -> {binary(), [{binary() | atom(), jsx:json_term()}]} when
Data :: {io:format(), [term()]}
| {report, logger:report()}
| {string, unicode:chardata()}.
log_data(#{level := Level, msg := EventData, meta := Meta}) ->
{Msg, MsgFields} = format_msg(EventData),
{Msg1, MsgFields1} = maybe_extract_message(Msg, MsgFields),
Fields = maps:merge(safe_meta(Meta), MsgFields1),
Fields1 = Fields#{ severity => Level },
#{
fields => Fields1,
'@timestamp' => format_timestamp(Meta),
message => Msg1
}.

%% @doc If there is no message, try to extract the 'text' fields from the message fields
%% and use that as the message.
maybe_extract_message(null, #{ text := Text } = MsgFields) when is_binary(Text) ->
{Text, maps:remove(text, MsgFields)};
maybe_extract_message(Msg, MsgFields) ->
{Msg, MsgFields}.


-spec format_msg(Data) -> {Message, #{ Key => Value } } when
Data :: {io:format(), [ term() ]}
| {report, logger:report()}
| {string, unicode:chardata()},
Message :: binary() | null,
Key :: binary() | atom(),
Value :: jsx:json_term().
format_msg({string, Message}) ->
{unicode:characters_to_binary(Message), []};
{unicode:characters_to_binary(Message), #{}};
format_msg({report, Report}) when is_map(Report) ->
format_msg({report, maps:to_list(Report)});
{maps:get(msg, Report, null), safe_fields(Report)};
format_msg({report, Report}) when is_list(Report) ->
{proplists:get_value(msg, Report, null), safe_fields(Report)};
format_msg({Format, Params}) ->
{unicode:characters_to_binary(io_lib:format(Format, Params)), []}.
format_msg({report, maps:from_list(Report)});
format_msg({"Error in process ~p on node ~p with exit value:~n~p~n", [_, _, {undef, Undef}]}) ->
format_undef(Undef);
format_msg({Format, Params}) when is_list(Format), is_list(Params) ->
{unicode:characters_to_binary(io_lib:format(Format, Params)), #{}};
format_msg(Other) ->
{unicode:characters_to_binary(io_lib:format("~p", [ Other ])), #{}}.

format_undef([ {Module, Function, Args, _} | _ ] = Stack) when is_list(Args) ->
Arity = length(Args),
Message = io_lib:format("Undefined function ~p:~p/~p", [Module, Function, Arity]),
Report = #{
result => error,
reason => undef,
module => Module,
function => Function,
args => Args,
stack => Stack
},
{unicode:characters_to_binary(Message), safe_fields(Report)}.

-spec format_timestamp(logger:metadata()) -> binary().
format_timestamp(#{time := Ts}) ->
list_to_binary(calendar:system_time_to_rfc3339(Ts, [{unit, microsecond}, {offset, "Z"}])).

-spec safe_meta(logger:metadata()) -> [{binary() | atom(), jsx:json_term()}].
-spec safe_meta(logger:metadata()) -> #{ Key => Term } when
Key :: binary() | atom(),
Term :: jsx:json_term().
safe_meta(Meta) ->
safe_fields(maps:to_list(Meta)).
safe_fields(Meta).

-spec safe_fields([{term(), term()}]) -> [{binary() | atom(), jsx:json_term()}].
-spec safe_fields(map()) -> map().
safe_fields(Terms) ->
lists:map(fun safe_field/1, Terms).
maps:fold(
fun(K, V, Acc) ->
{K1, V1} = safe_field(K, V),
Acc#{ K1 => V1 }
end,
#{},
Terms).

-spec safe_field({atom() | binary() | atom(), term()}) -> {atom() | binary(), jsx:json_term()}.
safe_field({Key, Value}) when is_atom(Key); is_binary(Key) ->
-spec safe_field(atom() | binary() | string(), term()) -> {atom() | binary(), jsx:json_term()}.
safe_field(stack, Stack) when is_list(Stack) ->
{stack, safe_stack(Stack)};
safe_field(file, Filename) when is_list(Filename) ->
{file, unicode:characters_to_binary(Filename)};
safe_field(Key, Value) when is_atom(Key); is_binary(Key) ->
{Key, safe_value(Value)};
safe_field({Key, Value}) when is_list(Key) ->
safe_field({list_to_binary(Key), Value}).
safe_field(Key, Value) when is_list(Key) ->
safe_field(unicode:characters_to_binary(Key), Value).

safe_stack(Stack) ->
lists:map(fun safe_stack_entry/1, Stack).

safe_stack_entry({Mod, Fun, Args, _}) when is_atom(Mod), is_atom(Fun), is_list(Args) ->
Arity = length(Args),
Function = io_lib:format("~p:~p/~p", [Mod, Fun, Arity]),
#{
function => unicode:characters_to_binary(Function)
};
safe_stack_entry({Mod, Fun, Arity, Loc}) when is_atom(Mod), is_atom(Fun), is_integer(Arity) ->
Function = io_lib:format("~p:~p/~p", [ Mod, Fun, Arity ]),
#{
function => unicode:characters_to_binary(Function),
at => unicode:characters_to_binary([stack_file(Loc), $:, integer_to_binary(stack_line(Loc))])
};
safe_stack_entry(Entry) ->
safe_value(Entry).

stack_file(Loc) when is_list(Loc) -> proplists:get_value(file, Loc, "");
stack_file({File, _}) -> File;
stack_file({File, _, _}) -> File;
stack_file(_) -> "".

stack_line([ {_, _} | _ ] = Loc) -> proplists:get_value(line, Loc, "");
stack_line({_, Line}) -> Line;
stack_line({_, Line, _}) -> Line;
stack_line(_) -> 0.

-spec safe_value(term()) -> jsx:json_term().
safe_value(Pid) when is_pid(Pid) ->
list_to_binary(pid_to_list(Pid));
safe_value([]) ->
[];
safe_value(List) when is_list(List) ->
case io_lib:char_list(List) of
true ->
list_to_binary(List);
case is_proplist(List) of
true -> safe_value(map_from_proplist(List));
false ->
lists:map(fun safe_value/1, List)
case is_ascii_list(List) of
true -> unicode:characters_to_binary(List);
false -> lists:map(fun safe_value/1, List)
end
end;
safe_value(Map) when is_map(Map) ->
safe_fields(Map);
safe_value(undefined) ->
null;
safe_value(Val) when is_binary(Val); is_atom(Val); is_integer(Val) ->
safe_value(Val) when is_atom(Val); is_number(Val) ->
Val;
safe_value(Val) when is_binary(Val) ->
maybe_truncate(Val);
safe_value(Val) ->
unicode:characters_to_binary(io_lib:format("~p", [Val])).
maybe_truncate(unicode:characters_to_binary(io_lib:format("~p", [Val]))).

% Map a proplists to a map
map_from_proplist(L) ->
lists:foldl(
fun
({K,V}, Acc) -> Acc#{ K => V };
(K, Acc) -> Acc#{ K => true }
end,
#{},
L).

% If something is a proplist, then we will display it as a map.
is_proplist([]) -> true;
is_proplist([ {K, _} | T ]) when is_atom(K); is_binary(K) -> is_proplist(T);
is_proplist([ K | T ]) when is_atom(K) -> is_proplist(T);
is_proplist(_) -> false.

% Simple ASCII character string, typically SQL statements, filenames or literal texts.
is_ascii_list([]) -> true;
is_ascii_list([ C | T ]) when C >= 32, C =< 127 -> is_ascii_list(T);
is_ascii_list([ C | T ]) when C =:= $\n; C =:= $\t -> is_ascii_list(T);
is_ascii_list(_) -> false.

maybe_truncate(Bin) when size(Bin) >= ?LOG_BINARY_SIZE ->
<<Truncated:?LOG_BINARY_SIZE/binary, _/binary>> = Bin,
<<Truncated/binary, "...">>;
maybe_truncate(Bin) ->
Bin.
33 changes: 29 additions & 4 deletions test/logstasher_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -9,23 +9,25 @@

-export([all/0, groups/0, init_per_testcase/2, end_per_testcase/2]).

-export([logstasher_udp/1, logstasher_tcp/1]).
-export([logstasher_udp/1, logstasher_tcp/1, logstasher_message/1]).

-spec all() -> [ct_suite:ct_test_def(), ...].
all() ->
[{group, logstasher}].

-spec groups() -> [ct_suite:ct_group_def(), ...].
groups() ->
[{logstasher, [sequence], [logstasher_udp, logstasher_tcp]}].
[{logstasher, [sequence], [
logstasher_udp,
logstasher_tcp,
logstasher_message
]}].

-spec init_per_testcase(ct_suite:ct_testname(), ct_suite:ct_config()) ->
ct_suite:ct_config() | {fail, term()} | {skip, term()}.
init_per_testcase(_Name, Config) ->
ok = logger:add_handler(logstash, logstasher_h, #{level => info}),
ok = logger:update_primary_config(#{level => all}),
Config;
init_per_testcase(_Name, Config) ->
Config.

-spec end_per_testcase(ct_suite:ct_testname(), ct_suite:ct_config()) ->
Expand Down Expand Up @@ -105,3 +107,26 @@ logstasher_tcp(_Config) ->
end,

?assertEqual(list_to_binary(ErrorMsg), Msg).


-spec logstasher_message(ct_suite:ct_config()) -> ok | no_return().
logstasher_message(_Config) ->
{ok, _Started} = application:ensure_all_started(logstasher),
#{
message := <<"Hello">>,
fields := #{ msg := <<"Hello">>, severity := info }
} = logstasher_h:log_data(#{
level => info,
msg => {report, #{ msg => <<"Hello">> }},
meta => #{ time => 0 }
}),
#{
message := <<"Hello">>,
fields := Fields2
} = logstasher_h:log_data(#{
level => info,
msg => {report, #{ text => <<"Hello">> }},
meta => #{ time => 0 }
}),
false = maps:is_key(text, Fields2),
ok.
Loading