Skip to content

Commit

Permalink
Add PutEntity support for optimistic and WritePrepared pessimistic tr…
Browse files Browse the repository at this point in the history
…ansactions (#12606)

Summary:

The patch extends optimistic transactions and WriteCommitted pessimistic transactions with support for the `PutEntity` API. Similarly to the other APIs, `PutEntity` is available via both the `Transaction` and `TransactionDB` interfaces, where using the latter executes the write in a single-operation transaction as usual. Support for read APIs and other write policies (WritePrepared, WriteUnprepared) will be added in separate PRs.

Reviewed By: jaykorean

Differential Revision: D56911242
  • Loading branch information
ltamasi authored and facebook-github-bot committed May 6, 2024
1 parent 3fdc724 commit 09ea695
Show file tree
Hide file tree
Showing 11 changed files with 506 additions and 42 deletions.
41 changes: 25 additions & 16 deletions include/rocksdb/utilities/transaction.h
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ class Transaction {
virtual void SetSnapshot() = 0;

// Similar to SetSnapshot(), but will not change the current snapshot
// until Put/Merge/Delete/GetForUpdate/MultigetForUpdate is called.
// until Put/PutEntity/Merge/Delete/GetForUpdate/MultigetForUpdate is called.
// By calling this function, the transaction will essentially call
// SetSnapshot() for you right before performing the next write/GetForUpdate.
//
Expand Down Expand Up @@ -268,10 +268,10 @@ class Transaction {
// points.
virtual void SetSavePoint() = 0;

// Undo all operations in this transaction (Put, Merge, Delete, PutLogData)
// since the most recent call to SetSavePoint() and removes the most recent
// SetSavePoint().
// If there is no previous call to SetSavePoint(), returns Status::NotFound()
// Undo all operations in this transaction (Put, PutEntity, Merge, Delete,
// PutLogData) since the most recent call to SetSavePoint() and removes the
// most recent SetSavePoint(). If there is no previous call to SetSavePoint(),
// returns Status::NotFound()
virtual Status RollbackToSavePoint() = 0;

// Pop the most recent save point.
Expand Down Expand Up @@ -461,9 +461,9 @@ class Transaction {
virtual Iterator* GetIterator(const ReadOptions& read_options,
ColumnFamilyHandle* column_family) = 0;

// Put, Merge, Delete, and SingleDelete behave similarly to the corresponding
// functions in WriteBatch, but will also do conflict checking on the
// keys being written.
// Put, PutEntity, Merge, Delete, and SingleDelete behave similarly to the
// corresponding functions in WriteBatch, but will also do conflict checking
// on the keys being written.
//
// assume_tracked=true expects the key be already tracked. More
// specifically, it means the the key was previous tracked in the same
Expand All @@ -489,6 +489,10 @@ class Transaction {
const bool assume_tracked = false) = 0;
virtual Status Put(const SliceParts& key, const SliceParts& value) = 0;

virtual Status PutEntity(ColumnFamilyHandle* column_family, const Slice& key,
const WideColumns& columns,
bool assume_tracked = false) = 0;

virtual Status Merge(ColumnFamilyHandle* column_family, const Slice& key,
const Slice& value,
const bool assume_tracked = false) = 0;
Expand Down Expand Up @@ -528,6 +532,10 @@ class Transaction {
virtual Status PutUntracked(const SliceParts& key,
const SliceParts& value) = 0;

virtual Status PutEntityUntracked(ColumnFamilyHandle* column_family,
const Slice& key,
const WideColumns& columns) = 0;

virtual Status MergeUntracked(ColumnFamilyHandle* column_family,
const Slice& key, const Slice& value) = 0;
virtual Status MergeUntracked(const Slice& key, const Slice& value) = 0;
Expand Down Expand Up @@ -556,18 +564,18 @@ class Transaction {
// Similar to WriteBatch::PutLogData
virtual void PutLogData(const Slice& blob) = 0;

// By default, all Put/Merge/Delete operations will be indexed in the
// transaction so that Get/GetForUpdate/GetIterator can search for these
// By default, all Put/PutEntity/Merge/Delete operations will be indexed in
// the transaction so that Get/GetForUpdate/GetIterator can search for these
// keys.
//
// If the caller does not want to fetch the keys about to be written,
// they may want to avoid indexing as a performance optimization.
// Calling DisableIndexing() will turn off indexing for all future
// Put/Merge/Delete operations until EnableIndexing() is called.
// Put/PutEntity/Merge/Delete operations until EnableIndexing() is called.
//
// If a key is Put/Merge/Deleted after DisableIndexing is called and then
// is fetched via Get/GetForUpdate/GetIterator, the result of the fetch is
// undefined.
// If a key is written (using Put/PutEntity/Merge/Delete) after
// DisableIndexing is called and then is fetched via
// Get/GetForUpdate/GetIterator, the result of the fetch is undefined.
virtual void DisableIndexing() = 0;
virtual void EnableIndexing() = 0;

Expand All @@ -578,9 +586,10 @@ class Transaction {
// number of keys that need to be checked for conflicts at commit time.
virtual uint64_t GetNumKeys() const = 0;

// Returns the number of Puts/Deletes/Merges that have been applied to this
// transaction so far.
// Returns the number of Put/PutEntity/Delete/Merge operations that have been
// applied to this transaction so far.
virtual uint64_t GetNumPuts() const = 0;
virtual uint64_t GetNumPutEntities() const = 0;
virtual uint64_t GetNumDeletes() const = 0;
virtual uint64_t GetNumMerges() const = 0;

Expand Down
2 changes: 1 addition & 1 deletion include/rocksdb/utilities/write_batch_with_index.h
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ class WriteBatchWithIndex : public WriteBatchBase {
"Cannot call this method without attribute groups");
}
return Status::NotSupported(
"PutEntity not supported by WriteBatchWithIndex");
"PutEntity with AttributeGroups not supported by WriteBatchWithIndex");
}

using WriteBatchBase::Merge;
Expand Down
1 change: 1 addition & 0 deletions unreleased_history/new_features/put_entity_txn.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Optimistic transactions and pessimistic transactions with the WriteCommitted policy now support the `PutEntity` API. Support for read APIs and other write policies (WritePrepared, WriteUnprepared) will be added later.
171 changes: 171 additions & 0 deletions utilities/transactions/optimistic_transaction_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1142,8 +1142,12 @@ TEST_P(OptimisticTransactionTest, UntrackedWrites) {
delete txn;
txn = txn_db->BeginTransaction(write_options);

const WideColumns untracked_columns{{"hello", "world"}};

ASSERT_OK(txn->Put("tracked", "1"));
ASSERT_OK(txn->PutUntracked("untracked", "1"));
ASSERT_OK(txn->PutEntityUntracked(txn_db->DefaultColumnFamily(), "untracked",
untracked_columns));
ASSERT_OK(txn->MergeUntracked("untracked", "2"));
ASSERT_OK(txn->DeleteUntracked("untracked"));

Expand All @@ -1159,8 +1163,12 @@ TEST_P(OptimisticTransactionTest, UntrackedWrites) {
delete txn;
txn = txn_db->BeginTransaction(write_options);

const WideColumns untracked_new_columns{{"foo", "bar"}};

ASSERT_OK(txn->Put("tracked", "10"));
ASSERT_OK(txn->PutUntracked("untracked", "A"));
ASSERT_OK(txn->PutEntityUntracked(txn_db->DefaultColumnFamily(), "untracked",
untracked_new_columns));

// Write to tracked key outside of the transaction and verify that the
// untracked keys are not written when the commit fails.
Expand Down Expand Up @@ -1687,6 +1695,169 @@ TEST_P(OptimisticTransactionTest, TimestampedSnapshotSetCommitTs) {
ASSERT_TRUE(s.IsNotSupported());
}

TEST_P(OptimisticTransactionTest, PutEntitySuccess) {
constexpr char foo[] = "foo";
const WideColumns foo_columns{
{kDefaultWideColumnName, "bar"}, {"col1", "val1"}, {"col2", "val2"}};
const WideColumns foo_new_columns{
{kDefaultWideColumnName, "baz"}, {"colA", "valA"}, {"colB", "valB"}};

ASSERT_OK(txn_db->PutEntity(WriteOptions(), txn_db->DefaultColumnFamily(),
foo, foo_columns));

{
std::unique_ptr<Transaction> txn(txn_db->BeginTransaction(WriteOptions()));

ASSERT_NE(txn, nullptr);
ASSERT_EQ(txn->GetNumPutEntities(), 0);

{
PinnableSlice value;
ASSERT_OK(txn->GetForUpdate(ReadOptions(), foo, &value));
ASSERT_EQ(value, foo_columns[0].value());
}

ASSERT_OK(
txn->PutEntity(txn_db->DefaultColumnFamily(), foo, foo_new_columns));

ASSERT_EQ(txn->GetNumPutEntities(), 1);

{
PinnableSlice value;
ASSERT_OK(txn->GetForUpdate(ReadOptions(), foo, &value));
ASSERT_EQ(value, foo_new_columns[0].value());
}

ASSERT_OK(txn->Commit());
}

{
PinnableSlice value;
ASSERT_OK(
txn_db->Get(ReadOptions(), txn_db->DefaultColumnFamily(), foo, &value));
ASSERT_EQ(value, foo_new_columns[0].value());
}
}

TEST_P(OptimisticTransactionTest, PutEntityWriteConflict) {
constexpr char foo[] = "foo";
const WideColumns foo_columns{
{kDefaultWideColumnName, "bar"}, {"col1", "val1"}, {"col2", "val2"}};
constexpr char baz[] = "baz";
const WideColumns baz_columns{
{kDefaultWideColumnName, "quux"}, {"colA", "valA"}, {"colB", "valB"}};

ASSERT_OK(txn_db->PutEntity(WriteOptions(), txn_db->DefaultColumnFamily(),
foo, foo_columns));
ASSERT_OK(txn_db->PutEntity(WriteOptions(), txn_db->DefaultColumnFamily(),
baz, baz_columns));

std::unique_ptr<Transaction> txn(txn_db->BeginTransaction(WriteOptions()));
ASSERT_NE(txn, nullptr);

const WideColumns foo_new_columns{{kDefaultWideColumnName, "FOO"},
{"hello", "world"}};
const WideColumns baz_new_columns{{kDefaultWideColumnName, "BAZ"},
{"ping", "pong"}};

ASSERT_OK(
txn->PutEntity(txn_db->DefaultColumnFamily(), foo, foo_new_columns));
ASSERT_OK(
txn->PutEntity(txn_db->DefaultColumnFamily(), baz, baz_new_columns));

// This PutEntity outside of a transaction will conflict with the previous
// write
const WideColumns foo_conflict_columns{{kDefaultWideColumnName, "X"},
{"conflicting", "write"}};
ASSERT_OK(txn_db->PutEntity(WriteOptions(), txn_db->DefaultColumnFamily(),
foo, foo_conflict_columns));

{
PinnableSlice value;
ASSERT_OK(
txn_db->Get(ReadOptions(), txn_db->DefaultColumnFamily(), foo, &value));
ASSERT_EQ(value, foo_conflict_columns[0].value());
}

ASSERT_TRUE(txn->Commit().IsBusy()); // Txn should not commit

// Verify that transaction did not write anything
{
PinnableSlice value;
ASSERT_OK(
txn_db->Get(ReadOptions(), txn_db->DefaultColumnFamily(), foo, &value));
ASSERT_EQ(value, foo_conflict_columns[0].value());
}

{
PinnableSlice value;
ASSERT_OK(
txn_db->Get(ReadOptions(), txn_db->DefaultColumnFamily(), baz, &value));
ASSERT_EQ(value, baz_columns[0].value());
}
}

TEST_P(OptimisticTransactionTest, PutEntityWriteConflictTxnTxn) {
constexpr char foo[] = "foo";
const WideColumns foo_columns{
{kDefaultWideColumnName, "bar"}, {"col1", "val1"}, {"col2", "val2"}};
constexpr char baz[] = "baz";
const WideColumns baz_columns{
{kDefaultWideColumnName, "quux"}, {"colA", "valA"}, {"colB", "valB"}};

ASSERT_OK(txn_db->PutEntity(WriteOptions(), txn_db->DefaultColumnFamily(),
foo, foo_columns));
ASSERT_OK(txn_db->PutEntity(WriteOptions(), txn_db->DefaultColumnFamily(),
baz, baz_columns));

std::unique_ptr<Transaction> txn(txn_db->BeginTransaction(WriteOptions()));
ASSERT_NE(txn, nullptr);

const WideColumns foo_new_columns{{kDefaultWideColumnName, "FOO"},
{"hello", "world"}};
const WideColumns baz_new_columns{{kDefaultWideColumnName, "BAZ"},
{"ping", "pong"}};

ASSERT_OK(
txn->PutEntity(txn_db->DefaultColumnFamily(), foo, foo_new_columns));
ASSERT_OK(
txn->PutEntity(txn_db->DefaultColumnFamily(), baz, baz_new_columns));

std::unique_ptr<Transaction> conflicting_txn(
txn_db->BeginTransaction(WriteOptions()));
ASSERT_NE(conflicting_txn, nullptr);

const WideColumns foo_conflict_columns{{kDefaultWideColumnName, "X"},
{"conflicting", "write"}};
ASSERT_OK(conflicting_txn->PutEntity(txn_db->DefaultColumnFamily(), foo,
foo_conflict_columns));
ASSERT_OK(conflicting_txn->Commit());

{
PinnableSlice value;
ASSERT_OK(
txn_db->Get(ReadOptions(), txn_db->DefaultColumnFamily(), foo, &value));
ASSERT_EQ(value, foo_conflict_columns[0].value());
}

ASSERT_TRUE(txn->Commit().IsBusy()); // Txn should not commit

// Verify that transaction did not write anything
{
PinnableSlice value;
ASSERT_OK(
txn_db->Get(ReadOptions(), txn_db->DefaultColumnFamily(), foo, &value));
ASSERT_EQ(value, foo_conflict_columns[0].value());
}

{
PinnableSlice value;
ASSERT_OK(
txn_db->Get(ReadOptions(), txn_db->DefaultColumnFamily(), baz, &value));
ASSERT_EQ(value, baz_columns[0].value());
}
}

INSTANTIATE_TEST_CASE_P(
InstanceOccGroup, OptimisticTransactionTest,
testing::Values(OccValidationPolicy::kValidateSerial,
Expand Down
21 changes: 20 additions & 1 deletion utilities/transactions/pessimistic_transaction.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).


#include "utilities/transactions/pessimistic_transaction.h"

#include <map>
Expand Down Expand Up @@ -218,6 +217,21 @@ inline Status WriteCommittedTxn::GetForUpdateImpl(
value, exclusive, do_validate);
}

Status WriteCommittedTxn::PutEntityImpl(ColumnFamilyHandle* column_family,
const Slice& key,
const WideColumns& columns,
bool do_validate, bool assume_tracked) {
return Operate(column_family, key, do_validate, assume_tracked,
[column_family, &key, &columns, this]() {
Status s = GetBatchForWrite()->PutEntity(column_family, key,
columns);
if (s.ok()) {
++num_put_entities_;
}
return s;
});
}

Status WriteCommittedTxn::Put(ColumnFamilyHandle* column_family,
const Slice& key, const Slice& value,
const bool assume_tracked) {
Expand Down Expand Up @@ -896,6 +910,11 @@ Status PessimisticTransaction::LockBatch(WriteBatch* batch,
RecordKey(column_family_id, key);
return Status::OK();
}
Status PutEntityCF(uint32_t column_family_id, const Slice& key,
const Slice& /* unused */) override {
RecordKey(column_family_id, key);
return Status::OK();
}
Status MergeCF(uint32_t column_family_id, const Slice& key,
const Slice& /* unused */) override {
RecordKey(column_family_id, key);
Expand Down
24 changes: 23 additions & 1 deletion utilities/transactions/pessimistic_transaction.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@

#pragma once


#include <algorithm>
#include <atomic>
#include <mutex>
Expand Down Expand Up @@ -249,6 +248,25 @@ class WriteCommittedTxn : public PessimisticTransaction {
Status PutUntracked(ColumnFamilyHandle* column_family, const SliceParts& key,
const SliceParts& value) override;

// `key` does NOT include timestamp even when it's enabled.
Status PutEntity(ColumnFamilyHandle* column_family, const Slice& key,
const WideColumns& columns,
bool assume_tracked = false) override {
const bool do_validate = !assume_tracked;

return PutEntityImpl(column_family, key, columns, do_validate,
assume_tracked);
}

Status PutEntityUntracked(ColumnFamilyHandle* column_family, const Slice& key,
const WideColumns& columns) override {
constexpr bool do_validate = false;
constexpr bool assume_tracked = false;

return PutEntityImpl(column_family, key, columns, do_validate,
assume_tracked);
}

using TransactionBaseImpl::Delete;
// `key` does NOT include timestamp even when it's enabled.
Status Delete(ColumnFamilyHandle* column_family, const Slice& key,
Expand Down Expand Up @@ -288,6 +306,10 @@ class WriteCommittedTxn : public PessimisticTransaction {
TValue* value, bool exclusive,
const bool do_validate);

Status PutEntityImpl(ColumnFamilyHandle* column_family, const Slice& key,
const WideColumns& columns, bool do_validate,
bool assume_tracked);

template <typename TKey, typename TOperation>
Status Operate(ColumnFamilyHandle* column_family, const TKey& key,
const bool do_validate, const bool assume_tracked,
Expand Down
Loading

0 comments on commit 09ea695

Please sign in to comment.