Hello,
I have been using the MQTTnet library and need to do several things in parallel, so I use tasks to run the work on separate threads. I am trying to send a large number of messages, but they fail to send. Both an AI assistant and the Mosquitto forum (Mosquitto is the broker I use) tell me that MQTTnet is not designed to work with threads, and that the message queue fills up so the messages cannot be published. While testing, I observed that PendingApplicationMessagesCount had a value of 30000+, meaning that many messages were still waiting to be published, and publishing does not work.
I hope you can help me.
Thank you.
`using MQTTnet;
using MQTTnet.Client;
using MQTTnet.Extensions.ManagedClient;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

namespace Publisher
{
    // Message type implied by the usage below (Topic, DateTimeString);
    // its definition was not included in the post.
    class Message
    {
        public string Topic { get; set; }
        public string DateTimeString { get; set; }
    }

    class Program3
    {
        // Shared queue: producer tasks enqueue, MQTT client tasks dequeue.
        static ConcurrentQueue<Message> _messageQueue = new ConcurrentQueue<Message>();
        private static int _DeviceCount = 100;
        private static int _ParamCount = 100;
        private static int _ClientCount = 5; // Number of MQTT clients

        static async Task Main(string[] args)
        {
            List<Task> tasks = new List<Task>();
            CancellationTokenSource cts = new CancellationTokenSource();

            // Create multiple MQTT clients
            for (int i = 0; i < _ClientCount; i++)
            {
                int clientIndex = i;
                tasks.Add(Task.Run(async () =>
                {
                    var options = new ManagedMqttClientOptionsBuilder()
                        .WithAutoReconnectDelay(TimeSpan.FromSeconds(5))
                        .WithClientOptions(new MqttClientOptionsBuilder()
                            .WithClientId($"Client{clientIndex}")
                            .WithTcpServer("localhost").Build())
                        .Build();
                    var mqttClient = new MqttFactory().CreateManagedMqttClient();
                    await mqttClient.StartAsync(options);

                    // Drain the shared queue into this client's internal queue.
                    while (!cts.Token.IsCancellationRequested)
                    {
                        if (_messageQueue.TryDequeue(out var item))
                        {
                            var message = new MqttApplicationMessageBuilder()
                                .WithTopic(item.Topic)
                                .WithPayload(item.DateTimeString)
                                .WithQualityOfServiceLevel(MQTTnet.Protocol.MqttQualityOfServiceLevel.ExactlyOnce)
                                .WithRetainFlag(false)
                                .Build();
                            var managedMessage = new ManagedMqttApplicationMessage()
                            {
                                ApplicationMessage = message
                            };
                            try
                            {
                                await mqttClient.EnqueueAsync(managedMessage);
                            }
                            catch (Exception ex)
                            {
                                Console.WriteLine($"Error publishing message: {ex.Message}");
                            }
                        }
                    }
                }, cts.Token));
            }

            // Generate messages: one producer task per device
            for (int i = 1; i <= _DeviceCount; i++)
            {
                int index = i;
                tasks.Add(Task.Run(() =>
                {
                    while (!cts.Token.IsCancellationRequested)
                    {
                        for (int j = 1; j <= _ParamCount; j++)
                        {
                            string topic = $"test/{index}/{j}";
                            _messageQueue.Enqueue(new Message()
                            {
                                Topic = topic,
                                DateTimeString = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff")
                            });
                        }
                        Thread.Sleep(999);
                    }
                }, cts.Token));
            }

            await Task.WhenAll(tasks);
            Console.ReadLine();
        }

        // Single-client variant of the publishing loop; not called from Main.
        private static async Task _Publish(CancellationTokenSource cts)
        {
            // Setup and start a managed MQTT client.
            var options = new ManagedMqttClientOptionsBuilder()
                .WithAutoReconnectDelay(TimeSpan.FromSeconds(5))
                .WithClientOptions(new MqttClientOptionsBuilder()
                    .WithClientId("Client1")
                    .WithTcpServer("localhost").Build())
                .Build();
            ManagedMqttClient mqttClient = (ManagedMqttClient)new MqttFactory().CreateManagedMqttClient();

            // StartAsync returns immediately, as it starts a new thread using Task.Run,
            // and so the calling thread needs to wait.
            await mqttClient.StartAsync(options);

            while (!cts.Token.IsCancellationRequested)
            {
                if (_messageQueue.TryDequeue(out var item))
                {
                    var message = new MqttApplicationMessageBuilder()
                        .WithTopic(item.Topic)
                        .WithPayload(item.DateTimeString)
                        .WithQualityOfServiceLevel(MQTTnet.Protocol.MqttQualityOfServiceLevel.ExactlyOnce)
                        .WithRetainFlag(false)
                        .Build();
                    var managedMessage = new ManagedMqttApplicationMessage()
                    {
                        ApplicationMessage = message
                    };
                    await mqttClient.EnqueueAsync(managedMessage);
                }
            }
        }
    }
}
`
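For a rough sense of scale, the load this program generates can be estimated from its constants. The sketch below is only back-of-the-envelope arithmetic: it assumes each producer loop takes about one second per cycle (the code sleeps for 999 ms) and that messages are spread evenly across the clients. `LoadEstimate` and its members are hypothetical names used only for this illustration. The factor of four reflects the QoS 2 (ExactlyOnce) exchange of PUBLISH, PUBREC, PUBREL and PUBCOMP for every message.

```csharp
using System;

static class LoadEstimate
{
    // QoS 2 (ExactlyOnce) uses four packets per message:
    // PUBLISH, PUBREC, PUBREL, PUBCOMP.
    public const int PacketsPerQos2Message = 4;

    // Messages produced per second by all producer tasks together.
    // cycleSeconds is an assumption: roughly 1 s per loop iteration.
    public static double MessagesPerSecond(int deviceCount, int paramCount, double cycleSeconds)
        => deviceCount * paramCount / cycleSeconds;

    // Control and data packets exchanged per second at QoS 2.
    public static double PacketsPerSecond(double messagesPerSecond)
        => messagesPerSecond * PacketsPerQos2Message;
}

static class Program
{
    static void Main()
    {
        // Constants taken from the posted code.
        int deviceCount = 100, paramCount = 100, clientCount = 5;

        double total = LoadEstimate.MessagesPerSecond(deviceCount, paramCount, 1.0);
        double perClient = total / clientCount; // assumes an even split
        double packets = LoadEstimate.PacketsPerSecond(total);

        Console.WriteLine($"Offered load: {total} msg/s in total");
        Console.WriteLine($"Per client:   {perClient} msg/s");
        Console.WriteLine($"QoS 2 packet exchange: {packets} packets/s");
    }
}
```

Under these assumptions the producers generate about 10,000 messages per second, or about 2,000 per client, so a pending count of 30000 on one client would correspond to roughly 15 seconds of that client's share if nothing were being drained.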