Skip to content

Commit

Permalink
Merge pull request #7 from sceneskope/dev
Browse files Browse the repository at this point in the history
Merge latest updates
  • Loading branch information
nrandell authored May 17, 2018
2 parents 34a2e53 + d1fc7e8 commit 082f99b
Show file tree
Hide file tree
Showing 6 changed files with 92 additions and 60 deletions.
6 changes: 3 additions & 3 deletions directory.build.props
Original file line number Diff line number Diff line change
Expand Up @@ -9,17 +9,17 @@
<PackageLicenseUrl>https://opensource.org/licenses/MIT</PackageLicenseUrl>
<PackageRequireLicenseAcceptance>true</PackageRequireLicenseAcceptance>
<LangVersion>latest</LangVersion>
<VersionPrefix>3.0.0</VersionPrefix>
<VersionPrefix>4.1.0</VersionPrefix>
<DebugType>embedded</DebugType>
</PropertyGroup>
<PropertyGroup Condition="'$(IsTestProject)' != 'true' ">
<TargetFrameworks>netstandard2.0;net461</TargetFrameworks>
<RuntimeIdentifier>win7-x64</RuntimeIdentifier>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Roslynator.Analyzers" Version="1.7.0" PrivateAssets="All" />
<PackageReference Include="Roslynator.Analyzers" Version="1.8.0" PrivateAssets="All" />
</ItemGroup>
<ItemGroup Condition="'$(IsTestProject)' != 'true' and '$(IsPackable)' != 'false'">
<PackageReference Include="SourceLink.Create.CommandLine" Version="2.8.0" PrivateAssets="All" />
<PackageReference Include="SourceLink.Create.CommandLine" Version="2.8.1" PrivateAssets="All" />
</ItemGroup>
</Project>
77 changes: 42 additions & 35 deletions src/SceneSkope.ServiceFabric.EventHubs/BaseEventHubService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ public abstract class BaseEventHubService : SerilogStatefulService

public string ConfigurationSectionName { get; set; } = "EventHubSource";

protected string DefaultPosition { get; set; } = PartitionReceiver.EndOfStream;
protected EventPosition DefaultPosition { get; set; } = EventPosition.FromEnd();
protected TimeSpan? DefaultAge { get; set; }
protected bool UseEpochReceiver { get; set; } = true;

Expand Down Expand Up @@ -135,87 +135,94 @@ private async Task<string> ReadOffsetAsync(string partition, IReliableDictionary
}
}

private PartitionReceiver CreateReceiver(string partition, string offset)
private PartitionReceiver CreateReceiver(ILogger log, string partition, string offset)
{
var epoch = DateTime.UtcNow.Ticks;
if (string.IsNullOrWhiteSpace(offset))
{
if (DefaultAge.HasValue)
{
var timestamp = DateTime.UtcNow.Subtract(DefaultAge.Value);
var position = EventPosition.FromEnqueuedTime(timestamp);
if (UseEpochReceiver)
{
Log.Information("Creating epoch {Epoch} receiver for {Consumer}#{Partition} from time {Timestamp}",
log.Information("Creating epoch {Epoch} receiver for {Consumer}#{Partition} from time {Timestamp}",
epoch, _consumerGroup, partition, timestamp);
return _client.CreateEpochReceiver(_consumerGroup, partition, timestamp, epoch);
return _client.CreateEpochReceiver(_consumerGroup, partition, position, epoch);
}
else
{
Log.Information("Creating receiver for {Consumer}#{Partition} from time {Timestamp}",
log.Information("Creating receiver for {Consumer}#{Partition} from time {Timestamp}",
_consumerGroup, partition, timestamp);
return _client.CreateReceiver(_consumerGroup, partition, timestamp);
return _client.CreateReceiver(_consumerGroup, partition, position);
}
}
else
{
var offsetToUse = string.IsNullOrWhiteSpace(offset) ? DefaultPosition : offset;
var offsetInclusive = offset == PartitionReceiver.StartOfStream;
var position = DefaultPosition;
if (UseEpochReceiver)
{
Log.Information("Creating epoch {Epoch} receiver for {Consumer}#{Partition} from offset {Offset} {Inclusive}",
epoch, _consumerGroup, partition, offsetToUse, offsetInclusive);
return _client.CreateEpochReceiver(_consumerGroup, partition, offsetToUse, offsetInclusive, DateTime.UtcNow.Ticks);
log.Information("Creating epoch {Epoch} receiver for {Consumer}#{Partition} from position {Position}",
epoch, _consumerGroup, partition, position);
return _client.CreateEpochReceiver(_consumerGroup, partition, position, epoch);
}
else
{
Log.Information("Creating receiver for {Consumer}#{Partition} from offset {Offset} {Inclusive}",
_consumerGroup, partition, offsetToUse, offsetInclusive);
return _client.CreateReceiver(_consumerGroup, partition, offsetToUse, offsetInclusive);
log.Information("Creating receiver for {Consumer}#{Partition} from offset {Position}",
_consumerGroup, partition, position);
return _client.CreateReceiver(_consumerGroup, partition, position);
}
}
}
else
{
var position = EventPosition.FromOffset(offset);
if (UseEpochReceiver)
{
Log.Information("Creating epoch {Epoch} receiver for {Consumer}#{Partition} from saved offset {Offset}",
log.Information("Creating epoch {Epoch} receiver for {Consumer}#{Partition} from saved offset {Offset}",
epoch, _consumerGroup, partition, offset);
return _client.CreateEpochReceiver(_consumerGroup, partition, offset, false, DateTime.UtcNow.Ticks);
return _client.CreateEpochReceiver(_consumerGroup, partition, position, epoch);
}
else
{
Log.Information("Creating receiver for {Consumer}#{Partition} from saved offset {Offset}",
log.Information("Creating receiver for {Consumer}#{Partition} from saved offset {Offset}",
_consumerGroup, partition, offset);
return _client.CreateReceiver(_consumerGroup, partition, offset, false);
return _client.CreateReceiver(_consumerGroup, partition, position);
}
}
}

private async Task ProcessPartitionAsync(string partition, IReliableDictionary<string, string> offsets, CancellationToken ct)
{
var log = Log.ForContext("partition", partition);
var offset = await ReadOffsetAsync(partition, offsets).ConfigureAwait(false);
var receiver = CreateReceiver(partition, offset);
try
while (!ct.IsCancellationRequested)
{
var handler = CreateReadingReceiver(Log, StateManager, offsets, partition, ct);
await handler.InitialiseAsync().ConfigureAwait(false);
receiver.SetReceiveHandler(handler);
var receiver = CreateReceiver(log, partition, offset);
log.Information("Receiver for {Partition} is {Identifier}", partition, receiver.Identifier);
receiver.RetryPolicy = RetryPolicy.NoRetry;
await Task.Delay(Timeout.Infinite, ct).ConfigureAwait(false);
}
catch (Exception ex)
{
ct.ThrowIfCancellationRequested();
Log.Error(ex, "Error processing partition: {Exception}", ex.Message);
}
finally
{
Log.Information("Finished processing partition iteration {Partition}", partition);
await receiver.CloseAsync().ConfigureAwait(false);
try
{
var handler = CreateReadingReceiver(log, StateManager, receiver, offsets, partition, ct);
await handler.InitialiseAsync().ConfigureAwait(false);
receiver.SetReceiveHandler(handler);
await handler.WaitForFinishedAsync(ct).ConfigureAwait(false);
}
catch (Exception ex)
{
ct.ThrowIfCancellationRequested();
log.Error(ex, "Error processing partition {Partition}: {Exception}", partition, ex.Message);
}
finally
{
log.Information("Finished processing partition for {Partition}", partition);
await receiver.CloseAsync().ConfigureAwait(false);
}
await Task.Delay(1000, ct).ConfigureAwait(false);
}
}

protected abstract IReadingReceiver CreateReadingReceiver(ILogger log, IReliableStateManager stateManager,
IReliableDictionary<string, string> offsets, string partition, CancellationToken ct);
PartitionReceiver receiver, IReliableDictionary<string, string> offsets, string partition, CancellationToken ct);
}
}
29 changes: 21 additions & 8 deletions src/SceneSkope.ServiceFabric.EventHubs/BaseReadingReceiver.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,24 +18,28 @@ public abstract class BaseReadingReceiver : IReadingReceiver

private readonly IReliableDictionary<string, string> _offsets;
protected readonly string _partition;
protected CancellationToken CancellationToken { get; }
public CancellationTokenSource TokenSource { get; }

public virtual int MaxBatchSize => 100;
public int MaxBatchSize { get; set; } = 100;
protected Policy TimeoutPolicy { get; }

public PartitionReceiver Receiver { get; }

protected BaseReadingReceiver(ILogger log, IReliableStateManager stateManager,
PartitionReceiver receiver,
IReliableDictionary<string, string> offsets, string partition,
CancellationToken ct)
{
Log = log.ForContext("partition", partition);
Log = log;
StateManager = stateManager;
Receiver = receiver;
_offsets = offsets;
_partition = partition;
CancellationToken = ct;
TokenSource = CancellationTokenSource.CreateLinkedTokenSource(ct);
TimeoutPolicy =
Policy
#pragma warning disable RCS1163 // Unused parameter.
.Handle<TimeoutException>(_ => !CancellationToken.IsCancellationRequested)
.Handle<TimeoutException>(_ => !TokenSource.IsCancellationRequested)
#pragma warning restore RCS1163 // Unused parameter.
.WaitAndRetryForeverAsync(n => TimeSpan.FromMilliseconds((n < 10) ? n * 100 : 1000),
(ex, ts) => Log.Warning(ex, "Delaying {Ts} due to {Exception}", ts, ex.Message));
Expand All @@ -46,6 +50,15 @@ protected BaseReadingReceiver(ILogger log, IReliableStateManager stateManager,
public virtual Task ProcessErrorAsync(Exception error)
{
Log.Error(error, "Error reading: {Exception}", error.Message);
switch (error)
{
case ReceiverDisconnectedException _:
case OperationCanceledException _:
case EventHubsException _:
Log.Information("Cancelling the receiver");
TokenSource.Cancel();
break;
}
return Task.CompletedTask;
}

Expand All @@ -64,7 +77,6 @@ await TimeoutPolicy.ExecuteAsync(async _ =>
#pragma warning restore RCS1163 // Unused parameter.
{
var count = events.Count();
Log.Verbose("Got {Count} events to process", count);
using (var tx = StateManager.CreateTransaction())
{
foreach (var @event in events)
Expand All @@ -75,8 +87,9 @@ await TimeoutPolicy.ExecuteAsync(async _ =>
await _offsets.SetAsync(tx, _partition, lastOffset).ConfigureAwait(false);
await tx.CommitAsync().ConfigureAwait(false);
}
Log.Verbose("Processed {Count} events", count);
}, CancellationToken, false).ConfigureAwait(false);
}, TokenSource.Token, false).ConfigureAwait(false);
}

public virtual Task WaitForFinishedAsync(CancellationToken ct) => Task.Delay(Timeout.Infinite, TokenSource.Token);
}
}
4 changes: 3 additions & 1 deletion src/SceneSkope.ServiceFabric.EventHubs/IReadingReceiver.cs
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
using System.Threading.Tasks;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.EventHubs;

namespace SceneSkope.ServiceFabric.EventHubs
{
public interface IReadingReceiver : IPartitionReceiveHandler
{
Task InitialiseAsync();
Task WaitForFinishedAsync(CancellationToken ct);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Polly" Version="5.8.0" />
<PackageReference Include="ServiceFabric.Serilog" Version="4.0.0" />
<PackageReference Include="ServiceFabric.Utilities" Version="4.0.0" />
<PackageReference Include="Microsoft.Azure.EventHubs" Version="1.1.0" />
<PackageReference Include="Polly" Version="6.0.1" />
<PackageReference Include="ServiceFabric.Serilog" Version="4.1.0-dev-00150" />
<PackageReference Include="ServiceFabric.Utilities" Version="4.1.0-dev-00150" />
<PackageReference Include="Microsoft.Azure.EventHubs" Version="2.0.0" />
</ItemGroup>

</Project>
28 changes: 19 additions & 9 deletions src/SceneSkope.ServiceFabric.EventHubs/SimpleReadingReceiver.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.EventHubs;
Expand All @@ -17,23 +16,35 @@ public abstract class SimpleReadingReceiver : IReadingReceiver
private readonly IReliableDictionary<string, string> _offsets;
protected readonly string _partition;

public virtual int MaxBatchSize => 100;
public int MaxBatchSize { get; set; } = 100;

public CancellationToken CancellationToken { get; }
public PartitionReceiver Receiver { get; }

protected SimpleReadingReceiver(ILogger log, IReliableDictionary<string, string> offsets, string partition, CancellationToken ct)
public CancellationTokenSource TokenSource { get; }

protected SimpleReadingReceiver(ILogger log, PartitionReceiver receiver, IReliableDictionary<string, string> offsets, string partition, CancellationToken ct)
{
Log = log.ForContext("partition", partition);
Log = log;
Receiver = receiver;
_offsets = offsets;
_partition = partition;
CancellationToken = ct;
TokenSource = CancellationTokenSource.CreateLinkedTokenSource(ct);
}

public virtual Task InitialiseAsync() => Task.CompletedTask;

public virtual Task ProcessErrorAsync(Exception error)
{
Log.Error(error, "Error reading: {Exception}", error.Message);
switch (error)
{
case ReceiverDisconnectedException _:
case OperationCanceledException _:
case EventHubsException _:
Log.Information("Cancelling the receiver");
TokenSource.Cancel();
break;
}
return Task.CompletedTask;
}

Expand All @@ -46,20 +57,19 @@ public async Task ProcessEventsAsync(IEnumerable<EventData> events)
return;
}

var count = Log.IsEnabled(Serilog.Events.LogEventLevel.Verbose) ? events.Count() : 0;
Log.Verbose("Got {Count} events to process", count);
string latestOffset = null;
foreach (var @event in events)
{
await ProcessEventAsync(@event).ConfigureAwait(false);
latestOffset = @event.SystemProperties.Offset;
}
await OnAllEventsProcessedAsync(latestOffset).ConfigureAwait(false);
Log.Verbose("Processed {Count} events", count);
}

protected virtual Task OnAllEventsProcessedAsync(string latestOffset) => Task.CompletedTask;

protected Task SaveOffsetAsync(ITransaction tx, string latestOffset) => _offsets.SetAsync(tx, _partition, latestOffset);

public Task WaitForFinishedAsync(CancellationToken ct) => Task.Delay(Timeout.Infinite, TokenSource.Token);
}
}

0 comments on commit 082f99b

Please sign in to comment.