Skip to content

Commit

Permalink
Fixing bug with getting a service instance (#57)
Browse files Browse the repository at this point in the history
* Fixing bug with getting a service instance

* Fixing small issues

* Reduced the slow path when we have a list of services already, no need to await at all for that

* Version
  • Loading branch information
Drawaes authored and deanshackley committed Mar 8, 2017
1 parent 56ed212 commit 11a4ffe
Show file tree
Hide file tree
Showing 12 changed files with 223 additions and 167 deletions.
288 changes: 144 additions & 144 deletions CondenserDotNet.sln

Large diffs are not rendered by default.

13 changes: 10 additions & 3 deletions src/CondenserDotNet.Client/Services/BlockingWatcher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ internal class BlockingWatcher<T> where T : class
private readonly Action<T> _onNew;
private T _instances;
private WatcherState _state = WatcherState.NotInitialized;
private static int s_getServiceDelay = 2000;

public BlockingWatcher(Func<string, Task<HttpResponseMessage>> client, Action<T> onNew = null)
{
Expand All @@ -23,11 +24,17 @@ public BlockingWatcher(Func<string, Task<HttpResponseMessage>> client, Action<T>

public async Task<T> ReadAsync()
{
if (!await _haveFirstResults.WaitAsync())
T instances = Volatile.Read(ref _instances);
if (instances == null)
{
return null;
var delayTask = Task.Delay(s_getServiceDelay);
var taskThatFinished = await Task.WhenAny(delayTask, _haveFirstResults.WaitAsync());
if (delayTask == taskThatFinished)
{
throw new System.Net.Sockets.SocketException();
}
instances = Volatile.Read(ref _instances);
}
T instances = Volatile.Read(ref _instances);
return instances;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
using System;
using System.Collections.Generic;
using System.Text;

namespace CondenserDotNet.Client.Services
{
public class NoServiceInstanceFoundException:Exception
{
public NoServiceInstanceFoundException(string serviceName, Exception innerException)
:base($"Unable to find an instance of the service {serviceName}", innerException)
{
ServiceName = serviceName;
}

public string ServiceName { get;}

public override string ToString()
{
return Message;
}
}
}
9 changes: 6 additions & 3 deletions src/CondenserDotNet.Client/Services/ServiceRegistry.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
using CondenserDotNet.Core.Routing;
using Microsoft.Extensions.Options;
using Newtonsoft.Json;
using Microsoft.Extensions.Logging;

namespace CondenserDotNet.Client.Services
{
Expand All @@ -16,9 +17,11 @@ public class ServiceRegistry : IServiceRegistry, IDisposable
private readonly HttpClient _client;
private readonly CancellationTokenSource _cancel = new CancellationTokenSource();
private readonly Dictionary<string, ServiceWatcher> _watchedServices = new Dictionary<string, ServiceWatcher>(StringComparer.OrdinalIgnoreCase);

public ServiceRegistry(Func<HttpClient> httpClientFactory = null)
private readonly ILogger _logger;

public ServiceRegistry(Func<HttpClient> httpClientFactory = null, ILoggerFactory loggerFactory = null)
{
_logger = loggerFactory?.CreateLogger<ServiceRegistry>();
_client = httpClientFactory?.Invoke() ?? new HttpClient() { BaseAddress = new Uri("http://localhost:8500") };
}

Expand Down Expand Up @@ -48,7 +51,7 @@ public Task<InformationService> GetServiceInstanceAsync(string serviceName)
if (!_watchedServices.TryGetValue(serviceName, out watcher))
{
watcher = new ServiceWatcher(serviceName, _client, _cancel.Token,
new RandomRoutingStrategy<InformationServiceSet>());
new RandomRoutingStrategy<InformationServiceSet>(), _logger);
_watchedServices.Add(serviceName, watcher);
}
}
Expand Down
19 changes: 16 additions & 3 deletions src/CondenserDotNet.Client/Services/ServiceWatcher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
using CondenserDotNet.Core;
using CondenserDotNet.Core.DataContracts;
using CondenserDotNet.Core.Routing;
using Microsoft.Extensions.Logging;

namespace CondenserDotNet.Client.Services
{
Expand All @@ -15,10 +16,14 @@ internal class ServiceWatcher
private readonly BlockingWatcher<List<InformationServiceSet>> _watcher;
private readonly IRoutingStrategy<InformationServiceSet> _routingStrategy;
private readonly Task _watcherTask;
private readonly ILogger _logger;
private readonly string _serviceName;

internal ServiceWatcher(string serviceName, HttpClient client, CancellationToken cancel,
IRoutingStrategy<InformationServiceSet> routingStrategy)
IRoutingStrategy<InformationServiceSet> routingStrategy, ILogger logger)
{
_serviceName = serviceName;
_logger = logger;
_routingStrategy = routingStrategy;
string lookupUrl = $"{HttpUtils.ServiceHealthUrl}{serviceName}?passing&index=";
Func<string,Task<HttpResponseMessage>> action =
Expand All @@ -29,8 +34,16 @@ internal ServiceWatcher(string serviceName, HttpClient client, CancellationToken

internal async Task<InformationService> GetNextServiceInstanceAsync()
{
var instances = await _watcher.ReadAsync();
return _routingStrategy.RouteTo(instances)?.Service;
try
{
var instances = await _watcher.ReadAsync();
return _routingStrategy.RouteTo(instances)?.Service;
}
catch(Exception ex)
{
_logger?.LogError("Unable to get an instance of {serviceName} the error was {excception}",_serviceName, ex);
throw new NoServiceInstanceFoundException(_serviceName,ex);
}
}
}
}
6 changes: 2 additions & 4 deletions src/CondenserDotNet.Configuration/Consul/ConsulRegistry.cs
Original file line number Diff line number Diff line change
Expand Up @@ -172,8 +172,7 @@ public string this[string key]
{
get
{
string returnValue;
if (TryGetValue(key, out returnValue))
if (TryGetValue(key, out string returnValue))
{
return returnValue;
}
Expand Down Expand Up @@ -211,8 +210,7 @@ public void AddWatchOnSingleKey(string keyToWatch, Action<string> callback)
{
lock (_configKeys)
{
string currentValue;
TryGetValue(keyToWatch, out currentValue);
TryGetValue(keyToWatch, out string currentValue);
_configWatchers.Add(new ConfigurationWatcher() { CallBack = callback, KeyToWatch = keyToWatch, CurrentValue = currentValue });
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
using CondenserDotNet.Server;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.Logging;
using System.Diagnostics;

namespace CondenserDotNet.Middleware.Pipelines
{
Expand All @@ -26,7 +27,7 @@ public class ServiceWithCustomClient : IConsulService, IDisposable
private string _serviceId;
private readonly ILogger _logger;
private int _calls;
private int _totalRequestTime;
private long _totalRequestTime;
private readonly ConcurrentQueue<IPipeConnection> _pooledConnections = new ConcurrentQueue<IPipeConnection>();
private readonly PipeFactory _factory;

Expand All @@ -50,6 +51,8 @@ public ServiceWithCustomClient(CurrentState stats, ILoggerFactory loggingFactory

public async Task CallService(HttpContext context)
{
System.Threading.Interlocked.Increment(ref _calls);
var sw = Stopwatch.StartNew();
try
{
if (!_pooledConnections.TryDequeue(out IPipeConnection socket))
Expand All @@ -71,6 +74,8 @@ public async Task CallService(HttpContext context)
{
throw new NotImplementedException();
}
sw.Stop();
System.Threading.Interlocked.Add(ref _totalRequestTime, sw.ElapsedMilliseconds);
}

public override int GetHashCode()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@ public TrailingHeadersMiddleware(RequestDelegate next)
public async Task Invoke(HttpContext context)
{
var trailingHeaders = new TrailingHeadersFeature(context);
var stream = new ChunkingStream();
stream.InnerStream = context.Response.Body;
var stream = new ChunkingStream()
{
InnerStream = context.Response.Body
};
context.Response.Body = stream;
context.Response.Headers["Transfer-Encoding"] = "chunked";
context.Features.Set<ITrailingHeadersFeature>(trailingHeaders);
Expand Down
5 changes: 2 additions & 3 deletions src/CondenserDotNet.Server/Routes/ChangeRoutingStrategy.cs
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,7 @@ public override async Task CallService(HttpContext context)
}
var queryDictionary = QueryHelpers.ParseQuery(query.Value);

StringValues values;
if (queryDictionary.TryGetValue("strategy", out values))
if (queryDictionary.TryGetValue("strategy", out StringValues values))
{
var router = _provider.GetServices<IRoutingStrategy<IService>>()
.SingleOrDefault(x => x.Name.Equals(values[0], StringComparison.OrdinalIgnoreCase));
Expand All @@ -59,7 +58,7 @@ public override async Task CallService(HttpContext context)
else
{
await context.Response.WriteAsync("No query string args");
context.Response.StatusCode = (int) HttpStatusCode.BadRequest;
context.Response.StatusCode = (int)HttpStatusCode.BadRequest;
}
}

Expand Down
10 changes: 9 additions & 1 deletion test/Condenser.Tests.Integration/ServiceLookupFacts.cs
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,18 @@ public async Task TestRegisterAndCheckRegistered()
}
}

[Fact]
public async Task TestThatAnErrorIsReturnedWhenConsulIsNotAvailable()
{
using (var serviceRegistry = new ServiceRegistry(() => new HttpClient() { BaseAddress = new Uri( "http://localhost:7000" )}))
{
await Assert.ThrowsAsync<NoServiceInstanceFoundException>(async () => await serviceRegistry.GetServiceInstanceAsync("TestService"));
}
}

[Fact]
public async Task TestRegisterAndCheckUpdates()
{
Console.WriteLine(nameof(TestRegisterAndCheckUpdates));
var serviceName = Guid.NewGuid().ToString();
var opts = Options.Create(new ServiceManagerConfig() { ServiceName = serviceName, ServiceId = serviceName, ServicePort = 2222 });
using (var manager = new ServiceManager(opts))
Expand Down
3 changes: 1 addition & 2 deletions test/CondenserTests/RadixTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,7 @@ public void TestCompression()
//The key length should be 5 long
Assert.Equal(5, tree.GetTopNode().ChildrenNodes.KeyLength);

string matchedpath;
var returnservice = tree.GetServiceFromRoute("/test1/test2/test3/test4/test5/test7",out matchedpath);
var returnservice = tree.GetServiceFromRoute("/test1/test2/test3/test4/test5/test7", out string matchedpath);
Assert.Equal("/test1/test2/test3/test4/test5".ToUpperInvariant(), matchedpath);
//Assert.Equal(returnservice.ServiceId, service.ServiceId);
}
Expand Down
2 changes: 1 addition & 1 deletion version.props
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
<Project>
<PropertyGroup>
<VersionPrefix>2.0.1</VersionPrefix>
<VersionPrefix>2.0.3</VersionPrefix>
<VersionSuffix>beta</VersionSuffix>
</PropertyGroup>
</Project>

0 comments on commit 11a4ffe

Please sign in to comment.