diff --git a/README.md b/README.md index ed74333..262dbc8 100644 --- a/README.md +++ b/README.md @@ -1,7 +1,7 @@ emqx_auth_http ============== -EMQ X HTTP Auth/ACL Plugin +EMQX HTTP Auth/ACL Plugin Build ----- @@ -96,5 +96,4 @@ Apache License Version 2.0 Author ------ -EMQ X Team. - +EMQX Team. diff --git a/etc/emqx_auth_http.conf b/etc/emqx_auth_http.conf index 86c4ac0..bef9679 100644 --- a/etc/emqx_auth_http.conf +++ b/etc/emqx_auth_http.conf @@ -9,8 +9,8 @@ ## ## Value: URL ## -## Examples: http://127.0.0.1:8991/mqtt/auth, https://[::1]:8991/mqtt/auth -auth.http.auth_req = http://127.0.0.1:8991/mqtt/auth +## Examples: http://127.0.0.1:80/mqtt/auth, https://[::1]:80/mqtt/auth +auth.http.auth_req = http://127.0.0.1:80/mqtt/auth ## Value: post | get auth.http.auth_req.method = post @@ -39,13 +39,13 @@ auth.http.auth_req.params = clientid=%c,username=%u,password=%P ## ## Value: URL ## -## Examples: http://127.0.0.1:8991/mqtt/superuser, https://[::1]:8991/mqtt/superuser -#auth.http.super_req = http://127.0.0.1:8991/mqtt/superuser +## Examples: http://127.0.0.1:80/mqtt/superuser, https://[::1]:80/mqtt/superuser +#auth.http.super_req = http://127.0.0.1:80/mqtt/superuser ## Value: post | get #auth.http.super_req.method = post -## It only works when method=pos +## It only works when method=post ## Value: json | x-www-form-urlencoded #auth.http.super_req.content_type = x-www-form-urlencoded @@ -69,8 +69,8 @@ auth.http.auth_req.params = clientid=%c,username=%u,password=%P ## ## Value: URL ## -## Examples: http://127.0.0.1:8991/mqtt/acl, https://[::1]:8991/mqtt/acl -auth.http.acl_req = http://127.0.0.1:8991/mqtt/acl +## Examples: http://127.0.0.1:80/mqtt/acl, https://[::1]:80/mqtt/acl +auth.http.acl_req = http://127.0.0.1:80/mqtt/acl ## Value: post | get auth.http.acl_req.method = get @@ -101,8 +101,8 @@ auth.http.acl_req.params = access=%A,username=%u,clientid=%c,ipaddr=%a,topic=%t, ## -m: minute, e.g. '5m' for 5 minutes ## -s: second, e.g. '30s' for 30 seconds ## -## Default: 0 -## auth.http.request.timeout = 0 +## Default: 5s +## auth.http.request.timeout = 5s ## Connection time-out time, used during the initial request ## when the client is connecting to the server @@ -110,14 +110,14 @@ auth.http.acl_req.params = access=%A,username=%u,clientid=%c,ipaddr=%a,topic=%t, ## Value: Duration ## ## Default is same with the timeout option -## auth.http.request.connect_timeout = 0 +## auth.http.request.connect_timeout = 5 -## Re-send http reuqest times +## Re-send http request times ## ## Value: integer ## ## Default: 3 -auth.http.request.retry_times = 3 +auth.http.request.retry_times = 5 ## The interval for re-sending the http request ## diff --git a/include/emqx_auth_http.hrl b/include/emqx_auth_http.hrl index 09e58e3..2bbe128 100644 --- a/include/emqx_auth_http.hrl +++ b/include/emqx_auth_http.hrl @@ -1,7 +1,7 @@ -define(APP, emqx_auth_http). --record(http_request, {method = post, content_type, url, params, options = []}). +-record(http_request, {method = post, path, headers, params, request_timeout}). -record(auth_metrics, { success = 'client.auth.success', diff --git a/priv/emqx_auth_http.schema b/priv/emqx_auth_http.schema index e6a9863..b61bcd6 100644 --- a/priv/emqx_auth_http.schema +++ b/priv/emqx_auth_http.schema @@ -11,7 +11,7 @@ {mapping, "auth.http.auth_req.content_type", "emqx_auth_http.auth_req", [ {default, 'x-www-form-urlencoded'}, - {datatype, {enum, [json, 'x-www-form-urlencoded']}} + {datatype, {enum, ['json', 'x-www-form-urlencoded']}} ]}. {mapping, "auth.http.auth_req.params", "emqx_auth_http.auth_req", [ @@ -19,13 +19,13 @@ ]}. {translation, "emqx_auth_http.auth_req", fun(Conf) -> - case cuttlefish:conf_get("auth.http.auth_req", Conf) of + case cuttlefish:conf_get("auth.http.auth_req", Conf, undefined) of undefined -> cuttlefish:unset(); Url -> Params = cuttlefish:conf_get("auth.http.auth_req.params", Conf), [{url, Url}, {method, cuttlefish:conf_get("auth.http.auth_req.method", Conf)}, - {content_type, cuttlefish:conf_get("auth.http.auth_req.content_type", Conf)}, + {content_type, list_to_binary("application/" ++ atom_to_list(cuttlefish:conf_get("auth.http.auth_req.content_type", Conf)))}, {params, [list_to_tuple(string:tokens(S, "=")) || S <- string:tokens(Params, ",")]}] end end}. @@ -41,7 +41,7 @@ end}. {mapping, "auth.http.super_req.content_type", "emqx_auth_http.super_req", [ {default, 'x-www-form-urlencoded'}, - {datatype, {enum, [json, 'x-www-form-urlencoded']}} + {datatype, {enum, ['json', 'x-www-form-urlencoded']}} ]}. {mapping, "auth.http.super_req.params", "emqx_auth_http.super_req", [ @@ -53,7 +53,7 @@ end}. undefined -> cuttlefish:unset(); Url -> Params = cuttlefish:conf_get("auth.http.super_req.params", Conf), [{url, Url}, {method, cuttlefish:conf_get("auth.http.super_req.method", Conf)}, - {content_type, cuttlefish:conf_get("auth.http.super_req.content_type", Conf)}, + {content_type, list_to_binary("application/" ++ atom_to_list(cuttlefish:conf_get("auth.http.super_req.content_type", Conf)))}, {params, [list_to_tuple(string:tokens(S, "=")) || S <- string:tokens(Params, ",")]}] end end}. @@ -70,7 +70,7 @@ end}. {mapping, "auth.http.acl_req.content_type", "emqx_auth_http.acl_req", [ {default, 'x-www-form-urlencoded'}, - {datatype, {enum, [json, 'x-www-form-urlencoded']}} + {datatype, {enum, ['json', 'x-www-form-urlencoded']}} ]}. {mapping, "auth.http.acl_req.params", "emqx_auth_http.acl_req", [ @@ -81,34 +81,56 @@ end}. case cuttlefish:conf_get("auth.http.acl_req", Conf, undefined) of undefined -> cuttlefish:unset(); Url -> Params = cuttlefish:conf_get("auth.http.acl_req.params", Conf), - [{url, Url}, {method, cuttlefish:conf_get("auth.http.acl_req.method", Conf)}, - {content_type, cuttlefish:conf_get("auth.http.acl_req.content_type", Conf)}, + [{url, Url}, + {method, cuttlefish:conf_get("auth.http.acl_req.method", Conf)}, + {content_type, list_to_binary("application/" ++ atom_to_list(cuttlefish:conf_get("auth.http.acl_req.content_type", Conf)))}, {params, [list_to_tuple(string:tokens(S, "=")) || S <- string:tokens(Params, ",")]}] end end}. -{mapping, "auth.http.request.timeout", "emqx_auth_http.http_opts", [ - {default, 0}, +{mapping, "auth.http.request.timeout", "emqx_auth_http.request_timeout", [ + {default, "5s"}, {datatype, [integer, {duration, ms}]} ]}. -{mapping, "auth.http.request.connect_timeout", "emqx_auth_http.http_opts", [ +{mapping, "auth.http.pool_size", "emqx_auth_http.pool_opts", [ + {default, 8}, + {datatype, integer} +]}. + +{mapping, "auth.http.request.connect_timeout", "emqx_auth_http.pool_opts", [ + {default, "5s"}, {datatype, [integer, {duration, ms}]} ]}. -{mapping, "auth.http.ssl.cacertfile", "emqx_auth_http.http_opts", [ +{mapping, "auth.http.ssl.cacertfile", "emqx_auth_http.pool_opts", [ {datatype, string} ]}. -{mapping, "auth.http.ssl.certfile", "emqx_auth_http.http_opts", [ +{mapping, "auth.http.ssl.certfile", "emqx_auth_http.pool_opts", [ {datatype, string} ]}. -{mapping, "auth.http.ssl.keyfile", "emqx_auth_http.http_opts", [ +{mapping, "auth.http.ssl.keyfile", "emqx_auth_http.pool_opts", [ {datatype, string} ]}. -{translation, "emqx_auth_http.http_opts", fun(Conf) -> +{mapping, "auth.http.request.retry_times", "emqx_auth_http.pool_opts", [ + {default, 5}, + {datatype, integer} +]}. + +{mapping, "auth.http.request.retry_interval", "emqx_auth_http.pool_opts", [ + {default, "1s"}, + {datatype, {duration, ms}} +]}. + +{mapping, "auth.http.request.retry_backoff", "emqx_auth_http.pool_opts", [ + {default, 2.0}, + {datatype, float} +]}. + +{translation, "emqx_auth_http.pool_opts", fun(Conf) -> Filter = fun(L) -> [{K, V} || {K, V} <- L, V =/= undefined] end, InfinityFun = fun(0) -> infinity; (Duration) -> Duration @@ -116,8 +138,10 @@ end}. SslOpts = Filter([{cacertfile, cuttlefish:conf_get("auth.http.ssl.cacertfile", Conf, undefined)}, {certfile, cuttlefish:conf_get("auth.http.ssl.certfile", Conf, undefined)}, {keyfile, cuttlefish:conf_get("auth.http.ssl.keyfile", Conf, undefined)}]), - Opts = [{timeout, InfinityFun(cuttlefish:conf_get("auth.http.request.timeout", Conf))}, - {connect_timeout, InfinityFun(cuttlefish:conf_get("auth.http.request.connect_timeout", Conf, undefined))}], + Opts = [{pool_size, cuttlefish:conf_get("auth.http.pool_size", Conf)}, + {connect_timeout, InfinityFun(cuttlefish:conf_get("auth.http.request.connect_timeout", Conf))}, + {retry, cuttlefish:conf_get("auth.http.request.retry_times", Conf)}, + {retry_timeout, cuttlefish:conf_get("auth.http.request.retry_interval", Conf)}], case SslOpts of [] -> Filter(Opts); _ -> @@ -131,26 +155,6 @@ end}. end end}. -{mapping, "auth.http.request.retry_times", "emqx_auth_http.retry_opts", [ - {default, 3}, - {datatype, integer} -]}. - -{mapping, "auth.http.request.retry_interval", "emqx_auth_http.retry_opts", [ - {default, "1s"}, - {datatype, {duration, ms}} -]}. - -{mapping, "auth.http.request.retry_backoff", "emqx_auth_http.retry_opts", [ - {default, 2.0}, - {datatype, float} -]}. - -{translation, "emqx_auth_http.retry_opts", fun(Conf) -> - [{times, cuttlefish:conf_get("auth.http.request.retry_times", Conf)}, - {interval, cuttlefish:conf_get("auth.http.request.retry_interval", Conf)}, - {backoff, cuttlefish:conf_get("auth.http.request.retry_backoff", Conf)}] -end}. {mapping, "auth.http.header.$field", "emqx_auth_http.headers", [ {datatype, string} diff --git a/rebar.config b/rebar.config index 026e6fc..e33e8b4 100644 --- a/rebar.config +++ b/rebar.config @@ -1,4 +1,9 @@ -{deps, []}. +%% Cowlib is only used to pull gun, which is using old rebar file +{deps, + [{cowlib, {git, "https://github.com/ninenines/cowlib", {tag, "2.8.0"}}}, + {gun, {git, "https://github.com/emqx/gun", {tag, "1.3.4"}}}, + {gproc, {git, "https://github.com/uwiger/gproc", {tag, "0.8.0"}}} + ]}. {edoc_opts, [{preprocess, true}]}. {erl_opts, [warn_unused_vars, @@ -20,7 +25,7 @@ [{test, [{deps, [{emqx_ct_helpers, {git, "https://github.com/emqx/emqx-ct-helpers", {tag, "1.2.2"}}}, - {emqtt, {git, "https://github.com/emqx/emqtt", {tag, "1.2.0"}}} + {emqtt, {git, "https://github.com/emqx/emqtt", {tag, "v1.2.2"}}} ]} ]} ]}. diff --git a/src/emqx_acl_http.erl b/src/emqx_acl_http.erl index ebe4159..a6f60b4 100644 --- a/src/emqx_acl_http.erl +++ b/src/emqx_acl_http.erl @@ -24,7 +24,7 @@ -logger_header("[ACL http]"). -import(emqx_auth_http_cli, - [ request/8 + [ request/6 , feedvar/2 ]). @@ -48,18 +48,16 @@ check_acl(ClientInfo, PubSub, Topic, AclResult, State) -> do_check_acl(#{username := <<$$, _/binary>>}, _PubSub, _Topic, _AclResult, _Config) -> ok; -do_check_acl(ClientInfo, PubSub, Topic, _AclResult, #{acl_req := AclReq, - http_opts := HttpOpts, - retry_opts := RetryOpts, - headers := Headers}) -> +do_check_acl(ClientInfo, PubSub, Topic, _AclResult, #{acl_req := AclReq, + pool_name := PoolName}) -> ClientInfo1 = ClientInfo#{access => access(PubSub), topic => Topic}, - case check_acl_request(AclReq, ClientInfo1, Headers, HttpOpts, RetryOpts) of - {ok, 200, "ignore"} -> ok; + case check_acl_request(PoolName, AclReq, ClientInfo1) of + {ok, 200, <<"ignore">>} -> ok; {ok, 200, _Body} -> {stop, allow}; {ok, _Code, _Body} -> {stop, deny}; {error, Error} -> - ?LOG(error, "Request ACL url ~s, error: ~p", - [AclReq#http_request.url, Error]), + ?LOG(error, "Request ACL path ~s, error: ~p", + [AclReq#http_request.path, Error]), ok end. @@ -79,13 +77,12 @@ inc_metrics({stop, deny}) -> return_with(Fun, Result) -> Fun(Result), Result. -check_acl_request(#http_request{url = Url, - method = Method, - content_type = ContentType, - params = Params, - options = Options}, - ClientInfo, Headers, HttpOpts, RetryOpts) -> - request(Method, ContentType, Url, feedvar(Params, ClientInfo), Headers, HttpOpts, Options, RetryOpts). +check_acl_request(PoolName, #http_request{path = Path, + method = Method, + headers = Headers, + params = Params, + request_timeout = RequestTimeout}, ClientInfo) -> + request(PoolName, Method, Path, Headers, feedvar(Params, ClientInfo), RequestTimeout). access(subscribe) -> 1; access(publish) -> 2. diff --git a/src/emqx_auth_http.app.src b/src/emqx_auth_http.app.src index eaabdb3..fa4b6d6 100644 --- a/src/emqx_auth_http.app.src +++ b/src/emqx_auth_http.app.src @@ -1,13 +1,13 @@ {application, emqx_auth_http, - [{description, "EMQ X Authentication/ACL with HTTP API"}, + [{description, "EMQX Authentication/ACL with HTTP API"}, {vsn, "git"}, {modules, []}, {registered, [emqx_auth_http_sup]}, - {applications, [kernel,stdlib]}, + {applications, [kernel,stdlib,gproc,gun]}, {mod, {emqx_auth_http_app, []}}, {env, []}, {licenses, ["Apache-2.0"]}, - {maintainers, ["EMQ X Team "]}, + {maintainers, ["EMQX Team "]}, {links, [{"Homepage", "https://emqx.io/"}, {"Github", "https://github.com/emqx/emqx-auth-http"} ]} diff --git a/src/emqx_auth_http.appup.src b/src/emqx_auth_http.appup.src new file mode 100644 index 0000000..de4139a --- /dev/null +++ b/src/emqx_auth_http.appup.src @@ -0,0 +1,29 @@ +%% -*-: erlang -*- +{VSN, + [ + {<<"4.2.([7-9]|(10))">>, [ + {load_module, emqx_http_client, brutal_purge, soft_purge, []} + ]}, + {"4.2.6", [ + {load_module, emqx_auth_http_cli, brutal_purge, soft_purge, []}, + {load_module, emqx_http_client, brutal_purge, soft_purge, []} + ]}, + {<<"4.2.[0-5]">>, [ + {restart_application, emqx_auth_http} + ]}, + {<<".*">>, []} + ], + [ + {<<"4.2.([7-9]|(10))">>, [ + {load_module, emqx_http_client, brutal_purge, soft_purge, []} + ]}, + {"4.2.6", [ + {load_module, emqx_auth_http_cli, brutal_purge, soft_purge, []}, + {load_module, emqx_http_client, brutal_purge, soft_purge, []} + ]}, + {<<"4.2.[0-5]">>, [ + {restart_application, emqx_auth_http} + ]}, + {<<".*">>, []} + ] +}. \ No newline at end of file diff --git a/src/emqx_auth_http.erl b/src/emqx_auth_http.erl index 54e41c9..20026d6 100644 --- a/src/emqx_auth_http.erl +++ b/src/emqx_auth_http.erl @@ -25,7 +25,7 @@ -logger_header("[Auth http]"). -import(emqx_auth_http_cli, - [ request/8 + [ request/6 , feedvar/2 ]). @@ -41,28 +41,26 @@ register_metrics() -> check(ClientInfo, AuthResult, #{auth_req := AuthReq, super_req := SuperReq, - http_opts := HttpOpts, - retry_opts := RetryOpts, - headers := Headers}) -> - case authenticate(AuthReq, ClientInfo, Headers, HttpOpts, RetryOpts) of - {ok, 200, "ignore"} -> + pool_name := PoolName}) -> + case authenticate(PoolName, AuthReq, ClientInfo) of + {ok, 200, <<"ignore">>} -> emqx_metrics:inc(?AUTH_METRICS(ignore)), ok; {ok, 200, Body} -> emqx_metrics:inc(?AUTH_METRICS(success)), - IsSuperuser = is_superuser(SuperReq, ClientInfo, Headers, HttpOpts, RetryOpts), + IsSuperuser = is_superuser(PoolName, SuperReq, ClientInfo), {stop, AuthResult#{is_superuser => IsSuperuser, auth_result => success, anonymous => false, mountpoint => mountpoint(Body, ClientInfo)}}; {ok, Code, _Body} -> - ?LOG(error, "Deny connection from url: ~s, response http code: ~p", - [AuthReq#http_request.url, Code]), + ?LOG(error, "Deny connection from path: ~s, response http code: ~p", + [AuthReq#http_request.path, Code]), emqx_metrics:inc(?AUTH_METRICS(failure)), {stop, AuthResult#{auth_result => http_to_connack_error(Code), anonymous => false}}; {error, Error} -> - ?LOG(error, "Request auth url: ~s, error: ~p", - [AuthReq#http_request.url, Error]), + ?LOG(error, "Request auth path: ~s, error: ~p", + [AuthReq#http_request.path, Error]), emqx_metrics:inc(?AUTH_METRICS(failure)), %%FIXME later: server_unavailable is not right. {stop, AuthResult#{auth_result => server_unavailable, @@ -75,32 +73,30 @@ description() -> "Authentication by HTTP API". %% Requests %%-------------------------------------------------------------------- -authenticate(#http_request{url = Url, - method = Method, - content_type = ContentType, - params = Params, - options = Options}, - ClientInfo, HttpHeaders, HttpOpts, RetryOpts) -> - request(Method, ContentType, Url, feedvar(Params, ClientInfo), HttpHeaders, HttpOpts, Options, RetryOpts). +authenticate(PoolName, #http_request{path = Path, + method = Method, + headers = Headers, + params = Params, + request_timeout = RequestTimeout}, ClientInfo) -> + request(PoolName, Method, Path, Headers, feedvar(Params, ClientInfo), RequestTimeout). --spec(is_superuser(maybe(#http_request{}), emqx_types:client(), list(), list(), list()) -> boolean()). -is_superuser(undefined, _ClientInfo, _HttpHeaders, _HttpOpts, _RetryOpts) -> +-spec(is_superuser(atom(), maybe(#http_request{}), emqx_types:client()) -> boolean()). +is_superuser(_PoolName, undefined, _ClientInfo) -> false; -is_superuser(#http_request{url = Url, - method = Method, - content_type = ContentType, - params = Params, - options = Options}, - ClientInfo, HttpHeaders, HttpOpts, RetryOpts) -> - case request(Method, ContentType, Url, feedvar(Params, ClientInfo), HttpHeaders, HttpOpts, Options, RetryOpts) of +is_superuser(PoolName, #http_request{path = Path, + method = Method, + headers = Headers, + params = Params, + request_timeout = RequestTimeout}, ClientInfo) -> + case request(PoolName, Method, Path, Headers, feedvar(Params, ClientInfo), RequestTimeout) of {ok, 200, _Body} -> true; {ok, _Code, _Body} -> false; - {error, Error} -> ?LOG(error, "Request superuser url ~s, error: ~p", [Url, Error]), + {error, Error} -> ?LOG(error, "Request superuser path ~s, error: ~p", [Path, Error]), false end. mountpoint(Body, #{mountpoint := Mountpoint}) -> - case emqx_json:safe_decode(iolist_to_binary(Body), [return_maps]) of + case emqx_json:safe_decode(Body, [return_maps]) of {error, _} -> Mountpoint; {ok, Json} when is_map(Json) -> maps:get(<<"mountpoint">>, Json, Mountpoint); diff --git a/src/emqx_auth_http_app.erl b/src/emqx_auth_http_app.erl index 51f6762..d79c1fa 100644 --- a/src/emqx_auth_http_app.erl +++ b/src/emqx_auth_http_app.erl @@ -17,7 +17,6 @@ -module(emqx_auth_http_app). -behaviour(application). --behaviour(supervisor). -emqx_plugin(auth). @@ -33,37 +32,35 @@ %%-------------------------------------------------------------------- start(_StartType, _StartArgs) -> - with_env(auth_req, fun load_auth_hook/1), - with_env(acl_req, fun load_acl_hook/1), - supervisor:start_link({local, ?MODULE}, ?MODULE, []). + case translate_env() of + ok -> + {ok, PoolOpts} = application:get_env(?APP, pool_opts), + {ok, Sup} = emqx_http_client_sup:start_link(?APP, ssl(inet(PoolOpts))), + with_env(auth_req, fun load_auth_hook/1), + with_env(acl_req, fun load_acl_hook/1), + {ok, Sup}; + {error, Reason} -> + {error, Reason} + end. load_auth_hook(AuthReq) -> ok = emqx_auth_http:register_metrics(), SuperReq = r(application:get_env(?APP, super_req, undefined)), - HttpOpts = application:get_env(?APP, http_opts, []), - RetryOpts = application:get_env(?APP, retry_opts, []), - Headers = application:get_env(?APP, headers, []), Params = #{auth_req => AuthReq, super_req => SuperReq, - http_opts => HttpOpts, - retry_opts => maps:from_list(RetryOpts), - headers => Headers}, + pool_name => ?APP}, emqx:hook('client.authenticate', {emqx_auth_http, check, [Params]}). load_acl_hook(AclReq) -> ok = emqx_acl_http:register_metrics(), - HttpOpts = application:get_env(?APP, http_opts, []), - RetryOpts = application:get_env(?APP, retry_opts, []), - Headers = application:get_env(?APP, headers, []), - Params = #{acl_req => AclReq, - http_opts => HttpOpts, - retry_opts => maps:from_list(RetryOpts), - headers => Headers}, + Params = #{acl_req => AclReq, + pool_name => ?APP}, emqx:hook('client.check_acl', {emqx_acl_http, check_acl, [Params]}). stop(_State) -> emqx:unhook('client.authenticate', {emqx_auth_http, check}), - emqx:unhook('client.check_acl', {emqx_acl_http, check_acl}). + emqx:unhook('client.check_acl', {emqx_acl_http, check_acl}), + emqx_http_client_sup:stop_pool(?APP). %%-------------------------------------------------------------------- %% Dummy supervisor @@ -85,19 +82,96 @@ with_env(Par, Fun) -> r(undefined) -> undefined; r(Config) -> + Headers = application:get_env(?APP, headers, []), Method = proplists:get_value(method, Config, post), - ContentType = proplists:get_value(content_type, Config, 'x-www-form-urlencoded'), - Url = proplists:get_value(url, Config), + Path = proplists:get_value(path, Config), + NewHeaders = case Method of + Method when Method =:= post orelse Method =:= put -> + ContentType = proplists:get_value(content_type, Config, <<"application/x-www-form-urlencoded">>), + [{<<"content-type">>, ContentType} | Headers]; + _ -> + Headers + end, Params = proplists:get_value(params, Config), - #http_request{method = Method, content_type = ContentType, url = Url, params = Params, options = inet(Url)}. - -inet(Url) -> - case uri_string:parse(Url) of - #{host := Host} -> - case inet:parse_address(Host) of - {ok, Ip} when tuple_size(Ip) =:= 8 -> - [{ipv6_host_with_brackets, true}, {socket_opts, [{ipfamily, inet6}]}]; - _ -> [] - end; - _ -> [] + {ok, RequestTimeout} = application:get_env(?APP, request_timeout), + #http_request{method = Method, path = Path, headers = NewHeaders, params = Params, request_timeout = RequestTimeout}. + +inet(PoolOpts) -> + Host = proplists:get_value(host, PoolOpts), + TransOpts = proplists:get_value(transport_opts, PoolOpts, []), + NewPoolOpts = proplists:delete(transport_opts, PoolOpts), + Inet = case Host of + {_,_,_,_} -> inet; + {_,_,_,_,_,_,_,_} -> inet6; + _ -> + case inet:getaddr(Host, inet6) of + {error, _} -> inet; + {ok, _} -> inet6 + end + end, + [{transport_opts, [Inet | TransOpts]} | NewPoolOpts]. + +ssl(PoolOpts) -> + case proplists:get_value(ssl, PoolOpts, []) of + [] -> + PoolOpts; + SSLOpts -> + TransOpts = proplists:get_value(transport_opts, PoolOpts, []), + NewPoolOpts = proplists:delete(transport_opts, PoolOpts), + [{transport_opts, SSLOpts ++ TransOpts}, {transport, ssl} | NewPoolOpts] end. + +translate_env() -> + URLs = lists:foldl(fun(Name, Acc) -> + case application:get_env(?APP, Name, []) of + [] -> Acc; + Env -> + URL = proplists:get_value(url, Env), + #{host := Host, + path := Path, + scheme := Scheme} = URIMap = uri_string:parse(add_default_scheme(URL)), + Port = maps:get(port, URIMap, case Scheme of + "https" -> 443; + _ -> 80 + end), + [{Name, {Host, Port, path(Path)}} | Acc] + end + end, [], [acl_req, auth_req, super_req]), + case same_host_and_port(URLs) of + true -> + [begin + {ok, Req} = application:get_env(?APP, Name), + application:set_env(?APP, Name, [{path, Path} | Req]) + end || {Name, {_, _, Path}} <- URLs], + {_, {Host, Port, _}} = lists:last(URLs), + PoolOpts = application:get_env(?APP, pool_opts, []), + NHost = case inet:parse_address(Host) of + {ok, {_,_,_,_} = Addr} -> Addr; + {ok, {_,_,_,_,_,_,_,_} = Addr} -> Addr; + {error, einval} -> Host + end, + application:set_env(?APP, pool_opts, [{host, NHost}, {port, Port} | PoolOpts]), + ok; + false -> + {error, different_server} + end. + +add_default_scheme("http://" ++ _ = URL) -> + URL; +add_default_scheme("https://" ++ _ = URL) -> + URL; +add_default_scheme(URL) -> + "http://" ++ URL. + +path("") -> "/"; +path(Path) -> Path. + +same_host_and_port([_]) -> + true; +same_host_and_port([{_, {Host, Port, _}}, {_, {Host, Port, _}}]) -> + true; +same_host_and_port([{_, {Host, Port, _}}, URL = {_, {Host, Port, _}} | Rest]) -> + same_host_and_port([URL | Rest]); +same_host_and_port(_) -> + false. + \ No newline at end of file diff --git a/src/emqx_auth_http_cli.erl b/src/emqx_auth_http_cli.erl index 35a20d0..9a52d83 100644 --- a/src/emqx_auth_http_cli.erl +++ b/src/emqx_auth_http_cli.erl @@ -16,7 +16,9 @@ -module(emqx_auth_http_cli). --export([ request/8 +-include("emqx_auth_http.hrl"). + +-export([ request/6 , feedvar/2 , feedvar/3 ]). @@ -25,37 +27,37 @@ %% HTTP Request %%-------------------------------------------------------------------- -request(get, _ContentType, Url, Params, HttpHeaders, HttpOpts, Options, RetryOpts) -> - Req = {Url ++ "?" ++ cow_qs:qs(bin_kw(Params)), HttpHeaders}, - reply(request_(get, Req, [{autoredirect, true} | HttpOpts], Options, RetryOpts)); +request(PoolName, get, Path, Headers, Params, Timeout) -> + NewPath = Path ++ "?" ++ binary_to_list(cow_qs:qs(bin_kw(Params))), + do_request(get, PoolName, {NewPath, Headers}, Timeout); -request(post, 'x-www-form-urlencoded', Url, Params, HttpHeaders, HttpOpts, Options, RetryOpts) -> - Req = {Url, HttpHeaders, "application/x-www-form-urlencoded", cow_qs:qs(bin_kw(Params))}, - reply(request_(post, Req, [{autoredirect, true} | HttpOpts], Options, RetryOpts)); +request(PoolName, post, Path, Headers, Params, Timeout) -> + Body = case proplists:get_value(<<"content-type">>, Headers) of + <<"application/x-www-form-urlencoded">> -> + cow_qs:qs(bin_kw(Params)); + <<"application/json">> -> + emqx_json:encode(bin_kw(Params)) + end, + do_request(post, PoolName, {Path, Headers, Body}, Timeout). -request(post, json, Url, Params, HttpHeaders, HttpOpts, Options, RetryOpts) -> - Req = {Url, HttpHeaders, "application/json", emqx_json:encode(bin_kw(Params))}, - reply(request_(post, Req, [{autoredirect, true} | HttpOpts], Options, RetryOpts)). +do_request(Method, PoolName, Req, Timeout) -> + do_request(Method, PoolName, Req, Timeout, 3). -request_(Method, Req, HTTPOpts, Opts, RetryOpts = #{times := Times, - interval := Interval, - backoff := BackOff}) -> - case httpc:request(Method, Req, HTTPOpts, Opts) of - {error, _Reason} when Times > 0 -> - timer:sleep(trunc(Interval)), - RetryOpts1 = RetryOpts#{times := Times - 1, - interval := Interval * BackOff}, - request_(Method, Req, HTTPOpts, Opts, RetryOpts1); - Other -> Other +%% Only retry when connection closed by keepalive +do_request(_Method, _PoolName, _Req, _Timeout, 0) -> + {error, normal}; +do_request(Method, PoolName, Req, Timeout, Retry) -> + case emqx_http_client:request(Method, PoolName, Req, Timeout) of + {error, normal} -> + do_request(Method, PoolName, Req, Timeout, Retry - 1); + {error, Reason} -> + {error, Reason}; + {ok, StatusCode, _Headers} -> + {ok, StatusCode, <<>>}; + {ok, StatusCode, _Headers, Body} -> + {ok, StatusCode, Body} end. -reply({ok, {{_, Code, _}, _Headers, Body}}) -> - {ok, Code, Body}; -reply({ok, Code, Body}) -> - {ok, Code, Body}; -reply({error, Error}) -> - {error, Error}. - %% TODO: move this conversion to cuttlefish config and schema bin_kw(KeywordList) when is_list(KeywordList) -> [{bin(K), bin(V)} || {K, V} <- KeywordList]. diff --git a/src/emqx_http_client.erl b/src/emqx_http_client.erl new file mode 100644 index 0000000..bd92e82 --- /dev/null +++ b/src/emqx_http_client.erl @@ -0,0 +1,258 @@ +-module(emqx_http_client). + +-behaviour(gen_server). + +-include_lib("emqx/include/logger.hrl"). + +%% APIs +-export([ start_link/3 + , request/3 + , request/4 + ]). + +%% gen_server callbacks +-export([ init/1 + , handle_call/3 + , handle_cast/2 + , handle_info/2 + , terminate/2 + , code_change/3 + ]). + +-record(state, { + pool :: ecpool:poo_name(), + id :: pos_integer(), + client :: pid() | undefined, + mref :: reference() | undefined, + host :: inet:hostname() | inet:ip_address(), + port :: inet:port_number(), + gun_opts :: proplists:proplist(), + gun_state :: down | up, + requests :: map() + }). + +%%-------------------------------------------------------------------- +%% APIs +%%-------------------------------------------------------------------- + +start_link(Pool, Id, Opts) -> + gen_server:start_link(?MODULE, [Pool, Id, Opts], []). + +request(Method, Pool, Req) -> + request(Method, Pool, Req, 5000). + +request(get, Pool, {Path, Headers}, Timeout) -> + call(pick(Pool), {get, {Path, Headers}, Timeout}, Timeout + 1000); +request(Method, Pool, {Path, Headers, Body}, Timeout) -> + call(pick(Pool), {Method, {Path, Headers, Body}, Timeout}, Timeout + 1000). + +%%-------------------------------------------------------------------- +%% gen_server callbacks +%%-------------------------------------------------------------------- + +init([Pool, Id, Opts]) -> + State = #state{pool = Pool, + id = Id, + client = undefined, + mref = undefined, + host = proplists:get_value(host, Opts), + port = proplists:get_value(port, Opts), + gun_opts = gun_opts(Opts), + gun_state = down, + requests = #{}}, + true = gproc_pool:connect_worker(Pool, {Pool, Id}), + {ok, State}. + +handle_call(Req = {_, _, _}, From, State = #state{client = undefined, gun_state = down}) -> + case open(State) of + {ok, NewState} -> + handle_call(Req, From, NewState); + {error, Reason} -> + {reply, {error, Reason}, State} + end; + +handle_call(Req = {_, _, Timeout}, From, State = #state{client = Client, mref = MRef, gun_state = down}) when is_pid(Client) -> + case gun:await_up(Client, Timeout, MRef) of + {ok, _} -> + handle_call(Req, From, State#state{gun_state = up}); + {error, timeout} -> + {reply, {error, timeout}, State}; + {error, Reason} -> + true = erlang:demonitor(MRef, [flush]), + {reply, {error, Reason}, State#state{client = undefined, mref = undefined}} + end; + +handle_call({Method, Request, Timeout}, From, State = #state{client = Client, requests = Requests, gun_state = up}) when is_pid(Client) -> + StreamRef = do_request(Client, Method, Request), + ExpirationTime = erlang:system_time(millisecond) + Timeout, + {noreply, State#state{requests = maps:put(StreamRef, {From, ExpirationTime, undefined}, Requests)}}; + +handle_call(Req, _From, State) -> + ?LOG(error, "Unexpected call: ~p", [Req]), + {reply, ignored, State}. + +handle_cast(Msg, State) -> + ?LOG(error, "Unexpected cast: ~p", [Msg]), + {noreply, State}. + +handle_info({gun_response, Client, StreamRef, IsFin, StatusCode, Headers}, State = #state{client = Client, requests = Requests}) -> + Now = erlang:system_time(millisecond), + case maps:take(StreamRef, Requests) of + error -> + ?LOG(error, "Received 'gun_response' message from unknown stream ref: ~p", [StreamRef]), + {noreply, State}; + {{_, ExpirationTime, _}, NRequests} when Now > ExpirationTime -> + gun:cancel(Client, StreamRef), + flush_stream(Client, StreamRef), + {noreply, State#state{requests = NRequests}}; + {{From, ExpirationTime, undefined}, NRequests} -> + case IsFin of + fin -> + gen_server:reply(From, {ok, StatusCode, Headers}), + {noreply, State#state{requests = NRequests}}; + nofin -> + {noreply, State#state{requests = NRequests#{StreamRef => {From, ExpirationTime, {StatusCode, Headers, <<>>}}}}} + end; + _ -> + ?LOG(error, "Received 'gun_response' message does not match the state"), + {noreply, State} + end; + +handle_info({gun_data, Client, StreamRef, IsFin, Data}, State = #state{client = Client, requests = Requests}) -> + Now = erlang:system_time(millisecond), + case maps:take(StreamRef, Requests) of + error -> + ?LOG(error, "Received 'gun_data' message from unknown stream ref: ~p", [StreamRef]), + {noreply, State}; + {{_, ExpirationTime, _}, NRequests} when Now > ExpirationTime -> + gun:cancel(Client, StreamRef), + flush_stream(Client, StreamRef), + {noreply, State#state{requests = NRequests}}; + {{From, ExpirationTime, {StatusCode, Headers, Acc}}, NRequests} -> + case IsFin of + fin -> + gen_server:reply(From, {ok, StatusCode, Headers, <>}), + {noreply, State#state{requests = NRequests}}; + nofin -> + {noreply, State#state{requests = NRequests#{StreamRef => {From, ExpirationTime, {StatusCode, Headers, <>}}}}} + end; + _ -> + ?LOG(error, "Received 'gun_data' message does not match the state"), + {noreply, State} + end; + +handle_info({gun_error, Client, StreamRef, Reason}, State = #state{client = Client, requests = Requests}) -> + Now = erlang:system_time(millisecond), + case maps:take(StreamRef, Requests) of + error -> + ?LOG(error, "Received 'gun_error' message from unknown stream ref: ~p~n", [StreamRef]), + {noreply, State}; + {{_, ExpirationTime, _}, NRequests} when Now > ExpirationTime -> + {noreply, State#state{requests = NRequests}}; + {{From, _, _}, NRequests} -> + gen_server:reply(From, {error, Reason}), + {noreply, State#state{requests = NRequests}} + end; + +handle_info({gun_up, Client, _}, State = #state{client = Client}) -> + {noreply, State#state{gun_state = up}}; + +handle_info({gun_down, Client, _, Reason, KilledStreams, _}, State = #state{client = Client, requests = Requests}) -> + ?LOG(warning, "Received gun_down with ~p", [Reason]), + Now = erlang:system_time(millisecond), + NRequests = lists:foldl(fun(StreamRef, Acc) -> + case maps:take(StreamRef, Acc) of + error -> Acc; + {{_, ExpirationTime, _}, NAcc} when Now > ExpirationTime -> + NAcc; + {{From, _, _}, NAcc} -> + gen_server:reply(From, {error, Reason}), + NAcc + end + end, Requests, KilledStreams), + {noreply, State#state{gun_state = down, requests = NRequests}}; + +handle_info({'DOWN', MRef, process, Client, Reason}, State = #state{mref = MRef, client = Client, requests = Requests}) -> + ?LOG(warning, "The process of gun exited due to ~p", [Reason]), + true = erlang:demonitor(MRef, [flush]), + Now = erlang:system_time(millisecond), + lists:foreach(fun({_, {_, ExpirationTime, _}}) when Now > ExpirationTime -> + ok; + ({_, {From, _, _}}) -> + gen_server:reply(From, {error, Reason}) + end, maps:to_list(Requests)), + case open(State#state{requests = #{}}) of + {ok, NewState} -> + {noreply, NewState}; + {error, Reason} -> + {noreply, State#state{mref = undefined, client = undefined}} + end; + +handle_info(Info, State) -> + ?LOG(error, "Unexpected info: ~p", [Info]), + {noreply, State}. + +terminate(_Reason, #state{pool = Pool, id = Id}) -> + gproc_pool:disconnect_worker(Pool, {Pool, Id}), + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +%%-------------------------------------------------------------------- +%% Internal functions +%%-------------------------------------------------------------------- + +open(State = #state{host = Host, port = Port, gun_opts = GunOpts}) -> + case gun:open(Host, Port, GunOpts) of + {ok, ConnPid} when is_pid(ConnPid) -> + MRef = monitor(process, ConnPid), + {ok, State#state{mref = MRef, client = ConnPid}}; + {error, Reason} -> + {error, Reason} + end. + +gun_opts(Opts) -> + gun_opts(Opts, #{retry => 5, + retry_timeout => 1000, + connect_timeout => 5000, + protocols => [http], + http_opts => #{keepalive => infinity}}). + +gun_opts([], Acc) -> + Acc; +gun_opts([{retry, Retry} | Opts], Acc) -> + gun_opts(Opts, Acc#{retry => Retry}); +gun_opts([{retry_timeout, RetryTimeout} | Opts], Acc) -> + gun_opts(Opts, Acc#{retry_timeout => RetryTimeout}); +gun_opts([{connect_timeout, ConnectTimeout} | Opts], Acc) -> + gun_opts(Opts, Acc#{connect_timeout => ConnectTimeout}); +gun_opts([{transport, Transport} | Opts], Acc) -> + gun_opts(Opts, Acc#{transport => Transport}); +gun_opts([{transport_opts, TransportOpts} | Opts], Acc) -> + gun_opts(Opts, Acc#{transport_opts => TransportOpts}); +gun_opts([_ | Opts], Acc) -> + gun_opts(Opts, Acc). + +call(ChannPid, Msg, Timeout) -> + gen_server:call(ChannPid, Msg, Timeout). + +pick(Pool) -> + gproc_pool:pick_worker(Pool). + +do_request(Client, get, {Path, Headers}) -> + gun:get(Client, Path, Headers); +do_request(Client, post, {Path, Headers, Body}) -> + gun:post(Client, Path, Headers, Body). + +flush_stream(Client, StreamRef) -> + receive + {gun_response, Client, StreamRef, _, _, _} -> + flush_stream(Client, StreamRef); + {gun_data, Client, StreamRef, _, _} -> + flush_stream(Client, StreamRef); + {gun_error, Client, StreamRef, _} -> + flush_stream(Client, StreamRef) + after 0 -> + ok + end. \ No newline at end of file diff --git a/src/emqx_http_client_sup.erl b/src/emqx_http_client_sup.erl new file mode 100644 index 0000000..dcdd2e4 --- /dev/null +++ b/src/emqx_http_client_sup.erl @@ -0,0 +1,48 @@ +-module(emqx_http_client_sup). + +-behaviour(supervisor). + +-export([ start_link/2 + , init/1 + , stop_pool/1 + ]). + +start_link(Pool, Opts) -> + supervisor:start_link(?MODULE, [Pool, Opts]). + +init([Pool, Opts]) -> + PoolSize = pool_size(Opts), + ok = ensure_pool(Pool, random, [{size, PoolSize}]), + {ok, {{one_for_one, 10, 100}, [ + begin + ensure_pool_worker(Pool, {Pool, I}, I), + #{id => {Pool, I}, + start => {emqx_http_client, start_link, [Pool, I, Opts]}, + restart => transient, + shutdown => 5000, + type => worker, + modules => [emqx_http_client]} + end || I <- lists:seq(1, PoolSize)]}}. + + +ensure_pool(Pool, Type, Opts) -> + try gproc_pool:new(Pool, Type, Opts) + catch + error:exists -> ok + end. + +ensure_pool_worker(Pool, Name, Slot) -> + try gproc_pool:add_worker(Pool, Name, Slot) + catch + error:exists -> ok + end. + +pool_size(Opts) -> + Schedulers = erlang:system_info(schedulers), + proplists:get_value(pool_size, Opts, Schedulers). + +stop_pool(Name) -> + Workers = gproc_pool:defined_workers(Name), + [gproc_pool:remove_worker(Name, WokerName) || {WokerName, _, _} <- Workers], + gproc_pool:delete(Name), + ok. \ No newline at end of file diff --git a/test/emqx_auth_http_SUITE.erl b/test/emqx_auth_http_SUITE.erl index 25ff942..f9acdc6 100644 --- a/test/emqx_auth_http_SUITE.erl +++ b/test/emqx_auth_http_SUITE.erl @@ -64,32 +64,38 @@ set_special_configs(emqx, _Schmea, _Inet) -> emqx_ct_helpers:deps_path(emqx, LoadedPluginPath)); set_special_configs(emqx_auth_http, Schema, Inet) -> - AuthReq = maps:from_list(application:get_env(emqx_auth_http, auth_req, [])), - SuprReq = maps:from_list(application:get_env(emqx_auth_http, super_req, [])), - AclReq = maps:from_list(application:get_env(emqx_auth_http, acl_req, [])), - SvrAddr = http_server_host(Schema, Inet), - - AuthReq1 = AuthReq#{method := get, url := SvrAddr ++ "/mqtt/auth"}, - SuprReq1 = SuprReq#{method := post, content_type := 'x-www-form-urlencoded', url := SvrAddr ++ "/mqtt/superuser"}, - AclReq1 = AclReq #{method := post, content_type := json, url := SvrAddr ++ "/mqtt/acl"}, + ServerAddr = http_server(Schema, Inet), + + AuthReq = #{method => post, + url => ServerAddr ++ "/mqtt/auth", + content_type => <<"application/json">>, + params => [{"clientid", "%c"}, {"username", "%u"}, {"password", "%P"}]}, + SuperReq = #{method => post, + url => ServerAddr ++ "/mqtt/superuser", + content_type => <<"application/json">>, + params => [{"clientid", "%c"}, {"username", "%u"}]}, + AclReq = #{method => post, + url => ServerAddr ++ "/mqtt/acl", + content_type => <<"application/json">>, + params => [{"access", "%A"}, {"username", "%u"}, {"clientid", "%c"}, {"ipaddr", "%a"}, {"topic", "%t"}, {"mountpoint", "%m"}]}, Schema =:= https andalso set_https_client_opts(), - application:set_env(emqx_auth_http, auth_req, maps:to_list(AuthReq1)), - application:set_env(emqx_auth_http, super_req, maps:to_list(SuprReq1)), - application:set_env(emqx_auth_http, acl_req, maps:to_list(AclReq1)). + application:set_env(emqx_auth_http, auth_req, maps:to_list(AuthReq)), + application:set_env(emqx_auth_http, super_req, maps:to_list(SuperReq)), + application:set_env(emqx_auth_http, acl_req, maps:to_list(AclReq)). %% @private set_https_client_opts() -> - HttpOpts = maps:from_list(application:get_env(emqx_auth_http, http_opts, [])), - HttpOpts1 = HttpOpts#{ssl => emqx_ct_helpers:client_ssl_twoway()}, - application:set_env(emqx_auth_http, http_opts, maps:to_list(HttpOpts1)). + TransportOpts = emqx_ct_helpers:client_ssl_twoway(), + {ok, PoolOpts} = application:get_env(emqx_auth_http, pool_opts), + application:set_env(emqx_auth_http, pool_opts, [{transport_opts, TransportOpts}, {transport, ssl} | PoolOpts]). %% @private -http_server_host(http, inet) -> "http://127.0.0.1:8991"; -http_server_host(http, inet6) -> "http://[::1]:8991"; -http_server_host(https, inet) -> "https://127.0.0.1:8991"; -http_server_host(https, inet6) -> "https://[::1]:8991". +http_server(http, inet) -> "http://127.0.0.1:8991"; +http_server(http, inet6) -> "http://[::1]:8991"; +http_server(https, inet) -> "https://127.0.0.1:8991"; +http_server(https, inet6) -> "https://[::1]:8991". %%------------------------------------------------------------------------------ %% Testcases