diff --git a/src/mqtt_sessions_process.erl b/src/mqtt_sessions_process.erl index 7eb2ad0..28ad24b 100644 --- a/src/mqtt_sessions_process.erl +++ b/src/mqtt_sessions_process.erl @@ -54,13 +54,16 @@ ]). -define(MAX_PACKET_ID, 65535). --define(QUEUE_PURGE_LEN, 500). % Allow 500 packages in the queue before triggering a purge. -define(RECEIVE_MAXIMUM, 65535). -define(KEEP_ALIVE_DEFAULT, 30). % Default keep alive in seconds -define(SESSION_EXPIRY, 600). % Default session expiration -define(SESSION_EXPIRY_DEFAULT, 3600). % Maximum allowed session expiration -define(MESSAGE_EXPIRY_DEFAULT, 3600). +-define(MAX_INFLIGHT, 500). % Max in-flight messages for any QoS +-define(MAX_INFLIGHT_ACK, 100). % Max in-flight messages waiting with QoS 1 or 2 + + -define(KILL_TIMEOUT, 5000). -type packet_id() :: 0..65535. % ?MAX_PACKET_ID @@ -105,7 +108,7 @@ packet_id = undefined :: undefined | non_neg_integer(), queued :: non_neg_integer(), expiry :: non_neg_integer(), - qos :: 0..2, + qos = 0 :: 0..2, message :: mqtt_packet_map:mqtt_packet() }). @@ -473,7 +476,7 @@ handle_connect_auth_1({ok, #{ type := connack, reason_code := ?MQTT_RC_SUCCESS } State2 = reply_connack(ConnAck1, State1), mqtt_sessions_will:connected(State2#state.will_pid, StateIfAccept#state.will, State2#state.session_expiry_interval, State2#state.user_context), - State3 = resend_unacknowledged( cleanup_pending(State2) ), + State3 = resend_unacknowledged( cleanup_pending_qos0(State2) ), {ok, State3}; handle_connect_auth_1({ok, #{ type := connack, reason_code := ReasonCode } = ConnAck, _UserContext1}, _Msg, StateIfAccept, _State) -> _ = reply_connack(ConnAck, StateIfAccept), @@ -778,39 +781,45 @@ relay_publish(#{ type := publish, message := Msg } = MqttMsg, State) -> qos => QoS, dup => false }), - {StateN, MsgN} = case QoS of + StatePurged = maybe_purge(State), + case QoS of 0 -> - {State, Msg2#{ packet_id => 0 }}; + reply(Msg2#{ packet_id => 0 }, StatePurged); _ -> - State1 = #state{ packet_id = PacketId } = inc_packet_id(State), - State2 = #state{ msg_nr = MsgNr } = inc_msg_nr(State1), - AckRec = case QoS of - 1 -> puback; - 2 -> pubrec - end, - Msg3 = Msg2#{ - packet_id => PacketId - }, - State3 = State2#state{ - awaiting_ack = (State2#state.awaiting_ack)#{ PacketId => {MsgNr, AckRec, Msg3} } - }, - {State3, Msg3} - end, - reply(MsgN, StateN). + case maps:size(StatePurged#state.awaiting_ack) > ?MAX_INFLIGHT_ACK of + true -> + ?LOG_DEBUG(#{ + in => mqtt_session, + text => <<"Dropping QoS 1/2 message, too many inflight acks">>, + result => error, + reason => buffer_full + }), + StatePurged; + false -> + State1 = #state{ packet_id = PacketId } = inc_packet_id(StatePurged), + State2 = #state{ msg_nr = MsgNr } = inc_msg_nr(State1), + AckRec = case QoS of + 1 -> puback; + 2 -> pubrec + end, + Msg3 = Msg2#{ + packet_id => PacketId + }, + State3 = State2#state{ + awaiting_ack = (State2#state.awaiting_ack)#{ PacketId => {MsgNr, AckRec, Msg3} } + }, + reply(Msg3, State3) + end + end. % --------------------------------------------------------------------------------------- % ------------------------------- queue functions --------------------------------------- % --------------------------------------------------------------------------------------- -cleanup_pending(#state{ pending = Pending } = State) -> - L1 = lists:filter( - fun - (#{ type := publish, qos := 0 }) -> true; - (_) -> false - end, - queue:to_list(Pending)), - State#state{ pending = queue:from_list(L1) }. +cleanup_pending_qos0(#state{ pending = Pending } = State) -> + Pending1 = queue:filter(fun(#queued{ qos = QoS }) -> QoS > 0 end, Pending), + State#state{ pending = Pending1 }. resend_unacknowledged(#state{ awaiting_ack = AwaitAck } = State) -> Msgs = maps:fold( @@ -852,7 +861,7 @@ do_disconnected(#state{ will_pid = WillPid } = State) -> %% @todo Cleanup pending messages and awaiting states. cleanup_state_disconnected(State) -> - cleanup_pending(State#state{ + cleanup_pending_qos0(State#state{ pending_connack = undefined, connection_pid = undefined, transport = undefined, @@ -923,13 +932,15 @@ disconnect_transport(#state{ transport = Transport } = State) when is_function(T reply(undefined, State) -> State; reply(Msg, #state{ transport = undefined } = State) -> - queue(Msg, State); + State1 = maybe_purge(State), + queue(Msg, State1); reply(Msg, State) -> - case send_transport(Msg, State) of + State1 = maybe_purge(State), + case send_transport(Msg, State1) of ok -> - State; + State1; {error, _} -> - queue(Msg, State#state{ transport = undefined }) + queue(Msg, State1#state{ transport = undefined }) end. send_transport(_Msg, #state{ transport = undefined }) -> @@ -967,13 +978,25 @@ queue_1(#{ type := Type } = Msg, #state{ msg_nr = MsgNr, pending = Pending } = S qos = maps:get(qos, Msg, 1), message = Msg }, + State#state{ pending = queue:in(Item, Pending)}. - State#state{ pending = queue:in(Item, maybe_purge(Pending)) }. - -maybe_purge(Queue) -> - case queue:len(Queue) > ?QUEUE_PURGE_LEN of - true -> purge(Queue); - false -> Queue +maybe_purge(#state{ pending = Queue, awaiting_ack = WaitAcks } = State) -> + case queue:len(Queue) > ?MAX_INFLIGHT orelse maps:size(WaitAcks) > ?MAX_INFLIGHT_ACK of + true -> + PurgedQueue = purge(Queue), + PacketIds = queue:fold( + fun + (#queued{ qos = 0 }, Acc) -> Acc; + (#queued{ packet_id = PacketId }, Acc) -> [ PacketId | Acc ] + end, + [], + PurgedQueue), + State#state{ + pending = PurgedQueue, + awaiting_ack = maps:with(PacketIds, WaitAcks) + }; + false -> + State end. purge(Queue) -> @@ -983,15 +1006,21 @@ purge(Queue) -> PurgeTime = mqtt_sessions_timestamp:timestamp(), QoS0PurgeAge = (Newest - Oldest) / 2, - queue:filter(fun(#queued{ qos = QoS, queued = Queued, expiry = Expiry }) -> - case QoS of - 0 -> - PurgeTime < Expiry andalso PurgeTime < (Queued + QoS0PurgeAge); - _ -> - PurgeTime < Expiry - end - end, - Queue). + Queue1 = queue:filter( + fun + (#queued{ qos = 0, queued = Queued, expiry = Expiry }) -> + PurgeTime < Expiry andalso PurgeTime < (Queued + QoS0PurgeAge); + (#queued{ expiry = Expiry }) -> + PurgeTime < Expiry + end, + Queue), + case queue:len(Queue1) > ?MAX_INFLIGHT of + true -> + % Drop all QoS 0 messages + queue:filter(fun(#queued{ qos = QoS }) -> QoS > 0 end, Queue1); + false -> + Queue1 + end. -spec encode( mqtt_packet_map:mqtt_version(), mqtt_packet_map:mqtt_packet() | list( mqtt_packet_map:mqtt_packet() )) -> binary(). encode(ProtocolVersion, Msg) when is_map(Msg) ->