diff --git a/src/ibrowse.erl b/src/ibrowse.erl index 40b3f97..1653f19 100644 --- a/src/ibrowse.erl +++ b/src/ibrowse.erl @@ -87,6 +87,8 @@ send_req_direct/6, send_req_direct/7, stream_next/1, + send_chunk/2, + send_done/1, stream_close/1, set_max_sessions/3, set_max_pipeline_size/3, @@ -554,6 +556,24 @@ stream_next(Req_id) -> ok end. +send_chunk(Req_id, Data) -> + case ets:lookup(ibrowse_stream, {req_id_pid, Req_id}) of + [] -> + {error, unknown_req_id}; + [{_, Pid}] -> + catch Pid ! {send_chunk, Req_id, Data}, + ok + end. + +send_done(Req_id) -> + case ets:lookup(ibrowse_stream, {req_id_pid, Req_id}) of + [] -> + {error, unknown_req_id}; + [{_, Pid}] -> + catch Pid ! {send_done, Req_id}, + ok + end. + %% @doc Tell ibrowse to close the connection associated with the %% specified stream. Should be used in conjunction with the %% stream_to option. Note that all requests in progress on diff --git a/src/ibrowse_http_client.erl b/src/ibrowse_http_client.erl index b014e92..a7b2204 100644 --- a/src/ibrowse_http_client.erl +++ b/src/ibrowse_http_client.erl @@ -252,6 +252,42 @@ handle_info({trace, Bool}, State) -> put(my_trace_flag, Bool), {noreply, State}; +handle_info({send_chunk, Req_id, Data}, + #state{cur_req = #request{req_id = Req_id}} = State) -> + Current_request = State#state.cur_req, + Options = Current_request#request.options, + TE = is_chunked_encoding_specified(Options), + + case do_send(maybe_chunked_encode(Data, TE), State) of + ok -> + {noreply, State}; + _ -> + ets:delete(ibrowse_stream, {req_id_pid, Req_id}), + shutting_down(State), + {stop, normal, State} + end; + +handle_info({send_done, Req_id}, + #state{cur_req = #request{req_id = Req_id}} = State) -> + Current_request = State#state.cur_req, + Options = Current_request#request.options, + TE = is_chunked_encoding_specified(Options), + case TE of + true -> do_send(<<"0\r\n\r\n">>, State); + _ -> ok + end, + State_2 = inc_pipeline_counter(State), + active_once(State_2), + State_3 = case State#state.status of + idle -> + do_trace("Request send completely, switch to get_header state~n", []), + State_2#state{status = get_header}; + _ -> + State_2 + end, + State_4 = set_inac_timer(State_3), + {noreply, State_4}; + handle_info(Info, State) -> io:format("Unknown message recvd for ~1000.p:~1000.p -> ~p~n", [State#state.host, State#state.port, Info]), @@ -737,17 +773,21 @@ send_req_1(From, ReqId = make_req_id(), Resp_format = get_value(response_format, Options, list), Caller_socket_options = get_value(socket_options, Options, []), + Streaming_request = is_streaming_request(Options), + Async_pid_rec = {{req_id_pid, ReqId}, self()}, {StreamTo, Caller_controls_socket} = case get_value(stream_to, Options, undefined) of {Caller, once} when is_pid(Caller) or is_atom(Caller) -> - Async_pid_rec = {{req_id_pid, ReqId}, self()}, true = ets:insert(ibrowse_stream, Async_pid_rec), {Caller, true}; undefined -> {undefined, false}; Caller when is_pid(Caller) or is_atom(Caller) -> + case Streaming_request of true -> + true = ets:insert(ibrowse_stream, Async_pid_rec) + end, {Caller, false}; Stream_to_inv -> exit({invalid_option, {stream_to, Stream_to_inv}}) @@ -782,8 +822,12 @@ send_req_1(From, trace_request(Req), do_setopts(Socket, Caller_socket_options, State_1), TE = is_chunked_encoding_specified(Options), - case do_send(Req, State_1) of - ok -> + case {do_send(Req, State_1), Streaming_request} of + {ok, true} -> + State_2 = State_1#state{cur_req = NewReq}, + do_trace("dont send request body immediately, return reqid~n", []), + {reply, {ibrowse_req_id, ReqId}, State_2}; + {ok, false} -> case do_send_body(Body_1, State_1, TE) of ok -> trace_request_body(Body_1), @@ -804,7 +848,7 @@ send_req_1(From, end, State_4 = set_inac_timer(State_3), {noreply, State_4}; - Err -> + {Err, _} -> shutting_down(State_1), do_trace("Send failed... Reason: ~p~n", [Err]), gen_server:reply(From, {error, {send_failed, Err}}), @@ -937,6 +981,9 @@ is_chunked_encoding_specified(Options) -> true end. +is_streaming_request(Options) -> + get_value(stream_request, Options, false). + http_vsn_string({0,9}) -> "HTTP/0.9"; http_vsn_string({1,0}) -> "HTTP/1.0"; http_vsn_string({1,1}) -> "HTTP/1.1". @@ -1926,5 +1973,7 @@ trace_request_body(Body) -> ok end. +to_binary({X, _}) when is_function(X) -> to_binary(X); +to_binary(X) when is_function(X) -> <<"body generated by function">>; to_binary(X) when is_list(X) -> list_to_binary(X); to_binary(X) when is_binary(X) -> X. diff --git a/test/ibrowse_test.erl b/test/ibrowse_test.erl index d97f76c..5e2e4b9 100644 --- a/test/ibrowse_test.erl +++ b/test/ibrowse_test.erl @@ -27,7 +27,13 @@ test_head_transfer_encoding/0, test_head_transfer_encoding/1, test_head_response_with_body/0, - test_head_response_with_body/1 + test_head_response_with_body/1, + i_do_streaming_request/4, + i_do_streaming_request2/2, + test_put_request/0, + test_put_request/1, + test_put_request_chunked/0, + test_put_request_chunked/1 ]). test_stream_once(Url, Method, Options) -> @@ -233,7 +239,9 @@ dump_errors(Key, Iod) -> {local_test_fun, test_20122010, []}, {local_test_fun, test_pipeline_head_timeout, []}, {local_test_fun, test_head_transfer_encoding, []}, - {local_test_fun, test_head_response_with_body, []} + {local_test_fun, test_head_response_with_body, []}, + {local_test_fun, test_put_request, []}, + {local_test_fun, test_put_request_chunked, []} ]). unit_tests() -> @@ -283,13 +291,13 @@ verify_chunked_streaming(Options) -> [{response_format, binary} | Options]), io:format(" Fetching data with streaming as list...~n", []), Async_response_list = do_async_req_list( - Url, get, [{response_format, list} | Options]), + Url, get, i_do_async_req_list, [{response_format, list} | Options]), io:format(" Fetching data with streaming as binary...~n", []), Async_response_bin = do_async_req_list( - Url, get, [{response_format, binary} | Options]), + Url, get, i_do_async_req_list, [{response_format, binary} | Options]), io:format(" Fetching data with streaming as binary, {active, once}...~n", []), Async_response_bin_once = do_async_req_list( - Url, get, [once, {response_format, binary} | Options]), + Url, get, i_do_async_req_list, [once, {response_format, binary} | Options]), Res1 = compare_responses(Result_without_streaming, Async_response_list, Async_response_bin), Res2 = compare_responses(Result_without_streaming, Async_response_list, Async_response_bin_once), case {Res1, Res2} of @@ -307,7 +315,7 @@ test_chunked_streaming_once(Options) -> Url = "http://www.httpwatch.com/httpgallery/chunked/", io:format(" URL: ~s~n", [Url]), io:format(" Fetching data with streaming as binary, {active, once}...~n", []), - case do_async_req_list(Url, get, [once, {response_format, binary} | Options]) of + case do_async_req_list(Url, get, i_do_async_req_list, [once, {response_format, binary} | Options]) of {ok, _, _, _} -> io:format(" Success!~n", []); Err -> @@ -344,8 +352,8 @@ compare_responses(R1, R2, R3) -> %% do_async_req_list(Url, Method, [{stream_to, self()}, %% {stream_chunk_size, 1000}]). -do_async_req_list(Url, Method, Options) -> - {Pid,_} = erlang:spawn_monitor(?MODULE, i_do_async_req_list, +do_async_req_list(Url, Method, Fun, Options) -> + {Pid,_} = erlang:spawn_monitor(?MODULE, Fun, [self(), Url, Method, Options ++ [{stream_chunk_size, 1000}]]), %% io:format("Spawned process ~p~n", [Pid]), @@ -415,6 +423,26 @@ maybe_stream_next(Req_id, Options) -> ok end. +i_do_streaming_request(Parent, Url, Method, Options) -> + {Headers, Options_1} = case lists:member(chunked, Options) of + true -> {[], [{transfer_encoding, chunked} | (Options -- [chunked])]}; + false -> {[{"Content-Length", "6"}], Options} + end, + Res = ibrowse:send_req(Url, Headers, Method, <<"">>, + [{stream_to, self()} | Options_1]), + case Res of + {ibrowse_req_id, Req_id} -> + Result = i_do_streaming_request2(Req_id, Options), + Parent ! {async_result, self(), Result}; + Err -> + Parent ! {async_result, self(), Err} + end. +i_do_streaming_request2(Req_id, Options) -> + ibrowse:send_chunk(Req_id, <<"aaa">>), + ibrowse:send_chunk(Req_id, <<"bbb">>), + ibrowse:send_done(Req_id), + wait_for_async_resp(Req_id, Options, undefined, undefined, []). + execute_req(local_test_fun, Method, Args) -> io:format(" ~-54.54w: ", [Method]), Result = (catch apply(?MODULE, Method, Args)), @@ -623,3 +651,29 @@ do_trace(true, Fmt, Args) -> io:format("~s -- " ++ Fmt, [ibrowse_lib:printable_date() | Args]); do_trace(_, _, _) -> ok. + +test_put_request() -> + clear_msg_q(), + test_put_request("http://localhost:8181/ibrowse_put_request"). + +test_put_request(Url) -> + case do_async_req_list(Url, put, i_do_streaming_request, + [{stream_request, true}]) of + {ok, "204", _, _} -> + io:format(" Success!~n", []); + Err -> + io:format(" Fail: ~p~n", [Err]) + end. + +test_put_request_chunked() -> + clear_msg_q(), + test_put_request_chunked("http://localhost:8181/ibrowse_put_request"). + +test_put_request_chunked(Url) -> + case do_async_req_list(Url, put, i_do_streaming_request, + [chunked, {stream_request, true}]) of + {ok, "204", _, _} -> + io:format(" Success!~n", []); + Err -> + io:format(" Fail: ~p~n", [Err]) + end. diff --git a/test/ibrowse_test_server.erl b/test/ibrowse_test_server.erl index 75d1b44..c84b619 100644 --- a/test/ibrowse_test_server.erl +++ b/test/ibrowse_test_server.erl @@ -159,6 +159,12 @@ process_request(Sock, Sock_type, uri = {abs_path, "/ibrowse_head_test"}}) -> Resp = <<"HTTP/1.1 200 OK\r\nServer: Apache-Coyote/1.1\r\nTransfer-Encoding: chunked\r\nDate: Wed, 04 Apr 2012 16:53:49 GMT\r\nConnection: close\r\n\r\n">>, do_send(Sock, Sock_type, Resp); +process_request(Sock, Sock_type, + #request{method='PUT', + headers = _Headers, + uri = {abs_path, "/ibrowse_put_request"}}) -> + Resp = <<"HTTP/1.1 204 No Content\r\nConnection: close\r\nContent-Length: 0\r\n\r\n">>, + do_send(Sock, Sock_type, Resp); process_request(Sock, Sock_type, Req) -> do_trace("Recvd req: ~p~n", [Req]), Resp = <<"HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n">>,