Skip to content

Commit

Permalink
notification available in websocket
Browse files Browse the repository at this point in the history
  • Loading branch information
Hecate2 committed Feb 15, 2023
1 parent 27de7f3 commit 387e51e
Showing 1 changed file with 103 additions and 16 deletions.
119 changes: 103 additions & 16 deletions Fairy.WebSocket.Subscribe.cs
Original file line number Diff line number Diff line change
@@ -1,30 +1,45 @@
using Neo.Json;
using Neo.Ledger;
using Neo.Network.P2P.Payloads;
using Neo.Persistence;
using Neo.SmartContract;
using Neo.SmartContract.Native;
using Neo.VM;
using System.Collections.Concurrent;
using System.Linq;
using System.Net.WebSockets;

namespace Neo.Plugins
{
public partial class Fairy : RpcServer
{
protected Block committedBlock;
protected ConcurrentDictionary<SemaphoreSlim, WebSocket> committedBlockSemaphores = new();
List<SemaphoreSlim> keysToRemove = new();
protected Block? committingBlock;
protected ConcurrentDictionary<NotifyEventArgs, UInt256> notifications = new();
protected ConcurrentDictionary<SemaphoreSlim, WebSocket> committingBlockSemaphores = new();
protected ConcurrentDictionary<SemaphoreSlim, WebSocket> notificationSemaphores = new();
protected DataCache latestSnapshot;

protected void RegisterBlockchainEvents()
{
Blockchain.Committed += delegate (NeoSystem @system, Block @block)
Blockchain.Committing += delegate (NeoSystem system, Block block, DataCache snapshot, IReadOnlyList<Blockchain.ApplicationExecuted> applicationExecutedList)
{
committedBlock = block;
foreach (var item in committedBlockSemaphores)
if (item.Value.State == WebSocketState.Open)
// block
committingBlock = block;
foreach (var item in committingBlockSemaphores)
item.Key.Release();

// notifications
if (notificationSemaphores.Count > 0)
{
latestSnapshot = snapshot;
notifications.Clear();
foreach (Blockchain.ApplicationExecuted app in applicationExecutedList)
foreach (NotifyEventArgs notification in app.Notifications)
if (app.Transaction != null)
notifications[notification] = app.Transaction.Hash;
foreach (var item in notificationSemaphores)
item.Key.Release();
else
keysToRemove.Add(item.Key);
foreach (SemaphoreSlim key in keysToRemove)
committedBlockSemaphores.TryRemove(key, out _);
keysToRemove.Clear();
}
};
}

Expand Down Expand Up @@ -57,12 +72,12 @@ protected virtual JToken UnsubscribeMethod(WebSocket webSocket, JArray _params,
}

[WebsocketMethod]
protected virtual Action SubscribeCommittedBlock(WebSocket webSocket, JArray _params, CancellationToken cancellationToken)
protected virtual Action SubscribeCommittingBlock(WebSocket webSocket, JArray _params, CancellationToken cancellationToken)
{
return async () =>
{
SemaphoreSlim semaphore = new(1);
committedBlockSemaphores[semaphore] = webSocket;
committingBlockSemaphores[semaphore] = webSocket;
while (true)
{
try
Expand All @@ -71,20 +86,92 @@ protected virtual Action SubscribeCommittedBlock(WebSocket webSocket, JArray _pa
switch (webSocket.State)
{
case WebSocketState.Open:
await webSocket.SendAsync(committedBlock.ToJson(system.Settings).ToByteArray(false), WebSocketMessageType.Text, true, CancellationToken.None);
if (committingBlock != null)
await webSocket.SendAsync(committingBlock.ToJson(system.Settings).ToByteArray(false), WebSocketMessageType.Text, true, CancellationToken.None);
if (cancellationToken.IsCancellationRequested)
{
committingBlockSemaphores.Remove(semaphore, out _);
return;
}
break;
case WebSocketState.Closed:
committingBlockSemaphores.Remove(semaphore, out _);
await webSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, webSocket.State.ToString(), CancellationToken.None);
webSocket.Dispose();
return;
default:
committingBlockSemaphores.Remove(semaphore, out _);
webSocket.Dispose();
return;
}
}
catch (Exception ex)
{
committingBlockSemaphores.Remove(semaphore, out _);
await webSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, ex.StackTrace, CancellationToken.None);
webSocket.Dispose();
return;
}
}
};
}

[WebsocketMethod]
protected virtual Action SubscribeContractEvent(WebSocket webSocket, JArray _params, CancellationToken cancellationToken)
{
HashSet<UInt160> contracts = _params.Count > 0 ? ((JArray)_params[0]).Select(v => UInt160.Parse(v.AsString())).ToHashSet() : new();
HashSet<string> eventNames = _params.Count > 1 ? ((JArray)_params[1]).Select(v => v.AsString().ToLower()).ToHashSet() : new();
return async () =>
{
SemaphoreSlim semaphore = new(1);
notificationSemaphores[semaphore] = webSocket;
while (true)
{
try
{
await semaphore.WaitAsync();
switch (webSocket.State)
{
case WebSocketState.Open:
JArray json = new();
foreach (var item in notifications)
{
NotifyEventArgs notification = item.Key;
if (contracts.Count > 0 && !contracts.Contains(notification.ScriptHash))
continue;
if (eventNames.Count > 0 && !eventNames.Contains(notification.EventName.ToLower()))
continue;
JObject notificationJson = new();
notificationJson["tx"] = item.Value.ToString();
notificationJson["scripthash"] = notification.ScriptHash.ToString();
notificationJson["contractname"] = NativeContract.ContractManagement.GetContract(latestSnapshot, notification.ScriptHash)?.Manifest.Name;
notificationJson["eventname"] = notification.EventName;
notificationJson["eventargs"] = notification.State.ToJson();
json.Add(notificationJson);
}
if (json.Count > 0)
await webSocket.SendAsync(json.ToByteArray(false), WebSocketMessageType.Text, true, CancellationToken.None);
if (cancellationToken.IsCancellationRequested)
{
notificationSemaphores.Remove(semaphore, out _);
return;
}
break;
case WebSocketState.Closed:
notificationSemaphores.Remove(semaphore, out _);
await webSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, webSocket.State.ToString(), CancellationToken.None);
webSocket.Dispose();
return;
default:
notificationSemaphores.Remove(semaphore, out _);
webSocket.Dispose();
return;
}
}
catch
catch (Exception ex)
{
notificationSemaphores.Remove(semaphore, out _);
await webSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, ex.StackTrace, CancellationToken.None);
webSocket.Dispose();
return;
}
Expand Down

0 comments on commit 387e51e

Please sign in to comment.