From 60e162f0299b42fab3de65b4575dda427e92cad6 Mon Sep 17 00:00:00 2001 From: Ted Hart <15467143+TedHartMS@users.noreply.github.com> Date: Thu, 17 Aug 2023 13:11:16 -0700 Subject: [PATCH] - Set both Sealed and Invalid on new records to ensure consistency both during normal operations and after recovery, which clears the Sealed bit. (#864) - Seal ConcurrentUpdater and InPlaceUpdater source records on successful RCU even when not doing Standard locking --- cs/src/core/Index/Common/RecordInfo.cs | 22 ++++-- .../Implementation/ConditionalCopyToTail.cs | 24 ++++++- .../Index/FASTER/Implementation/Helpers.cs | 5 +- .../FASTER/Implementation/InternalRMW.cs | 4 +- .../FASTER/Implementation/InternalRead.cs | 5 ++ .../FASTER/Implementation/InternalUpsert.cs | 5 +- .../Implementation/TryCopyToReadCache.cs | 4 +- .../FASTER/Implementation/TryCopyToTail.cs | 3 + cs/test/MiscFASTERTests.cs | 72 ++++++++++++++----- cs/test/StateMachineTests.cs | 8 --- 10 files changed, 115 insertions(+), 37 deletions(-) diff --git a/cs/src/core/Index/Common/RecordInfo.cs b/cs/src/core/Index/Common/RecordInfo.cs index 9b2bb89dc..7f5a0df21 100644 --- a/cs/src/core/Index/Common/RecordInfo.cs +++ b/cs/src/core/Index/Common/RecordInfo.cs @@ -66,9 +66,14 @@ public struct RecordInfo public void WriteInfo(bool inNewVersion, bool tombstone, long previousAddress) { + // For Recovery reasons, we need to have the record both Sealed and Invalid: + // - Recovery removes the Sealed bit, so we need Invalid to survive from this point on to successful CAS. + // Otherwise, Scan could return partial records (e.g. a checkpoint was taken that flushed midway through the record update). + // - Revivification sets Sealed; we need to preserve it here. + // We'll clear both on successful CAS. 
this.word = default; this.Tombstone = tombstone; - this.SetValid(); + this.SealAndInvalidate(); this.PreviousAddress = previousAddress; this.IsInNewVersion = inNewVersion; } @@ -112,7 +117,9 @@ public void UnlockExclusive() { Debug.Assert(!IsLockedShared, "Trying to X unlock an S locked record"); Debug.Assert(IsLockedExclusive, "Trying to X unlock an unlocked record"); - Debug.Assert(!IsSealed, "Trying to X unlock a Sealed record"); + + // Because we seal the source of an RCU and that source is likely locked, we cannot assert !IsSealed. + // Debug.Assert(!IsSealed, "Trying to X unlock a Sealed record"); word &= ~kExclusiveLockBitMask; // Safe because there should be no other threads (e.g., readers) updating the word at this point } @@ -123,8 +130,13 @@ public void UnlockExclusive() public void UnlockExclusiveAndSeal() { Debug.Assert(!IsLockedShared, "Trying to X unlock an S locked record"); - Debug.Assert(IsLockedExclusive, "Trying to X unlock an unlocked record"); - Debug.Assert(!IsSealed, "Trying to X unlock a Sealed record"); + + // Because we seal the source of an RCU and that source is likely locked, we cannot assert !IsSealed. + // Debug.Assert(!IsSealed, "Trying to X unlock a Sealed record"); + + // For this we are Unlocking and Sealing without the cost of an "if EphemeralLocking", so do not assert this. 
+ // Debug.Assert(IsLockedExclusive, "Trying to X unlock an unlocked record"); + word = (word & ~kExclusiveLockBitMask) | kSealedBitMask; // Safe because there should be no other threads (e.g., readers) updating the word at this point } @@ -322,6 +334,8 @@ public bool IsInNewVersion public void SetTombstone() => word |= kTombstoneBitMask; public void SetValid() => word |= kValidBitMask; public void SetInvalid() => word &= ~(kValidBitMask | kExclusiveLockBitMask); + public void SealAndInvalidate() => word = (word & ~kValidBitMask) | kSealedBitMask; + public void UnsealAndValidate() => word = (word & ~kSealedBitMask) | kValidBitMask; [MethodImpl(MethodImplOptions.AggressiveInlining)] public void SetInvalidAtomic() diff --git a/cs/src/core/Index/FASTER/Implementation/ConditionalCopyToTail.cs b/cs/src/core/Index/FASTER/Implementation/ConditionalCopyToTail.cs index f33166ac3..777728fc9 100644 --- a/cs/src/core/Index/FASTER/Implementation/ConditionalCopyToTail.cs +++ b/cs/src/core/Index/FASTER/Implementation/ConditionalCopyToTail.cs @@ -8,6 +8,22 @@ namespace FASTER.core { public unsafe partial class FasterKV : FasterBase, IFasterKV { + /// + /// Copy a record to the tail of the log after caller has verified it does not exist within a specified range. + /// + /// Callback functions. + /// pending context created when the operation goes pending. + /// key of the record. + /// input passed through. + /// the value to insert + /// Location to store output computed from input and value. + /// user context corresponding to operation used during completion callback. + /// Operation serial number + /// Contains information about the call context, record metadata, and so on + /// The reason the CopyToTail is being done + /// Whether to do IO if the search must go below HeadAddress. ReadFromImmutable, for example, + /// is just an optimization to avoid future IOs, so if we need an IO here we just defer them to the next Read(). 
+ /// [MethodImpl(MethodImplOptions.AggressiveInlining)] private OperationStatus ConditionalCopyToTail(FasterSession fasterSession, ref PendingContext pendingContext, @@ -17,9 +33,11 @@ private OperationStatus ConditionalCopyToTail= hlog.ReadOnlyAddress) srcRecordInfo.UnlockExclusiveAndSeal(); } stackCtx.ClearNewRecord(); diff --git a/cs/src/core/Index/FASTER/Implementation/InternalRead.cs b/cs/src/core/Index/FASTER/Implementation/InternalRead.cs index 7dd756923..0bbc063c1 100644 --- a/cs/src/core/Index/FASTER/Implementation/InternalRead.cs +++ b/cs/src/core/Index/FASTER/Implementation/InternalRead.cs @@ -233,6 +233,8 @@ private OperationStatus ReadFromMutableRegion= hlog.ReadOnlyAddress) srcRecordInfo.UnlockExclusiveAndSeal(); + stackCtx.ClearNewRecord(); pendingContext.recordInfo = newRecordInfo; pendingContext.logicalAddress = newLogicalAddress; diff --git a/cs/src/core/Index/FASTER/Implementation/TryCopyToReadCache.cs b/cs/src/core/Index/FASTER/Implementation/TryCopyToReadCache.cs index d1de3a36e..7e343e9e7 100644 --- a/cs/src/core/Index/FASTER/Implementation/TryCopyToReadCache.cs +++ b/cs/src/core/Index/FASTER/Implementation/TryCopyToReadCache.cs @@ -1,8 +1,6 @@ // Copyright (c) Microsoft Corporation. All rights reserved. // Licensed under the MIT license. -using System.Threading; - namespace FASTER.core { public unsafe partial class FasterKV : FasterBase, IFasterKV @@ -58,6 +56,8 @@ internal bool TryCopyToReadCache(FasterSe // ReadCache entries are CAS'd in as the first entry in the hash chain. 
var success = stackCtx.hei.TryCAS(newLogicalAddress | Constants.kReadCacheBitMask); + if (success) + newRecordInfo.UnsealAndValidate(); var casSuccess = success; if (success && stackCtx.recSrc.LowestReadCacheLogicalAddress != Constants.kInvalidAddress) diff --git a/cs/src/core/Index/FASTER/Implementation/TryCopyToTail.cs b/cs/src/core/Index/FASTER/Implementation/TryCopyToTail.cs index 5f5801d76..b1abc87e2 100644 --- a/cs/src/core/Index/FASTER/Implementation/TryCopyToTail.cs +++ b/cs/src/core/Index/FASTER/Implementation/TryCopyToTail.cs @@ -78,7 +78,10 @@ internal OperationStatus TryCopyToTail(re } if (success) + { + newRecordInfo.UnsealAndValidate(); PostCopyToTail(ref key, ref stackCtx, ref srcRecordInfo, pendingContext.InitialEntryAddress); + } else { stackCtx.SetNewRecordInvalid(ref newRecordInfo); diff --git a/cs/test/MiscFASTERTests.cs b/cs/test/MiscFASTERTests.cs index bc2f4830f..a79e3dcdd 100644 --- a/cs/test/MiscFASTERTests.cs +++ b/cs/test/MiscFASTERTests.cs @@ -3,6 +3,7 @@ using FASTER.core; using NUnit.Framework; +using System; using static FASTER.test.TestUtils; namespace FASTER.test @@ -102,18 +103,25 @@ public void MixedTest2() [Test] [Category("FasterKV")] - public void ShouldCreateNewRecordIfConcurrentWriterReturnsFalse() + public void ForceRCUAndRecover([Values(UpdateOp.Upsert, UpdateOp.Delete)] UpdateOp updateOp) { var copyOnWrite = new FunctionsCopyOnWrite(); // FunctionsCopyOnWrite var log = default(IDevice); + FasterKV fht = default; + ClientSession> session = default; + try { - log = Devices.CreateLogDevice(TestUtils.MethodTestDir + "/hlog1.log", deleteOnClose: true); - using var fht = new FasterKV - (128, new LogSettings { LogDevice = log, MemorySizeBits = 29 }, lockingMode: LockingMode.None); - using var session = fht.NewSession(copyOnWrite); + var checkpointDir = MethodTestDir + $"/checkpoints"; + log = Devices.CreateLogDevice(MethodTestDir + "/hlog1.log", deleteOnClose: true); + fht = new FasterKV + (128, new LogSettings { LogDevice = log, 
MemorySizeBits = 29 }, + checkpointSettings: new CheckpointSettings { CheckpointDir = checkpointDir }, + lockingMode: LockingMode.None); + + session = fht.NewSession(copyOnWrite); var key = default(KeyStruct); var value = default(ValueStruct); @@ -126,28 +134,58 @@ public void ShouldCreateNewRecordIfConcurrentWriterReturnsFalse() var status = session.Upsert(ref key, ref input, ref value, ref output, out RecordMetadata recordMetadata1); Assert.IsTrue(!status.Found && status.Record.Created, status.ToString()); - // ConcurrentWriter returns false, so we create a new record. + // ConcurrentWriter and InPlaceUpdater return false, so we create a new record. + RecordMetadata recordMetadata2; value = new ValueStruct() { vfield1 = 1001, vfield2 = 2002 }; - status = session.Upsert(ref key, ref input, ref value, ref output, out RecordMetadata recordMetadata2); - Assert.IsTrue(!status.Found && status.Record.Created, status.ToString()); - + if (updateOp == UpdateOp.Upsert) + { + status = session.Upsert(ref key, ref input, ref value, ref output, out recordMetadata2); + Assert.AreEqual(1, copyOnWrite.ConcurrentWriterCallCount); + Assert.IsTrue(!status.Found && status.Record.Created, status.ToString()); + } + else + { + status = session.RMW(ref key, ref input, ref output, out recordMetadata2); + Assert.AreEqual(1, copyOnWrite.InPlaceUpdaterCallCount); + Assert.IsTrue(status.Found && status.Record.CopyUpdated, status.ToString()); + } Assert.Greater(recordMetadata2.Address, recordMetadata1.Address); - var recordCount = 0; using (var iterator = fht.Log.Scan(fht.Log.BeginAddress, fht.Log.TailAddress)) { - // We should get both the old and the new records. - while (iterator.GetNext(out var info)) recordCount++; + Assert.True(iterator.GetNext(out var info)); // We should only get the new record... + Assert.False(iterator.GetNext(out info)); // ... the old record was Sealed. 
} + status = session.Read(ref key, ref output); + Assert.IsTrue(status.Found, status.ToString()); + + fht.TryInitiateFullCheckpoint(out Guid token, CheckpointType.Snapshot); + fht.CompleteCheckpointAsync().AsTask().GetAwaiter().GetResult(); - Assert.AreEqual(1, copyOnWrite.ConcurrentWriterCallCount); - Assert.AreEqual(2, recordCount); + session.Dispose(); + fht.Dispose(); + + fht = new FasterKV + (128, new LogSettings { LogDevice = log, MemorySizeBits = 29 }, + checkpointSettings: new CheckpointSettings { CheckpointDir = checkpointDir }, + lockingMode: LockingMode.None); + + fht.Recover(token); + session = fht.NewSession(copyOnWrite); + + using (var iterator = fht.Log.Scan(fht.Log.BeginAddress, fht.Log.TailAddress)) + { + Assert.True(iterator.GetNext(out var info)); // We should get both records... + Assert.True(iterator.GetNext(out info)); // ... the old record was Unsealed by Recovery. + } + status = session.Read(ref key, ref output); + Assert.IsTrue(status.Found, status.ToString()); } finally { - if (log != null) - log.Dispose(); + session?.Dispose(); + fht?.Dispose(); + log?.Dispose(); } } } diff --git a/cs/test/StateMachineTests.cs b/cs/test/StateMachineTests.cs index 3e2737b8e..4bbae5f45 100644 --- a/cs/test/StateMachineTests.cs +++ b/cs/test/StateMachineTests.cs @@ -682,14 +682,6 @@ void createSessions(out SimpleFunctions f, fht1.CompleteCheckpointAsync().AsTask().GetAwaiter().GetResult(); } - - bool tryStartLUC( - ref LockableUnsafeContext luContext, - ClientSession session) - { - luContext = session.LockableUnsafeContext; - return !session.IsInPreparePhase(); - } void RecoverAndTest(IDevice log) { NumClicks inputArg = default;