From c9f79475d6469a317052b846f64cb9bab87fb23c Mon Sep 17 00:00:00 2001 From: roehlerw Date: Wed, 24 May 2023 15:12:10 -0400 Subject: [PATCH 1/4] fix(real time read): added check to assert clinet is still connected and reconnect if not when fetching new data resolves PRODSUP-4052 --- PluginSalesforce/API/Factory/PushTopicConnection.cs | 4 ++++ PluginSalesforce/API/Read/ReadRecordsRealTime.cs | 8 ++++---- 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/PluginSalesforce/API/Factory/PushTopicConnection.cs b/PluginSalesforce/API/Factory/PushTopicConnection.cs index 6a7c204..0436451 100644 --- a/PluginSalesforce/API/Factory/PushTopicConnection.cs +++ b/PluginSalesforce/API/Factory/PushTopicConnection.cs @@ -34,6 +34,10 @@ public void Disconnect() public async IAsyncEnumerable GetCurrentMessages() { + if (!_bayeuxClient.Connected) { + Connect(); + } + var messages = _listener.GetMessages(); foreach (var message in messages) diff --git a/PluginSalesforce/API/Read/ReadRecordsRealTime.cs b/PluginSalesforce/API/Read/ReadRecordsRealTime.cs index 8521c9a..c686e4d 100644 --- a/PluginSalesforce/API/Read/ReadRecordsRealTime.cs +++ b/PluginSalesforce/API/Read/ReadRecordsRealTime.cs @@ -49,6 +49,7 @@ public static async Task ReadRecordsRealTimeAsync(RequestHelper client, Rea ? JsonConvert.DeserializeObject(request.RealTimeStateJson) : new RealTimeState(); + // build cometd client var conn = connectionFactory.GetPushTopicConnection(client, @"/topic/" + realTimeSettings.ChannelName); try @@ -61,11 +62,10 @@ public static async Task ReadRecordsRealTimeAsync(RequestHelper client, Rea using (var db = new LiteDatabase(Path.Join(path, $"{jobId}_RealTimeReadRecords.db"))) { - var realtimeRecordsCollection = db.GetCollection(CollectionName); - - // build cometd client conn.Connect(); - + + var realtimeRecordsCollection = db.GetCollection(CollectionName); + // a full init needs to happen if (jobVersion > realTimeState.JobVersion || shapeVersion > realTimeState.ShapeVersion) { From 4fd2816496ce437b62d1540af7ab2c0c2a7902c0 Mon Sep 17 00:00:00 2001 From: roehlerw Date: Thu, 25 May 2023 10:45:21 -0400 Subject: [PATCH 2/4] fix(real time read): added reconnect timeout loop and updated log levels resolves PRODSUP-4052 --- .../API/Factory/PushTopicConnection.cs | 14 ++++++++++++++ PluginSalesforce/API/Utility/Listener.cs | 2 +- PluginSalesforce/DataContracts/RealTimeEvent.cs | 2 +- 3 files changed, 16 insertions(+), 2 deletions(-) diff --git a/PluginSalesforce/API/Factory/PushTopicConnection.cs b/PluginSalesforce/API/Factory/PushTopicConnection.cs index 0436451..4c4e91b 100644 --- a/PluginSalesforce/API/Factory/PushTopicConnection.cs +++ b/PluginSalesforce/API/Factory/PushTopicConnection.cs @@ -1,5 +1,6 @@ using System; using System.Collections.Generic; +using System.Threading; using CometD.NetCore.Client; using Naveego.Sdk.Logging; using Naveego.Sdk.Plugins; @@ -13,6 +14,8 @@ public class PushTopicConnection private readonly Listener _listener = null; private readonly string _channel = ""; + private CancellationTokenSource _cts; + public PushTopicConnection(BayeuxClient bayeuxClient, string channel) { _bayeuxClient = bayeuxClient; @@ -21,6 +24,11 @@ public PushTopicConnection(BayeuxClient bayeuxClient, string channel) } public void Connect() { + // force the client to reconnect after 30 minutes + _cts = new CancellationTokenSource(); + _cts.CancelAfter(1800 * 1000); + + // connect the client _bayeuxClient.Handshake(); _bayeuxClient.WaitFor(1000, new[] { BayeuxClient.State.CONNECTED }); _bayeuxClient.GetChannel(_channel).Subscribe(_listener); @@ -34,7 +42,13 @@ public void Disconnect() public async IAsyncEnumerable GetCurrentMessages() { + if (_cts.IsCancellationRequested) { + Logger.Debug("request to disconnect stream requested"); + Disconnect(); + } + if (!_bayeuxClient.Connected) { + Logger.Debug("request to connect stream requested"); Connect(); } diff --git a/PluginSalesforce/API/Utility/Listener.cs b/PluginSalesforce/API/Utility/Listener.cs index f4c0aa0..376bb01 100644 --- a/PluginSalesforce/API/Utility/Listener.cs +++ b/PluginSalesforce/API/Utility/Listener.cs @@ -13,7 +13,7 @@ public void OnMessage(IClientSessionChannel channel, IMessage message) { var convertedJson = message.Json; _messages.Add(convertedJson); - Logger.Info($"Got message: {convertedJson}"); + Logger.Debug($"Got message: {convertedJson}"); } public void ClearStoredMessages() diff --git a/PluginSalesforce/DataContracts/RealTimeEvent.cs b/PluginSalesforce/DataContracts/RealTimeEvent.cs index 56a0021..8f79736 100644 --- a/PluginSalesforce/DataContracts/RealTimeEvent.cs +++ b/PluginSalesforce/DataContracts/RealTimeEvent.cs @@ -28,7 +28,7 @@ public class RealTimeEvent public DateTime CreatedDate { get; set; } [JsonProperty("replayId")] - public Int32 ReplayId { get; set; } + public long ReplayId { get; set; } [JsonProperty("type")] public string Type { get; set; } From 8da05e0f5f34f0f3b3f7d2195e4b716f2cec08a7 Mon Sep 17 00:00:00 2001 From: roehlerw Date: Fri, 26 May 2023 11:24:25 -0400 Subject: [PATCH 3/4] fix(real time read): cahnged to recycling the whole client instead of just the connection resolves PRODSUP-4052 --- .../API/Factory/PushTopicConnection.cs | 11 --- .../API/Read/ReadRecordsRealTime.cs | 90 ++++++++++++------- 2 files changed, 60 insertions(+), 41 deletions(-) diff --git a/PluginSalesforce/API/Factory/PushTopicConnection.cs b/PluginSalesforce/API/Factory/PushTopicConnection.cs index 4c4e91b..bfe7cfb 100644 --- a/PluginSalesforce/API/Factory/PushTopicConnection.cs +++ b/PluginSalesforce/API/Factory/PushTopicConnection.cs @@ -14,8 +14,6 @@ public class PushTopicConnection private readonly Listener _listener = null; private readonly string _channel = ""; - private CancellationTokenSource _cts; - public PushTopicConnection(BayeuxClient bayeuxClient, string channel) { _bayeuxClient = bayeuxClient; @@ -24,10 +22,6 @@ public PushTopicConnection(BayeuxClient bayeuxClient, string channel) } public void Connect() { - // force the client to reconnect after 30 minutes - _cts = new CancellationTokenSource(); - _cts.CancelAfter(1800 * 1000); - // connect the client _bayeuxClient.Handshake(); _bayeuxClient.WaitFor(1000, new[] { BayeuxClient.State.CONNECTED }); @@ -42,11 +36,6 @@ public void Disconnect() public async IAsyncEnumerable GetCurrentMessages() { - if (_cts.IsCancellationRequested) { - Logger.Debug("request to disconnect stream requested"); - Disconnect(); - } - if (!_bayeuxClient.Connected) { Logger.Debug("request to connect stream requested"); Connect(); diff --git a/PluginSalesforce/API/Read/ReadRecordsRealTime.cs b/PluginSalesforce/API/Read/ReadRecordsRealTime.cs index c686e4d..8344091 100644 --- a/PluginSalesforce/API/Read/ReadRecordsRealTime.cs +++ b/PluginSalesforce/API/Read/ReadRecordsRealTime.cs @@ -4,6 +4,7 @@ using System.Linq; using System.Security.Cryptography; using System.Text; +using System.Threading; using System.Threading.Tasks; using System.Web; using Grpc.Core; @@ -19,8 +20,21 @@ namespace PluginSalesforce.API.Read { public static partial class Read { + // constants private const string CollectionName = "realtimerecord"; + // connection client settings + private static IPushTopicConnectionFactory _connectionFactory; + private static RealTimeSettings _realTimeSettings; + private static RequestHelper _requestHelper; + + // connection client + private static CancellationTokenSource _cts; + private static PushTopicConnection _conn; + + // state + private static RealTimeState _realTimeState; + public class RealTimeRecord { [BsonId] public string Id { get; set; } @@ -29,7 +43,7 @@ public class RealTimeRecord [BsonField] public byte[] RecordDataHash { get; set; } } - public static async Task ReadRecordsRealTimeAsync(RequestHelper client, ReadRequest request, + public static async Task ReadRecordsRealTimeAsync(RequestHelper requestHelper, ReadRequest request, IServerStreamWriter responseStream, ServerCallContext context, string permanentPath, IPushTopicConnectionFactory connectionFactory) { @@ -43,14 +57,15 @@ public static async Task ReadRecordsRealTimeAsync(RequestHelper client, Rea var recordsCount = 0; var runId = Guid.NewGuid().ToString(); - var realTimeSettings = + _realTimeSettings = JsonConvert.DeserializeObject(request.RealTimeSettingsJson); - var realTimeState = !string.IsNullOrWhiteSpace(request.RealTimeStateJson) + _realTimeState = !string.IsNullOrWhiteSpace(request.RealTimeStateJson) ? JsonConvert.DeserializeObject(request.RealTimeStateJson) : new RealTimeState(); - // build cometd client - var conn = connectionFactory.GetPushTopicConnection(client, @"/topic/" + realTimeSettings.ChannelName); + _requestHelper = requestHelper; + + GetConnectedPushTopicConnection(); try { @@ -62,17 +77,15 @@ public static async Task ReadRecordsRealTimeAsync(RequestHelper client, Rea using (var db = new LiteDatabase(Path.Join(path, $"{jobId}_RealTimeReadRecords.db"))) { - conn.Connect(); - var realtimeRecordsCollection = db.GetCollection(CollectionName); // a full init needs to happen - if (jobVersion > realTimeState.JobVersion || shapeVersion > realTimeState.ShapeVersion) + if (jobVersion > _realTimeState.JobVersion || shapeVersion > _realTimeState.ShapeVersion) { // reset real time state - realTimeState = new RealTimeState(); - realTimeState.JobVersion = jobVersion; - realTimeState.ShapeVersion = shapeVersion; + _realTimeState = new RealTimeState(); + _realTimeState.JobVersion = jobVersion; + _realTimeState.ShapeVersion = shapeVersion; // delete existing collection realtimeRecordsCollection.DeleteAll(); @@ -92,9 +105,9 @@ public static async Task ReadRecordsRealTimeAsync(RequestHelper client, Rea var query = schema.Query; // update real time state - realTimeState.LastReadTime = DateTime.UtcNow; + _realTimeState.LastReadTime = DateTime.UtcNow; - await Initialize(client, schema, query, runId, realTimeState, schemaKeys, recordsCount, + await Initialize(schema, query, runId, schemaKeys, recordsCount, realtimeRecordsCollection, responseStream); Logger.Info("Real time read initialized."); @@ -105,8 +118,8 @@ await Initialize(client, schema, query, runId, realTimeState, schemaKeys, record long currentRunRecordsCount = 0; // process all messages since last batch interval - Logger.Debug($"Getting all records since {realTimeState.LastReadTime.ToUniversalTime():O}"); - var messages = conn.GetCurrentMessages(); + Logger.Debug($"Getting all records since {_realTimeState.LastReadTime.ToUniversalTime():O}"); + var messages = _conn.GetCurrentMessages(); await foreach (var message in messages) { @@ -153,7 +166,7 @@ await Initialize(client, schema, query, runId, realTimeState, schemaKeys, record case "GAP_UNDELETE": // perform full init for GAP events runId = Guid.NewGuid().ToString(); - await Initialize(client, schema, query, runId, realTimeState, schemaKeys, recordsCount, + await Initialize(schema, query, runId, schemaKeys, recordsCount, realtimeRecordsCollection, responseStream); break; default: @@ -181,30 +194,39 @@ await Initialize(client, schema, query, runId, realTimeState, schemaKeys, record } // clear processed messages - conn.ClearStoredMessages(); + _conn.ClearStoredMessages(); // update last read time - realTimeState.LastReadTime = DateTime.Now; + _realTimeState.LastReadTime = DateTime.Now; // update real time state var realTimeStateCommit = new Record { Action = Record.Types.Action.RealTimeStateCommit, - RealTimeStateJson = JsonConvert.SerializeObject(realTimeState) + RealTimeStateJson = JsonConvert.SerializeObject(_realTimeState) }; await responseStream.WriteAsync(realTimeStateCommit); Logger.Debug( - $"Got {currentRunRecordsCount} records since {realTimeState.LastReadTime.ToUniversalTime():O}"); + $"Got {currentRunRecordsCount} records since {_realTimeState.LastReadTime.ToUniversalTime():O}"); + + if (_cts.IsCancellationRequested) { + // reconnect after 30 minutes + Logger.Debug("request to disconnect stream requested"); + _conn.Disconnect(); - await Task.Delay(realTimeSettings.BatchWindowSeconds * 1000, context.CancellationToken); + GetConnectedPushTopicConnection(); + } + + // sleep until next check window + await Task.Delay(_realTimeSettings.BatchWindowSeconds * 1000, context.CancellationToken); } } } catch (TaskCanceledException e) { Logger.Info($"Operation cancelled {e.Message}"); - conn.Disconnect(); + _conn.Disconnect(); return recordsCount; } catch (Exception e) @@ -214,12 +236,22 @@ await Initialize(client, schema, query, runId, realTimeState, schemaKeys, record } finally { - conn.Disconnect(); + _conn.Disconnect(); } return recordsCount; } + private static void GetConnectedPushTopicConnection() { + // build cometd client + _conn = _connectionFactory.GetPushTopicConnection(_requestHelper, @"/topic/" + _realTimeSettings.ChannelName); + _conn.Connect(); + + // force the client to reconnect after 30 minutes + _cts = new CancellationTokenSource(); + _cts.CancelAfter(1800 * 1000); + } + private static string GetRecordKeyEntry(List schemaKeys, Dictionary recordMap) { var entryIdStringList = new List(); @@ -351,17 +383,15 @@ private static void MutateRecordMap(Schema schema, Dictionary ra /// /// Loads all data into the local db, uploads changed records, and deletes missing records /// - /// /// /// /// - /// /// /// /// /// - public static async Task Initialize(RequestHelper client, Schema schema, string query, string runId, - RealTimeState realTimeState, List schemaKeys, long recordsCount, + public static async Task Initialize(Schema schema, string query, string runId, + List schemaKeys, long recordsCount, ILiteCollection realtimeRecordsCollection, IServerStreamWriter responseStream) { IAsyncEnumerable> allRecords; @@ -369,11 +399,11 @@ public static async Task Initialize(RequestHelper client, Schema schema, string // get all records if (string.IsNullOrWhiteSpace(query)) { - allRecords = GetRecordsForDefaultQuery(client, schema); + allRecords = GetRecordsForDefaultQuery(_requestHelper, schema); } else { - allRecords = GetRecordsForQuery(client, query); + allRecords = GetRecordsForQuery(_requestHelper, query); } await foreach (var rawRecord in allRecords) @@ -420,7 +450,7 @@ public static async Task Initialize(RequestHelper client, Schema schema, string var realTimeStateCommit = new Record { Action = Record.Types.Action.RealTimeStateCommit, - RealTimeStateJson = JsonConvert.SerializeObject(realTimeState) + RealTimeStateJson = JsonConvert.SerializeObject(_realTimeState) }; await responseStream.WriteAsync(realTimeStateCommit); From c211481ae0bb96782184665f163b8599116b4115 Mon Sep 17 00:00:00 2001 From: roehlerw Date: Fri, 26 May 2023 11:39:02 -0400 Subject: [PATCH 4/4] fix(real time read): correctly intialize connection factory resolves PRODSUP-4052 --- PluginSalesforce/API/Read/ReadRecordsRealTime.cs | 1 + 1 file changed, 1 insertion(+) diff --git a/PluginSalesforce/API/Read/ReadRecordsRealTime.cs b/PluginSalesforce/API/Read/ReadRecordsRealTime.cs index 8344091..6b14109 100644 --- a/PluginSalesforce/API/Read/ReadRecordsRealTime.cs +++ b/PluginSalesforce/API/Read/ReadRecordsRealTime.cs @@ -64,6 +64,7 @@ public static async Task ReadRecordsRealTimeAsync(RequestHelper requestHelp : new RealTimeState(); _requestHelper = requestHelper; + _connectionFactory = connectionFactory; GetConnectedPushTopicConnection();