diff --git a/deps/rabbit/src/rabbit_networking.erl b/deps/rabbit/src/rabbit_networking.erl index 82371ec9c2cd..6788336df0e1 100644 --- a/deps/rabbit/src/rabbit_networking.erl +++ b/deps/rabbit/src/rabbit_networking.erl @@ -559,7 +559,7 @@ failed_to_recv_proxy_header(Ref, Error) -> end, rabbit_log:debug(Msg, [Error]), % The following call will clean up resources then exit - _ = catch ranch:handshake(Ref), + _ = ranch:handshake(Ref), exit({shutdown, failed_to_recv_proxy_header}). handshake(Ref, ProxyProtocolEnabled) -> @@ -571,22 +571,14 @@ handshake(Ref, ProxyProtocolEnabled) -> {error, protocol_error, Error} -> failed_to_recv_proxy_header(Ref, Error); {ok, ProxyInfo} -> - case catch ranch:handshake(Ref) of - {'EXIT', normal} -> - {error, handshake_failed}; - {ok, Sock} -> - ok = tune_buffer_size(Sock), - {ok, {rabbit_proxy_socket, Sock, ProxyInfo}} - end + {ok, Sock} = ranch:handshake(Ref), + ok = tune_buffer_size(Sock), + {ok, {rabbit_proxy_socket, Sock, ProxyInfo}} end; false -> - case catch ranch:handshake(Ref) of - {'EXIT', normal} -> - {error, handshake_failed}; - {ok, Sock} -> - ok = tune_buffer_size(Sock), - {ok, Sock} - end + {ok, Sock} = ranch:handshake(Ref), + ok = tune_buffer_size(Sock), + {ok, Sock} end. tune_buffer_size(Sock) -> diff --git a/deps/rabbit/src/rabbit_reader.erl b/deps/rabbit/src/rabbit_reader.erl index 01c3f0cb4eb8..da5eda69f057 100644 --- a/deps/rabbit/src/rabbit_reader.erl +++ b/deps/rabbit/src/rabbit_reader.erl @@ -164,10 +164,6 @@ shutdown(Pid, Explanation) -> no_return(). init(Parent, HelperSups, Ref) -> ?LG_PROCESS_TYPE(reader), - %% Note: - %% This function could return an error if the handshake times out. - %% It is less likely to happen here as compared to MQTT, so - %% crashing with a `badmatch` seems appropriate. {ok, Sock} = rabbit_networking:handshake(Ref, application:get_env(rabbit, proxy_protocol, false)), Deb = sys:debug_options([]), diff --git a/deps/rabbitmq_mqtt/src/rabbit_mqtt_reader.erl b/deps/rabbitmq_mqtt/src/rabbit_mqtt_reader.erl index 2ff0a6920611..94925d75fb9c 100644 --- a/deps/rabbitmq_mqtt/src/rabbit_mqtt_reader.erl +++ b/deps/rabbitmq_mqtt/src/rabbit_mqtt_reader.erl @@ -71,39 +71,34 @@ close_connection(Pid, Reason) -> init(Ref) -> process_flag(trap_exit, true), logger:set_process_metadata(#{domain => ?RMQLOG_DOMAIN_CONN ++ [mqtt]}), - ProxyProtocolEnabled = application:get_env(?APP_NAME, proxy_protocol, false), - case rabbit_networking:handshake(Ref, ProxyProtocolEnabled) of + {ok, Sock} = rabbit_networking:handshake(Ref, + application:get_env(?APP_NAME, proxy_protocol, false)), + RealSocket = rabbit_net:unwrap_socket(Sock), + case rabbit_net:connection_string(Sock, inbound) of + {ok, ConnStr} -> + ConnName = rabbit_data_coercion:to_binary(ConnStr), + ?LOG_DEBUG("MQTT accepting TCP connection ~tp (~ts)", [self(), ConnName]), + _ = rabbit_alarm:register(self(), {?MODULE, conserve_resources, []}), + LoginTimeout = application:get_env(?APP_NAME, login_timeout, 10_000), + erlang:send_after(LoginTimeout, self(), login_timeout), + State0 = #state{socket = RealSocket, + proxy_socket = rabbit_net:maybe_get_proxy_socket(Sock), + conn_name = ConnName, + await_recv = false, + connection_state = running, + conserve = false, + parse_state = rabbit_mqtt_packet:init_state()}, + State1 = control_throttle(State0), + State = rabbit_event:init_stats_timer(State1, #state.stats_timer), + gen_server:enter_loop(?MODULE, [], State); + {error, Reason = enotconn} -> + ?LOG_INFO("MQTT could not get connection string: ~s", [Reason]), + rabbit_net:fast_close(RealSocket), + ignore; {error, Reason} -> - ?LOG_ERROR("MQTT could not establish connection: ~s", [Reason]), - {stop, Reason}; - {ok, Sock} -> - RealSocket = rabbit_net:unwrap_socket(Sock), - case rabbit_net:connection_string(Sock, inbound) of - {ok, ConnStr} -> - ConnName = rabbit_data_coercion:to_binary(ConnStr), - ?LOG_DEBUG("MQTT accepting TCP connection ~tp (~ts)", [self(), ConnName]), - _ = rabbit_alarm:register(self(), {?MODULE, conserve_resources, []}), - LoginTimeout = application:get_env(?APP_NAME, login_timeout, 10_000), - erlang:send_after(LoginTimeout, self(), login_timeout), - State0 = #state{socket = RealSocket, - proxy_socket = rabbit_net:maybe_get_proxy_socket(Sock), - conn_name = ConnName, - await_recv = false, - connection_state = running, - conserve = false, - parse_state = rabbit_mqtt_packet:init_state()}, - State1 = control_throttle(State0), - State = rabbit_event:init_stats_timer(State1, #state.stats_timer), - gen_server:enter_loop(?MODULE, [], State); - {error, Reason = enotconn} -> - ?LOG_INFO("MQTT could not get connection string: ~s", [Reason]), - rabbit_net:fast_close(RealSocket), - ignore; - {error, Reason} -> - ?LOG_ERROR("MQTT could not get connection string: ~p", [Reason]), - rabbit_net:fast_close(RealSocket), - {stop, Reason} - end + ?LOG_ERROR("MQTT could not get connection string: ~p", [Reason]), + rabbit_net:fast_close(RealSocket), + {stop, Reason} end. handle_call({info, InfoItems}, _From, State) -> diff --git a/deps/rabbitmq_stomp/src/rabbit_stomp_reader.erl b/deps/rabbitmq_stomp/src/rabbit_stomp_reader.erl index ccf7af95f24a..7bb9b8986bf6 100644 --- a/deps/rabbitmq_stomp/src/rabbit_stomp_reader.erl +++ b/deps/rabbitmq_stomp/src/rabbit_stomp_reader.erl @@ -63,55 +63,51 @@ close_connection(Pid, Reason) -> init([SupHelperPid, Ref, Configuration]) -> process_flag(trap_exit, true), - ProxyProtocolEnabled = application:get_env(rabbitmq_stomp, proxy_protocol, false), - case rabbit_networking:handshake(Ref, ProxyProtocolEnabled) of + {ok, Sock} = rabbit_networking:handshake(Ref, + application:get_env(rabbitmq_stomp, proxy_protocol, false)), + RealSocket = rabbit_net:unwrap_socket(Sock), + + case rabbit_net:connection_string(Sock, inbound) of + {ok, ConnStr} -> + ConnName = rabbit_data_coercion:to_binary(ConnStr), + ProcInitArgs = processor_args(Configuration, Sock), + ProcState = rabbit_stomp_processor:initial_state(Configuration, + ProcInitArgs), + + rabbit_log_connection:info("accepting STOMP connection ~tp (~ts)", + [self(), ConnName]), + + ParseState = rabbit_stomp_frame:initial_state(), + _ = register_resource_alarm(), + + LoginTimeout = application:get_env(rabbitmq_stomp, login_timeout, 10_000), + MaxFrameSize = application:get_env(rabbitmq_stomp, max_frame_size, ?DEFAULT_MAX_FRAME_SIZE), + erlang:send_after(LoginTimeout, self(), login_timeout), + + gen_server2:enter_loop(?MODULE, [], + rabbit_event:init_stats_timer( + run_socket(control_throttle( + #reader_state{socket = RealSocket, + conn_name = ConnName, + parse_state = ParseState, + processor_state = ProcState, + heartbeat_sup = SupHelperPid, + heartbeat = {none, none}, + max_frame_size = MaxFrameSize, + current_frame_size = 0, + state = running, + conserve_resources = false, + recv_outstanding = false})), #reader_state.stats_timer), + {backoff, 1000, 1000, 10000}); + {error, enotconn} -> + rabbit_net:fast_close(RealSocket), + terminate(shutdown, undefined); {error, Reason} -> - rabbit_log_connection:error( - "STOMP could not establish connection: ~s", [Reason]), - {stop, Reason}; - {ok, Sock} -> - RealSocket = rabbit_net:unwrap_socket(Sock), - case rabbit_net:connection_string(Sock, inbound) of - {ok, ConnStr} -> - ConnName = rabbit_data_coercion:to_binary(ConnStr), - ProcInitArgs = processor_args(Configuration, Sock), - ProcState = rabbit_stomp_processor:initial_state(Configuration, - ProcInitArgs), - - rabbit_log_connection:info("accepting STOMP connection ~tp (~ts)", - [self(), ConnName]), - - ParseState = rabbit_stomp_frame:initial_state(), - _ = register_resource_alarm(), - - LoginTimeout = application:get_env(rabbitmq_stomp, login_timeout, 10_000), - MaxFrameSize = application:get_env(rabbitmq_stomp, max_frame_size, ?DEFAULT_MAX_FRAME_SIZE), - erlang:send_after(LoginTimeout, self(), login_timeout), - - gen_server2:enter_loop(?MODULE, [], - rabbit_event:init_stats_timer( - run_socket(control_throttle( - #reader_state{socket = RealSocket, - conn_name = ConnName, - parse_state = ParseState, - processor_state = ProcState, - heartbeat_sup = SupHelperPid, - heartbeat = {none, none}, - max_frame_size = MaxFrameSize, - current_frame_size = 0, - state = running, - conserve_resources = false, - recv_outstanding = false})), #reader_state.stats_timer), - {backoff, 1000, 1000, 10000}); - {error, enotconn} -> - rabbit_net:fast_close(RealSocket), - terminate(shutdown, undefined); - {error, Reason} -> - rabbit_net:fast_close(RealSocket), - terminate({network_error, Reason}, undefined) - end + rabbit_net:fast_close(RealSocket), + terminate({network_error, Reason}, undefined) end. + handle_call({info, InfoItems}, _From, State) -> Infos = lists:map( fun(InfoItem) -> diff --git a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl index d736b35212fd..ffada5519745 100644 --- a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl +++ b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl @@ -136,13 +136,10 @@ init([KeepaliveSup, heartbeat := Heartbeat, transport := ConnTransport}]) -> process_flag(trap_exit, true), - ProxyProtocolEnabled = - application:get_env(rabbitmq_stream, proxy_protocol, false), - %% Note: - %% This function could return an error if the handshake times out. - %% It is less likely to happen here as compared to MQTT, so - %% crashing with a `badmatch` seems appropriate. - {ok, Sock} = rabbit_networking:handshake(Ref, ProxyProtocolEnabled), + {ok, Sock} = + rabbit_networking:handshake(Ref, + application:get_env(rabbitmq_stream, + proxy_protocol, false)), RealSocket = rabbit_net:unwrap_socket(Sock), case rabbit_net:connection_string(Sock, inbound) of {ok, ConnStr} ->