From 283eb5ac1dfb68d004c7d985e2b6f8615e73e803 Mon Sep 17 00:00:00 2001 From: Roman Shuplov Date: Thu, 29 Dec 2016 13:50:12 -0800 Subject: [PATCH 1/4] start added cluster, fixed pool start --- UPGRADE.md | 7 + samples/apps/review2/src/config.erl | 2 + samples/sys.config | 141 ++++++++--------- src/sgi_app.erl | 4 +- src/sgi_arbiter.erl | 108 +++++++++---- src/sgi_cluster.erl | 227 ++++++++++++++++++++++++++++ src/sgi_monitoring.erl | 29 +++- src/sgi_n2o_fcgi_handler.erl | 39 +++-- src/sgi_n2o_uwsgi_handler.erl | 8 +- src/sgi_pool.erl | 14 +- src/sgi_sup.erl | 24 ++- 11 files changed, 472 insertions(+), 131 deletions(-) create mode 100644 UPGRADE.md create mode 100644 src/sgi_cluster.erl diff --git a/UPGRADE.md b/UPGRADE.md new file mode 100644 index 0000000..fc0961e --- /dev/null +++ b/UPGRADE.md @@ -0,0 +1,7 @@ +# Upgrading Instructions + +### From 0.7.0 To 0.8.0 + +- Tuple list in `sys.config` in section `servers` has been changed to map. +- Added support of cluster in `sgi_n2o_fcgi_handler` and `sgi_n2o_uwsgi_handler`. + diff --git a/samples/apps/review2/src/config.erl b/samples/apps/review2/src/config.erl index e2d8e08..020c5a7 100644 --- a/samples/apps/review2/src/config.erl +++ b/samples/apps/review2/src/config.erl @@ -14,6 +14,8 @@ log_modules() -> % any sgi_sup, sgi_arbiter, sgi_socks5, + sgi_monitoring, + sgi_cluster, %% n2o_async, %% n2o_proto, diff --git a/samples/sys.config b/samples/sys.config index a9da29f..014ad53 100644 --- a/samples/sys.config +++ b/samples/sys.config @@ -1,74 +1,79 @@ [ - {n2o, [{port,8000}, - {app,review}, - {upload,"./apps/review/priv/static/"}, - {search,"/Users/5HT/depot/synrc/synrc.com/apps/*/doc/web/*.htm"}, - {route,routes}, - {mq,n2o_mq}, - {formatter,bert}, - {log_modules,config}, - {log_level,config}, - {log_backend,n2o_log}, - {session,n2o_session}, - {auto_session,disabled}, - {origin,<<"*">>}, - {bridge,n2o_cowboy}, - {pickler,n2o_pickle}, - {erroring,n2o_error}, - {event,pickle}]}, - {kvs, [{dba,store_mnesia}, - {schema, [kvs_user, kvs_acl, kvs_feed, kvs_subscription ]} ]}, - {sgi, [ - {servers, [ - [{name, default}, - {address, localhost}, - {port, 9000}, %% default port to FastCGI - {timeout, 60000}, - {weight, 2}, - {start_connections, 2}, - {max_connections, 10}, %% run N processes with 1 connection on each process. Count cannot be bigger then children of fcgi processes - {max_fails, 5}, - {failed_timeout, 60}], % failed_timeout in seconds - [{name, aaa}, - {address, localhost}, - {port, 9001}, - {timeout, 60000}, - {weight, 5}, - {start_connections, 2}, - {max_connections, 4}, - {max_fails, 5}, - {failed_timeout, 60}] - ]}, + {n2o, [{port, 8000}, + {app, review}, + {upload, "./apps/review/priv/static/"}, + {search, "/Users/5HT/depot/synrc/synrc.com/apps/*/doc/web/*.htm"}, + {route, routes}, + {mq, n2o_mq}, + {formatter, bert}, + {log_modules, config}, + {log_level, config}, + {log_backend, n2o_log}, + {session, n2o_session}, + {auto_session, disabled}, + {origin, <<"*">>}, + {bridge, n2o_cowboy}, + {pickler, n2o_pickle}, + {erroring, n2o_error}, + {event, pickle}]}, + {kvs, [{dba, store_mnesia}, + {schema, [kvs_user, kvs_acl, kvs_feed, kvs_subscription]}]}, + {sgi, [{servers, [ + #{name => default, + address => localhost, + port => 9000, %% default port to FastCGI + timeout => 60000, + weight => 2, + start_connections => 2, + max_connections => 10, %% run N processes with 1 connection on each process. Count cannot be bigger then children of fcgi processes + max_fails => 5, + failed_timeout => 60}, % failed_timeout in seconds + #{name => aaa, + address => localhost, + port => 9001, + timeout => 60000, + weight => 5, + start_connections => 2, + max_connections => 4, + max_fails => 5, + failed_timeout => 60} + ]}, -%% {proxy, #{type => socks5, %% Settings of proxy -%% address => localhost, -%% port => 9050, -%% timeout => 60000, -%% username => <<"username">>, -%% password => <<"password">> -%% }}, + %% {proxy, #{type => socks5, %% Settings of proxy + %% address => localhost, + %% port => 9050, + %% timeout => 60000, + %% username => <<"username">>, + %% password => <<"password">> + %% }}, - {balancing_method, priority}, % priority | blurred, priority is default - {multiplexed, unknown}, % unknown | 1 | 0 - {response_timeout, 600000}, % 1 minute - {vhosts, [ %% settings for family of cgi interfaces - [ - {server_name, "phphost.com"}, %% set your server name(domain), for local tests add line <127.0.0.1 phphost.com> into "/etc/hosts" (in Linux), "C:\Windows\System32\drivers\etc\hosts"(in Windows) - {aliase, "localhost"}, - {root, "/home/roman/dev/sgi/samples/cgi-scripts/php"}, %% set you FULL path to your codes - {index, "index.php"}%, %% default index file - %% rewrite very simple and support 2 cases: equal path and all("*"), you can remove it line if don't needed - %%{rewrite, [{"*", "index.php"}]} - ], - [ - {server_name, "yourhost2.com"}, %% set your server name(domain) - {aliase, "localhost"}, - {root, "/usr/local/www/yourhost2.com"}, %% set you full path to your codes - {index, "index.php"}, %% default index file - %% rewrite very simple and support 2 cases: equal path and all("*"), you can remove it line if don't needed - {rewrite, [{"*", "index.php"}]} - ] + {cluster, #{ + nodes => ['node2@127.0.0.1'], + syncr_interval => 60000, + response_timeout => 600000 + }}, + + {balancing_method, priority}, % priority | blurred, priority is default + {multiplexed, unknown}, % unknown | 1 | 0 + {response_timeout, 600000}, % 1 minute + {vhosts, [ %% settings for family of cgi interfaces + [ + {server_name, "phphost.com"}, %% set your server name(domain), for local tests add line <127.0.0.1 phphost.com> into "/etc/hosts" (in Linux), "C:\Windows\System32\drivers\etc\hosts"(in Windows) + {aliase, "localhost"}, + {root, "/home/roman/dev/sgi/samples/cgi-scripts/php"}, %% set you FULL path to your codes + {index, "index.php"}%, %% default index file + %% rewrite very simple and support 2 cases: equal path and all("*"), you can remove it line if don't needed + %%{rewrite, [{"*", "index.php"}]} + ], + [ + {server_name, "yourhost2.com"}, %% set your server name(domain) + {aliase, "localhost"}, + {root, "/usr/local/www/yourhost2.com"}, %% set you full path to your codes + {index, "index.php"}, %% default index file + %% rewrite very simple and support 2 cases: equal path and all("*"), you can remove it line if don't needed + {rewrite, [{"*", "index.php"}]} + ] ]} - ]}, + ]}, "sys-local.config" ]. diff --git a/src/sgi_app.erl b/src/sgi_app.erl index 07706ea..2c31608 100644 --- a/src/sgi_app.erl +++ b/src/sgi_app.erl @@ -2,5 +2,7 @@ -behaviour(application). -export([start/2, stop/1]). -start(_StartType, _StartArgs) -> sgi_sup:start_link(). +start(_StartType, _StartArgs) -> +%% sgi_monitoring:save_stat(), + sgi_sup:start_link(). stop(_State) -> ok. \ No newline at end of file diff --git a/src/sgi_arbiter.erl b/src/sgi_arbiter.erl index 04bdeab..d70f1f6 100644 --- a/src/sgi_arbiter.erl +++ b/src/sgi_arbiter.erl @@ -16,6 +16,9 @@ map/0, servers/0, not_av/0, + total_conn_count/0, + allocated_conn_count/0, + state/0, down/2]). %% gen_server callbacks @@ -34,15 +37,20 @@ -define(NOTAVAILABLE, 2). -define(DOWN, 3). +-define(MAX_CONNECTIONS, 1). + -define(PROC_LIST, proc_list). -define(PROC_MAP, proc_map). -define(PROC_BY_SERVER, proc_by_server). -define(PROC_ORDER_NUM, proc_order_num). --record(state, {adding_new_pool = 0, ch_to_state_timer}). +-record(state, {adding_new_pool = 0, ch_to_state_timer, total_conn_count = 0, allocated_conn_count = 0}). -record(proc, {pid, weight = 1, server_name, status = ?AVAILABLE, time_created = 0, failed_out = 0}). +-type proc() :: #proc{}. + + %%%=================================================================== %%% API %%%=================================================================== @@ -71,27 +79,25 @@ alloc(CountTry) -> alloc(CountTry - 1) end. -free(Pid) -> - gen_server:cast(?SERVER, {free, Pid}). +free(Pid) -> gen_server:cast(?SERVER, {free, Pid}). +new_pool_started(Pid) -> gen_server:cast(?SERVER, {new_pool_started, Pid}). +free_all() -> gen_server:cast(?SERVER, free_all). -new_pool_started(Pid) -> - gen_server:cast(?SERVER, {new_pool_started, Pid}). +-spec list() -> [pid()]. +list() -> gen_server:call(?SERVER, list). -free_all() -> - gen_server:cast(?SERVER, free_all). +-spec map() -> #{pid() => proc()}. +map() -> gen_server:call(?SERVER, map). -list() -> - gen_server:call(?SERVER, list). -map() -> - gen_server:call(?SERVER, map). -servers() -> - gen_server:call(?SERVER, servers). -not_av() -> - gen_server:call(?SERVER, not_av). - -down(Pid, FailedTimeout) -> - gen_server:cast(?SERVER, {down, Pid, FailedTimeout}). +-spec servers() -> #{ServerName => #{'processes' => list(), 'settings' => map()}} when + ServerName :: atom(). +servers() -> gen_server:call(?SERVER, servers). +not_av() -> gen_server:call(?SERVER, not_av). +total_conn_count() -> gen_server:call(?SERVER, total_conn_count). +allocated_conn_count() -> gen_server:call(?SERVER, allocated_conn_count). +down(Pid, FailedTimeout) -> gen_server:cast(?SERVER, {down, Pid, FailedTimeout}). +state() -> gen_server:call(?SERVER, state). %%%=================================================================== %%% gen_server callbacks @@ -99,9 +105,14 @@ down(Pid, FailedTimeout) -> init([]) -> self() ! ch_to_state, + timer:send_interval(60000, update_counts), {ok, #state{adding_new_pool = 1}}. +%%------------------------------------------------------------------- +%% Handle Call +%%------------------------------------------------------------------- + handle_call(alloc, _From, State) -> V = case active() of undefined -> @@ -118,9 +129,18 @@ handle_call(servers, _From, State) -> {reply, get_servers(), State}; handle_call(not_av, _From, State) -> {reply, get_not_av(), State}; +handle_call(total_conn_count, _From, State) -> + {reply, State#state.total_conn_count, State}; +handle_call(allocated_conn_count, _From, State) -> + {reply, State#state.allocated_conn_count, State}; +handle_call(state, _From, State) -> + {reply, State, State}; handle_call(_Request, _From, State) -> -{reply, ok, State}. + {reply, ok, State}. +%%------------------------------------------------------------------- +%% Handle Cast +%%------------------------------------------------------------------- handle_cast({down, Pid, FailedTimeout}, State) -> down1(Pid, FailedTimeout), @@ -137,8 +157,16 @@ handle_cast(free_all, State) -> handle_cast(_Request, State) -> {noreply, State}. -handle_info(add_pool, #state{adding_new_pool = 1} = State) -> - {noreply, State}; +%%------------------------------------------------------------------- +%% Handle Info +%%------------------------------------------------------------------- + +handle_info(update_counts, State) -> + State1 = update_counts(State), + {noreply, State1}; +handle_info(add_pool, State) -> + State1 = State#state{adding_new_pool = 1}, + {noreply, State1}; handle_info(add_pool, State) -> State1 = case add_pool() of N when is_integer(N) andalso N > 0 -> @@ -152,16 +180,19 @@ handle_info(ch_to_state, State) -> sgi:ct(State#state.ch_to_state_timer), Ch = supervisor:which_children(?SUPERVISOR), ch_to_state(Ch), + + self() ! update_counts, + {noreply, State#state{adding_new_pool = 0, ch_to_state_timer = undefined}}; handle_info(Info, State) -> wf:error(?MODULE, "Unknown Request: ~p~n", [Info]), {noreply, State}. +%%------------------------------------------------------------------- terminate(_Reason, _State) -> ok. - code_change(_OldVsn, State, _Extra) -> {ok, State}. @@ -174,7 +205,7 @@ add_pool() -> add_pool(maps:values(S), 0). add_pool([H|T], AddedNum) -> Sett = maps:get(settings, H, #{}), - MaxConnCount = maps:get(max_connections, Sett, 1), + MaxConnCount = maps:get(max_connections, Sett, ?MAX_CONNECTIONS), ProcCount = length(maps:get(processes, H, [])), AddedNum1 = AddedNum + case ProcCount < MaxConnCount of true -> @@ -191,11 +222,16 @@ increase_pool_count(Num, Max) -> % Num * 2 * ratio, ratio - 0.8 and can be chang Num1 = round(Num * 2 * 0.8), case Num1 > Max of true -> Max - Num; _ -> Num1 - Num end. + +%%------------------------------------------------------------------- +%% Add children to State of Arbiter +%%------------------------------------------------------------------- + ch_to_state(Ch) -> ch_to_state(lists:reverse(Ch), []). ch_to_state([Ch|Children], NewList) -> case hd(element(4, Ch)) of - sgi_pool -> % @todo fon't forget about this module name if will be changes of name of module. + sgi_pool -> % @todo don't forget about this module name if will be changes of name of module. ch_to_state(Children, [make_pool(element(2, Ch))|NewList]); _ -> ch_to_state(Children, NewList) @@ -222,7 +258,7 @@ proc_sort(Processes) -> SortedList = lists:sort(Fun, Processes), lists:map(fun(P) -> P#proc.pid end, SortedList) end, - Temp = lists:map(fun(P) -> {P#proc.pid,P} end, Processes), + Temp = lists:map(fun(P) -> {P#proc.pid, P} end, Processes), Map = maps:from_list(Temp), OldMap = case wf:state(?PROC_MAP) of undefined -> #{}; M1 -> M1 end, wf:state(?PROC_MAP, maps:merge(Map, OldMap)), @@ -232,14 +268,17 @@ proc_sort(Processes) -> group_by_server(Ch) -> S = wf:config(sgi, servers), group_by_server(S, Ch, 1, #{}). -group_by_server([H|T], Ch, N, M) -> - Settings = maps:from_list(H), +group_by_server([Settings|T], Ch, N, M) -> ServerName = maps:get(name, Settings), M1 = M#{ServerName => #{settings => Settings, processes => ch_by_server(Ch, ServerName)}}, group_by_server(T, Ch, N + 1, M1); group_by_server([], _, _, M) -> M. +-spec ch_by_server(ProcList, ServerName) -> Pids when + ProcList :: [proc()], + ServerName :: string(), + Pids :: [pid()]. ch_by_server(L, Name) -> ch_by_server(L, Name, []). ch_by_server([H|T], Name, Acc) -> @@ -309,6 +348,19 @@ active([H|T], M) -> end; active([], _) -> undefined. +update_counts(State) -> + Map = wf:state(?PROC_BY_SERVER), + Fun = fun(_K, V, AccIn) -> + case V of + #{settings := #{max_connections := V1}} -> AccIn + V1; + _ -> ?MAX_CONNECTIONS + end + end, + Total = maps:fold(Fun, 0, Map), + Allocated = erlang:length(wf:state(?PROC_LIST)), + State#state{total_conn_count = Total, allocated_conn_count = Allocated}. + + availabled(P,T,M) -> case check_alive(P) andalso check_connect(P) of true->P;_->active(T,M) end. downed(P,T,M) -> @@ -343,8 +395,6 @@ down1(Pid, FailedTimeout) -> _ -> ok end. - - new_pool_started(Pid, _State) -> % when sgi:is_alive(Pid) -> Pool = make_pool(Pid), diff --git a/src/sgi_cluster.erl b/src/sgi_cluster.erl new file mode 100644 index 0000000..e54c680 --- /dev/null +++ b/src/sgi_cluster.erl @@ -0,0 +1,227 @@ +-module(sgi_cluster). +-behaviour(gen_server). + +%% API +-export([start_link/0, node_info/0, is_ready/0, send/3, is_use/0]). + +%% gen_server callbacks +-export([init/1, + handle_call/3, + handle_cast/2, + handle_info/2, + terminate/2, + code_change/3]). + +-define(SERVER, ?MODULE). + + +%% #{ +%% node@localhost => #{name => '', available => true, overloaded => false, last_updated => 0} +%% node1@localhost => #{name => '', available => true, overloaded => false, last_updated => 0} +%% } +-record(state, {nodes = #{}}). +-record(node_info, {name = '', available = true, overloaded = false, last_updated = 0}). +%%-record(cluster_info, {comps = []}). +%%-record(node_info, {comps = [#{name => '', available => true, overloaded => false, last_updated => 0}]}). + +%%%=================================================================== +%%% API +%%%=================================================================== + +is_use() -> + case wf:config(sgi, cluster, false) of false -> false; _ -> true end. + +node_info() -> + gen_server:call(?SERVER, node_info). + +%% send(sgi_n2o_fcgi_handler, do_send, [CGIParams, HasBody, Body]) +-spec send(M :: atom(), F :: atom(), A :: []) -> Return :: term(). +send(M, F, A) -> + C = {sgi_monitoring:is_critical(), idle_node()}, + case C of + {true, Node} -> + Key = rpc:async_call(Node, M, F, A), + RT = sgi:mv(response_timeout, wf:config(sgi, cluster, #{}), 600000), % 10 minutes + case rpc:nb_yield(Key, RT) of + {value, Val} -> Val; + timeout -> <<>> + end; + _ -> + erlang:apply(M, F, A) + end. + +is_ready() -> + case whereis(?SERVER) of + undefined -> false; + _ -> true + end. + +idle_node() -> + gen_server:call(?SERVER, idle_node). + +%%is_overload() -> +%% sgi_monitoring:is_overload(). + +-spec(start_link() -> + {ok, Pid :: pid()} | ignore | {error, Reason :: term()}). +start_link() -> + gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). + +%%%=================================================================== +%%% gen_server callbacks +%%%=================================================================== + +init([]) -> + check_node(), + M = check_nodes(), + self() ! collect_load_info, + {ok, #state{nodes = M}}. + +%%------------------------------------------------------------------- +%% Handle Call +%%------------------------------------------------------------------- + +-spec(handle_call(Request :: term(), From :: {pid(), Tag :: term()}, + State :: #state{}) -> + {reply, Reply :: term(), NewState :: #state{}} | + {reply, Reply :: term(), NewState :: #state{}, timeout() | hibernate} | + {noreply, NewState :: #state{}} | + {noreply, NewState :: #state{}, timeout() | hibernate} | + {stop, Reason :: term(), Reply :: term(), NewState :: #state{}} | + {stop, Reason :: term(), NewState :: #state{}}). +handle_call(node_info, _From, State) -> + {reply, State#state.nodes, State}; +handle_call(idle_node, _From, State) -> + R = do_idle_node(State), + {reply, R, State}; +handle_call(_Request, _From, State) -> + {reply, ok, State}. + +%%------------------------------------------------------------------- +%% Handle Cast +%%------------------------------------------------------------------- + +handle_cast(_Request, State) -> + {noreply, State}. + +%%------------------------------------------------------------------- +%% Handle Call +%%------------------------------------------------------------------- + +handle_info({From, is_overload}, State) -> + From ! {sgi_cluster, erlang:node(), {ok, node(), sgi_monitoring:is_overload()}}, + {noreply, State}; +handle_info(collect_load_info, State) -> + spawn(fun() -> + R = collect_load_info(), + sgi_cluster ! {collect_load_info_result, R} end), + SI = sgi:mv(syncr_interval, wf:config(sgi, cluster, #{}), 60000), + timer:send_after(SI, collect_load_info), + {noreply, State}; +handle_info({collect_load_info_result, R}, State) -> + State1 = cluster_info_response(R, State), + {noreply, State1}; +handle_info(_Info, State) -> + {noreply, State}. + +%%------------------------------------------------------------------- + +terminate(_Reason, _State) -> + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +%%%=================================================================== +%%% Internal functions +%%%=================================================================== + +%% Send request of info to all nodes +collect_load_info() -> + rpc:multi_server_call(nodes(), sgi_cluster, is_overload). + +-spec cluster_info_response(Response, State) -> State1 when + Response :: {Replies, BadNodes} | Replies | BadNodes | [], + Replies :: [{ok, NodeName, Resp}], %% Responses from each node + BadNodes :: [NodeName], + NodeName :: atom(), %% Name of remote node + Resp :: term(), %% Response from remote node + State :: #state{}, + State1 :: #state{}. +cluster_info_response([], State) -> + State; +cluster_info_response({[], []}, State) -> + State#state{nodes = #{}}; +cluster_info_response({Replies, BadNodes}, State) -> + State1 = cluster_info_response(Replies, State), + State2 = cluster_info_response(BadNodes, State1), + State2; +cluster_info_response([H | T], State) -> + NodeName = response_node_name(H), + {RecordInfo, M} = + case map_size(State#state.nodes) > 0 of + true -> + #{NodeName := RecordInfo1} = M1 = State#state.nodes, + {RecordInfo1, M1}; + false -> + {#node_info{name = NodeName}, #{}} + end, + NewRecord = + case H of + {ok, NodeName, Resp} -> + RecordInfo#node_info{available = true, overloaded = Resp, last_updated = unix_time()}; + NodeName -> + RecordInfo#node_info{available = false, last_updated = unix_time()} + end, + M2 = M#{NodeName => NewRecord}, + State1 = State#state{nodes = M2}, + cluster_info_response(T, State1). + +response_node_name({ok, N, _}) -> N; +response_node_name(N) -> N. + +%% Check node and change if needed. Don't forget run `epmd -daemon` +check_node() -> + case erlang:node() =:= nonode@nohost of + true -> + net_kernel:start(['node@127.0.0.1']); + _ -> + ok + end. + +check_nodes() -> + Nodes = sgi:mv(nodes, wf:config(sgi, cluster, #{}), []), + [net_kernel:connect_node(N) || N <- Nodes], + check_nodes(erlang:nodes(), #{}). +check_nodes([], M) -> + M; +check_nodes([NodeName | T], M) -> + M1 = maps:put(NodeName, #node_info{name = NodeName}, M), + check_nodes(T, M1). + +-spec do_idle_node(State :: #state{}) -> boolean(). +do_idle_node(State) -> + M = State#state.nodes, + L = maps:to_list(M), + do_idle_node1(L). + +do_idle_node1([]) -> + node(); +do_idle_node1([H|T]) -> + case node_is_idle(H) of + true -> + true; + _ -> + do_idle_node1(T) + end. + +node_is_idle(R) -> + case R of + R when R#node_info.available, R#node_info.overloaded =:= false -> + true; + _ -> + false + end. + +unix_time() -> + erlang:system_time(seconds). \ No newline at end of file diff --git a/src/sgi_monitoring.erl b/src/sgi_monitoring.erl index be95dd3..087229a 100644 --- a/src/sgi_monitoring.erl +++ b/src/sgi_monitoring.erl @@ -1,6 +1,6 @@ -module(sgi_monitoring). --export([cpu_load/0, mem_load/0, is_critical/0, save_stat/0, do_save_stat/0]). +-export([cpu_load/0, mem_load/0, conn_load/0, is_critical/0, is_overload/0, save_stat/0, do_save_stat/0]). -define(MAX_FILE_SIZE, 10485760). @@ -8,11 +8,28 @@ -define(CPU_STAT, "stat/cpu_stat.log"). -define(MEM_STAT, "stat/mem_stat.log"). -is_critical() -> cpu_load() > 90 orelse mem_load() > 90. +%% @private +%% @doc Checking or Processor or Memory overload, considering available waiting connections. +is_critical() -> + true. +%% is_overload() andalso conn_load() < 80. + +is_overload() -> + cpu_load(5) > 90 orelse mem_load() > 90. cpu_load() -> + cpu_load(1). + +-spec cpu_load(N) -> Result when + N :: 1 | 5 | 15, + Result :: non_neg_integer(). +cpu_load(N) -> + Load = case N of + 5 -> cpu_sup:avg5(); + 15 -> cpu_sup:avg15(); + _ -> cpu_sup:avg1() + end, D = 50, - Load = cpu_sup:avg1(), 100 * (1 - D/(D + Load)). mem_load() -> @@ -20,6 +37,9 @@ mem_load() -> Mt = sgi:pv(total_memory, memsup:get_system_memory_data(), 0), Ma / Mt * 100. +conn_load() -> + (sgi_arbiter:allocated_conn_count() / sgi_arbiter:total_conn_count()) * 100. + mem_allocated() -> lists:foldl(fun({_,X}, Sum) -> X + Sum end, 0, erlang:memory()). @@ -31,8 +51,7 @@ do_save_stat() -> archive_log(?CPU_STAT), archive_log(?MEM_STAT), Cpu = cpu_load(), - Mem = mem_allocated(), -%% Mem = mem_load(), + Mem = mem_load(), file:write_file(?CPU_STAT, <<(wf:to_binary(Cpu))/binary, "\n">>, [append]), file:write_file(?MEM_STAT, <<(wf:to_binary(Mem))/binary, "\n">>, [append]), timer:sleep(60000), diff --git a/src/sgi_n2o_fcgi_handler.erl b/src/sgi_n2o_fcgi_handler.erl index 0d8c99c..0bef395 100644 --- a/src/sgi_n2o_fcgi_handler.erl +++ b/src/sgi_n2o_fcgi_handler.erl @@ -3,7 +3,7 @@ -include_lib("stdlib/include/ms_transform.hrl"). %% API --export([init/0, send/0, send/1, stop/0]). +-export([init/0, send/0, send/1, do_send/3, stop/0]). -define(PROTO_CGI, <<"CGI/1.1">>). -define(PROTO_HTTP, <<"HTTP/1.1">>). @@ -40,18 +40,35 @@ send(Http) -> {_, Body} = bs(Http), CGIParams = get_params(Http), - {ok, Pid} = sgi_fcgi:start(), - Pid ! {overall, self(), CGIParams, has_body(Http), Body}, - Timer = erlang:send_after(wf:config(sgi, response_timeout, ?DEF_TIMEOUT), self(), {sgi_fcgi_timeout, Pid}), - Ret = ret(), - sgi:ct(Timer), - sgi_fcgi:stop(Pid), + Ret = case sgi_cluster:is_use() of + true -> + sgi_cluster:send(sgi_n2o_fcgi_handler, do_send, [CGIParams, has_body(Http), Body]); + _ -> + do_send(CGIParams, has_body(Http), Body) + end, + +%% {ok, Pid} = sgi_fcgi:start(), +%% Pid ! {overall, self(), CGIParams, has_body(Http), Body}, +%% Timer = erlang:send_after(wf:config(sgi, response_timeout, ?DEF_TIMEOUT), self(), {sgi_fcgi_timeout, Pid}), +%% Ret = ret(), +%% sgi:ct(Timer), +%% sgi_fcgi:stop(Pid), + RetH = get_response_headers(), set_header_to_cowboy(RetH, 0), terminate(), %% @todo Return headers from cgi because cowboy don't give access to resp_headers - {iolist_to_binary(Ret), wf:state(status), RetH}. + {Ret, wf:state(status), RetH}. +%% {iolist_to_binary(Ret), wf:state(status), RetH}. +do_send(CGIParams, HasBody, Body) -> + {ok, Pid} = sgi_fcgi:start(), + Pid ! {overall, self(), CGIParams, HasBody, Body}, + Timer = erlang:send_after(wf:config(sgi, response_timeout, ?DEF_TIMEOUT), self(), {sgi_fcgi_timeout, Pid}), + Ret = ret(), + sgi:ct(Timer), + sgi_fcgi:stop(Pid), + iolist_to_binary(Ret). %% =========================================================== %% Prepare Request @@ -331,13 +348,15 @@ ret() -> []; {sgi_fcgi_return_error, Err} -> wf:error(?MODULE, "Connection error ~p~n", [Err]), - set_header_to_cowboy([{<<"retry-after">>, <<"3600">>}]), + update_response_headers([{<<"retry-after">>, <<"3600">>}], true), +%% set_header_to_cowboy([{<<"retry-after">>, <<"3600">>}]), wf:state(status, 503), []; {sgi_fcgi_timeout, Pid} -> sgi_fcgi:stop(Pid), wf:error(?MODULE, "Connect timeout to FastCGI ~n", []), - set_header_to_cowboy([{<<"retry-after">>, <<"3600">>}]), + update_response_headers([{<<"retry-after">>, <<"3600">>}], true), +%% set_header_to_cowboy([{<<"retry-after">>, <<"3600">>}]), wf:state(status, 503), [] end. diff --git a/src/sgi_n2o_uwsgi_handler.erl b/src/sgi_n2o_uwsgi_handler.erl index 1b5f100..2f3d452 100644 --- a/src/sgi_n2o_uwsgi_handler.erl +++ b/src/sgi_n2o_uwsgi_handler.erl @@ -335,13 +335,15 @@ ret() -> stdout(Out); {sgi_uwsgi_return_error, Err} -> wf:error(?MODULE, "Connection error ~p~n", [Err]), - set_header_to_cowboy([{<<"retry-after">>, <<"3600">>}]), + update_response_headers([{<<"retry-after">>, <<"3600">>}], true), +%% set_header_to_cowboy([{<<"retry-after">>, <<"3600">>}]), wf:state(status, 503), []; {sgi_uwsgi_timeout, Pid} -> sgi_uwsgi:stop(Pid), - wf:error(?MODULE, "Connect timeout to FastCGI ~n", []), - set_header_to_cowboy([{<<"retry-after">>, <<"3600">>}]), + wf:error(?MODULE, "Connect timeout to uwsgi ~n", []), + update_response_headers([{<<"retry-after">>, <<"3600">>}], true), +%% set_header_to_cowboy([{<<"retry-after">>, <<"3600">>}]), wf:state(status, 503), [] end diff --git a/src/sgi_pool.erl b/src/sgi_pool.erl index a337687..dc291a8 100644 --- a/src/sgi_pool.erl +++ b/src/sgi_pool.erl @@ -101,13 +101,13 @@ try_connect(Pid) -> init([_Name, Conf]) -> timer:send_interval(?HIBERNATE_AFTER, self(), hibernate), {ok, #state{ - server_name = sgi:pv(name, Conf, default), - address = sgi:pv(address, Conf, localhost), - port = sgi:pv(port, Conf, 9000), - weight = sgi:pv(weight, Conf, 1), - max_fails = sgi:pv(max_fails, Conf, 10), - failed_timeout = sgi:pv(failed_timeout, Conf, 1), % in seconds - timeout = sgi:pv(timeout, Conf, 30000) + server_name = sgi:mv(name, Conf, default), + address = sgi:mv(address, Conf, localhost), + port = sgi:mv(port, Conf, 9000), + weight = sgi:mv(weight, Conf, 1), + max_fails = sgi:mv(max_fails, Conf, 10), + failed_timeout = sgi:mv(failed_timeout, Conf, 1), % in seconds + timeout = sgi:mv(timeout, Conf, 30000) }}. handle_call({once, Request}, _From, State) -> diff --git a/src/sgi_sup.erl b/src/sgi_sup.erl index f5ed9aa..834f5c0 100644 --- a/src/sgi_sup.erl +++ b/src/sgi_sup.erl @@ -40,7 +40,11 @@ stop_child(Name) -> start_children() -> ok = start_pool_children(), - ?SERVER:start_child(sgi_arbiter). + ?SERVER:start_child(sgi_arbiter), + case sgi_cluster:is_use() of + false -> skip; + _ -> ?SERVER:start_child(sgi_cluster) + end. %%%=================================================================== %%% Supervisor callbacks @@ -55,6 +59,9 @@ init([]) -> %%% Internal functions %%%=================================================================== +%% +%% Make Child Spec as process (include tcp connections) for starting with supervisor +%% -spec make_pool_spec() -> list(). make_pool_spec() -> Conf = wf:config(sgi, servers), @@ -64,18 +71,19 @@ make_pool_spec() -> List :: list(), List1 :: list(). make_pool_spec([], New) -> New; -make_pool_spec([H|T], L) -> - P = make_pool_spec(sgi:pv(start_connections,H,1), H, []), - make_pool_spec(T, L ++ P). +make_pool_spec([H | T], Acc) -> + P = make_pool_spec(sgi:mv(start_connections, H, 1), H, []), + make_pool_spec(T, Acc ++ P). --spec make_pool_spec(Num, Conf, List) -> list() when +-spec make_pool_spec(Num, Conf, List) -> List1 when Num :: non_neg_integer(), Conf :: [tuple()], - List :: list(). + List :: [], + List1 :: []. make_pool_spec(0, _, L) -> L; make_pool_spec(Num, Conf, L) when is_integer(Num) andalso Num > 0 -> - N = wf:to_atom("Pool#" ++ wf:to_list(sgi:pv(name, Conf, default)) ++ "," ++ wf:to_list(rand:uniform(100000))), + N = wf:to_atom("Pool#" ++ wf:to_list(sgi:mv(name, Conf, default)) ++ "," ++ wf:to_list(rand:uniform(100000))), M = #{id => N, start => {sgi_pool, start_link, [N, Conf]}}, - make_pool_spec(Num - 1, Conf, [M|L]); + make_pool_spec(Num - 1, Conf, [M | L]); make_pool_spec(_, _, _) -> []. From 8cfb2f13c731e304edd75cf277911887578b66fd Mon Sep 17 00:00:00 2001 From: Roman Shuplov Date: Sat, 25 Mar 2017 09:34:48 -0700 Subject: [PATCH 2/4] fixed return headers from request, added idle of nodes, other fixes, changed using multiplexer to NO in fcgi protocol --- src/sgi_cluster.erl | 158 +++++++++++++++++++++------------- src/sgi_fcgi.erl | 2 +- src/sgi_monitoring.erl | 9 +- src/sgi_n2o_fcgi_handler.erl | 23 ++--- src/sgi_n2o_uwsgi_handler.erl | 29 +++++-- 5 files changed, 134 insertions(+), 87 deletions(-) diff --git a/src/sgi_cluster.erl b/src/sgi_cluster.erl index e54c680..8e9f5ab 100644 --- a/src/sgi_cluster.erl +++ b/src/sgi_cluster.erl @@ -2,7 +2,7 @@ -behaviour(gen_server). %% API --export([start_link/0, node_info/0, is_ready/0, send/3, is_use/0]). +-export([start_link/0, node_info/0, is_ready/0, send/3, is_use/0, is_overloaded/0, info/0]). %% gen_server callbacks -export([init/1, @@ -14,15 +14,13 @@ -define(SERVER, ?MODULE). - +%% Sample of structure of State %% #{ %% node@localhost => #{name => '', available => true, overloaded => false, last_updated => 0} %% node1@localhost => #{name => '', available => true, overloaded => false, last_updated => 0} %% } --record(state, {nodes = #{}}). --record(node_info, {name = '', available = true, overloaded = false, last_updated = 0}). -%%-record(cluster_info, {comps = []}). -%%-record(node_info, {comps = [#{name => '', available => true, overloaded => false, last_updated => 0}]}). +-record(node_info, {name = '', available = true, overloaded = false, updated = 0}). +-record(state, {nodes = #{}, last_sent_node = 0, self = #node_info{}}). %%%=================================================================== %%% API @@ -34,19 +32,21 @@ is_use() -> node_info() -> gen_server:call(?SERVER, node_info). -%% send(sgi_n2o_fcgi_handler, do_send, [CGIParams, HasBody, Body]) +info() -> + gen_server:call(?SERVER, info). + -spec send(M :: atom(), F :: atom(), A :: []) -> Return :: term(). send(M, F, A) -> - C = {sgi_monitoring:is_critical(), idle_node()}, - case C of - {true, Node} -> - Key = rpc:async_call(Node, M, F, A), - RT = sgi:mv(response_timeout, wf:config(sgi, cluster, #{}), 600000), % 10 minutes - case rpc:nb_yield(Key, RT) of + case is_overloaded() of + true -> + wf:info(?MODULE, "Try call to cluster", []), + Key = rpc:async_call(idle_node(), M, F, A), + case rpc:nb_yield(Key, rt()) of {value, Val} -> Val; timeout -> <<>> end; _ -> + wf:info(?MODULE, "Have no availeble nodes, calling self node", []), erlang:apply(M, F, A) end. @@ -56,14 +56,12 @@ is_ready() -> _ -> true end. +is_overloaded() -> + gen_server:call(?SERVER, is_overloaded). + idle_node() -> gen_server:call(?SERVER, idle_node). -%%is_overload() -> -%% sgi_monitoring:is_overload(). - --spec(start_link() -> - {ok, Pid :: pid()} | ignore | {error, Reason :: term()}). start_link() -> gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). @@ -75,25 +73,27 @@ init([]) -> check_node(), M = check_nodes(), self() ! collect_load_info, - {ok, #state{nodes = M}}. + {ok, #state{nodes = M, self = #node_info{name = node()}}}. %%------------------------------------------------------------------- %% Handle Call %%------------------------------------------------------------------- --spec(handle_call(Request :: term(), From :: {pid(), Tag :: term()}, - State :: #state{}) -> - {reply, Reply :: term(), NewState :: #state{}} | - {reply, Reply :: term(), NewState :: #state{}, timeout() | hibernate} | - {noreply, NewState :: #state{}} | - {noreply, NewState :: #state{}, timeout() | hibernate} | - {stop, Reason :: term(), Reply :: term(), NewState :: #state{}} | - {stop, Reason :: term(), NewState :: #state{}}). handle_call(node_info, _From, State) -> {reply, State#state.nodes, State}; + +handle_call(info, _From, State) -> + {reply, State, State}; + +handle_call(is_overloaded, _From, State) -> + {reply, State#state.self#node_info.overloaded, State}; + handle_call(idle_node, _From, State) -> - R = do_idle_node(State), - {reply, R, State}; + {ok, NodeName, Index} = do_idle_node(State), + wf:info(?MODULE, "Selected node for a help: ~p~n", [NodeName]), + State1 = State#state{last_sent_node = Index}, + {reply, NodeName, State1}; + handle_call(_Request, _From, State) -> {reply, ok, State}. @@ -105,22 +105,37 @@ handle_cast(_Request, State) -> {noreply, State}. %%------------------------------------------------------------------- -%% Handle Call +%% Handle Info %%------------------------------------------------------------------- +%% @doc Request to other nodes. handle_info({From, is_overload}, State) -> From ! {sgi_cluster, erlang:node(), {ok, node(), sgi_monitoring:is_overload()}}, {noreply, State}; + +%% @doc Self request for collect information. To do each minute. handle_info(collect_load_info, State) -> - spawn(fun() -> - R = collect_load_info(), - sgi_cluster ! {collect_load_info_result, R} end), SI = sgi:mv(syncr_interval, wf:config(sgi, cluster, #{}), 60000), + spawn(fun() -> + {ok, TRef} = timer:kill_after(SI - 5000), % kill current process because will soon open new the same. + wf:info(?MODULE, "Send is_overload request to cluster", []), + R = rpc:multi_server_call(nodes(), sgi_cluster, is_overload), + wf:info(?MODULE, "Recieve claster info: ~p~n", [R]), + timer:cancel(TRef), + sgi_cluster ! {collect_load_info_result, R} + end), + + Self = State#state.self, + State1 = State#state{self = Self#node_info{overloaded = sgi_monitoring:is_overload(), updated = unix_time()}}, + timer:send_after(SI, collect_load_info), - {noreply, State}; + {noreply, State1}; + +%% @doc Parse result from other nodes. handle_info({collect_load_info_result, R}, State) -> State1 = cluster_info_response(R, State), {noreply, State1}; + handle_info(_Info, State) -> {noreply, State}. @@ -136,10 +151,6 @@ code_change(_OldVsn, State, _Extra) -> %%% Internal functions %%%=================================================================== -%% Send request of info to all nodes -collect_load_info() -> - rpc:multi_server_call(nodes(), sgi_cluster, is_overload). - -spec cluster_info_response(Response, State) -> State1 when Response :: {Replies, BadNodes} | Replies | BadNodes | [], Replies :: [{ok, NodeName, Resp}], %% Responses from each node @@ -169,9 +180,9 @@ cluster_info_response([H | T], State) -> NewRecord = case H of {ok, NodeName, Resp} -> - RecordInfo#node_info{available = true, overloaded = Resp, last_updated = unix_time()}; + RecordInfo#node_info{available = true, overloaded = Resp, updated = unix_time()}; NodeName -> - RecordInfo#node_info{available = false, last_updated = unix_time()} + RecordInfo#node_info{available = false, updated = unix_time()} end, M2 = M#{NodeName => NewRecord}, State1 = State#state{nodes = M2}, @@ -189,6 +200,8 @@ check_node() -> ok end. +-spec check_nodes() -> Map when + Map :: #{node() => #node_info{}} | #{}. check_nodes() -> Nodes = sgi:mv(nodes, wf:config(sgi, cluster, #{}), []), [net_kernel:connect_node(N) || N <- Nodes], @@ -199,29 +212,54 @@ check_nodes([NodeName | T], M) -> M1 = maps:put(NodeName, #node_info{name = NodeName}, M), check_nodes(T, M1). --spec do_idle_node(State :: #state{}) -> boolean(). +-spec do_idle_node(State :: #state{}) -> {ok, NodeName, Index} when + NodeName :: list(), + Index :: non_neg_integer(). do_idle_node(State) -> - M = State#state.nodes, - L = maps:to_list(M), - do_idle_node1(L). - -do_idle_node1([]) -> - node(); -do_idle_node1([H|T]) -> - case node_is_idle(H) of - true -> - true; - _ -> - do_idle_node1(T) + L = maps:values(State#state.nodes), + I = State#state.last_sent_node, + do_idle_node(L, I + 1, I). + +-spec do_idle_node(L, I, SelfI) -> term() when + L :: list(), % List of nodes + I :: non_neg_integer(), % Iterationable index for searching node + SelfI :: non_neg_integer(). % Current index for avoid cycle and search only in one circle +do_idle_node([], _I, _) -> + {ok, node(), 0}; +%%do_idle_node([{NodeInfo}], _I, _SelfI) -> %% using for performance trick if we have only one helped node in our cluster +%% case NodeInfo of +%% NodeInfo when NodeInfo#node_info.available, not NodeInfo#node_info.overloaded -> +%% {ok, NodeInfo#node_info.name, 1}; +%% _ -> +%% {ok, node(), 0} +%% end; +do_idle_node(L, I, SelfI) -> + try + wf:info(?MODULE, "List of nodes: ~p~n", [L]), + wf:info(?MODULE, "SelfI: ~p~n", [SelfI]), + case lists:nth(I, L) of + NodeInfo when NodeInfo#node_info.available, not NodeInfo#node_info.overloaded -> + {ok, NodeInfo#node_info.name, I}; + _ -> + case I == SelfI of + true -> + do_idle_node([], 0, 0); + _ -> + do_idle_node(L, I + 1, SelfI) + end + end + catch + error:_Reason -> + case SelfI of + 0 -> + do_idle_node([], 0, 0); % call exit function because nothing found + _ -> + do_idle_node(L, 1, SelfI) % 1 - is first element in array + end end. -node_is_idle(R) -> - case R of - R when R#node_info.available, R#node_info.overloaded =:= false -> - true; - _ -> - false - end. +rt() -> + sgi:mv(response_timeout, wf:config(sgi, cluster, #{}), 600000). % 10 minutes unix_time() -> erlang:system_time(seconds). \ No newline at end of file diff --git a/src/sgi_fcgi.erl b/src/sgi_fcgi.erl index b35e7cc..44fb6b0 100644 --- a/src/sgi_fcgi.erl +++ b/src/sgi_fcgi.erl @@ -281,7 +281,7 @@ check_multiplex() -> MPXS1 = case lists:keyfind(wf:to_binary(?FCGI_MPXS_CONNS), 1, Ret) of {_, <<"0">>} -> ?FCGI_MULTIPLEXED_NO; {_, <<"1">>} -> ?FCGI_MULTIPLEXED_YES; - _ -> ?FCGI_MULTIPLEXED_UNKNOWN + _ -> ?FCGI_MULTIPLEXED_NO end, application:set_env(sgi, multiplexed, MPXS1); _ -> ok diff --git a/src/sgi_monitoring.erl b/src/sgi_monitoring.erl index 087229a..ff2ed0f 100644 --- a/src/sgi_monitoring.erl +++ b/src/sgi_monitoring.erl @@ -10,12 +10,15 @@ %% @private %% @doc Checking or Processor or Memory overload, considering available waiting connections. +%% Server is overloaded but is_critical() -> - true. -%% is_overload() andalso conn_load() < 80. +%% true. + is_overload() andalso conn_load() < 80. +%% @doc Resources of server close to end. is_overload() -> - cpu_load(5) > 90 orelse mem_load() > 90. +%% true. + cpu_load(5) > 80 orelse mem_load() > 80. cpu_load() -> cpu_load(1). diff --git a/src/sgi_n2o_fcgi_handler.erl b/src/sgi_n2o_fcgi_handler.erl index 0bef395..636e237 100644 --- a/src/sgi_n2o_fcgi_handler.erl +++ b/src/sgi_n2o_fcgi_handler.erl @@ -40,21 +40,13 @@ send(Http) -> {_, Body} = bs(Http), CGIParams = get_params(Http), - Ret = case sgi_cluster:is_use() of + {RetH, Ret} = case sgi_cluster:is_use() of true -> sgi_cluster:send(sgi_n2o_fcgi_handler, do_send, [CGIParams, has_body(Http), Body]); _ -> do_send(CGIParams, has_body(Http), Body) end, -%% {ok, Pid} = sgi_fcgi:start(), -%% Pid ! {overall, self(), CGIParams, has_body(Http), Body}, -%% Timer = erlang:send_after(wf:config(sgi, response_timeout, ?DEF_TIMEOUT), self(), {sgi_fcgi_timeout, Pid}), -%% Ret = ret(), -%% sgi:ct(Timer), -%% sgi_fcgi:stop(Pid), - - RetH = get_response_headers(), set_header_to_cowboy(RetH, 0), terminate(), %% @todo Return headers from cgi because cowboy don't give access to resp_headers @@ -63,12 +55,13 @@ send(Http) -> do_send(CGIParams, HasBody, Body) -> {ok, Pid} = sgi_fcgi:start(), - Pid ! {overall, self(), CGIParams, HasBody, Body}, Timer = erlang:send_after(wf:config(sgi, response_timeout, ?DEF_TIMEOUT), self(), {sgi_fcgi_timeout, Pid}), + Pid ! {overall, self(), CGIParams, HasBody, Body}, Ret = ret(), + RetH = get_response_headers(), sgi:ct(Timer), sgi_fcgi:stop(Pid), - iolist_to_binary(Ret). + {RetH, iolist_to_binary(Ret)}. %% =========================================================== %% Prepare Request @@ -79,8 +72,8 @@ vhosts() -> Vs = wf:config(sgi, vhosts, []), vhosts(Vs). vhosts([H|T]) -> - S = sgi:pv(server_name, H, ""), - A = sgi:pv(aliase, H, ""), + S = sgi:mv(server_name, H, ""), + A = sgi:mv(alias, H, ""), case wf:to_list(host()) of Host when Host =:= S orelse Host =:= A -> wf:state(vhost, H), ok; @@ -93,7 +86,7 @@ vhosts([]) -> wf:state(vhost, []), ok. vhost(Key) -> vhost(Key, ""). -spec vhost(atom(), []) -> term(). vhost(Key, Def) -> - sgi:pv(Key, wf:state(vhost), Def). + sgi:mv(Key, wf:state(vhost), Def). -spec get_params(http()) -> list(). get_params(Http) -> @@ -378,10 +371,12 @@ update_response_headers(Hs, Status) -> Status1 = case RespH1#response_headers.ended of true -> true; _ -> Status end, RespH2 = RespH1#response_headers{buff = lists:append([Hs, RespH1#response_headers.buff]), ended = Status1}, wf:state(sgi_n2o_fcgi_response_headers, RespH2). + get_response_headers() -> case wf:state(sgi_n2o_fcgi_response_headers) of undefined -> []; H -> lists:reverse(H#response_headers.buff) end. get_response_headers_ended() -> case wf:state(sgi_n2o_fcgi_response_headers) of undefined -> false; H -> H#response_headers.ended end. + get_response_header(K) -> sgi:pv(K, get_response_headers(), <<>>). diff --git a/src/sgi_n2o_uwsgi_handler.erl b/src/sgi_n2o_uwsgi_handler.erl index 2f3d452..48de629 100644 --- a/src/sgi_n2o_uwsgi_handler.erl +++ b/src/sgi_n2o_uwsgi_handler.erl @@ -39,17 +39,28 @@ send(Http) -> {_, Body} = bs(Http), CGIParams = get_params(Http), + {RetH, Ret} = case sgi_cluster:is_use() of + true -> + sgi_cluster:send(sgi_n2o_uwsgi_handler, do_send, [CGIParams, has_body(Http), Body]); + _ -> + do_send(CGIParams, has_body(Http), Body) + end, + + set_header_to_cowboy(RetH, 0), + terminate(), + %% @todo Return headers from cgi because cowboy don't give access to resp_headers + {Ret, wf:state(status), RetH}. + +do_send(CGIParams, HasBody, Body) -> {ok, Pid} = sgi_uwsgi:start(), - Pid ! {overall, self(), CGIParams, has_body(Http), Body}, Timer = erlang:send_after(wf:config(sgi, response_timeout, ?DEF_TIMEOUT), self(), {sgi_uwsgi_timeout, Pid}), + Pid ! {overall, self(), CGIParams, HasBody, Body}, Ret = ret(), + RetH = get_response_headers(), sgi:ct(Timer), sgi_uwsgi:stop(Pid), - RetH = get_response_headers(), - set_header_to_cowboy(RetH, 0), - terminate(), - %% @todo Return headers from cgi because cowboy don't give access to resp_headers - {iolist_to_binary(Ret), wf:state(status), RetH}. + {RetH, iolist_to_binary(Ret)}. + %% =========================================================== @@ -61,8 +72,8 @@ vhosts() -> Vs = wf:config(sgi, vhosts, []), vhosts(Vs). vhosts([H|T]) -> - S = sgi:pv(server_name, H, ""), - A = sgi:pv(aliase, H, ""), + S = sgi:mv(server_name, H, ""), + A = sgi:mv(alias, H, ""), case wf:to_list(host()) of Host when Host =:= S orelse Host =:= A -> wf:state(vhost, H), ok; @@ -75,7 +86,7 @@ vhosts([]) -> wf:state(vhost, []), ok. vhost(Key) -> vhost(Key, ""). -spec vhost(atom(), []) -> term(). vhost(Key, Def) -> - sgi:pv(Key, wf:state(vhost), Def). + sgi:mv(Key, wf:state(vhost), Def). -spec get_params(http()) -> list(). get_params(Http) -> From 2f927f4d915289419a02a2f18e124d2f52b8f987 Mon Sep 17 00:00:00 2001 From: Roman Shuplov Date: Sat, 1 Apr 2017 01:44:37 -0700 Subject: [PATCH 3/4] bug fix, refactoring --- README.md | 8 ++++ UPGRADE.md | 2 +- samples/sys.config | 56 +++++++++++++-------------- src/sgi.app.src | 2 +- src/sgi_cluster.erl | 66 ++++++++++++++++++++------------ src/sgi_monitoring.erl | 11 +++--- src/sgi_pool.erl | 87 ++++++++++++++++++++---------------------- src/sgi_sup.erl | 12 ++++-- 8 files changed, 134 insertions(+), 110 deletions(-) diff --git a/README.md b/README.md index 019b565..a6371b8 100644 --- a/README.md +++ b/README.md @@ -24,6 +24,14 @@ and you can send message, for example through the [Tor](https://torproject.org/) - Erlang 18.1+ +## Documentation + +Read documentation in [Wiki](https://github.com/AstRonin/sgi/wiki). + +## Upgrade + +Read [UPGRADE](https://github.com/AstRonin/sgi/blob/master/UPGRADE.md) before installation new version. + ## Try Samples ### Sample 1 - Add a content from other languages inside of your Site. diff --git a/UPGRADE.md b/UPGRADE.md index fc0961e..66bf32f 100644 --- a/UPGRADE.md +++ b/UPGRADE.md @@ -2,6 +2,6 @@ ### From 0.7.0 To 0.8.0 -- Tuple list in `sys.config` in section `servers` has been changed to map. +- Configuration tuple list changed to map in `sys.config`. - Added support of cluster in `sgi_n2o_fcgi_handler` and `sgi_n2o_uwsgi_handler`. diff --git a/samples/sys.config b/samples/sys.config index 014ad53..b83d44e 100644 --- a/samples/sys.config +++ b/samples/sys.config @@ -36,43 +36,43 @@ start_connections => 2, max_connections => 4, max_fails => 5, - failed_timeout => 60} - ]}, + failed_timeout => 60}]}, + +%% {proxy, #{type => socks5, %% Settings of proxy +%% address => localhost, +%% port => 9050, +%% timeout => 60000, +%% username => <<"username">>, +%% password => <<"password">> +%% }}, - %% {proxy, #{type => socks5, %% Settings of proxy - %% address => localhost, - %% port => 9050, - %% timeout => 60000, - %% username => <<"username">>, - %% password => <<"password">> - %% }}, - {cluster, #{ - nodes => ['node2@127.0.0.1'], - syncr_interval => 60000, - response_timeout => 600000 - }}, +%% {cluster, #{ +%% nodes => ['node2@127.0.0.1'], +%% syncr_interval => 60000, +%% response_timeout => 600000, +%% cpu_overload => 80, % max percent of loading when need call to cluster +%% mem_overload => 80 % max percent of loading when need call to cluster +%% }}, {balancing_method, priority}, % priority | blurred, priority is default {multiplexed, unknown}, % unknown | 1 | 0 {response_timeout, 600000}, % 1 minute {vhosts, [ %% settings for family of cgi interfaces - [ - {server_name, "phphost.com"}, %% set your server name(domain), for local tests add line <127.0.0.1 phphost.com> into "/etc/hosts" (in Linux), "C:\Windows\System32\drivers\etc\hosts"(in Windows) - {aliase, "localhost"}, - {root, "/home/roman/dev/sgi/samples/cgi-scripts/php"}, %% set you FULL path to your codes - {index, "index.php"}%, %% default index file + #{server_name => "phphost.com", + alias => "localhost", + root => "/home/roman/dev/sgi/samples/cgi-scripts/php", + index => "index.php" %, %% default index file %% rewrite very simple and support 2 cases: equal path and all("*"), you can remove it line if don't needed - %%{rewrite, [{"*", "index.php"}]} - ], - [ - {server_name, "yourhost2.com"}, %% set your server name(domain) - {aliase, "localhost"}, - {root, "/usr/local/www/yourhost2.com"}, %% set you full path to your codes - {index, "index.php"}, %% default index file + %%rewrite => [{"*", "index.php"}] + }, + #{server_name => "yourhost2.com", %% set your server name(domain) + alias => "localhost", + root => "/usr/local/www/yourhost2.com", %% set you full path to your codes + index => "index.php", %% default index file %% rewrite very simple and support 2 cases: equal path and all("*"), you can remove it line if don't needed - {rewrite, [{"*", "index.php"}]} - ] + rewrite => [{"*", "index.php"}] + } ]} ]}, "sys-local.config" diff --git a/src/sgi.app.src b/src/sgi.app.src index a1962e6..cb8955d 100644 --- a/src/sgi.app.src +++ b/src/sgi.app.src @@ -1,6 +1,6 @@ {application, sgi, [ {description, "Socket Gateway Interface"}, - {vsn, "0.7.0"}, + {vsn, "0.8.0"}, {registered, [sgi_arbiter, sgi_multiplexer, sgi_sup]}, {modules, []}, {applications, [kernel, stdlib, sasl, os_mon, n2o]}, diff --git a/src/sgi_cluster.erl b/src/sgi_cluster.erl index 8e9f5ab..5da5ece 100644 --- a/src/sgi_cluster.erl +++ b/src/sgi_cluster.erl @@ -2,7 +2,7 @@ -behaviour(gen_server). %% API --export([start_link/0, node_info/0, is_ready/0, send/3, is_use/0, is_overloaded/0, info/0]). +-export([start_link/0, nodes_info/0, is_ready/0, send/3, is_use/0, is_overloaded/0, info/0]). %% gen_server callbacks -export([init/1, @@ -13,6 +13,7 @@ code_change/3]). -define(SERVER, ?MODULE). +-define(RT, sgi:mv(response_timeout, wf:config(sgi, cluster, #{}), 600000)). % 10 minutes %% Sample of structure of State %% #{ @@ -26,39 +27,49 @@ %%% API %%%=================================================================== +%% @doc Return true if cluster should be use is_use() -> case wf:config(sgi, cluster, false) of false -> false; _ -> true end. -node_info() -> - gen_server:call(?SERVER, node_info). +-spec nodes_info() -> NodesInfo when + NodesInfo :: term(). %% Information about nodes of cluster +nodes_info() -> + gen_server:call(?SERVER, nodes_info). +%% @doc Return state of genserver +-spec info() -> State when + State :: term(). info() -> gen_server:call(?SERVER, info). +%% @doc Send request to available node or to self -spec send(M :: atom(), F :: atom(), A :: []) -> Return :: term(). send(M, F, A) -> case is_overloaded() of true -> wf:info(?MODULE, "Try call to cluster", []), Key = rpc:async_call(idle_node(), M, F, A), - case rpc:nb_yield(Key, rt()) of - {value, Val} -> Val; - timeout -> <<>> + case rpc:nb_yield(Key, ?RT) of + {value, Val} -> + Val; + timeout -> + erlang:throw(timeout) end; _ -> - wf:info(?MODULE, "Have no availeble nodes, calling self node", []), erlang:apply(M, F, A) end. - +%% @doc Return true if genserver started and available is_ready() -> case whereis(?SERVER) of undefined -> false; _ -> true end. +%% @doc Return true if server is overloaded is_overloaded() -> gen_server:call(?SERVER, is_overloaded). +%% @doc Search not everloaded node and selected it for execute a request idle_node() -> gen_server:call(?SERVER, idle_node). @@ -69,17 +80,20 @@ start_link() -> %%% gen_server callbacks %%%=================================================================== + init([]) -> check_node(), M = check_nodes(), self() ! collect_load_info, {ok, #state{nodes = M, self = #node_info{name = node()}}}. + %%------------------------------------------------------------------- %% Handle Call %%------------------------------------------------------------------- -handle_call(node_info, _From, State) -> + +handle_call(nodes_info, _From, State) -> {reply, State#state.nodes, State}; handle_call(info, _From, State) -> @@ -97,17 +111,21 @@ handle_call(idle_node, _From, State) -> handle_call(_Request, _From, State) -> {reply, ok, State}. + %%------------------------------------------------------------------- %% Handle Cast %%------------------------------------------------------------------- + handle_cast(_Request, State) -> {noreply, State}. + %%------------------------------------------------------------------- %% Handle Info %%------------------------------------------------------------------- + %% @doc Request to other nodes. handle_info({From, is_overload}, State) -> From ! {sgi_cluster, erlang:node(), {ok, node(), sgi_monitoring:is_overload()}}, @@ -118,7 +136,6 @@ handle_info(collect_load_info, State) -> SI = sgi:mv(syncr_interval, wf:config(sgi, cluster, #{}), 60000), spawn(fun() -> {ok, TRef} = timer:kill_after(SI - 5000), % kill current process because will soon open new the same. - wf:info(?MODULE, "Send is_overload request to cluster", []), R = rpc:multi_server_call(nodes(), sgi_cluster, is_overload), wf:info(?MODULE, "Recieve claster info: ~p~n", [R]), timer:cancel(TRef), @@ -139,18 +156,22 @@ handle_info({collect_load_info_result, R}, State) -> handle_info(_Info, State) -> {noreply, State}. + %%------------------------------------------------------------------- + terminate(_Reason, _State) -> ok. code_change(_OldVsn, State, _Extra) -> {ok, State}. + %%%=================================================================== %%% Internal functions %%%=================================================================== + -spec cluster_info_response(Response, State) -> State1 when Response :: {Replies, BadNodes} | Replies | BadNodes | [], Replies :: [{ok, NodeName, Resp}], %% Responses from each node @@ -165,8 +186,7 @@ cluster_info_response({[], []}, State) -> State#state{nodes = #{}}; cluster_info_response({Replies, BadNodes}, State) -> State1 = cluster_info_response(Replies, State), - State2 = cluster_info_response(BadNodes, State1), - State2; + cluster_info_response(BadNodes, State1); cluster_info_response([H | T], State) -> NodeName = response_node_name(H), {RecordInfo, M} = @@ -191,6 +211,7 @@ cluster_info_response([H | T], State) -> response_node_name({ok, N, _}) -> N; response_node_name(N) -> N. + %% Check node and change if needed. Don't forget run `epmd -daemon` check_node() -> case erlang:node() =:= nonode@nohost of @@ -200,6 +221,7 @@ check_node() -> ok end. + -spec check_nodes() -> Map when Map :: #{node() => #node_info{}} | #{}. check_nodes() -> @@ -212,6 +234,7 @@ check_nodes([NodeName | T], M) -> M1 = maps:put(NodeName, #node_info{name = NodeName}, M), check_nodes(T, M1). + -spec do_idle_node(State :: #state{}) -> {ok, NodeName, Index} when NodeName :: list(), Index :: non_neg_integer(). @@ -226,17 +249,15 @@ do_idle_node(State) -> SelfI :: non_neg_integer(). % Current index for avoid cycle and search only in one circle do_idle_node([], _I, _) -> {ok, node(), 0}; -%%do_idle_node([{NodeInfo}], _I, _SelfI) -> %% using for performance trick if we have only one helped node in our cluster -%% case NodeInfo of -%% NodeInfo when NodeInfo#node_info.available, not NodeInfo#node_info.overloaded -> -%% {ok, NodeInfo#node_info.name, 1}; -%% _ -> -%% {ok, node(), 0} -%% end; +do_idle_node([NodeInfo], _I, _SelfI) -> %% using for performance trick if we have only one helped node in our cluster + case NodeInfo of + NodeInfo when NodeInfo#node_info.available, not NodeInfo#node_info.overloaded -> + {ok, NodeInfo#node_info.name, 1}; + _ -> + {ok, node(), 0} + end; do_idle_node(L, I, SelfI) -> try - wf:info(?MODULE, "List of nodes: ~p~n", [L]), - wf:info(?MODULE, "SelfI: ~p~n", [SelfI]), case lists:nth(I, L) of NodeInfo when NodeInfo#node_info.available, not NodeInfo#node_info.overloaded -> {ok, NodeInfo#node_info.name, I}; @@ -258,8 +279,5 @@ do_idle_node(L, I, SelfI) -> end end. -rt() -> - sgi:mv(response_timeout, wf:config(sgi, cluster, #{}), 600000). % 10 minutes - unix_time() -> erlang:system_time(seconds). \ No newline at end of file diff --git a/src/sgi_monitoring.erl b/src/sgi_monitoring.erl index ff2ed0f..12532da 100644 --- a/src/sgi_monitoring.erl +++ b/src/sgi_monitoring.erl @@ -7,18 +7,17 @@ -define(MAX_LOG_FILES, 2). -define(CPU_STAT, "stat/cpu_stat.log"). -define(MEM_STAT, "stat/mem_stat.log"). +-define(CPU_OVER, sgi:mv(cpu_overload, wf:config(sgi, cluster, #{}), 80)). +-define(MEM_OVER, sgi:mv(mem_overload, wf:config(sgi, cluster, #{}), 80)). %% @private -%% @doc Checking or Processor or Memory overload, considering available waiting connections. -%% Server is overloaded but +%% @doc Checking or Processor or Memory is overloaded, considering available waiting connections. is_critical() -> -%% true. is_overload() andalso conn_load() < 80. -%% @doc Resources of server close to end. +%% @doc Resources of server is closed to the end. is_overload() -> -%% true. - cpu_load(5) > 80 orelse mem_load() > 80. + cpu_load(5) > ?CPU_OVER orelse mem_load() > ?MEM_OVER. cpu_load() -> cpu_load(1). diff --git a/src/sgi_pool.erl b/src/sgi_pool.erl index dc291a8..4352e75 100644 --- a/src/sgi_pool.erl +++ b/src/sgi_pool.erl @@ -45,10 +45,8 @@ start_link(Name) -> start_link(Name, Conf) -> gen_server:start_link(?MODULE, [Name, Conf], []). -%% -%% Simple and small request. +%% @doc Simple and small request. %% Send request to a server, receive only one response, and free a socket -%% -spec once_call(binary()) -> {ok, binary()} | {error, term()}. once_call(Request) -> case ?ARBITER:alloc() of @@ -75,29 +73,20 @@ once_call(PoolPid, Request) -> {error, Reason} end. -%% -%% Getting settings of this connection for control -%% +%% @doc Getting settings of this connection for control settings(Pid) -> gen_server:call(Pid, get_settings). -%% -%% Checking connect to a server -%% +%% @doc Checking connect to a server try_connect(Pid) -> gen_server:call(Pid, try_connect). -%%-spec jsend(binary()) -> {ok, binary()} | {error, term()}. -%%jsend(Request) -> -%% {ok, PoolPid} = ?ARBITER:alloc(), -%% Ret = gen_server:call(PoolPid, {jsend, Request}), -%% ?ARBITER:free(PoolPid), -%% Ret. %%%=================================================================== %%% gen_server callbacks %%%=================================================================== + init([_Name, Conf]) -> timer:send_interval(?HIBERNATE_AFTER, self(), hibernate), {ok, #state{ @@ -110,6 +99,12 @@ init([_Name, Conf]) -> timeout = sgi:mv(timeout, Conf, 30000) }}. + +%%------------------------------------------------------------------- +%% Handle Call +%%------------------------------------------------------------------- + + handle_call({once, Request}, _From, State) -> case connect(State, false) of {ok, State1} -> @@ -124,28 +119,36 @@ handle_call({once, Request}, _From, State) -> wf:error(?MODULE, "Can't create Socket: ~p~n", [Reason]), {reply, {error, Reason}, set_last_active_time(State1)} end; + handle_call(get_settings, _From, State) -> M = #{weight => State#state.weight, server_name => State#state.server_name}, {reply, {ok, M}, State}; + handle_call(try_connect, _From, State) -> case connect(State) of {ok, State1} -> {reply, ok, State1}; {error, _Reason, State1} -> {reply, error, State1} end; -%%handle_call({jsend, Request}, _From, State) -> -%% {ok, State1} = connect(State, false), -%% ok = gen_tcp:send(State1#state.socket, Request), -%% Ret = do_recv(State1, []), -%% State2 = close(State1), -%% {reply, Ret, State2}; + handle_call(_Request, _From, State) -> {reply, ok, State}. + +%%------------------------------------------------------------------- +%% Handle Cast +%%------------------------------------------------------------------- + + handle_cast(_Request, State) -> {noreply, State}. -%% -%% Send msg -%% + + +%%------------------------------------------------------------------- +%% Handle Info +%%------------------------------------------------------------------- + + +%% @doc Send msg by TCP to client handle_info({send, Request, From}, State) -> State1 = State#state{from = From}, case connect(State1) of @@ -165,27 +168,23 @@ handle_info({send, Request, From}, State) -> {noreply, set_last_active_time(State2)} end; -%% -%% Receive msg -%% +%% @doc Receive msg from client and forward to waiting process handle_info({tcp, Socket, Data}, State) -> State#state.from ! {socket_return, Data}, inet:setopts(Socket, [{active, once}]), {noreply, set_last_active_time(State)}; -%% -%% Receive msg about connection was closed -%% +%% @doc Receive msg that connection closed handle_info({tcp_closed, _Socket}, State) -> %% wf:info(?MODULE, "TCP connection CLOSED with state: ~p~n", [State]), {noreply, set_last_active_time(State#state{socket = undefined})}; -%% -%% Receive msg about connection had errors -%% +%% @doc Receive msg that connection closed with errors handle_info({tcp_error, _Socket, Reason}, State) -> wf:info(?MODULE, "TCP connection got ERROR: ~p with state: ~p~n", [Reason, State]), {noreply, set_last_active_time(State#state{socket = undefined})}; + +%% @doc Move process to hibernate after closing connection handle_info(hibernate, State) -> case sgi:time_now() > (State#state.last_active_time + ?HIBERNATE_AFTER) of true -> @@ -195,40 +194,40 @@ handle_info(hibernate, State) -> {noreply, State} end; -%% -%% Close connection -%% +%% @doc Close connection handle_info(close, State) -> State1 = close(State), {noreply, set_last_active_time(State1)}; + handle_info(send_alive, State) -> % @todo it obtains overload if more than 1000 processes will send this message case sgi:is_alive(sgi_arbiter) of true -> sgi_arbiter:new_pool_started(self()); _ -> wait end, {noreply, State}; + handle_info(Info, State) -> wf:error(?MODULE, "Unexpected message: ~p~n", [Info]), {noreply, set_last_active_time(State)}. + terminate(_Reason, State) -> close(State), ok. code_change(_OldVsn, State, _Extra) -> {ok, State}. + %%%=================================================================== %%% Internal functions %%%=================================================================== + connect(State) -> connect(State, once). -%% -%% Process is trying to connect to a server. -%% +%% @doc Process is trying to connect to a server. -spec connect(#state{}, tcp_active_type()) -> {ok, State :: #state{}} | {error, Reason :: term(), State :: #state{}}. - connect(State = #state{socket = undefined, address = Address, port = Port}, Active) -> {Address1, Port1} = @@ -300,9 +299,7 @@ do_recv(State, Bs) -> {error, Reason} end. -%% -%% Receive only one msg -%% +%% @doc Receive only one msg -spec do_recv_once(#state{}) -> {ok, term()} | {error, term()}. do_recv_once(State) -> case gen_tcp:recv(State#state.socket, 0) of @@ -316,9 +313,7 @@ do_recv_once(State) -> set_last_active_time(State) -> State#state{last_active_time = sgi:time_now()}. -%% -%% Tell arbiter what connection is fail -%% +%% @doc Tell arbiter what connection is fail -spec overage_fail_conns(#state{}) -> #state{}. overage_fail_conns(State) -> case State#state.fails >= State#state.max_fails of diff --git a/src/sgi_sup.erl b/src/sgi_sup.erl index 834f5c0..0ac18c4 100644 --- a/src/sgi_sup.erl +++ b/src/sgi_sup.erl @@ -34,16 +34,20 @@ start_child(N) -> supervisor:start_child(?MODULE, #{id => N, start => {N, start_link, []}}). stop_child(Name) -> case supervisor:terminate_child(?MODULE, Name) of - ok -> supervisor:delete_child(?MODULE, Name); - Error -> Error + ok -> + supervisor:delete_child(?MODULE, Name); + Error -> + Error end. start_children() -> ok = start_pool_children(), ?SERVER:start_child(sgi_arbiter), case sgi_cluster:is_use() of - false -> skip; - _ -> ?SERVER:start_child(sgi_cluster) + false -> + skip; + _ -> + ?SERVER:start_child(sgi_cluster) end. %%%=================================================================== From c8018a2976b0d51d006bf5747c01e649f2f234a3 Mon Sep 17 00:00:00 2001 From: Roman Shuplov Date: Sat, 1 Apr 2017 02:16:13 -0700 Subject: [PATCH 4/4] bug fix --- src/sgi_arbiter.erl | 3 --- 1 file changed, 3 deletions(-) diff --git a/src/sgi_arbiter.erl b/src/sgi_arbiter.erl index d70f1f6..c8726db 100644 --- a/src/sgi_arbiter.erl +++ b/src/sgi_arbiter.erl @@ -164,9 +164,6 @@ handle_cast(_Request, State) -> handle_info(update_counts, State) -> State1 = update_counts(State), {noreply, State1}; -handle_info(add_pool, State) -> - State1 = State#state{adding_new_pool = 1}, - {noreply, State1}; handle_info(add_pool, State) -> State1 = case add_pool() of N when is_integer(N) andalso N > 0 ->