Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -85,3 +85,4 @@ test/unit/out/

www/cache/
__pycache__
.DS_Store
4 changes: 4 additions & 0 deletions ChangeLog.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
Client library:
- Add `MOSQ_OPT_DELAYED_ACK` option to delaying the PUBACK message
until the on_message callback returns.

2.0.0 - 2020-12-03
==================

Expand Down
4 changes: 4 additions & 0 deletions include/mosquitto.h
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ enum mosq_opt_t {
MOSQ_OPT_TCP_NODELAY = 11,
MOSQ_OPT_BIND_ADDRESS = 12,
MOSQ_OPT_TLS_USE_OS_CERTS = 13,
MOSQ_OPT_DELAYED_ACK = 14,
};


Expand Down Expand Up @@ -1467,6 +1468,9 @@ libmosq_EXPORT int mosquitto_opts_set(struct mosquitto *mosq, enum mosq_opt_t op
* MOSQ_OPT_TLS_USE_OS_CERTS - Set to 1 to instruct the client to load and
* trust OS provided CA certificates for use with TLS connections.
* Set to 0 (the default) to only use manually specified CA certs.
*
* MOSQ_OPT_DELAYED_ACK - Set to 1 to instruct the client to acknowledge QoS 1
* messages after the on_message callback is called.
*/
libmosq_EXPORT int mosquitto_int_option(struct mosquitto *mosq, enum mosq_opt_t option, int value);

Expand Down
7 changes: 6 additions & 1 deletion lib/handle_publish.c
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,9 @@ int handle__publish(struct mosquitto *mosq)
return MOSQ_ERR_SUCCESS;
case 1:
util__decrement_receive_quota(mosq);
rc = send__puback(mosq, mid, 0, NULL);
if(!mosq->delayed_ack){
rc = send__puback(mosq, mid, 0, NULL);
}
pthread_mutex_lock(&mosq->callback_mutex);
if(mosq->on_message){
mosq->in_callback = true;
Expand All @@ -148,6 +150,9 @@ int handle__publish(struct mosquitto *mosq)
mosq->in_callback = false;
}
pthread_mutex_unlock(&mosq->callback_mutex);
if(mosq->delayed_ack){
rc = send__puback(mosq, mid, 0, NULL);
}
message__cleanup(&message);
mosquitto_property_free_all(&properties);
return rc;
Expand Down
1 change: 1 addition & 0 deletions lib/mosquitto.c
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,7 @@ int mosquitto_reinitialise(struct mosquitto *mosq, const char *id, bool clean_st
mosq->reconnect_delay_max = 1;
mosq->reconnect_exponential_backoff = false;
mosq->threaded = mosq_ts_none;
mosq->delayed_ack = false;
#ifdef WITH_TLS
mosq->ssl = NULL;
mosq->ssl_ctx = NULL;
Expand Down
1 change: 1 addition & 0 deletions lib/mosquitto_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -351,6 +351,7 @@ struct mosquitto {
#ifdef WITH_EPOLL
uint32_t events;
#endif
bool delayed_ack;
};

#define STREMPTY(str) (str[0] == '\0')
Expand Down
4 changes: 4 additions & 0 deletions lib/options.c
Original file line number Diff line number Diff line change
Expand Up @@ -486,6 +486,10 @@ int mosquitto_int_option(struct mosquitto *mosq, enum mosq_opt_t option, int val
mosq->tcp_nodelay = (bool)value;
break;

case MOSQ_OPT_DELAYED_ACK:
mosq->delayed_ack = (bool)value;
break;

default:
return MOSQ_ERR_INVAL;
}
Expand Down