Skip to content

Commit

Permalink
add authentication support (user/pass, token, nkey, jwt)
Browse files Browse the repository at this point in the history
Also, fix TLS handling to make NGS connection work.
  • Loading branch information
RoadRunnr committed Jan 15, 2025
1 parent 7f4bc00 commit 60d1d90
Show file tree
Hide file tree
Showing 4 changed files with 156 additions and 26 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ jobs:
run: |
apk update
apk --no-cache upgrade
apk --no-cache add docker git zstd iproute2 iproute2-ss
apk --no-cache add docker git zstd iproute2 iproute2-ss make
-
name: git special configs
run: |
Expand Down
42 changes: 42 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,45 @@ Build
-----

$ rebar3 compile

Usage
-----

```
application:ensure_all_started([enats]),
{ok, C} = nats:connect({127,0,0,1}, 4222, #{verbose => false}),
receive {C, ready} -> ok end,
{ok, Sid} = nats:sub(C, ~"foo.*"),
nats:pub(C, ~"foo.bar", ~"My payload"),
receive
{C, Sid, {msg, ~"foo.bar", Body, _}} ->
io:format("Got NATS Msg: ~0tp~n", [Body]),
ok
after 1000 ->
throw(did_not_receive_a_msg)
end.
```

Authentication
--------------

```
# with user and password
{ok, C} = nats:connect({127,0,0,1}, 4222, #{user => ~"myname", pass => ~"password", auth_required => true}).
# with token
{ok, C} = nats:connect({127,0,0,1}, 4222, #{token => ~"secret", auth_required => true}).
# with an nkey seed
{ok, C} = nats:connect({127,0,0,1}, 4222, #{nkey_seed => ~"SUAM...", auth_required => true}).
# with decentralized user credentials (JWT)
{ok, C} = nats:connect({127,0,0,1}, 4222, #{nkey_seed => ~"SUAM...", jwt => ~"eyJ0eX...", auth_required => true}).
# connect to NGS with JWT
{ok, C} = nats:connect("connect.ngs.global", 4222, #{tls_required => true, tls_opts => [{verify, verify_peer}, {cacerts, public_key:cacerts_get()}], nkey_seed => ~"SUAM...", jwt => ~"eyJ0eX...", auth_required => true}).
```
3 changes: 2 additions & 1 deletion rebar.config
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
%%-*-Erlang-*-
{erl_opts, [debug_info, warnings_as_errors]}.

{deps, [%%{enats_msg, "0.9.0"}
{deps, [{crc, {git, "https://github.com/TattdCodeMonkey/crc.git", {tag, "v0.10.5"}}},
%%{enats_msg, "0.9.0"},
{enats_msg, {git, "https://github.com/travelping/enats_msg.git", {branch, "main"}}}
]}.

Expand Down
135 changes: 111 additions & 24 deletions src/nats.erl
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,8 @@
#{verbose => false,
pedantic => false,
tls_required => false,
auth_token => undefined,
user => undefined,
pass => undefined,
tls_opts => [],
auth_required => false,
name => <<"nats">>,
lang => <<"Erlang">>,
version => undefined,
Expand All @@ -81,6 +80,15 @@
-define(DEFAULT_SOCKET_OPTS,
#{reuseaddr => true}).

%% set optional server info values to defaults
-define(DEFAULT_SERVER_INFO,
#{auth_required => false,
tls_required => false,
tls_verify => false,
tls_available => false,
ldm => false,
jetstream => false}).

-include_lib("kernel/include/logger.hrl").
-include_lib("stdlib/include/ms_transform.hrl").

Expand All @@ -96,6 +104,8 @@
verbose => boolean(),
pedantic => boolean(),
tls_required => boolean(),
tls_opts => [ssl:tls_client_option()],
auth_required => boolean(),
auth_token => binary(),
user => binary(),
pass => binary(),
Expand Down Expand Up @@ -140,9 +150,11 @@
verbose := boolean(),
pedantic := boolean(),
tls_required := boolean(),
auth_token := undefined | binary(),
user := undefined | binary(),
pass := undefined | binary(),
tls_opts := [ssl:tls_client_option()],
auth_required := boolean(),
auth_token => binary(),
user => binary(),
pass => binary(),
name := binary(),
lang := binary(),
version := binary(),
Expand Down Expand Up @@ -377,14 +389,14 @@ handle_event(enter, _, closed, Data) ->
notify_parent(closed, Data),
{stop, normal};

handle_event(enter, _, connected, #{socket := Socket}) ->
handle_event(enter, _, connected, #{socket := Socket} = Data) ->
?LOG(debug, "NATS enter connected state, socket ~p", [Socket]),
ok = inet:setopts(Socket, [{active,once}]),
ok = setopts(Data, [{active,once}]),
keep_state_and_data;
handle_event(enter, OldState, State, #{socket := Socket} = Data)
handle_event(enter, OldState, State, #{socket := _} = Data)
when not is_record(OldState, ready), is_record(State, ready) ->
?LOG(debug, "NATS enter ready state"),
ok = inet:setopts(Socket, [{active, true}]),
ok = setopts(Data, [{active, true}]),
notify_parent(ready, Data),
keep_state_and_data;
handle_event(enter, _, State, _Data)
Expand Down Expand Up @@ -552,7 +564,12 @@ handle_event(Event, EventContent, State, Data) ->
format_status(Status) ->
maps:map(
fun(data, Data) ->
Data#{pass := redacted, auth_token := redacted};
maps:map(
fun(pass, _) -> redacted;
(auth_token, _) -> redacted;
(nkey_seed, _) -> redacted;
(_, V) -> V
end, Data);
(_, Value) ->
Value
end, Status).
Expand Down Expand Up @@ -607,8 +624,8 @@ send_msg_with_reply(From, Reply, Msg,
send_msg_with_reply(From, Reply, Msg, #ready{pending = undefined}, Data) ->
{keep_state, enqueue_msg(Msg, Data), [{reply, From, Reply}]}.

socket_active(connected, connected, #{socket := Socket}) ->
_ = inet:setopts(Socket, [{active,once}]),
socket_active(connected, connected, Data) ->
_ = setopts(Data, [{active,once}]),
ok;
socket_active(_, _, _) ->
ok.
Expand Down Expand Up @@ -911,10 +928,16 @@ handle_nats_info(Payload, Data0) ->
{JSON, ok, _} ->
maybe
?LOG(debug, "NATS Info JSON: ~p", [JSON]),
{ok, Data} ?= ssl_upgrade(JSON, Data0#{server_info := JSON}),
ServerInfo = maps:merge(?DEFAULT_SERVER_INFO, JSON),
{ok, Data} ?= ssl_upgrade(JSON, Data0#{server_info := ServerInfo}),
?LOG(debug, "NATS Client Info: ~p", [client_info(Data)]),
Msg = client_info(Data),
{ok, Msg} ?= client_info(Data),
continue_enqueue_batch([Msg], #ready{}, Data)
else
Error ->
?LOG(debug, "NATS Info Error: ~p", [Error]),
notify_parent(Error, Data0),
{stop, {closed, Data0}}
end
catch
C:E ->
Expand All @@ -923,8 +946,30 @@ handle_nats_info(Payload, Data0) ->
{stop, {closed, Data0}}
end.

tls_server_name_indication(Host, TlsOpts) ->
case proplists:is_defined(server_name_indication, TlsOpts) of
false when is_list(Host); is_tuple(Host) ->
[{server_name_indication, Host}|TlsOpts];
false when is_binary(Host) ->
[{server_name_indication, binary_to_list(Host)}|TlsOpts];
_ ->
TlsOpts
end.

tls_customize_hostname_check(TlsOpts) ->
case proplists:is_defined(customize_hostname_check, TlsOpts) of
true -> TlsOpts;
false ->
[{customize_hostname_check, [{match_fun, public_key:pkix_verify_hostname_match_fun(https)}]} |
TlsOpts]
end.

tls_opts(#{host := Host, tls_opts := TlsOpts0}) ->
TlsOpts1 = tls_server_name_indication(Host, TlsOpts0),
_TlsOpts = tls_customize_hostname_check(TlsOpts1).

ssl_upgrade(#{tls_required := true}, #{socket := Socket} = Data) ->
case ssl:connect(Socket, []) of
case ssl:connect(Socket, tls_opts(Data)) of
{ok, NewSocket} ->
{ok, Data#{socket := NewSocket, tls := true}};
{error, _Reason} = Error ->
Expand All @@ -933,6 +978,13 @@ ssl_upgrade(#{tls_required := true}, #{socket := Socket} = Data) ->
ssl_upgrade(_, State) ->
{ok, State}.

setopts(#{socket := Socket, tls := true}, SockOpts) ->
ssl:setopts(Socket, SockOpts);
setopts(#{socket := Socket, tls := false}, SockOpts) ->
inet:setopts(Socket, SockOpts);
setopts(_, _) ->
{error, not_connected}.

send(Bin, #{socket := Socket, tls := false}) ->
gen_tcp:send(Socket, Bin);
send(Bin, #{socket := Socket, tls := true}) ->
Expand Down Expand Up @@ -987,16 +1039,51 @@ flush_batch(Data0) ->
Data.

client_info(#{server_info := ServerInfo} = Data) ->
%% Include user and name iff the server requires it
FieldsList = [verbose, pedantic, tls_required, name, lang,
version, headers, no_responders],
NewFieldsList =
case maps:get(auth_required, ServerInfo, false) of
true -> [user, pass, auth_token | FieldsList];
_ -> FieldsList
end,
Nats = maps:with(NewFieldsList, Data),
nats_msg:connect(json:encode(Nats)).
Nats0 = maps:with(FieldsList, Data),
maybe
{ok, Nats} ?= client_auth(ServerInfo, Data, Nats0),
{ok, nats_msg:connect(json:encode(Nats))}
end.

%% Include authentication properties iff the server requires it
client_auth(#{auth_required := true}, #{auth_required := false}, _) ->
{error, server_requires_auth};
client_auth(#{auth_required := false}, #{auth_required := true}, _) ->
{error, client_mandates_auth};
client_auth(#{auth_required := true}, #{user := User, pass := Pass}, Nats)
when is_binary(User), is_binary(Pass) ->
{ok, Nats#{user => User, pass => Pass}};
client_auth(#{auth_required := true}, #{auth_token := Token}, Nats)
when is_binary(Token) ->
{ok, Nats#{auth_token => Token}};
client_auth(#{auth_required := true, nonce := Nonce},
#{nkey_seed := <<"SU", _/binary>> = Seed, jwt := JWT}, Nats) ->
case nats_nkey:from_seed(Seed) of
{ok, NKey} ->
SignatureBin = nats_nkey:sign(NKey, Nonce),
Signature = base64:encode(SignatureBin, #{mode => urlsafe, padding => false}),
{ok, Nats#{jwt => JWT, sig => Signature}};
{error, _} ->
{error, invalid_seed}
end;
client_auth(#{auth_required := true, nonce := Nonce},
#{nkey_seed := <<"SU", _/binary>> = Seed}, Nats) ->
case nats_nkey:from_seed(Seed) of
{ok, NKey} ->
SignatureBin = nats_nkey:sign(NKey, Nonce),
Signature = base64:encode(SignatureBin, #{mode => urlsafe, padding => false}),
Public = nats_nkey:public(NKey),
{ok, Nats#{nkey => Public, sig => Signature}};
{error, _} ->
{error, invalid_seed}
end;
client_auth(#{auth_required := true}, _, _) ->
%% server requires authentication, but no valid credentials provided
{error, server_requires_auth};
client_auth(_, _, Nats) ->
{ok, Nats}.

log_connection_error(Error, #{host := Host, port := Port}) ->
?LOG(debug, "NATS connection to ~s:~w failed with ~p", [inet:ntoa(Host), Port, Error]),
Expand Down

0 comments on commit 60d1d90

Please sign in to comment.