diff --git a/PluginSalesforce/API/Factory/PushTopicConnection.cs b/PluginSalesforce/API/Factory/PushTopicConnection.cs index 6a7c204..bfe7cfb 100644 --- a/PluginSalesforce/API/Factory/PushTopicConnection.cs +++ b/PluginSalesforce/API/Factory/PushTopicConnection.cs @@ -1,5 +1,6 @@ using System; using System.Collections.Generic; +using System.Threading; using CometD.NetCore.Client; using Naveego.Sdk.Logging; using Naveego.Sdk.Plugins; @@ -21,6 +22,7 @@ public PushTopicConnection(BayeuxClient bayeuxClient, string channel) } public void Connect() { + // connect the client _bayeuxClient.Handshake(); _bayeuxClient.WaitFor(1000, new[] { BayeuxClient.State.CONNECTED }); _bayeuxClient.GetChannel(_channel).Subscribe(_listener); @@ -34,6 +36,11 @@ public void Disconnect() public async IAsyncEnumerable GetCurrentMessages() { + if (!_bayeuxClient.Connected) { + Logger.Debug("request to connect stream requested"); + Connect(); + } + var messages = _listener.GetMessages(); foreach (var message in messages) diff --git a/PluginSalesforce/API/Read/ReadRecordsRealTime.cs b/PluginSalesforce/API/Read/ReadRecordsRealTime.cs index 8521c9a..6b14109 100644 --- a/PluginSalesforce/API/Read/ReadRecordsRealTime.cs +++ b/PluginSalesforce/API/Read/ReadRecordsRealTime.cs @@ -4,6 +4,7 @@ using System.Linq; using System.Security.Cryptography; using System.Text; +using System.Threading; using System.Threading.Tasks; using System.Web; using Grpc.Core; @@ -19,8 +20,21 @@ namespace PluginSalesforce.API.Read { public static partial class Read { + // constants private const string CollectionName = "realtimerecord"; + // connection client settings + private static IPushTopicConnectionFactory _connectionFactory; + private static RealTimeSettings _realTimeSettings; + private static RequestHelper _requestHelper; + + // connection client + private static CancellationTokenSource _cts; + private static PushTopicConnection _conn; + + // state + private static RealTimeState _realTimeState; + public class RealTimeRecord { [BsonId] public string Id { get; set; } @@ -29,7 +43,7 @@ public class RealTimeRecord [BsonField] public byte[] RecordDataHash { get; set; } } - public static async Task ReadRecordsRealTimeAsync(RequestHelper client, ReadRequest request, + public static async Task ReadRecordsRealTimeAsync(RequestHelper requestHelper, ReadRequest request, IServerStreamWriter responseStream, ServerCallContext context, string permanentPath, IPushTopicConnectionFactory connectionFactory) { @@ -43,13 +57,16 @@ public static async Task ReadRecordsRealTimeAsync(RequestHelper client, Rea var recordsCount = 0; var runId = Guid.NewGuid().ToString(); - var realTimeSettings = + _realTimeSettings = JsonConvert.DeserializeObject(request.RealTimeSettingsJson); - var realTimeState = !string.IsNullOrWhiteSpace(request.RealTimeStateJson) + _realTimeState = !string.IsNullOrWhiteSpace(request.RealTimeStateJson) ? JsonConvert.DeserializeObject(request.RealTimeStateJson) : new RealTimeState(); - var conn = connectionFactory.GetPushTopicConnection(client, @"/topic/" + realTimeSettings.ChannelName); + _requestHelper = requestHelper; + _connectionFactory = connectionFactory; + + GetConnectedPushTopicConnection(); try { @@ -62,17 +79,14 @@ public static async Task ReadRecordsRealTimeAsync(RequestHelper client, Rea using (var db = new LiteDatabase(Path.Join(path, $"{jobId}_RealTimeReadRecords.db"))) { var realtimeRecordsCollection = db.GetCollection(CollectionName); - - // build cometd client - conn.Connect(); - + // a full init needs to happen - if (jobVersion > realTimeState.JobVersion || shapeVersion > realTimeState.ShapeVersion) + if (jobVersion > _realTimeState.JobVersion || shapeVersion > _realTimeState.ShapeVersion) { // reset real time state - realTimeState = new RealTimeState(); - realTimeState.JobVersion = jobVersion; - realTimeState.ShapeVersion = shapeVersion; + _realTimeState = new RealTimeState(); + _realTimeState.JobVersion = jobVersion; + _realTimeState.ShapeVersion = shapeVersion; // delete existing collection realtimeRecordsCollection.DeleteAll(); @@ -92,9 +106,9 @@ public static async Task ReadRecordsRealTimeAsync(RequestHelper client, Rea var query = schema.Query; // update real time state - realTimeState.LastReadTime = DateTime.UtcNow; + _realTimeState.LastReadTime = DateTime.UtcNow; - await Initialize(client, schema, query, runId, realTimeState, schemaKeys, recordsCount, + await Initialize(schema, query, runId, schemaKeys, recordsCount, realtimeRecordsCollection, responseStream); Logger.Info("Real time read initialized."); @@ -105,8 +119,8 @@ await Initialize(client, schema, query, runId, realTimeState, schemaKeys, record long currentRunRecordsCount = 0; // process all messages since last batch interval - Logger.Debug($"Getting all records since {realTimeState.LastReadTime.ToUniversalTime():O}"); - var messages = conn.GetCurrentMessages(); + Logger.Debug($"Getting all records since {_realTimeState.LastReadTime.ToUniversalTime():O}"); + var messages = _conn.GetCurrentMessages(); await foreach (var message in messages) { @@ -153,7 +167,7 @@ await Initialize(client, schema, query, runId, realTimeState, schemaKeys, record case "GAP_UNDELETE": // perform full init for GAP events runId = Guid.NewGuid().ToString(); - await Initialize(client, schema, query, runId, realTimeState, schemaKeys, recordsCount, + await Initialize(schema, query, runId, schemaKeys, recordsCount, realtimeRecordsCollection, responseStream); break; default: @@ -181,30 +195,39 @@ await Initialize(client, schema, query, runId, realTimeState, schemaKeys, record } // clear processed messages - conn.ClearStoredMessages(); + _conn.ClearStoredMessages(); // update last read time - realTimeState.LastReadTime = DateTime.Now; + _realTimeState.LastReadTime = DateTime.Now; // update real time state var realTimeStateCommit = new Record { Action = Record.Types.Action.RealTimeStateCommit, - RealTimeStateJson = JsonConvert.SerializeObject(realTimeState) + RealTimeStateJson = JsonConvert.SerializeObject(_realTimeState) }; await responseStream.WriteAsync(realTimeStateCommit); Logger.Debug( - $"Got {currentRunRecordsCount} records since {realTimeState.LastReadTime.ToUniversalTime():O}"); + $"Got {currentRunRecordsCount} records since {_realTimeState.LastReadTime.ToUniversalTime():O}"); - await Task.Delay(realTimeSettings.BatchWindowSeconds * 1000, context.CancellationToken); + if (_cts.IsCancellationRequested) { + // reconnect after 30 minutes + Logger.Debug("request to disconnect stream requested"); + _conn.Disconnect(); + + GetConnectedPushTopicConnection(); + } + + // sleep until next check window + await Task.Delay(_realTimeSettings.BatchWindowSeconds * 1000, context.CancellationToken); } } } catch (TaskCanceledException e) { Logger.Info($"Operation cancelled {e.Message}"); - conn.Disconnect(); + _conn.Disconnect(); return recordsCount; } catch (Exception e) @@ -214,12 +237,22 @@ await Initialize(client, schema, query, runId, realTimeState, schemaKeys, record } finally { - conn.Disconnect(); + _conn.Disconnect(); } return recordsCount; } + private static void GetConnectedPushTopicConnection() { + // build cometd client + _conn = _connectionFactory.GetPushTopicConnection(_requestHelper, @"/topic/" + _realTimeSettings.ChannelName); + _conn.Connect(); + + // force the client to reconnect after 30 minutes + _cts = new CancellationTokenSource(); + _cts.CancelAfter(1800 * 1000); + } + private static string GetRecordKeyEntry(List schemaKeys, Dictionary recordMap) { var entryIdStringList = new List(); @@ -351,17 +384,15 @@ private static void MutateRecordMap(Schema schema, Dictionary ra /// /// Loads all data into the local db, uploads changed records, and deletes missing records /// - /// /// /// /// - /// /// /// /// /// - public static async Task Initialize(RequestHelper client, Schema schema, string query, string runId, - RealTimeState realTimeState, List schemaKeys, long recordsCount, + public static async Task Initialize(Schema schema, string query, string runId, + List schemaKeys, long recordsCount, ILiteCollection realtimeRecordsCollection, IServerStreamWriter responseStream) { IAsyncEnumerable> allRecords; @@ -369,11 +400,11 @@ public static async Task Initialize(RequestHelper client, Schema schema, string // get all records if (string.IsNullOrWhiteSpace(query)) { - allRecords = GetRecordsForDefaultQuery(client, schema); + allRecords = GetRecordsForDefaultQuery(_requestHelper, schema); } else { - allRecords = GetRecordsForQuery(client, query); + allRecords = GetRecordsForQuery(_requestHelper, query); } await foreach (var rawRecord in allRecords) @@ -420,7 +451,7 @@ public static async Task Initialize(RequestHelper client, Schema schema, string var realTimeStateCommit = new Record { Action = Record.Types.Action.RealTimeStateCommit, - RealTimeStateJson = JsonConvert.SerializeObject(realTimeState) + RealTimeStateJson = JsonConvert.SerializeObject(_realTimeState) }; await responseStream.WriteAsync(realTimeStateCommit); diff --git a/PluginSalesforce/API/Utility/Listener.cs b/PluginSalesforce/API/Utility/Listener.cs index f4c0aa0..376bb01 100644 --- a/PluginSalesforce/API/Utility/Listener.cs +++ b/PluginSalesforce/API/Utility/Listener.cs @@ -13,7 +13,7 @@ public void OnMessage(IClientSessionChannel channel, IMessage message) { var convertedJson = message.Json; _messages.Add(convertedJson); - Logger.Info($"Got message: {convertedJson}"); + Logger.Debug($"Got message: {convertedJson}"); } public void ClearStoredMessages() diff --git a/PluginSalesforce/DataContracts/RealTimeEvent.cs b/PluginSalesforce/DataContracts/RealTimeEvent.cs index 56a0021..8f79736 100644 --- a/PluginSalesforce/DataContracts/RealTimeEvent.cs +++ b/PluginSalesforce/DataContracts/RealTimeEvent.cs @@ -28,7 +28,7 @@ public class RealTimeEvent public DateTime CreatedDate { get; set; } [JsonProperty("replayId")] - public Int32 ReplayId { get; set; } + public long ReplayId { get; set; } [JsonProperty("type")] public string Type { get; set; }