Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix #320, Allows retrieving and listening configurations of different namespaces. #326

Open
wants to merge 5 commits into
base: dev
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Directory.Build.props
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

<TreatWarningsAsErrors>true</TreatWarningsAsErrors>
<GenerateDocumentationFile>true</GenerateDocumentationFile>
<NoWarn>$(NoWarn),1573,1591,1712</NoWarn>
<NoWarn>$(NoWarn),1573,1591,1712,NU1903</NoWarn>
<CodeAnalysisRuleSet>..\..\_stylecop\codeanalysis.ruleset</CodeAnalysisRuleSet>
</PropertyGroup>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,10 @@ public class ConfigListener
/// Configuration group
/// </summary>
public string Group { get; set; }

/// <summary>
/// Configuration namespace
/// </summary>
public string Namespace { get; set; }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
using global::Microsoft.Extensions.Configuration;
using global::Microsoft.Extensions.Logging;
using Nacos.V2;
using Nacos.V2.Utils;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
Expand Down Expand Up @@ -39,11 +40,13 @@ public NacosV2ConfigurationProvider(NacosV2ConfigurationSource configurationSour

foreach (var item in configurationSource.Listeners)
{
var listener = new MsConfigListener(item.DataId, item.Group, item.Optional, this, _logger);
var listener = new MsConfigListener(item, this, _logger);

tasks.Add(_client.AddListener(item.DataId, item.Group, listener));
tasks.Add(item.Namespace.IsNullOrWhiteSpace()
? _client.AddListener(item.DataId, item.Group, listener)
: _client.AddListener(item.DataId, item.Group, item.Namespace, listener));

_listenerDict.Add($"{item.DataId}#{item.Group}", listener);
_listenerDict.Add($"{item.DataId}#{item.Group}#{item.Namespace}", listener);
}

Task.WaitAll(tasks.ToArray());
Expand All @@ -66,8 +69,11 @@ public void Dispose()
var arr = item.Key.Split('#');
var dataId = arr[0];
var group = arr[1];
var tenant = arr[2];

tasks.Add(_client.RemoveListener(dataId, group, item.Value));
tasks.Add(tenant.IsNullOrWhiteSpace()
? _client.RemoveListener(dataId, group, item.Value)
: _client.RemoveListener(dataId, group, tenant, item.Value));
}

Task.WaitAll(tasks.ToArray());
Expand All @@ -87,10 +93,12 @@ public override void Load()
{
try
{
var config = _client.GetConfig(listener.DataId, listener.Group, 3000)
var config = (listener.Namespace.IsNullOrWhiteSpace()
? _client.GetConfig(listener.DataId, listener.Group, 3000)
: _client.GetConfig(listener.DataId, listener.Group, listener.Namespace, 3000))
.ConfigureAwait(false).GetAwaiter().GetResult();

_configDict.AddOrUpdate($"{_configurationSource.GetNamespace()}#{listener.Group}#{listener.DataId}", config, (x, y) => config);
_configDict.AddOrUpdate(GetCacheKey(listener), config, (x, y) => config);

var data = _parser.Parse(config);

Expand Down Expand Up @@ -123,6 +131,9 @@ public override void Load()
}
}

private string GetCacheKey(ConfigListener listener)
=> $"{(listener.Namespace.IsNullOrWhiteSpace() ? _configurationSource.GetNamespace() : listener.Namespace)}#{listener.Group}#{listener.DataId}";

// for test
internal void SetListener(string key, MsConfigListener listener)
{
Expand All @@ -131,21 +142,17 @@ internal void SetListener(string key, MsConfigListener listener)

internal class MsConfigListener : IListener
{
private string _dataId;
private string _group;
private bool _optional;
private NacosV2ConfigurationProvider _provider;
private string _key;
private ILogger _logger;

internal MsConfigListener(string dataId, string group, bool optional, NacosV2ConfigurationProvider provider, ILogger logger)
internal MsConfigListener(ConfigListener listener, NacosV2ConfigurationProvider provider, ILogger logger)
{
this._dataId = dataId;
this._group = group;
this._optional = optional;
this._optional = listener.Optional;
this._provider = provider;
this._logger = logger;
_key = $"{provider._configurationSource.GetNamespace()}#{_group}#{_dataId}";
_key = provider.GetCacheKey(listener);
}


Expand All @@ -160,7 +167,7 @@ public void ReceiveConfigInfo(string configInfo)

foreach (var listener in _provider._configurationSource.Listeners)
{
var key = $"{_provider._configurationSource.GetNamespace()}#{listener.Group}#{listener.DataId}";
var key = _provider.GetCacheKey(listener);

if (!_provider._configDict.TryGetValue(key, out var config))
{
Expand Down
12 changes: 8 additions & 4 deletions src/Nacos/V2/Config/Impl/ClientWorker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,12 @@ public ClientWorker(ILogger logger, ConfigFilterChainManager configFilterChainMa
: new ConfigHttpTransportClient(logger, options, serverListManager, _cacheMap);
}

public async Task AddTenantListeners(string dataId, string group, List<IListener> listeners)
public Task AddTenantListeners(string dataId, string group, List<IListener> listeners)
=> AddTenantListeners(dataId, group, _agent.GetTenant(), listeners);

public async Task AddTenantListeners(string dataId, string group, string tenant, List<IListener> listeners)
{
group = ParamUtils.Null2DefaultGroup(group);
string tenant = _agent.GetTenant();

CacheData cache = AddCacheDataIfAbsent(dataId, group, tenant);
foreach (var listener in listeners)
Expand Down Expand Up @@ -68,10 +70,12 @@ internal async Task AddTenantListenersWithContent(string dataId, string group, s
}
}

public async Task RemoveTenantListener(string dataId, string group, IListener listener)
public Task RemoveTenantListener(string dataId, string group, IListener listener)
=> RemoveTenantListener(dataId, group, _agent.GetTenant(), listener);

public async Task RemoveTenantListener(string dataId, string group, string tenant, IListener listener)
{
group = ParamUtils.Null2DefaultGroup(group);
string tenant = _agent.GetTenant();

CacheData cache = GetCache(dataId, group, tenant);
if (cache != null)
Expand Down
14 changes: 14 additions & 0 deletions src/Nacos/V2/Config/NacosConfigService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
using Nacos.V2.Config.Impl;
using Nacos.V2.Config.Utils;
using Nacos.V2.Exceptions;
using Nacos.V2.Utils;
using System.Collections.Generic;
using System.Threading.Tasks;

Expand All @@ -28,9 +29,17 @@ public NacosConfigService(ILoggerFactory loggerFactory, IOptions<NacosSdkOptions
public Task AddListener(string dataId, string group, IListener listener)
=> _worker.AddTenantListeners(dataId, group, new List<IListener> { listener });

public Task AddListener(string dataId, string group, string tenant, IListener listener)
=> tenant.IsNullOrWhiteSpace()
? _worker.AddTenantListeners(dataId, group, new List<IListener> { listener })
: _worker.AddTenantListeners(dataId, group, tenant, new List<IListener> { listener });

public async Task<string> GetConfig(string dataId, string group, long timeoutMs)
=> await GetConfigInner(_namespace, dataId, group, timeoutMs).ConfigureAwait(false);

public Task<string> GetConfig(string dataId, string group, string tenant, long timeoutMs)
=> GetConfigInner(tenant.IsNullOrWhiteSpace() ? _namespace : tenant, dataId, group, timeoutMs);

public async Task<string> GetConfigAndSignListener(string dataId, string group, long timeoutMs, IListener listener)
{
string content = await GetConfig(dataId, group, timeoutMs).ConfigureAwait(false);
Expand Down Expand Up @@ -59,6 +68,11 @@ public async Task<bool> RemoveConfig(string dataId, string group)
public Task RemoveListener(string dataId, string group, IListener listener)
=> _worker.RemoveTenantListener(dataId, group, listener);

public Task RemoveListener(string dataId, string group, string tenant, IListener listener)
=> tenant.IsNullOrWhiteSpace()
? _worker.RemoveTenantListener(dataId, group, listener)
: _worker.RemoveTenantListener(dataId, group, tenant, listener);

public Task ShutDown() => Task.CompletedTask;

private async Task<string> GetConfigInner(string tenant, string dataId, string group, long timeoutMs)
Expand Down
33 changes: 32 additions & 1 deletion src/Nacos/V2/INacosConfigService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,16 @@ public interface INacosConfigService
/// <returns>config value</returns>
Task<string> GetConfig(string dataId, string group, long timeoutMs);

/// <summary>
/// Get config.
/// </summary>
/// <param name="dataId">dataId</param>
/// <param name="group">group</param>
/// <param name="tenant">namespace</param>
/// <param name="timeoutMs">read timeout</param>
/// <returns>config value</returns>
Task<string> GetConfig(string dataId, string group, string tenant, long timeoutMs);

/// <summary>
/// Get config and register Listener.
/// If you want to pull it yourself when the program starts to get the configuration for the first time, and the
Expand All @@ -38,6 +48,18 @@ public interface INacosConfigService
/// <param name="listener">Listener</param>
Task AddListener(string dataId, string group, IListener listener);

/// <summary>
/// Add a listener to the configuration, after the server modified the configuration, the client will use the
/// incoming listener callback. Recommended asynchronous processing, the application can implement the getExecutor
/// method in the ManagerListener, provide a thread pool of execution. If provided, use the main thread callback, May
/// block other configurations or be blocked by other configurations.
/// </summary>
/// <param name="dataId">dataId</param>
/// <param name="group">group</param>
/// <param name="tenant">namespace</param>
/// <param name="listener">Listener</param>
Task AddListener(string dataId, string group, string tenant, IListener listener);

/// <summary>
/// Publish config.
/// </summary>
Expand Down Expand Up @@ -94,6 +116,15 @@ public interface INacosConfigService
/// <param name="listener">listener</param>
Task RemoveListener(string dataId, string group, IListener listener);

/// <summary>
/// Remove listener.
/// </summary>
/// <param name="dataId">dataId</param>
/// <param name="group">group</param>
/// <param name="tenant">tenant</param>
/// <param name="listener">listener</param>
Task RemoveListener(string dataId, string group, string tenant, IListener listener);

/// <summary>
/// Get server status.
/// </summary>
Expand All @@ -105,4 +136,4 @@ public interface INacosConfigService
/// </summary>
Task ShutDown();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,38 @@ public void Init_Should_ThrowException_When_Listeners_Is_Empty()
});
}

[Fact]
public void Init_Should_Call_AddListener()
{
_mockSvc.Setup(x => x.AddListener("d1", "g", It.IsAny<IListener>())).Returns(Task.CompletedTask);
_mockSvc.Setup(x => x.AddListener("d2", "g", "ns", It.IsAny<IListener>())).Returns(Task.CompletedTask);

var cs = new NacosV2ConfigurationSource(null, null)
{
Namespace = "cs",
Listeners = new System.Collections.Generic.List<ConfigListener>
{
new ConfigListener { DataId = "d1", Group = "g" },
new ConfigListener { DataId = "d2", Group = "g", Namespace = "ns" }
},
NacosConfigurationParser = DefaultJsonConfigurationStringParser.Instance
};

var provider = new NacosV2ConfigurationProvider(cs, _mockSvc.Object, null);

_mockSvc.Verify(x => x.AddListener("d1", "g", It.IsAny<IListener>()), Times.Once);
_mockSvc.Verify(x => x.AddListener("d2", "g", "ns", It.IsAny<IListener>()), Times.Once);
}

[Fact]
public void Dispose_Should_Call_RemoveListener()
{
var provider = GetProviderForMultiListeners();
_mockSvc.Setup(x => x.RemoveListener("d1", "g", It.IsAny<IListener>())).Returns(Task.CompletedTask);
_mockSvc.Setup(x => x.RemoveListener("d2", "g", "ns", It.IsAny<IListener>())).Returns(Task.CompletedTask);
provider.Dispose();
_mockSvc.Verify(x => x.RemoveListener("d1", "g", It.IsAny<IListener>()), Times.Once);
_mockSvc.Verify(x => x.RemoveListener("d2", "g", "ns", It.IsAny<IListener>()), Times.Once);
}

[Fact]
Expand Down Expand Up @@ -80,11 +105,11 @@ public void MsConfigListener_Should_Not_Overwrite_When_Contains_Same_Key_And_Rec
var provider = GetProviderForMultiListeners();
provider.Load();

MsConfigListener l1 = new MsConfigListener("d1", "g", false, provider, null);
MsConfigListener l2 = new MsConfigListener("d2", "g", false, provider, null);
MsConfigListener l1 = new MsConfigListener(new ConfigListener() { DataId = "d1", Group = "g" }, provider, null);
MsConfigListener l2 = new MsConfigListener(new ConfigListener() { DataId = "d2", Group = "g", Namespace = "ns" }, provider, null);

provider.SetListener("d1#g", l1);
provider.SetListener("d2#g", l2);
provider.SetListener("cs#d1#g", l1);
provider.SetListener("ns#d2#g", l2);

l1.ReceiveConfigInfo(new { all = "d1_1" }.ToJsonString());

Expand All @@ -99,11 +124,11 @@ public void MsConfigListener_Should_Not_Overwrite_When_Contains_Same_Key_And_Rec
var provider = GetProviderForMultiListeners();
provider.Load();

MsConfigListener l1 = new MsConfigListener("d1", "g", false, provider, null);
MsConfigListener l2 = new MsConfigListener("d2", "g", false, provider, null);
MsConfigListener l1 = new MsConfigListener(new ConfigListener() { DataId = "d1", Group = "g" }, provider, null);
MsConfigListener l2 = new MsConfigListener(new ConfigListener() { DataId = "d2", Group = "g", Namespace = "ns" }, provider, null);

provider.SetListener("d1#g", l1);
provider.SetListener("d2#g", l2);
provider.SetListener("cs#d1#g", l1);
provider.SetListener("ns#d2#g", l2);

l2.ReceiveConfigInfo(new { all = "d2_1" }.ToJsonString());

Expand All @@ -115,15 +140,15 @@ public void MsConfigListener_Should_Not_Overwrite_When_Contains_Same_Key_And_Rec
private NacosV2ConfigurationProvider GetProviderForMultiListeners()
{
_mockSvc.Setup(x => x.GetConfig("d1", "g", 3000)).ReturnsAsync(new { all = "d1" }.ToJsonString());
_mockSvc.Setup(x => x.GetConfig("d2", "g", 3000)).ReturnsAsync(new { all = "d2" }.ToJsonString());
_mockSvc.Setup(x => x.GetConfig("d2", "g", "ns", 3000)).ReturnsAsync(new { all = "d2" }.ToJsonString());

var cs = new NacosV2ConfigurationSource(null, null)
{
Namespace = "cs",
Listeners = new System.Collections.Generic.List<ConfigListener>
{
new ConfigListener { DataId = "d1", Group = "g" },
new ConfigListener { DataId = "d2", Group = "g" }
new ConfigListener { DataId = "d2", Group = "g", Namespace = "ns" }
},
NacosConfigurationParser = DefaultJsonConfigurationStringParser.Instance
};
Expand Down
Loading