Skip to content

Commit

Permalink
- Set both Sealed and Invalid on new records to ensure consistency bo…
Browse files Browse the repository at this point in the history
…th during normal operations and after recovery, which clears the Sealed bit. (#864)

- Seal ConcurrentUpdater and InPlaceUpdater source records on successful RCU even when not doing Standard locking
  • Loading branch information
TedHartMS authored Aug 17, 2023
1 parent b15e1cd commit 60e162f
Show file tree
Hide file tree
Showing 10 changed files with 115 additions and 37 deletions.
22 changes: 18 additions & 4 deletions cs/src/core/Index/Common/RecordInfo.cs
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,14 @@ public struct RecordInfo

public void WriteInfo(bool inNewVersion, bool tombstone, long previousAddress)
{
// For Recovery reasons, we need to have the record both Sealed and Invalid:
// - Recovery removes the Sealed bit, so we need Invalid to survive from this point on to successful CAS.
// Otherwise, Scan could return partial records (e.g. a checkpoint was taken that flushed midway through the record update).
// - Revivification sets Sealed; we need to preserve it here.
// We'll clear both on successful CAS.
this.word = default;
this.Tombstone = tombstone;
this.SetValid();
this.SealAndInvalidate();
this.PreviousAddress = previousAddress;
this.IsInNewVersion = inNewVersion;
}
Expand Down Expand Up @@ -112,7 +117,9 @@ public void UnlockExclusive()
{
Debug.Assert(!IsLockedShared, "Trying to X unlock an S locked record");
Debug.Assert(IsLockedExclusive, "Trying to X unlock an unlocked record");
Debug.Assert(!IsSealed, "Trying to X unlock a Sealed record");

// Because we seal the source of an RCU and that source is likely locked, we cannot assert !IsSealed.
// Debug.Assert(!IsSealed, "Trying to X unlock a Sealed record");
word &= ~kExclusiveLockBitMask; // Safe because there should be no other threads (e.g., readers) updating the word at this point
}

Expand All @@ -123,8 +130,13 @@ public void UnlockExclusive()
public void UnlockExclusiveAndSeal()
{
Debug.Assert(!IsLockedShared, "Trying to X unlock an S locked record");
Debug.Assert(IsLockedExclusive, "Trying to X unlock an unlocked record");
Debug.Assert(!IsSealed, "Trying to X unlock a Sealed record");

// Because we seal the source of an RCU and that source is likely locked, we cannot assert !IsSealed.
// Debug.Assert(!IsSealed, "Trying to X unlock a Sealed record");

// For this we are Unlocking and Sealing without the cost of an "if EphemeralLocking", so do not assert this.
// Debug.Assert(IsLockedExclusive, "Trying to X unlock an unlocked record");

word = (word & ~kExclusiveLockBitMask) | kSealedBitMask; // Safe because there should be no other threads (e.g., readers) updating the word at this point
}

Expand Down Expand Up @@ -322,6 +334,8 @@ public bool IsInNewVersion
public void SetTombstone() => word |= kTombstoneBitMask;
public void SetValid() => word |= kValidBitMask;
public void SetInvalid() => word &= ~(kValidBitMask | kExclusiveLockBitMask);
public void SealAndInvalidate() => word &= (word & ~kValidBitMask) | kSealedBitMask;
public void UnsealAndValidate() => word = (word & ~kSealedBitMask) | kValidBitMask;

[MethodImpl(MethodImplOptions.AggressiveInlining)]
public void SetInvalidAtomic()
Expand Down
24 changes: 21 additions & 3 deletions cs/src/core/Index/FASTER/Implementation/ConditionalCopyToTail.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,22 @@ namespace FASTER.core
{
public unsafe partial class FasterKV<Key, Value> : FasterBase, IFasterKV<Key, Value>
{
/// <summary>
/// Copy a record to the tail of the log after caller has verifyied it does not exist within a specified range.
/// </summary>
/// <param name="fasterSession">Callback functions.</param>
/// <param name="pendingContext">pending context created when the operation goes pending.</param>
/// <param name="key">key of the record.</param>
/// <param name="input">input passed through.</param>
/// <param name="value">the value to insert</param>
/// <param name="output">Location to store output computed from input and value.</param>
/// <param name="userContext">user context corresponding to operation used during completion callback.</param>
/// <param name="lsn">Operation serial number</param>
/// <param name="stackCtx">Contains information about the call context, record metadata, and so on</param>
/// <param name="writeReason">The reason the CopyToTail is being done</param>
/// <param name="wantIO">Whether to do IO if the search must go below HeadAddress. ReadFromImmutable, for example,
/// is just an optimization to avoid future IOs, so if we need an IO here we just defer them to the next Read().</param>
/// <returns></returns>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
private OperationStatus ConditionalCopyToTail<Input, Output, Context, FasterSession>(FasterSession fasterSession,
ref PendingContext<Input, Output, Context> pendingContext,
Expand All @@ -17,9 +33,11 @@ private OperationStatus ConditionalCopyToTail<Input, Output, Context, FasterSess
{
bool callerHasLock = stackCtx.recSrc.HasTransientLock;

// We are called by one of ReadFromImmutable, CompactionConditionalCopyToTail, or ContinueConditionalCopyToTail, and stackCtx is set up for the first try.
// minAddress is the stackCtx.recSrc.LatestLogicalAddress; by the time we get here, any IO below that has been done due to PrepareConditionalCopyToTailIO,
// which then went to ContinueConditionalCopyToTail, which evaluated whether the record was found at that level.
// We are called by one of ReadFromImmutable, CompactionConditionalCopyToTail, or ContinuePendingConditionalCopyToTail;
// these have already searched to see if the record is present above minAddress, and stackCtx is set up for the first try.
// minAddress is the stackCtx.recSrc.LatestLogicalAddress; by the time we get here, any IO below that has been done due to
// PrepareConditionalCopyToTailIO, which then went to ContinuePendingConditionalCopyToTail, which evaluated whether the
// record was found at that level.
while (true)
{
// ConditionalCopyToTail is different in regard to locking from the usual procedures, in that if we find a source record we don't lock--we exit with success.
Expand Down
5 changes: 4 additions & 1 deletion cs/src/core/Index/FASTER/Implementation/Helpers.cs
Original file line number Diff line number Diff line change
Expand Up @@ -95,9 +95,12 @@ private bool CASRecordIntoChain(ref Key key, ref OperationStackContext<Key, Valu
if (DoEphemeralLocking)
newRecordInfo.InitializeLockExclusive();

return stackCtx.recSrc.LowestReadCachePhysicalAddress == Constants.kInvalidAddress
var result = stackCtx.recSrc.LowestReadCachePhysicalAddress == Constants.kInvalidAddress
? stackCtx.hei.TryCAS(newLogicalAddress)
: SpliceIntoHashChainAtReadCacheBoundary(ref key, ref stackCtx, newLogicalAddress);
if (result)
newRecordInfo.UnsealAndValidate();
return result;
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
Expand Down
4 changes: 3 additions & 1 deletion cs/src/core/Index/FASTER/Implementation/InternalRMW.cs
Original file line number Diff line number Diff line change
Expand Up @@ -483,7 +483,9 @@ private OperationStatus CreateNewRecordRMW<Input, Output, Context, FasterSession
{
// Else it was a CopyUpdater so call PCU
fasterSession.PostCopyUpdater(ref key, ref input, ref value, ref hlog.GetValue(newPhysicalAddress), ref output, ref newRecordInfo, ref rmwInfo);
if (stackCtx.recSrc.ephemeralLockResult == EphemeralLockResult.HoldForSeal)

// Success should always Seal the old record if it's in mutable.
if (stackCtx.recSrc.HasMainLogSrc && stackCtx.recSrc.LogicalAddress >= hlog.ReadOnlyAddress)
srcRecordInfo.UnlockExclusiveAndSeal();
}
stackCtx.ClearNewRecord();
Expand Down
5 changes: 5 additions & 0 deletions cs/src/core/Index/FASTER/Implementation/InternalRead.cs
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,8 @@ private OperationStatus ReadFromMutableRegion<Input, Output, Context, FasterSess

try
{
if (srcRecordInfo.IsClosed && !useStartAddress)
return OperationStatus.RETRY_LATER;
if (srcRecordInfo.Tombstone)
return OperationStatus.NOTFOUND;

Expand Down Expand Up @@ -277,8 +279,11 @@ private OperationStatus ReadFromImmutableRegion<Input, Output, Context, FasterSe

try
{
if (srcRecordInfo.IsClosed && !useStartAddress)
return OperationStatus.RETRY_LATER;
if (srcRecordInfo.Tombstone)
return OperationStatus.NOTFOUND;

ref Value recordValue = ref stackCtx.recSrc.GetValue();

if (fasterSession.SingleReader(ref key, ref input, ref recordValue, ref output, ref srcRecordInfo, ref readInfo))
Expand Down
5 changes: 4 additions & 1 deletion cs/src/core/Index/FASTER/Implementation/InternalUpsert.cs
Original file line number Diff line number Diff line change
Expand Up @@ -346,8 +346,11 @@ private OperationStatus CreateNewRecordUpsert<Input, Output, Context, FasterSess
PostCopyToTail(ref key, ref stackCtx, ref srcRecordInfo);

fasterSession.PostSingleWriter(ref key, ref input, ref value, ref newValue, ref output, ref newRecordInfo, ref upsertInfo, WriteReason.Upsert);
if (stackCtx.recSrc.ephemeralLockResult == EphemeralLockResult.HoldForSeal)

// Success should always Seal the old record if it's in mutable.
if (stackCtx.recSrc.HasMainLogSrc && stackCtx.recSrc.LogicalAddress >= hlog.ReadOnlyAddress)
srcRecordInfo.UnlockExclusiveAndSeal();

stackCtx.ClearNewRecord();
pendingContext.recordInfo = newRecordInfo;
pendingContext.logicalAddress = newLogicalAddress;
Expand Down
4 changes: 2 additions & 2 deletions cs/src/core/Index/FASTER/Implementation/TryCopyToReadCache.cs
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT license.

using System.Threading;

namespace FASTER.core
{
public unsafe partial class FasterKV<Key, Value> : FasterBase, IFasterKV<Key, Value>
Expand Down Expand Up @@ -58,6 +56,8 @@ internal bool TryCopyToReadCache<Input, Output, Context, FasterSession>(FasterSe

// ReadCache entries are CAS'd in as the first entry in the hash chain.
var success = stackCtx.hei.TryCAS(newLogicalAddress | Constants.kReadCacheBitMask);
if (success)
newRecordInfo.UnsealAndValidate();
var casSuccess = success;

if (success && stackCtx.recSrc.LowestReadCacheLogicalAddress != Constants.kInvalidAddress)
Expand Down
3 changes: 3 additions & 0 deletions cs/src/core/Index/FASTER/Implementation/TryCopyToTail.cs
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,10 @@ internal OperationStatus TryCopyToTail<Input, Output, Context, FasterSession>(re
}

if (success)
{
newRecordInfo.UnsealAndValidate();
PostCopyToTail(ref key, ref stackCtx, ref srcRecordInfo, pendingContext.InitialEntryAddress);
}
else
{
stackCtx.SetNewRecordInvalid(ref newRecordInfo);
Expand Down
72 changes: 55 additions & 17 deletions cs/test/MiscFASTERTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

using FASTER.core;
using NUnit.Framework;
using System;
using static FASTER.test.TestUtils;

namespace FASTER.test
Expand Down Expand Up @@ -102,18 +103,25 @@ public void MixedTest2()

[Test]
[Category("FasterKV")]
public void ShouldCreateNewRecordIfConcurrentWriterReturnsFalse()
public void ForceRCUAndRecover([Values(UpdateOp.Upsert, UpdateOp.Delete)] UpdateOp updateOp)
{
var copyOnWrite = new FunctionsCopyOnWrite();

// FunctionsCopyOnWrite
var log = default(IDevice);
FasterKV<KeyStruct, ValueStruct> fht = default;
ClientSession<KeyStruct, ValueStruct, InputStruct, OutputStruct, Empty, IFunctions<KeyStruct, ValueStruct, InputStruct, OutputStruct, Empty>> session = default;

try
{
log = Devices.CreateLogDevice(TestUtils.MethodTestDir + "/hlog1.log", deleteOnClose: true);
using var fht = new FasterKV<KeyStruct, ValueStruct>
(128, new LogSettings { LogDevice = log, MemorySizeBits = 29 }, lockingMode: LockingMode.None);
using var session = fht.NewSession(copyOnWrite);
var checkpointDir = MethodTestDir + $"/checkpoints";
log = Devices.CreateLogDevice(MethodTestDir + "/hlog1.log", deleteOnClose: true);
fht = new FasterKV<KeyStruct, ValueStruct>
(128, new LogSettings { LogDevice = log, MemorySizeBits = 29 },
checkpointSettings: new CheckpointSettings { CheckpointDir = checkpointDir },
lockingMode: LockingMode.None);

session = fht.NewSession(copyOnWrite);

var key = default(KeyStruct);
var value = default(ValueStruct);
Expand All @@ -126,28 +134,58 @@ public void ShouldCreateNewRecordIfConcurrentWriterReturnsFalse()
var status = session.Upsert(ref key, ref input, ref value, ref output, out RecordMetadata recordMetadata1);
Assert.IsTrue(!status.Found && status.Record.Created, status.ToString());

// ConcurrentWriter returns false, so we create a new record.
// ConcurrentWriter and InPlaceUpater return false, so we create a new record.
RecordMetadata recordMetadata2;
value = new ValueStruct() { vfield1 = 1001, vfield2 = 2002 };
status = session.Upsert(ref key, ref input, ref value, ref output, out RecordMetadata recordMetadata2);
Assert.IsTrue(!status.Found && status.Record.Created, status.ToString());

if (updateOp == UpdateOp.Upsert)
{
status = session.Upsert(ref key, ref input, ref value, ref output, out recordMetadata2);
Assert.AreEqual(1, copyOnWrite.ConcurrentWriterCallCount);
Assert.IsTrue(!status.Found && status.Record.Created, status.ToString());
}
else
{
status = session.RMW(ref key, ref input, ref output, out recordMetadata2);
Assert.AreEqual(1, copyOnWrite.InPlaceUpdaterCallCount);
Assert.IsTrue(status.Found && status.Record.CopyUpdated, status.ToString());
}
Assert.Greater(recordMetadata2.Address, recordMetadata1.Address);

var recordCount = 0;
using (var iterator = fht.Log.Scan(fht.Log.BeginAddress, fht.Log.TailAddress))
{
// We should get both the old and the new records.
while (iterator.GetNext(out var info))
recordCount++;
Assert.True(iterator.GetNext(out var info)); // We should only get the new record...
Assert.False(iterator.GetNext(out info)); // ... the old record was Sealed.
}
status = session.Read(ref key, ref output);
Assert.IsTrue(status.Found, status.ToString());

fht.TryInitiateFullCheckpoint(out Guid token, CheckpointType.Snapshot);
fht.CompleteCheckpointAsync().AsTask().GetAwaiter().GetResult();

Assert.AreEqual(1, copyOnWrite.ConcurrentWriterCallCount);
Assert.AreEqual(2, recordCount);
session.Dispose();
fht.Dispose();

fht = new FasterKV<KeyStruct, ValueStruct>
(128, new LogSettings { LogDevice = log, MemorySizeBits = 29 },
checkpointSettings: new CheckpointSettings { CheckpointDir = checkpointDir },
lockingMode: LockingMode.None);

fht.Recover(token);
session = fht.NewSession(copyOnWrite);

using (var iterator = fht.Log.Scan(fht.Log.BeginAddress, fht.Log.TailAddress))
{
Assert.True(iterator.GetNext(out var info)); // We should get both records...
Assert.True(iterator.GetNext(out info)); // ... the old record was Unsealed by Recovery.
}
status = session.Read(ref key, ref output);
Assert.IsTrue(status.Found, status.ToString());
}
finally
{
if (log != null)
log.Dispose();
session?.Dispose();
fht?.Dispose();
log?.Dispose();
}
}
}
Expand Down
8 changes: 0 additions & 8 deletions cs/test/StateMachineTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -682,14 +682,6 @@ void createSessions(out SimpleFunctions f,
fht1.CompleteCheckpointAsync().AsTask().GetAwaiter().GetResult();
}


bool tryStartLUC(
ref LockableUnsafeContext<AdId, NumClicks, NumClicks, NumClicks, Empty, SimpleFunctions> luContext,
ClientSession<AdId, NumClicks, NumClicks, NumClicks, Empty, SimpleFunctions> session)
{
luContext = session.LockableUnsafeContext;
return !session.IsInPreparePhase();
}
void RecoverAndTest(IDevice log)
{
NumClicks inputArg = default;
Expand Down

0 comments on commit 60e162f

Please sign in to comment.