Skip to content

Commit

Permalink
Merge pull request #3 from MiloszKrajewski/sqs-as-storage
Browse files Browse the repository at this point in the history
SQS as storage
  • Loading branch information
MiloszKrajewski authored Nov 18, 2023
2 parents d5f5871 + 7787d03 commit 71f786f
Show file tree
Hide file tree
Showing 74 changed files with 2,963 additions and 526 deletions.
3 changes: 3 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
## 0.3.4 (2023/11/18)
* ADDED: SQS as scheduler

## 0.3.3 (2023/01/10)
* ADDED: Redis storage
* YAK-SHAVED: build scripts
Expand Down
5 changes: 3 additions & 2 deletions Directory.Build.props
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
<IsPackable>false</IsPackable>
<Nullable>enable</Nullable>
<ManagePackageVersionsCentrally>true</ManagePackageVersionsCentrally>
<CentralPackageTransitivePinningEnabled>true</CentralPackageTransitivePinningEnabled>
<CheckEolTargetFramework>false</CheckEolTargetFramework>
<NoWarn>$(NoWarn);NU5125;NU5048</NoWarn>
</PropertyGroup>
Expand All @@ -20,7 +21,7 @@
<RepositoryUrl>https://github.com/MiloszKrajewski/K4os.Xpovoc</RepositoryUrl>
<PackageIconUrl>https://github.com/MiloszKrajewski/K4os.Xpovoc/blob/master/doc/icon.png?raw=true</PackageIconUrl>
</PropertyGroup>
<ItemGroup>
<None Remove="*.?sproj.DotSettings"/>
<ItemGroup Condition=" '$(IsPackable)' == 'true' ">
<InternalsVisibleTo Include="$(AssemblyName).Test"/>
</ItemGroup>
</Project>
3 changes: 3 additions & 0 deletions Directory.Build.targets
Original file line number Diff line number Diff line change
@@ -1,2 +1,5 @@
<Project>
<ItemGroup>
<None Remove="*.?sproj.DotSettings"/>
</ItemGroup>
</Project>
32 changes: 25 additions & 7 deletions Directory.Packages.props
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,33 @@
<Import Project="$([MSBuild]::GetPathOfFileAbove('Directory.Build.props', '$(MSBuildThisFileDirectory)../'))" />
-->
<ItemGroup>
<PackageVersion Include="K4os.Async.Toys" Version="0.0.18" />
<PackageVersion Include="Paramore.Brighter" Version="8.0.71" />
<PackageVersion Include="PolySharp" Version="1.13.2" PrivateAssets="All" />
<PackageVersion Include="System.Reactive" Version="4.4.1" />
<PackageVersion Include="System.Interactive" Version="4.1.1" />
<PackageVersion Include="Newtonsoft.Json" Version="12.0.3" />

<PackageVersion Include="Microsoft.Extensions.DependencyInjection.Abstractions" Version="6.0.0" />
<PackageVersion Include="Microsoft.Extensions.Logging.Abstractions" Version="6.0.0" />
<PackageVersion Include="Microsoft.Extensions.Hosting.Abstractions" Version="6.0.0" />

<PackageVersion Include="System.Interactive" Version="6.0.1" />
<PackageVersion Include="Microsoft.Reactive.Testing" Version="4.4.1" />
<PackageVersion Include="Newtonsoft.Json" Version="13.0.1" />
<PackageVersion Include="Dapper" Version="2.0.35" />
<PackageVersion Include="Polly" Version="7.2.1" />
<PackageVersion Include="Serilog" Version="2.12.0" />
<PackageVersion Include="Serilog.Extensions.Logging" Version="7.0.0" />
<PackageVersion Include="Serilog.Sinks.Console" Version="4.1.0" />
</ItemGroup>
<ItemGroup>
<PackageVersion Include="Microsoft.NET.Test.Sdk" Version="17.8.0" />
<PackageVersion Include="xunit" Version="2.6.1" />
<PackageVersion Include="xunit.runner.visualstudio" Version="2.5.3" PrivateAssets="All" />
<PackageVersion Include="coverlet.collector" Version="6.0.0" PrivateAssets="All" />
</ItemGroup>
<ItemGroup Condition=" '$(TargetFramework)' == 'net5.0' ">
<PackageVersion Include="Microsoft.Extensions.DependencyInjection.Abstractions" Version="6.0.0" />
<PackageVersion Include="Microsoft.Extensions.Logging.Abstractions" Version="6.0.3" />
<PackageVersion Include="Microsoft.Extensions.Hosting.Abstractions" Version="6.0.0" />
</ItemGroup>
<ItemGroup Condition=" '$(TargetFramework)' != 'net5.0' ">
<PackageVersion Include="Microsoft.Extensions.DependencyInjection.Abstractions" Version="7.0.0" />
<PackageVersion Include="Microsoft.Extensions.Logging.Abstractions" Version="7.0.1" />
<PackageVersion Include="Microsoft.Extensions.Hosting.Abstractions" Version="7.0.0" />
</ItemGroup>
</Project>
22 changes: 14 additions & 8 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,17 @@ services:
ports:
- "6379:6379"

# localstack:
# image: "localstack/localstack"
# network_mode: bridge
# ports:
# - "127.0.0.1:4566:4566"
# - "127.0.0.1:4571:4571"
# environment:
# SERVICES: sqs
sqs:
image: softwaremill/elasticmq-native
ports:
- "9324:9324"
- "9325:9325"

# dynamodb:
# image: amazon/dynamodb-local
# command: "-jar DynamoDBLocal.jar -sharedDb -dbPath ./data"
# ports:
# - "8000:8000"
# # volumes:
# # - "./docker/dynamodb:/home/dynamodblocal/data"
# working_dir: /home/dynamodblocal
35 changes: 31 additions & 4 deletions src/K4os.Shared/Extensions.cs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
#nullable enable

using System;
using System.Collections.Generic;
using System.Linq;
using System.Runtime.CompilerServices;
using System.Threading.Tasks;

// ReSharper disable UnusedMember.Global
// ReSharper disable UnusedType.Global
Expand All @@ -13,7 +13,9 @@ namespace System;

internal static class Extensions
{
public static T Required<T>(this T? subject, string? name = null) where T: class =>
public static T Required<T>(
this T? subject, [CallerArgumentExpression("subject")] string? name = null)
where T: class =>
subject ?? throw new ArgumentNullException(name ?? "<unknown>");

public static void TryDispose(this object subject)
Expand Down Expand Up @@ -68,7 +70,32 @@ public static T[] NotNull<T>(this T[]? subject) =>
subject ?? Array.Empty<T>();

public static T[] EnsureArray<T>(this IEnumerable<T>? subject) =>
subject switch { null => Array.Empty<T>(), T[] a => a, var e => e.ToArray(), };
subject switch { null => Array.Empty<T>(), T[] a => a, _ => subject.ToArray() };

public static void AddIfNotNull<K, V>(
this IDictionary<K, V> dictionary, K key, V? value) where V: class
{
if (value is not null)
dictionary.Add(key, value);
}

public static V? TryGetOrDefault<K, V>(
this IDictionary<K, V> dictionary, K key, V? fallback = default) =>
dictionary.TryGetValue(key, out var value) ? value : fallback;

public static IEnumerable<T> WhereNotNull<T>(
this IEnumerable<T?> sequence) =>
sequence.Where(x => x is not null)!;

public static IEnumerable<R> SelectNotNull<T, R>(
this IEnumerable<T> sequence, Func<T, R?> map) =>
sequence.Select(map).Where(x => x is not null)!;

public static void Await(this Task task) => task.GetAwaiter().GetResult();
public static T Await<T>(this Task<T> task) => task.GetAwaiter().GetResult();
public static void Forget(this Task task) =>
task.ContinueWith(_ => _.Exception, TaskContinuationOptions.OnlyOnFaulted);

}

internal class SharedNotNull<T> where T: class, new()
Expand Down
6 changes: 6 additions & 0 deletions src/K4os.Shared/K4os.Shared.csproj
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFramework>netstandard2.0</TargetFramework>
</PropertyGroup>

<ItemGroup>
<PackageReference Include="PolySharp"/>
</ItemGroup>

</Project>
2 changes: 0 additions & 2 deletions src/K4os.Shared/Secrets.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
#nullable enable

using System.IO;
using System.Xml.Linq;

Expand Down
8 changes: 0 additions & 8 deletions src/K4os.Xpovoc.Abstractions/IDateTimeSource.cs

This file was deleted.

2 changes: 1 addition & 1 deletion src/K4os.Xpovoc.Abstractions/IJobScheduler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

namespace K4os.Xpovoc.Abstractions;

public interface IJobScheduler: IDisposable, IDateTimeSource
public interface IJobScheduler: IDisposable
{
Task<Guid> Schedule(DateTimeOffset time, object payload);
}
11 changes: 11 additions & 0 deletions src/K4os.Xpovoc.Abstractions/ITimeSource.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
using System;
using System.Threading;
using System.Threading.Tasks;

namespace K4os.Xpovoc.Abstractions;

public interface ITimeSource
{
DateTimeOffset Now { get; }
Task Delay(TimeSpan delay, CancellationToken token);
}
4 changes: 3 additions & 1 deletion src/K4os.Xpovoc.Abstractions/K4os.Xpovoc.Abstractions.csproj
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFrameworks>netstandard2.0;net462;net5.0</TargetFrameworks>
<TargetFramework>netstandard2.0</TargetFramework>
<IsPackable>true</IsPackable>
</PropertyGroup>

</Project>
9 changes: 6 additions & 3 deletions src/K4os.Xpovoc.Abstractions/SystemTimeSource.cs
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
using System;
using System.Threading;
using System.Threading.Tasks;

namespace K4os.Xpovoc.Abstractions;

public class SystemDateTimeSource: IDateTimeSource
public class SystemTimeSource: ITimeSource
{
public static readonly IDateTimeSource Default = new SystemDateTimeSource();
public static readonly ITimeSource Default = new SystemTimeSource();
public DateTimeOffset Now => DateTimeOffset.UtcNow;
private SystemDateTimeSource() { }
public Task Delay(TimeSpan delay, CancellationToken token) => Task.Delay(delay, token);
private SystemTimeSource() { }
}
13 changes: 8 additions & 5 deletions src/K4os.Xpovoc.Brighter/K4os.Xpovoc.Brighter.csproj
Original file line number Diff line number Diff line change
@@ -1,15 +1,18 @@
<?xml version="1.0" encoding="utf-8"?>
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFrameworks>netstandard2.0;net462;net5.0</TargetFrameworks>
<TargetFrameworks>netstandard2.0;net462;net5.0;net6.0</TargetFrameworks>
<PackageTags>$(PackageTags) adapter brighter</PackageTags>
<IsPackable>true</IsPackable>
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Paramore.Brighter" VersionOverride="8.0.71" />
<PackageReference Include="Microsoft.Extensions.DependencyInjection.Abstractions" />
<PackageReference Include="Paramore.Brighter" VersionOverride="8.0.71"/>
<PackageReference Include="Microsoft.Extensions.DependencyInjection.Abstractions"/>
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\K4os.Xpovoc.Abstractions\K4os.Xpovoc.Abstractions.csproj" />
<ProjectReference Include="..\K4os.Xpovoc.Abstractions\K4os.Xpovoc.Abstractions.csproj"/>
</ItemGroup>

</Project>
8 changes: 4 additions & 4 deletions src/K4os.Xpovoc.Core/Db/DbAgent.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,21 +14,21 @@ internal abstract class DbAgent

protected readonly ILogger Log;

protected DateTime Now => _dateTimeSource.Now.UtcDateTime;
protected DateTime Now => _timeSource.Now.UtcDateTime;

protected readonly IDbJobStorage JobStorage;
protected readonly ISchedulerConfig Configuration;
private readonly IDateTimeSource _dateTimeSource;
private readonly ITimeSource _timeSource;
private int _started;

protected DbAgent(
ILoggerFactory? loggerFactory,
IDateTimeSource dateTimeSource,
ITimeSource timeSource,
IDbJobStorage storage,
ISchedulerConfig config)
{
Log = (loggerFactory ?? NullLoggerFactory.Instance).CreateLogger(ObjectId);
_dateTimeSource = dateTimeSource;
_timeSource = timeSource;
Configuration = config;
JobStorage = storage;
}
Expand Down
4 changes: 2 additions & 2 deletions src/K4os.Xpovoc.Core/Db/DbCleaner.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,10 @@ internal class DbCleaner: DbAgent

public DbCleaner(
ILoggerFactory loggerFactory,
IDateTimeSource dateTimeSource,
ITimeSource timeSource,
IDbJobStorage storage,
ISchedulerConfig config):
base(loggerFactory, dateTimeSource, storage, config)
base(loggerFactory, timeSource, storage, config)
{
PruneInterval = config.PruneInterval.NotLessThan(ShortInterval);
}
Expand Down
12 changes: 6 additions & 6 deletions src/K4os.Xpovoc.Core/Db/DbJobScheduler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ namespace K4os.Xpovoc.Core.Db;
public class DbJobScheduler: IJobScheduler
{
private readonly ILoggerFactory _loggerFactory;
private readonly IDateTimeSource _dateTimeSource;
private readonly ITimeSource _timeSource;
private readonly IDbJobStorage _jobStorage;
private readonly IJobHandler _jobHandler;
private readonly Task[] _pollers;
Expand All @@ -28,14 +28,14 @@ public DbJobScheduler(
IDbJobStorage jobStorage,
IJobHandler jobHandler,
ISchedulerConfig? configuration = null,
IDateTimeSource? dateTimeSource = null)
ITimeSource? dateTimeSource = null)
{
_loggerFactory = loggerFactory ?? NullLoggerFactory.Instance;
Log = _loggerFactory.CreateLogger(GetType());

_jobStorage = jobStorage.Required(nameof(jobStorage));
_jobHandler = jobHandler.Required(nameof(jobHandler));
_dateTimeSource = dateTimeSource ?? SystemDateTimeSource.Default;
_timeSource = dateTimeSource ?? SystemTimeSource.Default;
_cancel = new CancellationTokenSource();
_ready = new TaskCompletionSource<bool>();

Expand Down Expand Up @@ -71,7 +71,7 @@ private Task Poll()
{
var poller = new DbPoller(
_loggerFactory,
_dateTimeSource,
_timeSource,
_jobStorage, _jobHandler,
_configuration);
return poller.Start(_cancel.Token, _ready.Task);
Expand All @@ -80,11 +80,11 @@ private Task Poll()
private Task Cleanup()
{
var cleaner = new DbCleaner(
_loggerFactory, _dateTimeSource, _jobStorage, _configuration);
_loggerFactory, _timeSource, _jobStorage, _configuration);
return cleaner.Start(_cancel.Token, _ready.Task);
}

public DateTimeOffset Now => _dateTimeSource.Now;
public DateTimeOffset Now => _timeSource.Now;

public async Task<Guid> Schedule(DateTimeOffset time, object payload)
{
Expand Down
4 changes: 2 additions & 2 deletions src/K4os.Xpovoc.Core/Db/DbPoller.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,11 @@ internal class DbPoller: DbAgent

public DbPoller(
ILoggerFactory loggerFactory,
IDateTimeSource dateTimeSource,
ITimeSource timeSource,
IDbJobStorage storage,
IJobHandler handler,
ISchedulerConfig config):
base(loggerFactory, dateTimeSource, storage, config)
base(loggerFactory, timeSource, storage, config)
{
_workerId = Guid.NewGuid();
_jobHandler = handler.Required(nameof(handler));
Expand Down
3 changes: 2 additions & 1 deletion src/K4os.Xpovoc.Core/Db/ISchedulerConfig.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,5 @@ public interface ISchedulerConfig
TimeSpan MaximumRetryInterval { get; }
TimeSpan KeepFinishedJobsPeriod { get; }
TimeSpan PruneInterval { get; }
}
}

Loading

0 comments on commit 71f786f

Please sign in to comment.