From f5434d3a7411695303eae1c43f6419e33fdf2212 Mon Sep 17 00:00:00 2001 From: Arnaldo Cesco Date: Fri, 17 Jun 2022 15:42:30 +0200 Subject: [PATCH] Do not send data before introspection during initialisation A race condition on the publisher allowed to send data without having exchanged Astarte MQTT v1 initialisation messages before, resulting in e.g. introspection error. Force message ordering: data can be exchanged only if introspection has been agreed upon between device and Astarte. This is done via mutex. Signed-off-by: Arnaldo Cesco --- device/device.go | 14 ++++++++++++-- device/protocol_mqtt_v1.go | 13 ++++++++++--- device/store.go | 4 ++-- 3 files changed, 24 insertions(+), 7 deletions(-) diff --git a/device/device.go b/device/device.go index b4df70c..c32c719 100644 --- a/device/device.go +++ b/device/device.go @@ -21,6 +21,7 @@ import ( "os" "path/filepath" "runtime" + "sync" "github.com/astarte-platform/astarte-go/client" "github.com/astarte-platform/astarte-go/interfaces" @@ -34,6 +35,11 @@ const ( DefaultInitialConnectionAttempts = 10 ) +type messageQueue struct { + sync.Mutex + queue chan astarteMessageInfo +} + // Device is the base struct for Astarte Devices type Device struct { deviceID string @@ -43,7 +49,7 @@ type Device struct { astarteAPIClient *client.Client brokerURL string db *gorm.DB - messageQueue chan astarteMessageInfo + inflightMessages messageQueue isSendingStoredMessages bool volatileMessages []astarteMessageInfo lastSentIntrospection string @@ -232,7 +238,11 @@ func (d *Device) Connect(result chan<- error) { } // Now that the client is up and running, we can start sending messages - d.messageQueue = make(chan astarteMessageInfo, d.opts.MaxInflightMessages) + d.inflightMessages.queue = make(chan astarteMessageInfo, d.opts.MaxInflightMessages) + // When initialized, mutexes are unlocked (see https://pkg.go.dev/sync#Mutex), + // so we lock it in order to allow publishing messages + // only if introspection has already been sent + d.inflightMessages.Lock() go d.sendLoop() // All good: notify, and our routine is over. diff --git a/device/protocol_mqtt_v1.go b/device/protocol_mqtt_v1.go index 8f48bae..566237e 100644 --- a/device/protocol_mqtt_v1.go +++ b/device/protocol_mqtt_v1.go @@ -57,6 +57,8 @@ func (d *Device) initializeMQTTClient() error { }) opts.SetConnectionLostHandler(func(client mqtt.Client, err error) { + // If connection is lost, we should stop dequeuing messages from the channel + d.inflightMessages.Lock() if d.OnErrors != nil { d.OnErrors(d, err) } @@ -253,6 +255,9 @@ func astarteOnConnectHandler(d *Device, sessionPresent bool) { } } + // Since control messages have been sent, we allow to send data + d.inflightMessages.Unlock() + // If some messages must be retried, do so d.resendFailedMessages() @@ -384,8 +389,10 @@ func (d *Device) UnsetProperty(interfaceName, path string) error { // The main publishing loop: retrieves messages from the publishing channel and sends them one at a time, in order func (d *Device) sendLoop() { - for next := range d.messageQueue { - d.publishMessage(next) + for { + d.inflightMessages.Lock() + d.publishMessage(<-d.inflightMessages.queue) + d.inflightMessages.Unlock() } } @@ -463,7 +470,7 @@ func (d *Device) enqueueRawMqttV1Message(astarteInterface interfaces.AstarteInte fmt.Println("Sending previously stored messages with non-discard retention, the current message may be scheduled later") } message := makeAstarteMessageInfo(expiry, retention, astarteInterface.Name, interfacePath, astarteInterface.MajorVersion, qos, bsonPayload) - d.messageQueue <- message + d.inflightMessages.queue <- message return nil } diff --git a/device/store.go b/device/store.go index 96bd8b8..cb2a23d 100644 --- a/device/store.go +++ b/device/store.go @@ -117,7 +117,7 @@ func (d *Device) resendStoredMessages() { for _, message := range messages { if !isStoredMessageExpired(message) && !d.isInterfaceOutdatedInIntrospection(message.InterfaceName, message.InterfaceMajor) { // if the message is not expired, try resending it - d.messageQueue <- message + d.inflightMessages.queue <- message } else { // else, it can be removed d.removeFailedMessageFromStorage(message.StorageId) @@ -135,7 +135,7 @@ func (d *Device) resendVolatileMessages() { d.volatileMessages = d.volatileMessages[1:] // try resending the message only if it is not expired if !isStoredMessageExpired(message) && !d.isInterfaceOutdatedInIntrospection(message.InterfaceName, message.InterfaceMajor) { - d.messageQueue <- message + d.inflightMessages.queue <- message } } }