Skip to content

Commit

Permalink
Support for Diskless Replication (microsoft#997)
Browse files Browse the repository at this point in the history
* expose diskless replication parameters

* refactor/cleanup legacy ReplicaSyncSession

* add interface to support diskless replication session and aof tasks

* core diskless replication implementation

* expose diskless replication API

* adding test for diskless replication

* update gcs extension to clearly mark logging progress

* fix gcs dispose on diskless attach, call dispose of replicationSyncManager, add more logging

* complete first diskless replication test

* fix iterator check for null when empty store

* fix iterator for object store cluster sync

* add simple diskless sync test

* cleanup code

* replica fall behind test

* wip

* register cts at wait for sync completion

* add db version alignment test

* avoid using close lock for leader based syncing

* truncate AOF after streaming checkpoint is taken

* add tests for failover with diskless replication

* fix formatting and conversion to IPEndpoint

* fix RepCommandsTests

* dispose aofSyncTask if failed to add to AofSyncTaskStore

* overload dispose ReplicaSyncSession

* explicitly dispose gcs used for full sync at replicaSyncSession sync

* dispose gcs once on return

* code cleanup

* update tests to provide more context logging

* add more comprehensive logging of syncMetadata

* add timeout for streaming checkpoint

* add clusterTimeout for diskless repl tests

* some more logging

* cleanup and refactor code

* truncate AOF only when main-memory-replication is switched on

* adding logging for cancellation when streaming

* split checkpoint commit marker to allow for disk checkpoints

* update sync metadata log message

* add progress based timeout implementation

* deprecate main-memory-replication
  • Loading branch information
vazois authored Feb 24, 2025
1 parent 261e1c6 commit 14af7b8
Show file tree
Hide file tree
Showing 44 changed files with 2,697 additions and 282 deletions.
2 changes: 1 addition & 1 deletion benchmark/BDN.benchmark/Operations/OperationsBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ public virtual void GlobalSetup()
{
opts.EnableAOF = true;
opts.UseAofNullDevice = true;
opts.MainMemoryReplication = true;
opts.FastAofTruncate = true;
opts.CommitFrequencyMs = -1;
opts.AofPageSize = "128m";
opts.AofMemorySize = "256m";
Expand Down
13 changes: 10 additions & 3 deletions libs/client/ClientSession/GarnetClientSessionIncremental.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,15 @@

namespace Garnet.client
{
enum IncrementalSendType : byte
{
MIGRATE,
SYNC
}

public sealed unsafe partial class GarnetClientSession : IServerHook, IMessageConsumer
{
IncrementalSendType ist;
bool isMainStore;
byte* curr, head;
int keyValuePairCount;
Expand Down Expand Up @@ -183,9 +190,9 @@ private void TrackIterationProgress(int keyCount, int size, bool completed = fal
var duration = TimeSpan.FromTicks(Stopwatch.GetTimestamp() - lastLog);
if (completed || lastLog == 0 || duration >= iterationProgressFreq)
{
logger?.LogTrace("[{op}]: isMainStore:({storeType}) totalKeyCount:({totalKeyCount}), totalPayloadSize:({totalPayloadSize} KB)",
completed ? "COMPLETED" : "MIGRATING",
isMainStore,
logger?.LogTrace("[{op}]: store:({storeType}) totalKeyCount:({totalKeyCount}), totalPayloadSize:({totalPayloadSize} KB)",
completed ? "COMPLETED" : ist,
isMainStore ? "MAIN STORE" : "OBJECT STORE",
totalKeyCount.ToString("N0"),
((long)((double)totalPayloadSize / 1024)).ToString("N0"));
lastLog = Stopwatch.GetTimestamp();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,7 @@ public void SetClusterMigrateHeader(string sourceNodeId, bool replace, bool isMa
tcsQueue.Enqueue(currTcsIterationTask);
curr = offset;
this.isMainStore = isMainStore;
this.ist = IncrementalSendType.MIGRATE;
var storeType = isMainStore ? MAIN_STORE : OBJECT_STORE;
var replaceOption = replace ? T : F;

Expand Down
115 changes: 115 additions & 0 deletions libs/client/ClientSession/GarnetClientSessionReplicationExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ public sealed unsafe partial class GarnetClientSession : IServerHook, IMessageCo
static ReadOnlySpan<byte> send_ckpt_metadata => "SEND_CKPT_METADATA"u8;
static ReadOnlySpan<byte> send_ckpt_file_segment => "SEND_CKPT_FILE_SEGMENT"u8;
static ReadOnlySpan<byte> begin_replica_recover => "BEGIN_REPLICA_RECOVER"u8;
static ReadOnlySpan<byte> attach_sync => "ATTACH_SYNC"u8;
static ReadOnlySpan<byte> sync => "SYNC"u8;

/// <summary>
/// Initiate checkpoint retrieval from replica by sending replica checkpoint information and AOF address range
Expand Down Expand Up @@ -352,5 +354,118 @@ public Task<string> ExecuteBeginReplicaRecover(bool sendStoreCheckpoint, bool se
Interlocked.Increment(ref numCommands);
return tcs.Task;
}

/// <summary>
/// Initiate attach from replica
/// </summary>
/// <param name="syncMetadata"></param>
/// <returns></returns>
public Task<string> ExecuteAttachSync(byte[] syncMetadata)
{
var tcs = new TaskCompletionSource<string>(TaskCreationOptions.RunContinuationsAsynchronously);
tcsQueue.Enqueue(tcs);
byte* curr = offset;
int arraySize = 3;

while (!RespWriteUtils.TryWriteArrayLength(arraySize, ref curr, end))
{
Flush();
curr = offset;
}
offset = curr;

//1
while (!RespWriteUtils.TryWriteDirect(CLUSTER, ref curr, end))
{
Flush();
curr = offset;
}
offset = curr;

//2
while (!RespWriteUtils.TryWriteBulkString(attach_sync, ref curr, end))
{
Flush();
curr = offset;
}
offset = curr;

//3
while (!RespWriteUtils.TryWriteBulkString(syncMetadata, ref curr, end))
{
Flush();
curr = offset;
}
offset = curr;

Flush();
Interlocked.Increment(ref numCommands);
return tcs.Task;
}

/// <summary>
/// Set CLUSTER SYNC header info
/// </summary>
/// <param name="sourceNodeId"></param>
/// <param name="isMainStore"></param>
public void SetClusterSyncHeader(string sourceNodeId, bool isMainStore)
{
currTcsIterationTask = new TaskCompletionSource<string>(TaskCreationOptions.RunContinuationsAsynchronously);
tcsQueue.Enqueue(currTcsIterationTask);
curr = offset;
this.isMainStore = isMainStore;
this.ist = IncrementalSendType.SYNC;
var storeType = isMainStore ? MAIN_STORE : OBJECT_STORE;

var arraySize = 5;
while (!RespWriteUtils.TryWriteArrayLength(arraySize, ref curr, end))
{
Flush();
curr = offset;
}
offset = curr;

// 1
while (!RespWriteUtils.TryWriteDirect(CLUSTER, ref curr, end))
{
Flush();
curr = offset;
}
offset = curr;

// 2
while (!RespWriteUtils.TryWriteBulkString(sync, ref curr, end))
{
Flush();
curr = offset;
}
offset = curr;

// 3
while (!RespWriteUtils.TryWriteAsciiBulkString(sourceNodeId, ref curr, end))
{
Flush();
curr = offset;
}
offset = curr;

// 4
while (!RespWriteUtils.TryWriteBulkString(storeType, ref curr, end))
{
Flush();
curr = offset;
}
offset = curr;

// 5
// Reserve space for the bulk string header + final newline
while (ExtraSpace + 2 > (int)(end - curr))
{
Flush();
curr = offset;
}
head = curr;
curr += ExtraSpace;
}
}
}
2 changes: 1 addition & 1 deletion libs/cluster/Server/ClusterProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ public void SafeTruncateAOF(StoreType storeType, bool full, long CheckpointCover
_ = replicationManager.SafeTruncateAof(CheckpointCoveredAofAddress);
else
{
if (serverOptions.MainMemoryReplication)
if (serverOptions.FastAofTruncate)
storeWrapper.appendOnlyFile?.UnsafeShiftBeginAddress(CheckpointCoveredAofAddress, truncateLog: true);
else
{
Expand Down
1 change: 1 addition & 0 deletions libs/cluster/Server/Replication/CheckpointEntry.cs
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ public byte[] ToByteArray()

public static CheckpointEntry FromByteArray(byte[] serialized)
{
if (serialized.Length == 0) return null;
var ms = new MemoryStream(serialized);
var reader = new BinaryReader(ms);
var cEntry = new CheckpointEntry
Expand Down
21 changes: 1 addition & 20 deletions libs/cluster/Server/Replication/PrimaryOps/AofSyncTaskInfo.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
using System.Threading;
using System.Threading.Tasks;
using Garnet.client;
using Garnet.common;
using Microsoft.Extensions.Logging;
using Tsavorite.core;

Expand All @@ -24,11 +23,6 @@ internal sealed class AofSyncTaskInfo : IBulkLogEntryConsumer, IDisposable
readonly long startAddress;
public long previousAddress;

/// <summary>
/// Used to mark if syncing is in progress
/// </summary>
SingleWriterMultiReaderLock aofSyncInProgress;

/// <summary>
/// Check if client connection is healthy
/// </summary>
Expand Down Expand Up @@ -69,11 +63,6 @@ public void Dispose()

// Finally, dispose the cts
cts?.Dispose();

// Dispose only if AOF sync has not started
// otherwise sync task will dispose the client
if (aofSyncInProgress.TryWriteLock())
garnetClient?.Dispose();
}

public unsafe void Consume(byte* payloadPtr, int payloadLength, long currentAddress, long nextAddress, bool isProtected)
Expand Down Expand Up @@ -108,16 +97,8 @@ public async Task ReplicaSyncTask()
{
logger?.LogInformation("Starting ReplicationManager.ReplicaSyncTask for remote node {remoteNodeId} starting from address {address}", remoteNodeId, startAddress);

var failedToStart = false;
try
{
if (!aofSyncInProgress.TryWriteLock())
{
logger?.LogWarning("{method} AOF sync for {remoteNodeId} failed to start", nameof(ReplicaSyncTask), remoteNodeId);
failedToStart = true;
return;
}

if (!IsConnected) garnetClient.Connect();

iter = clusterProvider.storeWrapper.appendOnlyFile.ScanSingle(startAddress, long.MaxValue, scanUncommitted: true, recover: false, logger: logger);
Expand All @@ -134,7 +115,7 @@ public async Task ReplicaSyncTask()
}
finally
{
if (!failedToStart) garnetClient.Dispose();
garnetClient.Dispose();
var (address, port) = clusterProvider.clusterManager.CurrentConfig.GetWorkerAddressFromNodeId(remoteNodeId);
logger?.LogWarning("AofSync task terminated; client disposed {remoteNodeId} {address} {port} {currentAddress}", remoteNodeId, address, port, previousAddress);

Expand Down
114 changes: 111 additions & 3 deletions libs/cluster/Server/Replication/PrimaryOps/AofTaskStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public AofTaskStore(ClusterProvider clusterProvider, int initialSize = 1, ILogge
logPageSizeBits = clusterProvider.storeWrapper.appendOnlyFile.UnsafeGetLogPageSizeBits();
int logPageSize = 1 << logPageSizeBits;
logPageSizeMask = logPageSize - 1;
if (clusterProvider.serverOptions.MainMemoryReplication)
if (clusterProvider.serverOptions.FastAofTruncate)
clusterProvider.storeWrapper.appendOnlyFile.SafeTailShiftCallback = SafeTailShiftCallback;
TruncateLagAddress = clusterProvider.storeWrapper.appendOnlyFile.UnsafeGetReadOnlyAddressLagOffset() - 2 * logPageSize;
}
Expand Down Expand Up @@ -162,7 +162,7 @@ public bool TryAddReplicationTask(string remoteNodeId, long startAddress, out Ao

// Possible AOF data loss: { using null AOF device } OR { main memory replication AND no on-demand checkpoints }
bool possibleAofDataLoss = clusterProvider.serverOptions.UseAofNullDevice ||
(clusterProvider.serverOptions.MainMemoryReplication && !clusterProvider.serverOptions.OnDemandCheckpoint);
(clusterProvider.serverOptions.FastAofTruncate && !clusterProvider.serverOptions.OnDemandCheckpoint);

// Fail adding the task if truncation has happened, and we are not in possibleAofDataLoss mode
if (startAddress < TruncatedUntil && !possibleAofDataLoss)
Expand Down Expand Up @@ -217,6 +217,114 @@ public bool TryAddReplicationTask(string remoteNodeId, long startAddress, out Ao
return success;
}

public bool TryAddReplicationTasks(ReplicaSyncSession[] replicaSyncSessions, long startAddress)
{
var current = clusterProvider.clusterManager.CurrentConfig;
var success = true;
if (startAddress == 0) startAddress = ReplicationManager.kFirstValidAofAddress;

// First iterate through all sync sessions and add an AOF sync task
// All tasks will be
foreach (var rss in replicaSyncSessions)
{
if (rss == null) continue;
var replicaNodeId = rss.replicaSyncMetadata.originNodeId;
var (address, port) = current.GetWorkerAddressFromNodeId(replicaNodeId);

try
{
rss.AddAofSyncTask(new AofSyncTaskInfo(
clusterProvider,
this,
current.LocalNodeId,
replicaNodeId,
new GarnetClientSession(
new IPEndPoint(IPAddress.Parse(address), port),
clusterProvider.replicationManager.GetAofSyncNetworkBufferSettings,
clusterProvider.replicationManager.GetNetworkPool,
tlsOptions: clusterProvider.serverOptions.TlsOptions?.TlsClientOptions,
authUsername: clusterProvider.ClusterUsername,
authPassword: clusterProvider.ClusterPassword,
logger: logger),
startAddress,
logger));
}
catch (Exception ex)
{
logger?.LogWarning(ex, "{method} creating AOF sync task for {replicaNodeId} failed", nameof(TryAddReplicationTasks), replicaNodeId);
return false;
}
}

_lock.WriteLock();
try
{
if (_disposed) return false;

// Fail adding the task if truncation has happened
if (startAddress < TruncatedUntil)
{
logger?.LogWarning("{method} failed to add tasks for AOF sync {startAddress} {truncatedUntil}", nameof(TryAddReplicationTasks), startAddress, TruncatedUntil);
return false;
}

foreach (var rss in replicaSyncSessions)
{
if (rss == null) continue;

var added = false;
// Find if AOF sync task already exists
for (var i = 0; i < numTasks; i++)
{
var t = tasks[i];
Debug.Assert(t != null);
if (t.remoteNodeId == rss.replicaNodeId)
{
tasks[i] = rss.AofSyncTask;
t.Dispose();
added = true;
break;
}
}

if (added) continue;

// If AOF sync task did not exist and was not added we added below
// Check if array can hold a new AOF sync task
if (numTasks == tasks.Length)
{
var old_tasks = tasks;
var _tasks = new AofSyncTaskInfo[tasks.Length * 2];
Array.Copy(tasks, _tasks, tasks.Length);
tasks = _tasks;
Array.Clear(old_tasks);
}
// Add new AOF sync task
tasks[numTasks++] = rss.AofSyncTask;
}

success = true;
}
finally
{
_lock.WriteUnlock();

if (!success)
{
foreach (var rss in replicaSyncSessions)
{
if (rss == null) continue;
if (rss.AofSyncTask != null)
{
rss.AofSyncTask.Dispose();
}
}
}
}

return true;
}

public bool TryRemove(AofSyncTaskInfo aofSyncTask)
{
// Lock addition of new tasks
Expand Down Expand Up @@ -288,7 +396,7 @@ public long SafeTruncateAof(long CheckpointCoveredAofAddress = long.MaxValue)

if (TruncatedUntil > 0 && TruncatedUntil < long.MaxValue)
{
if (clusterProvider.serverOptions.MainMemoryReplication)
if (clusterProvider.serverOptions.FastAofTruncate)
{
clusterProvider.storeWrapper.appendOnlyFile?.UnsafeShiftBeginAddress(TruncatedUntil, snapToPageStart: true, truncateLog: true);
}
Expand Down
Loading

0 comments on commit 14af7b8

Please sign in to comment.