diff --git a/components/authorize/src/authorize_keys.erl b/components/authorize/src/authorize_keys.erl index b5ffd72..52f8b0e 100644 --- a/components/authorize/src/authorize_keys.erl +++ b/components/authorize/src/authorize_keys.erl @@ -289,6 +289,12 @@ normalize_conn(local) -> local; normalize_conn({IP, Port} = Conn) when is_binary(IP), is_binary(Port) -> Conn; +normalize_conn({{A,B,C,D}, Port}) -> + {iolist_to_binary([integer_to_binary(A),<<".">>, + integer_to_binary(B),<<".">>, + integer_to_binary(C),<<".">>, + integer_to_binary(D)]), + to_bin(Port)}; normalize_conn({IP, Port}) -> {to_bin(IP), to_bin(Port)}. diff --git a/components/authorize/src/authorize_rpc.erl b/components/authorize/src/authorize_rpc.erl index 9fa052a..ce55624 100644 --- a/components/authorize/src/authorize_rpc.erl +++ b/components/authorize/src/authorize_rpc.erl @@ -85,7 +85,7 @@ get_credentials(CompSpec) -> remove_connection(CompSpec, Conn) -> rvi_common:notification(authorize, ?MODULE, remove_connection, - [{conn, Conn}], [status], CompSpec). + [{conn, Conn}], CompSpec). store_creds(CompSpec, Creds, Conn) -> store_creds(CompSpec, Creds, Conn, undefined). diff --git a/components/dlink_tcp/src/connection.erl b/components/dlink_tcp/src/connection.erl index 5444d11..5b95221 100644 --- a/components/dlink_tcp/src/connection.erl +++ b/components/dlink_tcp/src/connection.erl @@ -202,8 +202,6 @@ handle_call(_Request, _From, State) -> %% @end %%-------------------------------------------------------------------- handle_cast({send, Data}, #st{packet_mod = PMod, packet_st = PSt} = St) -> - ?debug("~p:handle_cast(send): Sending: ~p", - [ ?MODULE, Data]), {ok, Encoded, PSt1} = PMod:encode(Data, PSt), gen_tcp:send(St#st.sock, Encoded), @@ -212,8 +210,6 @@ handle_cast({send, Data, Opts}, #st{sock = Socket, packet_mod = PMod, packet_st = PSt, frag_opts = FragOpts} = St) -> - ?debug("handle_cast({send, Data, ~p, ...), FragOpts = ~p", - [Opts, FragOpts]), {ok, Bin, PSt1} = PMod:encode(Data, PSt), St1 = St#st{packet_st = PSt1}, rvi_frag:send(Bin, Opts ++ FragOpts, ?MODULE, diff --git a/components/dlink_tls/src/dlink_tls_conn.erl b/components/dlink_tls/src/dlink_tls_conn.erl index 346f912..49349a1 100644 --- a/components/dlink_tls/src/dlink_tls_conn.erl +++ b/components/dlink_tls/src/dlink_tls_conn.erl @@ -37,6 +37,7 @@ -export([is_connection_up/2]). -export([terminate_connection/1]). -export([terminate_connection/2]). +-export([publish_node_id/3]). -define(SERVER, ?MODULE). @@ -127,6 +128,9 @@ is_connection_up(IP, Port) -> false end. +publish_node_id(FromPid, NodeId, Cs) -> + gen_server:cast(FromPid, {publish_node_id, NodeId, Cs}). + %%%=================================================================== %%% gen_server callbacks %%%=================================================================== @@ -264,7 +268,12 @@ handle_cast({activate_socket, Sock}, #st{} = State) -> Res = inet:setopts(Sock, [{active, once}]), ?debug("connection:activate_socket(): ~p", [Res]), {noreply, State}; - +handle_cast({publish_node_id, NodeId, Cs}, #st{} = St) -> + ?debug("publish_node_id (~p)", [NodeId]), + %% Do this from the connection process, so that schedule_rpc can + %% monitor the connection and unpublish when it goes away. + schedule_rpc:publish_node_id(Cs, NodeId, dlink_tls_rpc), + {noreply, St}; handle_cast(_Msg, #st{} = State) -> ?warning("~p:handle_cast(): Unknown cast: ~p~nSt=~p", [ ?MODULE, _Msg, State]), diff --git a/components/dlink_tls/src/dlink_tls_connmgr.erl b/components/dlink_tls/src/dlink_tls_connmgr.erl index d27647d..67cb8aa 100644 --- a/components/dlink_tls/src/dlink_tls_connmgr.erl +++ b/components/dlink_tls/src/dlink_tls_connmgr.erl @@ -45,23 +45,27 @@ %%%=================================================================== add_connection(IP, Port, Pid) -> - gen_server:call(?SERVER, { add_connection, IP, Port, Pid}). + gen_server:call(?SERVER, { add_connection, ip(IP), Port, Pid}). delete_connection_by_pid(Pid) -> gen_server:call(?SERVER, { delete_connection_by_pid, Pid } ). delete_connection_by_address(IP, Port) -> - gen_server:call(?SERVER, { delete_connection_by_address, IP, Port } ). + gen_server:call(?SERVER, { delete_connection_by_address, ip(IP), Port } ). find_connection_by_pid(Pid) -> gen_server:call(?SERVER, { find_connection_by_pid, Pid } ). find_connection_by_address(IP, Port) -> - gen_server:call(?SERVER, { find_connection_by_address, IP, Port } ). + gen_server:call(?SERVER, { find_connection_by_address, ip(IP), Port } ). connections() -> gen_server:call(?SERVER, connections). +ip(IP) -> + {ok, Addr} = inet:ip(IP), + Addr. + %%-------------------------------------------------------------------- %% @doc %% Starts the server @@ -181,7 +185,7 @@ handle_call({find_connection_by_address, IP, Port}, _From, St) -> end; handle_call(connections, _From, St) -> - {reply, ets_select(?ADDR_TAB, [{ {'$1','_'}, [], ['$1'] }]), St}; + {reply, ets_select(?ADDR_TAB, [{ '_', [], ['$_'] }]), St}; handle_call(_Request, _From, State) -> ?warning("~p:handle_call(): Unknown call: ~p", [ ?MODULE, _Request]), diff --git a/components/dlink_tls/src/dlink_tls_rpc.erl b/components/dlink_tls/src/dlink_tls_rpc.erl index 849271a..c062e29 100644 --- a/components/dlink_tls/src/dlink_tls_rpc.erl +++ b/components/dlink_tls/src/dlink_tls_rpc.erl @@ -277,32 +277,42 @@ connect_and_retry_remote( IP, Port, CompSpec) -> end. -announce_local_service_(_CompSpec, [], _Service, _Availability) -> +announce_services_(_CompSpec, [], _Services, _Cost, _Availability) -> ok; -announce_local_service_(CompSpec, - [ConnPid | T], - Service, Availability) -> - - AvailabilityMsg = availability_msg(Availability, [Service]), - Res = dlink_tls_conn:send( - ConnPid, - rvi_common:pass_log_id( - [ { ?DLINK_ARG_CMD, ?DLINK_CMD_SERVICE_ANNOUNCE } - | AvailabilityMsg ], CompSpec)), - - ?debug("dlink_tls:announce_local_service(~p: ~p) -> ~p Res: ~p", - [ Availability, Service, ConnPid, Res]), +announce_services_(CompSpec, + [{Conn, ConnPid} | T], + Services, Cost, Availability) -> + + ?debug("announce_services(~p, ~p, ~p)", [Conn, ConnPid, Services]), + case authorize_rpc:filter_by_service( + CompSpec, Services, Conn) of + [ok, [_]] -> + ?debug("will announce", []), + AvailabilityMsg = availability_msg(Availability, Services), + Res = dlink_tls_conn:send( + ConnPid, + rvi_common:pass_log_id( + [ { ?DLINK_ARG_CMD, ?DLINK_CMD_SERVICE_ANNOUNCE }, + { ?DLINK_ARG_COST, Cost } + | AvailabilityMsg ], CompSpec)), + + ?debug("announce_services(~p: ~p) -> ~p Res: ~p", + [ Availability, Services, ConnPid, Res]); + _Other -> + ?debug("WON'T announce (~p)", [_Other]), + ignore + end, %% Move on to next connection. - announce_local_service_(CompSpec, - T, - Service, Availability). + announce_services_(CompSpec, + T, + Services, Cost, Availability). announce_local_service_(CompSpec, Service, Availability) -> - announce_local_service_(CompSpec, - get_connections(), - Service, Availability). + announce_services_(CompSpec, + dlink_tls_connmgr:connections(), + [Service], link_cost(CompSpec), Availability). %% We lost the socket connection. %% Unregister all services that were routed to the remote end that just died. @@ -366,13 +376,15 @@ handle_socket(FromPid, PeerIP, PeerPort, data, Elems, CompSpec) -> ?DLINK_CMD_AUTHORIZE -> ?debug("got authorize ~s:~w", [PeerIP, PeerPort]), [ ProtoVersion, + NodeId, Credentials ] = opts([?DLINK_ARG_VERSION, + ?DLINK_ARG_NODE_ID, ?DLINK_ARG_CREDENTIALS], Elems, undefined), try - process_authorize(FromPid, PeerIP, PeerPort, + process_authorize(FromPid, PeerIP, PeerPort, NodeId, Credentials, ProtoVersion, CS) catch throw:{protocol_failure, What} -> @@ -397,13 +409,21 @@ handle_socket(FromPid, PeerIP, PeerPort, data, Elems, CompSpec) -> ?DLINK_CMD_SERVICE_ANNOUNCE -> ?debug("got service_announce ~s:~w", [PeerIP, PeerPort]), [ Status, - Services ] = + Services, + Cost0 ] = opts([?DLINK_ARG_STATUS, - ?DLINK_ARG_SERVICES], + ?DLINK_ARG_SERVICES, + ?DLINK_ARG_COST], Elems, undefined), + Cost = case Cost0 of + _ when is_integer(Cost0) -> + Cost0; + _ -> link_cost(CS) + end, log("sa from ~s:~w", [PeerIP, PeerPort], CS), - process_announce(Status, Services, FromPid, PeerIP, PeerPort, CompSpec); + process_announce(Status, Services, Cost, + FromPid, PeerIP, PeerPort, CompSpec); ?DLINK_CMD_RECEIVE -> [ _TransactionID, @@ -556,7 +576,8 @@ handle_call({rvi, send_data, [ProtoMod, Service, Data, DataLinkOpts] = Args}, {reply, [no_route], St}; %% FIXME: What to do if we have multiple connections to the same service? - [ConnPid | _T] -> + [_|_] = Conns -> + ConnPid = cheapest_conn(Conns), Res = dlink_tls_conn:send( ConnPid, [{?DLINK_ARG_TRANSACTION_ID, Tid}, {?DLINK_ARG_CMD, ?DLINK_CMD_RECEIVE}, @@ -633,7 +654,13 @@ get_services_by_connection(ConnPid) -> end. +get_connections_by_service(<<"rvi:", _/binary>> = Service) -> + [Svc|_] = binary:split(Service, [<<"/">>]), + get_connections_by_service_(Svc); get_connections_by_service(Service) -> + get_connections_by_service_(Service). + +get_connections_by_service_(Service) -> ?debug("get_connections_by_service(~p,~n~p)", [Service, ets:tab2list(?SERVICE_TABLE)]), case ets:lookup(?SERVICE_TABLE, Service) of @@ -645,25 +672,42 @@ get_connections_by_service(Service) -> [] end. +cheapest_conn([{Pid, Cost}|Conns]) -> + cheapest_conn(Conns, Cost, Pid). -add_services(SvcNameList, ConnPid) -> +cheapest_conn([{P, C}|T], Cost, Pid) -> + if C < Cost -> + cheapest_conn(T, C, P); + true -> + cheapest_conn(T, Cost, Pid) + end; +cheapest_conn([], Cost, Pid) -> + ?debug("cheapest connection: ~p (Cost = ~p)", [Pid, Cost]), + Pid. + +add_services(SvcNameList, Cost, ConnPid) -> %% Create or replace existing connection table entry %% with the sum of new and old services. ?debug("add_services(~p, ~p)", [SvcNameList, ConnPid]), ets:insert(?CONNECTION_TABLE, #connection_entry { connection = ConnPid, - services = SvcNameList ++ get_services_by_connection(ConnPid) + services = union(SvcNameList, + get_services_by_connection(ConnPid)) }), %% Add the connection to the service entry for each servic. [ ets:insert(?SERVICE_TABLE, #service_entry { service = SvcName, - connections = [ConnPid | get_connections_by_service(SvcName)] + connections = [{ConnPid, Cost} + | get_connections_by_service(SvcName)] }) || SvcName <- SvcNameList ], ok. +union(A, B) -> + A ++ (B -- A). + delete_services(ConnPid, SvcNameList) -> ets:insert(?CONNECTION_TABLE, @@ -674,11 +718,14 @@ delete_services(ConnPid, SvcNameList) -> %% Loop through all services and update the conn table %% Update them with a new version where ConnPid has been removed - [ ets:insert(?SERVICE_TABLE, - #service_entry { - service = SvcName, - connections = get_connections_by_service(SvcName) -- [ConnPid] - }) || SvcName <- SvcNameList ], + [ ets:insert( + ?SERVICE_TABLE, + #service_entry { + service = SvcName, + connections = [S || {_,P} = S + <- get_connections_by_service(SvcName), + P =/= ConnPid] + }) || SvcName <- SvcNameList ], ok. availability_msg(Availability, Services) -> @@ -686,9 +733,13 @@ availability_msg(Availability, Services) -> { ?DLINK_ARG_SERVICES, Services }]. status_string(available ) -> ?DLINK_ARG_AVAILABLE; -status_string(unavailable) -> ?DLINK_ARG_UNAVAILABLE. +status_string(unavailable) -> ?DLINK_ARG_UNAVAILABLE; +status_string(S) when S == ?DLINK_ARG_AVAILABLE; + S == ?DLINK_ARG_UNAVAILABLE -> + S. -process_authorize(FromPid, PeerIP, PeerPort, + +process_authorize(FromPid, PeerIP, PeerPort, NodeId, Credentials, ProtoVersion, CompSpec) -> ?info("dlink_tls:authorize(): Peer Address: ~s:~p", [PeerIP, PeerPort ]), case ProtoVersion of @@ -701,7 +752,7 @@ process_authorize(FromPid, PeerIP, PeerPort, log("auth ~s:~w", [PeerIP, PeerPort], CompSpec), PeerCert = rvi_common:get_value(dlink_tls_peer_cert, not_found, CompSpec), authorize_rpc:store_creds(CompSpec, Credentials, Conn, PeerCert), - connection_authorized(FromPid, Conn, CompSpec). + connection_authorized(FromPid, Conn, NodeId, CompSpec). send_authorize(Pid, CompSpec) -> ?debug("send_authorize() Pid = ~p; CompSpec = ~p", [Pid, abbrev(CompSpec)]), @@ -709,14 +760,18 @@ send_authorize(Pid, CompSpec) -> dlink_tls_conn:send(Pid, rvi_common:pass_log_id( [{?DLINK_ARG_CMD, ?DLINK_CMD_AUTHORIZE}, {?DLINK_ARG_VERSION, ?DLINK_TLS_VERSION}, + {?DLINK_ARG_NODE_ID, rvi_common:node_id()}, {?DLINK_ARG_CREDENTIALS, Creds}], CompSpec)). -connection_authorized(FromPid, {RemoteIP, RemotePort} = Conn, CompSpec) -> +connection_authorized(FromPid, {RemoteIP, RemotePort} = Conn, + NodeId, CompSpec) -> %% If FromPid (the genserver managing the socket) is not yet registered %% with the connection manager, this is an incoming connection %% from the client. We should respond with our own authorize followed by %% a service announce log("authorized ~s:~w", [RemoteIP, RemotePort], CompSpec), + dlink_tls_conn:publish_node_id(FromPid, NodeId, CompSpec), + add_services([<<"rvi:", NodeId/binary>>], 0, FromPid), case dlink_tls_connmgr:find_connection_by_pid(FromPid) of not_found -> ?info("dlink_tls:authorize(): New connection!"), @@ -756,17 +811,26 @@ process_data(_FromPid, RemoteIP, RemotePort, ProtocolMod, Data, CompSpec) -> Proto = binary_to_existing_atom(ProtocolMod, latin1), Proto:receive_message(CompSpec, {RemoteIP, RemotePort}, Data). -process_announce(Avail, Svcs, FromPid, IP, Port, CompSpec) -> +process_announce(Avail, Svcs, Cost, FromPid, IP, Port, CompSpec) -> ?debug("dlink_tls:service_announce(~p): Address: ~p:~p", [Avail,IP,Port]), ?debug("dlink_tls:service_announce(~p): Services: ~p", [Avail,Svcs]), + ?debug("dlink_tls:service_announce(~p): Cost: ~p", [Cost]), + Connections = dlink_tls_connmgr:connections() -- [FromPid], case Avail of ?DLINK_ARG_AVAILABLE -> - add_services(Svcs, FromPid), + add_services(Svcs, Cost, FromPid), service_discovery_rpc:register_services(CompSpec, Svcs, ?MODULE); ?DLINK_ARG_UNAVAILABLE -> delete_services(FromPid, Svcs), service_discovery_rpc:unregister_services(CompSpec, Svcs, ?MODULE) end, + MaxCost = max_cost(CompSpec), + case Cost + routing_cost(CompSpec) + link_cost(CompSpec) of + NewCost when NewCost > MaxCost -> + ignore; + NewCost -> + announce_services_(CompSpec, Connections, Svcs, NewCost, Avail) + end, ok. delete_connection(Conn) -> @@ -776,13 +840,15 @@ delete_connection(Conn) -> %% Replace each existing connection entry that has %% SvcName with a new one where the SvcName is removed. - lists:map(fun(SvcName) -> - Existing = get_connections_by_service(SvcName), - ets:insert(?SERVICE_TABLE, # - service_entry { - service = SvcName, - connections = Existing -- [ Conn ] - }) + lists:map( + fun(SvcName) -> + Existing = get_connections_by_service(SvcName), + ets:insert(?SERVICE_TABLE, # + service_entry { + service = SvcName, + connections = lists:keydelete( + Conn, 2, Existing) + }) end, SvcNameList), %% Delete the connection @@ -791,16 +857,21 @@ delete_connection(Conn) -> -get_connections('$end_of_table', Acc) -> - Acc; +%% get_connections('$end_of_table', Acc) -> +%% Acc; + +%% get_connections(Key, Acc) -> +%% get_connections(ets:next(?CONNECTION_TABLE, Key), [ Key | Acc ]). + -get_connections(Key, Acc) -> - get_connections(ets:next(?CONNECTION_TABLE, Key), [ Key | Acc ]). +%% get_connections() -> +%% get_connections(ets:first(?CONNECTION_TABLE), []). +link_cost(_CS) -> 1. -get_connections() -> - get_connections(ets:first(?CONNECTION_TABLE), []). +routing_cost(_CS) -> 1. +max_cost(_CS) -> 10. get_credentials(CompSpec) -> case authorize_rpc:get_credentials(CompSpec) of diff --git a/components/rvi_common/include/rvi_dlink_bin.hrl b/components/rvi_common/include/rvi_dlink_bin.hrl index 6ccabd2..bdd73c6 100644 --- a/components/rvi_common/include/rvi_dlink_bin.hrl +++ b/components/rvi_common/include/rvi_dlink_bin.hrl @@ -18,6 +18,7 @@ -define(DLINK_ARG_CMD, <<"cmd">>). -define(DLINK_ARG_TRANSACTION_ID, <<"tid">>). -define(DLINK_ARG_ADDRESS, <<"addr">>). +-define(DLINK_ARG_NODE_ID, <<"id">>). -define(DLINK_ARG_PORT, <<"port">>). -define(DLINK_ARG_VERSION, <<"ver">>). -define(DLINK_ARG_CREDENTIAL, <<"cred">>). @@ -27,5 +28,6 @@ -define(DLINK_ARG_MODULE, <<"mod">>). -define(DLINK_ARG_AVAILABLE, <<"av">>). -define(DLINK_ARG_UNAVAILABLE, <<"un">>). +-define(DLINK_ARG_COST, <<"cost">>). -define(DLINK_ARG_STATUS, <<"stat">>). -define(DLINK_ARG_DATA, <<"data">>). diff --git a/components/rvi_common/src/exoport_exo_http.erl b/components/rvi_common/src/exoport_exo_http.erl index cc3e7b9..91df765 100644 --- a/components/rvi_common/src/exoport_exo_http.erl +++ b/components/rvi_common/src/exoport_exo_http.erl @@ -7,7 +7,8 @@ %% -module(exoport_exo_http). -export([instance/3, - handle_body/4]). + handle_body/4, + inspect_multipart_post/2]). -include_lib("exo/include/exo_http.hrl"). -include_lib("lager/include/log.hrl"). @@ -43,14 +44,18 @@ handle_post(Socket, Request, Body, AppMod) -> when T=="application/json"; T=="application/json-rpc"; T=="application/jsonrequest" -> - handle_post_json(Socket, Request, Body, AppMod); + handle_post_json(Socket, Request, Body, [], AppMod); #http_chdr{content_type = "multipart/" ++ _} -> + ?debug("Multipart:~n" + "Request = ~p~n" + "Body = ~p", [Request, Body]), handle_post_multipart(Socket, Request, Body, AppMod) end. -handle_post_json(Socket, _Request, Body, AppMod) -> +handle_post_json(Socket, _Request, Body, Attachments, AppMod) -> try decode_json(Body) of - {call, Id, Method, Args} -> + {call, Id, Method, Args0} -> + Args = Args0 ++ Attachments, try handle_rpc(AppMod, Method, Args) of {ok, Reply} -> success_response(Socket, Id, Reply); @@ -65,7 +70,8 @@ handle_post_json(Socket, _Request, Body, AppMod) -> erlang:get_stacktrace()]), error_response(Socket, Id, internal_error) end; - {notification, Method, Args} -> + {notification, Method, Args0} -> + Args = Args0 ++ Attachments, handle_notification(AppMod, Method, Args), exo_http_server:response(Socket, undefined, 200, "OK", ""); {error, _} -> @@ -76,12 +82,36 @@ handle_post_json(Socket, _Request, Body, AppMod) -> "Internal Error", "Internal Error") end. +handle_post_multipart(Socket, Request, Body, AppMod) -> + try inspect_multipart_post_(Request, Body) of + {ok, [BodyPart], Files} -> + handle_post_json(Socket, Request, BodyPart, + pack_attachments(Files), AppMod); + {error, Reason} -> + error_response(Socket, Reason) + catch + error:Other -> + ?error("Error handling multipart post ~p~n~p", + [Other, erlang:get_stacktrace()]), + error_response(Socket, internal_error) + end. + +inspect_multipart_post(Request, Body) -> + try inspect_multipart_post_(Request, Body) + catch + throw:Error -> + {error, Error} + end. -handle_post_multipart(Socket, Request, Body, _AppMod) -> +inspect_multipart_post_(Request, Body) -> case (Request#http_request.headers)#http_chdr.content_type of "multipart/" ++ _ = Type -> + %% re match either boundary=foo or boundary="foo" + %% (modulo whitespace) {match, [B]} = - re:run(Type, "boundary\\s*=\\s*(.+)$", [{capture,[1],binary}]), + re:run(Type, "boundary\\s*=\\s*\"?([^\"]+)\"?$", + [{capture,[1],binary}]), + ?debug("Boundary = ~p", [B]), BoundaryRe = <<"--",B/binary,"\\s*\\r\\n|", "\\r\\n--",B/binary,"\\s*\\r\\n|", "\\r\\n--",B/binary,"--\\s*\\r\\n">>, @@ -89,22 +119,108 @@ handle_post_multipart(Socket, Request, Body, _AppMod) -> || P <- re:split(Body, BoundaryRe, [{return,binary}]), P =/= <<>>], io:fwrite("Parts = ~p~n", [Parts]), - error_response(Socket, internal_error) + multipart_json(Parts); + "application/json" ++ _ -> + {ok, [Body], []} end. +pack_attachments([]) -> + []; +pack_attachments([_|_] = L) -> + [{<<"rvi.files">>, + [[ + {<<"cid">>, to_bin(ID)}, + {<<"hdrs">>, [hdr_obj(<<"Content-Type">>, Type) + | [hdr_obj(K, V) || {K, V} <- Hs]]}, + {<<"data">>, to_bin(Body)} + ] + || {ID, Type, Hs, Body} <- L]}]. + +to_bin(Str) -> + iolist_to_binary(Str). + +hdr_obj(K, V) -> + { to_bin(K), to_bin(V) }. + decode_part(P) -> - decode_part(P, []). + io:fwrite("decode_part()~n" + "---------------------------~n" + "~s~n" + "---------------------------~n", [P]), + decode_part(P, #http_chdr{}). decode_part(P, Acc) -> case erlang:decode_packet(httph, P, []) of {ok, {http_header,_,K,_,V}, Rest} -> - decode_part(Rest, [exo_http:set_chdr(K,V,#http_chdr{})|Acc]); + decode_part(Rest, exo_http:set_chdr(K,V,Acc)); {ok, http_eoh, Body} -> - {lists:reverse(Acc), Body}; + {Acc, Body}; {ok, {http_error, _}, _} -> - {lists:reverse(Acc), P} + {Acc, P} + end. + +multipart_json(Parts) -> + AllParts = [{content_id(H), content_type(H), H#http_chdr.other, Body} + || {H, Body} <- Parts], + case [P || {_, T, _, _} = P <- AllParts, + json_content(T)] of + [{_, _, _, B} = JPart] -> + %% only one application/json part + {ok, [B], [P || P <- AllParts -- [JPart]]}; + [_,_|_] = JParts -> % more than one + case [P || {ID, _T, _Hs, _B} = P <- JParts, + is_body_id(ID)] of + [{_, _, _, B} = JPart] -> + {ok, [B], [P || P <- AllParts -- [JPart]]}; + _ -> + throw(invalid_params) + end; + [] -> + throw(invalid_params) end. +is_body_id(ID) -> + string:to_lower(string:strip(ID)) =:= "body". + +json_content(T) -> + case T of + "application/json" ++ _ -> + true; + _ -> + false + end. + +content_id(#http_chdr{other = Hdrs}) -> + case match_hdr("content-id", Hdrs) of + {value, V} -> V; + false -> + case match_hdr("content-disposition", Hdrs) of + {value, V} -> + case re:run(V, "filename=\"(.+)\"", [{capture, [1], list}]) of + {match, [FN]} -> + FN; + nomatch -> + "" + end; + false -> + "" + end + end. + +content_type(#http_chdr{content_type = T}) -> + string:to_lower(string:strip(T)). + +match_hdr(H, [{K, V} = X|T]) -> + ?debug("match_hdr(~p, ~p)", [H, X]), + case string:to_lower(string:strip(K)) of + H -> + {value, V}; + _ -> + match_hdr(H, T) + end; +match_hdr(_, []) -> + false. + %% Validated RPC handle_rpc(Mod, Method, Args) -> ?debug("exoport_exo_http_server:handle_rpc(): Mod: ~p", [Mod]), @@ -154,13 +270,20 @@ success_response(Socket, Id, Reply) -> error_response(Socket, Error) -> %% No Id available + {Code, Msg} = pick_error(Error), JSON = [{<<"jsonrpc">>, <<"2.0">>}, - {<<"error">>, [{<<"code">>, json_error_code(Error)}, - {<<"message">>, json_error_msg(Error)}]}], + {<<"error">>, [{<<"code">>, Code}, + {<<"message">>, Msg}]}], Body = jsx:encode(JSON), exo_http_server:response(Socket, undefined, 200, "OK", Body, [{content_type, "application/json"}]). +pick_error(Error) when is_atom(Error) -> + Code = json_error_code(Error), + {Code, json_error_msg(Code)}; +pick_error(Error) when is_integer(Error) -> + {Error, json_error_msg(Error)}. + error_response(Socket, Id, Error) -> JSON = [{<<"jsonrpc">>, <<"2.0">>}, {<<"id">>, Id}, diff --git a/components/rvi_common/src/rvi_common.erl b/components/rvi_common/src/rvi_common.erl index 0c295f1..f580ec2 100644 --- a/components/rvi_common/src/rvi_common.erl +++ b/components/rvi_common/src/rvi_common.erl @@ -20,6 +20,7 @@ -export([notification/5]). -export([json_rpc_status/1]). -export([get_json_element/2]). +-export([get_opt_json_element/3]). -export([sanitize_service_string/1]). -export([local_service_to_string/1]). -export([local_service_to_string/2]). @@ -29,6 +30,8 @@ -export([node_address_string/0]). -export([node_address_tuple/0]). -export([node_msisdn/0]). +-export([node_id/0]). +-export([default_node_id/0]). -export([get_request_result/1]). -export([get_component_specification/0, get_component_modules/1, @@ -67,6 +70,7 @@ -define(NODE_SERVICE_PREFIX, node_service_prefix). -define(NODE_ADDRESS, node_address). -define(NODE_MSISDN, node_msisdn). +-define(NODE_ID, node_id). -record(pst, { buffer = [], @@ -77,20 +81,28 @@ json_rpc_status([I] = Str) when I >= $0, I =< $9 -> try json_rpc_status(list_to_integer(Str)) - catch error:_ -> undefined + catch error:_ -> 999 end; json_rpc_status(I) when is_integer(I)-> - case lists:keyfind(I, 1, status_values()) of - {_, St} -> St; - false -> undefined - end; + I; json_rpc_status(A) when is_atom(A) -> case lists:keyfind(A, 2, status_values()) of {I, _} -> I; false -> 999 end; +json_rpc_status(B) when is_binary(B) -> + try lists:keyfind( + binary_to_existing_atom(B, latin1), 2, status_values()) of + {I, _} -> I; + false -> 999 + catch + error:_ -> 999 + end; json_rpc_status(L) when is_list(L) -> - undefined. + case lists:keyfind(<<"status">>, 1, L) of + {_, St} -> json_rpc_status(St); + _ -> 999 + end. status_values() -> [{0, ok}, @@ -100,7 +112,8 @@ status_values() -> {4, internal}, {5, already_connected}, {6, no_route}, - {7, unauthorized}]. + {7, unauthorized}, + {8, timeout}]. get_request_result(R) -> ?debug("get_request_result(~p)", [R]), @@ -303,6 +316,13 @@ unstruct([_|_] = Elems) -> [unstruct(X) || X <- Elems]; unstruct(X) -> X. +get_opt_json_element(ElemPath, Default, JSON) -> + case get_json_element(ElemPath, JSON) of + {ok, Value} -> + Value; + {error, _} -> + Default + end. %% If Path is just a single element, convert to list and try again. get_json_element(_, []) -> @@ -444,6 +464,10 @@ sanitize_service_string(Service) when is_list(Service) -> Res end. +bin(I) when is_integer(I) -> + integer_to_binary(I); +bin(A) when is_atom(A) -> + atom_to_binary(A, latin1); bin(L) -> iolist_to_binary(L). @@ -493,6 +517,24 @@ last(B) when is_binary(B) -> <<_:Sz/binary, Last>> = B, Last. +node_id() -> + case setup:get_env(rvi_core, ?NODE_ID) of + {ok, Id} -> bin(Id); + undefined -> + default_node_id() + end. + +default_node_id() -> + HashInput = [setup:home(), + node_address_string(), + local_service_prefix()], + base64url:encode( + crypto:hash_final( + lists:foldl( + fun(V, Acc) -> + crypto:hash_update(Acc, bin(V)) + end, crypto:hash_init(sha256), HashInput))). + node_address_string() -> case application:get_env(rvi_core, ?NODE_ADDRESS) of {ok, P} when is_atom(P) -> atom_to_binary(P, latin1); @@ -856,7 +898,7 @@ start_msgpack_rpc_client(Component, Module, Opts, XOpts) -> Name = {msgpack_rpc_client, Component, Module}, rvi_msgpack_rpc:start_link([{gproc, {n,l,Name}}|XOpts] ++ Opts). -start_msgpack_rpc_server(Component, Module, Opts, XOpts) -> +start_msgpack_rpc_server(_Component, Module, Opts, XOpts) -> %% Name = {msgpack_rpc_server, Component, Module}, [Callback, Rest] = take([{callback, fun() -> msgpack_rpc_cb(Module) end}], XOpts ++ Opts), diff --git a/components/rvi_services/src/rvi_services.app.src b/components/rvi_services/src/rvi_services.app.src new file mode 100644 index 0000000..984c2f0 --- /dev/null +++ b/components/rvi_services/src/rvi_services.app.src @@ -0,0 +1,28 @@ +%% +%% Copyright (C) 2016, Jaguar Land Rover +%% +%% This program is licensed under the terms and conditions of the +%% Mozilla Public License, version 2.0. The full text of the +%% Mozilla Public License is at https://www.mozilla.org/MPL/2.0/ +%% + +%% -*- erlang -*- +{application, rvi_services, + [ + {description, "Internal RVI Core services"}, + {vsn, "0.1"}, + {registered, []}, + {applications, [ + kernel, + stdlib, + rvi_common, + service_discovery, + service_edge + ]}, + {mod, { rvi_services_app, []}}, + {start_phases, [{register_services, []}, + {announce, []}]}, + {env, [ + {rvi_core_await, [{n,l,rvi_services}]} + ]} + ]}. diff --git a/components/rvi_services/src/rvi_services.erl b/components/rvi_services/src/rvi_services.erl new file mode 100644 index 0000000..6143ea8 --- /dev/null +++ b/components/rvi_services/src/rvi_services.erl @@ -0,0 +1,71 @@ +%% -*- erlang-indent-level: 4; indent-tabs-mode: nil -*- +%% +%% Copyright (C) 2016, Jaguar Land Rover +%% +%% This program is licensed under the terms and conditions of the +%% Mozilla Public License, version 2.0. The full text of the +%% Mozilla Public License is at https://www.mozilla.org/MPL/2.0/ +%% + +-module(rvi_services). +-behaviour(gen_server). + +-export([start_link/0]). +-export([register_services/0]). +-export([init/1, + handle_call/3, + handle_cast/2, + handle_info/2, + terminate/2, + code_change/3]). + +-include_lib("lager/include/log.hrl"). +-include_lib("exo/include/exo_url.hrl"). + +-record(st, {cs}). + +start_link() -> + gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). + +register_services() -> + gen_server:call(?MODULE, register_services). + +init([]) -> + {ok, #st{cs = rvi_common:get_component_specification()}}. + + +handle_call(register_services, _From, #st{cs = CS} = St) -> + Svcs = configured_services(), + ?debug("configured_services() -> ~p", [Svcs]), + _ = [register_service(S, CS) || S <- Svcs], + {reply, ok, St}; +handle_call(Other, _, St) -> + ?debug("unknown call (~p)", [Other]), + {reply, {error, badarg}, St}. + +handle_cast(_Msg, St) -> + {noreply, St}. + +handle_info(_Msg, St) -> + {noreply, St}. + +terminate(_Reason, _St) -> + ok. + +code_change(_FromVsn, St, _Extra) -> + {ok, St}. + +configured_services() -> + setup:get_env(rvi_core, services, []). + +register_service({SvcName, Module, URL}, CompSpec) -> + Port = url_port(URL), + {ok,_} = rvi_services_svc_sup:start_service(Port, Module), + service_edge_rpc:register_service(CompSpec, SvcName, URL). + +url_port("http" ++ _ = URI) -> + #url{port = Port} = exo_url:parse(URI), + Port; +url_port("HTTP" ++ _ = URI) -> + #url{port = Port} = exo_url:parse(URI), + Port. diff --git a/components/rvi_services/src/rvi_services_adm.erl b/components/rvi_services/src/rvi_services_adm.erl new file mode 100644 index 0000000..8b5327c --- /dev/null +++ b/components/rvi_services/src/rvi_services_adm.erl @@ -0,0 +1,19 @@ +%% -*- erlang-indent-level: 4; indent-tabs-mode: nil -*- +%% +%% Copyright (C) 2016, Jaguar Land Rover +%% +%% This program is licensed under the terms and conditions of the +%% Mozilla Public License, version 2.0. The full text of the +%% Mozilla Public License is at https://www.mozilla.org/MPL/2.0/ +%% + +-module(rvi_services_adm). + +-export([handle_rpc/2]). + +-include_lib("lager/include/log.hrl"). + +handle_rpc(Request, Args) -> + ?debug("handle_rpc(~p, ~p)", [Request, Args]), + {ok, [{status, ok}, + {value, <<"foobar">>}]}. diff --git a/components/rvi_services/src/rvi_services_app.erl b/components/rvi_services/src/rvi_services_app.erl new file mode 100644 index 0000000..f1fb3b8 --- /dev/null +++ b/components/rvi_services/src/rvi_services_app.erl @@ -0,0 +1,29 @@ +%% -*- erlang-indent-level: 4; indent-tabs-mode: nil -*- +%% +%% Copyright (C) 2016, Jaguar Land Rover +%% +%% This program is licensed under the terms and conditions of the +%% Mozilla Public License, version 2.0. The full text of the +%% Mozilla Public License is at https://www.mozilla.org/MPL/2.0/ +%% + +-module(rvi_services_app). + +-export([start/2, + start_phase/3, + stop/1]). + +-include_lib("lager/include/log.hrl"). + +start(_StartType, _StartArgs) -> + rvi_services_sup:start_link(). + +start_phase(register_services, _, _) -> + ?debug("start_phase: register_services", []), + rvi_services:register_services(); +start_phase(announce, _, _) -> + ?debug("start_phase: announce", []), + rvi_common:announce({n, l, rvi_services}). + +stop(_State) -> + ok. diff --git a/components/rvi_services/src/rvi_services_sup.erl b/components/rvi_services/src/rvi_services_sup.erl new file mode 100644 index 0000000..82914b7 --- /dev/null +++ b/components/rvi_services/src/rvi_services_sup.erl @@ -0,0 +1,29 @@ +%% -*- erlang-indent-level: 4; indent-tabs-mode: nil -*- +%% +%% Copyright (C) 2016, Jaguar Land Rover +%% +%% This program is licensed under the terms and conditions of the +%% Mozilla Public License, version 2.0. The full text of the +%% Mozilla Public License is at https://www.mozilla.org/MPL/2.0/ +%% + +-module(rvi_services_sup). +-behaviour(supervisor). + +-export([start_link/0]). + +-export([init/1]). + +%% Helper macro for declaring children of supervisor +-define(CHILD(I, Type), {I, {I, start_link, []}, permanent, 5000, Type, [I]}). + +start_link() -> + supervisor:start_link({local, ?MODULE}, ?MODULE, []). + + +init([]) -> + {ok, { {one_for_one, 5, 10}, + [ + ?CHILD(rvi_services, worker), + ?CHILD(rvi_services_svc_sup, supervisor) + ]} }. diff --git a/components/rvi_services/src/rvi_services_svc_sup.erl b/components/rvi_services/src/rvi_services_svc_sup.erl new file mode 100644 index 0000000..194ccf0 --- /dev/null +++ b/components/rvi_services/src/rvi_services_svc_sup.erl @@ -0,0 +1,39 @@ +%% -*- erlang-indent-level: 4; indent-tabs-mode: nil -*- +%% +%% Copyright (C) 2016, Jaguar Land Rover +%% +%% This program is licensed under the terms and conditions of the +%% Mozilla Public License, version 2.0. The full text of the +%% Mozilla Public License is at https://www.mozilla.org/MPL/2.0/ +%% + +-module(rvi_services_svc_sup). +-behaviour(supervisor). + +-export([start_link/0]). +-export([start_service/2]). +-export([init/1]). + +-include_lib("lager/include/log.hrl"). + +start_link() -> + supervisor:start_link({local, ?MODULE}, ?MODULE, []). + +start_service(Port, Module) -> + ?debug("start_service(~p, ~p)", [Port, Module]), + supervisor:start_child( + ?MODULE, [Port, [{request_handler, + {exoport_exo_http, handle_body, [Module]}} + ]]). + +init([]) -> + {ok, { #{strategy => simple_one_for_one, + intensity => 5, + period => 10}, + [#{id => id, + start => {exo_http_server, start_link, []}, + restart => permanent, + shutdown => 5000, + type => worker, + modules => [exo_http_server]} + ] }}. diff --git a/components/schedule/src/rvi_routing.erl b/components/schedule/src/rvi_routing.erl index 9d7917e..f77f10d 100644 --- a/components/schedule/src/rvi_routing.erl +++ b/components/schedule/src/rvi_routing.erl @@ -204,7 +204,6 @@ find_routes_(Rt, _Svc, CurRoutes, CurMatchLen) -> ?warning("rvi_routing(): Incorrect route entry: ~p", [Rt]), { CurRoutes, CurMatchLen }. - find_routes(Routes, Service) -> ?debug("find_routes(~p, ~p)", [Routes, Service]), case find_routes_(Routes, Service, undefined, 0) of @@ -262,6 +261,7 @@ find_protocols_(DataLink, [ {{ _Pr, _PrOp }, { _DL, _DLOp }} | T], Acc) -> find_protocols(AllRoutes, Service, DataLink) -> + ?debug("find_protocols(~p, ~p)", [AllRoutes, Service]), SvcRoutes = find_routes(AllRoutes, Service), Res = find_protocols_(DataLink, SvcRoutes, []), ?debug("find_protocols(~p:~p): -> ~p", [ DataLink, Service, Res]), diff --git a/components/schedule/src/schedule_rpc.erl b/components/schedule/src/schedule_rpc.erl index 7f5a0a9..c0cd59a 100644 --- a/components/schedule/src/schedule_rpc.erl +++ b/components/schedule/src/schedule_rpc.erl @@ -19,6 +19,7 @@ %% FIXME: Should be rvi_service_discovery behavior -export([service_available/3, service_unavailable/3]). +-export([publish_node_id/3]). %% gen_server callbacks -export([init/1, handle_call/3, handle_cast/2, handle_info/2, @@ -132,7 +133,17 @@ schedule_message(CompSpec, { parameters, Parameters }], [status, transaction_id], CompSpec). - +publish_node_id(Cs, NodeId, DLMod) -> + P = self(), + S = <<"rvi:", NodeId/binary>>, + spawn(fun() -> + MRef = erlang:monitor(process, P), + receive + {'DOWN', MRef, _, _, _} -> + service_unavailable(Cs, S, DLMod) + end + end), + service_available(Cs, S, DLMod). service_available(CompSpec, SvcName, DataLinkModule) -> @@ -249,7 +260,7 @@ handle_call(Other, _From, St) -> %%-------------------------------------------------------------------- -handle_cast( {rvi, service_available, [ SvcName, DataLinkModule ]}, St) -> +handle_cast( {rvi, service_available, [ SvcName, DataLinkModule|_]}, St) -> %% Find or create the service. ?debug("sched:service_available(): ~p:~s", [ DataLinkModule, SvcName ]), @@ -266,8 +277,7 @@ handle_cast( {rvi, service_available, [ SvcName, DataLinkModule ]}, St) -> NSt2 = send_orphaned_messages(SvcName, DataLinkModule, NSt1), { noreply, NSt2 }; - -handle_cast( {rvi, service_unavailable, [SvcName, DataLinkModule]}, +handle_cast( {rvi, service_unavailable, [SvcName, DataLinkModule|_]}, #st { services_tid = SvcTid } = St) -> %% Grab the service @@ -440,7 +450,8 @@ queue_message(#message{service = SvcName, timeout = Timeout} = Msg, ?debug("sched:q(~p:~s): timeout: ~p", [DLMod, SvcName, Timeout]), - SvcRec = find_or_create_service(SvcName, DLMod, St), + #service{key = {Service,_}} = SvcRec = + find_or_create_service(SvcName, DLMod, St), %% @@ -455,7 +466,7 @@ queue_message(#message{service = SvcName, timeout = Timeout} = Msg, timeout_tref = 0 }, - case DLMod:setup_data_link(St#st.cs, SvcName, DLOpt) of + case DLMod:setup_data_link(St#st.cs, Service, DLOpt) of [ ok, DLTimeout ] -> TOut = select_timeout(calc_relative_tout(Timeout), DLTimeout), @@ -484,8 +495,6 @@ queue_message(#message{service = SvcName, timeout = Timeout} = Msg, queue_message(Msg1, RemainingRoutes, St) end. - - %% Send messages to locally connected service send_message(local, _, _, _, Msg, St) -> @@ -609,16 +618,18 @@ send_orphaned_messages(SvcName, local, St) -> send_orphaned_messages(SvcName, DataLinkMod, St) -> %% See if there is an orphaned queue for SvcName - case { find_service(SvcName, orphaned, St), - rvi_routing:get_service_protocols(SvcName, DataLinkMod) } of - { not_found, _ } -> %% No orphaned messages destined for the service - ?debug("sched:send_orph(~p:~p): No orphaned messages waiting", - [DataLinkMod, SvcName]), + case find_service(SvcName, orphaned, St) of + not_found -> St; + SvcRec -> + send_orphaned_messages(SvcName, SvcRec, DataLinkMod, St) + end. - %% We have messages waiting for the service, but no protocol has been configured - %% for transmitting them over the given data link module - { _, [] } -> +send_orphaned_messages(SvcName, SvcRec, DataLinkMod, St) -> + case rvi_routing:get_service_protocols(SvcName, DataLinkMod) of + %% We have messages waiting for the service, but no protocol has been + %% configured for transmitting them over the given data link module + [] -> ?debug("sched:send_orph(~p:~p): No protocol configured. Skipped", [DataLinkMod, SvcName]), St; @@ -627,13 +638,11 @@ send_orphaned_messages(SvcName, DataLinkMod, St) -> %% we have at least one protocol that we can use over %% the given data link %% Start chugging out messages - { SvcRec, [{ Proto, ProtoOpts, DataLinkOpts } | _]} -> + [{ Proto, ProtoOpts, DataLinkOpts } | _] -> send_orphaned_messages_(Proto, ProtoOpts, DataLinkMod, DataLinkOpts, SvcRec, St) - end. - - + end. send_orphaned_messages_(Protocol, ProtocolOpts, DataLinkMod, DataLinkOpts, @@ -676,7 +685,19 @@ send_orphaned_messages_(Protocol, ProtocolOpts, end. - +find_service(<<"rvi:", Rest/binary>>, DLMod, #st{services_tid = SvcTid}) -> + case re:split(Rest, <<"/">>, [{return, binary}]) of + [NodeId | _] -> + ?debug("NodeId = ~p", [NodeId]), + case ets:lookup(SvcTid, {<<"rvi:", NodeId/binary>>, DLMod}) of + [] -> + not_found; + [SvcRec] -> + SvcRec + end; + _ -> + not_found + end; find_service(SvcName, DataLinkMod, #st { services_tid = SvcTid }) -> ?debug("sched:find_or_create_service(): ~p:~p", [ DataLinkMod, SvcName]), @@ -703,23 +724,20 @@ find_or_create_service(SvcName, DataLinkMod, St) -> SvcRec end. - - %% Create a new service. %% Warning: Will overwrite existing service (and its message table reference). %% update_service(SvcName, DataLinkMod, Available, #st { services_tid = SvcsTid, cs = CS }) -> - + Key = {SvcName, DataLinkMod}, MsgTID = - case ets:lookup(SvcsTid, { SvcName, DataLinkMod }) of + case ets:lookup(SvcsTid, Key) of [] -> %% The given service does not exist, create a new message TID ?debug("sched:update_service(~p:~p): ~p - Creating new", [ DataLinkMod, SvcName, Available]), ets:new(rvi_messages, [ ordered_set, private, { keypos, #message.transaction_id } ]); - [ TmpSvcRec ] -> %% Grab the existing messagae table ID ?debug("sched:update_service(~p:~p): ~p - Updating existing", @@ -729,7 +747,6 @@ update_service(SvcName, DataLinkMod, Available, TID end, - %% Insert new service to ets table. SvcRec = #service { key = { SvcName, DataLinkMod }, @@ -742,7 +759,6 @@ update_service(SvcName, DataLinkMod, Available, SvcRec. - %% Create a new and unique transaction id create_transaction_id(St) -> ?debug("sched:create_transaction_id(~p): ", [ St#st.next_transaction_id ]), @@ -755,8 +771,7 @@ create_transaction_id(St) -> %% Calculate a relative timeout based on the Msec UnixTime TS we are %% provided with. calc_relative_tout(UnixTimeMS) -> - { Mega, Sec, Micro } = now(), - Now = Mega * 1000000000 + Sec * 1000 + trunc(Micro / 1000) , + Now = erlang:system_time(milli_seconds), ?debug("sched:calc_relative_tout(): TimeoutUnixMS(~p) - Now(~p) = ~p", [ UnixTimeMS, Now, UnixTimeMS - Now ]), diff --git a/components/service_discovery/src/service_discovery_rpc.erl b/components/service_discovery/src/service_discovery_rpc.erl index a553e18..3d3f4f6 100644 --- a/components/service_discovery/src/service_discovery_rpc.erl +++ b/components/service_discovery/src/service_discovery_rpc.erl @@ -21,6 +21,7 @@ -export([get_all_services/1, get_services_by_module/2, get_modules_by_service/2, + is_service_available/2, subscribe/2, unsubscribe/2, register_services/3, @@ -89,6 +90,11 @@ get_modules_by_service(CompSpec, Service) -> [ { service, Service }], [status, modules], CompSpec). +is_service_available(CompSpec, Service) -> + rvi_common:request(service_discovery, ?MODULE, + is_service_available, + [ { service, Service }], + [status, result], CompSpec). register_services(CompSpec, Services, DataLinkModule) -> ?debug("register_services(Mod=~p): ~p", [DataLinkModule, Services]), @@ -183,7 +189,7 @@ handle_rpc(<<"get_services_by_module">>, Args) -> handle_rpc(<<"get_modules_by_service">>, Args) -> LogId = rvi_common:get_json_log_id(Args), {ok, Service } = rvi_common:get_json_element(["service"], Args), - ?debug("svc_disc:get_modules_by_service(json-rpc): ~p ", [Service]), + ?debug("get_modules_by_service(json-rpc): ~p ", [Service]), [ok, Modules ] = gen_server:call(?SERVER, { rvi, get_modules_by_service, @@ -191,8 +197,15 @@ handle_rpc(<<"get_modules_by_service">>, Args) -> {ok, [ {status, rvi_common:json_rpc_status(ok)} , { modules, { array, Modules } }]}; - - +handle_rpc(<<"is_service_available">>, Args) -> + LogId = rvi_common:get_json_log_id(Args), + {ok, Service} = rvi_common:get_json_element(["service"], Args), + ?debug("service_availability(json-rpc): ~p ", [Service]), + [ok, Avail] = gen_server:call( + ?SERVER, + {rvi, is_service_available, [Service, LogId]}), + {ok, [ {status, rvi_common:json_rpc_status(ok)}, + {result, Avail}]}; %% %% Handle the rest. @@ -225,6 +238,8 @@ handle_call_({rvi, get_services_by_module, [Module | _LogId]}, _From, St) -> handle_call_({rvi, get_modules_by_service, [Service | _LogId]}, _From, St) -> {reply, [ok, get_modules_by_service_(Service)], St }; +handle_call_({rvi, is_service_available, [Service | _LogId]}, _From, St) -> + {reply, [ok, ets:member(?SERVICE_TABLE, Service)], St}; handle_call_(Other, _From, St) -> ?warning("svc_disc:handle_call(~p): unknown", [ Other ]), diff --git a/components/service_edge/src/service_edge.app.src b/components/service_edge/src/service_edge.app.src index 13f8d39..172e9bd 100644 --- a/components/service_edge/src/service_edge.app.src +++ b/components/service_edge/src/service_edge.app.src @@ -2,7 +2,7 @@ %% Copyright (C) 2014, Jaguar Land Rover %% %% This program is licensed under the terms and conditions of the -%% Mozilla Public License, version 2.0. The full text of the +%% Mozilla Public License, version 2.0. The full text of the %% Mozilla Public License is at https://www.mozilla.org/MPL/2.0/ %% @@ -16,7 +16,9 @@ {applications, [ kernel, stdlib, - rvi_common + rvi_common, + authorize, + service_discovery ]}, {mod, { service_edge_app, []}}, diff --git a/components/service_edge/src/service_edge_rpc.erl b/components/service_edge/src/service_edge_rpc.erl index b3fa760..5b2d85f 100644 --- a/components/service_edge/src/service_edge_rpc.erl +++ b/components/service_edge/src/service_edge_rpc.erl @@ -15,7 +15,8 @@ -export([handle_websocket/3]). --export([start_link/0]). +-export([start_link/0, + register_service/3]). -export([init/1, handle_call/3, @@ -24,6 +25,12 @@ terminate/2, code_change/3]). +-export([rpc/2, %% (Service, Params) + rpc/3, %% (Service, Timeout, Params) + msg/2, %% (Service, Params) + msg/3 %% (Service, Timeout, Params) + ]). + -export([handle_remote_message/5, handle_local_timeout/3]). @@ -49,7 +56,7 @@ -include_lib("trace_runner/include/trace_runner.hrl"). -define(SERVER, ?MODULE). - +-define(LONG_TIMEOUT, 60000). -record(st, { %% Component specification @@ -61,7 +68,8 @@ -record(service_entry, { service = "", %% Servie handled by this entry. - url = undefined %% URL where the service can be reached. + url = undefined, %% URL where the service can be reached. + opts = [] }). record_fields(service_entry) -> record_info(fields, service_entry); @@ -69,6 +77,40 @@ record_fields(st ) -> record_info(fields, st); record_fields(component_spec) -> record_info(fields, component_spec); record_fields(_) -> no. +rpc(Service, Args) -> + rpc(Service, 10000, Args). + +rpc(Service, Timeout, Args0) -> + Args = case is_synch(Args0) of + false -> [{<<"rvi.synch">>, true}|Args0]; + {true,_} -> Args0 + end, + rvi_common:request(service_edge, ?MODULE, message, + [{<<"service_name">>, service_name(Service)}, + {<<"timeout">>, Timeout}, + {<<"parameters">>, Args}], + [status, result], + rvi_common:get_component_specification()). + +msg(Service, Args) -> + msg(Service, 10000, Args). + +msg(Service, Timeout, Args) -> + rvi_common:request(service_edge, ?MODULE, message, + [{<<"service_name">>, service_name(Service)}, + {<<"timeout">>, Timeout}, + {<<"parameters">>, Args}], + [status, tid], + rvi_common:get_component_specification()). + +service_name(<<"$PFX", Rest/binary>>) -> + Pfx = rvi_common:local_service_prefix(), + re:replace(<>, <<"//">>, <<"/">>, + [global, {return, binary}]); +service_name(Svc) -> + Svc. + + start_link() -> gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). @@ -92,6 +134,12 @@ init([]) -> cs = CompSpec }}. +register_service(CompSpec, SvcName, URL) -> + ?debug("register_service()~n", []), + rvi_common:request(service_edge, ?MODULE, register_service, + [{<<"service">>, SvcName}, + {<<"network_address">>, URL}], + [status, service, method], CompSpec). start_json_server() -> Allowed = get_allowed(), @@ -221,8 +269,6 @@ handle_local_timeout(CompSpec, SvcName, TransID) -> { transaction_id, TransID} ], CompSpec). - - handle_websocket(WSock, Mesg, Arg) -> Decoded = try jsx:decode(Mesg) catch error:E0 -> @@ -250,14 +296,15 @@ handle_websocket(WSock, Mesg, Arg) -> end, ok. - - %% Websocket interface handle_ws_json_rpc(WSock, <<"message">>, Params, _Arg ) -> { ok, SvcName0 } = rvi_common:get_json_element(["service_name"], Params), { ok, Timeout } = rvi_common:get_json_element(["timeout"], Params), - { ok, Parameters } = rvi_common:get_json_element(["parameters"], Params), + { ok, Parameters0 } = rvi_common:get_json_element(["parameters"], Params), + Files = rvi_common:get_opt_json_element(["rvi.files"], [], Params), + ?debug("Files = ~p", [Files]), SvcName = iolist_to_binary(SvcName0), + Parameters = append_files_to_params(Files, Parameters0), ?event({message, ws, [SvcName, Timeout, Parameters]}), ?debug("WS Parameters: ~p", [Parameters]), %% Parameters = parse_ws_params(Parameters0), @@ -269,25 +316,30 @@ handle_ws_json_rpc(WSock, <<"message">>, Params, _Arg ) -> case gen_server:call( ?SERVER, {rvi, handle_local_message, - [ SvcName, Timeout, Parameters | LogId ]}) of + [ SvcName, Timeout, Parameters | LogId ]}, ?LONG_TIMEOUT) of [not_found] -> {ok, [{status, rvi_common:json(not_found)}]}; [Res, TID] -> ?debug("service_edge_rpc:wse_message(~p) Res: ~p", [ WSock, Res ]), { ok, [ { status, rvi_common:json_rpc_status(Res) }, { transaction_id, TID}, - { method, <<"message">>}] } + { method, <<"message">>}] }; + {ok, _} = CompleteResult -> + CompleteResult end; handle_ws_json_rpc(WSock, <<"register_service">>, Params,_Arg ) -> { ok, SvcName } = rvi_common:get_json_element(["service_name"], Params), ?event({register_service, ws, SvcName}), ?debug("service_edge_rpc:websocket_register(~p) service: ~p", [ WSock, SvcName ]), + Opts = rvi_common:get_opt_json_element(<<"opts">>, [], Params), + LogId = log_id_json_tail(Params), [ok, FullSvcName ] = gen_server:call(?SERVER, { rvi, - register_local_service, + register_service, [ SvcName, - "ws:" ++ pid_to_list(WSock)]}), + "ws:" ++ pid_to_list(WSock), + Opts | LogId]}), { ok, [ { status, rvi_common:json_rpc_status(ok)}, { service, FullSvcName }, @@ -297,7 +349,7 @@ handle_ws_json_rpc(WSock, <<"unregister_service">>, Params, _Arg ) -> { ok, SvcName } = rvi_common:get_json_element(["service_name"], Params), ?event({unregister_service, ws, SvcName}), ?debug("service_edge_rpc:websocket_unregister(~p) service: ~p", [ WSock, SvcName ]), - gen_server:call(?SERVER, { rvi, unregister_local_service, [ SvcName ]}), + gen_server:call(?SERVER, { rvi, unregister_service, [ SvcName ]}), { ok, [ { status, rvi_common:json_rpc_status(ok)} ]}; handle_ws_json_rpc(WSock, <<"get_node_service_prefix">>, Params, _Arg) -> @@ -329,10 +381,11 @@ handle_rpc(<<"register_service">>, Args) -> {ok, SvcName} = rvi_common:get_json_element([<<"service">>], Args), ?event({register_service, json_rpc, SvcName}), {ok, URL} = rvi_common:get_json_element([<<"network_address">>], Args), + Opts = rvi_common:get_opt_json_element([<<"opts">>], [], Args), + LogId = log_id_json_tail(Args), [ok, FullSvcName ] = gen_server:call(?SERVER, - { rvi, register_local_service, - [ SvcName, URL]}), - + { rvi, register_service, + [SvcName, URL, Opts | LogId] }), {ok, [ {status, rvi_common:json_rpc_status(ok) }, { service, FullSvcName }, { method, <<"register_service">>} @@ -342,7 +395,9 @@ handle_rpc(<<"register_service">>, Args) -> handle_rpc(<<"unregister_service">>, Args) -> {ok, SvcName} = rvi_common:get_json_element(["service"], Args), ?event({unregister_service, json_rpc, SvcName}), - gen_server:call(?SERVER, { rvi, unregister_local_service, [ SvcName]}), + LogId = log_id_json_tail(Args), + gen_server:call(?SERVER, { rvi, unregister_service, + [SvcName | LogId]}), {ok, [ { status, rvi_common:json_rpc_status(ok) }, { method, <<"unregister_service">>} ]}; @@ -362,15 +417,27 @@ handle_rpc(<<"get_available_services">>, _Args) -> handle_rpc(<<"message">>, Args) -> {ok, SvcName} = rvi_common:get_json_element(["service_name"], Args), {ok, Timeout} = rvi_common:get_json_element(["timeout"], Args), - {ok, Parameters} = rvi_common:get_json_element(["parameters"], Args), + {ok, Parameters0} = rvi_common:get_json_element(["parameters"], Args), + Files = rvi_common:get_opt_json_element(["rvi.files"], [], Args), + ?debug("Files = ~p~n", [Files]), + Parameters = append_files_to_params(Files, Parameters0), ?event({message, json_rpc, [SvcName, Timeout, Parameters]}), LogId = log_id_json_tail(Args ++ Parameters), - [ Res, TID ] = gen_server:call(?SERVER, { rvi, handle_local_message, - [ SvcName, Timeout, Parameters | LogId]}), - {ok, [ { status, rvi_common:json_rpc_status(Res) }, - { transaction_id, TID }, - { method, <<"message">>} - ]}; + case gen_server:call( + ?SERVER, { rvi, message, + [ SvcName, Timeout, Parameters | LogId]}, + ?LONG_TIMEOUT) of + [Res, TID] -> + ?debug("'message' result: ~p", [[Res, TID]]), + {ok, [ { status, rvi_common:json_rpc_status(Res) }, + { transaction_id, TID }, + { method, <<"message">>} + ]}; + [Res] -> + {ok, [ { status, rvi_common:json_rpc_status(Res) } ]}; + {ok, _} = CompleteResult -> + CompleteResult + end; handle_rpc(Other, _Args) -> ?warning("service_edge_rpc:handle_rpc(~p): unknown command", [ Other ]), @@ -424,13 +491,15 @@ handle_notification(<<"handle_remote_message">>, Args) -> { ok, SvcName } = rvi_common:get_json_element(["service"], Args), { ok, Timeout } = rvi_common:get_json_element(["timeout"], Args), { ok, Parameters } = rvi_common:get_json_element(["parameters"], Args), + Files = rvi_common:get_opt_json_element(["rvi.files"], Args), gen_server:cast(?SERVER, { rvi, handle_remote_message, [ - IP, - Port, - SvcName, - Timeout, - Parameters + IP, + Port, + SvcName, + Timeout, + Parameters, + Files ]}), ok; @@ -462,28 +531,20 @@ handle_notification(Other, _Args) -> %% the only calls invoked by other components, and not the locally %% connected services that uses the same HTTP port to transmit their %% register_service, and message calls. -handle_call({ rvi, register_local_service, [SvcName, URL | T] }, _From, St) -> - ?debug("service_edge_rpc:register_local_service(): service: ~p ", [SvcName]), - ?debug("service_edge_rpc:register_local_service(): address: ~p ", [URL]), +handle_call({ rvi, register_service, [SvcName, URL, Opts | T] }, + _From, St) -> + ?debug("service_edge_rpc:register_service(): service: ~p ", [SvcName]), + ?debug("service_edge_rpc:register_service(): address: ~p ", [URL]), FullSvcName = rvi_common:local_service_to_string(SvcName), - CS = start_log(T, "reg local service: ~s", [FullSvcName], St#st.cs), - ?debug("service_edge_rpc:register_local_service(): full name: ~p ", [FullSvcName]), - - ets:insert(?SERVICE_TABLE, #service_entry { - service = FullSvcName, - url = URL }), - - %% Register with service discovery, will trigger callback to service_available() - %% that forwards the registration to other connected services. - service_discovery_rpc:register_services(CS, [FullSvcName], local), - - - %% Return ok. - { reply, [ ok, FullSvcName ], St }; + try register_local_service_(FullSvcName, URL, Opts, T, St) + catch + throw:Reason -> + {reply, [Reason], St} + end; -handle_call({ rvi, unregister_local_service, [SvcName | T] }, _From, St) -> - ?debug("service_edge_rpc:unregister_local_service(): service: ~p ", [SvcName]), +handle_call({ rvi, unregister_service, [SvcName | T] }, _From, St) -> + ?debug("service_edge_rpc:unregister_service(): service: ~p ", [SvcName]), ets:delete(?SERVICE_TABLE, SvcName), @@ -509,12 +570,12 @@ handle_call({rvi, get_available_services, []}, _From, St) -> %% [{struct,[{"a","b"}]}] %% 13:48:12.943 [debug] service_edge_rpc:local_msg: parameters: [{struct,[{"a","b"}]}] -handle_call({ rvi, handle_local_message, +handle_call({ rvi, message, [SvcName, TimeoutArg, Parameters | Tail] = Args }, From, #st{pending = Pend} = St) -> - ?debug("service_edge_rpc:local_msg: service_name: ~p", [SvcName]), - ?debug("service_edge_rpc:local_msg: timeout: ~p", [TimeoutArg]), - ?debug("service_edge_rpc:local_msg: parameters: ~p", [Parameters]), + ?debug("local_msg: service_name: ~p", [SvcName]), + ?debug("local_msg: timeout: ~p", [TimeoutArg]), + ?debug("local_msg: parameters: ~p", [Parameters]), CS = start_log(Tail, "local_message: ~s", [SvcName], St#st.cs), %% %% Authorize local message and retrieve a certificate / signature @@ -558,10 +619,15 @@ handle_cast({rvi, handle_remote_message, Parameters ] }, #st{cs = CS} = St) -> ?event({handle_remote_message, [IP, Port, SvcName, Timeout]}, St), - spawn(fun() -> - handle_remote_message_( - IP, Port, SvcName, Timeout, Parameters, CS) - end), + case SvcName of + <<"rvi:", _/binary>> -> + dispatch_reply(IP, Port, SvcName, Timeout, Parameters, CS); + _ -> + spawn(fun() -> + handle_remote_message_( + IP, Port, SvcName, Timeout, Parameters, CS) + end) + end, {noreply, St}; handle_cast({ rvi, handle_local_timeout, [SvcName, TransactionID] }, St) -> @@ -575,10 +641,35 @@ handle_cast(Other, St) -> ?warning("service_edge_rpc:handle_cast(~p): unknown", [ Other ]), {noreply, St}. -handle_info({'DOWN', Ref, _, _, {deferred_reply, Reply}}, +handle_info({'DOWN', Ref, _, _, {deferred_reply, Deferred}}, #st{pending = Pend} = St) -> + ?debug("got deferred reply: ~p", [Deferred]), case lists:keyfind(Ref, 2, Pend) of {_Pid, Ref, From} = P -> + Reply = + case Deferred of + [ok, {_, <<"{\"jsonrpc\"", _/binary>> = JSON}] -> + Decoded = jsx:decode(JSON), + ?debug("Decoded = ~p", [Decoded]), + {_, R} = lists:keyfind(<<"result">>, 1, Decoded), + ?debug("R = ~p", [R]), + {ok, [convert_status(X) || X <- R]}; + [ok, [{_,_}|_] = ReplyL] -> + case lists:keyfind(<<"result">>, 1, ReplyL) of + {_, R} -> + ?debug("R = ~p", [R]), + {ok, [convert_status(X) || X <- R]}; + false -> + ?debug("Cannot find result: ~p", [Deferred]), + Deferred + end; + [ok, I] when is_integer(I) -> + Deferred; + Other -> + ?debug("Strange deferred_reply: ~p", [Other]), + Other + end, + ?debug("Reply = ~p", [Reply]), gen_server:reply(From, Reply), {noreply, St#st{pending = Pend -- [P]}}; false -> @@ -591,16 +682,33 @@ handle_info({'DOWN', Ref, _, _, Reason}, #st{pending = Pend} = St) -> gen_server:reply(From, [internal]), {noreply, St#st{pending = Pend -- [P]}}; false -> + ?debug("got DOWN, but no corresponding pending", []), {noreply, St} end; handle_info(_Info, St) -> {noreply, St}. +convert_status({<<"status">> = K, St}) -> + {K, rvi_common:json_rpc_status(St)}; +convert_status(X) -> + X. + + terminate(_Reason, _St) -> ok. code_change(_OldVsn, St, _Extra) -> {ok, St}. +dispatch_reply(_IP, _Port, <<"rvi:", _/binary>> = ReplyId, _Timeout, Params, _CS) -> + case gproc:where({n, l, {rvi, rpc, ReplyId}}) of + undefined -> + ?debug("No process matching ~p", [ReplyId]), + ignore; + Pid -> + Pid ! {rvi, rpc_return, ReplyId, Params}, + ok + end. + handle_remote_message_(IP, Port, SvcName, Timeout, Parameters, CS) -> ?debug("service_edge:remote_msg(): remote_ip: ~p", [IP]), ?debug("service_edge:remote_msg(): remote_port: ~p", [Port]), @@ -611,7 +719,6 @@ handle_remote_message_(IP, Port, SvcName, Timeout, Parameters, CS) -> %% Check if this is a local message. case ets:lookup(?SERVICE_TABLE, SvcName) of [ #service_entry { url = URL }] -> %% This is a local message - Parameters1 = Parameters, case authorize_rpc:authorize_remote_message( CS, SvcName, @@ -619,21 +726,32 @@ handle_remote_message_(IP, Port, SvcName, Timeout, Parameters, CS) -> {remote_port, Port}, {service_name, SvcName}, {timeout, Timeout}, - {parameters, Parameters1}]) of + {parameters, Parameters}]) of [ ok ] -> - forward_message_to_local_service( - URL, SvcName, Parameters, CS); + forward_remote_msg_to_local_service( + URL, SvcName, Parameters, Timeout, CS); [ _Other ] -> ?warning("service_entry:remote_msg(): " "Failed to authenticate ~p (~p)", [SvcName, _Other]) end; [] -> - ?notice("service_entry:remote_msg(): Service Disappeared ~p", - [SvcName]) + case service_discovery_rpc:is_service_available(CS, SvcName) of + [ok, false] -> + %% log error and discard + ?debug("Remote service disappeared (~p)", [SvcName]), + [not_found]; + [ok, true] -> + ?debug( + "Forward remote message to schedule (~p)", [SvcName]), + schedule_rpc:schedule_message(CS, + SvcName, + Timeout, + Parameters) + end end. -handle_local_message_([SvcName, TimeoutArg, Parameters | _] = Args, CS) -> +handle_local_message_([SvcName, TimeoutArg, Parameters|_] = Args, CS) -> ?debug("CS = ~p", [lager:pr(CS, rvi_common)]), case authorize_rpc:authorize_local_message( CS, SvcName, [{service_name, SvcName}, @@ -647,7 +765,7 @@ handle_local_message_([SvcName, TimeoutArg, Parameters | _] = Args, CS) -> [not_found] end. -do_handle_local_message_([SvcName, TimeoutArg, Parameters | _Tail], CS) -> +do_handle_local_message_([SvcName, TimeoutArg, Parameters0 | _Tail], CS) -> %% %% Slick but ugly. %% If the timeout is more than 24 hrs old when parsed as unix time, @@ -666,6 +784,8 @@ do_handle_local_message_([SvcName, TimeoutArg, Parameters | _Tail], CS) -> false -> %% Absolute timeout. Convert to unix time msec TimeoutArg * 1000 end, + {Synch, Tag, Parameters} = check_if_synch(Parameters0), + ?debug("check_if_synch(~p) -> ~p", [Parameters0, {Synch, Tag, Parameters}]), %% %% Check if this is a local service by trying to resolve its service name. %% If successful, just forward it to its service_name. @@ -680,8 +800,12 @@ do_handle_local_message_([SvcName, TimeoutArg, Parameters | _Tail], CS) -> Res = forward_message_to_local_service(URL, SvcName, Parameters, + Synch, Tag, + Timeout, CS), - Res; + TimeoutMS = timeout_ms(Timeout), + ?debug("TimeoutMS = ~p", [TimeoutMS]), + rpc_return(Synch, Tag, TimeoutMS, Res); _ -> %% SvcName is remote %% Ask Schedule the request to resolve the network address @@ -691,9 +815,92 @@ do_handle_local_message_([SvcName, TimeoutArg, Parameters | _Tail], CS) -> SvcName, Timeout, Parameters), - [ok, TID ] + TimeoutMS = timeout_ms(Timeout), + ?debug("TimeoutMS = ~p", [TimeoutMS]), + rpc_return(Synch, Tag, TimeoutMS, [ok, TID ]) end. +timeout_ms(UnixT) -> + {M,S,U} = os:timestamp(), + Now = M * 1000000000 + (S * 1000) + (U div 1000), + UnixT - Now. + +check_if_synch(Params) -> + IsSynch = is_synch(Params), + case IsSynch of + {true, Level} -> + {ReplyId, Params1} = prepare_rpc_wait(Params, Level), + ?debug("prepare_rpc_wait(~p, ~p) -> ~p", + [Params, Level, {ReplyId, Params1}]), + {true, ReplyId, Params1}; + false -> + {false, none, Params} + end. + +is_synch(Params) -> + case lists:keyfind(<<"rvi.synch">>, 1, Params) of + {_, T} when T==true; T == <<"true">> -> + {true, 1}; + {_, <<"rvi:", _/binary>>} -> + {true, 1}; + false -> + case rvi_common:get_json_element( + ["parameters","rvi.synch"], Params) of + {ok, T} when T==true; T == <<"true">> -> + {true, 2}; + {ok, <<"rvi:", _/binary>>} -> + {true, 2}; + _ -> + false + end + end. + +prepare_rpc_wait(Params, Level) -> + NodeId = rvi_common:node_id(), + Seq = integer_to_binary(erlang:unique_integer([positive, monotonic])), + ReplyId = <<"rvi:", NodeId/binary, "/", Seq/binary>>, + gproc:reg({n,l,{rvi, rpc, ReplyId}}), + {ReplyId, replace_synch(Level, Params, {<<"rvi.synch">>, ReplyId})}. + +replace_synch(1, Params, New) -> + lists:keyreplace(<<"rvi.synch">>, 1, Params, New); +replace_synch(2, Params, New) -> + {_, Ps} = lists:keyfind(<<"parameters">>, 1, Params), + Ps1 = lists:keyreplace(<<"rvi.synch">>, 1, Ps, New), + lists:keyreplace(<<"parameters">>, 1, Params, {<<"parameters">>, Ps1}). + +rpc_return(false, _, _, Res) -> + ?debug("rpc_return(false,_,_,_) -> ~p", [Res]), + Res; +rpc_return(true, Tag, Timeout, _OrigRes) -> + ?debug("rpc_return(true,~p,~p,_)", [Tag, Timeout]), + receive + {rvi, rpc_return, Tag, Reply} -> + ?debug("received matching reply (Tag = ~p)~n~p", [Tag, Reply]), + [ok, Reply]; + Other -> + ?debug("Received Other = ~p", [Other]), + [ok, internal] + after Timeout -> + [timeout] + end. + +register_local_service_(FullSvcName, URL, Opts, T, St) -> + SvcOpts = parse_svc_opts(Opts), + CS = start_log(T, "reg local service: ~s", [FullSvcName], St#st.cs), + ?debug("register_local_service(): full name: ~p ", [FullSvcName]), + ets:insert(?SERVICE_TABLE, #service_entry { + service = FullSvcName, + opts = SvcOpts, + url = URL }), + + %% Register with service discovery, will trigger callback to service_available() + %% that forwards the registration to other connected services. + service_discovery_rpc:register_services(CS, [FullSvcName], local), + %% Return ok. + { reply, [ ok, FullSvcName ], St }. + + json_rpc_notification(Method, Parameters) -> jsx:encode( [{<<"json-rpc">>, <<"2.0">>}, @@ -761,9 +968,22 @@ dispatch_to_local_service(URL, Command, Args) -> %% Forward a message to a specific locally connected service. %% Called by forward_message_to_local_service/2. %% -forward_message_to_local_service(URL,SvcName, Parameters, CompSpec) -> - ?debug("service_edge:forward_to_local(): URL: ~p", [URL]), - ?debug("service_edge:forward_to_local(): Parameters: ~p", [Parameters]), +forward_remote_msg_to_local_service(URL,SvcName, Parameters, Timeout, CS) -> + {Synch, Tag} = + case rvi_common:get_json_element([<<"rvi.synch">>], Parameters) of + {ok, <<"rvi:", _/binary>> = ID} -> + {true, ID}; + _ -> + {false, none} + end, + forward_message_to_local_service( + URL, SvcName, Parameters, Synch, Tag, Timeout, CS). + +forward_message_to_local_service(URL, SvcName, Parameters, + Synch, Tag, Timeout, CS) -> + ?debug("forward_to_local(): URL: ~p", [URL]), + ?debug("forward_to_local(): Synch: ~p", [Synch]), + ?debug("forward_to_local(): Parameters: ~p", [Parameters]), %% %% Strip our node prefix from service_name so that @@ -773,28 +993,76 @@ forward_message_to_local_service(URL,SvcName, Parameters, CompSpec) -> %% Pfx = rvi_common:local_service_prefix(), Sz = byte_size(Pfx)-1, - <<_:Sz/binary, LocalSvcName/binary>> = SvcName, + <<_:Sz/binary, LocalSvcName0/binary>> = SvcName, + LocalSvcName = normalize_slash(LocalSvcName0), ?debug("Service name: ~p (Pfx = ~p)", [LocalSvcName, Pfx]), %% Deliver the message to the local service, which can %% be either a wse websocket, or a regular HTTP JSON-RPC call - spawn(fun() -> - try + Me = self(), + spawn( + fun() -> + try log_outcome( - rvi_common:get_request_result( - dispatch_to_local_service(URL, - message, - [{<<"service_name">>, LocalSvcName }, - {<<"parameters">>, Parameters }])), - SvcName, CompSpec) - catch - Tag:Err -> - ?error("Caught ~p:~p~n~p", - [Tag,Err,erlang:get_stacktrace()]) - end - end), + maybe_reply( + Synch, Me, Tag, + rvi_common:get_request_result( + dispatch_to_local_service( + URL, + message, + [{<<"service_name">>, LocalSvcName }, + {<<"parameters">>, Parameters }])), Timeout, CS), + SvcName, CS) + catch + Tag:Err -> + ?error("Caught ~p:~p~n~p", + [Tag,Err,erlang:get_stacktrace()]) + end + end), timer:sleep(500), [ ok, -1 ]. +normalize_slash(Svc) -> + {match, [Stripped]} = re:run(Svc, "/*(.*)", [{capture,[1],binary}]), + <<"/", Stripped/binary>>. + +maybe_reply(false, _, _, Res, _Timeout, _CS) -> + Res; +maybe_reply(true, _, <<"rvi:", _/binary>> = Tag, Res, Timeout, CS) -> + ?debug("maybe_reply() Tag: ~p", [Tag]), + ?debug("maybe_reply() Res: ~p", [Res]), + case decode_reply(Res) of + error -> + ignore; %% but should log ... + Params -> + log("schedule reply (~p)", [Tag], CS), + schedule_rpc:schedule_message(CS, Tag, Timeout, Params) + end, + Res; +maybe_reply(true, Pid, Tag, Res, _Timeout, _CS) -> + Pid ! {rvi, rpc_return, Tag, Res}, + Res. + +decode_reply({_, Res}) when is_binary(Res) -> + try jsx:decode(Res) of + Decoded -> + {ok, Result} = + rvi_common:get_json_element([<<"result">>], Decoded), + [{<<"result">>, Result}] + catch + error:_ -> + error + end; +decode_reply({_, [{_,_}|_] = Res}) -> + try rvi_common:get_json_element([<<"result">>], Res) of + {ok, Result} -> + [{<<"result">>, Result}]; + _ -> + error + catch + error:_ -> + error + end. + log_outcome({Status, _}, _SvcName, CS) -> log("result: ~w", [Status], CS); log_outcome(Other, _SvcName, CS) -> @@ -880,3 +1148,21 @@ log_id_json_tail(Args) -> event(_, _, _) -> ok. + +append_files_to_params([], Parameters) -> + Parameters; +append_files_to_params(Files, [T|_] = Parameters) when is_tuple(T) -> % object + Parameters ++ [{<<"rvi.files">>, Files}]; +append_files_to_params(Files, [_|_] = Parameters) -> % array + Parameters ++ [[{<<"rvi.files">>, Files}]]. + + +parse_svc_opts(Opts) -> + Files = rvi_common:get_opt_json_element(<<"files">>, <<"inline">>, Opts), + [{<<"files">>, files_option(Files)}]. + +files_option(O = <<"inline">>) -> O; +files_option(O = <<"reject">>) -> O; +files_option(O = <<"multipart">>) -> O; +files_option(_) -> + throw(invalid_command). diff --git a/deps/exo/src/exo_http.erl b/deps/exo/src/exo_http.erl index 8aea78a..40bb494 100644 --- a/deps/exo/src/exo_http.erl +++ b/deps/exo/src/exo_http.erl @@ -26,7 +26,7 @@ -export([wtrace/1, wtrace/2, wtrace/3, wtrace/4]). -export([open/1, open/2, close/3]). -export([request/2, request/3, request/4, request/5]). --export([send/2, send/3, send/4, send/7, +-export([send/2, send/3, send/4, send/7, send_body/2, send_chunk/2, send_chunk_end/2]). %% message interface @@ -57,8 +57,8 @@ format_hdr/1, fmt_chdr/1, fmt_shdr/1, - make_request/4, - make_response/4, + make_request/4, + make_response/4, auth_basic_encode/2, url_encode/1, make_headers/2 @@ -145,7 +145,7 @@ wtrace(Url, Version, Hs,Timeout) -> %% %% Content-type: multipart/
%% -%% - Data = [{file,ContentType,DispositionName,FileName} | +%% - Data = [{file,ContentType,DispositionName,FileName} | %% {data,ContentType,DispositionName,<>} | %% <>] %% @@ -177,19 +177,19 @@ wpost_body(Req, Data) -> wpost_plain_body(Req, Data) end. - + wpost_form_body(Req, Data) -> {ok,Req,format_query(Data)}. wpost_multi_body(Req, Data) -> H = Req#http_request.headers, Ct0 = H#http_chdr.content_type, - {Boundary,Req1} = + {Boundary,Req1} = case string:str(Ct0, "boundary=") of 0 -> {MS,S,US} = now(), Bnd = integer_to_list((1000000*(1000000*MS+S) + US)), - Ct1 = H#http_chdr.content_type ++ + Ct1 = H#http_chdr.content_type ++ "; boundary=\""++Bnd ++"\"", H1 = set_chdr('Content-Type', Ct1, H), {Bnd, Req#http_request { headers = H1 }}; @@ -219,18 +219,18 @@ wpost_plain_body(Req, Data) -> [{file,_,_,FileName}] -> {ok,Bin} = file:read_file(FileName), Bin; - [{data,_,Bin}] -> + [{data,_,Bin}] -> Bin; - [{data,_,_,Bin}] -> + [{data,_,_,Bin}] -> Bin; List when is_list(List) -> - list_to_binary(List) + iolist_to_binary(List) end, {ok,Req,Body}. multi_data(Data, Boundary) -> - list_to_binary( + iolist_to_binary( [ lists:map( fun(Bin) when is_binary(Bin) -> @@ -273,6 +273,14 @@ multi_data(Data, Boundary) -> Bin, ?CRNL ]; + ({data,[{_,_}|_] = Hdrs,Bin}) -> + [ + "--",Boundary,?CRNL, + [format_field(K, V) || {K,V} <- Hdrs], + ?CRNL, + Bin, + ?CRNL + ]; ({data,ContentType,Bin}) -> [ "--",Boundary,?CRNL, @@ -323,7 +331,7 @@ xrequest(Proxy,Port,Req,Body,Timeout) -> end; Error -> Error - end. + end. request(S, Req, Body, Proxy) -> request(S, Req, Body, Proxy, infinity). @@ -342,14 +350,14 @@ request(S, Req, Body, Proxy, Timeout) -> ?dbg("body: ~p\n", [Error]), Error end; - Error -> + Error -> ?dbg("response: ~p\n", [Error]), Error end; Error -> Error end. -open(Request) -> +open(Request) -> open(Request,infinity). open(Request,Timeout) -> @@ -397,7 +405,7 @@ do_close(Req, Res) -> ResH = Res#http_response.headers, case tokens(ResH#http_shdr.connection) of ["close"] -> true; - ["keep-alive"] -> + ["keep-alive"] -> %% Check {1,0} and keep-alive requested false; _ -> @@ -439,7 +447,7 @@ send(Socket, Method, URI, Version, H, Body, Proxy) -> true -> Url#url.port end, - H1 = + H1 = if H#http_chdr.host == undefined -> H#http_chdr { host = Url#url.host }; true -> @@ -454,7 +462,7 @@ send(Socket, Method, URI, Version, H, Body, Proxy) -> true -> H1 end, - H3 = if Version == {1,0}, + H3 = if Version == {1,0}, H1#http_chdr.connection == undefined -> H2#http_chdr { connection = "keep-alive" }; true -> @@ -466,7 +474,7 @@ send(Socket, Method, URI, Version, H, Body, Proxy) -> exo_socket:send(Socket, Request). %% -%% Send "extra" body data not sent in the original send +%% Send "extra" body data not sent in the original send %% send_body(Socket, Body) -> exo_socket:send(Socket, Body). @@ -593,7 +601,7 @@ recv_body_eof(Socket) -> recv_body_eof(Socket,infinity). recv_body_eof(Socket,Timeout) -> - ?dbg("RECV_BODY_EOF: tmo=~w\n", [Timeout]), + ?dbg("RECV_BODY_EOF: tmo=~w\n", [Timeout]), exo_socket:setopts(Socket, [{packet,raw},{mode,binary}]), recv_body_eof1(Socket, [], Timeout). @@ -602,7 +610,7 @@ recv_body_eof1(Socket, Acc,Timeout) -> {ok, Bin} -> recv_body_eof1(Socket, [Bin|Acc],Timeout); {error, closed} -> - {ok, list_to_binary(reverse(Acc))}; + {ok, iolist_to_binary(reverse(Acc))}; Error -> Error end. @@ -611,10 +619,10 @@ recv_body_data(Socket, Len) -> recv_body_data(Socket, Len, infinity). recv_body_data(_Socket, 0, _Timeout) -> - ?dbg("RECV_BODY_DATA: len=0, tmo=~w\n", [_Timeout]), + ?dbg("RECV_BODY_DATA: len=0, tmo=~w\n", [_Timeout]), {ok, <<>>}; recv_body_data(Socket, Len, Timeout) -> - ?dbg("RECV_BODY_DATA: len=~p, tmo=~w\n", [Len,Timeout]), + ?dbg("RECV_BODY_DATA: len=~p, tmo=~w\n", [Len,Timeout]), exo_socket:setopts(Socket, [{packet,raw},{mode,binary}]), case exo_socket:recv(Socket, Len, Timeout) of {ok, Bin} -> @@ -646,8 +654,8 @@ recv_body_chunk(S, Acc, Timeout) -> ?dbg("CHUNK TRAILER: ~p\n", [_TR]), exo_socket:setopts(S, [{packet,http}, {mode,binary}]), - {ok,list_to_binary(reverse(Acc))}; - Error -> + {ok, iolist_to_binary(reverse(Acc))}; + Error -> Error end; ChunkSize > 0 -> @@ -691,10 +699,10 @@ recv_headers(S, R) -> recv_headers(S, R, Timeout) -> if is_record(R, http_request) -> recv_hc(S, R, #http_chdr { },Timeout); - is_record(R, http_response) -> + is_record(R, http_response) -> recv_hs(S, R, #http_shdr { },Timeout) end. - + recv_hc(S, R, H, Timeout) -> case exo_socket:recv(S, 0, Timeout) of @@ -714,10 +722,10 @@ recv_hc(S, R, H, Timeout) -> Got -> {error, Got} end; - {error, {http_error, ?CRNL}} -> + {error, {http_error, ?CRNL}} -> ?dbg("ERROR CRNL <\n", []), recv_hc(S, R, H,Timeout); - {error, {http_error, ?NL}} -> + {error, {http_error, ?NL}} -> ?dbg("ERROR NL <\n", []), recv_hc(S, R, H,Timeout); Error -> Error @@ -741,10 +749,10 @@ recv_hs(S, R, H, Timeout) -> Got -> {error, Got} end; - {error, {http_error, ?CRNL}} -> + {error, {http_error, ?CRNL}} -> ?dbg("ERROR CRNL <\n", []), recv_hs(S, R, H,Timeout); - {error, {http_error, ?NL}} -> + {error, {http_error, ?NL}} -> ?dbg("ERROR NL <\n", []), recv_hs(S, R, H, Timeout); Error -> Error @@ -782,7 +790,7 @@ format_request(Method, Url, Version, Proxy) -> end, " ", if is_record(Url, url) -> - if Proxy == true -> + if Proxy == true -> exo_url:format(Url); true -> exo_url:format_path(Url) @@ -801,7 +809,7 @@ format_response(R) -> R#http_response.phrase). format_response({0,9}, _Status, _Phrase) -> ""; -format_response(Version, Status, Phrase) -> +format_response(Version, Status, Phrase) -> [case Version of {1,0} -> "HTTP/1.0"; {1,1} -> "HTTP/1.1" @@ -853,7 +861,7 @@ url_encode([C|T]) -> C >= $0, C =< $9 -> [C|url_encode(T)]; C == $\s -> [$+|url_encode(T)]; C == $_; C == $.; C == $-; C == $/; C == $: -> % FIXME: more.. - [C|url_encode(T)]; + [C|url_encode(T)]; true -> case erlang:integer_to_list(C, 16) of [C1] -> [$%,$0,C1 | url_encode(T)]; @@ -900,7 +908,7 @@ mk_shdr([{K,V}|Hs], H) -> mk_shdr([], H) -> H. -set_shdr(K,V,H) -> +set_shdr(K,V,H) -> case K of 'Connection' -> H#http_shdr { connection = V }; 'Transfer-Encoding' -> H#http_shdr { transfer_encoding = V }; @@ -908,7 +916,7 @@ set_shdr(K,V,H) -> 'Set-Cookie' -> H#http_shdr { set_cookie = V }; 'Content-Length' -> H#http_shdr { content_length = V }; 'Content-Type' -> H#http_shdr { content_type = V }; - _ -> + _ -> Hs = [{K,V} | H#http_shdr.other], H#http_shdr { other = Hs } end. @@ -921,7 +929,7 @@ mk_chdr([{K,V}|Hs], H) -> mk_chdr([], H) -> H. -set_chdr(K,V,H) -> +set_chdr(K,V,H) -> case K of 'Host' -> H#http_chdr { host = V }; 'Connection' -> H#http_chdr { connection = V }; @@ -949,9 +957,9 @@ set_chdr(K,V,H) -> end. format_hdr(H) when is_record(H, http_chdr) -> - fcons('Host', H#http_chdr.host, - fcons('Connection', H#http_chdr.connection, - fcons('Transfer-Encoding', H#http_chdr.transfer_encoding, + fcons('Host', H#http_chdr.host, + fcons('Connection', H#http_chdr.connection, + fcons('Transfer-Encoding', H#http_chdr.transfer_encoding, fcons('Accept', H#http_chdr.accept, fcons('If-Modified-Since', H#http_chdr.if_modified_since, fcons('If-Match', H#http_chdr.if_match, @@ -969,7 +977,7 @@ format_hdr(H) when is_record(H, http_chdr) -> fcons('Authorization', H#http_chdr.authorization, format_headers(H#http_chdr.other))))))))))))))))))); format_hdr(H) when is_record(H, http_shdr) -> - fcons('Connection', H#http_shdr.connection, + fcons('Connection', H#http_shdr.connection, fcons('Transfer-Encoding', H#http_shdr.transfer_encoding, fcons('Location', H#http_shdr.location, fcons('Set-Cookie', H#http_shdr.set_cookie, @@ -978,16 +986,16 @@ format_hdr(H) when is_record(H, http_shdr) -> format_headers(H#http_shdr.other))))))). -%% -%% Convert the http_chdr (client header) structure into a +%% +%% Convert the http_chdr (client header) structure into a %% key value list suitable for formatting. %% returns [ {Key,Value} ] %% Looks a bit strange, but is done this way to avoid creation %% of garabge. fmt_chdr(H) -> - hcons('Host', H#http_chdr.host, - hcons('Connection', H#http_chdr.connection, - hcons('Transfer-Encoding', H#http_chdr.transfer_encoding, + hcons('Host', H#http_chdr.host, + hcons('Connection', H#http_chdr.connection, + hcons('Transfer-Encoding', H#http_chdr.transfer_encoding, hcons('Accept', H#http_chdr.accept, hcons('If-Modified-Since', H#http_chdr.if_modified_since, hcons('If-Match', H#http_chdr.if_match, @@ -1005,11 +1013,11 @@ fmt_chdr(H) -> hcons('Authorization', H#http_chdr.authorization, H#http_chdr.other)))))))))))))))))). -%% Convert the http_shdr (server header) structure into a +%% Convert the http_shdr (server header) structure into a %% key value list suitable for formatting. fmt_shdr(H) -> - hcons('Connection', H#http_shdr.connection, - hcons('Transfer-Encoding', H#http_shdr.transfer_encoding, + hcons('Connection', H#http_shdr.connection, + hcons('Transfer-Encoding', H#http_shdr.transfer_encoding, hcons('Location', H#http_shdr.location, hcons('Set-Cookie', H#http_shdr.set_cookie, hcons('Content-Length', H#http_shdr.content_length, @@ -1017,7 +1025,7 @@ fmt_shdr(H) -> H#http_shdr.other)))))). hcons(_Key, undefined, Hs) -> Hs; -hcons(Key, Val, Hs) -> +hcons(Key, Val, Hs) -> [{Key,Val} | Hs]. hcons_list(Key, [V|Vs], Hs) -> @@ -1042,7 +1050,7 @@ chunk_size(Line) -> chunk_size(Line, 0). chunk_size([H|Hs], N) -> - if + if H >= $0, H =< $9 -> chunk_size(Hs, (N bsl 4)+(H-$0)); H >= $a, H =< $f -> @@ -1054,11 +1062,10 @@ chunk_size([H|Hs], N) -> H == $\s -> {N, Hs}; H == $; -> {N, [H|Hs]} end; -chunk_size([], N) -> +chunk_size([], N) -> {N, ""}. -tokens(undefined) -> +tokens(undefined) -> []; tokens(Line) -> string:tokens(string:to_lower(Line), ";"). - diff --git a/priv/config/rvi_backend.config b/priv/config/rvi_backend.config index 0781553..c9ddea0 100644 --- a/priv/config/rvi_backend.config +++ b/priv/config/rvi_backend.config @@ -46,6 +46,12 @@ LogLevel = Env("RVI_LOGLEVEL", notice). { node_service_prefix, "genivi.org/backend"}, + { services, + [ + {<<"rvi/admin">>, rvi_services_adm, + "http://" ++ IPPort(MyIP, MyPort+30)} + ] }, + { routing_rules, [ { "", diff --git a/priv/config/rvi_common.config b/priv/config/rvi_common.config index 0b6cb8d..381df00 100644 --- a/priv/config/rvi_common.config +++ b/priv/config/rvi_common.config @@ -62,6 +62,7 @@ LogLevel = Env("RVI_LOGLEVEL", info). rvi_common, service_discovery, service_edge, + rvi_services, authorize, schedule, dlink, @@ -115,10 +116,13 @@ LogLevel = Env("RVI_LOGLEVEL", info). %% }, {rvi_core, [ + {'$setup_vars', [{"NODE_ID", {apply, rvi_common, default_node_id, []}}]}, + %% {device_key, "$PRIV_DIR/keys/device_key.pem"}, {device_cert, "$PRIV_DIR/certificates/device_cert.crt"}, {root_cert, "$PRIV_DIR/certificates/root_cert.crt"}, - {cred_dir, "$PRIV_DIR/credentials"} + {cred_dir, "$PRIV_DIR/credentials"}, + {node_id, {'$binary', "$NODE_ID"}} ]} ]} ]. diff --git a/priv/config/rvi_sample.config b/priv/config/rvi_sample.config index 3f957ba..46887c7 100644 --- a/priv/config/rvi_sample.config +++ b/priv/config/rvi_sample.config @@ -145,6 +145,11 @@ LogLevel = Env("RVI_LOGLEVEL", notice). %% { node_service_prefix, "genivi.org/node/$rvi_uuid(default_id)/"}, + { services, + [ + {<<"rvi/admin">>, rvi_services_adm, + "http://" ++ IPPort(MyIP, MyPort+30)} + ] }, %% Routing rules determine how to get a message targeting a specific %% service to its destination. diff --git a/priv/test_config/bt_sample.config b/priv/test_config/bt_sample.config index 92fb0d1..b6fb9b5 100644 --- a/priv/test_config/bt_sample.config +++ b/priv/test_config/bt_sample.config @@ -7,7 +7,7 @@ [ {rvi_core, [ - { [routing_rules, ""], {proto_json_rpc, dlink_bt_rpc} }, + { [routing_rules, ""], [{proto_json_rpc, dlink_bt_rpc}] }, { [components, data_link], [{dlink_bt_rpc, gen_server, [{server_opts, [{test_mode, tcp}, {port, 9007}]}, diff --git a/priv/test_config/tls_sample2.config b/priv/test_config/tls_sample2.config new file mode 100644 index 0000000..46cfb2f --- /dev/null +++ b/priv/test_config/tls_sample2.config @@ -0,0 +1,18 @@ +%% -*- erlang -*- +[ + {include_lib, "rvi_core/priv/test_config/sample.config"}, + {set_env, + [ + {rvi_core, + [ + { node_service_prefix, "jlr.com/vin/def" }, + { [routing_rules, ""], [{proto_msgpack_rpc, dlink_tls_rpc}] }, + { [components, data_link], [{dlink_tls_rpc, gen_server, + [{server_opts, [{port, 9107}, + {ping_interval,500}]}, + {persistent_connections, + ["localhost:8807"]}]}]}, + { [components, protocol], [{ proto_msgpack_rpc, gen_server, [] }] } + ]} + ]} +]. diff --git a/python/rvi_call.py b/python/rvi_call.py index 6fb8b55..49218b7 100755 --- a/python/rvi_call.py +++ b/python/rvi_call.py @@ -4,12 +4,12 @@ # Copyright (C) 2014, Jaguar Land Rover # # This program is licensed under the terms and conditions of the -# Mozilla Public License, version 2.0. The full text of the +# Mozilla Public License, version 2.0. The full text of the # Mozilla Public License is at https://www.mozilla.org/MPL/2.0/ # -# +# # Simple RVI service caller -# +# import sys from rvilib import RVI @@ -25,12 +25,12 @@ def usage(): print print "Example: ./callrvi.py -n http://rvi1.nginfotpdx.net:9001 \\" print " jlr.com/vin/aaron/4711/test/ping \\" - print " arg1=val1 arg2=val2" + print " arg1=val1 arg2=val2" sys.exit(255) -# +# # Check that we have the correct arguments # opts, args= getopt.getopt(sys.argv[1:], "n:") @@ -71,4 +71,5 @@ def usage(): # # Send the messge. # -rvi.message(service, rvi_args) +response = rvi.message(service, rvi_args) +print "Response: ", response diff --git a/rebar.config b/rebar.config index 6533c9e..e3ab5ea 100644 --- a/rebar.config +++ b/rebar.config @@ -7,6 +7,7 @@ {lib_dirs, [ "deps", "components" ]}. {sub_dirs, ["rel", "components/rvi_common", + "components/rvi_services", "components/authorize", "components/dlink", "components/dlink_bt", diff --git a/src/rvi_ttb.erl b/src/rvi_ttb.erl new file mode 100644 index 0000000..d1871f8 --- /dev/null +++ b/src/rvi_ttb.erl @@ -0,0 +1,56 @@ +%% Stolen with pride from https://github.com/uwiger/locks +-module(rvi_ttb). +-behaviour(tr_ttb). + +-export([on_nodes/2, + stop/0, + stop_nofetch/0, + format/2]). + +-export([patterns/0, + flags/0]). + +-export([event/1]). + +-include_lib("trace_runner/include/trace_runner.hrl"). + +%% This function is also traced. Can be used to insert markers in the trace +%% log. +event(E) -> + event(?LINE, E, none). + +event(_, _, _) -> + ok. + +on_nodes(Ns, File) -> + tr_ttb:on_nodes(Ns, File, ?MODULE). + +patterns() -> + [{authorize_rpc , event, 3, []}, + {service_edge_rpc , event, 3, []}, + {service_discovery_rpc, event, 3, []}, + {dlink_tcp_rpc , event, 3, []}, + {connection , event, 3, []}, + {dlink_tls_rpc , event, 3, []}, + {dlink_tls_conn , event, 3, []}, + {dlink_bt_rpc , event, 3, []}, + {bt_connection , event, 3, []}, + {dlink_sms_rpc , event, 3, []}, + {schedule_rpc , event, 3, []}, + {proto_json_rpc , event, 3, []}, + {proto_msgpack_rpc , event, 3, []}, + {rvi_common , event, 3, []}, + {?MODULE , event, 3, []} + | tr_ttb:default_patterns()]. + +flags() -> + {all, call}. + +stop() -> + tr_ttb:stop(). + +stop_nofetch() -> + tr_ttb:stop_nofetch(). + +format(Dir, Out) -> + tr_ttb:format(Dir, Out). diff --git a/test/call_svc_attach.sh b/test/call_svc_attach.sh new file mode 100644 index 0000000..3741e7c --- /dev/null +++ b/test/call_svc_attach.sh @@ -0,0 +1,54 @@ +#!/bin/sh +# Call service with attachment +# + +URL="http://localhost:8801" + +if [ "$#" = "2" ] +then + SVC=$1 + FILE=$2 +else + echo "Usage: $0 service file" + exit 255 +fi + + +if [ ! -f $FILE ] +then + echo "File $1 is not readable." + exit 255 +fi + +B="---//$(date +%s)//---" # boundary delimiter + +BASENAME=$(basename $FILE) + +# Create a file containing the actual call +cat > /tmp/svc_attach.json < /tmp/svc_attach.json < t_install_sms_sample_node, t_install_tls_backend_node, t_install_tls_sample_node, + t_install_tls_sample2_node, t_install_tlsj_backend_node, t_install_tlsj_sample_node, t_install_tls_backend_noverify_node, @@ -91,8 +100,12 @@ groups() -> t_start_basic_sample, t_register_lock_service, t_register_sota_service, + t_register_sota_service_opt_inline, + t_register_sota_service_opt_reject, + t_register_sota_service_opt_multipart, t_call_lock_service, t_call_sota_service, + t_call_sota_service_inline, t_multicall_sota_service, t_remote_call_lock_service, t_get_node_service_prefix, @@ -102,6 +115,7 @@ groups() -> [ t_start_tls_backend, t_start_tls_sample, + t_start_tls_sample2, t_register_lock_service, t_register_sota_service, t_call_lock_service, @@ -109,6 +123,8 @@ groups() -> t_call_sota_service, t_multicall_sota_service, t_get_node_service_prefix, + t_call_sota_service_synch, + t_call_cred_mgmt, t_check_rvi_log, t_no_errors ]}, @@ -214,19 +230,26 @@ t_backend_keys_and_cert(Config) -> CredDir = ensure_dir("basic_backend_creds"), generate_device_keys(Dir, Config), generate_cred(backend, Dir, CredDir, Config), - generate_sota_cred(backend, Dir, CredDir, Config). + generate_sota_cred(backend, Dir, CredDir, Config), + generate_sota_cred(backend, Dir, CredDir, "_inline", "/inline", Config), + generate_sota_cred(backend, Dir, CredDir, "_reject", "/reject", Config), + generate_sota_cred( + backend, Dir, CredDir, "_multipart", "/multipart", Config). t_sample_keys_and_cert(Config) -> Dir = ensure_dir("basic_sample_keys"), generate_device_keys(Dir, Config), CredDir = ensure_dir("basic_sample_creds"), generate_cred(sample, Dir, CredDir, Config), - generate_sota_cred(sample, Dir, CredDir, Config). + generate_sota_cred(sample, Dir, CredDir, Config), + generate_sota_cred(sample, Dir, CredDir, "_inline", "/inline", Config), + generate_sota_cred(sample, Dir, CredDir, "_reject", "/reject", Config), + generate_sota_cred( + sample, Dir, CredDir, "_multipart", "/multipart", Config). t_install_backend_node(_Config) -> install_backend_node("basic_backend"). - t_install_sample_node(_Config) -> install_sample_node("basic_sample"). @@ -242,6 +265,9 @@ t_install_tls_backend_node(_Config) -> t_install_tls_sample_node(_Config) -> install_sample_node("tls_sample"). +t_install_tls_sample2_node(_Config) -> + install_sample_node("tls_sample2"). + t_install_tlsj_backend_node(_Config) -> install_backend_node("tlsj_backend"). @@ -293,6 +319,9 @@ t_start_tls_backend(_Config) -> t_start_tls_sample(_Config) -> start_sample("tls_sample"). +t_start_tls_sample2(_Config) -> + generic_start(sample2, "tls_sample2"). + t_start_tlsj_backend(_Config) -> start_backend("tlsj_backend"). @@ -315,13 +344,46 @@ t_register_lock_service(_Config) -> t_register_sota_service(_Config) -> Pid = start_json_rpc_server(9987), - reg_service_request("sample", <<"sota">>, <<"http://localhost:9987">>), + reg_service_request("sample", <<"/sota">>, <<"http://localhost:9987">>), save({service, sota}, Pid), - timer:sleep(2000). + timer:sleep(1000). + +t_register_sota_service_opt_inline(_Config) -> + t_register_sota_service_(9988, <<"inline">>). + +t_register_sota_service_opt_reject(_Config) -> + t_register_sota_service_(9989, <<"reject">>). + +t_register_sota_service_opt_multipart(_Config) -> + t_register_sota_service_(9990, <<"multipart">>). + +t_register_sota_service_(Port, Mode) -> + Pid = start_json_rpc_server(Port), + ct:log("json_rpc_server(~p): ~p", [Port, Pid]), + reg_service_w_attachments( + "sample", <<"/sota/", Mode/binary>>, + <<"http://localhost:", (integer_to_binary(Port))/binary>>, Mode), + save({service, {sota, Mode}}, Pid), + timer:sleep(500). t_call_sota_service(_Config) -> call_sota_service_(sota_client, sota_bin()). +t_call_sota_service_inline(_Config) -> + call_sota_service_( + <<"/sota/inline">>, + sota_client_inline, + [{<<"mydata">>, <<"file:testfile.txt">>}], + [testfile()]). + +t_call_sota_service_synch(_Config) -> + call_sota_service_( + <<"/sota">>, + sota_client_synch, + <<"the data">>, + [{<<"rvi.synch">>, true}], + []). + t_multicall_sota_service(Config) -> with_trace(fun t_multicall_sota_service_/1, Config, "t_multicall_sota_service"). @@ -339,6 +401,9 @@ t_multicall_sota_service_(_Config) -> Ref = erlang:send_after(5000, self(), collect_timeout), collect(Pids, Ref). +t_call_cred_mgmt(Config) -> + ok. + collect([{_, Ref} | T] = L, TRef) -> receive {'DOWN', Ref, _, _, {ok, ok}} -> @@ -406,26 +471,49 @@ node_prefix_result(Res) -> proplists:get_value(<<"result">>, Res), []). call_sota_service_(RegName, Data) -> + call_sota_service_(<<"/sota">>, RegName, Data). + +call_sota_service_(Svc, RegName, Data) -> + ct:log("call_sota_service_(Svc = ~p,...)", [Svc]), {Mega, Secs, _} = os:timestamp(), Timeout = Mega * 1000000 + Secs + 60, register(RegName, self()), json_rpc_request(service_edge("backend"), <<"message">>, - [ - {<<"service_name">>, <<"jlr.com/vin/abc/sota">>}, - {<<"timeout">>, Timeout}, - {<<"parameters">>, - [ - {<<"data">>, Data}, - {<<"sendto">>, atom_to_binary(RegName, latin1)}, - {<<"rvi.max_msg_size">>, 100} - ]} - ]), + sota_args(Svc, Timeout, RegName, Data)), receive - {message, [{service_name, <<"/sota">>}, - {data, Data}]} = Res -> + {message, [{service_name, Svc}, + {data, Data} | T]} = Res -> ct:log("got json_rpc_result: ~p", [Res]), ok; + {message, Other} -> + ct:log("wrong message: ~p", [Other]), + error({unmatched, Other}); + Msg -> + ct:log("received: ~p", [Msg]), + error({unexpected, Msg}) + after 5000 -> + error(timeout) + end. + +call_sota_service_(Svc, RegName, Data, Files) -> + call_sota_service_(Svc, RegName, Data, [], Files). + +call_sota_service_(Svc, RegName, Data, XArgs, Files) -> + ct:log("call_sota_service_(Svc = ~p,...)", [Svc]), + {Mega, Secs, _} = os:timestamp(), + Timeout = Mega * 1000000 + Secs + 60, + register(RegName, self()), + CallRes = json_rpc_request(service_edge("backend"), + <<"message">>, + sota_args(Svc, Timeout, RegName, XArgs, Data), + Files), + ct:log("CallRes = ~p", [CallRes]), + receive + {message, [{service_name, Svc}, + {data, Data} | _]} = Res -> + ct:log("Got json_rpc_result: ~p", [Res]), + ok; {message, Other} -> ct:log("wrong message: ~p", [Other]), error({unmatched, Other}) @@ -433,6 +521,26 @@ call_sota_service_(RegName, Data) -> error(timeout) end. +sota_args(Svc, Timeout, RegName, Data) -> + sota_args(Svc, Timeout, RegName, [], Data). + +sota_args(Svc, Timeout, RegName, XArgs, Data) -> + ct:log("sota_args(~p, ~p, ~p, ~p, ~p)", [Svc, Timeout, RegName, + XArgs, Data]), + [{<<"service_name">>, join(<<"jlr.com/vin/abc">>, Svc)}, + {<<"timeout">>, Timeout}, + {<<"parameters">>, + [{<<"data">>, Data}, + {<<"sendto">>, atom_to_binary(RegName, latin1)}, + {<<"rvi.max_msg_size">>, 100} + | XArgs + ]} + ]. + +join(Pfx, Name) -> + re:replace(<>, "/+", "/", + [global, {return, binary}]). + t_call_lock_service(_Config) -> CallPid = spawn_cmd( [python(), @@ -485,12 +593,36 @@ unwrap_event(Id, [{<<"ts">>,T},{<<"lvl">>,L},{<<"cmp">>,C},{<<"evt">>,E}]) -> {Id, T, binary_to_integer(L), C, E}. json_rpc_request(URL, Method, Args) -> - Req = jsx:encode([{<<"jsonrpc">>, <<"2.0">>}, - {<<"id">>, 1}, - {<<"method">>, Method}, - {<<"params">>, Args}]), + json_rpc_request(URL, Method, Args, []). + +json_rpc_request(URL, Method, Args, []) -> + Req = encode_request(Method, Args), Hdrs = [{'Content-Type', "application/json"}], - json_result(exo_http:wpost(URL, {1, 1}, Hdrs, Req, 1000)). + json_result(exo_http:wpost(URL, {1, 1}, Hdrs, Req, 1000)); +json_rpc_request(URL, Method, Args, Files) -> + Req = encode_request(Method, Args), + Hdrs = [{'Content-Type', "multipart/related"}], + Reqs = [{data, "application/json", Req} + | [file_part(F) || F <- Files]], + json_result(exo_http:wpost(URL, {1, 1}, Hdrs, Reqs, 1000)). + +encode_request(Method, Args) -> + jsx:encode([{<<"jsonrpc">>, <<"2.0">>}, + {<<"id">>, 1}, + {<<"method">>, Method}, + {<<"params">>, Args}]). + +file_part(File) -> + case file:read_file(File) of + {ok, Bin} -> + Base = filename:basename(File), + {data, [{'Content-Type', "application/octet-stream"}, + {'Content-Disposition', "filename=\"" ++ Base ++ "\""}, + {'Content-Transfer-Encoding', "8bit"}, + {'Content-ID', Base}], Bin}; + Other -> + error(Other) + end. reg_service_request(Node, Svc, URL) -> check_reg_svc_response( @@ -499,6 +631,14 @@ reg_service_request(Node, Svc, URL) -> [{<<"service">>, Svc}, {<<"network_address">>, URL}])). +reg_service_w_attachments(Node, Svc, URL, Mode) -> + check_reg_svc_response( + json_rpc_request(service_edge(Node), + <<"register_service">>, + [{<<"service">>, Svc}, + {<<"network_address">>, URL}, + {<<"opts">>, [{<<"files">>, Mode}]}])). + check_reg_svc_response(JSON) -> ct:log("check_reg_svc: ~p", [JSON]), {_, Res} = lists:keyfind(<<"result">>, 1, JSON), @@ -523,15 +663,24 @@ json_result(Other) -> ct:log("json_result(~p)", [Other]), error({unexpected, Other}). - start_json_rpc_server(Port) -> {ok, Pid} = exo_http_server:start(Port, [{request_handler, {?MODULE, handle_body, [foo]}}]), + true = erlang:group_leader(erlang:group_leader(), Pid), save({server,Port}, Pid), Pid. -handle_body(Socket, _Request, Body, _St) -> - ct:log("handle_body(Body = ~p)", [Body]), +handle_body(Socket, Request, Body, St) -> + ct:log("handle_body(~p,~p)", [Request, Body]), + case exoport_exo_http:inspect_multipart_post(Request, Body) of + {ok, [JSON], []} -> + ct:log("No attachments", []), + handle_body_(Socket, Request, Body, St); + {ok, [JSON], Files} -> + ct:log("Body = ~p~nFiles = ~p", [JSON, Files]) + end. + +handle_body_(Socket, _Request, Body, _St) -> JSON = jsx:decode(Body), ct:log("Got JSON Req: ~p", [JSON]), case JSON of @@ -543,24 +692,42 @@ handle_body(Socket, _Request, Body, _St) -> {<<"parameters">>, [ {<<"data">>, Data}, {<<"sendto">>, SendTo}, - {<<"rvi.max_msg_size">>, _}]} + {<<"rvi.max_msg_size">>, _} | T ]} ]}] -> binary_to_existing_atom(SendTo, latin1) ! {message, [{service_name, SvcName}, - {data, Data}]}, - Reply = [{<<"jsonrpc">>, <<"2.0">>}, - {<<"id">>, ID}, - {<<"result">>, <<"ok">>}], - exo_http_server:response( - Socket, undefined, 200, "OK", - jsx:encode(Reply), - [{'content_type', "application/json"}]); + {data, Data} | T]}, + case lists:keymember(<<"rvi.synch">>, 1, T) of + true -> + http_reply(Socket, ID, [{<<"status">>, 0}, + {<<"answer">>, <<"OK">>}]); + false -> + http_reply(Socket, ID, <<"ok">>) + end; + [{<<"jsonrpc">>, <<"2.0">>}, + {<<"id">>, ID}, + {<<"method">>, <<"services_available">>} | _] -> + ct:log("got services_available: ~p", [JSON]), + http_reply(Socket, ID, <<"ok">>); Other -> exo_http_server:response( Socket, undefined, 501, "Internal Error", "Internal Error"), error({unrecognized, Other}) end. +http_reply(Socket, ID, Msg) -> + Reply = [{<<"jsonrpc">>, <<"2.0">>}, + {<<"id">>, ID}, + {<<"result">>, [{<<"status">>, status_reply(Msg)}, + {<<"message">>, Msg}]}], + exo_http_server:response( + Socket, undefined, 200, "OK", + jsx:encode(Reply), + [{'content_type', "application/json"}]). + +status_reply(<<"ok">>) -> 0; +status_reply(_) -> 99. + verify_service_res(Bin) -> {match,_} = re:run(Bin, <<"Service:[\\h]*jlr.com/vin/abc/lock">>, []), @@ -693,8 +860,8 @@ generate_cred(sample, KeyDir, CredDir, _Config) -> " --start='", Start, "'" " --stop='", Stop, "'" " --root_key=", root_keys(), "/root_key.pem" - " --receive='jlr.com/vin/abc/unlock jlr.com/vin/abc/lock'" - " --invoke='jlr.com/vin/abc/lock jlr.com/backend/set_state'" + " --receive='jlr.com/vin/+/unlock jlr.com/vin/+/lock'" + " --invoke='jlr.com/vin/+/lock'" " --jwt_out=", CredDir, "/lock_cred.jwt" " --cred_out=", KeyDir, "/lock_cred.json"]), ok; @@ -709,12 +876,15 @@ generate_cred(backend, KeyDir, CertDir, _Config) -> " --stop='", Stop, "'" " --root_key=", root_keys(), "/root_key.pem" " --receive='jlr.com'" - " --invoke='jlr.com'" + " --invoke='jlr.com genivi.org/backend'" " --jwt_out=", CertDir, "/backend_cred.jwt" " --cred_out=", KeyDir, "/backend_cred.json"]), ok. -generate_sota_cred(sample, KeyDir, CredDir, _Config) -> +generate_sota_cred(Node, KeyDir, CredDir, Config) -> + generate_sota_cred(Node, KeyDir, CredDir, "", "", Config). + +generate_sota_cred(sample, KeyDir, CredDir, Suffix, Sub, _Config) -> %% Don't put sota_cred.json in the certs directory, since rvi_core %% will report a parse failure for it. UUID = uuid(), @@ -727,11 +897,11 @@ generate_sota_cred(sample, KeyDir, CredDir, _Config) -> " --stop='", Stop, "'" " --root_key=", root_keys(), "/root_key.pem" " --receive='jlr.com/vin/abc/store'" - " --invoke='jlr.com/vin/abc/sota jlr.com/backend/set_state'" - " --jwt_out=", CredDir, "/sota_cred.jwt" - " --cred_out=", KeyDir, "/sota_cred.json"]), + " --invoke='jlr.com/vin/abc/sota", Sub, " jlr.com/backend/set_state'" + " --jwt_out=", CredDir, "/sota_cred", Suffix, ".jwt" + " --cred_out=", KeyDir, "/sota_cred", Suffix, ".json"]), ok; -generate_sota_cred(backend, KeyDir, CertDir, _Config) -> +generate_sota_cred(backend, KeyDir, CertDir, Suffix, _Sub, _Config) -> UUID = uuid(), {Start, Stop} = start_stop(), cmd([scripts(), "/rvi_create_credential.py" @@ -743,8 +913,8 @@ generate_sota_cred(backend, KeyDir, CertDir, _Config) -> " --root_key=", root_keys(), "/root_key.pem" " --receive='jlr.com'" " --invoke='jlr.com'" - " --jwt_out=", CertDir, "/sota_backend_cred.jwt" - " --cred_out=", KeyDir, "/sota_backend_cred.json"]), + " --jwt_out=", CertDir, "/sota_backend_cred", Suffix, ".jwt" + " --cred_out=", KeyDir, "/sota_backend_cred", Suffix, ".json"]), ok. subj() -> @@ -778,7 +948,11 @@ env(backend) -> env(sample) -> [env(), " RVI_BACKEND=127.0.0.1" - " RVI_MY_NODE_ADDR=127.0.0.1:9000"]. + " RVI_MY_NODE_ADDR=127.0.0.1:9000"]; +env(sample2) -> + [env(), + " RVI_BACKEND=127.0.0.1" + " RVI_PORT=9100"]. @@ -791,6 +965,9 @@ scripts() -> python() -> [root(), "/python"]. +testfile() -> + root() ++ "/test/testfile.txt". + root_keys() -> "root_keys". diff --git a/test/testfile.txt b/test/testfile.txt new file mode 100644 index 0000000..bc4da5b --- /dev/null +++ b/test/testfile.txt @@ -0,0 +1,3 @@ +aaaaaaaaaa +bbbbbbbbbb +cccccccccc