Skip to content

Commit

Permalink
Merge pull request #8 from zotonic/wip-session-queue-purge
Browse files Browse the repository at this point in the history
Added a simple strategy to expire messages from the pending queue
  • Loading branch information
mmzeeman authored Oct 11, 2021
2 parents 2167678 + 571a142 commit 93034b4
Showing 1 changed file with 26 additions and 2 deletions.
28 changes: 26 additions & 2 deletions src/mqtt_sessions_process.erl
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@


%% TODO: Limit in-flight acks (both ways)
%% TODO: Drop outgoing QoS 0 messages if pending gets too large
%% TODO: Refuse incoming publish messages if too many publish_jobs
%% TODO: Limit incoming_data buffer size

Expand Down Expand Up @@ -55,6 +54,7 @@
]).

-define(MAX_PACKET_ID, 65535).
-define(QUEUE_PURGE_LEN, 500). % Allow 500 packages in the queue before triggering a purge.
-define(RECEIVE_MAXIMUM, 65535).
-define(KEEP_ALIVE_DEFAULT, 30). % Default keep alive in seconds
-define(SESSION_EXPIRY, 600). % Default session expiration
Expand Down Expand Up @@ -256,6 +256,7 @@ handle_cast(kill, State) ->
{stop, shutdown, State}.

handle_info({mqtt_msg, #{ type := publish } = MqttMsg}, State) ->
% io:fwrite(standard_error, "publish: ~p~n", [MqttMsg]),
State1 = relay_publish(MqttMsg, State),
{noreply, State1};

Expand Down Expand Up @@ -955,8 +956,31 @@ queue_1(#{ type := Type } = Msg, #state{ msg_nr = MsgNr, pending = Pending } = S
qos = maps:get(qos, Msg, 1),
message = Msg
},
State#state{ pending = queue:in(Item, Pending) }.

State#state{ pending = queue:in(Item, maybe_purge(Pending)) }.

maybe_purge(Queue) ->
case queue:len(Queue) > ?QUEUE_PURGE_LEN of
true -> purge(Queue);
false -> Queue
end.

purge(Queue) ->
{value, #queued{ queued = Oldest }} = queue:peek(Queue),
{value, #queued{ queued = Newest }} = queue:peek_r(Queue),

PurgeTime = mqtt_sessions_timestamp:timestamp(),
QoS0PurgeAge = (Newest - Oldest) / 2,

queue:filter(fun(#queued{ qos = QoS, queued = Queued, expiry = Expiry }) ->
case QoS of
0 ->
PurgeTime < Expiry andalso PurgeTime < (Queued + QoS0PurgeAge);
_ ->
PurgeTime < Expiry
end
end,
Queue).

-spec encode( mqtt_packet_map:mqtt_version(), mqtt_packet_map:mqtt_packet() | list( mqtt_packet_map:mqtt_packet() )) -> binary().
encode(ProtocolVersion, Msg) when is_map(Msg) ->
Expand Down

0 comments on commit 93034b4

Please sign in to comment.