Skip to content

Commit

Permalink
feat: add SQS (preview) (#752)
Browse files Browse the repository at this point in the history
  • Loading branch information
aneojgurhem authored Sep 12, 2024
2 parents c469bdb + 245e407 commit 1560cb4
Show file tree
Hide file tree
Showing 20 changed files with 786 additions and 3 deletions.
1 change: 1 addition & 0 deletions .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -388,6 +388,7 @@ jobs:
- rabbitmq
- rabbitmq091
- pubsub
- sqs
object:
- redis
- minio
Expand Down
48 changes: 48 additions & 0 deletions Adaptors/SQS/src/AmazonSQSClient.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
// This file is part of the ArmoniK project
//
// Copyright (C) ANEO, 2021-2024. All rights reserved.
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published
// by the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY, without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

using System.Threading;
using System.Threading.Tasks;

using Amazon.SQS;
using Amazon.SQS.Model;

namespace ArmoniK.Core.Adapters.SQS;

internal static class AmazonSqsClientExt
{
public static async Task<string> GetOrCreateQueueUrlAsync(this AmazonSQSClient client,
string queueName,
CancellationToken cancellationToken)
{
try
{
return (await client.GetQueueUrlAsync(queueName,
cancellationToken)
.ConfigureAwait(false)).QueueUrl;
}
catch (QueueDoesNotExistException)
{
return (await client.CreateQueueAsync(new CreateQueueRequest
{
QueueName = queueName,
},
cancellationToken)
.ConfigureAwait(false)).QueueUrl;
}
}
}
47 changes: 47 additions & 0 deletions Adaptors/SQS/src/ArmoniK.Core.Adapters.SQS.csproj
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFramework>net8.0</TargetFramework>
<Nullable>enable</Nullable>
<EnableDynamicLoading>true</EnableDynamicLoading>
<GeneratePackageOnBuild>True</GeneratePackageOnBuild>
</PropertyGroup>

<ItemGroup>
<PackageReference Include="JetBrains.Annotations" Version="2023.3.0">
<ExcludeAssets>runtime</ExcludeAssets>
</PackageReference>
<PackageReference Include="Microsoft.Bcl.AsyncInterfaces" Version="8.0.0">
<ExcludeAssets>runtime</ExcludeAssets>
</PackageReference>
<PackageReference Include="Microsoft.Extensions.Configuration.Binder" Version="8.0.0">
<ExcludeAssets>runtime</ExcludeAssets>
</PackageReference>
<PackageReference Include="Microsoft.Extensions.DependencyInjection.Abstractions" Version="8.0.1">
<ExcludeAssets>runtime</ExcludeAssets>
</PackageReference>
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" Version="8.0.1">
<ExcludeAssets>runtime</ExcludeAssets>
</PackageReference>
<PackageReference Include="ArmoniK.Utils.Diagnostics" Version="0.5.1">
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
</PackageReference>
<PackageReference Include="ArmoniK.Utils" Version="0.5.1">
<ExcludeAssets>runtime</ExcludeAssets>
</PackageReference>
</ItemGroup>

<ItemGroup>
<PackageReference Include="AWSSDK.SQS" Version="3.7.200.20" />
<PackageReference Include="Microsoft.Extensions.Caching.Memory" Version="8.0.0" />
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\..\..\Base\src\ArmoniK.Core.Base.csproj">
<Private>false</Private>
<ExcludeAssets>runtime</ExcludeAssets>
</ProjectReference>
</ItemGroup>

</Project>
2 changes: 2 additions & 0 deletions Adaptors/SQS/src/ArmoniK.Core.Adapters.SQS.csproj.DotSettings
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
<wpf:ResourceDictionary xml:space="preserve" xmlns:x="http://schemas.microsoft.com/winfx/2006/xaml" xmlns:s="clr-namespace:System;assembly=mscorlib" xmlns:ss="urn:shemas-jetbrains-com:settings-storage-xaml" xmlns:wpf="http://schemas.microsoft.com/winfx/2006/xaml/presentation">
<s:String x:Key="/Default/CodeInspection/Daemon/ConfigureAwaitAnalysisMode/@EntryValue">Library</s:String></wpf:ResourceDictionary>
106 changes: 106 additions & 0 deletions Adaptors/SQS/src/Heart.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
// This file is part of the ArmoniK project
//
// Copyright (C) ANEO, 2021-2024. All rights reserved.
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published
// by the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY, without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

using System;
using System.Threading;
using System.Threading.Tasks;

namespace ArmoniK.Core.Adapters.SQS;

public class Heart
{
private readonly TimeSpan beatPeriod_;
private readonly CancellationToken cancellationToken_;

private readonly Func<CancellationToken, Task> pulse_;

private Task? runningTask_;

private CancellationTokenSource? stoppedHeartCts_;

/// <summary>
/// </summary>
/// <param name="pulse">
/// The function to execute at each beat
/// It returns a predicate indicating if the heart must continue beating
/// </param>
/// <param name="beatPeriod">Defines the timespan between two heartbeats</param>
/// <param name="cancellationToken"></param>
/// <returns></returns>
public Heart(Func<CancellationToken, Task> pulse,
TimeSpan beatPeriod,
CancellationToken cancellationToken = default)
{
cancellationToken_ = cancellationToken;
pulse_ = pulse;
beatPeriod_ = beatPeriod;
}

/// <summary>
/// Stops the heart
/// </summary>
/// <returns>A task finishing with the last heartbeat</returns>
public async Task Stop()
{
stoppedHeartCts_?.Cancel();
stoppedHeartCts_ = null;
try
{
if (runningTask_ != null)
{
await runningTask_;
}
}
catch (OperationCanceledException)
{
}
catch (AggregateException ae)
{
ae.Handle(exception => exception is not OperationCanceledException);
}
}

/// <summary>
/// Start the heart. If the heart is beating, it has no effect.
/// </summary>
public void Start()
{
if (stoppedHeartCts_ is not null) // already running with infinite loop
{
return;
}

stoppedHeartCts_ = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken_);

runningTask_ = Task<Task>.Factory.StartNew(async () =>
{
while (!stoppedHeartCts_.IsCancellationRequested)
{
var delayTask = Task.Delay(beatPeriod_,
stoppedHeartCts_.Token);
await pulse_(cancellationToken_)
.ConfigureAwait(false);

await delayTask.ConfigureAwait(false);
}
},
CancellationToken.None,
TaskCreationOptions.LongRunning,
TaskScheduler.Current)
.Unwrap();
}
}
107 changes: 107 additions & 0 deletions Adaptors/SQS/src/PullQueueStorage.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
// This file is part of the ArmoniK project
//
// Copyright (C) ANEO, 2021-2024. All rights reserved.
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published
// by the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY, without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

using Amazon.SQS;
using Amazon.SQS.Model;

using ArmoniK.Core.Base;
using ArmoniK.Core.Base.DataStructures;

using Microsoft.Extensions.Diagnostics.HealthChecks;
using Microsoft.Extensions.Logging;

namespace ArmoniK.Core.Adapters.SQS;

internal class PullQueueStorage : IPullQueueStorage
{
private readonly int ackDeadlinePeriod_;
private readonly int ackExtendDeadlineStep_;
private readonly AmazonSQSClient client_;

// ReSharper disable once NotAccessedField.Local
private readonly ILogger<PullQueueStorage> logger_;

private readonly string queueName_;
private bool isInitialized_;
private string? queueUrl_;

public PullQueueStorage(AmazonSQSClient client,
SQS options,
ILogger<PullQueueStorage> logger)
{
client_ = client;
logger_ = logger;
queueName_ = $"a{options.Prefix}-{options.PartitionId}";

ackDeadlinePeriod_ = options.AckDeadlinePeriod;
ackExtendDeadlineStep_ = options.AckExtendDeadlineStep;
}

public async IAsyncEnumerable<IQueueMessageHandler> PullMessagesAsync(int nbMessages,
[EnumeratorCancellation] CancellationToken cancellationToken)
{
if (!isInitialized_)
{
throw new InvalidOperationException($"{nameof(PullQueueStorage)} should be initialized before calling this method.");
}

var messages = await client_.ReceiveMessageAsync(new ReceiveMessageRequest
{
QueueUrl = queueUrl_!,
MaxNumberOfMessages = nbMessages,
VisibilityTimeout = ackDeadlinePeriod_,
},
cancellationToken)
.ConfigureAwait(false);

foreach (var message in messages.Messages)
{
cancellationToken.ThrowIfCancellationRequested();
yield return new QueueMessageHandler(message,
client_,
queueUrl_!,
ackDeadlinePeriod_,
ackExtendDeadlineStep_);
}
}

public Task<HealthCheckResult> Check(HealthCheckTag tag)
=> Task.FromResult(isInitialized_
? HealthCheckResult.Healthy()
: HealthCheckResult.Unhealthy("Plugin is not yet initialized."));

public async Task Init(CancellationToken cancellationToken)
{
if (!isInitialized_)
{
queueUrl_ = await client_.GetOrCreateQueueUrlAsync(queueName_,
cancellationToken)
.ConfigureAwait(false);

isInitialized_ = true;
}
}

public int MaxPriority
=> int.MaxValue;
}
Loading

0 comments on commit 1560cb4

Please sign in to comment.