diff --git a/src/Miningcore/Api/ApiServer.cs b/src/Miningcore/Api/ApiServer.cs index 800769832..46e2ae15a 100644 --- a/src/Miningcore/Api/ApiServer.cs +++ b/src/Miningcore/Api/ApiServer.cs @@ -52,6 +52,7 @@ SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. using Newtonsoft.Json; using Newtonsoft.Json.Serialization; using NLog; +using Prometheus; using Contract = Miningcore.Contracts.Contract; namespace Miningcore.Api @@ -120,6 +121,7 @@ public ApiServer( private ClusterConfig clusterConfig; private IWebHost webHost; private IWebHost webHostAdmin; + private IWebHost webHostMetrics; private static readonly ILogger logger = LogManager.GetCurrentClassLogger(); private static readonly Encoding encoding = new UTF8Encoding(false); @@ -130,6 +132,20 @@ public ApiServer( NullValueHandling = NullValueHandling.Ignore }; + class ApiException : Exception + { + public ApiException(string message, int? responseStatusCode = null) : base(message) + { + ResponseStatusCode = responseStatusCode; + } + + public ApiException() + { + } + + public int? ResponseStatusCode { get; } + } + private readonly ConcurrentDictionary pools = new ConcurrentDictionary(); private readonly Dictionary> requestMap; @@ -139,16 +155,15 @@ private PoolConfig GetPool(HttpContext context, Match m) { var poolId = m.Groups["poolId"]?.Value; - if (!string.IsNullOrEmpty(poolId)) - { - var pool = clusterConfig.Pools.FirstOrDefault(x => x.Id == poolId && x.Enabled); + if (string.IsNullOrEmpty(poolId)) + throw new ApiException($"Invalid pool id", 401); - if (pool != null) - return pool; - } + var pool = clusterConfig.Pools.FirstOrDefault(x => x.Id == poolId && x.Enabled); - context.Response.StatusCode = 404; - return null; + if (pool == null) + throw new ApiException($"Pool {poolId} is not known", 401); + + return pool; } private async Task SendJsonAsync(HttpContext context, object response) @@ -201,6 +216,14 @@ private async Task HandleRequest(HttpContext context) context.Response.StatusCode = 404; } + catch (ApiException ex) + { + if (ex.ResponseStatusCode.HasValue) + context.Response.StatusCode = ex.ResponseStatusCode.Value; + + await SendJsonAsync(context, ex.Message); + } + catch (Exception ex) { logger.Error(ex); @@ -284,8 +307,6 @@ private async Task GetPoolInfosAsync(HttpContext context, Match m) private async Task GetPoolInfoAsync(HttpContext context, Match m) { var pool = GetPool(context, m); - if (pool == null) - return; // load stats var stats = await cf.Run(con => statsRepo.GetLastPoolStatsAsync(con, pool.Id)); @@ -317,8 +338,6 @@ private async Task GetPoolInfoAsync(HttpContext context, Match m) private async Task GetPoolPerformanceAsync(HttpContext context, Match m) { var pool = GetPool(context, m); - if (pool == null) - return; // set range var end = clock.Now; @@ -338,8 +357,6 @@ private async Task GetPoolPerformanceAsync(HttpContext context, Match m) private async Task PagePoolMinersAsync(HttpContext context, Match m) { var pool = GetPool(context, m); - if (pool == null) - return; // set range var end = clock.Now; @@ -365,8 +382,6 @@ private async Task PagePoolMinersAsync(HttpContext context, Match m) private async Task PagePoolBlocksPagedAsync(HttpContext context, Match m) { var pool = GetPool(context, m); - if (pool == null) - return; var page = context.GetQueryParameter("page", 0); var pageSize = context.GetQueryParameter("pageSize", 20); @@ -408,8 +423,6 @@ private async Task PagePoolBlocksPagedAsync(HttpContext context, Match m) private async Task PagePoolPaymentsAsync(HttpContext context, Match m) { var pool = GetPool(context, m); - if (pool == null) - return; var page = context.GetQueryParameter("page", 0); var pageSize = context.GetQueryParameter("pageSize", 20); @@ -446,8 +459,6 @@ private async Task PagePoolPaymentsAsync(HttpContext context, Match m) private async Task GetMinerInfoAsync(HttpContext context, Match m) { var pool = GetPool(context, m); - if (pool == null) - return; var address = m.Groups["address"]?.Value; if (string.IsNullOrEmpty(address)) @@ -488,8 +499,6 @@ private async Task GetMinerInfoAsync(HttpContext context, Match m) private async Task PageMinerPaymentsAsync(HttpContext context, Match m) { var pool = GetPool(context, m); - if (pool == null) - return; var address = m.Groups["address"]?.Value; if (string.IsNullOrEmpty(address)) @@ -533,8 +542,6 @@ private async Task PageMinerPaymentsAsync(HttpContext context, Match m) private async Task PageMinerBalanceChangesAsync(HttpContext context, Match m) { var pool = GetPool(context, m); - if (pool == null) - return; var address = m.Groups["address"]?.Value; if (string.IsNullOrEmpty(address)) @@ -563,8 +570,6 @@ private async Task PageMinerBalanceChangesAsync(HttpContext context, Match m) private async Task PageMinerEarningsByDayAsync(HttpContext context, Match m) { var pool = GetPool(context, m); - if (pool == null) - return; var address = m.Groups["address"]?.Value; if (string.IsNullOrEmpty(address)) @@ -592,8 +597,6 @@ private async Task PageMinerEarningsByDayAsync(HttpContext context, Match m) private async Task GetMinerPerformanceAsync(HttpContext context, Match m) { var pool = GetPool(context, m); - if (pool == null) - return; var address = m.Groups["address"]?.Value; if (string.IsNullOrEmpty(address)) @@ -603,7 +606,7 @@ private async Task GetMinerPerformanceAsync(HttpContext context, Match m) } var mode = context.GetQueryParameter("mode", "day").ToLower(); // "day" or "month" - var result = GetMinerPerformanceInternal(mode, pool, address); + var result = await GetMinerPerformanceInternal(mode, pool, address); await SendJsonAsync(context, result); } @@ -671,7 +674,15 @@ private async Task HandleRequestAdmin(HttpContext context) context.Response.StatusCode = 404; } - catch(Exception ex) + catch (ApiException ex) + { + if (ex.ResponseStatusCode.HasValue) + context.Response.StatusCode = ex.ResponseStatusCode.Value; + + await SendJsonAsync(context, ex.Message); + } + + catch (Exception ex) { logger.Error(ex); throw; @@ -687,7 +698,10 @@ private void StartAdminApi(ClusterConfig clusterConfig) var port = clusterConfig.Api?.AdminPort ?? 4001; webHostAdmin = new WebHostBuilder() - .Configure(app => { app.Run(HandleRequestAdmin); }) + .Configure(app => + { + app.Run(HandleRequestAdmin); + }) .UseKestrel(options => { options.Listen(address, port); }) .Build(); @@ -696,6 +710,27 @@ private void StartAdminApi(ClusterConfig clusterConfig) logger.Info(() => $"Admin API Online @ {address}:{port}"); } + private void StartMetrics(ClusterConfig clusterConfig) + { + var address = clusterConfig.Api?.ListenAddress != null + ? (clusterConfig.Api.ListenAddress != "*" ? IPAddress.Parse(clusterConfig.Api.ListenAddress) : IPAddress.Any) + : IPAddress.Parse("127.0.0.1"); + + var port = clusterConfig.Api?.MetricsPort ?? 4002; + + webHostMetrics = new WebHostBuilder() + .Configure(app => + { + app.UseMetricServer(); + }) + .UseKestrel(options => { options.Listen(address, port); }) + .Build(); + + webHostMetrics.Start(); + + logger.Info(() => $"Prometheus Metrics Online @ {address}:{port}/metrics"); + } + #endregion // Admin API #region API-Surface @@ -708,6 +743,7 @@ public void Start(ClusterConfig clusterConfig) logger.Info(() => $"Launching ..."); StartApi(clusterConfig); StartAdminApi(clusterConfig); + StartMetrics(clusterConfig); } public void AttachPool(IMiningPool pool) diff --git a/src/Miningcore/Blockchain/Abstractions.cs b/src/Miningcore/Blockchain/Abstractions.cs index 16935407f..7a1f95280 100644 --- a/src/Miningcore/Blockchain/Abstractions.cs +++ b/src/Miningcore/Blockchain/Abstractions.cs @@ -27,6 +27,8 @@ public class BlockchainStats public string NetworkType { get; set; } public double NetworkHashrate { get; set; } public double NetworkDifficulty { get; set; } + public string NextNetworkTarget { get; set; } + public string NextNetworkBits { get; set; } public DateTime? LastNetworkBlockTime { get; set; } public ulong BlockHeight { get; set; } public int ConnectedPeers { get; set; } diff --git a/src/Miningcore/Blockchain/Bitcoin/BitcoinJobManager.cs b/src/Miningcore/Blockchain/Bitcoin/BitcoinJobManager.cs index 6902e66f6..ac7f8695d 100644 --- a/src/Miningcore/Blockchain/Bitcoin/BitcoinJobManager.cs +++ b/src/Miningcore/Blockchain/Bitcoin/BitcoinJobManager.cs @@ -133,6 +133,8 @@ private BitcoinJob CreateJob() BlockchainStats.LastNetworkBlockTime = clock.Now; BlockchainStats.BlockHeight = blockTemplate.Height; BlockchainStats.NetworkDifficulty = job.Difficulty; + BlockchainStats.NextNetworkTarget = blockTemplate.Target; + BlockchainStats.NextNetworkBits = blockTemplate.Bits; } else diff --git a/src/Miningcore/Blockchain/Bitcoin/BitcoinJobManagerBase.cs b/src/Miningcore/Blockchain/Bitcoin/BitcoinJobManagerBase.cs index ca406573a..ef581dde3 100644 --- a/src/Miningcore/Blockchain/Bitcoin/BitcoinJobManagerBase.cs +++ b/src/Miningcore/Blockchain/Bitcoin/BitcoinJobManagerBase.cs @@ -122,7 +122,7 @@ protected virtual void SetupJobUpdates() { logger.Info(() => $"Subscribing to ZMQ push-updates from {string.Join(", ", zmq.Values)}"); - var blockNotify = daemon.ZmqSubscribe(logger, zmq, 2) + var blockNotify = daemon.ZmqSubscribe(logger, zmq) .Select(msg => { using (msg) @@ -587,7 +587,8 @@ protected void ConfigureRewards() new RewardRecipient { Address = address, - Percentage = DevDonation.Percent + Percentage = DevDonation.Percent, + Type = "dev" } }).ToArray(); } diff --git a/src/Miningcore/Blockchain/CoinMetaData.cs b/src/Miningcore/Blockchain/CoinMetaData.cs index c02435481..790fd5c04 100644 --- a/src/Miningcore/Blockchain/CoinMetaData.cs +++ b/src/Miningcore/Blockchain/CoinMetaData.cs @@ -9,29 +9,25 @@ public class DevDonation public static readonly Dictionary Addresses = new Dictionary { { "BTC", "17QnVor1B6oK1rWnVVBrdX9gFzVkZZbhDm" }, - { "BCH", "1LJGTzNDTuTvkHpTxNSdmAEBAXAnEHDVqQ" }, + { "BCH", "qrf6uhhapq7fgkjv2ce2hcjqpk8ec2zc25et4xsphv" }, { "BCD", "1CdZ2PXisTRxyB4bkvq5oka9YjBHGU5Z36" }, { "LTC", "LTK6CWastkmBzGxgQhTTtCUjkjDA14kxzC" }, { "DOGE", "DGDuKRhBewGP1kbUz4hszNd2p6dDzWYy9Q" }, - { "NMC", "NDSLDpFEcTbuRVcWHdJyiRZThVAcb5Z79o" }, - { "DGB", "DAFtYMGVdNtqHJoBGg2xqZZwSuYAaEs2Bn" }, + { "DGB", "DEvrh1UEqm89bGJ9QTBjBonjGotKQSSBmq" }, { "ETH", "0xcb55abBfe361B12323eb952110cE33d5F28BeeE1" }, { "ETC", "0xF8cCE9CE143C68d3d4A7e6bf47006f21Cfcf93c0" }, - { "PPC", "PE8RH6HAvi8sqYg47D58TeKTjyeQFFHWR2" }, { "DASH", "XqpBAV9QCaoLnz42uF5frSSfrJTrqHoxjp" }, - { "VIA", "Vc5rJr2QdA2yo1jBoqYUAH7T59uBh2Vw5q" }, { "MONA", "MBbkeAM3VQKg474bgxJEXrtcnMg8cjHY3S" }, { "VTC", "VwDWBHzhYeuyMcHpaZ5nZryggUjHSxUKKK" }, - { "ZEC", "t1YHZHz2DGVMJiggD2P4fBQ2TAPgtLSUwZ7" }, + { "ZEC", "t1YEgm6ovXFseeFxXgFY2zXxwsScD4BbfhT" }, { "ZCL", "t1MFU1vD3YKgsK6Uh8hW7UTY8mKAV2xVqBr" }, { "ZEN", "znigQacfTvRiwD2TRhwkBHLNchQ2AZisD94" }, { "BTG", "GRao6KHQ8a4GUjAZRVbeCLfRbSkJQQaeMg" }, - { "MOON", "2QvpGimMYLyqKsczQXZjv56h6me3M8orwj" }, { "XVG", "D5xPoHLM6HPkwWSqAweECTSQirJBmRjS8i" }, { "XMR", "475YVJbPHPedudkhrcNp1wDcLMTGYusGPF5fqE7XjnragVLPdqbCHBdZg3dF4dN9hXMjjvGbykS6a77dTAQvGrpiQqHp2eH" }, { "ETN", "etnkQJwBCjmR1MfkU8D355ZWxxLMhs8miPrtKHWN4U3uUowq9ugeKccVBoEG3n13n74us5AkT8tEoTog46w4HBgn8sMuBRhm9h" }, { "RVN", "RQPJF65UoodQ2aZUkfnXoeX6gib3GNwm9u" }, - { "PGN", "PRm3ThUGfmU157NwcKzKBqWbgA2DPuFje1" }, + { "TUBE", "bxdAFKYA5sJYKM3zcn3SLaLRjsFF582VE1Uv5NChrVLm6o6UF4SdbZBZLrTBD6yEFZDzuTQGBCa8FLpX8charjxH2G3iMRX6R" }, }; } diff --git a/src/Miningcore/Blockchain/Cryptonote/CryptonoteJobManager.cs b/src/Miningcore/Blockchain/Cryptonote/CryptonoteJobManager.cs index 8d5338f69..6ff0ac032 100644 --- a/src/Miningcore/Blockchain/Cryptonote/CryptonoteJobManager.cs +++ b/src/Miningcore/Blockchain/Cryptonote/CryptonoteJobManager.cs @@ -115,6 +115,8 @@ protected async Task UpdateJob(string via = null, string json = null) BlockchainStats.LastNetworkBlockTime = clock.Now; BlockchainStats.BlockHeight = job.BlockTemplate.Height; BlockchainStats.NetworkDifficulty = job.BlockTemplate.Difficulty; + BlockchainStats.NextNetworkTarget = ""; + BlockchainStats.NextNetworkBits = ""; } return isNew; @@ -526,7 +528,8 @@ private void ConfigureRewards() new RewardRecipient { Address = address, - Percentage = DevDonation.Percent + Percentage = DevDonation.Percent, + Type = "dev" } }).ToArray(); } @@ -559,7 +562,7 @@ protected virtual void SetupJobUpdates() { logger.Info(() => $"Subscribing to ZMQ push-updates from {string.Join(", ", zmq.Values)}"); - var blockNotify = daemon.ZmqSubscribe(logger, zmq, 2) + var blockNotify = daemon.ZmqSubscribe(logger, zmq) .Select(msg => { using (msg) diff --git a/src/Miningcore/Blockchain/Equihash/EquihashJobManager.cs b/src/Miningcore/Blockchain/Equihash/EquihashJobManager.cs index 94d33de54..f15698bff 100644 --- a/src/Miningcore/Blockchain/Equihash/EquihashJobManager.cs +++ b/src/Miningcore/Blockchain/Equihash/EquihashJobManager.cs @@ -157,6 +157,8 @@ private EquihashJob CreateJob() BlockchainStats.LastNetworkBlockTime = clock.Now; BlockchainStats.BlockHeight = blockTemplate.Height; BlockchainStats.NetworkDifficulty = job.Difficulty; + BlockchainStats.NextNetworkTarget = blockTemplate.Target; + BlockchainStats.NextNetworkBits = blockTemplate.Bits; } else diff --git a/src/Miningcore/Blockchain/Ethereum/Configuration/EthereumPoolConfigExtra.cs b/src/Miningcore/Blockchain/Ethereum/Configuration/EthereumPoolConfigExtra.cs index 1781e7a23..5d75d3557 100644 --- a/src/Miningcore/Blockchain/Ethereum/Configuration/EthereumPoolConfigExtra.cs +++ b/src/Miningcore/Blockchain/Ethereum/Configuration/EthereumPoolConfigExtra.cs @@ -18,6 +18,8 @@ portions of the Software. SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. */ +using Miningcore.Configuration; + namespace Miningcore.Blockchain.Ethereum.Configuration { public class EthereumPoolConfigExtra @@ -36,5 +38,10 @@ public class EthereumPoolConfigExtra /// Useful to specify the real chain type when running geth /// public string ChainTypeOverride { get; set; } + + /// + /// getWork stream published via ZMQ + /// + public ZmqPubSubEndpointConfig BtStream { get; set; } } } diff --git a/src/Miningcore/Blockchain/Ethereum/EthereumJobManager.cs b/src/Miningcore/Blockchain/Ethereum/EthereumJobManager.cs index 07cb56670..d3aa270c7 100644 --- a/src/Miningcore/Blockchain/Ethereum/EthereumJobManager.cs +++ b/src/Miningcore/Blockchain/Ethereum/EthereumJobManager.cs @@ -26,6 +26,7 @@ SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. using System.Net; using System.Numerics; using System.Reactive.Linq; +using System.Reactive.Threading.Tasks; using System.Text; using System.Threading; using System.Threading.Tasks; @@ -94,11 +95,7 @@ protected async Task UpdateJobAsync() try { - var task = isParity ? - GetBlockTemplateAsync() : - GetBlockTemplateGethAsync(); - - return UpdateJob(await task); + return UpdateJob(await GetBlockTemplateAsync()); } catch(Exception ex) @@ -152,6 +149,8 @@ protected bool UpdateJob(EthereumBlockTemplate blockTemplate) BlockchainStats.LastNetworkBlockTime = clock.Now; BlockchainStats.BlockHeight = job.BlockTemplate.Height; BlockchainStats.NetworkDifficulty = job.BlockTemplate.Difficulty; + BlockchainStats.NextNetworkTarget = job.BlockTemplate.Target; + BlockchainStats.NextNetworkBits = ""; } return isNew; @@ -165,7 +164,15 @@ protected bool UpdateJob(EthereumBlockTemplate blockTemplate) return false; } - private async Task GetBlockTemplateAsync() + private Task GetBlockTemplateAsync() + { + if (isParity) + return GetBlockTemplateParityAsync(); + + return GetBlockTemplateGethAsync(); + } + + private async Task GetBlockTemplateParityAsync() { logger.LogInvoke(); @@ -225,7 +232,6 @@ private async Task GetBlockTemplateGethAsync() private EthereumBlockTemplate AssembleBlockTemplate(string[] work) { - // only parity returns the 4th element (block height) if (work.Length < 4) { logger.Error(() => $"Error(s) refreshing blocktemplate: getWork did not return blockheight. Are you really connected to a Parity daemon?"); @@ -400,7 +406,9 @@ public override void Configure(PoolConfig poolConfig, ClusterConfig clusterConfi if (poolConfig.EnableInternalStratum == true) { // ensure dag location is configured - var dagDir = !string.IsNullOrEmpty(extraPoolConfig?.DagDir) ? Environment.ExpandEnvironmentVariables(extraPoolConfig.DagDir) : Dag.GetDefaultDagDirectory(); + var dagDir = !string.IsNullOrEmpty(extraPoolConfig?.DagDir) ? + Environment.ExpandEnvironmentVariables(extraPoolConfig.DagDir) : + Dag.GetDefaultDagDirectory(); // create it if necessary Directory.CreateDirectory(dagDir); @@ -622,7 +630,7 @@ protected override async Task PostStartInitAsync(CancellationToken ct) } } - SetupJobUpdates(); + await SetupJobUpdatesAsync(); } private void ConfigureRewards() @@ -637,83 +645,129 @@ private void ConfigureRewards() new RewardRecipient { Address = address, - Percentage = DevDonation.Percent + Percentage = DevDonation.Percent, + Type = "dev" } }).ToArray(); } } - protected virtual void SetupJobUpdates() + protected virtual async Task SetupJobUpdatesAsync() { - var enableStreaming = extraPoolConfig?.EnableDaemonWebsocketStreaming == true; - - if (enableStreaming && !poolConfig.Daemons.Any(x => - x.Extra.SafeExtensionDataAs()?.PortWs.HasValue == true)) + if (extraPoolConfig?.BtStream == null) { - logger.Warn(() => $"'{nameof(EthereumPoolConfigExtra.EnableDaemonWebsocketStreaming).ToLowerCamelCase()}' enabled but not a single daemon found with a configured websocket port ('{nameof(EthereumDaemonEndpointConfigExtra.PortWs).ToLowerCamelCase()}'). Falling back to polling."); - enableStreaming = false; - } + var enableStreaming = extraPoolConfig?.EnableDaemonWebsocketStreaming == true; - if (enableStreaming) - { - // collect ports - var wsDaemons = poolConfig.Daemons - .Where(x => x.Extra.SafeExtensionDataAs()?.PortWs.HasValue == true) - .ToDictionary(x => x, x => + if (enableStreaming && !poolConfig.Daemons.Any(x => + x.Extra.SafeExtensionDataAs()?.PortWs.HasValue == true)) + { + logger.Warn(() => $"'{nameof(EthereumPoolConfigExtra.EnableDaemonWebsocketStreaming).ToLowerCamelCase()}' enabled but not a single daemon found with a configured websocket port ('{nameof(EthereumDaemonEndpointConfigExtra.PortWs).ToLowerCamelCase()}'). Falling back to polling."); + enableStreaming = false; + } + + if (enableStreaming) + { + // collect ports + var wsDaemons = poolConfig.Daemons + .Where(x => x.Extra.SafeExtensionDataAs()?.PortWs.HasValue == true) + .ToDictionary(x => x, x => + { + var extra = x.Extra.SafeExtensionDataAs(); + + return (extra.PortWs.Value, extra.HttpPathWs, extra.SslWs); + }); + + logger.Info(() => $"Subscribing to WebSocket(s) {string.Join(", ", wsDaemons.Keys.Select(x => $"{(wsDaemons[x].SslWs ? "wss" : "ws")}://{x.Host}:{wsDaemons[x].Value}").Distinct())}"); + + if (isParity) + { + // stream work updates + var getWorkObs = daemon.WebsocketSubscribe(logger, wsDaemons, EC.ParitySubscribe, new[] { (object)EC.GetWork }) + .Select(data => + { + try + { + var psp = DeserializeRequest(data).ParamsAs>(); + return psp?.Result; + } + + catch (Exception ex) + { + logger.Info(() => $"Error deserializing pending block: {ex.Message}"); + } + + return null; + }); + + Jobs = getWorkObs.Where(x => x != null) + .Select(AssembleBlockTemplate) + .Select(UpdateJob) + .Do(isNew => + { + if (isNew) + logger.Info(() => $"New block {currentJob.BlockTemplate.Height} detected"); + }) + .Where(isNew => isNew) + .Select(_ => GetJobParamsForStratum(true)) + .Publish() + .RefCount(); + } + + else { - var extra = x.Extra.SafeExtensionDataAs(); + var wsSubscription = "newHeads"; + var isRetry = false; + retry: - return (extra.PortWs.Value, extra.HttpPathWs, extra.SslWs); - }); + // stream work updates + var getWorkObs = daemon.WebsocketSubscribe(logger, wsDaemons, EC.Subscribe, new[] { (object)wsSubscription, new object() }); - logger.Info(() => $"Subscribing to WebSocket push-updates from {string.Join(", ", wsDaemons.Keys.Select(x => x.Host).Distinct())}"); + // test subscription + var subcriptionResponse = await getWorkObs + .Take(1) + .Select(x => JsonConvert.DeserializeObject>(Encoding.UTF8.GetString(x))) + .ToTask(); - if (isParity) - { - // stream work updates - var getWorkObs = daemon.WebsocketSubscribe(logger, wsDaemons, EC.ParitySubscribe, new[] { (object) EC.GetWork }) - .Select(data => + if (subcriptionResponse.Error != null) { - try + // older versions of geth only support subscriptions to "newBlocks" + if (!isRetry && subcriptionResponse.Error.Code == (int)BitcoinRPCErrorCode.RPC_METHOD_NOT_FOUND) { - var psp = DeserializeRequest(data).ParamsAs>(); - return psp?.Result; - } + wsSubscription = "newBlocks"; - catch(Exception ex) - { - logger.Info(() => $"Error deserializing pending block: {ex.Message}"); + isRetry = true; + goto retry; } - return null; - }); + logger.ThrowLogPoolStartupException($"Unable to subscribe to geth websocket '{wsSubscription}': {subcriptionResponse.Error.Message} [{subcriptionResponse.Error.Code}]"); + } - Jobs = getWorkObs.Where(x => x != null) - .Select(AssembleBlockTemplate) - .Select(UpdateJob) - .Do(isNew => - { - if (isNew) - logger.Info(() => $"New block {currentJob.BlockTemplate.Height} detected"); - }) - .Where(isNew => isNew) - .Select(_ => GetJobParamsForStratum(true)) - .Publish() - .RefCount(); + Jobs = getWorkObs.Where(x => x != null) + .Select(_ => Observable.FromAsync(UpdateJobAsync)) + .Concat() + .Do(isNew => + { + if (isNew) + logger.Info(() => $"Detected new block {currentJob.BlockTemplate.Height} via WebSocket"); + }) + .Where(isNew => isNew) + .Select(_ => GetJobParamsForStratum(true)) + .Publish() + .RefCount(); + } } else { - // stream work updates - var getWorkObs = daemon.WebsocketSubscribe(logger, wsDaemons, EC.Subscribe, new[] { (object) "newHeads" }); + var pollingInterval = poolConfig.BlockRefreshInterval > 0 ? poolConfig.BlockRefreshInterval : 1000; - Jobs = getWorkObs.Where(x => x != null) + Jobs = Observable.Interval(TimeSpan.FromMilliseconds(pollingInterval)) .Select(_ => Observable.FromAsync(UpdateJobAsync)) .Concat() .Do(isNew => { if (isNew) - logger.Info(() => $"New block {currentJob.BlockTemplate.Height} detected"); + logger.Info(() => $"Detected new block {currentJob.BlockTemplate.Height} via RPC Polling"); }) .Where(isNew => isNew) .Select(_ => GetJobParamsForStratum(true)) @@ -724,15 +778,16 @@ protected virtual void SetupJobUpdates() else { - var pollingInterval = poolConfig.BlockRefreshInterval > 0 ? poolConfig.BlockRefreshInterval : 1000; + var btStream = BtStreamSubscribe(extraPoolConfig.BtStream); - Jobs = Observable.Interval(TimeSpan.FromMilliseconds(pollingInterval)) - .Select(_ => Observable.FromAsync(UpdateJobAsync)) - .Concat() + Jobs = btStream.Where(x => x != null) + .Select(JsonConvert.DeserializeObject) + .Select(AssembleBlockTemplate) + .Select(UpdateJob) .Do(isNew => { if (isNew) - logger.Info(() => $"New block {currentJob.BlockTemplate.Height} detected"); + logger.Info(() => $"Detected new block {currentJob.BlockTemplate.Height} via BT-Stream"); }) .Where(isNew => isNew) .Select(_ => GetJobParamsForStratum(true)) diff --git a/src/Miningcore/Blockchain/Ethereum/EthereumPool.cs b/src/Miningcore/Blockchain/Ethereum/EthereumPool.cs index dfda0a2f6..354e910cd 100644 --- a/src/Miningcore/Blockchain/Ethereum/EthereumPool.cs +++ b/src/Miningcore/Blockchain/Ethereum/EthereumPool.cs @@ -221,7 +221,7 @@ private async Task EnsureInitialWorkSent(StratumClient client) lock (context) { - if (context.IsAuthorized && context.IsAuthorized && !context.IsInitialWorkSent) + if (context.IsSubscribed && context.IsAuthorized && !context.IsInitialWorkSent) { context.IsInitialWorkSent = true; sendInitialWork = true; @@ -349,7 +349,7 @@ protected override async Task OnRequestAsync(StratumClient client, break; case EthereumStratumMethods.ExtraNonceSubscribe: - //await client.RespondErrorAsync(StratumError.Other, "not supported", request.Id, false); + await client.RespondErrorAsync(StratumError.Other, "not supported", request.Id, false); break; default: diff --git a/src/Miningcore/Configuration/ClusterConfig.cs b/src/Miningcore/Configuration/ClusterConfig.cs index d74b7c753..8047ef8f5 100644 --- a/src/Miningcore/Configuration/ClusterConfig.cs +++ b/src/Miningcore/Configuration/ClusterConfig.cs @@ -595,6 +595,11 @@ public partial class ApiConfig /// Port for admin-apis /// public int? AdminPort { get; set; } + + /// + /// Port for prometheus compatible metrics endpoint /metrics + /// + public int? MetricsPort { get; set; } } public partial class ZmqPubSubEndpointConfig diff --git a/src/Miningcore/DaemonInterface/DaemonClient.cs b/src/Miningcore/DaemonInterface/DaemonClient.cs index 8921aa0ac..9a3c071b9 100644 --- a/src/Miningcore/DaemonInterface/DaemonClient.cs +++ b/src/Miningcore/DaemonInterface/DaemonClient.cs @@ -101,12 +101,16 @@ public void Configure(DaemonEndpointConfig[] endPoints, string digestAuthRealm = { var handler = new SocketsHttpHandler { - Credentials = new NetworkCredential(endpoint.User, endpoint.Password), - PreAuthenticate = true, AutomaticDecompression = DecompressionMethods.Deflate | DecompressionMethods.GZip, }; - if (endpoint.Ssl && !endpoint.ValidateCert) + if(!string.IsNullOrEmpty(endpoint.User)) + { + handler.Credentials = new NetworkCredential(endpoint.User, endpoint.Password); + handler.PreAuthenticate = true; + } + + if ((endpoint.Ssl || endpoint.Http2) && !endpoint.ValidateCert) { handler.SslOptions = new SslClientAuthenticationOptions { @@ -295,12 +299,12 @@ public IObservable WebsocketSubscribe(ILogger logger, Dictionary ZmqSubscribe(ILogger logger, Dictionary portMap, int numMsgSegments = 2) + public IObservable ZmqSubscribe(ILogger logger, Dictionary portMap) { logger.LogInvoke(); return Observable.Merge(portMap.Keys - .Select(endPoint => ZmqSubscribeEndpoint(logger, endPoint, portMap[endPoint].Socket, portMap[endPoint].Topic, numMsgSegments))) + .Select(endPoint => ZmqSubscribeEndpoint(logger, endPoint, portMap[endPoint].Socket, portMap[endPoint].Topic))) .Publish() .RefCount(); } @@ -326,32 +330,38 @@ private async Task BuildRequestTask(ILogger logger, DaemonEndpo requestUrl += $"{(endPoint.HttpPath.StartsWith("/") ? string.Empty : "/")}{endPoint.HttpPath}"; // build http request - var request = new HttpRequestMessage(HttpMethod.Post, requestUrl); - var json = JsonConvert.SerializeObject(rpcRequest, payloadJsonSerializerSettings ?? serializerSettings); - request.Content = new StringContent(json, Encoding.UTF8, "application/json"); + using (var request = new HttpRequestMessage(HttpMethod.Post, requestUrl)) + { + request.Headers.ConnectionClose = false; // enable keep-alive - if (endPoint.Http2) - request.Version = new Version(2, 0); + if (endPoint.Http2) + request.Version = new Version(2, 0); - // build auth header - if (!string.IsNullOrEmpty(endPoint.User)) - { - var auth = $"{endPoint.User}:{endPoint.Password}"; - var base64 = Convert.ToBase64String(Encoding.UTF8.GetBytes(auth)); - request.Headers.Authorization = new AuthenticationHeaderValue("Basic", base64); - } + // build request content + var json = JsonConvert.SerializeObject(rpcRequest, payloadJsonSerializerSettings ?? serializerSettings); + request.Content = new StringContent(json, Encoding.UTF8, "application/json"); - logger.Trace(() => $"Sending RPC request to {requestUrl}: {json}"); + // build auth header + if (!string.IsNullOrEmpty(endPoint.User)) + { + var auth = $"{endPoint.User}:{endPoint.Password}"; + var base64 = Convert.ToBase64String(Encoding.UTF8.GetBytes(auth)); + request.Headers.Authorization = new AuthenticationHeaderValue("Basic", base64); + } - // send request - using(var response = await httpClients[endPoint].SendAsync(request, ct)) - { - // deserialize response - var jsonResponse = await response.Content.ReadAsStringAsync(); + logger.Trace(() => $"Sending RPC request to {requestUrl}: {json}"); - using (var reader = new StringReader(jsonResponse)) + // send request + using (var response = await httpClients[endPoint].SendAsync(request, ct)) { - using(var jreader = new JsonTextReader(reader)) + // read response + var responseContent = await response.Content.ReadAsStringAsync(); + + if (!response.IsSuccessStatusCode) + throw new HttpRequestException($"{(int) response.StatusCode} {responseContent}"); + + // deserialize response + using (var jreader = new JsonTextReader(new StringReader(responseContent))) { var result = serializer.Deserialize(jreader); @@ -383,12 +393,15 @@ private async Task[]> BuildBatchRequestTask(ILogger logg // build http request using(var request = new HttpRequestMessage(HttpMethod.Post, requestUrl)) { - var json = JsonConvert.SerializeObject(rpcRequests, serializerSettings); - request.Content = new StringContent(json, Encoding.UTF8, "application/json"); + request.Headers.ConnectionClose = false; // enable keep-alive if (endPoint.Http2) request.Version = new Version(2, 0); + // build request content + var json = JsonConvert.SerializeObject(rpcRequests, serializerSettings); + request.Content = new StringContent(json, Encoding.UTF8, "application/json"); + // build auth header if (!string.IsNullOrEmpty(endPoint.User)) { @@ -415,18 +428,15 @@ private async Task[]> BuildBatchRequestTask(ILogger logg // deserialize response var jsonResponse = await response.Content.ReadAsStringAsync(); - using(var reader = new StringReader(jsonResponse)) + using (var jreader = new JsonTextReader(new StringReader(jsonResponse))) { - using(var jreader = new JsonTextReader(reader)) - { - var result = serializer.Deserialize[]>(jreader); + var result = serializer.Deserialize[]>(jreader); - // telemetry - sw.Stop(); - PublishTelemetry(TelemetryCategory.RpcRequest, sw.Elapsed, string.Join(", ", batch.Select(x => x.Method)), true); + // telemetry + sw.Stop(); + PublishTelemetry(TelemetryCategory.RpcRequest, sw.Elapsed, string.Join(", ", batch.Select(x => x.Method)), true); - return result; - } + return result; } } } @@ -524,7 +534,10 @@ private IObservable WebsocketSubscribeEndpoint(ILogger logger, DaemonEnd var protocol = conf.Ssl ? "wss" : "ws"; var uri = new Uri($"{protocol}://{endPoint.Host}:{conf.Port}{conf.HttpPath}"); - logger.Info(() => $"Establishing WebSocket connection to {uri}"); + if(!endPoint.ValidateCert) + client.Options.RemoteCertificateValidationCallback = (sender, certificate, chain, sslPolicyErrors) => true; + + logger.Debug(() => $"Establishing WebSocket connection to {uri}"); await client.ConnectAsync(uri, cts.Token); // subscribe @@ -577,7 +590,8 @@ private IObservable WebsocketSubscribeEndpoint(ILogger logger, DaemonEnd logger.Error(() => $"{ex.GetType().Name} '{ex.Message}' while streaming websocket responses. Reconnecting in 5s"); } - await Task.Delay(TimeSpan.FromSeconds(5), cts.Token); + if(!cts.IsCancellationRequested) + await Task.Delay(TimeSpan.FromSeconds(5), cts.Token); } } }); @@ -586,7 +600,7 @@ private IObservable WebsocketSubscribeEndpoint(ILogger logger, DaemonEnd })); } - private IObservable ZmqSubscribeEndpoint(ILogger logger, DaemonEndpointConfig endPoint, string url, string topic, int numMsgSegments = 2) + private IObservable ZmqSubscribeEndpoint(ILogger logger, DaemonEndpointConfig endPoint, string url, string topic) { return Observable.Defer(() => Observable.Create(obs => { diff --git a/src/Miningcore/GlobalSuppressions.cs b/src/Miningcore/GlobalSuppressions.cs new file mode 100644 index 000000000..da678c017 --- /dev/null +++ b/src/Miningcore/GlobalSuppressions.cs @@ -0,0 +1,8 @@ + +// This file is used by Code Analysis to maintain SuppressMessage +// attributes that are applied to this project. +// Project-level suppressions either have no target or are given +// a specific target and scoped to a namespace, type, member, etc. + +[assembly: System.Diagnostics.CodeAnalysis.SuppressMessage("Design", "RCS1194:Implement exception constructors.", Justification = "", Scope = "type", Target = "~T:Miningcore.Api.ApiServer.ApiException")] + diff --git a/src/Miningcore/Mining/PoolBase.cs b/src/Miningcore/Mining/PoolBase.cs index 99dc12ee1..b98f69045 100644 --- a/src/Miningcore/Mining/PoolBase.cs +++ b/src/Miningcore/Mining/PoolBase.cs @@ -338,7 +338,7 @@ private void OutputPoolInfo() Network Difficulty: {blockchainStats.NetworkDifficulty} Network Hash Rate: {FormatUtil.FormatHashrate(blockchainStats.NetworkHashrate)} Stratum Port(s): {(poolConfig.Ports?.Any() == true ? string.Join(", ", poolConfig.Ports.Keys) : string.Empty)} -Pool Fee: {(poolConfig.RewardRecipients?.Any() == true ? poolConfig.RewardRecipients.Sum(x => x.Percentage) : 0)}% +Pool Fee: {(poolConfig.RewardRecipients?.Any() == true ? poolConfig.RewardRecipients.Where(x=> x.Type != "dev").Sum(x => x.Percentage) : 0)}% "; logger.Info(() => msg); diff --git a/src/Miningcore/Mining/ShareReceiver.cs b/src/Miningcore/Mining/ShareReceiver.cs index 63ef9c42c..4d2fae94b 100644 --- a/src/Miningcore/Mining/ShareReceiver.cs +++ b/src/Miningcore/Mining/ShareReceiver.cs @@ -43,7 +43,7 @@ public ShareReceiver(IMasterClock clock, IMessageBus messageBus) private ClusterConfig clusterConfig; private CompositeDisposable disposables = new CompositeDisposable(); private readonly ConcurrentDictionary pools = new ConcurrentDictionary(); - private readonly BufferBlock<(string SocketUrl, ZMessage Message)> queue = new BufferBlock<(string SocketUrl, ZMessage Message)>(); + private readonly BufferBlock<(string Url, ZMessage Message)> queue = new BufferBlock<(string Url, ZMessage Message)>(); private readonly CancellationTokenSource cts = new CancellationTokenSource(); JsonSerializer serializer = new JsonSerializer @@ -95,27 +95,21 @@ private void StartMessageReceiver() using (new CompositeDisposable(sockets)) { - // setup poll-items + var urls = clusterConfig.ShareRelays.Select(x => x.Url).ToArray(); var pollItems = sockets.Select(_ => ZPollItem.CreateReceiver()).ToArray(); while (!cts.IsCancellationRequested) { if (sockets.PollIn(pollItems, out var messages, out var error, timeout)) { - // emit received messages for (var i = 0; i < messages.Length; i++) { var msg = messages[i]; if (msg != null) - { - var socketUrl = clusterConfig.ShareRelays[i].Url; - - queue.Post((socketUrl, msg)); - } + queue.Post((urls[i], msg)); } - // log error if (error != null) logger.Error(() => $"{nameof(ShareReceiver)}: {error.Name} [{error.Name}] during receive"); } @@ -148,11 +142,11 @@ private void ProcessMessages() { try { - var (socketUrl, msg) = await queue.ReceiveAsync(cts.Token); + var (url, msg) = await queue.ReceiveAsync(cts.Token); using (msg) { - ProcessMessage(socketUrl, msg); + ProcessMessage(url, msg); } } diff --git a/src/Miningcore/Miningcore.csproj b/src/Miningcore/Miningcore.csproj index c00073e7c..51e9561d9 100644 --- a/src/Miningcore/Miningcore.csproj +++ b/src/Miningcore/Miningcore.csproj @@ -47,6 +47,7 @@ + @@ -56,12 +57,14 @@ - + + + diff --git a/src/Miningcore/Stratum/StratumClient.cs b/src/Miningcore/Stratum/StratumClient.cs index b2e719f94..16391ae26 100644 --- a/src/Miningcore/Stratum/StratumClient.cs +++ b/src/Miningcore/Stratum/StratumClient.cs @@ -32,6 +32,7 @@ SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. using System.Reactive.Subjects; using System.Security.Authentication; using System.Security.Cryptography.X509Certificates; +using System.Text; using System.Threading; using System.Threading.Tasks; using System.Threading.Tasks.Dataflow; @@ -272,18 +273,22 @@ private async Task FillReceivePipeAsync() { while (true) { + logger.Debug(() => $"[{ConnectionId}] [NET] Waiting for data ..."); + var memory = receivePipe.Writer.GetMemory(MaxInboundRequestLength + 1); // read from network directly into pipe memory var cb = await networkStream.ReadAsync(memory, cts.Token); if (cb == 0) - break; // EOF + break; // EOF + + logger.Debug(() => $"[{ConnectionId}] [NET] Received data: {StratumConstants.Encoding.GetString(memory.ToArray(), 0, cb)}"); LastReceive = clock.Now; // hand off to pipe - receivePipe.Writer.Advance(cb); - + receivePipe.Writer.Advance(cb); + var result = await receivePipe.Writer.FlushAsync(cts.Token); if (result.IsCompleted) break; @@ -295,13 +300,17 @@ private async Task ProcessReceivePipeAsync(TcpProxyProtocolConfig proxyProtocol, { while (true) { - var result = await receivePipe.Reader.ReadAsync(cts.Token); + logger.Debug(() => $"[{ConnectionId}] [PIPE] Waiting for data ..."); + var result = await receivePipe.Reader.ReadAsync(cts.Token); + var buffer = result.Buffer; SequencePosition? position = null; if (buffer.Length > MaxInboundRequestLength) - throw new InvalidDataException($"Incoming data exceeds maximum of {MaxInboundRequestLength}"); + throw new InvalidDataException($"Incoming data exceeds maximum of {MaxInboundRequestLength}"); + + logger.Debug(() => $"[{ConnectionId}] [PIPE] Received data: {result.Buffer.AsString(StratumConstants.Encoding)}"); do { @@ -312,15 +321,13 @@ private async Task ProcessReceivePipeAsync(TcpProxyProtocolConfig proxyProtocol, { var slice = buffer.Slice(0, position.Value); - logger.Trace(() => $"[{ConnectionId}] Received data: {slice.AsString(StratumConstants.Encoding)}"); - if (!expectingProxyHeader || !ProcessProxyHeader(slice, proxyProtocol)) await ProcessRequestAsync(onRequestAsync, slice); // Skip consumed section buffer = buffer.Slice(buffer.GetPosition(1, position.Value)); } - } while (position != null); + } while (position != null); receivePipe.Reader.AdvanceTo(buffer.Start, buffer.End); @@ -341,25 +348,39 @@ private async Task ProcessSendQueueAsync() private async Task SendMessage(object msg) { - logger.Trace(() => $"[{ConnectionId}] Sending: {JsonConvert.SerializeObject(msg)}"); + logger.Debug(() => $"[{ConnectionId}] Sending: {JsonConvert.SerializeObject(msg)}"); - using(var ctsTimeout = new CancellationTokenSource()) - { - using(var ctsComposite = CancellationTokenSource.CreateLinkedTokenSource(cts.Token, ctsTimeout.Token)) - { - // serialize to JSON directly onto network stream - using(var writer = new StreamWriter(networkStream, StratumConstants.Encoding, MaxOutboundRequestLength, true)) - { - serializer.Serialize(writer, msg); - } - - // append terminator - networkStream.WriteByte(0xa); + var buffer = ArrayPool.Shared.Rent(MaxOutboundRequestLength); - // Send to network - ctsTimeout.CancelAfter(sendTimeout); - await networkStream.FlushAsync(ctsComposite.Token); + try + { + using (var stream = new MemoryStream(buffer, true)) + { + // serialize + using (var writer = new StreamWriter(stream, StratumConstants.Encoding, MaxOutboundRequestLength, true)) + { + serializer.Serialize(writer, msg); + } + + stream.WriteByte((byte) '\n'); // terminator + + // send + using (var ctsTimeout = new CancellationTokenSource()) + { + using (var ctsComposite = CancellationTokenSource.CreateLinkedTokenSource(cts.Token, ctsTimeout.Token)) + { + ctsTimeout.CancelAfter(sendTimeout); + + await networkStream.WriteAsync(buffer, 0, (int) stream.Position, ctsComposite.Token); + await networkStream.FlushAsync(ctsComposite.Token); + } + } } + } + + finally + { + ArrayPool.Shared.Return(buffer); } }