Skip to content

Commit

Permalink
More aggressively purge buffered messages (#19)
Browse files Browse the repository at this point in the history
  • Loading branch information
mworrell authored May 28, 2024
1 parent 186b3f1 commit 9126797
Showing 1 changed file with 77 additions and 48 deletions.
125 changes: 77 additions & 48 deletions src/mqtt_sessions_process.erl
Original file line number Diff line number Diff line change
Expand Up @@ -54,13 +54,16 @@
]).

-define(MAX_PACKET_ID, 65535).
-define(QUEUE_PURGE_LEN, 500). % Allow 500 packages in the queue before triggering a purge.
-define(RECEIVE_MAXIMUM, 65535).
-define(KEEP_ALIVE_DEFAULT, 30). % Default keep alive in seconds
-define(SESSION_EXPIRY, 600). % Default session expiration
-define(SESSION_EXPIRY_DEFAULT, 3600). % Maximum allowed session expiration
-define(MESSAGE_EXPIRY_DEFAULT, 3600).

-define(MAX_INFLIGHT, 500). % Max in-flight messages for any QoS
-define(MAX_INFLIGHT_ACK, 100). % Max in-flight messages waiting with QoS 1 or 2


-define(KILL_TIMEOUT, 5000).

-type packet_id() :: 0..65535. % ?MAX_PACKET_ID
Expand Down Expand Up @@ -105,7 +108,7 @@
packet_id = undefined :: undefined | non_neg_integer(),
queued :: non_neg_integer(),
expiry :: non_neg_integer(),
qos :: 0..2,
qos = 0 :: 0..2,
message :: mqtt_packet_map:mqtt_packet()
}).

Expand Down Expand Up @@ -473,7 +476,7 @@ handle_connect_auth_1({ok, #{ type := connack, reason_code := ?MQTT_RC_SUCCESS }
State2 = reply_connack(ConnAck1, State1),
mqtt_sessions_will:connected(State2#state.will_pid, StateIfAccept#state.will,
State2#state.session_expiry_interval, State2#state.user_context),
State3 = resend_unacknowledged( cleanup_pending(State2) ),
State3 = resend_unacknowledged( cleanup_pending_qos0(State2) ),
{ok, State3};
handle_connect_auth_1({ok, #{ type := connack, reason_code := ReasonCode } = ConnAck, _UserContext1}, _Msg, StateIfAccept, _State) ->
_ = reply_connack(ConnAck, StateIfAccept),
Expand Down Expand Up @@ -778,39 +781,45 @@ relay_publish(#{ type := publish, message := Msg } = MqttMsg, State) ->
qos => QoS,
dup => false
}),
{StateN, MsgN} = case QoS of
StatePurged = maybe_purge(State),
case QoS of
0 ->
{State, Msg2#{ packet_id => 0 }};
reply(Msg2#{ packet_id => 0 }, StatePurged);
_ ->
State1 = #state{ packet_id = PacketId } = inc_packet_id(State),
State2 = #state{ msg_nr = MsgNr } = inc_msg_nr(State1),
AckRec = case QoS of
1 -> puback;
2 -> pubrec
end,
Msg3 = Msg2#{
packet_id => PacketId
},
State3 = State2#state{
awaiting_ack = (State2#state.awaiting_ack)#{ PacketId => {MsgNr, AckRec, Msg3} }
},
{State3, Msg3}
end,
reply(MsgN, StateN).
case maps:size(StatePurged#state.awaiting_ack) > ?MAX_INFLIGHT_ACK of
true ->
?LOG_DEBUG(#{
in => mqtt_session,
text => <<"Dropping QoS 1/2 message, too many inflight acks">>,
result => error,
reason => buffer_full
}),
StatePurged;
false ->
State1 = #state{ packet_id = PacketId } = inc_packet_id(StatePurged),
State2 = #state{ msg_nr = MsgNr } = inc_msg_nr(State1),
AckRec = case QoS of
1 -> puback;
2 -> pubrec
end,
Msg3 = Msg2#{
packet_id => PacketId
},
State3 = State2#state{
awaiting_ack = (State2#state.awaiting_ack)#{ PacketId => {MsgNr, AckRec, Msg3} }
},
reply(Msg3, State3)
end
end.


% ---------------------------------------------------------------------------------------
% ------------------------------- queue functions ---------------------------------------
% ---------------------------------------------------------------------------------------

cleanup_pending(#state{ pending = Pending } = State) ->
L1 = lists:filter(
fun
(#{ type := publish, qos := 0 }) -> true;
(_) -> false
end,
queue:to_list(Pending)),
State#state{ pending = queue:from_list(L1) }.
cleanup_pending_qos0(#state{ pending = Pending } = State) ->
Pending1 = queue:filter(fun(#queued{ qos = QoS }) -> QoS > 0 end, Pending),
State#state{ pending = Pending1 }.

resend_unacknowledged(#state{ awaiting_ack = AwaitAck } = State) ->
Msgs = maps:fold(
Expand Down Expand Up @@ -852,7 +861,7 @@ do_disconnected(#state{ will_pid = WillPid } = State) ->

%% @todo Cleanup pending messages and awaiting states.
cleanup_state_disconnected(State) ->
cleanup_pending(State#state{
cleanup_pending_qos0(State#state{
pending_connack = undefined,
connection_pid = undefined,
transport = undefined,
Expand Down Expand Up @@ -923,13 +932,15 @@ disconnect_transport(#state{ transport = Transport } = State) when is_function(T
reply(undefined, State) ->
State;
reply(Msg, #state{ transport = undefined } = State) ->
queue(Msg, State);
State1 = maybe_purge(State),
queue(Msg, State1);
reply(Msg, State) ->
case send_transport(Msg, State) of
State1 = maybe_purge(State),
case send_transport(Msg, State1) of
ok ->
State;
State1;
{error, _} ->
queue(Msg, State#state{ transport = undefined })
queue(Msg, State1#state{ transport = undefined })
end.

send_transport(_Msg, #state{ transport = undefined }) ->
Expand Down Expand Up @@ -967,13 +978,25 @@ queue_1(#{ type := Type } = Msg, #state{ msg_nr = MsgNr, pending = Pending } = S
qos = maps:get(qos, Msg, 1),
message = Msg
},
State#state{ pending = queue:in(Item, Pending)}.

State#state{ pending = queue:in(Item, maybe_purge(Pending)) }.

maybe_purge(Queue) ->
case queue:len(Queue) > ?QUEUE_PURGE_LEN of
true -> purge(Queue);
false -> Queue
maybe_purge(#state{ pending = Queue, awaiting_ack = WaitAcks } = State) ->
case queue:len(Queue) > ?MAX_INFLIGHT orelse maps:size(WaitAcks) > ?MAX_INFLIGHT_ACK of
true ->
PurgedQueue = purge(Queue),
PacketIds = queue:fold(
fun
(#queued{ qos = 0 }, Acc) -> Acc;
(#queued{ packet_id = PacketId }, Acc) -> [ PacketId | Acc ]
end,
[],
PurgedQueue),
State#state{
pending = PurgedQueue,
awaiting_ack = maps:with(PacketIds, WaitAcks)
};
false ->
State
end.

purge(Queue) ->
Expand All @@ -983,15 +1006,21 @@ purge(Queue) ->
PurgeTime = mqtt_sessions_timestamp:timestamp(),
QoS0PurgeAge = (Newest - Oldest) / 2,

queue:filter(fun(#queued{ qos = QoS, queued = Queued, expiry = Expiry }) ->
case QoS of
0 ->
PurgeTime < Expiry andalso PurgeTime < (Queued + QoS0PurgeAge);
_ ->
PurgeTime < Expiry
end
end,
Queue).
Queue1 = queue:filter(
fun
(#queued{ qos = 0, queued = Queued, expiry = Expiry }) ->
PurgeTime < Expiry andalso PurgeTime < (Queued + QoS0PurgeAge);
(#queued{ expiry = Expiry }) ->
PurgeTime < Expiry
end,
Queue),
case queue:len(Queue1) > ?MAX_INFLIGHT of
true ->
% Drop all QoS 0 messages
queue:filter(fun(#queued{ qos = QoS }) -> QoS > 0 end, Queue1);
false ->
Queue1
end.

-spec encode( mqtt_packet_map:mqtt_version(), mqtt_packet_map:mqtt_packet() | list( mqtt_packet_map:mqtt_packet() )) -> binary().
encode(ProtocolVersion, Msg) when is_map(Msg) ->
Expand Down

0 comments on commit 9126797

Please sign in to comment.