Skip to content

Commit

Permalink
More cleanup of no-longer-needed code (microsoft#423)
Browse files Browse the repository at this point in the history
* More cleanup of unused code: fields in PendingContext, LatchDestination.CreatePendingContext

* WIP removing SerialNo, ResumeSession, etc.

* A few fixes for serialNo removal from API

* Finish removal of session recovery and related cleanup

* Fix loading of earlier Checkpoints

* 'format' fixes
  • Loading branch information
TedHartMS authored and chyin6 committed Jul 2, 2024
1 parent 19f7727 commit c4db89d
Show file tree
Hide file tree
Showing 78 changed files with 1,113 additions and 2,404 deletions.
5 changes: 0 additions & 5 deletions libs/server/Storage/Functions/MainStore/CallbackMethods.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,5 @@ public void ReadCompletionCallback(ref SpanByte key, ref SpanByte input, ref Spa
public void RMWCompletionCallback(ref SpanByte key, ref SpanByte input, ref SpanByteAndMemory output, long ctx, Status status, RecordMetadata recordMetadata)
{
}

/// <inheritdoc />
public void CheckpointCompletionCallback(int sessionID, string sessionName, CommitPoint commitPoint)
{
}
}
}
5 changes: 0 additions & 5 deletions libs/server/Storage/Functions/ObjectStore/CallbackMethods.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,5 @@ public void ReadCompletionCallback(ref byte[] key, ref SpanByte input, ref Garne
public void RMWCompletionCallback(ref byte[] key, ref SpanByte input, ref GarnetObjectStoreOutput output, long ctx, Status status, RecordMetadata recordMetadata)
{
}

/// <inheritdoc />
public void CheckpointCompletionCallback(int sessionID, string sessionName, CommitPoint commitPoint)
{
}
}
}
2 changes: 1 addition & 1 deletion libs/server/Storage/Session/MainStore/MainStoreOps.cs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public unsafe GarnetStatus ReadWithUnsafeContext<TContext>(ArgSlice key, ref Spa
long ctx = default;

epochChanged = false;
var status = context.Read(ref _key, ref Unsafe.AsRef(in input), ref output, ctx, 0);
var status = context.Read(ref _key, ref Unsafe.AsRef(in input), ref output, ctx);

if (status.IsPending)
{
Expand Down
20 changes: 10 additions & 10 deletions libs/storage/Tsavorite/cs/benchmark/FixedLenYcsbBenchmark.cs
Original file line number Diff line number Diff line change
Expand Up @@ -166,23 +166,23 @@ private void RunYcsbUnsafeContext(int thread_idx)
int r = (int)rng.Generate(100); // rng.Next() is not inclusive of the upper bound so this will be <= 99
if (r < readPercent)
{
uContext.Read(ref txn_keys_[idx], ref input, ref output, Empty.Default, 1);
uContext.Read(ref txn_keys_[idx], ref input, ref output, Empty.Default);
++reads_done;
continue;
}
if (r < upsertPercent)
{
uContext.Upsert(ref txn_keys_[idx], ref value, Empty.Default, 1);
uContext.Upsert(ref txn_keys_[idx], ref value, Empty.Default);
++writes_done;
continue;
}
if (r < rmwPercent)
{
uContext.RMW(ref txn_keys_[idx], ref input_[idx & 0x7], Empty.Default, 1);
uContext.RMW(ref txn_keys_[idx], ref input_[idx & 0x7], Empty.Default);
++writes_done;
continue;
}
uContext.Delete(ref txn_keys_[idx], Empty.Default, 1);
uContext.Delete(ref txn_keys_[idx], Empty.Default);
++deletes_done;
}

Expand Down Expand Up @@ -265,23 +265,23 @@ private void RunYcsbSafeContext(int thread_idx)
int r = (int)rng.Generate(100); // rng.Next() is not inclusive of the upper bound so this will be <= 99
if (r < readPercent)
{
session.Read(ref txn_keys_[idx], ref input, ref output, Empty.Default, 1);
session.Read(ref txn_keys_[idx], ref input, ref output, Empty.Default);
++reads_done;
continue;
}
if (r < upsertPercent)
{
session.Upsert(ref txn_keys_[idx], ref value, Empty.Default, 1);
session.Upsert(ref txn_keys_[idx], ref value, Empty.Default);
++writes_done;
continue;
}
if (r < rmwPercent)
{
session.RMW(ref txn_keys_[idx], ref input_[idx & 0x7], Empty.Default, 1);
session.RMW(ref txn_keys_[idx], ref input_[idx & 0x7], Empty.Default);
++writes_done;
continue;
}
session.Delete(ref txn_keys_[idx], Empty.Default, 1);
session.Delete(ref txn_keys_[idx], Empty.Default);
++deletes_done;
}
}
Expand Down Expand Up @@ -465,7 +465,7 @@ private void SetupYcsbUnsafeContext(int thread_idx)
}
}

uContext.Upsert(ref init_keys_[idx], ref value, Empty.Default, 1);
uContext.Upsert(ref init_keys_[idx], ref value, Empty.Default);
}
#if DASHBOARD
count += (int)kChunkSize;
Expand Down Expand Up @@ -522,7 +522,7 @@ private void SetupYcsbSafeContext(int thread_idx)
}
}

session.Upsert(ref init_keys_[idx], ref value, Empty.Default, 1);
session.Upsert(ref init_keys_[idx], ref value, Empty.Default);
}
}

Expand Down
6 changes: 0 additions & 6 deletions libs/storage/Tsavorite/cs/benchmark/Functions.cs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.

using System.Diagnostics;
using System.Runtime.CompilerServices;
using Tsavorite.core;

Expand All @@ -17,11 +16,6 @@ public void ReadCompletionCallback(ref Key key, ref Input input, ref Output outp
{
}

public void CheckpointCompletionCallback(int sessionID, string sessionName, CommitPoint commitPoint)
{
Debug.WriteLine($"Session {sessionID} ({(sessionName ?? "null")}) reports persistence until {commitPoint.UntilSerialNo}");
}

// Read functions
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public bool SingleReader(ref Key key, ref Input input, ref Value value, ref Output dst, ref ReadInfo readInfo)
Expand Down
20 changes: 10 additions & 10 deletions libs/storage/Tsavorite/cs/benchmark/SpanByteYcsbBenchmark.cs
Original file line number Diff line number Diff line change
Expand Up @@ -179,23 +179,23 @@ private void RunYcsbUnsafeContext(int thread_idx)
int r = (int)rng.Generate(100); // rng.Next() is not inclusive of the upper bound so this will be <= 99
if (r < readPercent)
{
uContext.Read(ref SpanByte.Reinterpret(ref txn_keys_[idx]), ref _input, ref _output, Empty.Default, 1);
uContext.Read(ref SpanByte.Reinterpret(ref txn_keys_[idx]), ref _input, ref _output, Empty.Default);
++reads_done;
continue;
}
if (r < upsertPercent)
{
uContext.Upsert(ref SpanByte.Reinterpret(ref txn_keys_[idx]), ref _value, Empty.Default, 1);
uContext.Upsert(ref SpanByte.Reinterpret(ref txn_keys_[idx]), ref _value, Empty.Default);
++writes_done;
continue;
}
if (r < rmwPercent)
{
uContext.RMW(ref SpanByte.Reinterpret(ref txn_keys_[idx]), ref _input, Empty.Default, 1);
uContext.RMW(ref SpanByte.Reinterpret(ref txn_keys_[idx]), ref _input, Empty.Default);
++writes_done;
continue;
}
uContext.Delete(ref SpanByte.Reinterpret(ref txn_keys_[idx]), Empty.Default, 1);
uContext.Delete(ref SpanByte.Reinterpret(ref txn_keys_[idx]), Empty.Default);
++deletes_done;
}

Expand Down Expand Up @@ -293,23 +293,23 @@ private void RunYcsbSafeContext(int thread_idx)
int r = (int)rng.Generate(100); // rng.Next() is not inclusive of the upper bound so this will be <= 99
if (r < readPercent)
{
session.Read(ref SpanByte.Reinterpret(ref txn_keys_[idx]), ref _input, ref _output, Empty.Default, 1);
session.Read(ref SpanByte.Reinterpret(ref txn_keys_[idx]), ref _input, ref _output, Empty.Default);
++reads_done;
continue;
}
if (r < upsertPercent)
{
session.Upsert(ref SpanByte.Reinterpret(ref txn_keys_[idx]), ref _value, Empty.Default, 1);
session.Upsert(ref SpanByte.Reinterpret(ref txn_keys_[idx]), ref _value, Empty.Default);
++writes_done;
continue;
}
if (r < rmwPercent)
{
session.RMW(ref SpanByte.Reinterpret(ref txn_keys_[idx]), ref _input, Empty.Default, 1);
session.RMW(ref SpanByte.Reinterpret(ref txn_keys_[idx]), ref _input, Empty.Default);
++writes_done;
continue;
}
session.Delete(ref SpanByte.Reinterpret(ref txn_keys_[idx]), Empty.Default, 1);
session.Delete(ref SpanByte.Reinterpret(ref txn_keys_[idx]), Empty.Default);
++deletes_done;
}

Expand Down Expand Up @@ -514,7 +514,7 @@ private void SetupYcsbUnsafeContext(int thread_idx)
}
}

uContext.Upsert(ref SpanByte.Reinterpret(ref init_keys_[idx]), ref _value, Empty.Default, 1);
uContext.Upsert(ref SpanByte.Reinterpret(ref init_keys_[idx]), ref _value, Empty.Default);
}
#if DASHBOARD
count += (int)kChunkSize;
Expand Down Expand Up @@ -572,7 +572,7 @@ private void SetupYcsbSafeContext(int thread_idx)
}
}

session.Upsert(ref SpanByte.Reinterpret(ref init_keys_[idx]), ref _value, Empty.Default, 1);
session.Upsert(ref SpanByte.Reinterpret(ref init_keys_[idx]), ref _value, Empty.Default);
}
}

Expand Down
8 changes: 3 additions & 5 deletions libs/storage/Tsavorite/cs/src/core/Allocator/AllocatorScan.cs
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,7 @@ internal Status ConditionalScanPush<Input, Output, Context, TsavoriteSession>(Ts
if (needIO)
{
// A more recent version of the key was not (yet) found and we need another IO to continue searching.
internalStatus = PrepareIOForConditionalScan(tsavoriteSession, ref pendingContext, ref key, ref input, ref value, ref output, default, 0L,
internalStatus = PrepareIOForConditionalScan(tsavoriteSession, ref pendingContext, ref key, ref input, ref value, ref output, default,
ref stackCtx, minAddress, scanCursorState);
}
else
Expand Down Expand Up @@ -308,13 +308,13 @@ internal Status ConditionalScanPush<Input, Output, Context, TsavoriteSession>(Ts
[MethodImpl(MethodImplOptions.AggressiveInlining)]
internal static OperationStatus PrepareIOForConditionalScan<Input, Output, Context, TsavoriteSession>(TsavoriteSession tsavoriteSession,
ref TsavoriteKV<Key, Value>.PendingContext<Input, Output, Context> pendingContext,
ref Key key, ref Input input, ref Value value, ref Output output, Context userContext, long lsn,
ref Key key, ref Input input, ref Value value, ref Output output, Context userContext,
ref OperationStackContext<Key, Value> stackCtx, long minAddress, ScanCursorState<Key, Value> scanCursorState)
where TsavoriteSession : ITsavoriteSession<Key, Value, Input, Output, Context>
{
// WriteReason is not surfaced for this operation, so pick anything.
var status = tsavoriteSession.Store.PrepareIOForConditionalOperation(tsavoriteSession, ref pendingContext, ref key, ref input, ref value, ref output,
userContext, lsn, ref stackCtx, minAddress, WriteReason.Compaction, OperationType.CONDITIONAL_SCAN_PUSH);
userContext, ref stackCtx, minAddress, WriteReason.Compaction, OperationType.CONDITIONAL_SCAN_PUSH);
pendingContext.scanCursorState = scanCursorState;
return status;
}
Expand Down Expand Up @@ -345,8 +345,6 @@ public void PostInitialUpdater(ref Key key, ref Input input, ref Value value, re

public void RMWCompletionCallback(ref Key key, ref Input input, ref Output output, Empty ctx, Status status, RecordMetadata recordMetadata) { }

public void CheckpointCompletionCallback(int sessionID, string sessionName, CommitPoint commitPoint) { }

public int GetRMWModifiedValueLength(ref Value value, ref Input input) => 0;
public int GetRMWInitialValueLength(ref Input input) => 0;

Expand Down
8 changes: 3 additions & 5 deletions libs/storage/Tsavorite/cs/src/core/Async/DeleteAsync.cs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public Status DoFastOperation(TsavoriteKV<Key, Value> tsavoriteKV, ref PendingCo
var keyHash = deleteOptions.KeyHash ?? tsavoriteKV.comparer.GetHashCode64(ref key);
do
{
internalStatus = tsavoriteKV.InternalDelete(ref key, keyHash, ref pendingContext.userContext, ref pendingContext, tsavoriteSession, pendingContext.serialNum);
internalStatus = tsavoriteKV.InternalDelete(ref key, keyHash, ref pendingContext.userContext, ref pendingContext, tsavoriteSession);
} while (tsavoriteKV.HandleImmediateRetryStatus(internalStatus, tsavoriteSession, ref pendingContext));
output = default;
return TranslateStatus(internalStatus);
Expand Down Expand Up @@ -85,7 +85,7 @@ public ValueTask<DeleteAsyncResult<Input, Output, Context>> CompleteAsync(Cancel

[MethodImpl(MethodImplOptions.AggressiveInlining)]
internal ValueTask<DeleteAsyncResult<Input, Output, Context>> DeleteAsync<Input, Output, Context, TsavoriteSession>(TsavoriteSession tsavoriteSession,
ref Key key, ref DeleteOptions deleteOptions, Context userContext, long serialNo, CancellationToken token = default)
ref Key key, ref DeleteOptions deleteOptions, Context userContext, CancellationToken token = default)
where TsavoriteSession : ITsavoriteSession<Key, Value, Input, Output, Context>
{
var pcontext = new PendingContext<Input, Output, Context> { IsAsync = true };
Expand All @@ -97,7 +97,7 @@ internal ValueTask<DeleteAsyncResult<Input, Output, Context>> DeleteAsync<Input,
var keyHash = deleteOptions.KeyHash ?? comparer.GetHashCode64(ref key);
do
{
internalStatus = InternalDelete(ref key, keyHash, ref userContext, ref pcontext, tsavoriteSession, serialNo);
internalStatus = InternalDelete(ref key, keyHash, ref userContext, ref pcontext, tsavoriteSession);
} while (HandleImmediateRetryStatus(internalStatus, tsavoriteSession, ref pcontext));

if (OperationStatusUtils.TryConvertToCompletedStatusCode(internalStatus, out Status status))
Expand All @@ -106,8 +106,6 @@ internal ValueTask<DeleteAsyncResult<Input, Output, Context>> DeleteAsync<Input,
}
finally
{
Debug.Assert(serialNo >= tsavoriteSession.Ctx.serialNum, "Operation serial numbers must be non-decreasing");
tsavoriteSession.Ctx.serialNum = serialNo;
tsavoriteSession.UnsafeSuspendThread();
}

Expand Down
12 changes: 5 additions & 7 deletions libs/storage/Tsavorite/cs/src/core/Async/RMWAsync.cs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public Status DoFastOperation(TsavoriteKV<Key, Value> tsavoriteKV, ref PendingCo
Status status = !diskRequest.IsDefault()
? tsavoriteKV.InternalCompletePendingRequestFromContext(tsavoriteSession, diskRequest, ref pendingContext, out AsyncIOContext<Key, Value> newDiskRequest)
: tsavoriteKV.CallInternalRMW(tsavoriteSession, ref pendingContext, ref pendingContext.key.Get(), ref pendingContext.input.Get(), ref pendingContext.output, ref rmwOptions,
pendingContext.userContext, pendingContext.serialNum, out newDiskRequest);
pendingContext.userContext, out newDiskRequest);
output = pendingContext.output;
diskRequest = newDiskRequest;
return status;
Expand Down Expand Up @@ -111,7 +111,7 @@ public ValueTask<RmwAsyncResult<Input, TOutput, Context>> CompleteAsync(Cancella

[MethodImpl(MethodImplOptions.AggressiveInlining)]
internal ValueTask<RmwAsyncResult<Input, Output, Context>> RmwAsync<Input, Output, Context, TsavoriteSession>(TsavoriteSession tsavoriteSession,
ref Key key, ref Input input, ref RMWOptions rmwOptions, Context context, long serialNo, CancellationToken token = default)
ref Key key, ref Input input, ref RMWOptions rmwOptions, Context context, CancellationToken token = default)
where TsavoriteSession : ITsavoriteSession<Key, Value, Input, Output, Context>
{
var pcontext = new PendingContext<Input, Output, Context> { IsAsync = true };
Expand All @@ -121,14 +121,12 @@ internal ValueTask<RmwAsyncResult<Input, Output, Context>> RmwAsync<Input, Outpu
try
{
Output output = default;
var status = CallInternalRMW(tsavoriteSession, ref pcontext, ref key, ref input, ref output, ref rmwOptions, context, serialNo, out diskRequest);
var status = CallInternalRMW(tsavoriteSession, ref pcontext, ref key, ref input, ref output, ref rmwOptions, context, out diskRequest);
if (!status.IsPending)
return new ValueTask<RmwAsyncResult<Input, Output, Context>>(new RmwAsyncResult<Input, Output, Context>(status, output, new RecordMetadata(pcontext.recordInfo, pcontext.logicalAddress)));
}
finally
{
Debug.Assert(serialNo >= tsavoriteSession.Ctx.serialNum, "Operation serial numbers must be non-decreasing");
tsavoriteSession.Ctx.serialNum = serialNo;
tsavoriteSession.UnsafeSuspendThread();
}

Expand All @@ -137,12 +135,12 @@ internal ValueTask<RmwAsyncResult<Input, Output, Context>> RmwAsync<Input, Outpu

[MethodImpl(MethodImplOptions.AggressiveInlining)]
private Status CallInternalRMW<Input, Output, Context>(ITsavoriteSession<Key, Value, Input, Output, Context> tsavoriteSession, ref PendingContext<Input, Output, Context> pcontext,
ref Key key, ref Input input, ref Output output, ref RMWOptions rmwOptions, Context context, long serialNo, out AsyncIOContext<Key, Value> diskRequest)
ref Key key, ref Input input, ref Output output, ref RMWOptions rmwOptions, Context context, out AsyncIOContext<Key, Value> diskRequest)
{
OperationStatus internalStatus;
var keyHash = rmwOptions.KeyHash ?? comparer.GetHashCode64(ref key);
do
internalStatus = InternalRMW(ref key, keyHash, ref input, ref output, ref context, ref pcontext, tsavoriteSession, serialNo);
internalStatus = InternalRMW(ref key, keyHash, ref input, ref output, ref context, ref pcontext, tsavoriteSession);
while (HandleImmediateRetryStatus(internalStatus, tsavoriteSession, ref pcontext));

return HandleOperationStatus(tsavoriteSession.Ctx, ref pcontext, internalStatus, out diskRequest);
Expand Down
Loading

0 comments on commit c4db89d

Please sign in to comment.