Skip to content

Commit

Permalink
fix(io): add device initialization
Browse files Browse the repository at this point in the history
Introduced the `Initialize` method for device initialization, ensuring proper setup for client devices.
  • Loading branch information
asvol committed Jan 14, 2025
1 parent 0eac987 commit 09ba75d
Show file tree
Hide file tree
Showing 6 changed files with 118 additions and 23 deletions.
72 changes: 72 additions & 0 deletions src/Asv.IO.Test/Devices/Explorer/DeviceExplorerTest.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
using System;
using System.Threading.Tasks;
using Asv.Cfg.Test;
using Asv.IO;
using Asv.IO.Device;
using FluentAssertions;
using JetBrains.Annotations;
using TimeProviderExtensions;
using Xunit;
using Xunit.Abstractions;

namespace Asv.IO.Test.Devices.Explorer;

[TestSubject(typeof(DeviceExplorer))]
public class DeviceExplorerTest:IDisposable
{
private readonly ManualTimeProvider _routerTime;
private readonly IVirtualConnection _link;
private readonly IDeviceExplorer _explorer;
private readonly ClientDeviceBrowserConfig _explorerConfig;

public DeviceExplorerTest(ITestOutputHelper log)
{
_routerTime = new ManualTimeProvider();
var loggerFactory = new TestLoggerFactory(log, _routerTime, "ROUTER");
var protocol = Protocol.Create(builder =>
{
builder.SetLog(loggerFactory);
builder.SetTimeProvider(_routerTime);
builder.Protocols.RegisterExampleProtocol();
builder.Formatters.RegisterSimpleFormatter();
});
_link = protocol.CreateVirtualConnection();

_explorerConfig = new ClientDeviceBrowserConfig
{
DeviceTimeoutMs = 100,
DeviceCheckIntervalMs = 100
};
_explorer = DeviceExplorer.Create(_link.Client, builder =>
{
builder.SetTimeProvider(_routerTime);
builder.SetLog(loggerFactory);
builder.SetConfig(_explorerConfig);
builder.Factories.RegisterExampleDevice(new ExampleDeviceConfig());
});
}

[Fact]
public async Task DeviceList_CreateAndRemoveDevices_Success()
{
Assert.Equal(0, _explorer.Devices.Count);

await _link.Server.Send(new ExampleMessage1 { SenderId = 1 });
Assert.Equal(1, _explorer.Devices.Count);

await _link.Server.Send(new ExampleMessage1 { SenderId = 1 });
Assert.Equal(1, _explorer.Devices.Count);

await _link.Server.Send(new ExampleMessage1 { SenderId = 3 });
Assert.Equal(2, _explorer.Devices.Count);

_routerTime.Advance(TimeSpan.FromMilliseconds(Math.Max(_explorerConfig.DeviceTimeoutMs, _explorerConfig.DeviceCheckIntervalMs)));
Assert.Equal(0, _explorer.Devices.Count);
}

public void Dispose()
{
_explorer.Dispose();
_link.Dispose();
}
}
52 changes: 33 additions & 19 deletions src/Asv.IO/Devices/Client/ClientDevice.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,6 @@ public class ClientDeviceConfig
public int RequestDelayAfterFailMs { get; set; } = 1000;
}

public static class ClientDevice
{
#region Static



#endregion
}

public abstract class ClientDevice<TDeviceId> : AsyncDisposableWithCancel, IClientDevice
where TDeviceId:DeviceId
{
Expand All @@ -35,11 +26,11 @@ public abstract class ClientDevice<TDeviceId> : AsyncDisposableWithCancel, IClie
private ImmutableArray<IMicroserviceClient> _microservices;
private int _isTryReconnectInProgress;
private readonly ILogger _logger;
private bool _needToRequestAgain;
private bool _needToRequestAgain = true; // first request must be done
private ITimer? _reconnectionTimer;
private IDisposable? _sub1;
private IDisposable? _sub2;

private int _isInitialized;


protected ClientDevice(TDeviceId id, ClientDeviceConfig config, ImmutableArray<IClientDeviceExtender> extenders, IMicroserviceContext context)
Expand All @@ -53,11 +44,16 @@ protected ClientDevice(TDeviceId id, ClientDeviceConfig config, ImmutableArray<I
Id = id;
_name = new ReactiveProperty<string?>(id.ToString());
_logger = context.LoggerFactory.CreateLogger(id.AsString());
Task.Factory.StartNew(InternalInitFirst);
}

private void InternalInitFirst()
{

public void Initialize()
{
if (Interlocked.CompareExchange(ref _isInitialized, 0, 1) == 1)
{
_logger.ZLogTrace($"Skip double initialization [{Id}]");
return;
}

_sub1 = Link.State.DistinctUntilChanged()
.Where(s => s == LinkState.Disconnected)
.Subscribe(_ => _needToRequestAgain = true);
Expand All @@ -66,6 +62,24 @@ private void InternalInitFirst()
.Where(_ => _needToRequestAgain)
.Where(s => s == LinkState.Connected)
.Subscribe(_ => TryReconnect(null));
try
{
InternalInitializeOnce();
}
catch (Exception e)
{
_logger.ZLogError(e, $"Error on initialize device [{Id}]: {e.Message}");
throw;
}

}

/// <summary>
/// This method is called only once, right after ctor
/// </summary>
protected virtual void InternalInitializeOnce()
{
// do nothing
}

// ReSharper disable once AsyncVoidMethod
Expand Down Expand Up @@ -150,19 +164,19 @@ protected void UpdateDeviceName(string? name)
/// This method is called before the microservices are created
/// Can be called multiple times, if initialization fails.
/// </summary>
protected virtual Task InitBeforeMicroservices(CancellationToken cancel)
protected virtual ValueTask InitBeforeMicroservices(CancellationToken cancel)
{
return Task.CompletedTask;
return ValueTask.CompletedTask;
}
protected abstract IAsyncEnumerable<IMicroserviceClient> InternalCreateMicroservices(CancellationToken cancel);
/// <summary>
/// This method is called after the microservices have been created and initialized.
/// Can be called multiple times, if initialization fails.
/// </summary>
/// <returns>A Task representing the asynchronous operation.</returns>
protected virtual Task InitAfterMicroservices(CancellationToken cancel)
protected virtual ValueTask InitAfterMicroservices(CancellationToken cancel)
{
return Task.CompletedTask;
return ValueTask.CompletedTask;
}

public ImmutableArray<IMicroserviceClient> Microservices => _microservices;
Expand Down
4 changes: 4 additions & 0 deletions src/Asv.IO/Devices/Client/IClientDevice.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,10 @@ namespace Asv.IO;

public interface IClientDevice:IDisposable, IAsyncDisposable
{
/// <summary>
/// This method is used to initialize the device.
/// </summary>
void Initialize();
DeviceId Id { get; }
ReadOnlyReactiveProperty<string?> Name { get; }
ReadOnlyReactiveProperty<ClientDeviceState> State { get; }
Expand Down
4 changes: 3 additions & 1 deletion src/Asv.IO/Devices/Explorer/DeviceExplorer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ internal DeviceExplorer(ClientDeviceBrowserConfig config, IEnumerable<IClientDev
private void RemoveOldDevices(object? state)
{
var itemsToDelete = _lastSeen
.Where(x => _context.TimeProvider.GetElapsedTime(x.Value) > _deviceTimeout).ToImmutableArray();
.Where(x => _context.TimeProvider.GetElapsedTime(x.Value) >= _deviceTimeout).ToImmutableArray();
if (itemsToDelete.Length == 0) return;
_lock.EnterWriteLock();
try
Expand All @@ -73,6 +73,7 @@ private void RemoveOldDevices(object? state)
device.Dispose();
}
_lastSeen.TryRemove(item.Key, out _);
_devices.Remove(item.Key);
_logger.ZLogInformation($"Remove device {item.Key}");
}
}
Expand Down Expand Up @@ -121,6 +122,7 @@ private void CheckNewDevice(IProtocolMessage msg)
}
var device = currentProvider.CreateDevice(msg, deviceId, _context, _extenders );
_logger.ZLogInformation($"New device {deviceId} created by {currentProvider}");
device.Initialize();
_devices.Add(deviceId, device);
}
finally
Expand Down
7 changes: 5 additions & 2 deletions src/Asv.IO/Example/Device/ExampleDevice.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,26 +4,29 @@
using System.Runtime.CompilerServices;
using System.Threading;
using Asv.Common;
using R3;

namespace Asv.IO.Device;

public class ExampleDeviceConfig:ClientDeviceConfig
{
public byte SelfId { get; set; } = 255;
public double LinkTimeoutMs { get; set; } = 1000;
public int DowngradeErrorCount { get; set; } = 3;
}
public class ExampleDevice:ClientDevice<ExampleDeviceId>
{
private readonly ExampleDeviceId _id;
private readonly byte _selfId;
private readonly TimeBasedLinkIndicator _link;
public const string DeviceClass = "Example";
public const string DeviceClass = "ExampleDevice";
public ExampleDevice(ExampleDeviceId id, ExampleDeviceConfig config, ImmutableArray<IClientDeviceExtender> extenders, IMicroserviceContext context)
: base(id, config, extenders, context)
{
_id = id;
_selfId = config.SelfId;
_link = new TimeBasedLinkIndicator(TimeSpan.FromMilliseconds(config.LinkTimeoutMs),3,context.TimeProvider);
_link = new TimeBasedLinkIndicator(TimeSpan.FromMilliseconds(config.LinkTimeoutMs),config.DowngradeErrorCount,context.TimeProvider);
context.Connection.OnRxMessage.Subscribe(x => _link.Upgrade());
}

public override ILinkIndicator Link => _link;
Expand Down
2 changes: 1 addition & 1 deletion src/Asv.IO/Example/Device/ExampleDeviceFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ public class ExampleDeviceFactory(ExampleDeviceConfig config) : ClientDeviceFact

protected override bool InternalTryIdentify(ExampleMessageBase msg, out ExampleDeviceId? deviceId)
{
deviceId = new ExampleDeviceId(ExampleDevice.DeviceClass, msg.Id);
deviceId = new ExampleDeviceId(ExampleDevice.DeviceClass, msg.SenderId);
return true;
}

Expand Down

0 comments on commit 09ba75d

Please sign in to comment.