Skip to content

Commit

Permalink
Safe recovery of unsealed partitions
Browse files Browse the repository at this point in the history
  • Loading branch information
sakno committed Jul 3, 2024
1 parent 0eea613 commit 442bc7b
Show file tree
Hide file tree
Showing 4 changed files with 67 additions and 103 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1052,8 +1052,12 @@ public static async Task RegressionIssue244()
var path = Path.Combine(Path.GetTempPath(), Path.GetRandomFileName());
using (var state = new PersistentStateWithoutSnapshot(path, RecordsPerPartition, new() { UseCaching = true }))
{
var entries = RandomEntries();
foreach (var entry in entries[..2])
IReadOnlyList<Int64LogEntry> entries =
[
new() { Term = 1L, Content = 10L },
new() { Term = 1L, Content = 11L }
];
foreach (var entry in entries)
{
await state.AppendAsync(entry);
}
Expand All @@ -1074,16 +1078,5 @@ public static async Task RegressionIssue244()

await state.As<IRaftLog>().ReadAsync(new LogEntryConsumer(checker), 1L);
}

static Int64LogEntry[] RandomEntries()
{
var entries = new Int64LogEntry[RecordsPerPartition];
for (var i = 0; i < entries.Length; i++)
{
entries[i] = new Int64LogEntry() { Term = 1L, Content = i + 10L };
}

return entries;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ private static long Hash(ReadOnlySpan<byte> input)
static long HashRound(long hash, long data) => unchecked((hash ^ data) * prime);
}

internal ValueTask FlushAsync(in Range range)
internal ValueTask FlushAsync(in Range range, CancellationToken token = default)
{
ReadOnlyMemory<byte> data = buffer.Memory;
int offset;
Expand All @@ -166,7 +166,7 @@ internal ValueTask FlushAsync(in Range range)
data = data.Slice(offset, length);
}

return RandomAccess.WriteAsync(handle, data, offset);
return RandomAccess.WriteAsync(handle, data, offset, token);
}

internal ValueTask ClearAsync(CancellationToken token = default)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ protected Partition(DirectoryInfo location, int offset, int bufferSize, int reco
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
private int ToRelativeIndex(long absoluteIndex)
protected int ToRelativeIndex(long absoluteIndex)
=> unchecked((int)(absoluteIndex - FirstIndex));

[MemberNotNullWhen(false, nameof(Previous))]
Expand Down Expand Up @@ -94,8 +94,6 @@ internal void DetachDescendant()
internal bool Contains(long recordIndex)
=> recordIndex >= FirstIndex && recordIndex <= LastIndex;

internal abstract void Initialize();

internal void ClearContext(long absoluteIndex)
{
Debug.Assert(absoluteIndex >= FirstIndex);
Expand Down Expand Up @@ -253,11 +251,6 @@ internal SparsePartition(DirectoryInfo location, int bufferSize, int recordsPerP
metadataBuffer = manager.BufferAllocator.AllocateExactly(LogEntryMetadata.Size);
}

internal override void Initialize()
{
// do nothing
}

private long GetMetadataOffset(int index) => index * (maxLogEntrySize + LogEntryMetadata.Size);

protected override LogEntryMetadata GetMetadata(int index)
Expand Down Expand Up @@ -425,7 +418,7 @@ private bool IsSealed
set => MemoryMarshal.GetReference(header.Span) = Unsafe.BitCast<bool, byte>(value);
}

internal override void Initialize()
internal void Initialize(long lastIndexHint)
{
using var handle = File.OpenHandle(FileName, FileMode.Open, FileAccess.Read, FileShare.ReadWrite, FileOptions.SequentialScan);

Expand All @@ -434,7 +427,7 @@ internal override void Initialize()
// read header
if (RandomAccess.Read(Handle, header.Span, fileOffset: 0L) < HeaderSize)
{
header.Span.Clear();
RandomAccess.SetLength(Handle, HeaderSize);
writer.FilePosition = HeaderSize;
}
else if (IsSealed)
Expand All @@ -447,26 +440,27 @@ internal override void Initialize()

fileOffset -= footer.Length;
RandomAccess.Read(Handle, footer.Span, fileOffset);
runningIndex = int.CreateChecked(LastIndex - FirstIndex);
runningIndex = ToRelativeIndex(LastIndex);
}
else
else if (Contains(lastIndexHint))
{
// read sequentially every log entry
int footerOffset;
int footerOffset, endIndex = ToRelativeIndex(lastIndexHint);

if (PartitionNumber is 0L)
{
runningIndex = 1;
footerOffset = LogEntryMetadata.Size;
fileOffset = HeaderSize + LogEntryMetadata.Size;
}
else
{
footerOffset = 0;
footerOffset = runningIndex = 0;
fileOffset = HeaderSize;
}

for (Span<byte> metadataBuffer = stackalloc byte[LogEntryMetadata.Size], metadataTable = footer.Span;
footerOffset < footer.Length;
footerOffset < footer.Length && runningIndex <= endIndex;
footerOffset += LogEntryMetadata.Size, runningIndex++)
{
var count = RandomAccess.Read(Handle, metadataBuffer, fileOffset);
Expand All @@ -481,6 +475,10 @@ internal override void Initialize()
metadataBuffer.CopyTo(metadataTable.Slice(footerOffset, LogEntryMetadata.Size));
}
}
else
{
writer.FilePosition = HeaderSize;
}
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
Expand All @@ -496,11 +494,13 @@ private long GetWriteAddress(int index)
[AsyncMethodBuilder(typeof(PoolingAsyncValueTaskMethodBuilder))]
protected override async ValueTask PersistAsync<TEntry>(TEntry entry, int index, CancellationToken token)
{
var writeAddress = GetWriteAddress(index);
await UnsealIfNeededAsync(writeAddress, token).ConfigureAwait(false);
await UnsealIfNeededAsync(token).ConfigureAwait(false);

LogEntryMetadata metadata;
var metadataBuffer = GetMetadataBuffer(index);
var writeAddress = GetWriteAddress(index);
Debug.Assert(writeAddress >= HeaderSize);

var startPos = writeAddress + LogEntryMetadata.Size;
if (entry.Length is { } length)
{
Expand Down Expand Up @@ -534,10 +534,13 @@ protected override async ValueTask WriteThroughAsync(CachedLogEntry entry, int i
{
Debug.Assert(writer.HasBufferedData is false);

var writeAddress = GetWriteAddress(index);
await UnsealIfNeededAsync(writeAddress, token).ConfigureAwait(false);
await UnsealIfNeededAsync(token).ConfigureAwait(false);

var writeAddress = GetWriteAddress(index);
Debug.Assert(writeAddress >= HeaderSize);

var startPos = writeAddress + LogEntryMetadata.Size;

var metadata = LogEntryMetadata.Create(entry, startPos, entry.Length);
var metadataBuffer = GetMetadataBuffer(index);
metadata.Format(metadataBuffer.Span);
Expand All @@ -550,58 +553,23 @@ protected override async ValueTask WriteThroughAsync(CachedLogEntry entry, int i
writer.FilePosition = metadata.End;
}

private ValueTask UnsealIfNeededAsync(long truncatePosition, CancellationToken token)
{
ValueTask task;
if (IsSealed)
{
task = UnsealAsync(truncatePosition, token);
}
else if (token.IsCancellationRequested)
{
task = ValueTask.FromCanceled(token);
}
else if (truncatePosition < writer.FilePosition)
{
task = new();
try
{
// The caller is trying to rewrite the log entry.
// For a correctness of Initialize() method for unsealed partitions, we
// need to adjust file size. This is expensive syscall which can lead to file fragmentation.
// However, this is acceptable because rare.
RandomAccess.SetLength(Handle, truncatePosition);
}
catch (Exception e)
{
task = ValueTask.FromException(e);
}
}
else
{
task = new();
}

return task;
}
private ValueTask UnsealIfNeededAsync(CancellationToken token)
=> IsSealed ? UnsealAsync(token) : ValueTask.CompletedTask;

private async ValueTask UnsealAsync(long truncatePosition, CancellationToken token)
private async ValueTask UnsealAsync(CancellationToken token)
{
// This is expensive operation in terms of I/O. However, it is needed only when
// the consumer decided to rewrite the existing log entry, which is rare.
IsSealed = false;
await WriteHeaderAsync(token).ConfigureAwait(false);
RandomAccess.FlushToDisk(Handle);

// destroy all entries in the tail of partition
RandomAccess.SetLength(Handle, truncatePosition);
}

public override ValueTask FlushAsync(CancellationToken token = default)
{
return IsSealed
? ValueTask.CompletedTask
: runningIndex == LastIndex
: runningIndex >= LastIndex
? FlushAndSealAsync(token)
: base.FlushAsync(token);
}
Expand Down Expand Up @@ -694,7 +662,7 @@ internal LegacyPartition(DirectoryInfo location, int bufferSize, int recordsPerP
writeAddress = fileOffset;
}

internal override void Initialize()
internal void Initialize()
{
using var handle = File.OpenHandle(FileName, FileMode.Open, FileAccess.Read, FileShare.ReadWrite, FileOptions.SequentialScan);
if (RandomAccess.Read(handle, metadata.Span, 0L) < fileOffset)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ private protected PersistentState(DirectoryInfo path, int recordsPerPartition, O
{
MeasurementTags = configuration.MeasurementTags,
};

state = new(path, bufferManager.BufferAllocator, configuration.IntegrityCheck, writeMode is not WriteMode.NoFlush);

var partitionTable = new SortedSet<Partition>(Comparer<Partition>.Create(ComparePartitions));

Expand All @@ -73,38 +75,39 @@ private protected PersistentState(DirectoryInfo path, int recordsPerPartition, O
{
case > 0L:
CreateSparsePartitions(
partitionTable,
path,
bufferSize,
recordsPerPartition,
in bufferManager,
concurrentReads,
writeMode,
initialSize,
maxLogEntrySize);
partitionTable,
path,
bufferSize,
recordsPerPartition,
in bufferManager,
concurrentReads,
writeMode,
initialSize,
maxLogEntrySize);
break;
case 0L:
CreateTables(
partitionTable,
path,
bufferSize,
recordsPerPartition,
in bufferManager,
concurrentReads,
writeMode,
initialSize);
partitionTable,
path,
bufferSize,
recordsPerPartition,
in bufferManager,
concurrentReads,
writeMode,
initialSize,
state.LastIndex);
break;
case < 0L:
#pragma warning disable CS0618,CS0612
CreateLegacyPartitions(
partitionTable,
path,
bufferSize,
recordsPerPartition,
in bufferManager,
concurrentReads,
writeMode,
initialSize);
partitionTable,
path,
bufferSize,
recordsPerPartition,
in bufferManager,
concurrentReads,
writeMode,
initialSize);
break;
#pragma warning restore CS0618,CS0612
}
Expand All @@ -126,19 +129,18 @@ private protected PersistentState(DirectoryInfo path, int recordsPerPartition, O
}

partitionTable.Clear();
state = new(path, bufferManager.BufferAllocator, configuration.IntegrityCheck, writeMode is not WriteMode.NoFlush);
measurementTags = configuration.MeasurementTags;

static int ComparePartitions(Partition x, Partition y) => x.PartitionNumber.CompareTo(y.PartitionNumber);

static void CreateTables(SortedSet<Partition> partitionTable, DirectoryInfo path, int bufferSize, int recordsPerPartition, in BufferManager manager, int concurrentReads, WriteMode writeMode, long initialSize)
static void CreateTables(SortedSet<Partition> partitionTable, DirectoryInfo path, int bufferSize, int recordsPerPartition, in BufferManager manager, int concurrentReads, WriteMode writeMode, long initialSize, long lastIndex)
{
foreach (var file in path.EnumerateFiles())
{
if (long.TryParse(file.Name, out var partitionNumber))
{
var partition = new Table(file.Directory!, bufferSize, recordsPerPartition, partitionNumber, in manager, concurrentReads, writeMode, initialSize);
partition.Initialize();
partition.Initialize(lastIndex);
partitionTable.Add(partition);
}
}
Expand All @@ -151,7 +153,6 @@ static void CreateSparsePartitions(SortedSet<Partition> partitionTable, Director
if (long.TryParse(file.Name, out var partitionNumber))
{
var partition = new SparsePartition(file.Directory!, bufferSize, recordsPerPartition, partitionNumber, in manager, concurrentReads, writeMode, initialSize, maxLogEntrySize);
partition.Initialize();
partitionTable.Add(partition);
}
}
Expand Down Expand Up @@ -463,6 +464,7 @@ private async ValueTask AppendCachedAsync<TEntry>(ILogEntryProducer<TEntry> supp
}

state.LastIndex = startIndex - 1L;
await state.FlushAsync(NodeState.IndexesRange, token).ConfigureAwait(false);
}

private async Task AppendUncachedAsync<TEntry>(ILogEntryProducer<TEntry> supplier, long startIndex, bool skipCommitted, CancellationToken token)
Expand Down Expand Up @@ -491,6 +493,7 @@ private async Task AppendUncachedAsync<TEntry>(ILogEntryProducer<TEntry> supplie
}

state.LastIndex = startIndex - 1L;
await state.FlushAsync(NodeState.IndexesRange, token).ConfigureAwait(false);
}

/// <inheritdoc/>
Expand Down

0 comments on commit 442bc7b

Please sign in to comment.