[MQTTv5] Subscription topic callback on_publish_received #606
-
|
Hello,
I have a problem reading messages published on a particular topic.
On the client side, I should have a callback function that reads the new message. This works well with MQTT v3.1.1 (even with this SDK), but with MQTT version 5 my callback function is never called.

Callback function:

# Callback function for incoming messages
def on_message_received(event_data):
    topic = event_data.topic
    payload = event_data.payload.decode('utf-8')
    print(f"Received message on topic '{topic}': {payload}")
    # Add custom logic here if needed, e.g., stop after a specific count
    # if received_count == <your_target_count>:
    #     received_all_event.set()

Rest of the code:

input_topic = "stage/" + arg_id + "/" + arg_auth + "/measures"
error_topic = "stage/" + arg_id + "/" + arg_auth + "/measures/error"
message = {'data': [{'type': 'TOTO', 'value': 49.0, 'label': 'test-mqtt-v5'}], 'time': '2025-01-09T11:49:44.699507Z'}
client = mqtt5_client_builder.mtls_from_path(
    endpoint=arg_endpoint,
    port=8883,
    cert_filepath=arg_cert,
    pri_key_filepath=arg_key,
    client_id=arg_id,
    ca_filepath=arg_ca,
    on_lifecycle_connection_success=on_lifecycle_connection_success,
    on_lifecycle_connection_failure=on_lifecycle_connection_failure,
    on_publish_received=on_publish_received,
    clean_session=False,
    keep_alive_secs=30)
# client.on_message = on_message_received
client.start()
lifecycle_connect_success_data = future_connection_success.result(TIMEOUT)
connack_packet = lifecycle_connect_success_data.connack_packet
negotiated_settings = lifecycle_connect_success_data.negotiated_settings
print(connack_packet)
print(negotiated_settings)
# print("Subscribing to topic '{}'...".format(message_topic))
subscribe_future = client.subscribe(subscribe_packet=mqtt5.SubscribePacket(
    subscriptions=[mqtt5.Subscription(
        topic_filter=error_topic,
        qos=mqtt5.QoS.AT_LEAST_ONCE)]
))
suback = subscribe_future.result(TIMEOUT)
print("Subscribed with {}".format(suback.reason_codes))
# print("Subscribing to topic '{}'...".format(message_topic))
subscribe_future = client.subscribe(subscribe_packet=mqtt5.SubscribePacket(
    subscriptions=[mqtt5.Subscription(
        topic_filter=input_topic,
        qos=mqtt5.QoS.AT_LEAST_ONCE)]
))
suback = subscribe_future.result(TIMEOUT)
print("Subscribed with {}".format(suback.reason_codes))
message_json = json.dumps(message)
publishStatus = client.publish(mqtt5.PublishPacket(
    topic=input_topic,
    payload=json.dumps(message),
    qos=mqtt5.QoS.AT_LEAST_ONCE
))
publish_completion_data = publishStatus.result(TIMEOUT)
print("PubAck received with {}".format(repr(publish_completion_data.puback.reason_code)))
# client.loop_forever()
print("Stopping Client")
client.stop()
time.sleep(5)

Do you know why I can't get any feedback from my subscription?

Best regards,
Translated with DeepL.com (free version) |
Replies: 4 comments 4 replies
-
|
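Unsure if it's a typo, but the callback function you actually define is on_message_received, while you supply on_publish_received to the client constructor, which does not exist anywhere. |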
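A minimal sketch of a callback whose name matches the `on_publish_received` argument passed to the builder. It assumes, as in the SDK's MQTT5 pubsub sample, that the client hands the callback a wrapper object whose `publish_packet` attribute carries `topic` and `payload`. The `SimpleNamespace` object and the `received_messages` list are hypothetical stand-ins so the snippet runs without a broker.

```python
from types import SimpleNamespace

received_messages = []  # hypothetical store so the effect is observable


def on_publish_received(publish_packet_data):
    """Callback for incoming PUBLISH packets on an MQTT5 client."""
    # Assumption: the packet is wrapped, so topic/payload live one level down.
    publish_packet = publish_packet_data.publish_packet
    payload = publish_packet.payload
    # The payload may arrive as bytes; decode it for printing.
    if isinstance(payload, (bytes, bytearray)):
        payload = payload.decode("utf-8")
    received_messages.append((publish_packet.topic, payload))
    print(f"Received message on topic '{publish_packet.topic}': {payload}")


# Stand-in for what the client would pass in; not a real SDK object.
fake_event = SimpleNamespace(
    publish_packet=SimpleNamespace(
        topic="stage/demo/measures",
        payload=b'{"value": 49.0}',
    )
)
on_publish_received(fake_event)
```

In the original program, the function would be defined before the call to `mqtt5_client_builder.mtls_from_path(...)` so that the name given to `on_publish_received=` resolves.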
-
|
Are you checking the reason codes in the suback/puback for success? Does the pubsub sample work for you? |
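One way to check the reason codes, sketched under the assumption that the values in `suback.reason_codes` convert to integers with `int()`. In MQTT 5, reason codes below 0x80 indicate success and codes from 0x80 upward indicate failure. The helper name `failed_reason_codes` is hypothetical, and the example values are plain integers rather than SDK objects.

```python
def failed_reason_codes(reason_codes):
    """Return the reason codes that signal failure (0x80 and above in MQTT 5)."""
    return [int(code) for code in reason_codes if int(code) >= 0x80]


# Example values: 1 = granted QoS 1, 0x87 = not authorized.
failures = failed_reason_codes([1, 0x87])
if failures:
    print("Subscription rejected with reason codes:", failures)
else:
    print("All subscriptions accepted")
```

The same check would apply to the single `reason_code` of a puback by wrapping it in a list.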
-
|
I think my problem is simply that I'm closing the client too quickly, so the callback doesn't have time to be executed.

time.sleep(5)
print("Stopping Client")
client.stop()

Sorry for bothering you with this. |
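As an alternative to a fixed sleep, the main thread can wait on a `threading.Event` that the callback sets, which is roughly what the commented-out `received_all_event.set()` in the question hints at. This is only a sketch: the `threading.Timer` simulates the client invoking the callback from another thread, and the names `message_received`, `received`, and the `TIMEOUT` value are assumptions.

```python
import threading

TIMEOUT = 5  # seconds; hypothetical value
message_received = threading.Event()
received = []


def on_publish_received(publish_packet_data):
    """Record the incoming data and wake up the waiting main thread."""
    received.append(publish_packet_data)
    message_received.set()


# Simulate the client calling the callback from another thread after publishing.
threading.Timer(0.1, on_publish_received, args=("simulated packet",)).start()

# Block until the callback fires or the timeout expires, then shut down.
got_message = message_received.wait(TIMEOUT)
print("Message received" if got_message else "Timed out waiting for message")
# client.stop() would go here in the real program
```

Waiting on the event returns as soon as the message arrives, so the program neither stops too early nor sleeps longer than needed.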
-
|
Hello! Reopening this discussion to make it searchable. |
Unsure if it's a typo, but the actual callback function is on_message_received, but you supply on_publish_received to the client constructor, which does not exist anywhere.