From 387e51e7d0dbdcde2031d48187ce4f89eca9d620 Mon Sep 17 00:00:00 2001 From: Hecate2 <2474101468@qq.com> Date: Wed, 15 Feb 2023 13:09:01 +0800 Subject: [PATCH] notification available in websocket --- Fairy.WebSocket.Subscribe.cs | 119 ++++++++++++++++++++++++++++++----- 1 file changed, 103 insertions(+), 16 deletions(-) diff --git a/Fairy.WebSocket.Subscribe.cs b/Fairy.WebSocket.Subscribe.cs index 9dc6695..40f75d6 100644 --- a/Fairy.WebSocket.Subscribe.cs +++ b/Fairy.WebSocket.Subscribe.cs @@ -1,30 +1,45 @@ using Neo.Json; using Neo.Ledger; using Neo.Network.P2P.Payloads; +using Neo.Persistence; +using Neo.SmartContract; +using Neo.SmartContract.Native; +using Neo.VM; using System.Collections.Concurrent; +using System.Linq; using System.Net.WebSockets; namespace Neo.Plugins { public partial class Fairy : RpcServer { - protected Block committedBlock; - protected ConcurrentDictionary committedBlockSemaphores = new(); - List keysToRemove = new(); + protected Block? committingBlock; + protected ConcurrentDictionary notifications = new(); + protected ConcurrentDictionary committingBlockSemaphores = new(); + protected ConcurrentDictionary notificationSemaphores = new(); + protected DataCache latestSnapshot; protected void RegisterBlockchainEvents() { - Blockchain.Committed += delegate (NeoSystem @system, Block @block) + Blockchain.Committing += delegate (NeoSystem system, Block block, DataCache snapshot, IReadOnlyList applicationExecutedList) { - committedBlock = block; - foreach (var item in committedBlockSemaphores) - if (item.Value.State == WebSocketState.Open) + // block + committingBlock = block; + foreach (var item in committingBlockSemaphores) + item.Key.Release(); + + // notifications + if (notificationSemaphores.Count > 0) + { + latestSnapshot = snapshot; + notifications.Clear(); + foreach (Blockchain.ApplicationExecuted app in applicationExecutedList) + foreach (NotifyEventArgs notification in app.Notifications) + if (app.Transaction != null) + notifications[notification] = app.Transaction.Hash; + foreach (var item in notificationSemaphores) item.Key.Release(); - else - keysToRemove.Add(item.Key); - foreach (SemaphoreSlim key in keysToRemove) - committedBlockSemaphores.TryRemove(key, out _); - keysToRemove.Clear(); + } }; } @@ -57,12 +72,12 @@ protected virtual JToken UnsubscribeMethod(WebSocket webSocket, JArray _params, } [WebsocketMethod] - protected virtual Action SubscribeCommittedBlock(WebSocket webSocket, JArray _params, CancellationToken cancellationToken) + protected virtual Action SubscribeCommittingBlock(WebSocket webSocket, JArray _params, CancellationToken cancellationToken) { return async () => { SemaphoreSlim semaphore = new(1); - committedBlockSemaphores[semaphore] = webSocket; + committingBlockSemaphores[semaphore] = webSocket; while (true) { try @@ -71,20 +86,92 @@ protected virtual Action SubscribeCommittedBlock(WebSocket webSocket, JArray _pa switch (webSocket.State) { case WebSocketState.Open: - await webSocket.SendAsync(committedBlock.ToJson(system.Settings).ToByteArray(false), WebSocketMessageType.Text, true, CancellationToken.None); + if (committingBlock != null) + await webSocket.SendAsync(committingBlock.ToJson(system.Settings).ToByteArray(false), WebSocketMessageType.Text, true, CancellationToken.None); if (cancellationToken.IsCancellationRequested) + { + committingBlockSemaphores.Remove(semaphore, out _); return; + } break; case WebSocketState.Closed: + committingBlockSemaphores.Remove(semaphore, out _); await webSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, webSocket.State.ToString(), CancellationToken.None); webSocket.Dispose(); return; default: + committingBlockSemaphores.Remove(semaphore, out _); + webSocket.Dispose(); + return; + } + } + catch (Exception ex) + { + committingBlockSemaphores.Remove(semaphore, out _); + await webSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, ex.StackTrace, CancellationToken.None); + webSocket.Dispose(); + return; + } + } + }; + } + + [WebsocketMethod] + protected virtual Action SubscribeContractEvent(WebSocket webSocket, JArray _params, CancellationToken cancellationToken) + { + HashSet contracts = _params.Count > 0 ? ((JArray)_params[0]).Select(v => UInt160.Parse(v.AsString())).ToHashSet() : new(); + HashSet eventNames = _params.Count > 1 ? ((JArray)_params[1]).Select(v => v.AsString().ToLower()).ToHashSet() : new(); + return async () => + { + SemaphoreSlim semaphore = new(1); + notificationSemaphores[semaphore] = webSocket; + while (true) + { + try + { + await semaphore.WaitAsync(); + switch (webSocket.State) + { + case WebSocketState.Open: + JArray json = new(); + foreach (var item in notifications) + { + NotifyEventArgs notification = item.Key; + if (contracts.Count > 0 && !contracts.Contains(notification.ScriptHash)) + continue; + if (eventNames.Count > 0 && !eventNames.Contains(notification.EventName.ToLower())) + continue; + JObject notificationJson = new(); + notificationJson["tx"] = item.Value.ToString(); + notificationJson["scripthash"] = notification.ScriptHash.ToString(); + notificationJson["contractname"] = NativeContract.ContractManagement.GetContract(latestSnapshot, notification.ScriptHash)?.Manifest.Name; + notificationJson["eventname"] = notification.EventName; + notificationJson["eventargs"] = notification.State.ToJson(); + json.Add(notificationJson); + } + if (json.Count > 0) + await webSocket.SendAsync(json.ToByteArray(false), WebSocketMessageType.Text, true, CancellationToken.None); + if (cancellationToken.IsCancellationRequested) + { + notificationSemaphores.Remove(semaphore, out _); + return; + } break; + case WebSocketState.Closed: + notificationSemaphores.Remove(semaphore, out _); + await webSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, webSocket.State.ToString(), CancellationToken.None); + webSocket.Dispose(); + return; + default: + notificationSemaphores.Remove(semaphore, out _); + webSocket.Dispose(); + return; } } - catch + catch (Exception ex) { + notificationSemaphores.Remove(semaphore, out _); + await webSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, ex.StackTrace, CancellationToken.None); webSocket.Dispose(); return; }