Skip to content

Commit

Permalink
Merge pull request #19 from naveego/issue/PRODSUP-4052/sf-realtime-lo…
Browse files Browse the repository at this point in the history
…ng-running

PRODSUP-4052: Fix Long Runnign SF Real Time Reads
  • Loading branch information
roehlerw authored Jun 5, 2023
2 parents 3f13556 + c211481 commit af887f2
Show file tree
Hide file tree
Showing 4 changed files with 71 additions and 33 deletions.
7 changes: 7 additions & 0 deletions PluginSalesforce/API/Factory/PushTopicConnection.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using System;
using System.Collections.Generic;
using System.Threading;
using CometD.NetCore.Client;
using Naveego.Sdk.Logging;
using Naveego.Sdk.Plugins;
Expand All @@ -21,6 +22,7 @@ public PushTopicConnection(BayeuxClient bayeuxClient, string channel)
}
public void Connect()
{
// connect the client
_bayeuxClient.Handshake();
_bayeuxClient.WaitFor(1000, new[] { BayeuxClient.State.CONNECTED });
_bayeuxClient.GetChannel(_channel).Subscribe(_listener);
Expand All @@ -34,6 +36,11 @@ public void Disconnect()

public async IAsyncEnumerable<string> GetCurrentMessages()
{
if (!_bayeuxClient.Connected) {
Logger.Debug("request to connect stream requested");
Connect();
}

var messages = _listener.GetMessages();

foreach (var message in messages)
Expand Down
93 changes: 62 additions & 31 deletions PluginSalesforce/API/Read/ReadRecordsRealTime.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
using System.Linq;
using System.Security.Cryptography;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Web;
using Grpc.Core;
Expand All @@ -19,8 +20,21 @@ namespace PluginSalesforce.API.Read
{
public static partial class Read
{
// constants
private const string CollectionName = "realtimerecord";

// connection client settings
private static IPushTopicConnectionFactory _connectionFactory;
private static RealTimeSettings _realTimeSettings;
private static RequestHelper _requestHelper;

// connection client
private static CancellationTokenSource _cts;
private static PushTopicConnection _conn;

// state
private static RealTimeState _realTimeState;

public class RealTimeRecord
{
[BsonId] public string Id { get; set; }
Expand All @@ -29,7 +43,7 @@ public class RealTimeRecord
[BsonField] public byte[] RecordDataHash { get; set; }
}

public static async Task<int> ReadRecordsRealTimeAsync(RequestHelper client, ReadRequest request,
public static async Task<int> ReadRecordsRealTimeAsync(RequestHelper requestHelper, ReadRequest request,
IServerStreamWriter<Record> responseStream,
ServerCallContext context, string permanentPath, IPushTopicConnectionFactory connectionFactory)
{
Expand All @@ -43,13 +57,16 @@ public static async Task<int> ReadRecordsRealTimeAsync(RequestHelper client, Rea
var recordsCount = 0;
var runId = Guid.NewGuid().ToString();

var realTimeSettings =
_realTimeSettings =
JsonConvert.DeserializeObject<RealTimeSettings>(request.RealTimeSettingsJson);
var realTimeState = !string.IsNullOrWhiteSpace(request.RealTimeStateJson)
_realTimeState = !string.IsNullOrWhiteSpace(request.RealTimeStateJson)
? JsonConvert.DeserializeObject<RealTimeState>(request.RealTimeStateJson)
: new RealTimeState();

var conn = connectionFactory.GetPushTopicConnection(client, @"/topic/" + realTimeSettings.ChannelName);
_requestHelper = requestHelper;
_connectionFactory = connectionFactory;

GetConnectedPushTopicConnection();

try
{
Expand All @@ -62,17 +79,14 @@ public static async Task<int> ReadRecordsRealTimeAsync(RequestHelper client, Rea
using (var db = new LiteDatabase(Path.Join(path, $"{jobId}_RealTimeReadRecords.db")))
{
var realtimeRecordsCollection = db.GetCollection<RealTimeRecord>(CollectionName);

// build cometd client
conn.Connect();


// a full init needs to happen
if (jobVersion > realTimeState.JobVersion || shapeVersion > realTimeState.ShapeVersion)
if (jobVersion > _realTimeState.JobVersion || shapeVersion > _realTimeState.ShapeVersion)
{
// reset real time state
realTimeState = new RealTimeState();
realTimeState.JobVersion = jobVersion;
realTimeState.ShapeVersion = shapeVersion;
_realTimeState = new RealTimeState();
_realTimeState.JobVersion = jobVersion;
_realTimeState.ShapeVersion = shapeVersion;

// delete existing collection
realtimeRecordsCollection.DeleteAll();
Expand All @@ -92,9 +106,9 @@ public static async Task<int> ReadRecordsRealTimeAsync(RequestHelper client, Rea
var query = schema.Query;

// update real time state
realTimeState.LastReadTime = DateTime.UtcNow;
_realTimeState.LastReadTime = DateTime.UtcNow;

await Initialize(client, schema, query, runId, realTimeState, schemaKeys, recordsCount,
await Initialize(schema, query, runId, schemaKeys, recordsCount,
realtimeRecordsCollection, responseStream);

Logger.Info("Real time read initialized.");
Expand All @@ -105,8 +119,8 @@ await Initialize(client, schema, query, runId, realTimeState, schemaKeys, record
long currentRunRecordsCount = 0;

// process all messages since last batch interval
Logger.Debug($"Getting all records since {realTimeState.LastReadTime.ToUniversalTime():O}");
var messages = conn.GetCurrentMessages();
Logger.Debug($"Getting all records since {_realTimeState.LastReadTime.ToUniversalTime():O}");
var messages = _conn.GetCurrentMessages();

await foreach (var message in messages)
{
Expand Down Expand Up @@ -153,7 +167,7 @@ await Initialize(client, schema, query, runId, realTimeState, schemaKeys, record
case "GAP_UNDELETE":
// perform full init for GAP events
runId = Guid.NewGuid().ToString();
await Initialize(client, schema, query, runId, realTimeState, schemaKeys, recordsCount,
await Initialize(schema, query, runId, schemaKeys, recordsCount,
realtimeRecordsCollection, responseStream);
break;
default:
Expand Down Expand Up @@ -181,30 +195,39 @@ await Initialize(client, schema, query, runId, realTimeState, schemaKeys, record
}

// clear processed messages
conn.ClearStoredMessages();
_conn.ClearStoredMessages();

// update last read time
realTimeState.LastReadTime = DateTime.Now;
_realTimeState.LastReadTime = DateTime.Now;

// update real time state
var realTimeStateCommit = new Record
{
Action = Record.Types.Action.RealTimeStateCommit,
RealTimeStateJson = JsonConvert.SerializeObject(realTimeState)
RealTimeStateJson = JsonConvert.SerializeObject(_realTimeState)
};
await responseStream.WriteAsync(realTimeStateCommit);

Logger.Debug(
$"Got {currentRunRecordsCount} records since {realTimeState.LastReadTime.ToUniversalTime():O}");
$"Got {currentRunRecordsCount} records since {_realTimeState.LastReadTime.ToUniversalTime():O}");

await Task.Delay(realTimeSettings.BatchWindowSeconds * 1000, context.CancellationToken);
if (_cts.IsCancellationRequested) {
// reconnect after 30 minutes
Logger.Debug("request to disconnect stream requested");
_conn.Disconnect();

GetConnectedPushTopicConnection();
}

// sleep until next check window
await Task.Delay(_realTimeSettings.BatchWindowSeconds * 1000, context.CancellationToken);
}
}
}
catch (TaskCanceledException e)
{
Logger.Info($"Operation cancelled {e.Message}");
conn.Disconnect();
_conn.Disconnect();
return recordsCount;
}
catch (Exception e)
Expand All @@ -214,12 +237,22 @@ await Initialize(client, schema, query, runId, realTimeState, schemaKeys, record
}
finally
{
conn.Disconnect();
_conn.Disconnect();
}

return recordsCount;
}

private static void GetConnectedPushTopicConnection() {
// build cometd client
_conn = _connectionFactory.GetPushTopicConnection(_requestHelper, @"/topic/" + _realTimeSettings.ChannelName);
_conn.Connect();

// force the client to reconnect after 30 minutes
_cts = new CancellationTokenSource();
_cts.CancelAfter(1800 * 1000);
}

private static string GetRecordKeyEntry(List<string> schemaKeys, Dictionary<string, object> recordMap)
{
var entryIdStringList = new List<string>();
Expand Down Expand Up @@ -351,29 +384,27 @@ private static void MutateRecordMap(Schema schema, Dictionary<string, object> ra
/// <summary>
/// Loads all data into the local db, uploads changed records, and deletes missing records
/// </summary>
/// <param name="client"></param>
/// <param name="schema"></param>
/// <param name="query"></param>
/// <param name="runId"></param>
/// <param name="realTimeState"></param>
/// <param name="schemaKeys"></param>
/// <param name="recordsCount"></param>
/// <param name="realtimeRecordsCollection"></param>
/// <param name="responseStream"></param>
public static async Task Initialize(RequestHelper client, Schema schema, string query, string runId,
RealTimeState realTimeState, List<string> schemaKeys, long recordsCount,
public static async Task Initialize(Schema schema, string query, string runId,
List<string> schemaKeys, long recordsCount,
ILiteCollection<RealTimeRecord> realtimeRecordsCollection, IServerStreamWriter<Record> responseStream)
{
IAsyncEnumerable<Dictionary<string, object>> allRecords;

// get all records
if (string.IsNullOrWhiteSpace(query))
{
allRecords = GetRecordsForDefaultQuery(client, schema);
allRecords = GetRecordsForDefaultQuery(_requestHelper, schema);
}
else
{
allRecords = GetRecordsForQuery(client, query);
allRecords = GetRecordsForQuery(_requestHelper, query);
}

await foreach (var rawRecord in allRecords)
Expand Down Expand Up @@ -420,7 +451,7 @@ public static async Task Initialize(RequestHelper client, Schema schema, string
var realTimeStateCommit = new Record
{
Action = Record.Types.Action.RealTimeStateCommit,
RealTimeStateJson = JsonConvert.SerializeObject(realTimeState)
RealTimeStateJson = JsonConvert.SerializeObject(_realTimeState)
};

await responseStream.WriteAsync(realTimeStateCommit);
Expand Down
2 changes: 1 addition & 1 deletion PluginSalesforce/API/Utility/Listener.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ public void OnMessage(IClientSessionChannel channel, IMessage message)
{
var convertedJson = message.Json;
_messages.Add(convertedJson);
Logger.Info($"Got message: {convertedJson}");
Logger.Debug($"Got message: {convertedJson}");
}

public void ClearStoredMessages()
Expand Down
2 changes: 1 addition & 1 deletion PluginSalesforce/DataContracts/RealTimeEvent.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ public class RealTimeEvent
public DateTime CreatedDate { get; set; }

[JsonProperty("replayId")]
public Int32 ReplayId { get; set; }
public long ReplayId { get; set; }

[JsonProperty("type")]
public string Type { get; set; }
Expand Down

0 comments on commit af887f2

Please sign in to comment.