Skip to content

Commit

Permalink
fix(real time read): cahnged to recycling the whole client instead of…
Browse files Browse the repository at this point in the history
… just the connection

resolves PRODSUP-4052
  • Loading branch information
roehlerw committed May 26, 2023
1 parent 4fd2816 commit 8da05e0
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 41 deletions.
11 changes: 0 additions & 11 deletions PluginSalesforce/API/Factory/PushTopicConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,6 @@ public class PushTopicConnection
private readonly Listener _listener = null;
private readonly string _channel = "";

private CancellationTokenSource _cts;

public PushTopicConnection(BayeuxClient bayeuxClient, string channel)
{
_bayeuxClient = bayeuxClient;
Expand All @@ -24,10 +22,6 @@ public PushTopicConnection(BayeuxClient bayeuxClient, string channel)
}
public void Connect()
{
// force the client to reconnect after 30 minutes
_cts = new CancellationTokenSource();
_cts.CancelAfter(1800 * 1000);

// connect the client
_bayeuxClient.Handshake();
_bayeuxClient.WaitFor(1000, new[] { BayeuxClient.State.CONNECTED });
Expand All @@ -42,11 +36,6 @@ public void Disconnect()

public async IAsyncEnumerable<string> GetCurrentMessages()
{
if (_cts.IsCancellationRequested) {
Logger.Debug("request to disconnect stream requested");
Disconnect();
}

if (!_bayeuxClient.Connected) {
Logger.Debug("request to connect stream requested");
Connect();
Expand Down
90 changes: 60 additions & 30 deletions PluginSalesforce/API/Read/ReadRecordsRealTime.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
using System.Linq;
using System.Security.Cryptography;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Web;
using Grpc.Core;
Expand All @@ -19,8 +20,21 @@ namespace PluginSalesforce.API.Read
{
public static partial class Read
{
// constants
private const string CollectionName = "realtimerecord";

// connection client settings
private static IPushTopicConnectionFactory _connectionFactory;
private static RealTimeSettings _realTimeSettings;
private static RequestHelper _requestHelper;

// connection client
private static CancellationTokenSource _cts;
private static PushTopicConnection _conn;

// state
private static RealTimeState _realTimeState;

public class RealTimeRecord
{
[BsonId] public string Id { get; set; }
Expand All @@ -29,7 +43,7 @@ public class RealTimeRecord
[BsonField] public byte[] RecordDataHash { get; set; }
}

public static async Task<int> ReadRecordsRealTimeAsync(RequestHelper client, ReadRequest request,
public static async Task<int> ReadRecordsRealTimeAsync(RequestHelper requestHelper, ReadRequest request,
IServerStreamWriter<Record> responseStream,
ServerCallContext context, string permanentPath, IPushTopicConnectionFactory connectionFactory)
{
Expand All @@ -43,14 +57,15 @@ public static async Task<int> ReadRecordsRealTimeAsync(RequestHelper client, Rea
var recordsCount = 0;
var runId = Guid.NewGuid().ToString();

var realTimeSettings =
_realTimeSettings =
JsonConvert.DeserializeObject<RealTimeSettings>(request.RealTimeSettingsJson);
var realTimeState = !string.IsNullOrWhiteSpace(request.RealTimeStateJson)
_realTimeState = !string.IsNullOrWhiteSpace(request.RealTimeStateJson)
? JsonConvert.DeserializeObject<RealTimeState>(request.RealTimeStateJson)
: new RealTimeState();

// build cometd client
var conn = connectionFactory.GetPushTopicConnection(client, @"/topic/" + realTimeSettings.ChannelName);
_requestHelper = requestHelper;

GetConnectedPushTopicConnection();

try
{
Expand All @@ -62,17 +77,15 @@ public static async Task<int> ReadRecordsRealTimeAsync(RequestHelper client, Rea

using (var db = new LiteDatabase(Path.Join(path, $"{jobId}_RealTimeReadRecords.db")))
{
conn.Connect();

var realtimeRecordsCollection = db.GetCollection<RealTimeRecord>(CollectionName);

// a full init needs to happen
if (jobVersion > realTimeState.JobVersion || shapeVersion > realTimeState.ShapeVersion)
if (jobVersion > _realTimeState.JobVersion || shapeVersion > _realTimeState.ShapeVersion)
{
// reset real time state
realTimeState = new RealTimeState();
realTimeState.JobVersion = jobVersion;
realTimeState.ShapeVersion = shapeVersion;
_realTimeState = new RealTimeState();
_realTimeState.JobVersion = jobVersion;
_realTimeState.ShapeVersion = shapeVersion;

// delete existing collection
realtimeRecordsCollection.DeleteAll();
Expand All @@ -92,9 +105,9 @@ public static async Task<int> ReadRecordsRealTimeAsync(RequestHelper client, Rea
var query = schema.Query;

// update real time state
realTimeState.LastReadTime = DateTime.UtcNow;
_realTimeState.LastReadTime = DateTime.UtcNow;

await Initialize(client, schema, query, runId, realTimeState, schemaKeys, recordsCount,
await Initialize(schema, query, runId, schemaKeys, recordsCount,
realtimeRecordsCollection, responseStream);

Logger.Info("Real time read initialized.");
Expand All @@ -105,8 +118,8 @@ await Initialize(client, schema, query, runId, realTimeState, schemaKeys, record
long currentRunRecordsCount = 0;

// process all messages since last batch interval
Logger.Debug($"Getting all records since {realTimeState.LastReadTime.ToUniversalTime():O}");
var messages = conn.GetCurrentMessages();
Logger.Debug($"Getting all records since {_realTimeState.LastReadTime.ToUniversalTime():O}");
var messages = _conn.GetCurrentMessages();

await foreach (var message in messages)
{
Expand Down Expand Up @@ -153,7 +166,7 @@ await Initialize(client, schema, query, runId, realTimeState, schemaKeys, record
case "GAP_UNDELETE":
// perform full init for GAP events
runId = Guid.NewGuid().ToString();
await Initialize(client, schema, query, runId, realTimeState, schemaKeys, recordsCount,
await Initialize(schema, query, runId, schemaKeys, recordsCount,
realtimeRecordsCollection, responseStream);
break;
default:
Expand Down Expand Up @@ -181,30 +194,39 @@ await Initialize(client, schema, query, runId, realTimeState, schemaKeys, record
}

// clear processed messages
conn.ClearStoredMessages();
_conn.ClearStoredMessages();

// update last read time
realTimeState.LastReadTime = DateTime.Now;
_realTimeState.LastReadTime = DateTime.Now;

// update real time state
var realTimeStateCommit = new Record
{
Action = Record.Types.Action.RealTimeStateCommit,
RealTimeStateJson = JsonConvert.SerializeObject(realTimeState)
RealTimeStateJson = JsonConvert.SerializeObject(_realTimeState)
};
await responseStream.WriteAsync(realTimeStateCommit);

Logger.Debug(
$"Got {currentRunRecordsCount} records since {realTimeState.LastReadTime.ToUniversalTime():O}");
$"Got {currentRunRecordsCount} records since {_realTimeState.LastReadTime.ToUniversalTime():O}");

if (_cts.IsCancellationRequested) {
// reconnect after 30 minutes
Logger.Debug("request to disconnect stream requested");
_conn.Disconnect();

await Task.Delay(realTimeSettings.BatchWindowSeconds * 1000, context.CancellationToken);
GetConnectedPushTopicConnection();
}

// sleep until next check window
await Task.Delay(_realTimeSettings.BatchWindowSeconds * 1000, context.CancellationToken);
}
}
}
catch (TaskCanceledException e)
{
Logger.Info($"Operation cancelled {e.Message}");
conn.Disconnect();
_conn.Disconnect();
return recordsCount;
}
catch (Exception e)
Expand All @@ -214,12 +236,22 @@ await Initialize(client, schema, query, runId, realTimeState, schemaKeys, record
}
finally
{
conn.Disconnect();
_conn.Disconnect();
}

return recordsCount;
}

private static void GetConnectedPushTopicConnection() {
// build cometd client
_conn = _connectionFactory.GetPushTopicConnection(_requestHelper, @"/topic/" + _realTimeSettings.ChannelName);
_conn.Connect();

// force the client to reconnect after 30 minutes
_cts = new CancellationTokenSource();
_cts.CancelAfter(1800 * 1000);
}

private static string GetRecordKeyEntry(List<string> schemaKeys, Dictionary<string, object> recordMap)
{
var entryIdStringList = new List<string>();
Expand Down Expand Up @@ -351,29 +383,27 @@ private static void MutateRecordMap(Schema schema, Dictionary<string, object> ra
/// <summary>
/// Loads all data into the local db, uploads changed records, and deletes missing records
/// </summary>
/// <param name="client"></param>
/// <param name="schema"></param>
/// <param name="query"></param>
/// <param name="runId"></param>
/// <param name="realTimeState"></param>
/// <param name="schemaKeys"></param>
/// <param name="recordsCount"></param>
/// <param name="realtimeRecordsCollection"></param>
/// <param name="responseStream"></param>
public static async Task Initialize(RequestHelper client, Schema schema, string query, string runId,
RealTimeState realTimeState, List<string> schemaKeys, long recordsCount,
public static async Task Initialize(Schema schema, string query, string runId,
List<string> schemaKeys, long recordsCount,
ILiteCollection<RealTimeRecord> realtimeRecordsCollection, IServerStreamWriter<Record> responseStream)
{
IAsyncEnumerable<Dictionary<string, object>> allRecords;

// get all records
if (string.IsNullOrWhiteSpace(query))
{
allRecords = GetRecordsForDefaultQuery(client, schema);
allRecords = GetRecordsForDefaultQuery(_requestHelper, schema);
}
else
{
allRecords = GetRecordsForQuery(client, query);
allRecords = GetRecordsForQuery(_requestHelper, query);
}

await foreach (var rawRecord in allRecords)
Expand Down Expand Up @@ -420,7 +450,7 @@ public static async Task Initialize(RequestHelper client, Schema schema, string
var realTimeStateCommit = new Record
{
Action = Record.Types.Action.RealTimeStateCommit,
RealTimeStateJson = JsonConvert.SerializeObject(realTimeState)
RealTimeStateJson = JsonConvert.SerializeObject(_realTimeState)
};

await responseStream.WriteAsync(realTimeStateCommit);
Expand Down

0 comments on commit 8da05e0

Please sign in to comment.