Skip to content

Commit

Permalink
version++
Browse files Browse the repository at this point in the history
  • Loading branch information
MiloszKrajewski committed Mar 29, 2021
1 parent c84ba96 commit 5de4c59
Show file tree
Hide file tree
Showing 6 changed files with 38 additions and 45 deletions.
6 changes: 3 additions & 3 deletions Common.targets
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
<Project>
<PropertyGroup>
<Version>0.0.17</Version>
<AssemblyVersion>0.0.17</AssemblyVersion>
<FileVersion>0.0.17</FileVersion>
<Version>0.0.18</Version>
<AssemblyVersion>0.0.18</AssemblyVersion>
<FileVersion>0.0.18</FileVersion>
</PropertyGroup>
<PropertyGroup>
<!-- Χρόνος -->
Expand Down
49 changes: 24 additions & 25 deletions src/K4os.Xpovoc.Core/Db/DbPoller.cs
Original file line number Diff line number Diff line change
Expand Up @@ -93,39 +93,38 @@ private async Task<IDbJob> Claim(CancellationToken token)
[SuppressMessage("ReSharper", "AccessToDisposedClosure")]
private async Task Process(CancellationToken token, IDbJob job)
{
// this seems like performance drag for not reason
// this seems like performance drag for no reason
// if job is hijacked do we need to try to interrupt it?
using (var hijacked = new CancellationTokenSource())
using (var combined = CancellationTokenSource
.CreateLinkedTokenSource(token, hijacked.Token))
using (var finished = new CancellationTokenSource())
using var hijacked = new CancellationTokenSource();
using var combined = CancellationTokenSource
.CreateLinkedTokenSource(token, hijacked.Token);
using var finished = new CancellationTokenSource();

var keepAlive = Task.Run(
() => MaintainClaim(finished.Token, job, hijacked),
CancellationToken.None);

var success = await Handle(combined.Token, job);

if (await CancelClaim(finished, hijacked, keepAlive))
{
var keepAlive = Task.Run(
() => MaintainClaim(finished.Token, job, hijacked),
CancellationToken.None);

var success = await Handle(combined.Token, job);

if (await CancelClaim(finished, hijacked, keepAlive))
if (success)
{
if (success)
{
await Complete(job);
}
else if (job.Attempt < _configuration.RetryLimit)
{
await Retry(job);
}
else
{
await Forget(job);
}
await Complete(job);
}
else if (job.Attempt < _configuration.RetryLimit)
{
await Retry(job);
}
else
{
Log.LogError("Execution of job {0} has been hijacked", job.JobId);
await Forget(job);
}
}
else
{
Log.LogError("Execution of job {0} has been hijacked", job.JobId);
}
}

private async Task<bool> Handle(CancellationToken token, IDbJob job)
Expand Down
1 change: 1 addition & 0 deletions src/K4os.Xpovoc.Core/K4os.Xpovoc.Core.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<TargetFramework>netstandard2.0</TargetFramework>
<LangVersion>latest</LangVersion>
</PropertyGroup>
<ItemGroup>
<Compile Include="..\K4os.Shared\Extensions.cs">
Expand Down
24 changes: 8 additions & 16 deletions src/K4os.Xpovoc.Core/Memory/MemoryJobStorage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,9 @@ namespace K4os.Xpovoc.Core.Memory
/// </summary>
public class MemoryJobStorage: IDbJobStorage
{
private readonly object _lock = new object();

private readonly Dictionary<Guid, MemoryJob> _jobs =
new Dictionary<Guid, MemoryJob>();
private readonly SortedSet<(DateTime, Guid)> _queue =
new SortedSet<(DateTime, Guid)>();

private readonly object _lock = new();
private readonly Dictionary<Guid, MemoryJob> _jobs = new();
private readonly SortedSet<(DateTime, Guid)> _queue = new();
private static readonly Task<IDbJob> NullJob = Task.FromResult<IDbJob>(null);

public Task<IDbJob> Claim(
Expand All @@ -33,10 +29,10 @@ public Task<IDbJob> Claim(
var job = FirstVisibleJob(now);
if (job is null) return NullJob;

_queue.Remove((job.ScheduledFor, job.JobId));
_queue.Remove((job.InvisibleUntil, job.JobId));
job.Context = worker;
job.Attempt++;

return Task.FromResult<IDbJob>(job);
}
}
Expand All @@ -48,10 +44,7 @@ private MemoryJob FirstVisibleJob(DateTime now)
{
if (time > now) break; // no more candidates

var job = _jobs[id];
if (job.InvisibleUntil > now) continue; // hidden, try next

return job; // that's the one
return _jobs[id]; // that's the one
}

return null;
Expand All @@ -63,7 +56,6 @@ public Task<bool> KeepClaim(
lock (_lock)
return Task.FromResult(job.Context as Guid? == worker);
}


public Task Complete(Guid worker, IDbJob job, DateTime now)
{
Expand Down Expand Up @@ -93,7 +85,7 @@ public Task Retry(Guid worker, IDbJob job, DateTime when)
var entry = _jobs[jobId];
entry.InvisibleUntil = when;
entry.Context = null;
_queue.Add((entry.ScheduledFor, entry.JobId));
_queue.Add((entry.InvisibleUntil, entry.JobId));
}

return Task.CompletedTask;
Expand All @@ -112,7 +104,7 @@ public Task<Guid> Schedule(object payload, DateTime when)
Attempt = 0,
};
_jobs[guid] = job;
_queue.Add((job.ScheduledFor, guid));
_queue.Add((job.InvisibleUntil, guid));

return Task.FromResult(job.JobId);
}
Expand Down
1 change: 1 addition & 0 deletions src/K4os.Xpovoc.Quarterback/K4os.Xpovoc.Quarterback.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,6 @@
<ItemGroup>
<ProjectReference Include="..\K4os.Xpovoc.Abstractions\K4os.Xpovoc.Abstractions.csproj" />
</ItemGroup>
<Import Project="..\..\Common.targets" />
<Import Project="..\..\.paket\Paket.Restore.targets" />
</Project>
2 changes: 1 addition & 1 deletion src/Playground/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ private static async Task Producer(CancellationToken token, IJobScheduler schedu

private static void ConsumeOne(object payload)
{
Thread.Sleep(1000);
// Thread.Sleep(1000);
var guid = (Guid) payload;
var result = Guids.TryAdd(guid, null);
if (!result)
Expand Down

0 comments on commit 5de4c59

Please sign in to comment.