diff --git a/src/mqtt_sessions_process.erl b/src/mqtt_sessions_process.erl index 07c71f3..34277fb 100644 --- a/src/mqtt_sessions_process.erl +++ b/src/mqtt_sessions_process.erl @@ -19,7 +19,6 @@ %% TODO: Limit in-flight acks (both ways) -%% TODO: Drop outgoing QoS 0 messages if pending gets too large %% TODO: Refuse incoming publish messages if too many publish_jobs %% TODO: Limit incoming_data buffer size @@ -55,6 +54,7 @@ ]). -define(MAX_PACKET_ID, 65535). +-define(QUEUE_PURGE_LEN, 500). % Allow 500 packages in the queue before triggering a purge. -define(RECEIVE_MAXIMUM, 65535). -define(KEEP_ALIVE_DEFAULT, 30). % Default keep alive in seconds -define(SESSION_EXPIRY, 600). % Default session expiration @@ -256,6 +256,7 @@ handle_cast(kill, State) -> {stop, shutdown, State}. handle_info({mqtt_msg, #{ type := publish } = MqttMsg}, State) -> + % io:fwrite(standard_error, "publish: ~p~n", [MqttMsg]), State1 = relay_publish(MqttMsg, State), {noreply, State1}; @@ -955,8 +956,31 @@ queue_1(#{ type := Type } = Msg, #state{ msg_nr = MsgNr, pending = Pending } = S qos = maps:get(qos, Msg, 1), message = Msg }, - State#state{ pending = queue:in(Item, Pending) }. + State#state{ pending = queue:in(Item, maybe_purge(Pending)) }. + +maybe_purge(Queue) -> + case queue:len(Queue) > ?QUEUE_PURGE_LEN of + true -> purge(Queue); + false -> Queue + end. + +purge(Queue) -> + {value, #queued{ queued = Oldest }} = queue:peek(Queue), + {value, #queued{ queued = Newest }} = queue:peek_r(Queue), + + PurgeTime = mqtt_sessions_timestamp:timestamp(), + QoS0PurgeAge = (Newest - Oldest) / 2, + + queue:filter(fun(#queued{ qos = QoS, queued = Queued, expiry = Expiry }) -> + case QoS of + 0 -> + PurgeTime < Expiry andalso PurgeTime < (Queued + QoS0PurgeAge); + _ -> + PurgeTime < Expiry + end + end, + Queue). -spec encode( mqtt_packet_map:mqtt_version(), mqtt_packet_map:mqtt_packet() | list( mqtt_packet_map:mqtt_packet() )) -> binary(). encode(ProtocolVersion, Msg) when is_map(Msg) ->